# Source code for seismic.stream_processing

#!/usr/bin/env python
"""Utility stream processing functions.
"""

import copy
import functools
import numbers
import json

import numpy as np


def zne_order(tr):
    """Channel ordering sort key function for Z, N, E component order.

    :param tr: Trace whose ordinal is to be determined.
    :type tr: obspy.Trace or RFTrace
    :return: Numeric index indicating ZNE sort order of traces in a stream
    :rtype: int
    """
    trace_ordering = {'Z': 0, 'N': 1, 'E': 2}
    channel = tr.stats.channel
    if not channel:
        # Missing/empty channel code sorts after all known components.
        return 3
    # Component is the last character of the channel code; unknown
    # components also sort last.
    return trace_ordering.get(channel[-1].upper(), 3)
# end func
def zrt_order(tr):
    """Channel ordering sort key function for Z, R, T component order.

    :param tr: Trace whose ordinal is to be determined.
    :type tr: obspy.Trace or RFTrace
    :return: Numeric index indicating ZRT sort order of traces in a stream
    :rtype: int
    """
    trace_ordering = {'Z': 0, 'R': 1, 'T': 2}
    channel = tr.stats.channel
    if not channel:
        # Missing/empty channel code sorts after all known components.
        return 3
    # Component is the last character of the channel code; unknown
    # components also sort last.
    return trace_ordering.get(channel[-1].upper(), 3)
# end func
def sinc_resampling(t, y, t_new):
    """Resample signal y for known times t onto new times t_new.

    Sampling rates do not need to match and time windows do not need to
    overlap. t_new should not have a lower sampling rate than t.

    :param t: numpy.array of times
    :param y: numpy.array of sample values
    :param t_new: numpy.array of new times to interpolate onto
    :return: numpy.array of new interpolated sample values
    """
    # Mean sample spacing of the source time base.
    dt = np.mean(np.diff(t))
    # Pairwise time differences: kernel[i, j] = sinc((t_new[i] - t[j]) / dt),
    # i.e. a Whittaker-Shannon interpolation matrix.
    kernel = np.sinc(np.subtract.outer(t_new, t) / dt)
    return np.matmul(kernel, y)
# end func
def back_azimuth_filter(baz, baz_range):
    """Check if back azimuth `baz` is within range. Inputs must be in
    the range [0, 360] degrees.

    :param baz: Value to check
    :type baz: int or float
    :param baz_range: Pair of angles in degrees.
    :type baz_range: List or array of 2 floats, min and max back azimuth
    :return: True if baz is within baz_range, False otherwise.
    """
    limits = np.array(baz_range)
    assert np.all(np.isfinite(limits))
    assert 0 <= baz <= 360
    assert 0 <= baz_range[0] <= 360
    assert 0 <= baz_range[1] <= 360
    lo, hi = baz_range[0], baz_range[1]
    # A range that wraps through North (lo > hi) is unwrapped by shifting
    # the lower bound down by full turns until the interval is ordered.
    while lo > hi:
        lo -= 360
    # Test baz and its +/- one-turn aliases against the unwrapped interval.
    return any(lo <= baz + shift <= hi for shift in (0, -360, 360))
# end func
def swap_ne_channels(_event_id, stream):
    """Swap N and E channels on a stream. Changes the input stream.

    :param _event_id: Ignored
    :param stream: Stream whose N and E channels are to be swapped
    :return: Stream with channel swapping applied
    """
    trace_n = stream.select(component='N')[0]
    trace_e = stream.select(component='E')[0]
    # N receives E's data array directly; E receives a deep copy of N's
    # original data (mirrors the original buffering behavior).
    trace_n.data, trace_e.data = trace_e.data, copy.deepcopy(trace_n.data)
    return stream
