Source code for seismic.stream_processing
#!/usr/bin/env python
"""Utility stream processing functions.
"""
import copy
import functools
import numbers
import json
import numpy as np
# pylint: disable=invalid-name
[docs]def zne_order(tr):
"""Channel ordering sort key function for ZNE ordering
:param tr: Trace whose ordinal is to be determined.
:type tr: obspy.Trace or rf.RFTrace
:return: Numeric index indicating ZNE sort order of traces in a stream
:rtype: int
"""
trace_ordering = {'Z': 0, 'N': 1, 'E': 2}
component = tr.stats.channel[-1].upper() if tr.stats.channel else False
if component and component in trace_ordering:
return trace_ordering[component]
else:
return 3
# end func
[docs]def zrt_order(tr):
"""Channel ordering sort key function for ZRT ordering
:param tr: Trace whose ordinal is to be determined.
:type tr: obspy.Trace or rf.RFTrace
:return: Numeric index indicating ZRT sort order of traces in a stream
:rtype: int
"""
trace_ordering = {'Z': 0, 'R': 1, 'T': 2}
component = tr.stats.channel[-1].upper() if tr.stats.channel else False
if component and component in trace_ordering:
return trace_ordering[component]
else:
return 3
# end func
[docs]def sinc_resampling(t, y, t_new):
"""Resample signal y for known times t onto new times t_new.
Sampling rates do not need to match and time windows do not need
to overlap. t_new should not have a lower sampling rate than t.
:param t: 1D array of times
:type t: numpy.array
:param y: 1D array of sample values
:type y: numpy.array
:param t_new: 1D array of new times to interpolate onto
:type t_new: numpy.array
:return: 1D array of new interpolated sample values
:rtype: numpy.array
"""
dt = np.mean(np.diff(t))
Ts, T = np.meshgrid(t_new, t)
y_new = np.matmul(np.sinc((Ts.T - T.T) / dt), y)
return y_new
# end func
[docs]def back_azimuth_filter(back_azi, back_azi_range):
"""Check if back azimuth `back_azi` is within range. Inputs must be in the range [0, 360] degrees.
:param back_azi: Value to check
:type back_azi: int or float
:param back_azi_range: Pair of angles in degrees.
:type back_azi_range: List or array of 2 floats, min and max back azimuth
:return: True if back_azi is within back_azi_range, False otherwise.
:rtype: bool
"""
assert not np.any(np.isinf(np.array(back_azi_range)))
assert not np.any(np.isnan(np.array(back_azi_range)))
assert 0 <= back_azi <= 360
assert 0 <= back_azi_range[0] <= 360
assert 0 <= back_azi_range[1] <= 360
back_azi_range = list(copy.copy(back_azi_range))
while back_azi_range[0] > back_azi_range[1]:
back_azi_range[0] -= 360
return ((back_azi_range[0] <= back_azi <= back_azi_range[1]) or
(back_azi_range[0] <= back_azi - 360 <= back_azi_range[1]) or
(back_azi_range[0] <= back_azi + 360 <= back_azi_range[1]))
# end func
[docs]def swap_ne_channels(_event_id, stream):
"""
Swap N and E channels on a stream. Changes the input stream.
:param _event_id: Ignored
:param stream: Stream whose N and E channels are to be swapped
:type stream: obspy.Stream
:return: Stream with channel swapping applied
:rtype: obspy.Stream
"""
stream_n = stream.select(component='N')
stream_e = stream.select(component='E')
data_n = copy.deepcopy(stream_n[0].data)
stream_n[0].data = stream_e[0].data
stream_e[0].data = data_n
return stream
# end func
[docs]def negate_channel(_event_id, stream, channel):
"""
Negate the data in the given channel of the stream
:param _event_id: Ignored
:param stream: Stream containing channel to flip
:type stream: obspy.Stream
:param channel: Single character string indicating which component to flip
:type channel: str
:return: Stream with channel data negated
:rtype: obspy.Stream
"""
trace_selected = stream.select(component=channel)[0]
trace_selected.data = -trace_selected.data
return stream
# end func
[docs]@functools.singledispatch
def scalarize(_obj, _stats):
"""Fallback scalarize function for non-specialized type
"""
raise NotImplementedError('Cannot convert generic object to scalar')
# end func
@scalarize.register(numbers.Number)
def _(val, _stats):
"""Scalarize function for numeric types
:rtype: float
"""
return float(val)
# end func
@scalarize.register(dict)
def _(d, stats):
"""Scalarize function for dict type, *assumed to be a container of obspy Stats*,
to azimuth correction scalar.
:rtype: float
"""
net = stats.network
sta = stats.station
key = '.'.join([net, sta])
record = d.get(key)
if record is None:
return 0.0
else:
return record.get('azimuth_correction', 0.0)
# end if
# end func
@scalarize.register(str)
def _(filename, stats):
@functools.lru_cache()
def load_correction_database(_fname):
with open(_fname, 'r') as _f:
_db = json.load(_f)
# end with
return _db
# end func
# Load function uses lru_cache, effectively equivalent to runonce, so this function
# can be performant when called at high frequency.
db = load_correction_database(filename)
return scalarize(db, stats)
# end func
[docs]def correct_back_azimuth(_event_id, stream, baz_correction):
"""
Apply modification to the back azimuth value in the stream stats
:param _event_id: Ignored
:param stream: Stream to which correction is applied
:type stream: obspy.Stream or rf.RFSTream
:param baz_correction: Any object with a registered `scalarize` function for
generating an angle correction for a trace in degrees. E.g. could be a
numeric value, a dictionary of correction values, or a file produced by
script `analyze_station_orientations.py`
:return: Stream with modified back azimuth
:rtype: Same as type(stream)
"""
for tr in stream:
# Each station may have a custom correction. Expect that all such
# possible corrections are represented in baz_correction argument.
stats = tr.stats
correction = scalarize(baz_correction, stats)
back_azi = stats.back_azimuth
back_azi += correction
while back_azi < 0:
back_azi += 360
while back_azi >= 360:
back_azi -= 360
stats.back_azimuth = back_azi
# end for
return stream
# end func
[docs]def assert_homogenous_stream(stream, funcname):
"""
Verify that the given stream does not contain mixture of stations or channels/components.
:param stream: Stream containing one or more traces
:type stream: obspy.Stream or rf.RFStream
:return: None
"""
# Check station and channel uniqueness. It is not sensible to expect RF similarity for
# different stations or channels.
if not stream:
return
# end if
expected_station = stream[0].stats.station
expected_channel = stream[0].stats.channel
assert np.all(np.array([(tr.stats.station == expected_station) for tr in stream])), \
'Mixed station data incompatible with function {}'.format(funcname)
assert np.all(np.array([(tr.stats.channel == expected_channel) for tr in stream])), \
'Mixed channel data incompatible with function {}'.format(funcname)
# end func