from mpi4py import MPI
from lxml import etree as ET
import os, fnmatch
import re
from collections import defaultdict
from obspy import UTCDateTime
import numpy as np
from random import shuffle
[docs]def recursive_glob(treeroot, pattern):
results = []
for base, dirs, files in os.walk(treeroot):
goodfiles = fnmatch.filter(files, pattern)
results.extend(os.path.join(base, f) for f in goodfiles)
return results
# end func
[docs]def split_list(lst, npartitions):
k, m = divmod(len(lst), npartitions)
return [lst[i * k + min(i, m):(i + 1) * k + min(i + 1, m)] for i in range(npartitions)]
# end func
[docs]class Origin:
def __init__(self, utctime, lat, lon, depthkm):
self.utctime = utctime
self.lat = lat
self.lon = lon
self.depthkm = depthkm
self.magnitude_list = []
# end func
# end class
[docs]class Event:
def __init__(self):
self.public_id = None
self.preferred_origin = None
self.preferred_magnitude = None
self.origin_list = []
# end func
# end class
[docs]class Magnitude:
def __init__(self, mag, mag_type):
self.magnitude_value = mag
self.magnitude_type = mag_type
# end func
# end class
[docs]class EventParser:
def __init__(self, xml_filename):
self.xml_filename = xml_filename
self.events = []
self.origins = []
self.magnitudes = []
for xmlevent, elem in ET.iterparse(xml_filename):
tag = ET.QName(elem.tag).localname
if (tag.upper() == 'EVENT'):
self.events.append(elem)
elif (tag.upper() == 'ORIGIN'):
self.origins.append(elem)
elif (tag.upper() == 'MAGNITUDE'):
self.magnitudes.append(elem)
# end if
# end for
self.origin_dict = defaultdict(list)
self.magnitude_dict = defaultdict(list)
# parse magnitudes
for m in self.magnitudes:
pid = None
try:
pid = int(re.split("[/=]", m.get('publicID'))[-1])
except:
pass
# end try
if(pid): self.magnitude_dict[pid] = self.parseMagnitude(m)
# end for
# parse origins
for o in self.origins:
pid = int(re.split("[/=]", o.get('publicID'))[-1])
self.origin_dict[pid] = self.parseOrigin(o)
# end for
assert len(self.origins) == len(self.origin_dict), 'Invalid originIDs found..'
self.event_dict = defaultdict(list)
# parse events
for e in self.events:
pid = int(re.split("[/=]", e.get('publicID'))[-1])
self.event_dict[pid] = self.parseEvent(e)
# end for
assert len(self.events) == len(self.event_dict), 'Invalid eventIDs found..'
# end func
[docs] def parseEvent(self, e):
resultEvent = Event()
origList = []
resultEvent.public_id = e.get('publicID')
for element in e:
if ('preferredOriginID' in element.tag):
oid = int(re.split("[/=]", element.text)[-1])
resultEvent.preferred_origin = self.origin_dict[oid]
elif('preferredMagnitudeID' in element.tag):
mid = int(re.split("[/=]", element.text)[-1])
resultEvent.preferred_magnitude = self.magnitude_dict[mid]
elif ('origin' in element.tag and \
'Reference' not in element.tag):
oid = int(re.split("[/=]", element.get('publicID'))[-1])
origList.append(self.origin_dict[oid])
# end if
# end for
resultEvent.origin_list = origList
return resultEvent
# end func
[docs] def parseOrigin(self, o):
utctime = None
lat = None
lon = None
depthkm = None
pref_mag = None
magList = []
for element in o:
if ('time' in element.tag):
for subelement in element:
if ('value' in subelement.tag):
utctime = UTCDateTime(subelement.text)
# end if
# end for
elif ('latitude' in element.tag):
for subelement in element:
if ('value' in subelement.tag):
lat = float(subelement.text)
# end if
# end for
# end if
elif ('longitude' in element.tag):
for subelement in element:
if ('value' in subelement.tag):
lon = float(subelement.text)
# end if
# end for
# end if
elif ('depth' in element.tag):
for subelement in element:
if ('value' in subelement.tag):
depthkm = float(subelement.text)
# end if
# end for
elif ('magnitude' in element.tag and \
'station' not in element.tag):
mid = None
try:
mid = int(re.split("[/=]", element.get('publicID'))[-1])
except:
pass
# end try
if(mid): magList.append(self.magnitude_dict[mid])
# end if
# end for
assert None not in [utctime, lat, lon], 'Failed to find required values for Origin'
o = Origin(utctime, lat, lon, depthkm)
o.magnitude_list = magList
return o
# end func
[docs] def parseMagnitude(self, m):
mag_value = None
mag_type = None
for element in m:
if ('magnitude' in element.tag):
for subelement in element:
if ('value' in subelement.tag):
mag_value = float(subelement.text)
# end if
# end for
elif ('type' in element.tag):
mag_type = element.text
# end if
# end for
return Magnitude(mag_value, mag_type)
# end func
[docs] def getEvents(self):
return self.event_dict
# end func
# end class
[docs]class Catalog():
def __init__(self, event_folder):
self.event_folder = event_folder
self.comm = MPI.COMM_WORLD
self.nproc = self.comm.Get_size()
self.rank = self.comm.Get_rank()
# retrieve list of all event xml files
self.xml_files = recursive_glob(self.event_folder, '*.xml')
shuffle(self.xml_files) # shuffle to avoid concentration of large files
self.proc_workload = split_list(self.xml_files, self.nproc)
# broadcast workload to all procs
self.proc_workload = self.comm.bcast(self.proc_workload, root=0)
print (('Rank %d: processing %d files' % (self.rank, len(self.proc_workload[self.rank]))))
self._load_events()
# end func
def _load_events(self):
eventList = []
poTimestamps = []
for ifn, fn in enumerate(self.proc_workload[self.rank]):
es = EventParser(fn).getEvents()
for i, (eid, e) in enumerate(es.iteritems()):
po = e.preferred_origin
if (not (po.depthkm >= 0)): continue
eventList.append(e)
poTimestamps.append(po.utctime.timestamp)
# end for
# end for
eventList = self.comm.allgather(eventList)
poTimestamps = self.comm.allgather(poTimestamps)
allEventList = []
allPOTimestamps = []
for iproc in np.arange(self.nproc):
for i, e in enumerate(eventList[iproc]):
allEventList.append(e)
allPOTimestamps.append(poTimestamps[iproc][i])
# end for
# end for
self.allPOTimestamps = np.array(allPOTimestamps)
self.allEventList = allEventList
if (self.rank == 0):
print ('Collected %d event origins' % (len(self.allEventList)))
hasPM = 0
hasMultipleMags = 0
for e in self.allEventList:
o = e.preferred_origin
if (e.preferred_magnitude): hasPM += 1
if (len(o.magnitude_list)): hasMultipleMags += 1
# end for
print ('%d preferred origins have a preferred magnitude' % (hasPM))
print ('%d preferred origins have at least one magnitude' % (hasMultipleMags))
# end if
# end func
[docs] def get_events(self):
return self.allEventList
# end func
[docs] def get_preferred_origin_timestamps(self):
return self.allPOTimestamps
# end func
# end class
[docs]class CatalogCSV:
def __init__(self, event_folder):
self.event_folder = event_folder
self.comm = MPI.COMM_WORLD
self.nproc = self.comm.Get_size()
self.rank = self.comm.Get_rank()
self.event_folder = event_folder
# retrieve list of all csv files
self.csv_files = sorted(recursive_glob(self.event_folder, '*.csv'))
self._load_events()
# end func
def _load_events(self):
eventList = []
poTimestamps = []
if(self.rank==0):
for ifn, fn in enumerate(self.csv_files):
print ('Reading %s' % (fn))
for line in open(fn, 'r'):
if(line[0]=='#'):
items = line.split(',')
vals = list(map(float, items[1:]))
year = int(vals[0])
month = int(vals[1])
day = int(vals[2])
hour = int(vals[3] if vals[3] >=0 else 0)
minute = int(vals[4] if vals[4] >=0 else 0)
second = vals[5] if vals[5] >=0 else 0
lon = vals[6]
lat = vals[7]
if (lon <- 180 or lon > 180): continue
if (lat <- 90 or lat > 90): continue
depth = vals[8] if vals[8] >=0 else 0
mb = vals[10]
ms = vals[11]
mi = vals[12]
mw = vals[13]
mag = 0
magtype='mw'
if(mw>0):
mag = mw
magtype='mw'
elif(ms>0):
mag = ms
magtype = 'ms'
elif(mb>0):
mag = mb
magtype = 'mb'
elif(mi>0):
mag = mi
magtype = 'mi'
# end if
eventid = vals[-1]
utctime = None
try:
utctime = UTCDateTime(year, month, day,
hour, minute, second)
except:
continue
# end try
origin = Origin(utctime, lat, lon, depth)
event = Event()
event.public_id = eventid
event.preferred_origin = origin
event.preferred_magnitude = Magnitude(mag, magtype)
eventList.append(event)
poTimestamps.append(origin.utctime.timestamp)
# end if
# end for
# end for
# end if
self.allEventList = self.comm.bcast(eventList, root=0)
self.allPOTimestamps = self.comm.bcast(poTimestamps, root=0)
self.allPOTimestamps = np.array(self.allPOTimestamps)
# end func
[docs] def get_events(self):
return self.allEventList
# end func
[docs] def get_preferred_origin_timestamps(self):
return self.allPOTimestamps
# end func
# end class
[docs]class ProgressTracker:
def __init__(self, output_folder, restart_mode=False):
self.output_folder = output_folder
self.restart_mode = restart_mode
self.comm = MPI.COMM_WORLD
self.nproc = self.comm.Get_size()
self.rank = self.comm.Get_rank()
self.prev_progress = 0 # progress from a previous run
self.progress = 0
self.proc_fn = os.path.join(output_folder, 'prog.%d.txt' % (self.rank))
if(self.restart_mode):
if(not os.path.exists(self.proc_fn)):
raise Exception('Progress file (%s) not found'%(self.proc_fn))
# end if
self.prev_progress = int(open(self.proc_fn).read())
# end if
# end func
[docs] def increment(self):
self.progress += 1
if(self.restart_mode and (self.prev_progress > 0) and (self.progress <= self.prev_progress)):
return False
else:
tmpfn = self.proc_fn + '.tmp'
f = open(tmpfn, 'w+')
f.write(str(self.progress))
f.close()
os.rename(tmpfn, self.proc_fn)
return True
# end if
# end func
# end class