# Source code for seismic.receiver_fn.rf_h5_file_station_iterator

#!/usr/bin/env python
"""Helper class to iterate over station events in h5 file generated by rf library,
   without loading all the traces into memory. This is a scalable solution for very large files.
"""

import logging

import h5py
from obspyh5 import dataset2trace
from rf import RFStream

logging.basicConfig()

# pylint: disable=invalid-name, logging-format-interpolation

class IterRfH5StationEvents(object):
    """Helper class to iterate over stations in h5 file generated by extract_event_traces.py and
    pass them to RF generator.

    This class avoids having to load the whole file up front via obspy, which is slow and not
    scalable. Iterating an instance yields one ``(station_id, event_streams)`` pair per member of
    the file's ``waveforms`` group. Data yielded per station can easily be many MB in size.
    """

    def __init__(self, h5_filename, memmap=False):
        """
        :param h5_filename: Path of the HDF5 file to read.
        :param memmap: If True, first try opening the file with h5py's 'core' driver
            (``backing_store=False``). If that raises OSError, fall back to the default driver.
        """
        self.h5_filename = h5_filename
        self.memmap_input = memmap

    def _open_source_file(self):
        """Open the source file read-only and return the ``h5py.File`` handle.

        If memmap was requested and opening with the 'core' driver raises OSError, log the error
        and reopen the file with the default driver instead.
        """
        if not self.memmap_input:
            return h5py.File(self.h5_filename, 'r')
        try:
            return h5py.File(self.h5_filename, 'r', driver='core', backing_store=False)
        except OSError as err:
            logging.getLogger(__name__).error(
                "Failure to memmap input file with error:\n{}\nReverting to default driver.".format(str(err)))
        return h5py.File(self.h5_filename, 'r')

    def __iter__(self):
        """Yield ``(station_id, event_streams)`` for each member of the file's 'waveforms' group.

        ``event_streams`` is a list with one sorted RFStream per event member of that station. Each
        RFStream is built from every dataset of the event via ``dataset2trace``. The file stays open
        between yields. A summary of the total event count is logged only once iteration has run to
        completion.
        """
        log = logging.getLogger(__name__)
        # NOTE: this forces the module logger to INFO so the progress messages below are emitted.
        log.setLevel(logging.INFO)
        log.info("Scanning station groups from file {}".format(self.h5_filename))
        total_events = 0
        with self._open_source_file() as h5_file:
            waveforms = h5_file['waveforms']
            station_total = len(waveforms)
            for position, station_id in enumerate(waveforms, start=1):
                log.info("Station {} {}/{}".format(station_id, position, station_total))
                station_group = waveforms[station_id]
                event_streams = []
                for event_key in station_group:
                    event_group = station_group[event_key]
                    # All datasets of the event go into one stream. The intent is 3-channel
                    # triplets, but the component count is not checked here.
                    traces = [dataset2trace(event_group[trace_key]) for trace_key in event_group]
                    total_events += 1
                    event_streams.append(RFStream(traces=traces).sort())
                yield station_id, event_streams
        log.info("Yielded {} event traces to process".format(total_events))