#!/usr/bin/env python
# coding=utf-8
"""
Analyze a data set of seismic arrival events on a per-station basis and try to
detect and estimate any station orientation error.
The event waveform dataset provided as input to this script (or to NetworkEventDataset
if calling directly into function `analyze_station_orientations`) is generated by the
script `seismic/extract_event_traces.py`.
In future, consider moving this script to the `inventory` module and applying
corrections to the station inventory xml (to the azimuth tag).
Reference:
- Wilde-Piórko, M., Grycuk, M., Polkowski, M. et al.
On the rotation of teleseismic seismograms based on the receiver function technique.
J Seismol 21, 857-868 (2017). https://doi.org/10.1007/s10950-017-9640-x
"""
import os
import json
import logging
import copy
from collections import defaultdict
from joblib import Parallel, delayed
import click
import numpy as np
from scipy import optimize, interpolate
from rf import RFStream
import matplotlib.pyplot as plt
from seismic.network_event_dataset import NetworkEventDataset
from seismic.inversion.wavefield_decomp.runners import curate_seismograms
from seismic.receiver_fn.generate_rf import transform_stream_to_rf
# pylint: disable=invalid-name

# Take care not to use any curation options that would vary if there were a station orientation error.
DEFAULT_CURATION_OPTS = {
    "min_snr": 2.0,  # Minimum acceptable signal-to-noise ratio
    "max_raw_amplitude": 20000.0,  # Reject events with raw amplitude above this (likely clipped or noisy)
    "rms_amplitude_bounds": {"R/Z": 1.0, "T/Z": 1.0}  # Upper bounds on RMS amplitude ratios between components
}

# Default waveform filtering options applied before RF computation.
DEFAULT_CONFIG_FILTERING = {
    "resample_rate": 10.0,  # presumably Hz — confirm against transform_stream_to_rf
    "taper_limit": 0.05,  # Taper fraction applied to trace ends
    "filter_band": [0.02, 1.0],  # Bandpass corner frequencies
}

# Default RF processing options. trim times are relative to onset (seconds).
DEFAULT_CONFIG_PROCESSING = {
    "rotation_type": "ZRT",  # Rotate to vertical/radial/transverse before deconvolution
    "deconv_domain": "time",  # Time-domain deconvolution (also used in output plot filenames)
    "normalize": True,
    "trim_start_time": -10,
    "trim_end_time": 30,
    "spiking": 1.0  # Regularization parameter for time-domain (spiking) deconvolution
}
def _run_single_station(db_evid, angles, config_filtering, config_processing):
    """
    Internal processing function for running sequence of candidate angles
    over a single station.

    For each trial correction angle, the station's event streams are copied,
    their back-azimuths shifted by the angle (normalized to [0, 360)), and
    receiver functions computed. The metric returned per angle is the mean of
    the stacked radial RF in a +/-1 s window about onset.

    :param db_evid: Dictionary of event streams (3-channel ZNE) keyed by event ID. \
        Best obtained using class NetworkEventDataset
    :type db_evid: sortedcontainers.SortedDict or similar dict-like
    :param angles: Sequence of candidate correction angles to try (degrees)
    :type angles: Iterable(float)
    :param config_filtering: Waveform filtering options for RF processing
    :type config_filtering: dict
    :param config_processing: RF processing options
    :type config_processing: dict
    :return: Amplitude metric as a function of angle. Same length as angles array.
    :rtype: list(float)
    """
    metrics = []
    for trial_angle in angles:
        stacked_rfs = RFStream()
        for event_id, src_stream in db_evid.items():
            # Work on a copy so the source dataset is untouched by the trial rotation.
            trial_stream = copy.deepcopy(src_stream)
            for trace in trial_stream:
                trace.stats.back_azimuth += trial_angle
                # Normalize back-azimuth into [0, 360)
                while trace.stats.back_azimuth < 0:
                    trace.stats.back_azimuth += 360
                while trace.stats.back_azimuth >= 360:
                    trace.stats.back_azimuth -= 360
            # end for
            rf_triplet = transform_stream_to_rf(event_id, RFStream(trial_stream),
                                                config_filtering, config_processing)
            if rf_triplet is None:
                continue
            stacked_rfs += rf_triplet
        # end for
        if len(stacked_rfs) == 0:
            # No usable RFs for this trial angle.
            metrics.append(np.nan)
            continue
        # end if
        radial = stacked_rfs.select(component='R')
        radial.trim2(-5, 5, reftime='onset')
        radial.detrend('linear')
        radial.taper(0.1)
        onset_window = radial.stack().trim2(-1, 1, reftime='onset')[0].data
        metrics.append(np.mean(onset_window))
    # end for
    return metrics
