Source code for seismic.stream_io

#!/usr/bin/env python
"""Helper functions for seismic stream IO.
"""

import os
import itertools
import logging
import numpy as np
from sortedcontainers import SortedDict, SortedSet, SortedList
import obspy
from obspy.taup import TauPyModel
from obspy.io.sac.sactrace import SACTrace
import h5py
import obspyh5
from obspyh5 import dataset2trace, is_obspyh5, trace2group
from os.path import splitext

from seismic.units_utils import KM_PER_DEG
from rf.rfstream import rfstats, obj2stats
from rf.util import _get_stations
from obspy.geodetics import gps2dist_azimuth
from obspy.geodetics import kilometers2degrees

# pylint: disable=invalid-name


logging.basicConfig()


# Source event centric indexing format
EVENTIO_TF = '.datetime:%Y-%m-%dT%H:%M:%S'
EVENTIO_H5INDEX = (
    'waveforms/{network}.{station}.{location}/{event_time%s}/' % EVENTIO_TF +
    '{channel}_{starttime%s}_{endtime%s}' % (EVENTIO_TF, EVENTIO_TF)
    )

[docs]def safe_iter_event_data(events, inventory, get_waveforms, use_rfstats=True, phase='P', request_window=None, pad=10, pbar=None, **kwargs): """ Return iterator yielding three component streams per station and event. :param events: list of events or `~obspy.core.event.Catalog` instance :param inventory: `~obspy.core.inventory.inventory.Inventory` instance with station and channel information :param get_waveforms: Function returning the data. It has to take the arguments network, station, location, channel, starttime, endtime. :param phase: Considered phase, e.g. 'P', 'S', 'PP' :type request_window: tuple (start, end) :param request_window: requested time window around the onset of the phase :param float pad: add specified time in seconds to request window and trim afterwards again :param pbar: tqdm_ instance for displaying a progressbar :param kwargs: all other kwargs are passed to `~rf.rfstream.rfstats()` :return: three component streams with raw data Example usage with progressbar:: from tqdm import tqdm from rf.util import iter_event_data with tqdm() as t: for stream3c in iter_event_data(*args, pbar=t): do_something(stream3c) .. _tqdm: https://pypi.python.org/pypi/tqdm """ from rf.rfstream import rfstats, RFStream method = phase[-1].upper() if request_window is None: request_window = (-50, 150) if method == 'P' else (-100, 50) stations = _get_stations(inventory) if pbar is not None: pbar.total = len(events) * len(stations) for event, seedid in itertools.product(events, stations): if pbar is not None: pbar.update(1) origin_time = (event.preferred_origin() or event.origins[0])['time'] try: args = (seedid[:-1] + stations[seedid], origin_time) coords = inventory.get_coordinates(*args) except Exception: # station not available at that time continue stats = None if(use_rfstats): try: stats = rfstats(station=coords, event=event, phase=phase, **kwargs) except Exception as ex: from warnings import warn warn('Error "%s" in rfstats call for event %s, station %s.' % (ex, event.resource_id, seedid)) continue if not stats: continue else: stats = obj2stats(event, coords) dist_range = kwargs.get('dist_range') if(dist_range): dist, baz, _ = gps2dist_azimuth(stats.station_latitude, stats.station_longitude, stats.event_latitude, stats.event_longitude) dist = kilometers2degrees(dist / 1e3) if dist_range and not dist_range[0] <= dist <= dist_range[1]: continue # end if # end if # end if net, sta, loc, cha = seedid.split('.') if(use_rfstats): starttime = stats.onset + request_window[0] endtime = stats.onset + request_window[1] else: starttime = origin_time + request_window[0] endtime = origin_time + request_window[1] # end if kws = {'network': net, 'station': sta, 'location': loc, 'channel': cha, 'starttime': starttime - pad, 'endtime': endtime + pad} try: stream = get_waveforms(**kws) stream.trim(starttime, endtime) stream.merge() except Exception: # no data available continue if len(stream) != 3: from warnings import warn warn('Need 3 component seismograms. %d components ' 'detected for event %s, station %s.' % (len(stream), event.resource_id, seedid)) continue # end if if any(isinstance(tr.data, np.ma.masked_array) for tr in stream): def has_masked_values(data_stream): rval = False for tr in data_stream: if(isinstance(tr.data, np.ma.masked_array)): if (np.any(tr.data.mask)): rval = True break # end if # end if # end for return rval # end func if(has_masked_values(stream)): from warnings import warn warn('Gaps or overlaps detected for event %s, station %s.' % (event.resource_id, seedid)) continue else: for tr in stream: tr.data = np.array(tr.data) # end if # end for for tr in stream: tr.stats.update(stats) # end for yield RFStream(stream)
# end func
[docs]def safe_h5_root(src_file, root): # Origianlly, 'P' waveforms we re all written under '/waveforms' in write_h5_event_stream. # However, it has now been updated to be able to extract 'P', 'S' and 'Surface-wave' waveforms # and store them under 'P', 'S' and 'SW' sub-folders under '/waveforms', respectively. # This function returns a backward compatible H5 root. if (root.strip('/') == 'waveforms'): hf = h5py.File(src_file, 'r') keys = hf['waveforms'].keys() if (np.sum([item in keys for item in ['P', 'S', 'SW']])): root = '/waveforms/P' # set default to point ot 'P' waveforms # end if hf.close() # end if return root
# end func
[docs]def read_h5_stream(src_file, network=None, station=None, loc='', root='/waveforms'): """Helper function to load stream data from hdf5 file saved by obspyh5 HDF5 file IO. Typically the source file is generated using `extract_event_traces.py` script. For faster loading time, a particular network and station may be specified. :param src_file: File from which to load data :type src_file: str or Path :param network: Specific network to load, defaults to None :type network: str, optional :param station: Specific station to load, defaults to None :type station: str, optional :param root: Root path in hdf5 file where to start looking for data, defaults to '/waveforms' :type root: str, optional :return: All the loaded data in an obspy Stream. :rtype: obspy.Stream """ # Ensure backward compatibility root = safe_h5_root(src_file, root) logger = logging.getLogger(__name__) if (network is None and station is not None) or (network is not None and station is None): logger.warning("network and station should both be specified - IGNORING incomplete specification") group = root elif network and station: group = root + '/{}.{}.{}'.format(network.upper(), station.upper(), loc.upper()) else: group = root # end if stream = obspy.Stream() try: stream = obspy.read(src_file, format='h5', group=group) except Exception as e: print(str(e), ' Returning empty stream..') # end try return stream
# end func
[docs]def get_obspyh5_index(src_file, seeds_only=False, root='/waveforms'): """Scrape the index (only) from an obspyh5 file. :param src_file: Name of file to extract index from :type src_file: str or pathlib.Path :param seeds_only: If True, only get the seed IDs of the traces. Otherwise (default), get full index. :type seeds_only: bool :return: Sorted dictionary with index of waveforms in the file :rtype: sortedcontainers.SortedDict """ # We use SortedSet rather than SortedList for the inner container # because it allows set operations that are useful, such as finding # event IDs for events which are common across multiple stations. assert is_obspyh5(src_file), '{} is not an obspyh5 file'.format(src_file) index = SortedDict() with h5py.File(src_file, mode='r') as h5f: root = safe_h5_root(src_file, root) root_g = h5f[root] if seeds_only: return [key for key in root_g.keys()] # end if for seedid, station_grp in root_g.items(): for event_grp in station_grp.values(): evid = None for channel in event_grp.values(): evid = channel.attrs['event_id'] break # end for if evid: index.setdefault(seedid, SortedSet()).add(evid) # end if # end for # end for # end with return index
# end func
[docs]def iter_h5_stream(src_file, headonly=False, root='/waveforms'): """ Iterate over hdf5 file containing streams in obspyh5 format. :param src_file: Path to file to read :type src_file: str or pathlib.Path :param headonly: Only read trace stats, do not read actual time series data :type headonly: bool :yield: obspy.Stream containing traces for a single seismic event. """ assert is_obspyh5(src_file), '{} is not an obspyh5 file'.format(src_file) logger = logging.getLogger(__name__) fname = os.path.split(src_file)[-1] with h5py.File(src_file, mode='r') as h5f: root = safe_h5_root(src_file, root) root_g = h5f[root] for seedid, station_grp in root_g.items(): logger.info('{}: Group {}'.format(fname, seedid)) num_events = len(station_grp) for i, (_src_event_time, event_grp) in enumerate(station_grp.items()): traces = [] for _trace_id, channel in event_grp.items(): traces.append(dataset2trace(channel, headonly=headonly)) # end for evid = traces[0].stats.event_id for tr in traces[1:]: assert tr.stats.event_id == evid # end for logger.info('Event {} ({}/{})'.format(evid, i + 1, num_events)) yield seedid, evid, obspy.Stream(traces)
# end for # end for # end with # end func
[docs]def write_h5_event_stream(dest_h5_file, stream, index=EVENTIO_H5INDEX, mode='a', ignore=()): """ Write stream to HDF5 file in event indexed format using obspy. :param dest_h5_file: File in which to write the stream. :type dest_h5_file: str or pathlib.Path :param stream: The stream to write :type stream: obspy.Stream :param mode: Write mode, such as 'w' or 'a'. Use 'a' to iteratively write multiple streams to one file. :type mode: str :param ignore: List of headers to ignore when writing attributes to group. Passed on directly to obspyh5.writeh5 :type ignore: Any iterable of str """ # Lock down format of obspyh5 node layout in HDF5 to ensure compatibility with # custom iterators. assert mode.lower() != 'r', 'Write mode cannot be \'r\'' prior_index = obspyh5._INDEX obspyh5.set_index(index) stream.write(dest_h5_file, 'H5', mode=mode, ignore=ignore) obspyh5.set_index(prior_index)
# end func
[docs]def remove_group(hdf_fn, net_sta_loc, root='/waveforms', logger=None): try: if(os.path.exists(hdf_fn)): hdf_keys = get_obspyh5_index(hdf_fn, seeds_only=True, root=root) if(net_sta_loc in hdf_keys): del_key = 'waveforms/%s' % net_sta_loc with h5py.File(hdf_fn, "a") as fh: if (len(fh[del_key].keys())): del fh[del_key] if(logger): logger.info('Removing group {}'.format(del_key)) # end if # end with # end if # end if except: raise IOError('Failed to remove group: waveforms/{}'.format(net_sta_loc))
# end try # end func
[docs]def sac2hdf5(src_folder, basenames, channels, dest_h5_file, tt_model_id='iasp91'): """ Convert collection of SAC files from a folder into a single HDF5 stream file. :param src_folder: Path to folder containing SAC files :type src_folder: str or Path :param basenames: List of base filenames (file name excluding extension) to load. :type basenames: list of str :param channels: List of channels to load. For each base filename in basenames, there is expected to be a file with each channel as the filename extension. :type channels: List of str :param dest_h5_file: Path to output file. Will be created, or overwritten if already exists. :type dest_h5_file: str or Path :param tt_model_id: Which travel time earth model to use for synthesizing trace metadata. Must be known to obspy.taup.TauPyModel :type tt_model_id: str :return: None """ tt_model = TauPyModel(tt_model_id) traces = [] for basename, channel in itertools.product(basenames, channels): fname = os.path.join(src_folder, '.'.join([basename, channel])) channel_stream = obspy.read(fname, 'sac') tr = channel_stream[0] event_depth_km = tr.stats.sac['evdp'] dist_deg = tr.stats.sac['dist'] / KM_PER_DEG arrivals = tt_model.get_travel_times(event_depth_km, dist_deg, ('P',)) arrival = arrivals[0] inc = arrival.incident_angle slowness = arrival.ray_param_sec_degree src_dic = tr.stats.sac sac_tr = SACTrace(nzyear=src_dic['nzyear'], nzjday=src_dic['nzjday'], nzhour=src_dic['nzhour'], nzmin=src_dic['nzmin'], nzsec=src_dic['nzsec'], nzmsec=src_dic['nzmsec']) onset = sac_tr.reftime if 'nevid' in src_dic: event_id = src_dic['nevid'] else: event_id = basename # end if stats = {'distance': dist_deg, 'back_azimuth': src_dic['baz'], 'inclination': inc, 'onset': onset, 'slowness': slowness, 'phase': 'P', 'tt_model': tt_model_id, 'event_id': event_id} tr.stats.update(stats) traces.append(tr) # end for stream_all = obspy.Stream(traces) stream_all.write(dest_h5_file, 'H5')
# end func