#!/usr/env python
"""
Description:
Reads waveform data from a FederatedASDFDataSet and generates data-quality plots into a multi-page PDF file
For each station, the program read-in all the waveform data over the period specified (usually over a year),
and then perform statistics analysis over the waveform data for subsequent plotting.
The first page of the PDF shows the stations on a basemap with marker colored according to the number of usable days
("good" data available in the day, not gap, not all zeros) And the color is determined by c=mapper.to_rgba(days):
Relatively speaking, black/blue/cold-ish colors indicate higher percentage of good data in the station.
And red/yellow/warm-ish colors indicate higher percentage of problematic data.
The following pages of the PDF will be plotting the daily-averaged seismic wave data
with gaps and all-zeros days are shaded.
CreationDate: 19/09/19
Developer: rakib.hassan@ga.gov.au
Revision History:
LastUpdate: 19/09/2019 RH
LastUpdate: 27/05/2020 FZ Refactoring and docs run examples etc.
Todo:
The script currently have a low pylint score 4.3/10.
Need to refactor to comply with Python standard pep-8: add required docstrings.
use smaller function to modularize better.
Consider to generate an additional page to executive summarise info for user.
"""
import gc
import logging
from collections import defaultdict
import click
import matplotlib
import matplotlib.cm as cm
import matplotlib.pyplot as plt
import numpy as np
from matplotlib.backends.backend_pdf import PdfPages
from matplotlib.dates import DateFormatter, AutoDateLocator
from mpi4py import MPI
from mpl_toolkits.basemap import Basemap
from obspy import UTCDateTime
from ordered_set import OrderedSet as set
from tqdm import tqdm
from seismic.ASDFdatabase.FederatedASDFDataSet import FederatedASDFDataSet
logging.basicConfig()
[docs]def split_list(lst, npartitions):
k, m = divmod(len(lst), npartitions)
return [lst[i * k + min(i, m):(i + 1) * k + min(i + 1, m)] for i in range(npartitions)]
# end func
[docs]def setup_logger(name, log_file, level=logging.INFO):
"""
Function to setup a logger; adapted from stackoverflow
"""
formatter = logging.Formatter('%(asctime)s %(levelname)s %(message)s')
handler = logging.FileHandler(log_file, mode='w')
handler.setFormatter(formatter)
logger = logging.getLogger(name + log_file)
logger.setLevel(level)
logger.addHandler(handler)
return logger
# end func
[docs]def process_data(rank, fds, stations, start_time, end_time):
"""
:param rank: processor rank in parallel runs
:param fds: FederatedASDFDataSer instance
:param stations: list containing tuples (net, sta, loc, cha, lon, lat)
:param start_time: start-time in UTCDateTime format
:param end_time: end-time in UTCDateTime format
:return: a list containing UTCDateTimes marking the start of each day
and their corresponding means in each row
"""
results = []
day = 3600 * 24
# Discard aux channels not in the following list
chaSet = {'BH1', 'BH2', 'BHE', 'BHN', 'BHZ', 'BNZ', 'EHE', 'EHN', 'EHZ', 'HHE', 'HHN',
'HHZ', 'LHE', 'LHN', 'LHZ', 'SHE', 'SHN', 'SHZ'}
for s in tqdm(stations, desc='Rank: %d' % (rank)):
# discard aux channels
if (s[3] not in chaSet):
results.append(([], []))
continue
# end if
st, et = fds.get_global_time_range(s[0], s[1])
if (start_time > st): st = start_time
if (end_time < et): et = end_time
# align st to day
st = UTCDateTime(year=st.year, month=st.month, day=st.day)
ct = st
times = []
means = []
while (ct < et):
times.append(ct)
debug = False
if not debug:
stream = fds.get_waveforms(s[0], s[1], s[2], s[3],
ct, ct + day,
trace_count_threshold=200)
if len(stream):
try:
tr = stream.merge()
means.append(np.mean(tr[0].data))
except:
# empty or streams that cannot be merged are marked by nans
means.append(np.nan)
else:
means.append(np.nan)
# end if
else:
# Used for debug purposes only
if np.int_(np.round(np.random.random())):
means.append(np.random.random())
else:
means.append(np.nan)
# end if
# end if
ct += day
# end while
results.append([times, means])
# end for
return results
# end func
[docs]def plot_results(stations, results, output_basename):
# collate indices for each channel for each station
assert len(stations) == len(results)
groupIndices = defaultdict(list)
for i in np.arange(len(results)):
groupIndices['%s.%s' % (stations[i][0], stations[i][1])].append(i)
# end for
# gather number of days of usable data for each station
usableStationDays = defaultdict(int)
maxUsableDays = -1e32
minUsableDays = 1e32
for k, v in groupIndices.items():
for i, index in enumerate(v):
x, means = results[index]
means = np.array(means)
days = np.sum(~np.isnan(means) & np.bool_(means != 0))
if usableStationDays[k] < days:
usableStationDays[k] = days
maxUsableDays = max(maxUsableDays, days)
minUsableDays = min(minUsableDays, days)
# end if
# end for
# end for
# Plot station map
pdf = PdfPages('%s.pdf' % output_basename)
fig = plt.figure(figsize=(20, 30))
ax1 = fig.add_axes([0.05, 0.05, 0.9, 0.7])
ax2 = fig.add_axes([0.05, 0.7, 0.9, 0.3])
ax2.set_visible(False)
minLon = 1e32
maxLon = -1e32
minLat = 1e32
maxLat = -1e32
for s in stations:
lon, lat = s[4], s[5]
if lon < 0:
lon += 360
# end if
minLon = min(minLon, lon)
maxLon = max(maxLon, lon)
minLat = min(minLat, lat)
maxLat = max(maxLat, lat)
# end for
minLon -= 1
maxLon += 1
minLat -= 1
maxLat += 1
m = Basemap(ax=ax1, projection='merc',
resolution='i', llcrnrlat=minLat, urcrnrlat=maxLat,
llcrnrlon=minLon, urcrnrlon=maxLon,
lat_0=(minLat + maxLat) / 2., lon_0=(minLon + maxLon) / 2.)
# draw coastlines.
m.drawcoastlines()
# draw grid
parallels = np.linspace(np.around(minLat / 5) * 5 - 5, np.around(maxLat / 5) * 5 + 5, 6)
m.drawparallels(parallels, labels=[True, True, False, False], fontsize=20)
meridians = np.linspace(np.around(minLon / 5) * 5 - 5, np.around(maxLon / 5) * 5 + 5, 6)
m.drawmeridians(meridians, labels=[False, False, True, True], fontsize=20)
# plot stations
norm = matplotlib.colors.Normalize(vmin=minUsableDays, vmax=maxUsableDays, clip=True)
mapper = cm.ScalarMappable(norm=norm, cmap=cm.jet_r)
plotted = set()
for s in stations:
if s[1] in plotted:
continue
else:
plotted.add(s[1])
# end if
lon, lat = s[4], s[5]
px, py = m(lon, lat)
pxl, pyl = m(lon, lat - 0.1)
days = usableStationDays['%s.%s' % (s[0], s[1])]
m.scatter(px, py, s=400, marker='v',
c=mapper.to_rgba(days),
edgecolor='none', label='%s: %d' % (s[1], days))
ax1.annotate(s[1], xy=(px + 0.05, py + 0.05), fontsize=22)
# end for
fig.axes[0].set_title("Network Name: %s" % s[0], fontsize=30, y=1.05)
fig.axes[0].legend(prop={'size': 16}, loc=(0.2, 1.3),
ncol=5, fancybox=True, title='No. of Usable Days',
title_fontsize=16)
pdf.savefig()
plt.close()
# Plot results
for k, v in groupIndices.items():
axesCount = 0
for i in v:
assert (k == '%s.%s' % (stations[i][0], stations[i][1]))
# only need axes for non-null results
a, b = results[i]
if len(a) and len(b):
axesCount += 1
# end if
# end for
fig, axes = plt.subplots(axesCount, sharex=True)
fig.set_size_inches(20, 15)
axes = np.atleast_1d(axes)
if len(axes):
axesIdx = 0
for i, index in enumerate(v):
try:
x, means = results[index]
if len(x) and len(means):
x = [a.matplotlib_date for a in x]
d = np.array(means)
if len(d):
d[0] = np.nanmedian(d)
# end if
dnorm = d
dnormmin = np.nanmin(dnorm)
dnormmax = np.nanmax(dnorm)
axes[axesIdx].scatter(x, dnorm, marker='.', s=20)
axes[axesIdx].plot(x, dnorm, c='k', label='24 hr mean\n'
'Gaps indicate no-data', lw=2, alpha=0.7)
axes[axesIdx].grid(axis='x', linestyle=':', alpha=0.3)
axes[axesIdx].fill_between(x, dnormmax * np.int_(d == 0), dnormmin * np.int_(d == 0),
where=dnormmax * np.int_(d == 0) - dnormmin * np.int_(d == 0) > 0,
color='r', alpha=0.5, label='All 0 Samples')
axes[axesIdx].fill_between(x, dnormmax * np.int_(np.isnan(d)), dnormmin * np.int_(np.isnan(d)),
where=dnormmax * np.int_(np.isnan(d)) - dnormmin * np.int_(
np.isnan(d)) > 1,
color='b', alpha=0.5, label='No Data')
axes[axesIdx].xaxis.set_major_locator(AutoDateLocator())
axes[axesIdx].xaxis.set_major_formatter(DateFormatter('%Y-%m-%d'))
for tick in axes[axesIdx].get_xticklabels():
tick.set_rotation(45)
# end for
axes[axesIdx].legend(loc='upper right', prop={'size': 12})
axes[axesIdx].tick_params(axis='both', labelsize=16)
stn = stations[index]
axes[axesIdx].set_title('Channel %s.%s' % (stn[2], stn[3]),
fontsize=18, y=0.95, va='top')
axes[axesIdx].set_xlim(xmin=min(x), xmax=max(x))
axes[axesIdx].set_ylim(ymin=dnormmin, ymax=dnormmax)
axes[axesIdx].set_ylabel('Ampl.', fontsize=16)
axesIdx += 1
# end if
except:
# plotting fails when each axes contain <2 values; just move on in those instances
logging.warning('Plotting failed on station %s' % k)
# end try
# end for
axes[-1].set_xlabel('Days', fontsize=16)
# end if
plt.suptitle('%s Data Availability (~%d days)' % (k, usableStationDays[k]),
y=0.96, fontsize=20)
pdf.savefig()
plt.close()
gc.collect()
# end for
pdf.close()
# end func
CONTEXT_SETTINGS = dict(help_option_names=['-h', '--help'])
@click.command(context_settings=CONTEXT_SETTINGS)
@click.argument('asdf-source', required=True,
type=click.Path(exists=True))
@click.argument('start-time', required=True,
type=str)
@click.argument('end-time', required=True,
type=str)
@click.argument('net', required=True,
type=str)
@click.argument('sta', required=True,
type=str)
@click.argument('cha', required=True,
type=str)
@click.argument('output-basename', required=True,
type=str)
def process(asdf_source, start_time, end_time, net, sta, cha, output_basename):
"""
ASDF_SOURCE: Text file containing a list of paths to ASDF files\n
START_TIME: Start time in UTCDateTime format\n
END_TIME: End time in UTCDateTime format\n
NET: Network name\n
STA: Station name ('*' for all stations; note that * must be in quotation marks)\n
CHA: Channel name ('*' for all channels; note that * must be in quotation marks) \n
OUTPUT_BASENAME: Basename of output file
Example usage:
mpirun -np 112 python plot_data_quality.py asdf_files.txt 1980:01:01 2020:01:01 OA '*' '*' data_quality.oa
"""
start_time = UTCDateTime(start_time)
end_time = UTCDateTime(end_time)
if (sta == '*'): sta = None
if (cha == '*'): cha = None
comm = MPI.COMM_WORLD
nproc = comm.Get_size()
rank = comm.Get_rank()
l = setup_logger(name=output_basename, log_file='%s.log' % output_basename)
fds = FederatedASDFDataSet(asdf_source, logger=l)
stations = []
if rank == 0:
stations = fds.get_stations(start_time, end_time, network=net, station=sta, channel=cha)
stations = split_list(sorted(stations), nproc)
# end if
stations = comm.bcast(stations, root=0)
results = process_data(rank, fds, sorted(stations[rank]), start_time, end_time)
results = comm.gather(results, root=0)
if rank == 0:
results = [item for sublist in results for item in sublist] # flatten sublists for each proc
stations = [item for sublist in stations for item in sublist] # flatten sublists for each proc
plot_results(stations, results, output_basename)
# end if
# end func
if (__name__ == '__main__'):
"""
Example usage:
mpirun -np 112 python plot_data_quality.py asdf_files.txt 1980:01:01 2020:01:01 OA '*' '*' data_quality.oa
How to run without MPI in background
Example commandline:
(hiperseispy37) fxz547@vdi-n16 /g/data/ha3/fxz547/Githubz/hiperseis (develop)
$ nohup python seismic/ASDFdatabase/plot_data_quality.py /g/data/ha3/GASeisDataArchive/DevSpace/test_asdf_files.txt 2019:01:01 2019:03:01 'AU' '*' '*' test_plot_data_quality_AU &
"""
process()