# end func
def analyze_station_orientations(ned, curation_opts, config_filtering,
                                 config_processing, save_plots_path=None):
    """
    Main processing function for analyzing station orientation using 3-channel
    event waveforms. Uses method of Wilde-Piórko et al.
    https://doi.org/10.1007/s10950-017-9640-x

    One should not worry about estimates that come back with error of less than
    about 20 degrees from zero, since this analysis provides only an estimate.

    :param ned: NetworkEventDataset containing waveforms to analyze. Note: the data in
        this dataset will be modified by this function.
    :type ned: seismic.network_event_dataset.NetworkEventDataset
    :param curation_opts: Seismogram curation options.
        Safe default to use is `DEFAULT_CURATION_OPTS`.
    :type curation_opts: dict
    :param config_filtering: Seismogram filtering options for RF computation.
        Safe default to use is `DEFAULT_CONFIG_FILTERING`.
    :type config_filtering: dict
    :param config_processing: Seismogram RF processing options.
        Safe default to use is `DEFAULT_CONFIG_PROCESSING`.
    :type config_processing: dict
    :param save_plots_path: Optional folder in which to save plot per station of mean
        arrival RF amplitude as function of correction angle
    :type save_plots_path: str or pathlib.Path
    :return: Dict of estimated orientation error with net.sta code as the key.
    :rtype: dict
    """
    assert isinstance(ned, NetworkEventDataset), 'Pass NetworkEventDataset as input'
    if len(ned) == 0:
        return {}

    logger = logging.getLogger(__name__)
    logger.setLevel(logging.INFO)

    # Determine limiting date range per station from earliest/latest trace times.
    results = defaultdict(dict)
    for sta, db_evid in ned.by_station():
        start_time = end_time = None
        full_code = '.'.join([ned.network, sta])
        for _evid, stream in db_evid.items():
            if start_time is None:
                start_time = stream[0].stats.starttime
            # end if
            if end_time is None:
                end_time = stream[0].stats.endtime
            # end if
            for tr in stream:
                start_time = min(start_time, tr.stats.starttime)
                end_time = max(end_time, tr.stats.endtime)
            # end for
        # end for
        results[full_code]['date_range'] = [str(start_time), str(end_time)]
    # end for

    evids_orig = set(evid for _, evid, _ in ned)

    # Trim streams to a window about the onset time.
    logger.info('Trimming dataset')
    ned.apply(lambda stream: stream.trim(stream[0].stats.onset - 10, stream[0].stats.onset + 30))

    # Downsample to reduce processing cost; lowpass first to suppress aliasing.
    logger.info('Downsampling dataset')
    fs = 20.0
    ned.apply(lambda stream: stream.filter('lowpass', freq=fs/2.0, corners=2, zerophase=True)
              .interpolate(fs, method='lanczos', a=10))

    logger.info('Curating dataset')
    curate_seismograms(ned, curation_opts, logger, rotate_to_zrt=False)
    evids_to_keep = set(evid for _, evid, _ in ned)
    evids_discarded = evids_orig - evids_to_keep
    logger.info('Discarded {}/{} events'.format(len(evids_discarded), len(evids_orig)))

    logger.info('Analysing arrivals')
    # Candidate correction angles in 18 degree steps, covering [-180, 162].
    angles = np.linspace(-180, 180, num=20, endpoint=False)
    # Fan out per-station processing across workers (all but one core).
    job_runner = Parallel(n_jobs=-2, verbose=5, max_nbytes='16M')
    jobs = []
    stations = []
    for sta, db_evid in ned.by_station():
        stations.append((sta, len(db_evid)))
        job = delayed(_run_single_station)(db_evid, angles, config_filtering, config_processing)
        jobs.append(job)
    # end for
    sta_ampls = job_runner(jobs)
    sta_ori_metrics = [(sta, N, ampls) for (sta, N), ampls in zip(stations, sta_ampls)]

    # Duplicate the first sample at +180 deg so the sampled curve closes the circle.
    x = np.hstack((angles, angles[0] + 360))
    angles_fine = np.linspace(-180, 180, num=361)
    for sta, N, ampls in sta_ori_metrics:
        y = np.array(ampls + [ampls[0]])
        mask = np.isfinite(y)
        n_valid = np.sum(mask)
        if n_valid < 5:
            logger.info('{}: Insufficient data ({} valid points)'.format(sta, n_valid))
            continue
        # end if
        x_valid = x[mask]
        y_valid = y[mask]
        # Fit cubic spline through valid points, used for visualization only.
        # Periodic boundary conditions are only valid if the (duplicated)
        # endpoints survived the finiteness mask.
        if np.isfinite(y[0]):
            bc_type = 'periodic'
        else:
            bc_type = 'not-a-knot'
        # end if
        interp_cubsp = interpolate.CubicSpline(x_valid, y_valid, bc_type=bc_type)
        yint_cubsp = interp_cubsp(angles_fine)
        # Fit cosine curve to the data points; the fitted phase gives the
        # orientation correction estimate, per the Wilde-Piórko method.
        fitted, cov = optimize.curve_fit(lambda _x, _amp, _ph: _amp*np.cos(np.deg2rad(_x + _ph)),
                                         x_valid, y_valid, p0=[0.2, 0.0], bounds=([-1, -180], [1, 180]))
        A_fit, ph_fit = fitted
        y_fitted = A_fit*np.cos(np.deg2rad(angles_fine + ph_fit))
        ph_uncertainty = np.sqrt(cov[1, 1])
        # Estimated correction angle is where the fitted cosine peaks.
        angle_max = angles_fine[np.argmax(y_fitted)]
        code = '.'.join([ned.network, sta])
        results[code]['azimuth_correction'] = angle_max
        logger.info('{}: {:2.3f}°, stddev {:2.3f}° (N = {:3d})'.format(
            sta, angle_max, ph_uncertainty, N))
        if save_plots_path is not None:
            _f = plt.figure(figsize=(16, 9))
            plt.plot(x_valid, y_valid, 'x', label='Computed P arrival strength')
            plt.plot(angles_fine, yint_cubsp, '--', alpha=0.7, label='Cubic spline fit')
            plt.plot(angles_fine, y_fitted, '--', alpha=0.7, label='Cosine fit')
            plt.grid(linestyle=':', color="#80808080")
            plt.xlabel('Orientation correction (deg)', fontsize=12)
            plt.ylabel('P phase ampl. mean (0-1 sec)', fontsize=12)
            plt.title('{}.{}'.format(ned.network, sta), fontsize=14)
            plt.text(0.9, 0.9, 'N = {}'.format(N), ha='right', va='top',
                     transform=plt.gca().transAxes)
            plt.legend(framealpha=0.7)
            outfile = '_'.join([code, config_processing["deconv_domain"], 'ori.png'])
            outfile = os.path.join(str(save_plots_path), outfile)
            plt.savefig(outfile, dpi=300)
            plt.close()
        # end if
    # end for
    return results
