# Source code for seismic.pick_harvester.createEnsembleXML

#!/bin/env python
"""
Description:
    This script was initially written for inserting new picks into the ISC catalogue.
    We now use a unified csv catalogue (that Babak has prepared) and this script merges existing
    picks with those picked by our parallel picker and creates self-consistent SC3ML files
    to be ingested into Seiscomp3.

CreationDate:   20/11/18
Developer:      rakib.hassan@ga.gov.au

Revision History:
    LastUpdate:     20/11/18   RH

"""

import click
import os, glob, fnmatch, sys
from random import shuffle
from obspy import UTCDateTime, read_events, read_inventory
from obspy.taup.taup_geo import calc_dist
from obspy.clients.iris import Client as IrisClient
from obspy.clients.fdsn import Client
from obspy.taup import TauPyModel
from obspy.signal.trigger import trigger_onset, z_detect, classic_sta_lta, recursive_sta_lta, ar_pick
from obspy.signal.rotate import rotate_ne_rt
from obspy.core.event import Pick as OPick, \
    CreationInfo as OCreationInfo, \
    WaveformStreamID as OWaveformStreamID, \
    ResourceIdentifier as OResourceIdentifier, \
    Arrival as OArrical, Event as OEvent,\
     Origin as OOrigin, Arrival as OArrival, \
    OriginQuality as OOriginQuality, Magnitude as OMagnitude, \
    Comment as OComment, Catalog as OCatalog
from ordered_set import OrderedSet as set
import numpy as np
import scipy
from scipy.spatial import cKDTree
from mpi4py import MPI
from collections import defaultdict
from pykml import parser
import copy
from obspy.geodetics.base import gps2dist_azimuth, kilometers2degrees

from seismic.pick_harvester.utils import recursive_glob, split_list
import logging
from tqdm import tqdm

def setup_logger(name, log_file, level=logging.INFO):
    """Create and return a file-backed logger.

    The logger is registered under the key ``name + log_file`` (so each
    rank/log-file pair gets its own logger), set to ``level``, and given a
    single FileHandler that truncates ``log_file`` on open.

    :param name: logical name prefix for the logger
    :param log_file: path of the log file to (over)write
    :param level: logging level, defaults to ``logging.INFO``
    :return: the configured ``logging.Logger``
    """
    logger = logging.getLogger(name + log_file)
    logger.setLevel(level)

    file_handler = logging.FileHandler(log_file, mode='w')
    file_handler.setFormatter(
        logging.Formatter('%(asctime)s %(levelname)s %(message)s'))
    logger.addHandler(file_handler)

    return logger
# end func
class Origin:
    """Lightweight event origin: time, geographic location and depth,
    with mutable lists for associated magnitudes and arrivals."""

    __slots__ = ['utctime', 'lat', 'lon', 'depthkm', 'magnitude_list',
                 'arrival_list']

    def __init__(self, utctime, lat, lon, depthkm):
        """
        :param utctime: origin time (UTCDateTime or timestamp)
        :param lat: latitude in degrees
        :param lon: longitude in degrees
        :param depthkm: depth in kilometres
        """
        self.utctime = utctime
        self.lat, self.lon = lat, lon
        self.depthkm = depthkm
        # fresh per-instance containers (never shared between origins)
        self.magnitude_list = []
        self.arrival_list = []
    # end func
# end class
class Event:
    """Lightweight seismic event: a public id plus preferred origin and
    magnitude, and a list of alternative origins."""

    __slots__ = ['public_id', 'preferred_origin', 'preferred_magnitude',
                 'origin_list']

    def __init__(self):
        """All attributes start empty; they are filled in by the catalogue
        loader after construction."""
        self.public_id = None
        self.preferred_origin = None
        self.preferred_magnitude = None
        self.origin_list = []
    # end func
# end class
class Magnitude:
    """Magnitude value paired with its type code (e.g. 'mw', 'mb')."""

    __slots__ = ['magnitude_value', 'magnitude_type']

    def __init__(self, mag, mag_type):
        """
        :param mag: magnitude value
        :param mag_type: magnitude type string, e.g. 'mw', 'ms', 'mb', 'mi'
        """
        self.magnitude_value, self.magnitude_type = mag, mag_type
    # end func