# end func
def negate_channel(_event_id, stream, channel):
    """Negate the data in the given channel of the stream.

    :param _event_id: Ignored
    :param stream: Stream containing channel to flip
    :param channel: Single character string indicating which component to flip
    :return: Stream with channel data negated
    """
    # Only the first matching trace is negated; raises IndexError if the
    # requested component is absent (same as original behavior).
    target = stream.select(component=channel)[0]
    target.data = -target.data
    return stream
# end func
@functools.lru_cache()
def _load_correction_database(file_name):
    """Load and cache a JSON correction database keyed by 'NET.STA'.

    Cached at module level so each file is read only once per process,
    no matter how many times scalarize() is called with its filename.
    """
    with open(file_name, 'r') as f:
        return json.load(f)
    # end with
# end func


@functools.singledispatch
def scalarize(_obj, _stats):
    """Convert a back-azimuth correction source into a scalar correction
    (degrees) for a given trace. Dispatches on the type of `_obj`.

    :param _obj: Correction source: a number (uniform correction), a dict
        keyed by 'NET.STA', or a str filename of a JSON database.
    :param _stats: Trace stats providing `network` and `station` codes.
    :return: Correction in degrees as a float.
    :raises NotImplementedError: for unsupported correction object types.
    """
    raise NotImplementedError('Cannot convert generic object to scalar')
# end func


@scalarize.register(numbers.Number)
def _(val, _stats):
    # A plain number applies uniformly to every trace.
    return float(val)
# end func


@scalarize.register(dict)
def _(d, stats):
    # Look up this trace's station record by 'NET.STA'; default to no
    # correction when the station or the field is absent.
    net = stats.network
    sta = stats.station
    key = '.'.join([net, sta])
    record = d.get(key)
    if record is None:
        return 0.0
    else:
        return record.get('azimuth_correction', 0.0)
    # end if
# end func


@scalarize.register(str)
def _(filename, stats):
    # BUG FIX: the cached loader was previously defined (and
    # lru_cache-decorated) inside this function, so a brand-new cache was
    # created on every call and the JSON file was re-read each time.
    # Hoisting the loader to module level (_load_correction_database)
    # makes the cache actually persist across calls.
    db = _load_correction_database(filename)
    return scalarize(db, stats)
# end func
def correct_back_azimuth(_event_id, stream, baz_correction):
    """Apply modification to the back azimuth value in the stream stats.

    :param _event_id: Ignored
    :param stream: Stream to which correction is applied
    :param baz_correction: Any object with a registered `scalarize` function
        for generating an angle correction for a trace in degrees. E.g. could
        be a numeric value, a dictionary of correction values, or a file
        produced by script `analyze_station_orientations.py`
    :return: Stream with modified back azimuth
    """
    for trace in stream:
        # Corrections may be per-station; scalarize resolves the right
        # value for this trace from the baz_correction argument.
        stats = trace.stats
        adjusted = stats.back_azimuth + scalarize(baz_correction, stats)
        # Wrap the corrected angle back into [0, 360).
        while adjusted < 0:
            adjusted += 360
        # end while
        while adjusted >= 360:
            adjusted -= 360
        # end while
        stats.back_azimuth = adjusted
    # end for
    return stream
# end func
def assert_homogenous_stream(stream, funcname):
    """Verify that the given stream does not contain mixture of stations
    or channels/components.

    :param stream: Stream containing one or more traces
    :type stream: obspy.core.stream.Stream or rf.RFStream
    :param funcname: Caller name used in the assertion message
    :raises AssertionError: if traces differ in station or channel code
    """
    # It is not sensible to expect RF similarity for different stations
    # or channels; an empty stream is trivially homogenous.
    if not stream:
        return
    # end if
    first = stream[0].stats
    assert all(tr.stats.station == first.station for tr in stream), \
        'Mixed station data incompatible with function {}'.format(funcname)
    assert all(tr.stats.channel == first.channel for tr in stream), \
        'Mixed channel data incompatible with function {}'.format(funcname)
# end func