Source code for seismic.network_event_dataset

#!/usr/bin/env python
# coding: utf-8
"""Class encapsulating a collection of event waveforms for stations of one network.
"""

import os
import sys

from sortedcontainers import SortedDict
import obspy

from seismic.stream_io import read_h5_stream, write_h5_event_stream
from seismic.stream_processing import zne_order, zrt_order


# pylint: disable=invalid-name


[docs]class NetworkEventDataset: """Collection of 3-channel ZNE streams with traces aligned to a fixed time window about seismic P-wave arrival events, *for a given network*. Two indexes are provided. One indexes hierarchically by station code and event ID, yielding a 3-channel ZNE stream per event, so that you can easily gather all traces for a given station by iterating over events. The other index indexes hierarchically by event ID and station code, yielding a 3-channel ZNE stream per station. Using this index you can easily gather all traces for a given event across multiple stations. Preferably each input trace will already have an 'event_id' attribute in its stats. If not, an event ID will be invented based on station identifiers and time window. """ def __init__(self, stream_src, network=None, station=None, location='', ordering='ZNE'): """ Initialize from data source (file or obspy.Stream). Traces are COPIED into the dataset in order to leave input object intact, since many obspy functions mutate traces in-place. All streams in the input data source stream_src are expected to belong to the same network. This is checked as the data is ingested. A discrepant network code is an error condition. :param stream_src: Source of input streams. May be a file name or an Obspy Stream :type stream_src: str, pathlib.Path or obspy.Stream :param network: Network code of streams to load. If stream_src is an Obspy Stream, the \ streams will be filtered to match this network code. :type network: str :param station: Station code of streams to load. If stream_src is an Obspy Stream, the \ streams will be filtered to match this station code. :type station: str :param location: [OPTIONAL] Location code of streams to load. Leave as default (empty string) \ if location code is empty in the data source. :type location: str :param ordering: Channel ordering to be applied to the data after loading. The channel labelling \ must be consistent with the requested ordering - rotation to the coordinate system implied \ by the ordering is *NOT* applied. :type ordering: str :raises AssertionError: If discrepant network code is found in input data """ if isinstance(stream_src, obspy.Stream): net = network sta = station loc = location or None if net or sta or loc: data_src = stream_src.select(net, sta, loc) else: data_src = stream_src # end if elif os.path.isfile(stream_src): data_src = read_h5_stream(stream_src, network, station, location) else: assert False, "Unknown data source {}".format(type(stream_src)) # end if self.network = network # Data in data_src collects all traces together under a single Stream object. # In order to get control over data slicing and traceability in processing, we # break it down into one Stream per ZNE channel triplet of a given event. self.db_sta = SortedDict() for tr in data_src: net, sta, loc, _ = tr.id.split('.') if self.network: assert net == self.network else: self.network = net # end if # Create single copy of the trace to be shared by both dicts. dupe_trace = tr.copy() try: event_id = tr.stats.event_id except AttributeError: event_id = '.'.join([net, sta, loc, '_'.join([str(tr.stats.starttime), str(tr.stats.endtime)])]) # end try self.db_sta.setdefault(sta, SortedDict()).setdefault(event_id, obspy.Stream()).append(dupe_trace) # end for # Index same obspy.Stream instances in event dict. This way, any changes # to a given event stream will be seen by both indexes. self.db_evid = SortedDict() for sta, ev_db in self.db_sta.items(): for evid, stream in ev_db.items(): self.db_evid.setdefault(evid, SortedDict())[sta] = stream # end for # end for # Sort each stream into specific order. if ordering.upper() == 'ZNE': ordinal = zne_order elif ordering.upper() == 'ZRT': ordinal = zrt_order else: ordinal = None # end if if ordinal is not None: self.apply(lambda x: x.traces.sort(key=ordinal)) # end if # end func def __iter__(self): """ Flat iterator. Loops over self.db_sta depth first and returns tuple of keys and matching stream. Equivalent to:: ```Python for sta, ev_db in self.db_sta.items(): for evid, stream in ev_db.items(): yield (sta, evid, stream) ``` """ return ((sta, evid, stream) for sta, ev_db in self.db_sta.items() for evid, stream in ev_db.items()) # end if def __len__(self): """Returns number of streams""" return sum((len(x) for x in self.db_sta.values())) # end func def __repr__(self): """Displays summary string for all streams""" return '\n'.join((evid + ', ' + str(stream) for _, evid, stream in iter(self))) # end func
[docs] def num_stations(self): """ Get number of stations in the dataset. :return: Number of stations :rtype: int """ return len(self.db_sta)
# end func
[docs] def station(self, station_code): """ Accessor for events for a given station. :param station_code: Station to get :type station_code: str :return: Event index for station, if station is found :rtype: SortedDict """ return self.db_sta.get(station_code)
# end func
[docs] def num_events(self): """ Get number of events in the dataset. :return: Number of events :rtype: int """ return len(self.db_evid)
# end func
[docs] def event(self, event_id): """ Accessor for stations for a given event. :param event_id: ID of event to look up :type event_id: str :return: Station index for given event, if event ID is found, otherwise None :rtype: SortedDict or NoneType """ return self.db_evid.get(event_id)
# end func
[docs] def curate(self, curator): """ Curate the dataset according to a callable curator. Modifies collection in-place to remove streams that do not satisfy the curation criteria of the callable. Curator call signature must be consitent with:: callable(station_code, event_id, stream) -> bool The callable returns a boolean indicating whether to keep the Stream or not. :param curator: Function or callable delegate to adjudicate whether to keep each given stream. :type curator: Callable :return: None """ # Only need to loop over one db, since they both reference the same underlying Stream instances. PY2 = (sys.version_info[0] == 2) if PY2: from itertools import ifilterfalse as filterfalse # pylint: disable=no-name-in-module, import-outside-toplevel else: from itertools import filterfalse # pylint: disable=import-outside-toplevel # end if discard_items = [(x[0], x[1]) for x in filterfalse(lambda rec: curator(*rec), iter(self))] self.prune(discard_items)
# end func
[docs] def apply(self, _callable): """Apply a callable across all streams. Use to apply uniform processing steps to the whole dataset. :param _callable: Callable object that takes an obspy Stream as input and applies itself to that Stream. \ Expect that stream may be mutated in-place by the callable. :type _callable: Any Callable compatible with the call signature. :return: None """ for _1, _2, stream in iter(self): _callable(stream)
# end func
[docs] def by_station(self): """ Iterate over station sub-dictionaries. :return: Iterable over the stations, each element consisting of pair containing \ (station code, event dict). :rtype: Iterable(tuple) """ return iter(self.db_sta.items())
# end func
[docs] def by_event(self): """ Iterate over event sub-dictionaries. :return: Iterable over the discrete events, each element consisting of pair containing \ (event id, station dict). :rtype: Iterable(tuple) """ return iter(self.db_evid.items())
# end func
[docs] def prune(self, items, cull=True): """ Remove a given sequence of (station, event) pairs from the dataset. :param items: Iterable of (station, event) pairs :type items: Iterable(tuple) :param cull: If True, then empty entries in the top level index will be removed. :type cull: boolean :return: None """ for station, event_id in items: self.db_sta[station].pop(event_id) self.db_evid[event_id].pop(station) if cull: if not self.db_sta[station]: self.db_sta.pop(station) # end if if not self.db_evid[event_id]: self.db_evid.pop(event_id)
# end if # end if # end for # end func
[docs] def write(self, output_h5_filename, index_format='event'): """ Write event dataset back out to HDF5 file. :param output_h5_filename: Output file name :type output_h5_filename: str or path :param index_format: Format to use for index. Must be 'event' (default) or 'standard' (obspy default) :type index_format: str :return: True if file was written :rtype: boolean """ assert not os.path.exists(output_h5_filename), 'Output file already exists' if index_format not in ['event', 'standard']: raise ValueError('Index format %s not supported' % index_format) # end if all_stream = obspy.Stream() for sta, evid, stream in iter(self): all_stream += stream # end for if index_format == 'event': write_h5_event_stream(output_h5_filename, all_stream, mode='w') elif index_format == 'standard': all_stream.write(output_h5_filename, format='H5', mode='w') # end if return os.path.isfile(output_h5_filename)
# end func # end class