Source code for seismic.xcorqc.xcorqc

#!/usr/bin/env python
"""
Description:
    Cross-correlation functionality

References:

CreationDate:   29/06/17

Developer:      laurence.davies@ga.gov.au

Revision History:
    LastUpdate:     29/06/17   LD       First commit of xcor code.
    LastUpdate:     13/07/17   LD       Fixed xcor filtering issue when traces have different sample rates.
    LastUpdate:     11/08/17   RH       Implement ASDF-based cross-correlation workflow
    LastUpdate:     11/07/18   RH       Implemented parallel cross-correlator
    LastUpdate:     19/07/18   RH       Implemented cross-correlation approaches described in Habel et al. 2018

    LastUpdate:     dd/mm/yyyy  Who     Optional description
"""

import os
import logging
import math
from collections import defaultdict

import numpy as np
import scipy

from obspy.core import UTCDateTime, Stats
from obspy import Trace
from obspy.signal.filter import bandpass, highpass, lowpass
from obspy.geodetics.base import gps2dist_azimuth
from scipy import signal

from seismic.xcorqc.fft import *
from seismic.ASDFdatabase.FederatedASDFDataSet import FederatedASDFDataSet
from seismic.xcorqc.utils import get_stream
from netCDF4 import Dataset
from functools import reduce

logging.basicConfig()


def setup_logger(name, log_file, level=logging.INFO):
    """
    Function to setup a logger; adapted from stackoverflow
    """
    formatter = logging.Formatter('%(asctime)s %(levelname)s %(message)s')
    handler = logging.FileHandler(log_file, mode='w')
    handler.setFormatter(formatter)

    logger = logging.getLogger(name + log_file)
    logger.setLevel(level)
    logger.addHandler(handler)
    logger.propagate = False
    return logger
# end func
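
# Illustrative usage sketch for setup_logger (not part of the original module); the
# station-pair name and log path below are hypothetical:
#
#   logger = setup_logger('AU.ARMA.AU.QIS', '/tmp/AU.ARMA.AU.QIS.log')
#   logger.info('Starting cross-correlation..')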
def zeropad(tr, padlen):
    assert (tr.shape[0] < padlen)
    padded = np.zeros(padlen)
    padded[0:tr.shape[0]] = tr
    return padded
# end func
def zeropad_ba(tr, padlen):
    assert (tr.shape[0] < padlen)
    padded = np.zeros(padlen, dtype=np.complex_)
    s = int((padlen - tr.shape[0]) / 2)
    padded[s:(s + tr.shape[0])] = scipy.fftpack.fftshift(tr)
    return scipy.fftpack.ifftshift(padded)
# end func
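
# Illustrative sketch of the two padding helpers (hypothetical values): zeropad appends
# zeros to the end of a time-domain window, while zeropad_ba centre-pads a spectrum so
# that traces with different sampling rates can be correlated on a common frequency grid.
#
#   x = np.arange(5, dtype=float)
#   zeropad(x, 8)                # -> [0., 1., 2., 3., 4., 0., 0., 0.]
#   X = scipy.fftpack.fft(x)     # 5-point spectrum
#   zeropad_ba(X, 8)             # -> 8-point spectrum, zero-padded about the Nyquist bin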
def taper(tr, taperlen):
    tr[0:taperlen] *= 0.5 * (1 + np.cos(np.linspace(-math.pi, 0, taperlen)))
    tr[-taperlen:] *= 0.5 * (1 + np.cos(np.linspace(0, math.pi, taperlen)))
    return tr
# end func
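
# Illustrative sketch (hypothetical values): apply a 5% cosine taper to each end of a
# window, as done in xcorr2 before and after spectral whitening. The first and last
# samples ramp smoothly to zero.
#
#   w = np.ones(1000)
#   w = taper(w, int(np.round(0.05 * w.shape[0])))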
def whiten(a, sampling_rate, window_freq=0):
    """
    Applies spectral whitening to trace samples. When window_freq=0, all frequency bins
    are normalized by their amplitudes, i.e. all frequency bins end up with an amplitude
    of 1. When window_freq is nonzero, a smoothed amplitude spectrum (smoothing window
    length is as computed below) is used to normalize the frequency bins.

    :param a: trace samples
    :param sampling_rate: sampling rate
    :param window_freq: smoothing window length (Hz)
    :return: spectrally whitened samples
    """
    # frequency step
    npts = a.shape[0]
    deltaf = sampling_rate / npts

    ffta = np.fft.rfft(a)

    # smooth amplitude spectrum
    halfwindow = int(round(window_freq / deltaf / 2.0))
    if halfwindow > 0:
        # moving average
        weight = np.convolve(np.abs(ffta),
                             np.ones(halfwindow * 2 + 1) / (halfwindow * 2 + 1),
                             mode='same')
    else:
        weight = np.abs(ffta)

    ss = ffta / weight
    a = np.fft.irfft(ss)
    return a
# end func
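
# Illustrative sketch (hypothetical signal): whiten a noisy sinusoid, smoothing the
# amplitude spectrum over a ~1 Hz window so that the spectrum is flattened overall
# while spectral shape within the smoothing window is preserved.
#
#   fs = 100.0
#   t = np.arange(0, 600, 1.0 / fs)
#   sig = np.sin(2 * np.pi * 5.0 * t) + 0.1 * np.random.randn(t.shape[0])
#   flat = whiten(sig, fs, window_freq=1.0)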
def xcorr2(tr1, tr2, sta1_inv=None, sta2_inv=None,
           instrument_response_output='vel', water_level=50.,
           window_seconds=3600, window_overlap=0.1, window_buffer_length=0,
           interval_seconds=86400, taper_length=0.05, resample_rate=None,
           flo=None, fhi=None, clip_to_2std=False, whitening=False,
           whitening_window_frequency=0, one_bit_normalize=False,
           envelope_normalize=False, verbose=1, logger=None):

    # Length of window_buffer in seconds
    window_buffer_seconds = window_buffer_length * window_seconds
    adjusted_taper_length = taper_length
    if window_buffer_seconds:
        # adjust taper length
        adjusted_taper_length = taper_length / (1. + window_buffer_length * 2.)
    # end if

    sr1 = tr1.stats.sampling_rate
    sr2 = tr2.stats.sampling_rate
    sr1_orig = sr1
    sr2_orig = sr2
    tr1_d_all = tr1.data  # refstn
    tr2_d_all = tr2.data
    lentr1_all = tr1_d_all.shape[0]
    lentr2_all = tr2_d_all.shape[0]
    window_samples_1 = (window_seconds + 2 * window_buffer_seconds) * sr1
    window_samples_2 = (window_seconds + 2 * window_buffer_seconds) * sr2
    interval_samples_1 = interval_seconds * sr1
    interval_samples_2 = interval_seconds * sr2
    # sr = 0
    resll = []

    # set day-aligned start-indices
    maxStartTime = max(tr1.stats.starttime, tr2.stats.starttime)
    dayAlignedStartTime = UTCDateTime(year=maxStartTime.year, month=maxStartTime.month,
                                      day=maxStartTime.day)
    itr1s = (dayAlignedStartTime - tr1.stats.starttime) * sr1
    itr2s = (dayAlignedStartTime - tr2.stats.starttime) * sr2

    if resample_rate:
        sr1 = resample_rate
        sr2 = resample_rate
    # end if
    sr = max(sr1, sr2)

    xcorlen = int(2 * window_seconds * sr - 1)
    fftlen = 2 ** (int(np.log2(xcorlen)) + 1)

    intervalCount = 0
    windowsPerInterval = []  # Stores the number of windows processed per interval
    intervalStartSeconds = []
    intervalEndSeconds = []
    while itr1s < lentr1_all and itr2s < lentr2_all:
        itr1e = min(lentr1_all, itr1s + interval_samples_1)
        itr2e = min(lentr2_all, itr2s + interval_samples_2)

        while (itr1s < 0) or (itr2s < 0):
            itr1s += (window_samples_1 - 2 * window_buffer_seconds * sr1_orig) - \
                     (window_samples_1 - 2 * window_buffer_seconds * sr1_orig) * window_overlap
            itr2s += (window_samples_2 - 2 * window_buffer_seconds * sr2_orig) - \
                     (window_samples_2 - 2 * window_buffer_seconds * sr2_orig) * window_overlap
        # end while

        if (np.fabs(itr1e - itr1s) < sr1_orig) or (np.fabs(itr2e - itr2s) < sr2_orig):
            itr1s = itr1e
            itr2s = itr2e
            continue
        # end if

        if (tr1.stats.starttime + itr1s / sr1_orig != tr2.stats.starttime + itr2s / sr2_orig):
            if logger:
                logger.warning('Detected misaligned traces..')

        windowCount = 0
        wtr1s = int(itr1s)
        wtr2s = int(itr2s)
        resl = []

        while wtr1s < itr1e and wtr2s < itr2e:
            wtr1e = int(min(itr1e, wtr1s + window_samples_1))
            wtr2e = int(min(itr2e, wtr2s + window_samples_2))

            # Discard small windows
            if ((wtr1e - wtr1s < window_samples_1) or (wtr2e - wtr2s < window_samples_2) or
                    (wtr1e - wtr1s < sr1_orig) or (wtr2e - wtr2s < sr2_orig)):
                wtr1s = int(np.ceil(itr1e))
                wtr2s = int(np.ceil(itr2e))
                continue
            # end if

            # Discard windows with masked regions, i.e. with gaps,
            # or windows that are all zeros
            if (not (np.ma.is_masked(tr1_d_all[wtr1s:wtr1e]) or
                     np.ma.is_masked(tr2_d_all[wtr2s:wtr2e]) or
                     np.sum(tr1_d_all[wtr1s:wtr1e]) == 0 or
                     np.sum(tr2_d_all[wtr2s:wtr2e]) == 0)):

                # logger.info('%s, %s' % (tr1.stats.starttime + wtr1s / 200., tr1.stats.starttime + wtr1e / sr1_orig))
                # logger.info('%s, %s' % (tr2.stats.starttime + wtr2s / 200., tr2.stats.starttime + wtr2e / sr2_orig))

                tr1_d = np.array(tr1_d_all[wtr1s:wtr1e], dtype=np.float32)
                tr2_d = np.array(tr2_d_all[wtr2s:wtr2e], dtype=np.float32)

                # STEP 1: detrend
                tr1_d = signal.detrend(tr1_d)
                tr2_d = signal.detrend(tr2_d)

                # STEP 2: demean
                tr1_d -= np.mean(tr1_d)
                tr2_d -= np.mean(tr2_d)

                # STEP 3: remove response
                if sta1_inv:
                    resp_tr1 = Trace(data=tr1_d,
                                     header=Stats(header={'sampling_rate': sr1_orig,
                                                          'npts': len(tr1_d),
                                                          'network': tr1.stats.network,
                                                          'station': tr1.stats.station,
                                                          'location': tr1.stats.location,
                                                          'channel': tr1.stats.channel,
                                                          'starttime': tr1.stats.starttime + float(wtr1s) / sr1_orig,
                                                          'endtime': tr1.stats.starttime + float(wtr1e) / sr1_orig}))
                    try:
                        resp_tr1.remove_response(inventory=sta1_inv,
                                                 output=instrument_response_output.upper(),
                                                 water_level=water_level)
                    except Exception as e:
                        logger.error(str(e))
                    # end try

                    tr1_d = resp_tr1.data
                # end if

                # remove response
                if sta2_inv:
                    resp_tr2 = Trace(data=tr2_d,
                                     header=Stats(header={'sampling_rate': sr2_orig,
                                                          'npts': len(tr2_d),
                                                          'network': tr2.stats.network,
                                                          'station': tr2.stats.station,
                                                          'location': tr2.stats.location,
                                                          'channel': tr2.stats.channel,
                                                          'starttime': tr2.stats.starttime + float(wtr2s) / sr2_orig,
                                                          'endtime': tr2.stats.starttime + float(wtr2e) / sr2_orig}))
                    try:
                        resp_tr2.remove_response(inventory=sta2_inv,
                                                 output=instrument_response_output.upper(),
                                                 water_level=water_level)
                    except Exception as e:
                        logger.error(str(e))
                    # end try

                    tr2_d = resp_tr2.data
                # end if

                # STEPS 4, 5: resample after lowpass @ resample_rate/2 Hz
                if resample_rate:
                    tr1_d = lowpass(tr1_d, resample_rate / 2., sr1_orig, corners=2, zerophase=True)
                    tr2_d = lowpass(tr2_d, resample_rate / 2., sr2_orig, corners=2, zerophase=True)

                    tr1_d = Trace(data=tr1_d,
                                  header=Stats(header={'sampling_rate': sr1_orig,
                                                       'npts': window_samples_1})).resample(resample_rate,
                                                                                            no_filter=True).data
                    tr2_d = Trace(data=tr2_d,
                                  header=Stats(header={'sampling_rate': sr2_orig,
                                                       'npts': window_samples_2})).resample(resample_rate,
                                                                                            no_filter=True).data
                # end if

                # STEP 6: Bandpass
                if flo and fhi:
                    tr1_d = bandpass(tr1_d, flo, fhi, sr1, corners=2, zerophase=True)
                    tr2_d = bandpass(tr2_d, flo, fhi, sr2, corners=2, zerophase=True)
                # end if

                # STEP 7: time-domain normalization
                # clip to +/- 2*std
                if clip_to_2std:
                    std_tr1 = np.std(tr1_d)
                    std_tr2 = np.std(tr2_d)
                    clip_indices_tr1 = np.fabs(tr1_d) > 2 * std_tr1
                    clip_indices_tr2 = np.fabs(tr2_d) > 2 * std_tr2

                    tr1_d[clip_indices_tr1] = 2 * std_tr1 * np.sign(tr1_d[clip_indices_tr1])
                    tr2_d[clip_indices_tr2] = 2 * std_tr2 * np.sign(tr2_d[clip_indices_tr2])
                # end if

                # 1-bit normalization
                if one_bit_normalize:
                    tr1_d = np.sign(tr1_d)
                    tr2_d = np.sign(tr2_d)
                # end if

                # Apply Rhys Hawkins-style default time domain normalization
                if (clip_to_2std == 0) and (one_bit_normalize == 0):
                    # 0-mean
                    tr1_d -= np.mean(tr1_d)
                    tr2_d -= np.mean(tr2_d)

                    # unit-std
                    tr1_d /= np.std(tr1_d)
                    tr2_d /= np.std(tr2_d)
                # end if

                # STEP 8: taper
                if adjusted_taper_length > 0:
                    tr1_d = taper(tr1_d, int(np.round(adjusted_taper_length * tr1_d.shape[0])))
                    tr2_d = taper(tr2_d, int(np.round(adjusted_taper_length * tr2_d.shape[0])))
                # end if

                # STEP 9: spectral whitening
                if whitening:
                    tr1_d = whiten(tr1_d, sr1, window_freq=whitening_window_frequency)
                    tr2_d = whiten(tr2_d, sr2, window_freq=whitening_window_frequency)

                    # STEP 10: taper
                    if adjusted_taper_length > 0:
                        tr1_d = taper(tr1_d, int(np.round(adjusted_taper_length * tr1_d.shape[0])))
                        tr2_d = taper(tr2_d, int(np.round(adjusted_taper_length * tr2_d.shape[0])))
                    # end if
                # end if

                # STEP 11: Final bandpass
                # apply zero-phase bandpass
                if flo and fhi:
                    tr1_d = bandpass(tr1_d, flo, fhi, sr1, corners=2, zerophase=True)
                    tr2_d = bandpass(tr2_d, flo, fhi, sr2, corners=2, zerophase=True)
                # end if

                if window_buffer_seconds:
                    # extract window of interest from buffered window
                    tr1_d = tr1_d[int(window_buffer_seconds * sr1):-int(window_buffer_seconds * sr1)]
                    tr2_d = tr2_d[int(window_buffer_seconds * sr2):-int(window_buffer_seconds * sr2)]
                # end if

                # cross-correlate waveforms
                if sr1 < sr2:
                    fftlen2 = fftlen
                    fftlen1 = int((fftlen2 * 1.0 * sr1) / sr)
                    rf = zeropad_ba(fftn(zeropad(tr1_d, fftlen1), shape=[fftlen1]), fftlen2) * fftn(
                        zeropad(ndflip(tr2_d), fftlen2), shape=[fftlen2])
                elif sr1 > sr2:
                    fftlen1 = fftlen
                    fftlen2 = int((fftlen1 * 1.0 * sr2) / sr)
                    rf = fftn(zeropad(tr1_d, fftlen1), shape=[fftlen1]) * zeropad_ba(
                        fftn(zeropad(ndflip(tr2_d), fftlen2), shape=[fftlen2]), fftlen1)
                else:
                    rf = fftn(zeropad(tr1_d, fftlen), shape=[fftlen]) * fftn(zeropad(ndflip(tr2_d), fftlen),
                                                                             shape=[fftlen])
                # end if

                if not np.isnan(rf).any():
                    resl.append(rf)
                    windowCount += 1
                # end if
            # end if

            wtr1s += int((window_samples_1 - 2 * window_buffer_seconds * sr1_orig) -
                         (window_samples_1 - 2 * window_buffer_seconds * sr1_orig) * window_overlap)
            wtr2s += int((window_samples_2 - 2 * window_buffer_seconds * sr2_orig) -
                         (window_samples_2 - 2 * window_buffer_seconds * sr2_orig) * window_overlap)
        # end while (windows within interval)

        if verbose > 1:
            if logger:
                logger.info('\tProcessed %d windows in interval %d' % (windowCount, intervalCount))
        # end if

        intervalStartSeconds.append(itr1s / sr1_orig + tr1.stats.starttime.timestamp)
        intervalEndSeconds.append(itr1e / sr1_orig + tr1.stats.starttime.timestamp)
        itr1s = itr1e
        itr2s = itr2e
        intervalCount += 1

        # Append an array of zeros if no windows were processed for the current interval
        if windowCount == 0:
            resl.append(np.zeros(fftlen))
            if verbose > 1:
                if logger:
                    logger.info('\tWarning: No windows processed due to gaps in data in current interval')
            # end if
        # end if

        windowsPerInterval.append(windowCount)

        if windowCount > 0:
            mean = reduce((lambda tx, ty: tx + ty), resl) / float(windowCount)
        else:
            mean = reduce((lambda tx, ty: tx + ty), resl)
        # end if

        if envelope_normalize:
            step = np.sign(np.fft.fftfreq(fftlen, 1.0 / sr))
            mean = mean + step * mean  # compute analytic
        # end if

        mean = ifftn(mean)

        if envelope_normalize:
            # Compute magnitude of mean
            mean = np.abs(mean)
            normFactor = np.max(mean)

            # mean can be 0 for a null result
            if normFactor > 0:
                mean /= normFactor
            # end if
        # end if

        resll.append(mean[:xcorlen])
    # end while (iteration over intervals)

    if len(resll):
        return np.array(resll), np.array(windowsPerInterval), \
               np.array(intervalStartSeconds, dtype='i8'), \
               np.array(intervalEndSeconds, dtype='i8'), \
               sr
    else:
        return None, None, None, None, sr
    # end if
# end func
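
# Illustrative usage sketch for xcorr2 (not part of the original module); the traces
# below are synthetic noise and all parameter values are hypothetical:
#
#   trA = Trace(data=np.random.randn(864000).astype(np.float32),
#               header={'sampling_rate': 10.0, 'starttime': UTCDateTime(2020, 1, 1)})
#   trB = Trace(data=np.random.randn(864000).astype(np.float32),
#               header={'sampling_rate': 10.0, 'starttime': UTCDateTime(2020, 1, 1)})
#   ccs, wins, starts, ends, sr = xcorr2(trA, trB,
#                                        window_seconds=3600, interval_seconds=86400,
#                                        flo=0.1, fhi=1.0)
#   # ccs: one stacked cross-correlation per interval; wins: windows stacked per interval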
def IntervalStackXCorr(refds, tempds,
                       start_time, end_time,
                       ref_net_sta, temp_net_sta,
                       ref_sta_inv, temp_sta_inv,
                       instrument_response_output,
                       water_level,
                       ref_cha, temp_cha,
                       baz_ref_net_sta, baz_temp_net_sta,
                       resample_rate=None,
                       taper_length=0.05,
                       buffer_seconds=864000, interval_seconds=86400,
                       window_seconds=3600, window_overlap=0.1, window_buffer_length=0,
                       flo=None, fhi=None,
                       clip_to_2std=False, whitening=False, whitening_window_frequency=0,
                       one_bit_normalize=False, envelope_normalize=False,
                       ensemble_stack=False,
                       outputPath='/tmp', verbose=1, tracking_tag=''):
    """
    This function rolls through two ASDF data sets over a given time-range and cross-correlates
    waveforms from all possible station-pairs from the two data sets. To allow efficient, random
    data access to ASDF data sources, an instance of a SeisDB object, instantiated from the
    corresponding JSON database, is passed in (tempds_db) -- although this parameter is not
    mandatory, data-access from large ASDF files will be slow without it.

    Station-ids to be processed from the two data-sources can be specified as lists of strings,
    while wildcards can be used to process all stations. Data is fetched from the sources in chunks
    to limit memory usage and data-windows with gaps are discarded.

    Cross-correlation results are written out for each station-pair, in the specified folder, as
    NETCDF4 files. Panoply (https://www.giss.nasa.gov/tools/panoply/), already installed on the
    NCI VDIs, can be used to interrogate these results.

    :type refds: FederatedASDFDataSet
    :param refds: FederatedASDFDataSet containing reference-station data
    :type tempds: FederatedASDFDataSet
    :param tempds: FederatedASDFDataSet containing temporary-stations data
    :type start_time: UTCDateTime
    :param start_time: Start-time (UTCDateTime format) for data to be used in cross-correlation
    :type end_time: UTCDateTime
    :param end_time: End-time (UTCDateTime format) for data to be used in cross-correlation
    :type ref_net_sta: str
    :param ref_net_sta: Network.Station for the reference Dataset.
    :type temp_net_sta: str
    :param temp_net_sta: Network.Station for the temporary Dataset.
    :type ref_sta_inv: Inventory
    :param ref_sta_inv: Inventory containing instrument response for station
    :type temp_sta_inv: Inventory
    :param temp_sta_inv: Inventory containing instrument response for station
    :type instrument_response_output: str
    :param instrument_response_output: Output of instrument response correction; can be either 'vel' or 'disp'
    :type water_level: float
    :param water_level: Water-level used during instrument response correction
    :type ref_cha: str
    :param ref_cha: Channel name for the reference Dataset
    :type temp_cha: str
    :param temp_cha: Channel name for the temporary Dataset
    :type baz_ref_net_sta: float
    :param baz_ref_net_sta: Back-azimuth of ref station from temp station in degrees
    :type baz_temp_net_sta: float
    :param baz_temp_net_sta: Back-azimuth of temp station from ref station in degrees
    :type resample_rate: float
    :param resample_rate: Resampling rate (Hz). Applies to both data-sets
    :type taper_length: float
    :param taper_length: Taper length as a fraction of window length
    :type buffer_seconds: int
    :param buffer_seconds: The amount of data to be fetched per call from the ASDFDataSets, because \
                           we may not be able to fetch all the data (from start_time to end_time) at \
                           once. The default is set to 10 days and should be a multiple of \
                           interval_seconds.
    :type interval_seconds: int
    :param interval_seconds: The interval in seconds, over which cross-correlation windows are \
                             stacked. Default is 1 day.
    :type window_seconds: int
    :param window_seconds: Length of cross-correlation window in seconds. Default is 1 hr.
    :type window_overlap: float
    :param window_overlap: Window overlap fraction. Default is 0.1.
    :type window_buffer_length: float
    :param window_buffer_length: Buffer length as a fraction of 'window_seconds' around actual data windows of \
                                 interest. This helps exclude effects of tapering and other edge artefacts from \
                                 data windows before cross-correlation. Default is 0
    :type flo: float
    :param flo: Lower frequency for Butterworth bandpass filter
    :type fhi: float
    :param fhi: Upper frequency for Butterworth bandpass filter
    :type clip_to_2std: bool
    :param clip_to_2std: Clip data in each window to +/- 2 standard deviations
    :type whitening: bool
    :param whitening: Apply spectral whitening
    :type whitening_window_frequency: float
    :param whitening_window_frequency: Window frequency (Hz) used to determine length of averaging window \
                                       for smoothing spectral amplitude
    :type one_bit_normalize: bool
    :param one_bit_normalize: Apply one-bit normalization to data in each window
    :type envelope_normalize: bool
    :param envelope_normalize: Envelope via Hilbert transforms and normalize
    :type ensemble_stack: bool
    :param ensemble_stack: Outputs a single CC function stacked over all data for a given station-pair
    :type verbose: int
    :param verbose: Verbosity of printouts. Default is 1; maximum is 3.
    :type tracking_tag: str
    :param tracking_tag: File tag to be added to output file names so runtime settings can be tracked
    :type outputPath: str
    :param outputPath: Folder to write results to
    :return: 1: 1d np.array with time samples spanning [-window_seconds+dt:window_seconds-dt]
             2: A dictionary of 2d np.arrays containing cross-correlation results for each station-pair. \
                Rows in each 2d array represent number of interval_seconds processed and columns \
                represent stacked samples of length window_seconds.
             3: A dictionary of 1d np.arrays containing number of windows processed, within each \
                interval_seconds period, for each station-pair. These window-counts could be helpful \
                in assessing robustness of results.
    """

    #######################################
    # check consistency of parameterization
    #######################################
    if resample_rate and fhi:
        if resample_rate < 2 * fhi:
            raise RuntimeError('Resample-rate should be >= 2*fmax')

    if clip_to_2std and one_bit_normalize:
        raise RuntimeError('Mutually exclusive parameterization: clip_to_2std and one-bit-normalization '
                           'together is redundant')
    # end if

    # setup logger
    stationPair = '%s.%s' % (ref_net_sta, temp_net_sta)
    fn = os.path.join(outputPath, '%s.log' % (stationPair if not tracking_tag else
                                              '.'.join([stationPair, tracking_tag])))
    logger = setup_logger('%s.%s' % (ref_net_sta, temp_net_sta), fn)

    #######################################
    # Initialize variables for main loop
    #######################################
    startTime = UTCDateTime(start_time)
    endTime = UTCDateTime(end_time)

    cTime = startTime

    xcorrResultsDict = defaultdict(list)  # Results dictionary indexed by station-pair string
    windowCountResultsDict = defaultdict(list)  # Window-count dictionary indexed by station-pair string
    intervalStartTimesDict = defaultdict(list)
    intervalEndTimesDict = defaultdict(list)
    sr = 0
    while cTime < endTime:
        cStep = buffer_seconds

        if (cTime + cStep) > endTime:
            cStep = endTime - cTime

        logger.info('====Time range [%s - %s]====' % (str(cTime), str(cTime + cStep)))
        logger.info('Fetching data for station %s..' % ref_net_sta)

        refSt = None
        try:
            rnc, rsc = ref_net_sta.split('.')
            refSt = get_stream(refds, rnc, rsc, ref_cha, cTime, cTime + cStep,
                               baz=baz_ref_net_sta, logger=logger, verbose=verbose)
        except Exception as e:
            logger.error('\t' + str(e))
            logger.warning('\tError encountered while fetching data. Skipping along..')

        if refSt is None:
            logger.info('Failed to fetch data..')
            cTime += cStep
            continue
        elif len(refSt) == 0:
            logger.info('Data source exhausted. Skipping time interval [%s - %s]' % (str(cTime), str(cTime + cStep)))
            cTime += cStep
            continue
        else:
            pass
            # print refSt
        # end if

        logger.info('\tFetching data for station %s..' % temp_net_sta)

        tempSt = None
        try:
            tnc, tsc = temp_net_sta.split('.')
            tempSt = get_stream(tempds, tnc, tsc, temp_cha, cTime, cTime + cStep,
                                baz=baz_temp_net_sta, logger=logger, verbose=verbose)
        except Exception as e:
            logger.error('\t' + str(e))
            logger.warning('\tError encountered while fetching data. Skipping along..')
        # end try

        if tempSt is None:
            logger.info('Failed to fetch data..')
            cTime += cStep
            continue
        elif len(tempSt) == 0:
            logger.info('Data source exhausted. Skipping time interval [%s - %s]' % (str(cTime), str(cTime + cStep)))
            cTime += cStep
            continue
        else:
            pass
            # print tempSt
        # end if

        if verbose > 2:
            logger.debug('\t\tData Gaps:')
            tempSt.print_gaps()  # output sent to stdout; fix this
            print("\n")
        # end if

        logger.info('\tCross-correlating station-pair: %s' % stationPair)
        xcl, winsPerInterval, \
        intervalStartSeconds, intervalEndSeconds, sr = \
            xcorr2(refSt[0], tempSt[0], ref_sta_inv, temp_sta_inv,
                   instrument_response_output=instrument_response_output,
                   water_level=water_level,
                   window_seconds=window_seconds,
                   window_overlap=window_overlap,
                   window_buffer_length=window_buffer_length,
                   interval_seconds=interval_seconds,
                   resample_rate=resample_rate,
                   taper_length=taper_length,
                   flo=flo, fhi=fhi,
                   clip_to_2std=clip_to_2std, whitening=whitening,
                   whitening_window_frequency=whitening_window_frequency,
                   one_bit_normalize=one_bit_normalize,
                   envelope_normalize=envelope_normalize,
                   verbose=verbose, logger=logger)

        # Continue if no results were returned due to data-gaps
        if xcl is None:
            logger.warning("\t\tWarning: no cross-correlation results returned for station-pair %s, " %
                           stationPair + " due to gaps in data.")
            cTime += cStep
            continue
        # end if

        xcorrResultsDict[stationPair].append(xcl)
        windowCountResultsDict[stationPair].append(winsPerInterval)

        intervalStartTimesDict[stationPair].append(intervalStartSeconds)
        intervalEndTimesDict[stationPair].append(intervalEndSeconds)

        cTime += cStep
    # wend (loop over time range)

    x = None
    # skippedCount = 0
    # Concatenate results
    for k in list(xcorrResultsDict.keys()):
        combinedXcorrResults = None
        combinedWindowCountResults = None
        combinedIntervalStartTimes = None
        combinedIntervalEndTimes = None

        for i in np.arange(len(xcorrResultsDict[k])):
            if i == 0:
                combinedXcorrResults = xcorrResultsDict[k][0]
                combinedWindowCountResults = windowCountResultsDict[k][0]
                combinedIntervalStartTimes = intervalStartTimesDict[k][0]
                combinedIntervalEndTimes = intervalEndTimesDict[k][0]

                # Generate time samples (only needs to be done once)
                if x is None:
                    dt = 1. / sr
                    x = np.linspace(-window_seconds + dt, window_seconds - dt,
                                    xcorrResultsDict[k][0].shape[1])
                # end if

                if ensemble_stack:
                    if combinedXcorrResults.shape[0] > 1:
                        combinedXcorrResults = np.expand_dims(np.sum(combinedXcorrResults, axis=0), axis=0)
                    # end if
                # end if
            else:
                if combinedXcorrResults.shape[1] == xcorrResultsDict[k][i].shape[1]:
                    if ensemble_stack:
                        if xcorrResultsDict[k][i].shape[0] > 1:
                            combinedXcorrResults += np.expand_dims(np.sum(xcorrResultsDict[k][i], axis=0), axis=0)
                        else:
                            combinedXcorrResults += xcorrResultsDict[k][i]
                        # end if
                    else:
                        combinedXcorrResults = np.concatenate((combinedXcorrResults,
                                                               xcorrResultsDict[k][i]))
                    # end if
                else:
                    if ensemble_stack:
                        pass
                    else:
                        combinedXcorrResults = np.concatenate((combinedXcorrResults,
                                                               np.zeros((xcorrResultsDict[k][i].shape[0],
                                                                         combinedXcorrResults.shape[1]))))
                    # end if
                    logger.warning("\t\tVariable sample rates detected. Current station-pair: %s" % k)
                # end if

                combinedWindowCountResults = np.concatenate((combinedWindowCountResults,
                                                             windowCountResultsDict[k][i]))
                combinedIntervalStartTimes = np.concatenate((combinedIntervalStartTimes,
                                                             intervalStartTimesDict[k][i]))
                combinedIntervalEndTimes = np.concatenate((combinedIntervalEndTimes,
                                                           intervalEndTimesDict[k][i]))
            # end if
        # end for

        # Replace lists with combined results
        xcorrResultsDict[k] = combinedXcorrResults
        windowCountResultsDict[k] = combinedWindowCountResults
        intervalStartTimesDict[k] = combinedIntervalStartTimes
        intervalEndTimesDict[k] = combinedIntervalEndTimes
    # end for

    # Save Results
    for i, k in enumerate(list(xcorrResultsDict.keys())):
        fn = os.path.join(outputPath, '%s.nc' % (k if not tracking_tag else
                                                 '.'.join([k, tracking_tag])))

        root_grp = Dataset(fn, 'w', format='NETCDF4')
        root_grp.description = 'Cross-correlation results for station-pair: %s' % k

        # Dimensions
        root_grp.createDimension('lag', xcorrResultsDict[k].shape[1])
        root_grp.createDimension('nchar', 10)

        lag = root_grp.createVariable('lag', 'f4', ('lag',))

        # Add metadata
        lon1 = root_grp.createVariable('Lon1', 'f4')
        lat1 = root_grp.createVariable('Lat1', 'f4')
        lon2 = root_grp.createVariable('Lon2', 'f4')
        lat2 = root_grp.createVariable('Lat2', 'f4')
        distance = root_grp.createVariable('Distance', 'f4')

        ref_sta_coords = refds.unique_coordinates[ref_net_sta]
        temp_sta_coords = tempds.unique_coordinates[temp_net_sta]

        lon1[:] = ref_sta_coords[0] if len(ref_sta_coords) == 2 else -999
        lat1[:] = ref_sta_coords[1] if len(ref_sta_coords) == 2 else -999
        lon2[:] = temp_sta_coords[0] if len(temp_sta_coords) == 2 else -999
        lat2[:] = temp_sta_coords[1] if len(temp_sta_coords) == 2 else -999

        if np.min([v != -999 for v in [lon1[:], lat1[:], lon2[:], lat2[:]]]):
            distance[:], _, _ = gps2dist_azimuth(lat1[:], lon1[:], lat2[:], lon2[:])
        # end if

        # Add data
        if ensemble_stack:
            nsw = root_grp.createVariable('NumStackedWindows', 'i8')
            avgnsw = root_grp.createVariable('AvgNumStackedWindowsPerInterval', 'f4')
            ist = root_grp.createVariable('IntervalStartTime', 'i8')
            iet = root_grp.createVariable('IntervalEndTime', 'i8')
            xc = root_grp.createVariable('xcorr', 'f4', ('lag',))

            totalIntervalCount = int(np.sum(windowCountResultsDict[k] > 0))
            totalWindowCount = int(np.sum(windowCountResultsDict[k]))
            nsw[:] = totalWindowCount
            avgnsw[:] = np.mean(windowCountResultsDict[k][windowCountResultsDict[k] > 0])
            ist[:] = int(np.min(intervalStartTimesDict[k]))
            iet[:] = int(np.max(intervalEndTimesDict[k]))

            if totalIntervalCount > 0:
                xc[:] = xcorrResultsDict[k].real / float(totalIntervalCount)
            else:
                xc[:] = xcorrResultsDict[k].real
            # end if
        else:
            root_grp.createDimension('interval', xcorrResultsDict[k].shape[0])

            # Variables
            interval = root_grp.createVariable('interval', 'f4', ('interval',))
            nsw = root_grp.createVariable('NumStackedWindows', 'f4', ('interval',))
            ist = root_grp.createVariable('IntervalStartTimes', 'i8', ('interval',))
            iet = root_grp.createVariable('IntervalEndTimes', 'i8', ('interval',))
            xc = root_grp.createVariable('xcorr', 'f4', ('interval', 'lag',))

            # Populate variables
            interval[:] = np.arange(xcorrResultsDict[k].shape[0])
            nsw[:] = windowCountResultsDict[k]
            ist[:] = intervalStartTimesDict[k]
            iet[:] = intervalEndTimesDict[k]
            xc[:, :] = xcorrResultsDict[k].real
        # end if

        lag[:] = x

        # Add and populate a new group for parameters used
        pg = root_grp.createGroup('Parameters')
        params = {'corr_chans': '%s.%s' % (ref_cha, temp_cha),
                  'instr_corr_applied_1': 1 if ref_sta_inv else 0,
                  'instr_corr_applied_2': 1 if temp_sta_inv else 0,
                  'instr_corr_output': instrument_response_output,
                  'instr_corr_water_level_db': water_level,
                  'resample_rate': resample_rate if resample_rate else -999,
                  'taper_length': taper_length,
                  'buffer_seconds': buffer_seconds,
                  'interval_seconds': interval_seconds,
                  'window_seconds': window_seconds,
                  'window_overlap': window_overlap,
                  'window_buffer_length': window_buffer_length,
                  'bandpass_fmin': flo if flo else -999,
                  'bandpass_fmax': fhi if fhi else -999,
                  'clip_to_2std': int(clip_to_2std),
                  'one_bit_normalize': int(one_bit_normalize),
                  'zero_mean_1std_normalize': int(clip_to_2std is False and one_bit_normalize is False),
                  'spectral_whitening': int(whitening),
                  'envelope_normalize': int(envelope_normalize),
                  'ensemble_stack': int(ensemble_stack)}

        if whitening:
            params['whitening_window_frequency'] = whitening_window_frequency

        for _k, _v in params.items():
            setattr(pg, _k, _v)
        # end for

        root_grp.close()
    # end for

    return x, xcorrResultsDict, windowCountResultsDict
# end func
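
# Illustrative driver sketch for IntervalStackXCorr (not part of the original module);
# the ASDF file list, station names, channels and output path below are hypothetical:
#
#   ds = FederatedASDFDataSet('/path/to/asdf_files.txt')
#   lag, ccDict, winDict = IntervalStackXCorr(ds, ds,
#                                             UTCDateTime('2020-01-01'), UTCDateTime('2020-02-01'),
#                                             'AU.ARMA', 'AU.QIS',
#                                             None, None,        # no instrument response removal
#                                             'vel', 50.,
#                                             'BHZ', 'BHZ',
#                                             None, None,        # back-azimuths (hypothetically None)
#                                             interval_seconds=86400, window_seconds=3600,
#                                             flo=0.3, fhi=1.0, outputPath='/tmp')
#   # Writes AU.ARMA.AU.QIS.nc under outputPath and returns the stacked results in ccDict.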