# end class
class Arrival:
    """Single phase arrival at a station: SEED identifiers, station
    coordinates, phase label, pick time and epicentral distance."""

    __slots__ = ['net', 'sta', 'loc', 'cha', 'lon', 'lat', 'elev', 'phase',
                 'utctime', 'distance']

    def __init__(self, net, sta, loc, cha, lon, lat, elev, phase, utctime,
                 distance):
        """
        :param net/sta/loc/cha: SEED network/station/location/channel codes
        :param lon/lat/elev: station coordinates (degrees, degrees, metres)
        :param phase: phase label, e.g. 'P' or 'S'
        :param utctime: arrival time
        :param distance: epicentral distance in degrees
        """
        values = (net, sta, loc, cha, lon, lat, elev, phase, utctime,
                  distance)
        for slot, value in zip(Arrival.__slots__, values):
            setattr(self, slot, value)
        # end for
    # end func
# end class
class FDSNInv:
    """
    Station inventory wrapper. Loads an FDSN StationXML file (or fetches a
    station-level inventory from an FDSN web service) and builds a kd-tree
    over station positions in Cartesian coordinates, so that the closest
    station to a given lon/lat can be found efficiently.
    """

    def rtp2xyz(self, r, theta, phi):
        """Convert spherical coordinates (r, colatitude theta, longitude phi,
        angles in radians) to Cartesian [x, y, z].

        :param r: radius; a scalar or a 1-D numpy array
        :param theta: colatitude in radians (same shape as r)
        :param phi: longitude in radians (same shape as r)
        :return: list [x, y, z] for scalar input, else an (N, 3) ndarray
        """
        # Generalized from `type(r)==float`: accepts any scalar (int, float,
        # numpy scalar) without changing behaviour for the float/array
        # callers in this file.
        if np.isscalar(r):
            xout = [0, 0, 0]
            rst = r * np.sin(theta)
            xout[0] = rst * np.cos(phi)
            xout[1] = rst * np.sin(phi)
            xout[2] = r * np.cos(theta)
        else:
            xout = np.zeros((r.shape[0], 3))
            rst = r * np.sin(theta)
            xout[:, 0] = rst * np.cos(phi)
            xout[:, 1] = rst * np.sin(phi)
            xout[:, 2] = r * np.cos(theta)
        # end if
        return xout
    # end func

    def __init__(self, fn, host=None, port=None):
        """
        :param fn: path to an FDSN StationXML file (used when host/port are
            not given)
        :param host: optional FDSN web-service host
        :param port: optional FDSN web-service port
        """
        # infinitely-nested defaultdict used as a network->station lookup;
        # a missing key yields another (empty) defaultdict
        def tree():
            def the_tree():
                return defaultdict(the_tree)
            # end func
            return the_tree()
        # end func

        if(host and port):
            self.host = host
            self.port = port
            try:
                self.client = Client('http://' + self.host + ':%d' % self.port)
                if not self.client:
                    raise Exception('Connection failed..')
            except Exception as e:
                print (e)
                print ('Failed to connect to client. Aborting..')
                # FIX: previously execution fell through after the message and
                # dereferenced an unbound self.client; now we really abort.
                raise
            # end try
            self.inv = self.client.get_stations(level='station')
        else:
            self.fn = fn
            self.inv = read_inventory(fn)
        # end if

        # map network.station -> [lon, lat, elev]
        self.t = tree()
        for n in self.inv.networks:
            for s in n.stations:
                self.t[n.code][s.code] = [s.longitude, s.latitude, s.elevation]
            # end for
        # end for

        # flatten the lookup into parallel arrays for kd-tree queries
        self.nsList = []
        self.nsCoordsList = []
        for nk, n in self.t.items():
            for sk, s in n.items():
                self.nsList.append('%s.%s' % (nk, sk))
                self.nsCoordsList.append(s)
            # end for
        # end for
        self.nsList = np.array(self.nsList)
        self.nsCoordsList = np.array(self.nsCoordsList)

        # kd-tree over station positions on a sphere of Earth radius (m)
        ncoords = self.nsCoordsList.shape[0]
        self.xyz = self.rtp2xyz(6371e3 * np.ones(ncoords),
                                np.radians(90 - self.nsCoordsList[:, 1]),
                                np.radians(self.nsCoordsList[:, 0]))
        self.kdtree = cKDTree(self.xyz)
    # end func

    def getClosestStation(self, lon, lat, maxdist=1e3):
        """Find the station closest to (lon, lat) within ``maxdist`` metres.

        :param lon: longitude in degrees
        :param lat: latitude in degrees
        :param maxdist: maximum chord distance in metres (default 1 km)
        :return: ('NET.STA', [lon, lat, elev]) or None if nothing is within
            maxdist
        """
        xyz = self.rtp2xyz(6371e3, np.radians(90 - lat), np.radians(lon))
        d, i = self.kdtree.query(xyz, distance_upper_bound=maxdist)
        # cKDTree reports inf when no neighbour lies within the bound
        if(d != np.inf):
            return self.nsList[i], self.nsCoordsList[i]
        else:
            return None
        # end if
    # end func
