Source code for seismic.inversion.wavefield_decomp.solvers

#!/usr/bin/env python
# coding: utf-8
Objective function minimization solvers.

import numpy as np
import copy
import time

from import tqdm
from scipy.optimize import OptimizeResult
from sortedcontainers import SortedList
from sklearn.cluster import dbscan

from seismic.inversion.wavefield_decomp.call_count_decorator import call_counter

# pylint: disable=invalid-name


[docs]class SolverGlobalMhMcmc: """ Drop-in custom solver for scipy.optimize.minimize, based on Metrolpolis-Hastings Monte Carlo Markov Chain random walk with burn-in and adaptive acceptance rate, followed by N-dimensional clustering. Rather than returning one global solution, the solver returns the N best ranked local solutions. It also returns a probability distribution of each unknown based on Monte Carlo statistics. """ # TODO: Migrate free functions and classes into this solver class. # But keep free function optimize_minimize_mhmcmc_cluster() for user convenience. pass
# end class
[docs]class HistogramIncremental(): """Class to incrementally accumulate N-dimensional histogram stats at runtime. """ def __init__(self, bounds, nbins=20): self._ndims = len( assert len(bounds.ub) == self._ndims self._bins = np.linspace(, bounds.ub, nbins + 1).T self._hist = np.zeros((self._ndims, nbins), dtype=int) # end func def __iadd__(self, x): assert len(x) == self._ndims for i, _x in enumerate(x): idx = np.digitize(_x, self._bins[i, :]) self._hist[i, idx - 1] += 1 # end for return self # end func @property def dims(self): return self._ndims # end func @property def bins(self): if self.dims == 1: return self._bins.flatten() else: return self._bins.copy() # end if # end func @property def histograms(self): if self.dims == 1: return self._hist.flatten() else: return self._hist.copy()
# end if # end func # end class
[docs]class BoundedRandNStepper(): """Step one dimensional at a time using normal distribution of steps within defined parameter bounds. """ def __init__(self, bounds, initial_step=None): self.bounds = bounds self.range = bounds.ub - if initial_step is not None: self._stepsize = initial_step else: self._stepsize = 0.15*(bounds.ub - # end if # end func def __call__(self, x): ndims = len(x) while True: dim = np.random.randint(0, ndims) x_new = x[dim] + self._stepsize[dim]*np.random.randn() if[dim] <= x_new <= self.bounds.ub[dim]: break # end if # end while result = x.copy() result[dim] = x_new return result # end func @property def stepsize(self): return self._stepsize.copy() # end func @stepsize.setter def stepsize(self, stepsize_new): # Limit step size adaption so that attempt to reach desired acceptance rate does not # diverge or vanish the stepping quanta. stepsize_new = np.maximum(np.minimum(stepsize_new, self.range), 1e-3*self.range) self._stepsize = stepsize_new
# end func # end class
[docs]class AdaptiveStepsize(): """ Class to implement adaptive stepsize. Pulled from scipy.optimize.basinhopping and adapted. """ def __init__(self, stepper, accept_rate=0.5, interval=50, factor=0.9, ar_tolerance=0.05): self.takestep = stepper self.target_accept_rate = accept_rate self.interval = interval self.factor = factor self.tolerance = ar_tolerance self.logger = None # Assign directly to customize logger callable self.nstep = 0 self.nstep_tot = 0 self.naccept = 0 # end func def __call__(self, x): return self.take_step(x) # end func def _adjust_step_size(self): old_stepsize = copy.copy(self.takestep.stepsize) accept_rate = float(self.naccept) / self.nstep scaled_step = False if accept_rate > self.target_accept_rate + self.tolerance: # We're accepting too many steps. Take bigger steps. self.takestep.stepsize /= self.factor scaled_step = True elif accept_rate < self.target_accept_rate - self.tolerance: # We're not accepting enough steps. Take smaller steps. self.takestep.stepsize *= self.factor scaled_step = True # end if if scaled_step and self.logger: self.logger("adaptive stepsize: acceptance rate {} target {} new stepsize {} old stepsize {}" .format(accept_rate, self.target_accept_rate, self.takestep.stepsize, old_stepsize)) # end if # end func
[docs] def take_step(self, x): self.nstep += 1 self.nstep_tot += 1 if self.nstep % self.interval == 0: self._adjust_step_size() return self.takestep(x)
# end func
[docs] def notify_accept(self): self.naccept += 1
# end func # end class
[docs]def optimize_minimize_mhmcmc_cluster(objective, bounds, args=(), x0=None, T=1, N=3, burnin=100000, maxiter=1000000, target_ar=0.4, ar_tolerance=0.05, cluster_eps=DEFAULT_CLUSTER_EPS, rnd_seed=None, collect_samples=None, logger=None): """ Minimize objective function and return up to N local minima solutions. :param objective: Objective function to minimize :param bounds: Bounds of the parameter space. :param args: Any additional fixed parameters needed to completely specify the objective function. :param x0: Initial guess. If None, will be selected randomly and uniformly within the parameter bounds. :param T: The "temperature" parameter for the accept or reject criterion. To sample the domain well, should be in the order of the typical difference in local minima objective valuations. :param N: Maximum number of minima to return :param burnin: Number of random steps to discard before starting to accumulate statistics. :param maxiter: Maximum number of steps to take (including burnin). :param target_ar: Target acceptance rate of point samples generated by stepping. :param ar_tolerance: Tolerance on the acceptance rate before actively adapting the step size. :param cluster_eps: Point proximity tolerance for DBSCAN clustering, in normalized bounds coordinates. :param rnd_seed: Random seed to force deterministic behaviour :param collect_samples: If not None and integral type, collect collect_samples at regular intervals and return in solution. :param logger: Logger instance for outputting log messages. :return: OptimizeResult containing solution(s) and solver data. """ @call_counter def obj_counted(*args): return objective(*args) # end func assert maxiter >= 2*burnin, "maxiter {} should be at least twice burnin steps {}".format(maxiter, burnin) main_iter = maxiter - burnin if collect_samples is not None: assert isinstance(collect_samples, int), "collect_samples expected to be integral type" assert collect_samples > 0, "collect_samples expected to be positive" # end if beta = 1.0/T if rnd_seed is None: rnd_seed = int(time.time()*1000) % (1 << 31) # end if np.random.seed(rnd_seed) if logger:'Using random seed {}'.format(rnd_seed)) # end if x0 is None: x0 = np.random.uniform(, bounds.ub) # end if assert np.all((x0 >= & (x0 <= bounds.ub)) x = x0.copy() funval = obj_counted(x, *args) # Set up stepper with adaptive acceptance rate stepper = BoundedRandNStepper(bounds) stepper = AdaptiveStepsize(stepper, accept_rate=target_ar, ar_tolerance=ar_tolerance, interval=50) # ------------------------------- # DO BURN-IN rejected_randomly = 0 accepted_burnin = 0 tracked_range = tqdm(range(burnin), total=burnin, desc='BURN-IN') if logger: stepper.logger = lambda msg: tracked_range.write( + ':' + msg) else: stepper.logger = tracked_range.write # end if for _ in tracked_range: x_new = stepper(x) funval_new = obj_counted(x_new, *args) log_alpha = -(funval_new - funval)*beta if log_alpha > 0 or np.log(np.random.rand()) <= log_alpha: x = x_new funval = funval_new stepper.notify_accept() accepted_burnin += 1 elif log_alpha <= 0: rejected_randomly += 1 # end if # end for ar = float(accepted_burnin)/burnin if logger:"Burn-in acceptance rate: {}".format(ar)) # end if # ------------------------------- # DO MAIN LOOP if collect_samples is not None: nsamples = min(collect_samples, main_iter) sample_cadence = main_iter/nsamples samples = np.zeros((nsamples, len(x))) samples_fval = np.zeros(nsamples) # end if accepted = 0 rejected_randomly = 0 minima_sorted = SortedList(key=lambda rec: rec[1]) # Sort by objective function value hist = HistogramIncremental(bounds, nbins=100) # Cached a lot of potential minimum values, as these need to be clustered before return N results N_cached = int(np.ceil(N*main_iter/500)) next_sample = 0.0 sample_count = 0 tracked_range = tqdm(range(main_iter), total=main_iter, desc='MAIN') if logger: stepper.logger = lambda msg: tracked_range.write( + ':' + msg) else: stepper.logger = tracked_range.write # end if for i in tracked_range: if collect_samples and i >= next_sample: assert sample_count < collect_samples samples[sample_count] = x samples_fval[sample_count] = funval sample_count += 1 next_sample += sample_cadence # end if x_new = stepper(x) funval_new = obj_counted(x_new, *args) log_alpha = -(funval_new - funval)*beta if log_alpha > 0 or np.log(np.random.rand()) <= log_alpha: x = x_new funval = funval_new minima_sorted.add((x, funval)) if len(minima_sorted) > N_cached: minima_sorted.pop() # end if stepper.notify_accept() hist += x accepted += 1 elif log_alpha <= 0: rejected_randomly += 1 # end if # end for stepper.logger = None ar = float(accepted)/main_iter if logger:"Acceptance rate: {}".format(ar))"Best minima (before clustering):\n{}".format(np.array([_mx[0] for _mx in minima_sorted[:10]]))) # end if # ------------------------------- # Cluster minima and associate each cluster with a local minimum. # Using a normalized coordinate space for cluster detection. x_range = bounds.ub - pts = np.array([x[0] for x in minima_sorted]) fvals = np.array([x[1] for x in minima_sorted]) pts_norm = (pts - _, labels = dbscan(pts_norm, eps=cluster_eps, min_samples=21, n_jobs=-1) # Compute mean of each cluster and evaluate objective function at cluster mean locations. minima_candidates = [] for grp in range(max(labels) + 1): mask = (labels == grp) mean_loc = np.mean(pts[mask, :], axis=0) # Evaluate objective function precisely at the mean location of each cluster fval = obj_counted(mean_loc, *args) minima_candidates.append((mean_loc, grp, fval)) # end for # Rank minima locations by objective function. minima_candidates.sort(key=lambda c: c[2]) # Pick up to N solutions solutions = minima_candidates[:N] # Put results into OptimizeResult container. # Add histograms to output result (in form of scipy.stats.rv_histogram) solution = OptimizeResult() solution.x = np.array([s[0] for s in solutions]) solution.clusters = [pts[(labels == s[1])] for s in solutions] solution.cluster_funvals = [fvals[(labels == s[1])] for s in solutions] solution.bins = hist.bins solution.distribution = hist.histograms solution.acceptance_rate = ar solution.success = True solution.status = 0 if len(solutions) > 0: solution.message = 'SUCCESS: Found {} local minima'.format(len(solutions)) else: solution.message = 'WARNING: Found no clusters within tolerance {}'.format(cluster_eps) # end if = np.array([s[2] for s in solutions]) solution.jac = None solution.nfev = obj_counted.counter solution.njev = 0 solution.nit = main_iter solution.maxcv = None solution.samples = samples if collect_samples else None solution.sample_funvals = samples_fval if collect_samples else None solution.bounds = bounds solution.version = 's0.3' # Solution version for future traceability solution.rnd_seed = rnd_seed return solution
# end func