Source code for seismic.receiver_fn.rf_h5_file_station_iterator
#!/usr/bin/env python
"""Helper class to iterate over station events in h5 file generated by rf library,
without loading all the traces into memory. This is a scalable solution for very large files.
"""
import logging
import h5py
from obspyh5 import dataset2trace
from rf import RFStream
logging.basicConfig()
# pylint: disable=invalid-name, logging-format-interpolation
[docs]class IterRfH5StationEvents(object):
"""Helper class to iterate over stations in h5 file generated by extract_event_traces.py and pass
them to RF generator. This class avoids having to load the whole file up front via obspy which
is slow and not scalable.
This class yields 3-channel traces per station per event.
Data yielded per station can easily be many MB in size.
"""
def __init__(self, h5_filename, memmap=False):
"""Initializer
:param h5_filename: Name of file containing event seismograms in HDF5 format, indexed in \
seismic.stream_io.EVENTIO_H5INDEX format
:type h5_filename: str or pathlib.Path
:param memmap: If True, memmap the open file. Can improve tractability of handling very large files.
:type memmap: bool
"""
self.h5_filename = h5_filename
# self.num_components = 3
self.memmap_input = memmap
def _open_source_file(self):
if self.memmap_input:
try:
return h5py.File(self.h5_filename, 'r', driver='core', backing_store=False)
except OSError as e:
logger = logging.getLogger(__name__)
logger.error("Failure to memmap input file with error:\n{}\nReverting to default driver."
.format(str(e)))
# end if
return h5py.File(self.h5_filename, 'r')
# end if
def __iter__(self):
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
logger.info("Scanning station groups from file {}".format(self.h5_filename))
with self._open_source_file() as f:
wf_data = f['waveforms']
num_stations = len(wf_data)
count = 0
event_count = 0
for station_id in wf_data:
count += 1
logger.info("Station {} {}/{}".format(station_id, count, num_stations))
station_data = wf_data[station_id]
station_stream3c = []
for event_time in station_data:
event_traces = station_data[event_time]
# if len(event_traces) != self.num_components:
# logging.warning("Incorrect number of traces ({}) for stn {} event {}, skipping"
# .format(len(event_traces), station_id, event_time))
# continue
traces = []
for trace_id in event_traces:
trace = dataset2trace(event_traces[trace_id])
traces.append(trace)
event_count += 1
station_stream3c.append(RFStream(traces=traces).sort())
# end for
# Yield the results with 3-channel trace triplets grouped together in RFStream instances.
yield station_id, station_stream3c
# end for
# end with
logger.info("Yielded {} event traces to process".format(event_count))
# end func