Source code for seismic.receiver_fn.rf_deconvolution

#!/usr/bin/env python
"""Custom receiver function deconvolution methods and support functions.
"""

import logging

import numpy as np
import scipy
import scipy.signal

# pylint: disable=invalid-name,too-many-locals

logging.basicConfig()


def _xcorrelate(f, g):
    """
    Correlation routine - correlates f and g, normalized by the zero-lag autocorrelation of g.
    Returns only part of correlation corresponding to positive lag.
    """
    n = len(f)
    assert len(g) == n

    # Use the Numerical Recipes routine to compute the cross correlation
    #     call correl(f, g, n2, c) # output ends up in c
    c = scipy.signal.correlate(f, g)
    # Zero or positive lags in result are in elements [-n:] of c.
    c_pos = c[-n:]

    # compute the zero-lag autocorrelation of g
    sum0 = np.dot(g, g)
    normalize_factor = 1.0 / sum0
    xc = c_pos * normalize_factor

    return xc
# end func


def _get_residual(x, y):
    """
    Get the residual between x and y and the power of the residual.
    """
    r = x - y
    sumsq = np.dot(r, r)

    return r, sumsq
# end func


def _gauss_filter(x, gwidth_factor, dt):
    """
    Apply low-pass filter to input x using Gaussian function in freq domain.
    """
    n = len(x)
    two_pi = 2 * np.pi

    fft_x = np.fft.rfft(x)  # complex array
    n2 = len(fft_x)

    df = 1.0 / (float(n) * dt)
    d_omega = two_pi * df
    gwidth = 4.0 * gwidth_factor * gwidth_factor

    omega = np.arange(n2) * d_omega
    gauss = np.exp(-omega * omega / gwidth)
    # Note: normalization factor to correct RF amplitude for inversion is 2*df*np.sum(gauss)
    fft_x = fft_x * gauss

    x_filt = np.fft.irfft(fft_x, n)  # real_array

    return x_filt
# end func


def _build_decon(amps, shifts, n, gwidth, dt):
    """
    Compute the predicted RF time series from a set of pulse amplitudes and time shifts.
    """
    p = np.zeros(n)
    for i, amp in zip(shifts, amps):
        p[i] += amp

    p_filt = _gauss_filter(p, gwidth, dt)

    return p_filt, p
# end func


def _convolve(x, y, leadin):
    """
    Helper function for convolving approximate receiver function with source signal
    to get estimated observation.
    """
    n = len(x)
    assert len(y) == n
    z = scipy.signal.convolve(x, y, mode='full')
    z_sync = z[leadin:leadin + n]
    return z_sync
# end func


