
#!/usr/bin/env python
# coding: utf-8
"""
Objective function minimization solvers.
"""

import copy
import time

import numpy as np
from tqdm.auto import tqdm
from scipy.optimize import OptimizeResult
from sortedcontainers import SortedList
from sklearn.cluster import dbscan

from seismic.inversion.wavefield_decomp.call_count_decorator import call_counter


# pylint: disable=invalid-name

DEFAULT_CLUSTER_EPS = 0.05


class SolverGlobalMhMcmc:
    """
    Drop-in custom solver for scipy.optimize.minimize, based on a Metropolis-Hastings
    Markov chain Monte Carlo (MCMC) random walk with burn-in and adaptive acceptance
    rate, followed by N-dimensional clustering. Rather than returning one global
    solution, the solver returns the N best ranked local solutions. It also returns
    a probability distribution of each unknown based on Monte Carlo statistics.
    """
    # TODO: Migrate free functions and classes into this solver class,
    #  but keep free function optimize_minimize_mhmcmc_cluster() for user convenience.
    # Copied from ticket PV-149 description:
    #
    #   Two code cleanup tasks to bring code in module seismic/inversion up to best
    #   practice and modular design:
    #
    #   In solver.py:
    #    o Migrate relevant free functions into the SolverGlobalMhMCMC class.
    #    o Improve documentation on classes.
    #    o Add unit testing for modular classes HistogramIncremental,
    #      BoundedRandNStepper and AdaptiveStepsize.
    #    o Factor out separate class for the MCMC solution, derived from
    #      scipy.optimize.OptimizationResult.
    #
    #   In runners.py:
    #    o Move save_mcmc_solution() free function into member method of new MCMC
    #      solution class.
    pass
# end class
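
# Illustrative sketch (not part of the original module): the Metropolis-Hastings
# accept/reject rule at the heart of the random walk described above, isolated for
# clarity. Working in the log domain avoids evaluating exp(-delta/T) directly, which
# can overflow or underflow for large objective differences. This mirrors the rule
# used inside optimize_minimize_mhmcmc_cluster() below.
def _mh_accept_sketch(funval_old, funval_new, beta):
    # beta = 1/T. Downhill moves (log_alpha > 0) are always accepted; uphill moves
    # are accepted with probability exp(-(funval_new - funval_old)*beta).
    log_alpha = -(funval_new - funval_old)*beta
    return log_alpha > 0 or np.log(np.random.rand()) <= log_alpha
# end func
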
class HistogramIncremental:
    """Class to incrementally accumulate N-dimensional histogram stats at runtime.
    """
    def __init__(self, bounds, nbins=20):
        self._ndims = len(bounds.lb)
        assert len(bounds.ub) == self._ndims
        self._bins = np.linspace(bounds.lb, bounds.ub, nbins + 1).T
        self._hist = np.zeros((self._ndims, nbins), dtype=int)
    # end func

    def __iadd__(self, x):
        assert len(x) == self._ndims
        for i, _x in enumerate(x):
            idx = np.digitize(_x, self._bins[i, :])  # pylint: disable=unsubscriptable-object
            self._hist[i, idx - 1] += 1
        # end for
        return self
    # end func

    @property
    def dims(self):
        return self._ndims
    # end func

    @property
    def bins(self):
        if self.dims == 1:
            return self._bins.flatten()
        else:
            return self._bins.copy()
        # end if
    # end func

    @property
    def histograms(self):
        if self.dims == 1:
            return self._hist.flatten()
        else:
            return self._hist.copy()
        # end if
    # end func
# end class
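
# Illustrative usage sketch for HistogramIncremental (not part of the original module),
# accumulating per-dimension histograms of 2-D samples drawn within given bounds.
def _example_histogram_incremental():
    from scipy.optimize import Bounds
    bounds = Bounds(np.array([0.0, -1.0]), np.array([1.0, 1.0]))
    hist = HistogramIncremental(bounds, nbins=10)
    for _ in range(1000):
        hist += np.random.uniform(bounds.lb, bounds.ub)
    # end for
    # hist.bins has shape (2, 11) (bin edges per dimension) and hist.histograms has
    # shape (2, 10); every sample adds one count per dimension, so each row sums to 1000.
    assert hist.histograms.sum(axis=1).tolist() == [1000, 1000]
# end func
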
class BoundedRandNStepper:
    """Step one dimension at a time using a normal distribution of step sizes,
    keeping steps within defined parameter bounds.
    """
    def __init__(self, bounds, initial_step=None):
        self.bounds = bounds
        self.range = bounds.ub - bounds.lb
        if initial_step is not None:
            self._stepsize = initial_step
        else:
            self._stepsize = 0.15*(bounds.ub - bounds.lb)
        # end if
    # end func

    def __call__(self, x):
        ndims = len(x)
        while True:
            dim = np.random.randint(0, ndims)
            x_new = x[dim] + self._stepsize[dim]*np.random.randn()
            if self.bounds.lb[dim] <= x_new <= self.bounds.ub[dim]:
                break
            # end if
        # end while
        result = x.copy()
        result[dim] = x_new
        return result
    # end func

    @property
    def stepsize(self):
        return self._stepsize.copy()
    # end func

    @stepsize.setter
    def stepsize(self, stepsize_new):
        # Limit step size adaptation so that the attempt to reach the desired
        # acceptance rate neither diverges nor shrinks the stepping quanta to nothing.
        stepsize_new = np.maximum(np.minimum(stepsize_new, self.range), 1e-3*self.range)
        self._stepsize = stepsize_new
    # end func
# end class
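
# Illustrative usage sketch for BoundedRandNStepper (not part of the original module).
# Each call perturbs exactly one randomly chosen dimension, redrawing the step until
# the proposal falls inside the bounds.
def _example_bounded_stepper():
    from scipy.optimize import Bounds
    bounds = Bounds(np.array([0.0, 0.0]), np.array([10.0, 10.0]))
    stepper = BoundedRandNStepper(bounds)  # default stepsize is 15% of each range
    x = np.array([5.0, 5.0])
    x_new = stepper(x)
    assert np.sum(x_new != x) == 1  # exactly one component changed
    assert np.all((bounds.lb <= x_new) & (x_new <= bounds.ub))
# end func
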
class AdaptiveStepsize:
    """
    Class to implement adaptive stepsize. Pulled from scipy.optimize.basinhopping
    and adapted.
    """
    def __init__(self, stepper, accept_rate=0.5, interval=50, factor=0.9, ar_tolerance=0.05):
        self.takestep = stepper
        self.target_accept_rate = accept_rate
        self.interval = interval
        self.factor = factor
        self.tolerance = ar_tolerance
        self.logger = None  # Assign directly to customize logger callable
        self.nstep = 0
        self.nstep_tot = 0
        self.naccept = 0
    # end func

    def __call__(self, x):
        return self.take_step(x)
    # end func

    def _adjust_step_size(self):
        old_stepsize = copy.copy(self.takestep.stepsize)
        accept_rate = float(self.naccept) / self.nstep
        scaled_step = False
        if accept_rate > self.target_accept_rate + self.tolerance:
            # We're accepting too many steps. Take bigger steps.
            self.takestep.stepsize /= self.factor
            scaled_step = True
        elif accept_rate < self.target_accept_rate - self.tolerance:
            # We're not accepting enough steps. Take smaller steps.
            self.takestep.stepsize *= self.factor
            scaled_step = True
        # end if
        if scaled_step and self.logger:
            self.logger("adaptive stepsize: acceptance rate {} target {} new stepsize {} old stepsize {}"
                        .format(accept_rate, self.target_accept_rate, self.takestep.stepsize, old_stepsize))
        # end if
    # end func

    def take_step(self, x):
        self.nstep += 1
        self.nstep_tot += 1
        if self.nstep % self.interval == 0:
            self._adjust_step_size()
        return self.takestep(x)
    # end func

    def notify_accept(self):
        self.naccept += 1
    # end func
# end class
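
# Illustrative sketch (not part of the original module) of composing the two classes
# above, mirroring how optimize_minimize_mhmcmc_cluster() uses them. The adaptive
# wrapper checks the acceptance rate every `interval` calls and scales the underlying
# stepsize toward the target rate; the caller must report accepted steps via
# notify_accept() for the rate estimate to mean anything.
def _example_adaptive_stepsize(objective, bounds, x0, n_steps=1000):
    stepper = AdaptiveStepsize(BoundedRandNStepper(bounds), accept_rate=0.4,
                               ar_tolerance=0.05, interval=50)
    x = x0.copy()
    fval = objective(x)
    for _ in range(n_steps):
        x_new = stepper(x)
        fval_new = objective(x_new)
        if fval_new < fval:  # greedy acceptance, for illustration only
            x, fval = x_new, fval_new
            stepper.notify_accept()
        # end if
    # end for
    return x, fval
# end func
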
def optimize_minimize_mhmcmc_cluster(objective, bounds, args=(), x0=None, T=1, N=3, burnin=100000,
                                     maxiter=1000000, target_ar=0.4, ar_tolerance=0.05,
                                     cluster_eps=DEFAULT_CLUSTER_EPS, rnd_seed=None,
                                     collect_samples=None, logger=None):
    """
    Minimize objective function and return up to N local minima solutions.

    :param objective: Objective function to minimize. Takes unpacked args as function call arguments
        and returns a float.
    :type objective: Callable(\*args) -> float
    :param bounds: Bounds of the parameter space.
    :type bounds: scipy.optimize.Bounds
    :param args: Any additional fixed parameters needed to completely specify the objective function.
    :type args: tuple or list
    :param x0: Initial guess. If None, will be selected randomly and uniformly within the parameter bounds.
    :type x0: numpy.array with same shape as elements of bounds
    :param T: The "temperature" parameter for the accept or reject criterion. To sample the domain well,
        it should be of the order of the typical difference in objective function values between local minima.
    :type T: float
    :param N: Maximum number of minima to return.
    :type N: int
    :param burnin: Number of random steps to discard before starting to accumulate statistics.
    :type burnin: int
    :param maxiter: Maximum number of steps to take (including burnin).
    :type maxiter: int
    :param target_ar: Target acceptance rate of point samples generated by stepping.
    :type target_ar: float between 0 and 1
    :param ar_tolerance: Tolerance on the acceptance rate before actively adapting the step size.
    :type ar_tolerance: float
    :param cluster_eps: Point proximity tolerance for DBSCAN clustering, in normalized bounds coordinates.
    :type cluster_eps: float
    :param rnd_seed: Random seed to force deterministic behaviour.
    :type rnd_seed: int
    :param collect_samples: If not None and of integral type, collect `collect_samples` point samples at
        regular intervals and return them as part of the solution.
    :type collect_samples: int or NoneType
    :param logger: Logger instance for outputting log messages.
    :return: OptimizeResult containing solution(s) and solver data.
    :rtype: scipy.optimize.OptimizeResult with additional attributes
    """
    @call_counter
    def obj_counted(*args):
        return objective(*args)
    # end func

    assert maxiter >= 2*burnin, "maxiter {} should be at least twice burnin steps {}".format(maxiter, burnin)
    main_iter = maxiter - burnin

    if collect_samples is not None:
        assert isinstance(collect_samples, int), "collect_samples expected to be integral type"
        assert collect_samples > 0, "collect_samples expected to be positive"
    # end if

    beta = 1.0/T

    if rnd_seed is None:
        rnd_seed = int(time.time()*1000) % (1 << 31)
    # end if
    np.random.seed(rnd_seed)
    if logger:
        logger.info('Using random seed {}'.format(rnd_seed))
    # end if

    if x0 is None:
        x0 = np.random.uniform(bounds.lb, bounds.ub)
    # end if
    assert np.all((x0 >= bounds.lb) & (x0 <= bounds.ub))
    x = x0.copy()
    funval = obj_counted(x, *args)

    # Set up stepper with adaptive acceptance rate
    stepper = BoundedRandNStepper(bounds)
    stepper = AdaptiveStepsize(stepper, accept_rate=target_ar, ar_tolerance=ar_tolerance, interval=50)

    # -------------------------------
    # DO BURN-IN
    rejected_randomly = 0
    accepted_burnin = 0
    tracked_range = tqdm(range(burnin), total=burnin, desc='BURN-IN')
    if logger:
        stepper.logger = lambda msg: tracked_range.write(logger.name + ':' + msg)
    else:
        stepper.logger = tracked_range.write
    # end if
    for _ in tracked_range:
        x_new = stepper(x)
        funval_new = obj_counted(x_new, *args)
        log_alpha = -(funval_new - funval)*beta
        if log_alpha > 0 or np.log(np.random.rand()) <= log_alpha:
            x = x_new
            funval = funval_new
            stepper.notify_accept()
            accepted_burnin += 1
        elif log_alpha <= 0:
            rejected_randomly += 1
        # end if
    # end for
    ar = float(accepted_burnin)/burnin
    if logger:
        logger.info("Burn-in acceptance rate: {}".format(ar))
    # end if

    # -------------------------------
    # DO MAIN LOOP
    if collect_samples is not None:
        nsamples = min(collect_samples, main_iter)
        sample_cadence = main_iter/nsamples
        samples = np.zeros((nsamples, len(x)))
        samples_fval = np.zeros(nsamples)
    # end if
    accepted = 0
    rejected_randomly = 0
    minima_sorted = SortedList(key=lambda rec: rec[1])  # Sort by objective function value
    hist = HistogramIncremental(bounds, nbins=100)
    # Cache a generous number of candidate minima, as these need to be clustered before returning N results
    N_cached = int(np.ceil(N*main_iter/500))
    next_sample = 0.0
    sample_count = 0
    tracked_range = tqdm(range(main_iter), total=main_iter, desc='MAIN')
    if logger:
        stepper.logger = lambda msg: tracked_range.write(logger.name + ':' + msg)
    else:
        stepper.logger = tracked_range.write
    # end if
    for i in tracked_range:
        if collect_samples and i >= next_sample:
            assert sample_count < collect_samples
            samples[sample_count] = x
            samples_fval[sample_count] = funval
            sample_count += 1
            next_sample += sample_cadence
        # end if
        x_new = stepper(x)
        funval_new = obj_counted(x_new, *args)
        log_alpha = -(funval_new - funval)*beta
        if log_alpha > 0 or np.log(np.random.rand()) <= log_alpha:
            x = x_new
            funval = funval_new
            minima_sorted.add((x, funval))
            if len(minima_sorted) > N_cached:
                minima_sorted.pop()
            # end if
            stepper.notify_accept()
            hist += x
            accepted += 1
        elif log_alpha <= 0:
            rejected_randomly += 1
        # end if
    # end for
    stepper.logger = None
    ar = float(accepted)/main_iter
    if logger:
        logger.info("Acceptance rate: {}".format(ar))
        logger.info("Best minima (before clustering):\n{}".format(
            np.array([_mx[0] for _mx in minima_sorted[:10]])))
    # end if

    # -------------------------------
    # Cluster minima and associate each cluster with a local minimum.
    # Use a normalized coordinate space for cluster detection.
    x_range = bounds.ub - bounds.lb
    pts = np.array([x[0] for x in minima_sorted])
    fvals = np.array([x[1] for x in minima_sorted])
    pts_norm = (pts - bounds.lb)/x_range
    _, labels = dbscan(pts_norm, eps=cluster_eps, min_samples=21, n_jobs=-1)

    # Compute mean of each cluster and evaluate objective function at cluster mean locations.
    minima_candidates = []
    for grp in range(max(labels) + 1):
        mask = (labels == grp)
        mean_loc = np.mean(pts[mask, :], axis=0)
        # Evaluate objective function precisely at the mean location of each cluster
        fval = obj_counted(mean_loc, *args)
        minima_candidates.append((mean_loc, grp, fval))
    # end for

    # Rank minima locations by objective function.
    minima_candidates.sort(key=lambda c: c[2])

    # Pick up to N solutions
    solutions = minima_candidates[:N]

    # Put results into OptimizeResult container.
    # Add histograms (bin edges and counts per dimension, suitable for constructing
    # scipy.stats.rv_histogram) to the output result.
    solution = OptimizeResult()
    solution.x = np.array([s[0] for s in solutions])
    solution.clusters = [pts[(labels == s[1])] for s in solutions]
    solution.cluster_funvals = [fvals[(labels == s[1])] for s in solutions]
    solution.bins = hist.bins
    solution.distribution = hist.histograms
    solution.acceptance_rate = ar
    solution.success = True
    solution.status = 0
    if len(solutions) > 0:
        solution.message = 'SUCCESS: Found {} local minima'.format(len(solutions))
    else:
        solution.message = 'WARNING: Found no clusters within tolerance {}'.format(cluster_eps)
    # end if
    solution.fun = np.array([s[2] for s in solutions])
    solution.jac = None
    solution.nfev = obj_counted.counter
    solution.njev = 0
    solution.nit = main_iter
    solution.maxcv = None
    solution.samples = samples if collect_samples else None
    solution.sample_funvals = samples_fval if collect_samples else None
    solution.bounds = bounds
    solution.version = 's0.3'  # Solution version for future traceability
    solution.rnd_seed = rnd_seed

    return solution
# end func
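
# Illustrative end-to-end usage sketch (not part of the original module), minimizing
# a simple multi-modal test function. The Himmelblau function below is a standard
# test case with four minima; iteration counts here are kept small for a quick demo
# and are well below the function's defaults, which suit real problems better.
def _example_optimize_minimize_mhmcmc_cluster():
    from scipy.optimize import Bounds

    def himmelblau(xy):
        x, y = xy
        return (x**2 + y - 11)**2 + (x + y**2 - 7)**2
    # end func

    bounds = Bounds(np.array([-6.0, -6.0]), np.array([6.0, 6.0]))
    soln = optimize_minimize_mhmcmc_cluster(himmelblau, bounds, T=10, N=4,
                                            burnin=10000, maxiter=50000,
                                            collect_samples=1000, rnd_seed=20200220)
    if soln.success:
        for x, fval in zip(soln.x, soln.fun):
            print('Minimum {} with objective value {}'.format(x, fval))
        # end for
    # end if
    return soln
# end func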