#!/usr/bin/env python
import logging
import numpy as np
from scipy.interpolate import interp1d
import seismic.receiver_fn.rf_util as rf_util
from seismic.units_utils import KM_PER_DEG
# pylint: disable=invalid-name,logging-format-interpolation
logging.basicConfig()
DEFAULT_H_RANGE = tuple(np.linspace(20.0, 70.0, 251))
DEFAULT_k_RANGE = tuple(np.linspace(1.4, 2.0, 301))
[docs]def compute_hk_stack(cha_data, V_p=None, h_range=None,
k_range=None, root_order=1, include_t3=True):
"""Compute H-k stacking array on a dataset of receiver functions.
:param cha_data: List or iterable of RF traces to use for H-k stacking.
:type cha_data: Iterable(rf.RFTrace)
:param V_p: P-wave velocity in crustal layer, defaults to None in which case it is inferred from trace metadata
:type V_p: float, optional
:param h_range: Range of h values (Moho depth) values to cover, defaults to np.linspace(20.0, 70.0, 251)
:type h_range: numpy.array [1D], optional
:param k_range: Range of k values to cover, defaults to np.linspace(1.4, 2.0, 301)
:type k_range: numpy.array [1D], optional
:param root_order: Exponent for nth root stacking as per K.J.Muirhead (1968), defaults to 1
:type root_order: int, optional
:param include_t3: If True, include the t3 (PpSs+PsPs) multiple in the stacking, defaults to True
:type include_t3: bool, optional
:return: k-grid values [2D], h-grid values [2D], H-k stack in series of 2D layers having one layer per multiple
:rtype: numpy.array [2D], numpy.array [2D], numpy.array [3D]
"""
# It is *critical* for the correctness of the output of this function that the V_p passed in is the same as the
# V_p of the shallowest layer of the velocity model (from tau-py model) that was used to compute the arrival
# inclination of each trace. Leave as None to have this function infer V_p from the trace metadata.
log = logging.getLogger(__name__)
log.setLevel(logging.INFO)
infer_Vp = (V_p is None)
V_p_inferred = infer_Vp_from_traces(cha_data, log)
if infer_Vp:
V_p = V_p_inferred
log.debug("Inferred V_p = {}".format(V_p))
else:
log.debug("Using V_p = {} (inferred V_p from shallowest layer = {})".format(V_p, V_p_inferred))
# end if
if h_range is None:
h_range = DEFAULT_H_RANGE
# end if
if k_range is None:
k_range = DEFAULT_k_RANGE
# end if
# Pre-compute grid quantities
k_grid, h_grid = np.meshgrid(k_range, h_range)
# hk_stack = np.zeros_like(k_grid)
# Whether to use RF slowness as the ray parameter (after unit conversion) and as source of V_p,
# or else use specific V_p given by user.
stream_stack = []
# Loop over traces, compute times, and stack interpolated values at those times
channel_id = None
for tr in cha_data:
if channel_id is None:
channel_id = tr.stats.channel
else:
assert tr.stats.channel == channel_id, \
"Stacking mismatching channel data: expected {}, found {}".format(channel_id, tr.stats.channel)
# end if
t1, t2, t3 = compute_theoretical_phase_times(tr, h_grid, k_grid, V_p, include_t3=include_t3)
# Apply custom time offsets if given.
for tval, offset_name in ((t1, 't1_offset'), (t2, 't2_offset'), (t3, 't3_offset')):
try:
if tval is not None:
tval += tr.stats.sediment[offset_name]
except AttributeError:
pass
# end try
# end for
# Subtract lead time so that primary P-wave arrival is at time zero.
lead_time = tr.stats.onset - tr.stats.starttime
times = tr.times() - lead_time
# Create interpolator from stream signal for accurate time sampling.
interpolator = interp1d(times, tr.data, kind='linear', copy=False, bounds_error=False, assume_sorted=True)
phase_sum = []
phase_sum.append(rf_util.signed_nth_root(interpolator(t1), root_order))
phase_sum.append(rf_util.signed_nth_root(interpolator(t2), root_order))
if include_t3:
# Negative sign on the third term is intentional, see Chen et al. (2010) and Zhu & Kanamori (2000).
# It needs to be negative because the PpSs + PsPs peak has negative phase,
# see http://eqseis.geosc.psu.edu/~cammon/HTML/RftnDocs/rftn01.html
# Apply nth root technique to reduce uncorrelated noise (Chen et al. (2010))
phase_sum.append(-rf_util.signed_nth_root(interpolator(t3), root_order))
# end if
stream_stack.append(phase_sum)
# end for
# Perform the stacking (sum) across streams. hk_stack retains separate t1, t2, and t3 components here.
hk_stack = np.nanmean(np.array(stream_stack), axis=0)
# This inversion of the nth root is different to Sippl and Chen, but consistent with Muirhead
# who proposed the nth root technique. It improves the contrast of the resulting plot.
if root_order != 1:
hk_stack = rf_util.signed_nth_power(hk_stack, root_order)
return k_grid, h_grid, hk_stack
# end func
[docs]def compute_weighted_stack(hk_components, weighting=(0.5, 0.5, 0.0)):
"""Given stack components from function `compute_hk_stack`, compute the overall weighted stack.
:param hk_components: H-k stack layers returned from `compute_hk_stack`
:type hk_components: numpy.array
:param weighting: Weightings for (t1, t2, t3) layers respectively, defaults to (0.5, 0.5, 0.0)
:type weighting: tuple, optional
:return: Weighted stack in H-k space
:rtype: numpy.array
"""
assert hk_components.shape[0] == len(weighting), hk_components.shape
hk_phase_stacked = np.dot(np.moveaxis(hk_components, 0, -1), np.array(weighting))
return hk_phase_stacked
# end func
[docs]def infer_Vp_from_traces(cha_data, log=None):
"""
Infer the Vp value used in earth model for computing trace stats.
:param cha_data: Iterable of traces for a given event (e.g. obspy.Stream or rf.RFStream)
:type cha_data: Iterable(obspy.Stream) or Iterable(rf.RFStream)
:param log: Logging instance to log messages
:type log: logging.Logger
:return: Vp value
:rtype: float
"""
# Determine the internal V_p consistent with the trace ray parameters and inclinations.
V_p_values = []
for tr in cha_data:
p = tr.stats.slowness / KM_PER_DEG
incl_deg = tr.stats.inclination
incl_rad = np.deg2rad(incl_deg)
V_p_value = np.sin(incl_rad) / p
V_p_values.append(V_p_value)
# end for
V_p_values = np.array(V_p_values)
if log is not None and not np.allclose(V_p_values, V_p_values, rtol=1e-3, atol=1e-4):
log.error("Inconsistent V_p values inferred from traces, H-k stacking results may be unreliable!")
# end if
V_p = np.mean(V_p_values)
return V_p
# end func
[docs]def compute_theoretical_phase_times(tr, H, k, V_p, include_t3=True):
"""
Compute arrival times of Ps, PpPs and (PpSs + PsPs) phases relative to primary P-wave arrival time
for an assume Moho depth H, velocity ratio *k* = V<sub>p</sub>/V<sub>s</sub>, and p-wave velocity V<sub>p</sub>.
:param tr: Trace for which to compute the theoretical phase arrival times
:type tr: obspy.Trace or rf.RFTrace
:param H: Presumed Moho depth (km)
:type H: float
:param k: Presumed velocity ratio *k* (dimensionless)
:type k: float
:param V_p: Presumed p-wave velocity (km/sec)
:type V_p: float
:param include_t3: Flag whether or not to compute t<sub>3</sub> value (i.e. (PpSs + PsPs) arrival time)
:type include_t3: bool
:return: Triplet of phase arrival times (t1, t2, t3). t3 is None if include_t3 is False.
:rtype: tuple(float, float, float)
"""
incl_deg = tr.stats.inclination
incl_rad = np.deg2rad(incl_deg)
sin_i = np.sin(incl_rad)
p = sin_i / V_p
H_on_V_p = H/V_p
k2 = k*k
p2_Vp2 = p * p * V_p * V_p
term1 = H_on_V_p * np.sqrt(k2 - p2_Vp2)
term2 = H_on_V_p * np.sqrt(1 - p2_Vp2)
# Time for Ps
t1 = term1 - term2
# Time for PpPs
t2 = term1 + term2
if include_t3:
# Time for PpSs + PsPs
t3 = 2 * term1
else:
t3 = None
# end if
return t1, t2, t3
# end func
[docs]def find_global_hk_maximum(k_grid, h_grid, hk_weighted_stack):
"""Given the weighted stack computed from function `compute_weighted_stack` and the corresponding
k-grid and h-grid, find the location in H-k space of the global maximum.
:param k_grid: Grid of k-values
:type k_grid: Two-dimensional numpy.array
:param h_grid: Grid of H-values
:type h_grid: Two-dimensional numpy.array
:param hk_weighted_stack: Grid of stacked RF sample values produced by function
rf_stacking.computed_weighted_stack()
:type hk_weighted_stack: Two-dimensional numpy.array
:return: Location of global maximum on the H-k grid of the maximum stack value.
:rtype: tuple(float, float)
"""
max_loc = np.unravel_index(np.argmax(hk_weighted_stack), hk_weighted_stack.shape)
h_max = h_grid[max_loc[0], 0]
k_max = k_grid[0, max_loc[1]]
return (h_max, k_max)
# end func
[docs]def find_local_hk_maxima(k_grid, h_grid, hk_stack, min_rel_value=0.5):
"""Given the weighted stack computed from function `compute_weighted_stack` and the corresponding
k-grid and h-grid, find the locations in H-k space of all local maxima above a certain threshold.
:param k_grid: Grid of k-values
:type k_grid: Two-dimensional numpy.array
:param h_grid: Grid of H-values
:type h_grid: Two-dimensional numpy.array
:param hk_stack: Grid of stacked RF sample values produced by function
rf_stacking.computed_weighted_stack()
:type hk_stack: Two-dimensional numpy.array
:param min_rel_value: Minimum value required relative to the largest value in the stack, defaults to 0.5
:type min_rel_value: float, optional
:return: List of tuples containing parameters of local maxima solutions, with values in the following
order: (H, k, stack_value, row_index, col_index)
:rtype: list(tuple(float, float, float, int, int))
"""
# Determine global maximum, as we only keep local maxima that are at least min_rel_value
# proportion of this value.
global_max = np.nanmax(hk_stack)
# Compute 2D mask of all values that are greater than or equal to their 4 neighbours.
m = ((hk_stack[1:-1, 1:-1] > hk_stack[1:-1, 0:-2]) & (hk_stack[1:-1, 1:-1] > hk_stack[1:-1, 2:]) &
(hk_stack[1:-1, 1:-1] > hk_stack[0:-2, 1:-1]) & (hk_stack[1:-1, 1:-1] > hk_stack[2:, 1:-1]) &
(hk_stack[1:-1, 1:-1] >= min_rel_value*global_max))
# Find the row and column indices where m is True
m_idx = np.nonzero(m)
# Determine the stack values at the identified local maxima
stack_vals = hk_stack[1:-1, 1:-1][m_idx]
# Determine the k-values at the identified local maxima
k_vals = k_grid[1:-1, 1:-1][m_idx]
# Determine the h-values at the identified local maxima
h_vals = h_grid[1:-1, 1:-1][m_idx]
# Zip the candidate solutions into a tuple (H, k, stack, row_index, col_index).
# Note that the row and column index here are in the original k- and h-grid, so must have +1 added
# since the masking was done on the interior grid points only.
solutions = tuple(zip(h_vals, k_vals, stack_vals, m_idx[0] + 1, m_idx[1] + 1))
# Sort the solutions from highest stack value to the lowest.
solutions = sorted(solutions, key=lambda v: v[2], reverse=True)
return solutions
# end func