# end class
class Catalog():
    """
    Merges arrivals from the unified CSV catalogue with picks produced by the
    parallel picker (OurPicks) and writes, per MPI rank, a self-consistent
    SC3ML file plus a per-rank text summary; rank 0 finally concatenates the
    summaries into 'ensemble.txt'.
    """
    def __init__(self, isc_coords_file, fdsn_inventory, our_picks,
                 event_folder, output_path, discard_old_picks=False):
        """
        :param isc_coords_file: KML file of ISC station coordinates; placemark
            names are assumed to be ISC station codes -- TODO confirm against
            the kml actually used
        :param fdsn_inventory: FDSNInv instance used to resolve network codes
            and station coordinates
        :param our_picks: OurPicks instance with picks keyed by event-id
        :param event_folder: folder scanned recursively for catalogue *.csv
        :param output_path: output folder for logs, per-rank xml/txt output
        :param discard_old_picks: if True, drop catalogue picks and keep only
            those from our_picks
        """
        self.event_folder = event_folder
        self.output_path = output_path
        self.comm = MPI.COMM_WORLD
        self.nproc = self.comm.Get_size()
        self.rank = self.comm.Get_rank()
        self.counter = 0  # running counter consumed by get_id()
        self.discard_old_picks = discard_old_picks
        # one log file per rank
        self.logger = setup_logger('log.%d' % (self.rank),
                                   '%s/log.%d.txt' % (self.output_path, self.rank))
        self.isc_coords_file = isc_coords_file

        # parse station coordinates from the kml file: each Placemark name is
        # mapped to [lon, lat, elev] parsed from its Point/coordinates text
        k = parser.fromstring(open(self.isc_coords_file, 'rb').read())
        ns = list(k.nsmap.values())[0]
        elist = k.findall('.//{%s}Placemark' % (ns))
        self.isc_coords_dict = defaultdict(list)
        for e in elist:
            self.isc_coords_dict[e['name']] = \
                list(map(float, (str(e['Point']['coordinates'])).split(',')))
        # end for

        # retrieve list of all event xml files
        self.event_folder = event_folder

        # retrieve list of all csv files
        self.csv_files = sorted(recursive_glob(self.event_folder, '*.csv'))

        self.fdsn_inventory = fdsn_inventory
        self.our_picks = our_picks
        self._load_events()
    # end func

    def _load_events_helper(self):
        """Parse the CSV catalogue on rank 0 (event header lines start with
        '#', subsequent lines are arrivals), scatter events across ranks and
        materialize each event's Arrival objects on its owning rank."""
        eventList = []
        poTimestamps = []

        # every rank reads all csv lines; arrival line-indices are resolved
        # per-rank after the scatter below
        lines = []
        for ifn, fn in enumerate(self.csv_files):
            print (('Reading %s' % (fn)))
            for iline, line in enumerate(open(fn, 'r').readlines()):
                lines.append(line)
            # end for
        # end for

        if(self.rank == 0):
            # NOTE: `set` is OrderedSet here (see module imports), so
            # insertion order is preserved
            eid_set = set()
            for iline, line in enumerate(lines):
                evtline = ''
                if(line[0] == '#'):
                    # event header: #,year,month,day,hour,minute,second,lon,
                    # lat,depth,?,mb,ms,mi,mw,...,eventid
                    items = line.split(',')
                    vals = list(map(float, items[1:]))

                    year = int(vals[0])
                    month = int(vals[1])
                    day = int(vals[2])
                    hour = int(vals[3])
                    minute = int(vals[4])
                    second = vals[5]

                    lon = vals[6]
                    lat = vals[7]
                    # discard events with out-of-range coordinates
                    if (lon < -180 or lon > 180):
                        continue
                    if (lat < -90 or lat > 90):
                        continue

                    depth = vals[8] if vals[8] >= 0 else 0  # clamp to surface

                    mb = vals[10]
                    ms = vals[11]
                    mi = vals[12]
                    mw = vals[13]
                    # preferred magnitude: first positive of mw > ms > mb > mi
                    mag = 0
                    magtype = 'mw'
                    if(mw > 0):
                        mag = mw
                        magtype = 'mw'
                    elif(ms > 0):
                        mag = ms
                        magtype = 'ms'
                    elif(mb > 0):
                        mag = mb
                        magtype = 'mb'
                    elif(mi > 0):
                        mag = mi
                        magtype = 'mi'
                    # end if

                    eventid = vals[-1]
                    if(eventid in eid_set):
                        raise RuntimeError('Duplicate event-id found. Aborting..')
                    else:
                        eid_set.add(eventid)
                    # end if

                    utctime = None
                    try:
                        utctime = UTCDateTime(year, month, day,
                                              hour, minute, second)
                    except Exception:
                        # invalid date/time fields: drop the event
                        continue
                    # end try

                    origin = Origin(utctime, lat, lon, depth)
                    event = Event()
                    event.public_id = int(eventid)
                    event.preferred_origin = origin
                    event.preferred_magnitude = Magnitude(mag, magtype)
                    eventList.append(event)
                    poTimestamps.append(origin.utctime.timestamp)
                else:
                    # arrival line: store only the global line index for now;
                    # the owning rank parses it after the scatter
                    eventList[-1].preferred_origin.arrival_list.append(iline)
                # end if
                #if(iline%1000==0): print iline
            # end for
            eventList = split_list(eventList, self.nproc)
            poTimestamps = split_list(poTimestamps, self.nproc)
        # end if

        # broadcast workload to all procs
        eventList = self.comm.scatter(eventList, root=0)
        poTimestamps = self.comm.scatter(poTimestamps, root=0)
        print (('Processing %d events on rank %d' % (len(eventList), self.rank)))

        # replace stored line indices by parsed Arrival objects
        for e in eventList:
            lineIndices = copy.copy(e.preferred_origin.arrival_list)
            e.preferred_origin.arrival_list = []
            for lineIndex in lineIndices:
                # arrival columns: sta,cha,loc,net,lon,lat,elev,phase,
                # year,month,day,hour,minute,second,...,distance
                items = lines[lineIndex].split(',')
                vals = list(map(float, items[8:]))

                year = int(vals[0])
                month = int(vals[1])
                day = int(vals[2])
                hour = int(vals[3])
                minute = int(vals[4])
                second = vals[5]

                utctime = None
                try:
                    utctime = UTCDateTime(year, month, day,
                                          hour, minute, second)
                except Exception:
                    # invalid pick time: drop the arrival
                    continue
                # end try

                # best-effort coordinate parsing; 0 stands in for missing
                try:
                    lon = float(items[4])
                except:
                    lon = 0
                try:
                    lat = float(items[5])
                except:
                    lat = 0
                try:
                    elev = float(items[6])
                except:
                    elev = 0

                distance = vals[-1]
                a = Arrival(items[3].strip(), items[0].strip(),
                            items[2].strip(), items[1].strip(),
                            lon, lat, elev, items[7].strip(),
                            utctime, distance)
                e.preferred_origin.arrival_list.append(a)
            # end for
        # end for

        self.eventList = eventList
        self.poTimestamps = np.array(poTimestamps)
    # end func

    def get_id(self):
        """Return a new rank-unique resource id, e.g. '7r3' for the 7th id on
        rank 3."""
        self.counter += 1
        return str(self.counter) + 'r%d' % self.rank
    # end func

    def _load_events(self):
        """Resolve missing network codes, build obspy Event objects merging
        old (catalogue) and new (our) picks, then write per-rank SC3ML and
        text output; rank 0 concatenates the text into ensemble.txt."""
        self._load_events_helper()
        cache = {}                        # seed-id -> resolved network code
        notFound = defaultdict(int)       # station codes with no nearby match
        oEvents = []                      # obspy events to be written out
        missingStations = defaultdict(int)  # picks dropped: station not in inventory
        lines = []                        # rows for the text summary
        for e in tqdm(self.eventList, desc='Rank %d' % (self.rank)):
            if(e.preferred_origin and len(e.preferred_origin.arrival_list)):
                cullList = []
                for a in e.preferred_origin.arrival_list:
                    # arrivals that already carry a network code are kept
                    # as-is; we only try to resolve the missing ones
                    if(len(a.net)):
                        continue

                    seedid = '%s.%s.%s.%s' % (a.net, a.sta, a.loc, a.cha)
                    newCode = None
                    if(seedid not in cache):
                        # look up the ISC coordinates for the bare station
                        # code and find the closest inventory station
                        sc = a.sta
                        lonlat = self.isc_coords_dict[sc]
                        if(len(lonlat) == 0):
                            cullList.append(a)
                            continue
                        # end if

                        r = self.fdsn_inventory.getClosestStation(lonlat[0], lonlat[1], maxdist=1e3)  # 1km
                        #if(a.sta=='KUM'): print a.net, a.sta, a.loc, a.cha, r
                        if(not r):
                            notFound[sc] += 1
                        else:
                            c = r[0].split('.')[0]
                            newCode = c
                        # end if
                        if(newCode):
                            cache[seedid] = newCode
                        # end if
                    else:
                        newCode = cache[seedid]
                    # end if

                    if(newCode):
                        #print a.net, newCode
                        a.net = newCode

                        sc = self.fdsn_inventory.t[a.net][a.sta]
                        # a defaultdict value means the station is absent
                        # from the inventory lookup tree
                        if(type(sc) == defaultdict):
                            cullList.append(a)
                            continue
                        # end if
                        # sanity check: recomputed distance must agree with
                        # the catalogue distance to within 0.5 degrees
                        da = gps2dist_azimuth(e.preferred_origin.lat,
                                              e.preferred_origin.lon,
                                              sc[1], sc[0])
                        dist = kilometers2degrees(da[0] / 1e3)
                        if(np.fabs(a.distance - dist) > 0.5):
                            #print ([e.preferred_origin.lon, e.preferred_origin.lat, sc[0], sc[1]])
                            cullList.append(a)
                        # end if
                    # end if
                # end for
                for c in cullList:
                    e.preferred_origin.arrival_list.remove(c)
                # end for
            # end if

            # Create obspy event object
            ci = OCreationInfo(author='GA', creation_time=UTCDateTime(),
                               agency_id='GA-iteration-1')
            oid = self.get_id()
            origin = OOrigin(resource_id=OResourceIdentifier(id=oid),
                             time=UTCDateTime(e.preferred_origin.utctime),
                             longitude=e.preferred_origin.lon,
                             latitude=e.preferred_origin.lat,
                             depth=e.preferred_origin.depthkm * 1e3,  # km -> m
                             method_id=OResourceIdentifier(id='unknown'),
                             earth_model_id=OResourceIdentifier(id='iasp91'),
                             evaluation_mode='automatic',
                             creation_info=ci)
            magnitude = OMagnitude(resource_id=OResourceIdentifier(id=self.get_id()),
                                   mag=e.preferred_magnitude.magnitude_value,
                                   magnitude_type=e.preferred_magnitude.magnitude_type,
                                   origin_id=OResourceIdentifier(id=oid),
                                   creation_info=ci)
            event = OEvent(resource_id=OResourceIdentifier(id=str(e.public_id)),
                           creation_info=ci, event_type='earthquake')
            event.origins = [origin]
            event.magnitudes = [magnitude]
            event.preferred_magnitude_id = magnitude.resource_id
            event.preferred_origin_id = origin.resource_id

            # Insert old picks
            if(not self.discard_old_picks):
                for a in e.preferred_origin.arrival_list:
                    if(type(self.fdsn_inventory.t[a.net][a.sta]) == defaultdict):
                        missingStations[a.net + '.' + a.sta] += 1
                        continue
                    # end if
                    # NOTE(review): `methodID` looks like it should be
                    # `method_id` for obspy Pick -- confirm against the
                    # obspy.core.event API
                    oldPick = OPick(
                        resource_id=OResourceIdentifier(id=self.get_id()),
                        time=UTCDateTime(a.utctime),
                        waveform_id=OWaveformStreamID(network_code=a.net,
                                                      station_code=a.sta,
                                                      channel_code=a.cha),
                        methodID=OResourceIdentifier('unknown'),
                        phase_hint=a.phase,
                        evaluation_mode='automatic',
                        creation_info=ci)
                    oldArr = OArrival(
                        resource_id=OResourceIdentifier(id=oldPick.resource_id.id + "#"),
                        pick_id=oldPick.resource_id,
                        phase=oldPick.phase_hint,
                        distance=a.distance,
                        earth_model_id=OResourceIdentifier('quakeml:ga.gov.au/earthmodel/iasp91'),
                        creation_info=ci)
                    event.picks.append(oldPick)
                    event.preferred_origin().arrivals.append(oldArr)

                    # populate list for text output; alternating
                    # value/format-spec pairs consumed below; -999 marks
                    # fields unavailable for catalogue picks
                    line = [str(e.public_id), '{:<25s}',
                            e.preferred_origin.utctime.timestamp, '{:f}',
                            e.preferred_magnitude.magnitude_value, '{:f}',
                            e.preferred_origin.lon, '{:f}',
                            e.preferred_origin.lat, '{:f}',
                            e.preferred_origin.depthkm, '{:f}',
                            a.net, '{:<5s}',
                            a.sta, '{:<5s}',
                            a.cha, '{:<5s}',
                            a.utctime.timestamp, '{:f}',
                            a.phase, '{:<5s}',
                            self.fdsn_inventory.t[a.net][a.sta][0], '{:f}',
                            self.fdsn_inventory.t[a.net][a.sta][1], '{:f}',
                            -999, '{:f}',
                            -999, '{:f}',
                            a.distance, '{:f}',
                            -999, '{:f}',
                            -999, '{:f}',
                            -999, '{:f}',
                            -999, '{:f}',
                            -999, '{:f}',
                            -999, '{:d}',
                            -999, '{:d}']
                    lines.append(line)
                # end for
            # end if

            # Insert our picks
            opList = self.our_picks.picks[e.public_id]
            if(len(opList)):
                # op layout (see OurPicks): [t, net, sta, cha, phase,
                # residual, auxData, elat, elon, slat, slon, dist, az, baz]
                for op in opList:
                    if(type(self.fdsn_inventory.t[op[1]][op[2]]) == defaultdict):
                        missingStations[op[1] + '.' + op[2]] += 1
                        continue
                    # end if
                    newPick = OPick(
                        resource_id=OResourceIdentifier(id=self.get_id()),
                        time=UTCDateTime(op[0]),
                        waveform_id=OWaveformStreamID(network_code=op[1],
                                                      station_code=op[2],
                                                      channel_code=op[3]),
                        methodID=OResourceIdentifier('phasepapy/aicd'),
                        backazimuth=op[-1],
                        phase_hint=op[4],
                        evaluation_mode='automatic',
                        comments=[OComment(text='phasepapy_snr = ' + str(op[6][0]) +
                                                ', quality_measure_cwt = ' + str(op[6][1]) +
                                                ', dom_freq = ' + str(op[6][2]) +
                                                ', quality_measure_slope = ' + str(op[6][3]) +
                                                ', band_index = ' + str(op[6][4]) +
                                                ', nsigma = ' + str(op[6][5]),
                                           force_resource_id=False)],
                        creation_info=ci)
                    newArr = OArrival(
                        resource_id=OResourceIdentifier(id=newPick.resource_id.id + "#"),
                        pick_id=newPick.resource_id,
                        phase=newPick.phase_hint,
                        azimuth=op[-2],
                        distance=op[-3],
                        time_residual=op[5],
                        time_weight=1.,
                        earth_model_id=OResourceIdentifier('quakeml:ga.gov.au/earthmodel/iasp91'),
                        creation_info=ci)
                    event.picks.append(newPick)
                    event.preferred_origin().arrivals.append(newArr)

                    # populate list for text output (same layout as above,
                    # with the picker quality measures filled in)
                    line = [str(e.public_id), '{:<25s}',
                            e.preferred_origin.utctime.timestamp, '{:f}',
                            e.preferred_magnitude.magnitude_value, '{:f}',
                            e.preferred_origin.lon, '{:f}',
                            e.preferred_origin.lat, '{:f}',
                            e.preferred_origin.depthkm, '{:f}',
                            op[1], '{:<5s}',
                            op[2], '{:<5s}',
                            op[3], '{:<5s}',
                            UTCDateTime(op[0]).timestamp, '{:f}',
                            op[4], '{:<5s}',
                            op[10], '{:f}',
                            op[9], '{:f}',
                            op[12], '{:f}',
                            op[13], '{:f}',
                            op[11], '{:f}',
                            op[5], '{:f}',
                            op[6][0], '{:f}',
                            op[6][1], '{:f}',
                            op[6][2], '{:f}',
                            op[6][3], '{:f}',
                            int(op[6][4]), '{:d}',
                            int(op[6][5]), '{:d}']
                    lines.append(line)
                # end for
            # end if

            # NOTE(review): the int(self.discard_old_picks) factor looks
            # inverted -- when old picks are KEPT (flag False) they are
            # excluded from these counts; confirm intent
            quality = OOriginQuality(
                associated_phase_count=len(e.preferred_origin.arrival_list) * \
                                       int(self.discard_old_picks) + \
                                       len(self.our_picks.picks[e.public_id]),
                used_phase_count=len(e.preferred_origin.arrival_list) * \
                                 int(self.discard_old_picks) + \
                                 len(self.our_picks.picks[e.public_id]))
            event.preferred_origin().quality = quality

            # when only our picks are wanted, skip events that have none
            if(len(self.our_picks.picks[e.public_id]) == 0 and self.discard_old_picks):
                continue
            # end if

            oEvents.append(event)
        # end for // loop over e

        if(len(missingStations)):
            for k, v in missingStations.items():
                self.logger.warning('Missing station %s: %d picks' % (k, v))
            # end for
        # end if

        # write xml output
        if(len(oEvents)):
            cat = OCatalog(events=oEvents)
            ofn = self.output_path + '/%d.xml' % (self.rank)
            cat.write(ofn, format='SC3ML')
        # end if

        # write text output: interleaved value/format pairs are split and
        # recombined into one whitespace-separated row per pick
        procfile = open('%s/proc.%d.txt' % (self.output_path, self.rank), 'w+')
        for line in lines:
            lineout = ' '.join(line[1::2]).format(*line[::2])
            procfile.write(lineout + '\n')
        # end for
        procfile.close()

        # combine text output
        header = '#eventID originTimestamp mag originLon originLat originDepthKm net sta cha pickTimestamp phase stationLon stationLat az baz distance ttResidual snr qualityMeasureCWT domFreq qualityMeasureSlope bandIndex nSigma\n'
        self.comm.barrier()
        if (self.rank == 0):
            of = open('%s/ensemble.txt' % (self.output_path), 'w+')
            of.write(header)
            for i in range(self.nproc):
                fn = '%s/proc.%d.txt' % (self.output_path, i)
                lines = open(fn, 'r').readlines()
                for line in lines:
                    of.write(line)
                # end for
                if (os.path.exists(fn)):
                    os.remove(fn)
            # end for
            of.close()
        # end if
    # end func
