Source code for seismic.pick_harvester.pick
#!/bin/env python
"""
Description:
Harvests picks from ASDF data-sets in parallel
References:
CreationDate: 13/09/18
Developer: rakib.hassan@ga.gov.au
Revision History:
LastUpdate: 13/09/18 RH
LastUpdate: dd/mm/yyyy Who Optional description
"""
from mpi4py import MPI
import os
from ordered_set import OrderedSet as set
import numpy as np
from obspy import Trace
from datetime import datetime
from seismic.ASDFdatabase.FederatedASDFDataSet import FederatedASDFDataSet
import click
from obspy.taup import TauPyModel
from obspy.signal.rotate import rotate_ne_rt
from obspy.geodetics.base import gps2dist_azimuth, kilometers2degrees
from PhasePApy.phasepapy.phasepicker import fbpicker
from PhasePApy.phasepapy.phasepicker import ktpicker
from PhasePApy.phasepapy.phasepicker import aicdpicker
from seismic.pick_harvester.utils import CatalogCSV, ProgressTracker, recursive_glob
import psutil
import gc
from seismic.pick_harvester.quality import compute_quality_measures
[docs]def extract_p(taupy_model, pickerlist, event, station_longitude, station_latitude,
st, win_start=-50, win_end=50, resample_hz=20,
bp_freqmins=[0.5, 2., 5.],
bp_freqmaxs=[5., 10., 10.],
margin=None,
max_amplitude=1e8,
plot_output_folder=None):
po = event.preferred_origin
if (not po): return None
atimes = []
try:
atimes = taupy_model.get_travel_times_geo(po.depthkm, po.lat,
po.lon, station_latitude,
station_longitude,
phase_list=('P',))
except:
return None
# end try
if (len(atimes) == 0): return None
tat = atimes[0].time # theoretical arrival time
buffer_start = -10
buffer_end = 10
try:
snrst = st.slice(po.utctime + tat + win_start + buffer_start, po.utctime + tat + win_end + buffer_end)
snrst = snrst.copy()
snrst.resample(resample_hz)
snrst.detrend('linear')
except:
return None
# end try
if (len(st) == 0 or len(snrst) == 0): return None
snrtr = snrst[0]
if (type(snrtr.data) == np.ndarray):
if (np.max(snrtr.data) > max_amplitude): return None
pickslist = []
snrlist = []
residuallist = []
bandindex = -1
pickerindex = -1
taper_percentage = float(buffer_end) / float(win_end)
foundpicks = False
for i in range(len(bp_freqmins)):
trc = snrtr.copy()
trc.taper(max_percentage=taper_percentage, type='hann')
trc.filter('bandpass', freqmin=bp_freqmins[i],
freqmax=bp_freqmaxs[i], corners=4,
zerophase=True)
trc = trc.slice(po.utctime + tat + win_start, po.utctime + tat + win_end)
for ipicker, picker in enumerate(pickerlist):
try:
scnl, picks, polarity, snr, uncert = picker.picks(trc)
for ipick, pick in enumerate(picks):
actualArrival = pick - po.utctime
residual = actualArrival - tat
if ((margin and np.fabs(residual) < margin) or (margin == None)):
pickslist.append(pick)
plotinfo = None
if (plot_output_folder):
plotinfo = {'eventid': event.public_id,
'origintime': po.utctime,
'mag': event.preferred_magnitude.magnitude_value,
'net': trc.stats.network,
'sta': trc.stats.station,
'phase': 'p',
'ppsnr': snr[ipick],
'pickid': ipick,
'outputfolder': plot_output_folder}
# end if
wab = snrtr.slice(pick - 3, pick + 3)
wab_filtered = trc.slice(pick - 3, pick + 3)
scales = np.logspace(0.15, 1.5, 30)
cwtsnr, dom_freq, slope_ratio = compute_quality_measures(wab, wab_filtered, scales,
plotinfo)
snrlist.append([snr[ipick], cwtsnr, dom_freq, slope_ratio])
residuallist.append(residual)
bandindex = i
pickerindex = ipicker
foundpicks = True
# end if
# end for
except:
continue
# end try
if (foundpicks): break
# end for
if (foundpicks): break
# end for
if (len(pickslist)):
return pickslist, residuallist, \
np.array(snrlist), bandindex, pickerindex
# end if
# end if
return None
# end func
[docs]def extract_s(taupy_model, pickerlist, event, station_longitude, station_latitude,
stn, ste, ba, win_start=-50, win_end=50, resample_hz=20,
bp_freqmins=[0.01, 0.01, 0.5],
bp_freqmaxs=[1, 2., 5.],
margin=None,
max_amplitude=1e8,
plot_output_folder=None):
po = event.preferred_origin
if (not po): return None
atimes = []
try:
atimes = taupy_model.get_travel_times_geo(po.depthkm, po.lat,
po.lon, station_latitude,
station_longitude,
phase_list=('S',))
except:
return None
# end try
if (len(atimes) == 0): return None
tat = atimes[0].time # theoretical arrival time
buffer_start = -10
buffer_end = 10
snrtr = None
try:
stn = stn.slice(po.utctime + tat + win_start + buffer_start, po.utctime + tat + win_end + buffer_end)
stn = stn.copy()
stn.resample(resample_hz)
stn.detrend('linear')
if (ste):
ste = ste.slice(po.utctime + tat + win_start + buffer_start, po.utctime + tat + win_end + buffer_end)
ste = ste.copy()
ste.resample(resample_hz)
ste.detrend('linear')
# end if
if (ste):
if (type(stn[0].data) == np.ndarray and type(ste[0].data) == np.ndarray):
rc, tc = rotate_ne_rt(stn[0].data, ste[0].data, ba)
snrtr = Trace(data=tc, header=stn[0].stats)
snrtr.detrend('linear')
# end if
else:
if (type(stn[0].data) == np.ndarray):
snrtr = stn[0]
# end if
# end if
except Exception as e:
return None
# end try
if (type(snrtr.data) == np.ndarray):
if (np.max(snrtr.data) > max_amplitude): return None
pickslist = []
snrlist = []
residuallist = []
bandindex = -1
pickerindex = -1
taper_percentage = float(buffer_end) / float(win_end)
foundpicks = False
for i in range(len(bp_freqmins)):
trc = snrtr.copy()
trc.taper(max_percentage=taper_percentage, type='hann')
trc.filter('bandpass', freqmin=bp_freqmins[i],
freqmax=bp_freqmaxs[i], corners=4,
zerophase=True)
trc = trc.slice(po.utctime + tat + win_start, po.utctime + tat + win_end)
for ipicker, picker in enumerate(pickerlist):
try:
scnl, picks, polarity, snr, uncert = picker.picks(trc)
for ipick, pick in enumerate(picks):
actualArrival = pick - po.utctime
residual = actualArrival - tat
if ((margin and np.fabs(residual) < margin) or (margin == None)):
pickslist.append(pick)
plotinfo = None
if (plot_output_folder):
plotinfo = {'eventid': event.public_id,
'origintime': po.utctime,
'mag': event.preferred_magnitude.magnitude_value,
'net': trc.stats.network,
'sta': trc.stats.station,
'phase': 's',
'ppsnr': snr[ipick],
'pickid': ipick,
'outputfolder': plot_output_folder}
# end if
wab = snrtr.slice(pick - 3, pick + 3)
wab_filtered = trc.slice(pick - 3, pick + 3)
scales = np.logspace(0.5, 4, 30)
cwtsnr, dom_freq, slope_ratio = compute_quality_measures(wab, wab_filtered, scales,
plotinfo)
snrlist.append([snr[ipick], cwtsnr, dom_freq, slope_ratio])
residuallist.append(residual)
bandindex = i
pickerindex = ipicker
foundpicks = True
# end if
# end for
except:
continue
# end try
if (foundpicks): break
# end for
if (foundpicks): break
# end for
if (len(pickslist)):
return pickslist, residuallist, \
np.array(snrlist), bandindex, pickerindex
# end if
# end if
return None
# end func
[docs]def getWorkloadEstimate(fds, originTimestamps):
totalTraceCount = 0
for nc, sc, start_time, end_time in fds.local_net_sta_list():
day = 24 * 3600
curr = start_time
step = day
while (curr < end_time):
if (curr + step > end_time):
step = end_time - curr
# end if
eventIndices = (np.where((originTimestamps >= curr.timestamp) & \
(originTimestamps <= (curr + day).timestamp)))[0]
if (eventIndices.shape[0] > 0): totalTraceCount += 1
curr += step
# wend
# end for
return totalTraceCount
# end func
[docs]def dropBogusTraces(st, sampling_rate_cutoff=5):
badTraces = [tr for tr in st if tr.stats.sampling_rate < sampling_rate_cutoff]
for tr in badTraces: st.remove(tr)
# end func
CONTEXT_SETTINGS = dict(help_option_names=['-h', '--help'])
@click.command(context_settings=CONTEXT_SETTINGS)
@click.argument('asdf-source', required=True,
type=click.Path(exists=True))
@click.argument('event-folder', required=True,
type=click.Path(exists=True))
@click.argument('output-path', required=True,
type=click.Path(exists=True))
@click.option('--min-magnitude', default=-1, help='Minimum magnitude of event; all events are analyzed by default')
@click.option('--max-amplitude', default=1e8, help='Maximum amplitude in waveforms analyzed; default is 1e8. '
'This parameter is useful for filtering out waveform snippets '
'with spurious spikes')
@click.option('--restart', default=False, is_flag=True, help='Restart job')
@click.option('--save-quality-plots', default=False, is_flag=True, help='Save plots of quality estimates')
def process(asdf_source, event_folder, output_path, min_magnitude, max_amplitude, restart, save_quality_plots):
"""
ASDF_SOURCE: Text file containing a list of paths to ASDF files
EVENT_FOLDER: Path to folder containing event files\n
OUTPUT_PATH: Output folder \n
"""
comm = MPI.COMM_WORLD
nproc = comm.Get_size()
rank = comm.Get_rank()
proc_workload = None
if (rank == 0):
def outputConfigParameters():
# output config parameters
fn = 'pick.%s.cfg' % (datetime.now().strftime('%Y-%m-%d-%H-%M-%S'))
fn = os.path.join(output_path, fn)
f = open(fn, 'w+')
f.write('Parameter Values:\n\n')
f.write('%25s\t\t: %s\n' % ('ASDF_SOURCE', asdf_source))
f.write('%25s\t\t: %s\n' % ('EVENT_FOLDER', event_folder))
f.write('%25s\t\t: %s\n' % ('OUTPUT_PATH', output_path))
f.write('%25s\t\t: %s\n' % ('MIN_MAGNITUDE', min_magnitude))
f.write('%25s\t\t: %s\n' % ('MAX_AMPLITUDE', max_amplitude))
f.write('%25s\t\t: %s\n' % ('RESTART_MODE', 'TRUE' if restart else 'FALSE'))
f.write('%25s\t\t: %s\n' % ('SAVE_PLOTS', 'TRUE' if save_quality_plots else 'FALSE'))
f.close()
# end func
outputConfigParameters()
# end if
# ==================================================
# Create output-folder for snr-plots
# ==================================================
plot_output_folder = None
if (save_quality_plots):
plot_output_folder = os.path.join(output_path, 'plots')
if (rank == 0):
if (not os.path.exists(plot_output_folder)):
os.mkdir(plot_output_folder)
# end if
comm.Barrier()
# end if
# ==================================================
# Read catalogue and retrieve origin times
# ==================================================
cat = CatalogCSV(event_folder)
events = cat.get_events()
originTimestamps = cat.get_preferred_origin_timestamps()
# ==================================================
# Create lists of pickers for both p- and s-arrivals
# ==================================================
sigmalist = np.arange(8, 3, -1)
pickerlist_p = []
pickerlist_s = []
for sigma in sigmalist:
picker_p = aicdpicker.AICDPicker(t_ma=5, nsigma=sigma, t_up=1, nr_len=5,
nr_coeff=2, pol_len=10, pol_coeff=10, uncert_coeff=3)
picker_s = aicdpicker.AICDPicker(t_ma=15, nsigma=sigma, t_up=1, nr_len=5,
nr_coeff=2, pol_len=10, pol_coeff=10, uncert_coeff=3)
pickerlist_p.append(picker_p)
pickerlist_s.append(picker_s)
# end for
# ==================================================
# Define theoretical model
# Instantiate data-access object
# Retrieve estimated workload
# ==================================================
taupyModel = TauPyModel(model='iasp91')
fds = FederatedASDFDataSet(asdf_source, logger=None)
workload = getWorkloadEstimate(fds, originTimestamps)
# ==================================================
# Define output header and open output files
# depending on the mode of operation (fresh/restart)
# ==================================================
header = '#eventID originTimestamp mag originLon originLat originDepthKm net sta cha pickTimestamp stationLon stationLat az baz distance ttResidual snr qualityMeasureCWT domFreq qualityMeasureSlope bandIndex nSigma\n'
ofnp = os.path.join(output_path, 'p_arrivals.%d.txt' % (rank))
ofns = os.path.join(output_path, 's_arrivals.%d.txt' % (rank))
ofp = None
ofs = None
if (restart == False):
ofp = open(ofnp, 'w+')
ofs = open(ofns, 'w+')
ofp.write(header)
ofs.write(header)
else:
ofp = open(ofnp, 'a+')
ofs = open(ofns, 'a+')
# end if
progTracker = ProgressTracker(output_folder=output_path, restart_mode=restart)
totalTraceCount = 0
for nc, sc, start_time, end_time in fds.local_net_sta_list():
day = 24 * 3600
dayCount = 0
curr = start_time
traceCountP = 0
pickCountP = 0
traceCountS = 0
pickCountS = 0
sw_start = datetime.now()
step = day
while (curr < end_time):
if (curr + step > end_time):
step = end_time - curr
# end if
eventIndices = (np.where((originTimestamps >= curr.timestamp) & \
(originTimestamps <= (curr + day).timestamp)))[0]
if (eventIndices.shape[0] > 0):
totalTraceCount += 1
stations = fds.get_stations(curr, curr + day, network=nc, station=sc)
stations_zch = [s for s in stations if 'Z' == s[3][-1].upper()] # only Z channels
stations_nch = [s for s in stations if 'N' == s[3][-1].upper() or '1' == s[3][-1]] # only N channels
stations_ech = [s for s in stations if 'E' == s[3][-1].upper() or '2' == s[3][-1]] # only E channels
for codes in stations_zch:
if (progTracker.increment()):
pass
else:
continue
st = fds.get_waveforms(codes[0], codes[1], codes[2], codes[3],
curr,
curr + step,
trace_count_threshold=200)
try:
st.merge(method=-1)
except Exception as e:
print(e)
continue
# end try
if (len(st) == 0): continue
dropBogusTraces(st)
slon, slat = codes[4], codes[5]
for ei in eventIndices:
event = events[ei]
po = event.preferred_origin
da = gps2dist_azimuth(po.lat, po.lon, slat, slon)
mag = None
if (event.preferred_magnitude):
mag = event.preferred_magnitude.magnitude_value
elif (len(po.magnitude_list)):
mag = po.magnitude_list[0].magnitude_value
if (mag == None): mag = np.NaN
if (np.isnan(mag) or mag < min_magnitude): continue
result = extract_p(taupyModel, pickerlist_p, event, slon, slat, st,
max_amplitude=max_amplitude,
plot_output_folder=plot_output_folder)
if (result):
picklist, residuallist, snrlist, bandindex, pickerindex = result
arcdistance = kilometers2degrees(da[0] / 1e3)
for ip, pick in enumerate(picklist):
line = '%s %f %f %f %f %f ' \
'%s %s %s %f %f %f ' \
'%f %f %f ' \
'%f %f %f %f %f ' \
'%d %d\n' % (
event.public_id, po.utctime.timestamp, mag, po.lon, po.lat, po.depthkm,
codes[0], codes[1], codes[3], pick.timestamp, slon, slat,
da[1], da[2], arcdistance,
residuallist[ip], snrlist[ip, 0], snrlist[ip, 1], snrlist[ip, 2], snrlist[ip, 3],
bandindex, sigmalist[pickerindex])
ofp.write(line)
# end for
ofp.flush()
pickCountP += 1
# end if
if (len(stations_nch) == 0 and len(stations_ech) == 0):
result = extract_s(taupyModel, pickerlist_s, event, slon, slat, st, None, da[2],
max_amplitude=max_amplitude,
plot_output_folder=plot_output_folder)
if (result):
picklist, residuallist, snrlist, bandindex, pickerindex = result
arcdistance = kilometers2degrees(da[0] / 1e3)
for ip, pick in enumerate(picklist):
line = '%s %f %f %f %f %f ' \
'%s %s %s %f %f %f ' \
'%f %f %f ' \
'%f %f %f %f %f ' \
'%d %d\n' % (
event.public_id, po.utctime.timestamp, mag, po.lon, po.lat, po.depthkm,
codes[0], codes[1], codes[3], pick.timestamp, slon, slat,
da[1], da[2], arcdistance,
residuallist[ip], snrlist[ip, 0], snrlist[ip, 1], snrlist[ip, 2],
snrlist[ip, 3],
bandindex, sigmalist[pickerindex])
ofs.write(line)
# end for
ofs.flush()
pickCountS += 1
# end if
# end if
# end for
traceCountP += len(st)
# end for
if (len(stations_nch) > 0 and len(stations_nch) == len(stations_ech)):
for codesn, codese in zip(stations_nch, stations_ech):
if (progTracker.increment()):
pass
else:
continue
stn = fds.get_waveforms(codesn[0], codesn[1], codesn[2], codesn[3],
curr,
curr + step,
trace_count_threshold=200)
ste = fds.get_waveforms(codese[0], codese[1], codese[2], codese[3],
curr,
curr + step,
trace_count_threshold=200)
try:
stn.merge(method=-1)
ste.merge(method=-1)
except Exception as e:
print(e)
continue
# end try
dropBogusTraces(stn)
dropBogusTraces(ste)
if (len(stn) == 0): continue
if (len(ste) == 0): continue
slon, slat = codesn[4], codesn[5]
for ei in eventIndices:
event = events[ei]
po = event.preferred_origin
da = gps2dist_azimuth(po.lat, po.lon, slat, slon)
mag = None
if (event.preferred_magnitude):
mag = event.preferred_magnitude.magnitude_value
elif (len(po.magnitude_list)):
mag = po.magnitude_list[0].magnitude_value
if (mag == None): mag = np.NaN
if (np.isnan(mag) or mag < min_magnitude): continue
result = extract_s(taupyModel, pickerlist_s, event, slon, slat, stn, ste, da[2],
plot_output_folder=plot_output_folder)
if (result):
picklist, residuallist, snrlist, bandindex, pickerindex = result
arcdistance = kilometers2degrees(da[0] / 1e3)
for ip, pick in enumerate(picklist):
line = '%s %f %f %f %f %f ' \
'%s %s %s %f %f %f ' \
'%f %f %f ' \
'%f %f %f %f %f ' \
'%d %d\n' % (
event.public_id, po.utctime.timestamp, mag, po.lon, po.lat, po.depthkm,
codesn[0], codesn[1], '00T', pick.timestamp, slon, slat,
da[1], da[2], arcdistance,
residuallist[ip], snrlist[ip, 0], snrlist[ip, 1], snrlist[ip, 2],
snrlist[ip, 3],
bandindex, sigmalist[pickerindex])
ofs.write(line)
# end for
ofs.flush()
pickCountS += 1
# end if
# end for
traceCountS += (len(stn) + len(ste))
# end for
# end if
# end if
curr += step
dayCount += 1
# wend
sw_stop = datetime.now()
totalTime = (sw_stop - sw_start).total_seconds()
gc.collect()
print(('(Rank %d: %5.2f%%, %d/%d) Processed %d traces and found %d p-arrivals and %d s-arrivals for ' \
'network %s station %s in %f s. Memory usage: %5.2f MB.' % \
(rank, (float(totalTraceCount) / float(workload) * 100) if workload > 0 else 100, totalTraceCount,
workload,
traceCountP + traceCountS, pickCountP, pickCountS, nc, sc, totalTime,
round(psutil.Process().memory_info().rss / 1024. / 1024., 2))))
# end for
ofp.close()
ofs.close()
print(('Processing complete on rank %d' % (rank)))
del fds
# Ensuring all processes have completed before merging results
comm.Barrier()
# Merge results on proc 0
if (rank == 0):
merge_results(output_path)
# end if
# end func
[docs]def merge_results(output_path):
search_strings = ['p_arrivals*', 's_arrivals*']
output_fns = ['p_combined.txt', 's_combined.txt']
for ss, ofn in zip(search_strings, output_fns):
files = recursive_glob(output_path, ss)
ofn = open('%s/%s' % (output_path, ofn), 'w+')
data = set()
for i, fn in enumerate(files):
lines = open(fn, 'r').readlines()
if (i == 0):
ofn.write(lines[0])
# end if
for j in range(1, len(lines)):
data.add(lines[j])
# end for
os.system('rm %s' % (fn))
# end for
for l in data:
ofn.write(l)
# end for
ofn.close()
# end for
# end func
if (__name__ == '__main__'):
process()
# end if