Source code for seismic.receiver_fn.generate_rf
#!/usr/bin/env python
"""Generate Receiver Functions (RF) from collection of 3-channel seismic traces.
"""
import copy
from mpi4py import MPI
import logging
import json
import numpy as np
import click
import os
import tqdm.auto as tqdm
from seismic.receiver_fn.rf_corrections import Corrections
from seismic.receiver_fn import rf_util
from seismic.receiver_fn.generate_rf_helper import transform_stream_to_rf
from seismic.rf_station_orientations import analyze_station_orientations
from seismic.network_event_dataset import NetworkEventDataset
from seismic.stream_processing import zne_order, negate_channel, swap_ne_channels, correct_back_azimuth
from seismic.stream_io import remove_group, get_obspyh5_index
from rf import RFStream
from collections import defaultdict
from seismic.misc import split_list
# pylint: disable=invalid-name, logging-format-interpolationa
logging.basicConfig()
[docs]def event_waveforms_to_rf(input_file, output_file, config, network_list='*', station_list='*', only_corrections=False):
"""
Main entry point for generating RFs from event traces.
Config file consists of 3 sub-dictionaries. One named "filtering" for
input stream filtering settings, one named "processing" for RF processing
settings, and one named "correction" for rotating/swapping/negating channel
data for one or more named stations with potential orientation discrepancies.
Each of these sub-dicts is described below::
"filtering": # Filtering settings
{
"resample_rate": float # Resampling rate in Hz
"taper_limit": float # Fraction of signal to taper at end, between 0 and 0.5
"filter_band": (float, float) # Filter pass band (Hz). Not required for freq-domain deconvolution.
"channel_pattern": # Ordered list of preferred channels, e.g. 'HH*,BH*',
# where channel selection is ambiguous.
"baz_range": (float, float) or [(float, float), ...] # Discrete ranges of source back azimuth to use (degrees).
# Each value must be between 0 and 360. May be a pair or a list of pairs for multiple ranges.
}
"processing": # RF processing settings
{
"custom_preproc":
{
"import": 'import custom symbols', # statement to import required symbols
"func": 'preproc functor' # expression to get handle to custom preprocessing functor
"args": {} # additional kwargs to pass to func
}
"trim_start_time": float # Trace trim start time in sec, relative to onset
"trim_end_time": float # Trace trim end time in sec, relative to onset
"rotation_type": str # Choice of ['zrt', 'lqt']. Rotational coordinate system
# for aligning ZNE trace components with incident wave direction
"deconv_domain": str # Choice of ['time', 'freq', 'iter']. Whether to perform deconvolution
# in time or freq domain, or iterative technique
"gauss_width": float # Gaussian freq domain filter width. Only required for freq-domain deconvolution
"water_level": float # Water-level for freq domain spectrum. Only required for freq-domain deconvolution
"spiking": float # Spiking factor (noise suppression), only required for time-domain deconvolution
"normalize": bool # Whether to normalize RF amplitude
}
"correction": # corrections to be applied to data for named stations prior to RF computation
{
"plot_dir": str # path to folder where plots related to orientation corrections are to be saved
"swap_ne": list # list of NET.STA.LOC for which N and E channels are to be swapped, e.g ["OA.BL27."],
"rotate": list # list of NET.STA.LOC that are to be rotated to maximize P-arrival energy on \
the primary RF component, e.g ["OA.BL27."]
"negate": list # list of NET.STA.LOC.CHA that are to be negated, e.g ["OA.BL27..HHZ"]
}
:param input_file: Event waveform source file for seismograms, generated using extract_event_traces.py script
:type input_file: str or pathlib.Path
:param output_file: Name of hdf5 file to produce containing RFs
:type output_file: str or pathlib.Path
:param config: Dictionary containing job configuration parameters
:type config: dict
:return: None
"""
comm = MPI.COMM_WORLD
nproc = comm.Get_size()
rank = comm.Get_rank()
proc_hdfkeys = None
corrections = None
config_filtering = config.setdefault("filtering", {})
channel_pattern = config_filtering.get("channel_pattern")
config_processing = config.setdefault("processing", {})
config_correction = config.setdefault("correction", {})
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
if channel_pattern is not None:
channel_pattern = channel_pattern.strip()
logger.info("Using channel matching pattern {}".format(channel_pattern))
# end if
if(rank == 0):
logger.info("Processing source file {}".format(input_file))
# retrieve all available hdf_keys
proc_hdfkeys = get_obspyh5_index(input_file, seeds_only=True)
# trim stations to be processed based on the user-provided network- and station-list
proc_hdfkeys = rf_util.trim_hdf_keys(proc_hdfkeys, network_list, station_list)
if(only_corrections): # trim the hdf_keys if processing only corrections
corrections = Corrections(config_correction, proc_hdfkeys)
proc_hdfkeys = [item for item in proc_hdfkeys if corrections.needsCorrections(item)]
# end if
# end if
# broadcast workload to all procs
proc_hdfkeys = comm.bcast(proc_hdfkeys, root=0)
# load corrections-config
corrections = Corrections(config_correction, proc_hdfkeys)
# split stations over all ranks
proc_hdfkeys = split_list(proc_hdfkeys, nproc)
pbar = tqdm.tqdm(total=len(proc_hdfkeys[rank]))
proc_rf_stream = RFStream()
baz_corrections = defaultdict(dict)
for proc_hdfkey in proc_hdfkeys[rank]:
nsl = proc_hdfkey # network-station-location
pbar.set_description("Rank {}: {}".format(rank, nsl))
net, sta, loc = nsl.split('.')
# note that ned contains a single station
ned = NetworkEventDataset(input_file, network=net, station=sta, location=loc)
# corrections
if (corrections.needsCorrections(proc_hdfkey)):
# channel negation
ch_list = corrections.needsNegation(proc_hdfkey)
if(ch_list):
for ch in ch_list:
logger.info('Rank {}: {}: Negating component {}'.format(rank, nsl, ch))
ned.apply(lambda st: negate_channel(None, st, ch))
# end for
# end if
# channel swaps
if(corrections.needsChannelSwap(proc_hdfkey)):
logger.info('Rank {}: {}: Applying a NE channel swap '.format(rank, nsl))
ned.apply(lambda st: swap_ne_channels(None, st))
# end if
# channel rotations through baz correction
if(corrections.needsRotation(proc_hdfkey)):
# TODO : expose the following parameters
bazcorr_curation_opts = {"min_slope_ratio": 5,
"min_snr": 2.0,
"rms_amplitude_bounds": {"R/Z": 1.0, "T/Z": 1.0}}
bazcorr_config_filtering = {"resample_rate": 4.0,
"taper_limit": 0.05,
"filter_band": [0.01, 0.5]}
result = analyze_station_orientations(copy.deepcopy(ned),
curation_opts=bazcorr_curation_opts,
config_filtering=bazcorr_config_filtering,
save_plots_path=corrections.plot_dir)
baz_corrections.update({nsl: result[list(result.keys())[0]]})
try:
logger.info('Rank {}: {}: Applying a baz correction '
'of {}'.format(rank,
nsl,
result[nsl]['azimuth_correction']))
ned.apply(lambda st: correct_back_azimuth(None, st, result[nsl]['azimuth_correction']))
except:
logger.warning('Channel rotation failed for {}. Moving along..'.format(nsl))
# end try
# end if
# end if
status_list = []
for sta, db_evid in ned.by_station(): # note that ned contains a single station
for evid, stream in db_evid.items():
rf_3ch = None
try:
stream = RFStream(stream.traces)
stream.traces = sorted(stream.traces, key=zne_order)
# Strongly assert expected ordering of traces. This must be respected so that
# RF normalization works properly.
assert stream.traces[0].stats.channel[-1] == 'Z'
assert stream.traces[1].stats.channel[-1] == 'N'
assert stream.traces[2].stats.channel[-1] == 'E'
rf_3ch = transform_stream_to_rf(evid, stream, config_filtering,
config_processing)
except Exception as e:
print(str(e) + ' in station {}, event {}'.format(nsl, evid))
status_list.append(False)
continue
# end try
if rf_3ch is None:
status_list.append(False)
else:
status_list.append(True)
proc_rf_stream += rf_3ch
# end if
# end for
# end for
pbar.update()
num_tasks = len(status_list)
num_success = np.sum(status_list)
num_rejected = num_tasks - num_success
logger.info("{}: {}/{} streams returned valid RF, {} streams rejected".format(nsl, num_success, num_tasks, num_rejected))
# end for
pbar.close()
# gather and output baz corrections
baz_corrections = comm.gather(baz_corrections, root=0)
if(rank == 0):
if(len(baz_corrections) and corrections.plot_dir):
output_dict = {}
for d in baz_corrections:
if(len(d)): output_dict.update(d)
# end for
json.dump(output_dict, open(os.path.join(corrections.plot_dir,
'azimuth_corrections.json'), 'w'))
# end if
# end if
# serialize writing of rf_streams to disk
for irank in np.arange(nproc):
if(irank == rank):
if(len(proc_rf_stream)):
for hdf_key in proc_hdfkeys[rank]:
# remove existing traces if there are any
try:
remove_group(output_file, hdf_key, logger=logger)
except Exception as e:
logger.warning(str(e))
# end try
logger.info("Writing RF stream(s) for {} on rank {}...".format(hdf_key, rank))
# end for
proc_rf_stream.write(output_file, format='H5', mode='a')
# end if
# end if
comm.Barrier()
# end for
if(rank == 0):
print("Finishing...")
print("generate_rf SUCCESS!")
# end if
# end func
@click.command()
@click.argument('input-file', type=click.Path(exists=True, dir_okay=False), required=True)
@click.argument('output-file', type=click.Path(dir_okay=False), required=True)
@click.option('--network-list', default='*', help='A space-separated list of networks (within quotes) to process.', type=str,
show_default=True)
@click.option('--station-list', default='*', help='A space-separated list of stations (within quotes) or a text file '
'with station names in each row, w/wo location codes.', type=str,
show_default=True)
@click.option('--config-file', type=click.Path(dir_okay=False), default=None,
show_default=True, help="Run configuration file in JSON format")
@click.option('--only-corrections', is_flag=True, default=False, show_default=True,
help="Compute and apply corrections for stations listed under 'correction' in the "
"input json config file -- all other stations are ignored. Note that "
"preexisting data (for relevant channels, if present) are deleted before "
"saving the corrections")
def _main(input_file, output_file, network_list, station_list, config_file, only_corrections):
"""
INPUT_FILE : Input waveforms in H5 format\n
(output of extract_event_traces.py)\n
OUTPUT_FILE : Output H5 file name
"""
if config_file is not None:
with open(config_file, 'r') as cf:
config = json.load(cf)
# end with
config_correction = config.setdefault("correction", {})
if(not len(config_correction) and only_corrections):
assert 0, 'A correction block is required in the config file for --only-corrections'
# end if
else:
config = {} # all default settings
# end if
# Dispatch call to worker function. See worker function for documentation.
event_waveforms_to_rf(input_file, output_file, config, network_list=network_list, station_list=station_list,
only_corrections=only_corrections)
# end main
if __name__ == "__main__":
_main() # pylint: disable=no-value-for-parameter
# end if