#!/usr/bin/env python
"""Filter out invalid RFs, and compute a variety of quality metrics for the
remaining RFs. These metrics can be combined in various ways downstream to
perform different kinds of filtering. Quality metrics are stored in the stats
of each trace.
"""
from builtins import range # pylint: disable=redefined-builtin
import logging
import itertools
import traceback
from multiprocessing import Process, Manager
import numpy as np
import click
import h5py
from scipy import signal
from scipy import stats
# from fastdtw import fastdtw
from sklearn.cluster import DBSCAN
from joblib import Parallel, delayed
import rf
from seismic.receiver_fn.rf_process_io import async_write
from seismic.receiver_fn.rf_h5_file_station_iterator import IterRfH5StationEvents
from seismic.receiver_fn import rf_util
logging.basicConfig()
# pylint: disable=invalid-name, logging-format-interpolation
def _crossSpectrum(x, y, num_subsegs=32):
# -------------------Remove mean-------------------
# nperseg chosen arbitrary based on 126 samples RF signal, experiment to get best results
nperseg = int(np.floor(x.size/num_subsegs))
cross = np.zeros(nperseg, dtype='complex128')
max_ind = int(nperseg * np.floor(x.size / nperseg))
for ind in range(0, max_ind, nperseg):
xp = x[ind: ind + nperseg]
yp = y[ind: ind + nperseg]
xp = xp - np.mean(xp)
yp = yp - np.mean(xp)
# Do FFT
cfx = np.fft.fft(xp)
cfy = np.fft.fft(yp)
# Get cross spectrum
cross += cfx.conj()*cfy
# end for
freq = np.fft.fftfreq(nperseg)
return cross, freq
def _coh(y, y2):
# This subroutine determines a coherence level between two signals on normilised frequency
p11, freq = _crossSpectrum(y, y)
p22, freq = _crossSpectrum(y2, y2)
p12, freq = _crossSpectrum(y, y2)
# coherence
part1 = np.divide(np.abs(p12)**2, p11.real,
out=np.zeros_like(np.abs(p12)**2), where=p11.real != 0)
coh = np.divide(part1, p22.real, out=np.zeros_like(part1), where=p22.real != 0)
return freq[freq > 0], coh[freq > 0]
[docs]def rf_group_by_similarity(swipe, similarity_eps):
"""Cluster waveforms by similarity
:param swipe: Numpy array of RF rowwise
:type swipe: numpy.array
:param similarity_eps: Tolerance on similarity between traced to be considered in the same group.
:type similarity_eps: float
:return: Index of the group for each trace. -1 if no group is found for a given trace.
:rtype: numpy.array
"""
def _compare_pairs_rmsdist(data):
rms_dist = np.sqrt(np.nanmean(np.square(data[0] - data[1])))
return rms_dist
# end func
distance = list(map(_compare_pairs_rmsdist, itertools.combinations(swipe, 2)))
index = list((i, j) for ((i, _), (j, _)) in itertools.combinations(enumerate(swipe), 2))
# First check that distance between points
index = np.array(index)
distance = np.array(distance)
matrix = np.zeros((np.amax(index) + 1, 1 + np.amax(index))) + np.amax(distance)
matrix[index[:, 0], index[:, 1]] = distance[:]
clustering = DBSCAN(eps=similarity_eps, min_samples=5, metric='precomputed', n_jobs=-3).fit_predict(matrix)
return clustering
[docs]def compute_max_coherence(orig_stream, f1, f2):
""" Finding coherence between two signals in frequency domain
f1 and f2 - normalised min and max frequencies, f1 < f2 <= ~0.5
returns array of indexes for coherent traces with median
Suggested minimum level of coherence for good results: 0.6
"""
assert 0.0 <= f1 < f2 <= 1.0
# Copy stream since moveout modifies self
stream = orig_stream.copy()
stream.moveout()
# Clip off the noise - take everything after 2 sec before onset as seismic event signal.
SIGNAL_WINDOW = (-2.0, None)
pick_signal = stream.slice2(*SIGNAL_WINDOW, reftime='onset')
# Taper at ends to ensure that end discontinuities do not inject spectral energy at high frequencies
pick_signal = pick_signal.taper(0.5, max_length=1.0)
data = np.array([tr.data for tr in pick_signal])
median = np.median(data, axis=0)
assert len(median) == data.shape[1]
max_coh = []
# TODO: Vectorize this loop
for i in range(data.shape[0]):
f, c = _coh(median, data[i, :])
max_coh.append(np.amax(c[(f >= f1) & (f <= f2)]))
# end for
return np.array(max_coh)
# def knive(swipe, k_level1, sn_level2):
# ind = np.ones((swipe.shape[0],), bool)
# dev = []
# ch = []
# knive = []
# sn = []
# pulse_ind = np.max(t[t < 0])-1.
#
# for i in range(swipe.shape[0]):
# ch.append(np.amax(coh(average, swipe[i, :])))
# dev.append(np.sum((swipe[i, :]-average)**2)/(swipe.shape[0]-1))
# ind[i] = False
# knive.append(np.std(swipe[ind]))
# sn.append(np.std(swipe[i, t > pulse_ind]) /
# np.std(swipe[i, t < pulse_ind]))
#
# knive = np.array(knive)
# sn = np.array(sn)
# return knive < k_level, sn > sn_level
[docs]def spectral_entropy(stream):
"""Compute the spectral entropy of a trace
:param trace: Single channel seismic trace
:type trace: rf.RFTrace
:return: Spectral entropy of the trace waveform
:rtype: float
"""
data = np.array([tr.data for tr in stream])
_, psd = signal.periodogram(data, detrend='linear')
psd = psd.T/np.sum(psd, axis=1)
psd = psd.T
entropy = -np.sum(psd*np.log(psd), axis=1)
return entropy
[docs]def get_rf_stream_components(stream):
"""Identify the RF component types and return them.
:param stream: Stream containing mixed RF components.
:type stream: rf.RFStream
:return: (RF component type, primary RF component (R or Q), transverse RF component (T), source component (Z or L))
:rtype: (str, rf.RFStream, rf.RFStream, rf.RFStream)
"""
DEFAULT_RF_TYPE = 'LQT-Q'
rf_type = DEFAULT_RF_TYPE
primary_stream = stream.select(component='Q')
if primary_stream:
transverse_stream = stream.select(component='T')
source_stream = stream.select(component='L')
else:
rf_type = 'ZRT-R'
primary_stream = stream.select(component='R')
if primary_stream:
transverse_stream = stream.select(component='T')
source_stream = stream.select(component='Z')
# Only return Z traces which are labelled as type rf, as we shouldn't return raw trace data here.
source_stream = source_stream.__class__(
[tr for tr in source_stream if tr.stats.get('type') is not None and tr.stats['type'] == 'rf'])
else:
return None, None, None, None
# end if
# end if
return rf_type, primary_stream, transverse_stream, source_stream
[docs]def compute_rf_quality_metrics(station_id, station_stream3c, similarity_eps):
"""Top level function for adding quality metrics to trace metadata.
:param station_id: Station ID
:type station_id: str
:param station_stream3c: 3-channel stream
:type station_stream3c: list(rf.RFStream) with 3 components
:param similarity_eps: Distance threshold used for DBSCAN clustering
:type similarity_eps: float
:return: Triplet of RF streams with Z, R or Q, and T components with populated
quality metrics. Otherwise return None in case of failure.
"""
logger = logging.getLogger(__name__)
# Filter out traces with NaNs - simplifies downstream code so that can don't have to worry about NaNs.
# We use the fact that traces are bundled into 3-channel triplets here to discard all or none of the related
# channels for an event.
nonan_streams = []
for stream in station_stream3c:
skip_stream = False
for tr in stream:
if tr.stats.type == 'rf' and np.any(np.isnan(tr.data)):
logger.warning("NaN data found in station {} trace\n{}\n, skipping!".format(station_id, tr))
skip_stream = True
break
# end for
if skip_stream:
continue
nonan_streams.append(stream)
# end for
if len(nonan_streams) < len(station_stream3c):
num_supplied = len(station_stream3c)
num_discarded = num_supplied - len(nonan_streams)
logger.info("Discarded {}/{} events from station {} due to NaNs in at least one channel"
.format(num_discarded, num_supplied, station_id))
# end if
# Early exit if nothing left
if not nonan_streams:
logger.warning("nonan_streams empty after filtering out nan traces! {}. Skipping station {}"
.format(nonan_streams, station_id))
return None
# end if
# Flatten the traces into a single RFStream for subsequent processing
rf_streams = rf.RFStream([tr for stream in nonan_streams for tr in stream if tr.stats.type == 'rf'])
# Subsequent functions process the data in bulk square matrices, so it is essential all traces are the same length.
# If not, processing will fail due to incompatible data structure. So here we filter out traces that do not have
# the expected length. Expected length is assumed to be the most common length amongst all the traces.
num_traces_before = len(rf_streams)
all_trace_lens = np.array([len(tr) for tr in rf_streams])
expected_len, _ = stats.mode(all_trace_lens, axis=None)
expected_len = expected_len[0]
if expected_len <= 1:
logger.warning("Cannot compute quality metrics on trace length {} <= 1! Skipping station {}"
.format(expected_len, station_id))
return None
# end if
keep_traces = []
for tr in rf_streams:
if len(tr) != expected_len:
logger.error("Trace {} of station {} has inconsistent sample length {} (expected {}), discarding!"
.format(tr, station_id, len(tr), expected_len))
else:
keep_traces.append(tr)
# end if
# end for
streams = rf.RFStream(keep_traces)
num_traces_after = len(streams)
if num_traces_after < num_traces_before:
num_discarded = num_traces_before - num_traces_after
logger.warning("Discarded {}/{} traces due to inconsistent trace length"
.format(num_discarded, num_traces_before))
# end if
# Extract RF type, the primary polarized component and transverse component (ignore source stream)
rf_type, p_stream, t_stream, z_stream = get_rf_stream_components(streams)
if rf_type is None:
logger.error("Unrecognized RF type for station {}. File might not be RF file!".format(station_id))
return None
# end if
# Note that we only compute quality metrics on the p_stream. The filtering of t_stream traces should match
# the filtering of p_stream traces, so t_stream does not need independent metrics.
# Compute S/N ratios for primary component RFs
rf_util.compute_rf_snr(p_stream)
# Compute spectral entropy for primary component RFs
sp_entropy = spectral_entropy(p_stream)
for i, tr in enumerate(p_stream):
md_dict = {'entropy': sp_entropy[i]}
tr.stats.update(md_dict)
# end for
# Compute log10 of amplitude metrics, as these are more useful than straight amplitudes for quality classifier
for tr in p_stream:
tr.stats['log10_amp_max'] = np.log10(tr.stats['amp_max'])
tr.stats['log10_amp_rms'] = np.log10(tr.stats['amp_rms'])
tr.stats['log10_z_amp_max'] = np.log10(tr.stats['z_amp_max'])
tr.stats['log10_z_amp_rms'] = np.log10(tr.stats['z_amp_rms'])
# end for
# Define time windows relative to onset for computing statistical ratios
EVENT_SIGNAL_WINDOW = (-5.0, 25.0)
NOISE_SIGNAL_WINDOW = (None, -5.0)
event_signal = p_stream.copy().slice2(*EVENT_SIGNAL_WINDOW, reftime='onset').taper(0.5, max_length=1.0)
noise_signal = p_stream.copy().slice2(*NOISE_SIGNAL_WINDOW, reftime='onset').taper(0.5, max_length=1.0)
rf_util.compute_extra_rf_stats(event_signal)
rf_util.compute_extra_rf_stats(noise_signal)
for _i, _tr in enumerate(p_stream):
_tr.stats['delta_mean_log10_cplx_amp'] = (event_signal[_i].stats.mean_log10_cplx_amp -
noise_signal[_i].stats.mean_log10_cplx_amp)
_tr.stats['delta_log10_amp_20pc'] = (event_signal[_i].stats.log10_amp_20pc -
noise_signal[_i].stats.log10_amp_20pc)
_tr.stats['delta_log10_amp_80pc'] = (event_signal[_i].stats.log10_amp_80pc -
noise_signal[_i].stats.log10_amp_80pc)
_tr.stats['delta_log10_rms_amp'] = event_signal[_i].stats.log10_rms_amp - noise_signal[_i].stats.log10_rms_amp
# end for
# Compute ratios of spectral histogram statistics
noise_data = np.array([tr.data for tr in noise_signal])
event_data = np.array([tr.data for tr in event_signal])
noise_bins, noise_power = signal.welch(noise_data, detrend='linear')
event_bins, event_power = signal.welch(event_data, detrend='linear')
# Compute moments of the frequency distribution. Only use lower frequency bands up to 1/4 Nyquist.
noise_bins = noise_bins[0:32]
noise_power = noise_power[:, 0:32]
event_bins = event_bins[0:32]
event_power = event_power[:, 0:32]
noise_m0 = np.sum(noise_power, axis=1)
event_m0 = np.sum(event_power, axis=1)
spectral_m0_ratio = np.log10(event_m0/noise_m0)
noise_m1 = np.sum(noise_power*noise_bins, axis=1)
event_m1 = np.sum(event_power*event_bins, axis=1)
spectral_m1_ratio = np.log10(event_m1/noise_m1)
noise_m2 = np.sum(noise_power*noise_bins**2, axis=1)
event_m2 = np.sum(event_power*event_bins**2, axis=1)
spectral_m2_ratio = np.log10(event_m2/noise_m2)
for i, tr in enumerate(p_stream):
md_dict = {
'm0_delta': event_m0[i] - noise_m0[i],
'm1_delta': event_m1[i] - noise_m1[i],
'm2_delta': event_m2[i] - noise_m2[i],
'm0_ratio': spectral_m0_ratio[i],
'm1_ratio': spectral_m1_ratio[i],
'm2_ratio': spectral_m2_ratio[i]
}
tr.stats.update(md_dict)
# end for
# Compute coherence metric within targeted normalized frequency band.
# Note that settings here are relative to the sampling rate. If the sampling
# rate changes and you want the same absolute frequency range to be used for
# coherence, then these settings need to be updated.
fn_low = 0.15
fn_high = 0.3
max_coherence = compute_max_coherence(p_stream, fn_low, fn_high)
for i, tr in enumerate(p_stream):
md_dict = {'max_coherence': max_coherence[i]}
tr.stats.update(md_dict)
# end for
# TODO: Compute phase weighting vector per station per 2D (back_azimuth, distance) bin
# Perform clustering for all traces in a station, and assign group IDs.
# This will be super expensive when there are a lot of events, as the distance calculation grows as N^2.
clustering_stream = p_stream.copy()
clustering_stream = clustering_stream.trim2(-5.0, 25.0, 'onset')
swipe = np.array([tr.data for tr in clustering_stream])
if swipe.shape[0] > 1:
ind = rf_group_by_similarity(swipe, similarity_eps)
else:
ind = np.array([0])
# end if
num_groups = np.amax(ind)
logger.info("Station {}: detected {} clusters".format(station_id, num_groups))
# Apply group
for i, tr in enumerate(p_stream):
md_dict = {'rf_group': ind[i] if ind[i] >= 0 else None}
tr.stats.update(md_dict)
# end for
# TODO: Research techniques for grouping waveforms using singular value decomposition (SVD), possibly of
# the complex waveform (from Hilbert transform) to determine the primary phase and amplitude components.
# High similarity to the strongest eigenvectors might indicate waves in the primary group (group 0 in DBSCAN)
# without the N^2 computational cost of DBSCAN.
return (z_stream, p_stream, t_stream)
# end func
[docs]def rf_quality_metrics_queue(oqueue, station_id, station_stream3c, similarity_eps, drop_z=True):
"""Produce RF quality metrics in a stream and queue the QC'd components for
downstream processing.
:param oqueue: Output queue where filtered streams are queued
:type oqueue: queue or multiprocessing.Manager.Queue
:param station_id: Station ID
:type station_id: str
:param station_stream3c: 3-channel stream
:type station_stream3c: list(rf.RFStream) with 3 components
:param similarity_eps: Distance threshold used for DBSCAN clustering
:type similarity_eps: float
"""
streams_qual = compute_rf_quality_metrics(station_id, station_stream3c, similarity_eps)
if streams_qual is not None:
z_stream, p_stream, t_stream = streams_qual
if drop_z:
stream_qual = rf.RFStream([tr for doublet in zip(p_stream, t_stream)
for tr in doublet])
else:
stream_qual = rf.RFStream([tr for triplet in zip(z_stream, p_stream, t_stream)
for tr in triplet])
# end if
oqueue.put(stream_qual)
# end if
# end func
@click.command()
@click.argument('input-file', type=click.Path(exists=True, dir_okay=False), required=True)
@click.argument('output-file', type=click.Path(dir_okay=False), required=True)
@click.option('--temp-dir', type=click.Path(dir_okay=True), help="Temporary directory to use for best performance")
@click.option('--parallel/--no-parallel', default=True, show_default=True, help="Use parallel execution")
def main(input_file, output_file, temp_dir=None, parallel=True):
""" This module filters RFs according to input options and then computes some quality metrics on each RF.
This enables different downstream approaches to selecting and filtering for good quality RFs.
The stats attribute of each RF is populated with these quality metrics. In addition, a new root group
is added to the hdf5 file containing a Pandas DataFrame that tabulates the attributes of each trace
to allow easy event filtering in the downstream workflow.
Available methods:
1. rf_group_by_similarity - grouping method based on calculation of euclidean distances and clustering by
similarity ( aca machine learning approach)
2. TODO: coherence - finding the coherent signals (in frequency domain) relative to median. Consequently, moveout
should be applied to use this technique
3. TODO knive - analysing the change of RMS relative to median. Noisy stations will give higher input. Moveout
should be applied to use this technique
4. S/N ratio
5. Spectral entropy
"""
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
similarity_eps = 0.05
# Set up asynchronous buffered writing of results to file
mgr = Manager()
with h5py.File(input_file, mode='r') as h5f:
config_str = h5f.attrs['metadata'] if 'metadata' in h5f.attrs else ''
write_queue = mgr.Queue()
output_thread = Process(target=async_write, args=(write_queue, output_file, 20, config_str))
output_thread.daemon = True
output_thread.start()
logger.info("Processing source file {}".format(input_file))
if parallel:
logger.info("Parallel processing")
Parallel(n_jobs=-3, verbose=5, max_nbytes='16M', temp_folder=temp_dir) \
(delayed(rf_quality_metrics_queue)(write_queue, station_id, station_stream3c, similarity_eps)
for station_id, station_stream3c in IterRfH5StationEvents(input_file))
else:
logger.info("Serial processing")
for station_id, station_stream3c in IterRfH5StationEvents(input_file):
try:
rf_quality_metrics_queue(write_queue, station_id, station_stream3c, similarity_eps)
except (ValueError, AssertionError) as e:
traceback.print_exc()
logger.error("Unhandled exception occurred in rf_quality_metrics_queue for station {}. "
"Data will be omitted for this station!\nError:\n{}".format(station_id, str(e)))
# end try
# end for
# end if
# Signal completion
logger.info("Finishing...")
write_queue.put(None)
write_queue.join()
logger.info("rf_quality_filter SUCCESS!")
# end func
if __name__ == '__main__':
main() # pylint: disable=no-value-for-parameter