Source code for seismic.receiver_fn.rf_h5_file_station_iterator
#!/usr/bin/env python
"""Helper class to iterate over station events in an HDF5 (h5) file generated by the rf library,
without loading all the traces into memory. This is a scalable solution for very large files.
"""
import logging
import h5py
from obspyh5 import dataset2trace
from rf import RFStream
# Import-time side effect: configure the root logger with the standard-library defaults so
# that messages logged by this module are emitted. Per the logging documentation,
# basicConfig() does nothing if the root logger already has handlers configured.
logging.basicConfig()
# pylint: disable=invalid-name, logging-format-interpolation
class IterRfH5StationEvents(object):
    """Helper class to iterate over stations in an h5 file generated by extract_event_traces.py
    and pass them to the RF generator.

    This class avoids having to load the whole file up front via obspy, which is slow and not
    scalable. Instead, the file is walked one station group at a time.

    Iterating over an instance yields ``(station_id, streams)`` tuples, where ``streams`` is a
    list containing one sorted ``RFStream`` per event group stored under that station.
    Data yielded per station can easily be many MB in size.
    """

    def __init__(self, h5_filename, memmap=False):
        """Store the settings; the file itself is not opened until iteration starts.

        :param h5_filename: Path of the h5 file to iterate over. The file is expected to
            contain a top-level ``waveforms`` group (see ``__iter__``).
        :param memmap: If True, first try to open the file with h5py's ``core`` driver
            (``backing_store=False``), falling back to the default driver on ``OSError``.
            NOTE(review): the ``core`` driver presumably holds the file image in RAM rather
            than memory-mapping it -- confirm against the h5py documentation.
        """
        self.h5_filename = h5_filename
        self.memmap_input = memmap
    # end func

    def _open_source_file(self):
        """Open the source file read-only and return the ``h5py.File`` handle.

        When ``self.memmap_input`` is set, the ``core`` driver is attempted first. If that
        raises ``OSError``, the error is logged and the file is opened with the default
        driver instead, so a failure of the optional driver is never fatal by itself.

        :return: Open ``h5py.File`` in mode ``'r'``; the caller is responsible for closing it
            (``__iter__`` does so via a ``with`` statement).
        """
        if self.memmap_input:
            try:
                return h5py.File(self.h5_filename, 'r', driver='core', backing_store=False)
            except OSError as e:
                logging.getLogger(__name__).error(
                    "Failure to memmap input file with error:\n%s\nReverting to default driver.", e)
            # end try
        # end if
        return h5py.File(self.h5_filename, 'r')
    # end func

    def __iter__(self):
        """Generate the event streams of each station in the file, one station per step.

        The file is opened when iteration starts and stays open until the generator is
        exhausted or closed. Progress is logged at INFO level; note that this method sets
        the level of this module's logger to INFO as a side effect.

        :return: Generator of ``(station_id, station_stream3c)`` tuples, where
            ``station_stream3c`` is a list of sorted ``RFStream`` objects, one per event
            group under the station (empty if the station has no event groups).
        """
        logger = logging.getLogger(__name__)
        logger.setLevel(logging.INFO)
        logger.info("Scanning station groups from file %s", self.h5_filename)

        # Number of per-event streams built so far. Reported only once the generator is fully
        # consumed, since the final log call sits after the last yield.
        event_count = 0
        with self._open_source_file() as f:
            wf_data = f['waveforms']
            num_stations = len(wf_data)
            for count, station_id in enumerate(wf_data, 1):
                logger.info("Station %s %s/%s", station_id, count, num_stations)
                station_data = wf_data[station_id]
                station_stream3c = []
                for event_time in station_data:
                    event_traces = station_data[event_time]
                    # All traces stored under the event are used as-is; the number of
                    # traces per event is not validated here.
                    traces = [dataset2trace(event_traces[trace_id]) for trace_id in event_traces]
                    event_count += 1
                    station_stream3c.append(RFStream(traces=traces).sort())
                # end for

                # Yield the results with the traces of each event (expected to be 3-channel
                # triplets) grouped together in RFStream instances.
                yield station_id, station_stream3c
            # end for
        # end with
        logger.info("Yielded %s event traces to process", event_count)
    # end func
# end class