Source code for seismic.receiver_fn.rf_h5_file_event_iterator
#!/usr/bin/env python
"""Helper class to iterate over 3-channel event traces in h5 file generated by rf library,
without loading all the traces into memory. This is a scalable solution for very large files.
"""
import logging
import numpy as np
import h5py
from obspyh5 import dataset2trace
from rf import RFStream
from seismic.stream_processing import zne_order
logging.basicConfig()
# pylint: disable=invalid-name, logging-format-interpolation
[docs]class IterRfH5FileEvents(object):
"""Helper class to iterate over events in h5 file generated by extract_event_traces.py and pass
them to RF generator. This class avoids having to load the whole file up front via obspy which
is slow and not scalable.
Yields (station_id, event_id, event_time, stream)
Due to the expected hierarchy structure of the input H5 file, yielded event traces are grouped
by station ID.
Data yielded per event can easily be hundreds of kB in size, depending on the length of the
event traces. rf library defaults to window of (-50, 150) sec about the P wave arrival time.
"""
def __init__(self, h5_filename, memmap=False, channel_pattern=None):
"""
Initializer
:param h5_filename: Name of file containing event seismograms in HDF5 format, indexed in \
seismic.stream_io.EVENTIO_H5INDEX format
:type h5_filename: str or pathlib.Path
:param memmap: If True, memmap the open file. Can improve tractability of handling very large files.
:type memmap: bool
:param channel_pattern: [OPTIONAL] Ordered list of preferred channels, e.g. 'HH*,BH*'. Channel mask to use
to select channels returned.
:type channel_pattern: str
"""
self.h5_filename = h5_filename
self.num_components = 3
self.memmap_input = memmap
self.channel_pattern = channel_pattern
def _open_source_file(self):
if self.memmap_input:
try:
return h5py.File(self.h5_filename, 'r', driver='core', backing_store=False)
except OSError as e:
logger = logging.getLogger(__name__)
logger.error("Failure to memmap input file with error:\n{}\nReverting to default driver."
.format(str(e)))
# end if
return h5py.File(self.h5_filename, 'r')
# end if
def __iter__(self):
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
logger.info("Scanning jobs metadata from file {}".format(self.h5_filename))
with self._open_source_file() as f:
wf_data = f['waveforms']
num_stations = len(wf_data)
count = 0
event_count = 0
create_event_id = False
first_loop = True
for station_id in wf_data:
count += 1
logger.info("Station {} {}/{}".format(station_id, count, num_stations))
station_data = wf_data[station_id]
for event_time in station_data:
event_traces = station_data[event_time]
if not event_traces:
continue
if first_loop:
first_loop = False
tmp = list(event_traces.keys())[0]
create_event_id = ('event_id' not in event_traces[tmp].attrs)
traces = []
for trace_id in event_traces:
trace = dataset2trace(event_traces[trace_id])
traces.append(trace)
stream = RFStream(traces=traces)
if len(stream) != self.num_components and self.channel_pattern is not None:
for ch_mask in self.channel_pattern.split(','):
_stream = stream.select(channel=ch_mask)
logging.info("Tried channel mask {}, got {} channels".format(ch_mask, len(_stream)))
if len(_stream) == self.num_components:
stream = _stream
break
# end for
# end if
if len(stream) != self.num_components:
logging.warning("Incorrect number of traces ({}) for stn {} event {}, skipping"
.format(len(stream), station_id, event_time))
continue
# end if
# Force order of traces to ZNE ordering.
stream.traces = sorted(stream.traces, key=zne_order)
# Strongly assert expected ordering of traces. This must be respected so that
# RF normalization works properly.
assert stream.traces[0].stats.channel[-1] == 'Z'
assert stream.traces[1].stats.channel[-1] == 'N'
assert stream.traces[2].stats.channel[-1] == 'E'
event_count += 1
if create_event_id:
event_id = event_count
else:
event_id = traces[0].stats.event_id
assert np.all([(tr.stats.event_id == event_id) for tr in traces])
# end if
yield station_id, event_id, event_time, stream
# end for
# end for
# end with
logger.info("Yielded {} event traces to process".format(event_count))
# end func