[docs]def iter_deconv_pulsetrain(numerator, denominator, sampling_rate, time_shift, max_pulses=1000, tol=1.0e-3, gwidth=2.5, only_positive=False, log=None): """ Iterative deconvolution of source and response signal to generate seismic receiver function. Adapted to Python by Andrew Medlin, Geoscience Australia (2019), from Chuck Ammon's (Saint Louis University) `iterdeconfd` Fortran code, version 1.04. Note this is not really a frequency-domain deconvolution method, since the deconvolution is based on generating a time-domain pulse train which is filtered and convolved with source signal in time domain in order to try to replicate observation. Results should be the same even if some of the spectral techniques used in functions (such as `gauss_filter`) were replaced by non-spectral equivalents. :param numerator: The observed response signal (e.g. R, Q or T component) :type numerator: numpy.array(float) :param denominator: The source signal (e.g. L or Z component) :type denominator: numpy.array(float) :param sampling_rate: The sampling rate in Hz of the numerator and denominator signals :type sampling_rate: float :param time_shift: Time shift (sec) from start of input signals until expected P-wave arrival onset. :type time_shift: float :param max_pulses: Maximum number of delta function pulses to synthesize in the unfiltered RF (up to 1000) :type max_pulses: int :param tol: Convergence tolerance, iteration stops if change in error falls below this value :type tol: float :param gwidth: Gaussian filter width in the normalized frequency domain. :type gwidth: float :param only_positive: If true, only use positive pulses in the RF. :type only_positive: bool :param log: Log instance to log messages to. :type log: logging.Logger :return: RF trace, pulse train, expected response signal, predicted response signal, quality of fit statistic :rtype: numpy.array(float), numpy.array(float), numpy.array(float), numpy.array(float), float """ MAXPTS = 100000 assert len(denominator) <= MAXPTS # Sanity check input size assert len(numerator) <= MAXPTS MAX_PULSES = 1000 # Maximum number of pulses to synthesize in p MIN_POWER = 1.0e-8 amps = [] shifts = [] if max_pulses > MAX_PULSES: log.warning('Maximum Number of pulses is %d, clipping input value', MAX_PULSES) max_pulses = MAX_PULSES # end if # Clone numerator data f = numerator.copy() npts = len(f) # Clone denominator data g = denominator.copy() assert len(g) == npts, "g should be same length as f" dt = 1.0 / sampling_rate # Now begin the cross-correlation procedure # Put the filter in the signals f_hat = _gauss_filter(f, gwidth, dt) g_hat = _gauss_filter(g, gwidth, dt) # compute the power in the "numerator" for error scaling power = np.dot(f_hat, f_hat) # correlate the signals xc = _xcorrelate(f_hat, g_hat) # Exclude non-causal offsets from being added to the pulse locations. non_causal_leadin = int(np.ceil(time_shift/dt)) max_causal_idx = npts - non_causal_leadin assert max_causal_idx > 0 if only_positive: shift_index = np.argmax(xc[:max_causal_idx]) else: xc_abs = np.abs(xc) shift_index = np.argmax(xc_abs[:max_causal_idx]) # end if shifts.append(shift_index + non_causal_leadin) amps.append(xc[shift_index]) num_pulses = 1 # compute the predicted deconvolution result p, pulses = _build_decon(amps, shifts, npts, gwidth, dt) # convolve the prediction with the denominator signal f_hat_predicted = _convolve(p, g, non_causal_leadin) # compute the residual (initial error is 1.0) resid, sumsq_ip1 = _get_residual(f_hat, f_hat_predicted) # Early exit for case when data is all close to zero if power < MIN_POWER: rf_trace = p fit = 100.0 return rf_trace, pulses, f_hat, f_hat_predicted, fit # end if sumsq_i = 1.0 sumsq_ip1 = sumsq_ip1 / power d_error = 100 * (sumsq_i - sumsq_ip1) if log: log.info('%11s %s', 'Iteration', ' Spike amplitude Spike delay Misfit Improvement') log.info('%10d %16.9e %10.3f %7.2f%% %9.4f%%', num_pulses, dt * amps[-1], (shifts[-1] - 1) * dt, 100 * sumsq_ip1, d_error) # endf if # ************************************************************************* while (np.abs(d_error) > tol) and (num_pulses < max_pulses): num_pulses += 1 sumsq_i = sumsq_ip1 xc = _xcorrelate(resid, g_hat) if only_positive: shift_index = np.argmax(xc[:max_causal_idx]) else: xc_abs = np.abs(xc) shift_index = np.argmax(xc_abs[:max_causal_idx]) # end if shifts.append(shift_index + non_causal_leadin) amps.append(xc[shift_index]) p, pulses = _build_decon(amps, shifts, npts, gwidth, dt) f_hat_predicted = _convolve(p, g, non_causal_leadin) resid, sumsq_ip1 = _get_residual(f_hat, f_hat_predicted) sumsq_ip1 = sumsq_ip1 / power d_error = 100 * (sumsq_i - sumsq_ip1) if log: log.info('%10d %16.9e %10.3f %7.2f%% %9.4f%%', num_pulses, dt * amps[-1], shifts[-1] * dt, 100 * sumsq_ip1, d_error) # end if # end while # ************************************************************************* if log: log.info('Last Error Change = %9.4f%%', d_error) # end if # Compute final fit fit = 100 - 100 * sumsq_ip1 if log and (num_pulses >= max_pulses) and (np.abs(d_error) > tol): log.warning('Hit the max number of pulses - not halting due to convergence.') # end if num_unique_pulses = len(set(shifts)) if log: log.info('Number of pulses in final result: %d', num_unique_pulses) log.info('The final deconvolution reproduces %5.1f%% of the signal.', fit) # end if # ************************************************************************* # compute the final prediction p, pulses = _build_decon(amps, shifts, npts, gwidth, dt) f_hat_predicted = _convolve(p, g, non_causal_leadin) rf_trace = p return rf_trace, pulses, f_hat, f_hat_predicted, fit
# end func
[docs]def rf_iter_deconv(response_data, source_data, sr, tshift, min_fit_threshold=80.0, normalize=0, **kwargs): """Adapter function to rf library. To use, add arguments `deconvolve='func', func=rf_iter_deconv` to `rf.RFStream.rf()` function call. :param response_data: List of response signals for which to compute receiver function :type response_data: list of numpy.array(float) :param source_data: Source signal to use for computing the receiver functions :type source_data: numpy.array(float) :param sampling_rate: Sampling rate of the input signals (Hz) :type sampling_rate: float :param time_shift: Time shift (seconds) from start of signal to onset :type time_shift: float :param min_fit_threshold: Minimum percentage of fit to include trace in results, otherwise will be returned as empty numpy array. :type min_fit_threshold: float :param normalize: Component of stream to use for normalization, usually component 0 (the vertical component). Set to None to disable normalization. :type normalize: int or None :return: Receiver functions corresponding to the list of input response signals. :rtype: list of numpy.array(float) """ sampling_rate = sr time_shift = tshift denominator = source_data receiver_fns = [] log = logging.getLogger(__name__) log.setLevel(logging.INFO) for numerator in response_data: # Any non-default parameters of deconvolution should be packed into kwargs rf_trace, _, _, _, fit = iter_deconv_pulsetrain(numerator, denominator, sampling_rate, time_shift, **kwargs) # TODO: # - store the fit percentage in output RF metadata # - store boolean indicating whether deconvolution converged if fit < min_fit_threshold: receiver_fns.append(np.array([])) log.warning("RF fit {:.2f}% below minimum acceptable threshold, discarding".format(fit)) receiver_fns.append(None) else: log.info("RF fit to observation = {:.2f}%".format(fit)) receiver_fns.append(rf_trace) # end if # end for if normalize is not None: if receiver_fns[normalize] is not None: norm_factor = 1.0/np.nanmax(np.abs(receiver_fns[normalize])) for r in receiver_fns: if r is not None: r *= norm_factor # end if # end for elif np.any(np.array([r is not None for r in receiver_fns])): log.warning('Unable to normalize RF components because normalization component is None') # end if # end if # Remove null results before returning receiver_fns = [r for r in receiver_fns if r is not None] return receiver_fns
# end func