# end class
class OurPicks:
    """
    Loads pick files produced by the parallel picker into a dict
    (``self.picks``) keyed by integer event-id. Each pick is a list:
    [t, net, sta, cha, phase, residual, auxData, elat, elon, slat, slon,
    distance, az, baz] where auxData is [snr, quality_measure_cwt, dom_freq,
    quality_measure_slope, band_index, nsigma].
    """
    # Input column layout (whitespace-separated, one header line):
    # 0 eventID          1 originTimestamp   2 mag
    # 3 originLon        4 originLat         5 originDepthKm
    # 6 net              7 sta               8 cha
    # 9 pickTimestamp   10 stationLon       11 stationLat
    # 12 az             13 baz              14 distance
    # 15 ttResidual     16 snr              17 qualityMeasureCWT
    # 18 domFreq        19 qualityMeasureSlope
    # 20 bandIndex      21 nSigma
    def __init__(self, fnList, phaseList):
        """
        :param fnList: list of pick-file paths
        :param phaseList: phase label ('P'/'S') for each file, zipped with
            fnList
        """
        self.fnList = fnList
        self.picks = defaultdict(list)
        for fn, phase in zip(self.fnList, phaseList):
            # FIX: file is now closed deterministically (was left open)
            with open(fn, 'r') as fh:
                for iline, line in enumerate(fh):
                    if(iline == 0):
                        continue  # skip header line
                    items = line.split()

                    t = float(items[9])
                    d = float(items[14])
                    baz = float(items[13])
                    az = float(items[12])
                    snr = float(items[16])
                    quality_measure_cwt = float(items[17])
                    dom_freq = float(items[18])
                    quality_measure_slope = float(items[19])
                    bi = int(items[20])
                    nsigma = int(items[21])
                    elon = float(items[3])
                    elat = float(items[4])
                    slon = float(items[10])
                    slat = float(items[11])
                    residual = float(items[15])
                    net = items[6]
                    sta = items[7]
                    cha = items[8]

                    auxData = [snr, quality_measure_cwt, dom_freq,
                               quality_measure_slope, bi, nsigma]
                    self.picks[int(float(items[0]))].append(
                        [t, net, sta, cha, phase, residual, auxData,
                         elat, elon, slat, slon, d, az, baz])
                # end for
            # end with
            print (('Read %s' % fn))
        # end for
    # end func