# end func
def process_event_file(src_h5_event_file, curation_opts=None,
                       config_filtering=None, config_processing=None,
                       dest_file=None, save_plots_path=None):
    """
    Use event dataset from an HDF5 file to analyze stations for orientation errors.

    :param src_h5_event_file: HDF5 file to load. Typically one created by `extract_event_traces.py` script
    :type src_h5_event_file: str or pathlib.Path
    :param curation_opts: Seismogram curation options.
        Safe default to use is `DEFAULT_CURATION_OPTS`.
    :type curation_opts: dict
    :param config_filtering: Seismogram filtering options for RF computation.
        Safe default to use is `DEFAULT_CONFIG_FILTERING`.
    :type config_filtering: dict
    :param config_processing: Seismogram RF processing options.
        Safe default to use is `DEFAULT_CONFIG_PROCESSING`.
    :type config_processing: dict
    :param dest_file: File in which to save results in JSON format
    :type dest_file: str or pathlib.Path
    :param save_plots_path: Optional folder in which to save plot per station of mean
        arrival RF amplitude as function of correction angle
    :type save_plots_path: str or pathlib.Path
    :return: Dict of estimated orientation error keyed by net.sta code
        (also written to `dest_file` if that is given)
    :rtype: dict
    """
    # Fall back to the safe module-level defaults when options are not supplied.
    if curation_opts is None:
        curation_opts = DEFAULT_CURATION_OPTS
    if config_filtering is None:
        config_filtering = DEFAULT_CONFIG_FILTERING
    if config_processing is None:
        config_processing = DEFAULT_CONFIG_PROCESSING
    ned = NetworkEventDataset(src_h5_event_file)
    results = analyze_station_orientations(ned, curation_opts, config_filtering,
                                           config_processing, save_plots_path=save_plots_path)
    if dest_file is not None:
        with open(dest_file, 'w') as f:
            json.dump(results, f, indent=4)
    # end if
    # Return results so callers can use them programmatically as well.
    return results
# end func
@click.command()
@click.option('--dest-file', type=click.Path(dir_okay=False),
              help='Output file in which to store results in JSON text format')
@click.option('--save-plots-path', type=click.Path(file_okay=False),
              help='Optional folder in which to save plot per station')
@click.argument('src-h5-event-file', type=click.Path(exists=True, dir_okay=False),
                required=True)
def main(src_h5_event_file, dest_file=None, save_plots_path=None):
    """
    Run station orientation checks on an event waveform file.
    Thin CLI wrapper around :func:`process_event_file` using default
    curation/filtering/processing options.

    Example usage::

        python seismic/analyze_station_orientations.py --dest-file 7X_ori_estimates.json \
        /g/data/ha3/am7399/shared/7X_RF_analysis/7X_event_waveforms_for_rf_20090616T034200-20110401T231849_rev2.h5

    :param src_h5_event_file: Event waveform file whose waveforms are used to perform checks, typically
        generated using `extract_event_traces.py`
    :type src_h5_event_file: str or pathlib.Path
    :param dest_file: Output file in which to store results in JSON text format
    :type dest_file: str or pathlib.Path
    :param save_plots_path: Optional folder in which to save plot per station of mean
        arrival RF amplitude as function of correction angle
    :type save_plots_path: str or pathlib.Path
    """
    process_event_file(src_h5_event_file, dest_file=dest_file, save_plots_path=save_plots_path)
# end func
# Script entry point guard: click supplies the arguments from the command line.
if __name__ == '__main__':
    main()  # pylint: disable=no-value-for-parameter
# end if