# end class
# end for # end func # end class CONTEXT_SETTINGS = dict(help_option_names=['-h', '--help']) @click.command(context_settings=CONTEXT_SETTINGS) @click.argument('event-folder', required=True, type=click.Path(exists=True)) @click.argument('inventory', required=True, type=click.Path(exists=True)) @click.argument('isc-station-coords', required=True, type=click.Path(exists=True)) @click.argument('output-path', required=True, type=click.Path(exists=True)) @click.option('--p-arrivals', default=None, help='Text file containing p-arrivals') @click.option('--s-arrivals', default=None, help='Text file containing s-arrivals') @click.option('--discard-old-picks', default=False, is_flag=True, help='Discards picks in the events catalog; only keeps ' 'picks provided through p- and s-arrivals') def process(event_folder, inventory, isc_station_coords, output_path, p_arrivals, s_arrivals, discard_old_picks): """ EVENT_FOLDER: Folder containing CSV Catalogue(s) \n INVENTORY: Station inventory in FDSNstationxml format \n ISC_STATION_COORDS: ISC station coordinates in kml format \n OUTPUT_PATH: Path to output folder \n usage: mpirun -np 112 python createEnsembleXML.py /g/data/ha3/Passive/Events/Unified/ sc3Inventory.xml stations.kml \ updatedEvents/ --p-arrivals p_combined.txt --s-arrivals s_combined.txt """ arrival_files = [] arrival_types = [] if (p_arrivals): arrival_files.append(p_arrivals) arrival_types.append('P') if (s_arrivals): arrival_files.append(s_arrivals) arrival_types.append('S') fi = FDSNInv(inventory) op = OurPicks(arrival_files, arrival_types) c = Catalog(isc_coords_file=isc_station_coords, fdsn_inventory=fi, our_picks=op, event_folder=event_folder, output_path=output_path, discard_old_picks=discard_old_picks) # end func if (__name__ == '__main__'): process() # end if