Commit f4832cfb authored by Artem Pulkin's avatar Artem Pulkin
Browse files

minimize: new module and hybrid minimization

parent 54aec4f4
Pipeline #86526 failed with stages
in 23 minutes and 58 seconds
from .util import unit_vectors, lookup, split2, cat
from .kernel import Cell, CellImages, ScalarFunctionWrapper
from .units import element_masses
from .minimize import minimize_scipy
from scipy.integrate import solve_ivp
from scipy.optimize import minimize as minimize_scipy, OptimizeResult
import numpy as np
from warnings import warn
......@@ -478,112 +478,6 @@ class PRDynamics(AbstractDynamics):
return cat(velocities, s_ddot, a_ddot)
def ase_minimize_wrapper(driver, f, x0, jac, n=3, options=None):
    """
    Interface to ASE optimizers.

    Parameters
    ----------
    driver
        ASE optimizer class.
    f : BoundMethod
        Should point to ``AbstractDynamics.potential_energy``.
    x0 : np.ndarray
        The starting point.
    jac : BoundMethod
        Should point to ``AbstractDynamics.potential_grad``.
    n : int
        The number of trailing dimensions.
    options
        Other arguments to ASE constructor.

    Returns
    -------
    result : OptimizeResult
        The result of optimization.
    """
    class ASEAtoms:
        def __init__(self, f, x0, jac, n):
            """
            An interface squeaking like ase.Atoms.

            Parameters
            ----------
            f : Callable
                Evaluates energy.
            x0 : np.ndarray
                The starting point.
            jac : Callable
                Evaluates energy gradients.
            """
            self.f = f
            # Accept any array-like starting point and flatten it so the
            # (-1, n) reshapes below are always well-defined.
            self.x = np.array(x0).reshape(-1)
            self.jac = jac
            self.n = n
            assert len(self.x) % n == 0, f"vector size {len(self.x)} is not a multiple of trailing dimension n={n:d}"

        def get_forces(self) -> np.ndarray:
            # Forces are the negative energy gradient.
            return - self.jac(self.x).reshape(-1, self.n)

        def get_positions(self) -> np.ndarray:
            return self.x.reshape(-1, self.n)

        def set_positions(self, x: np.ndarray):
            self.x = x.ravel()

        def get_potential_energy(self, **_) -> float:
            return self.f(self.x)

        def __len__(self):
            return len(self.x) // self.n

    init_options = {
        "logfile": "/dev/null",
    }
    run_options = {}
    if options is None:
        options = {}
    else:
        # Work on a copy: popping keys below must not mutate the caller's dict.
        options = dict(options)
    for src, dst in ("maxiter", "steps"), ("gtol", "fmax"):  # kinda analogues (maybe not exactly)
        if src in options:
            run_options[dst] = options.pop(src)
    init_options.update(options)
    optimizer = driver(ASEAtoms(f, x0, jac, n), **init_options)
    optimizer.run(**run_options)
    # ASE may return a non-bool truthy value; normalize it.
    success = bool(optimizer.converged())
    return OptimizeResult({
        # Report the final energy alongside the final coordinates.
        "fun": optimizer.atoms.get_potential_energy(),
        "x": optimizer.atoms.x,
        "success": success,
        "message": "Optimization terminated successfully."
        if success else "ASE failed to reach the desired force threshold",
    })
def minimize_ase(f, x0, jac, n=3, method="GoodOldQuasiNewton", options=None):
    """
    Minimizes a function through an ASE optimizer, exposing a scipy-like
    interface.

    Parameters
    ----------
    f : BoundMethod
        Should point to ``AbstractDynamics.potential_energy``.
    x0 : np.ndarray
        The starting point.
    jac : BoundMethod
        Should point to ``AbstractDynamics.potential_grad``.
    n : int
        The number of trailing dimensions.
    method : str
        One of ASE methods to use for optimizing.
    options
        Other arguments to ASE constructor.

    Returns
    -------
    result : OptimizeResult
        The result of optimization.
    """
    # Imported lazily so that ASE remains an optional dependency.
    from ase import optimize as ase_optimize
    return ase_minimize_wrapper(getattr(ase_optimize, method), f, x0, jac, n=n, options=options)
def _enumerate_snapshots(snapshots, kind):
for i, c in enumerate(snapshots):
c.meta['step'] = i
......
import numpy as np
from scipy import optimize
from scipy.optimize import OptimizeResult
from functools import partial, wraps
class MinimizerException(Exception):
    """Base class for exceptions raised from inside tracked target functions.

    Carries a reference to the object that raised it so the calling
    minimizer wrapper can identify the source.
    """

    def __init__(self, message, sender):
        super().__init__(message)
        # The tracker object that triggered this exception.
        self.sender = sender
class OutOfBudget(MinimizerException):
    """Raised by ``count_calls`` when the allowed number of function calls is exhausted."""
    pass
class Unstable(MinimizerException):
    """Raised by ``ensure_trend`` when tracked function values violate the trend criterion."""
    pass
def catches(minimizer):
    """
    Enhances minimizer with catching ``MinimizerException``
    which could be raised by the function.

    Returns
    -------
    result : Callable
        The decorated minimizer.
    """
    @wraps(minimizer)
    def _wrapped(fun, *args, **kwargs):
        # Remember the most recent evaluation so it can be reported on failure.
        latest = {"x": None, "fun": None}

        def _tracking(_x, *_fun_args, **_fun_kwargs):
            value = fun(_x, *_fun_args, **_fun_kwargs)
            latest["fun"] = value
            latest["x"] = np.array(_x)
            return value

        try:
            return minimizer(_tracking, *args, **kwargs)
        except MinimizerException as exc:
            return OptimizeResult({
                "fun": latest["fun"],
                "x": latest["x"],
                "message": "Target function raised an exception.",
                "exception": exc,
                "success": False,
            })
    return _wrapped
class count_calls:
    def __init__(self, fun, budget=None):
        """
        A callable wrapper tracking how many times a function was invoked.

        Parameters
        ----------
        fun : Callable
            Function to track.
        budget : int
            The maximum allowed number of calls.
        """
        self.fun = fun
        self.n = 0
        self.budget = budget

    @property
    def available(self) -> float:
        # Remaining calls; unlimited when no budget was set.
        return float("+inf") if self.budget is None else self.budget - self.n

    @property
    def out_of_budget(self):
        return self.available <= 0

    def __call__(self, *args, **kwargs):
        # Enforce the budget before forwarding the call.
        if self.out_of_budget:
            raise OutOfBudget(f"function call count exceeds {self.budget:d}", self)
        self.n += 1
        return self.fun(*args, **kwargs)

    def __enter__(self):
        # Entering the context resets the counter.
        self.n = 0
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        pass
class ensure_trend:
    def __init__(self, fun, criterion):
        """
        Tracks values returned by a function and ensures they
        satisfy a certain trend.

        Parameters
        ----------
        fun : Callable
            A function to track.
        criterion : Callable
            A ``function(history: list) -> bool`` returning False
            if trend is not satisfied and True otherwise.
        """
        self.fun = fun
        self.criterion = criterion
        self.history = []

    def __call__(self, *args, **kwargs):
        result = self.fun(*args, **kwargs)
        self.history.append(result)
        if not self.criterion(self.history):
            # Fix: MinimizerException.__init__ requires (message, sender);
            # the previous one-argument call raised TypeError instead of
            # Unstable. Also dropped the placeholder-free f-prefix.
            raise Unstable("function trend not satisfied", self)
        return result

    def __enter__(self):
        # Entering the context clears the recorded history.
        self.history = []
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        pass
def descending_trend(n):
    """
    Builds a descending-trend criterion: the returned check passes as
    long as the global minimum of the history lies within its last
    ``n`` entries.

    Parameters
    ----------
    n : int
        The number of last entries.

    Returns
    -------
    function : Callable
        The resulting function.
    """
    def _check(history: list) -> bool:
        # Too few entries to judge: accept unconditionally.
        if len(history) <= n:
            return True
        head, tail = history[:-n], history[-n:]
        return min(tail) < min(head)
    return _check
def eval_budget(minimizer):
    """
    Adds a ``fun_budget`` keyword argument to a minimizer, limiting
    the number of target function evaluations.

    Returns
    -------
    result : Callable
        The decorated minimizer.
    """
    @wraps(minimizer)
    def _wrapped(fun, *args, fun_budget=None, **kwargs):
        # The context manager resets the counter on entry.
        with count_calls(fun, budget=fun_budget) as tracked:
            return minimizer(tracked, *args, **kwargs)
    return _wrapped
@eval_budget
@catches
def minimize_scipy(f, x0, jac, **kwargs):
    """
    Wraps ``scipy.optimize.minimize`` into the common minimizer signature.

    Parameters
    ----------
    f : Callable
        Function value.
    x0 : np.ndarray
        The starting point.
    jac : Callable
        Function gradient.
    kwargs
        Other arguments to scipy minimizer.

    Returns
    -------
    result : OptimizeResult
        The result of the minimization.
    """
    return optimize.minimize(fun=f, x0=x0, jac=jac, **kwargs)
def ase_minimize_wrapper(driver, f, x0, jac, n=3, options=None):
    """
    Runs an ASE optimizer over a plain function/gradient pair.

    Parameters
    ----------
    driver
        ASE optimizer class.
    f : Callable
        Function value.
    x0 : np.ndarray
        The starting point.
    jac : Callable
        Function gradient.
    n : int
        The number of trailing dimensions.
    options
        Other arguments to ASE constructor.

    Returns
    -------
    result : OptimizeResult
        The result of optimization.
    """
    class ASEAtoms:
        def __init__(self, f, x0, jac, n):
            """
            An interface squeaking like ase.Atoms.

            Parameters
            ----------
            f : Callable
                Evaluates energy.
            x0 : np.ndarray
                The starting point.
            jac : Callable
                Evaluates energy gradients.
            """
            self.f = f
            # Flatten any array-like input into a fresh 1D buffer.
            self.x = np.array(x0).copy().reshape(-1)
            self.jac = jac
            self.n = n
            assert len(self.x) % n == 0, f"vector size {len(self.x)} is not a multiple of the trailing dimension n={n:d}"

        def get_forces(self) -> np.ndarray:
            # Forces are the negative energy gradient.
            return - self.jac(self.x).reshape(-1, self.n)

        def get_positions(self) -> np.ndarray:
            return self.x.reshape(-1, self.n)

        def set_positions(self, x: np.ndarray):
            self.x = x.ravel()

        def get_potential_energy(self, **_) -> float:
            return self.f(self.x)

        def __len__(self):
            return len(self.x) // self.n

    # Copy so popping keys never mutates the caller's dict.
    options = {} if options is None else dict(options)
    run_options = {}
    # kinda analogues (maybe not exactly)
    for src, dst in ("maxiter", "steps"), ("gtol", "fmax"):
        if src in options:
            run_options[dst] = options.pop(src)
    init_options = {"logfile": "/dev/null", **options}
    optimizer = driver(ASEAtoms(f, x0, jac, n), **init_options)
    optimizer.run(**run_options)
    success = bool(optimizer.converged())
    message = "Optimization terminated successfully." if success \
        else "ASE failed to reach the desired force threshold"
    return OptimizeResult({
        "fun": optimizer.atoms.get_potential_energy(),
        "x": optimizer.atoms.x,
        "success": success,
        "message": message,
    })
@eval_budget
@catches
def minimize_ase(f, x0, jac, n=3, method="GoodOldQuasiNewton", options=None):
    """
    Scipy-like front end to ASE optimizers.

    Parameters
    ----------
    f : Callable
        Function value.
    x0 : np.ndarray
        The starting point.
    jac : Callable
        Function gradient.
    n : int
        The number of trailing dimensions.
    method : str
        One of ASE methods to use for optimizing.
    options
        Other arguments to ASE constructor.

    Returns
    -------
    result : OptimizeResult
        The result of optimization.
    """
    # Imported lazily so that ASE remains an optional dependency.
    from ase import optimize as ase_optimize
    return ase_minimize_wrapper(getattr(ase_optimize, method), f, x0, jac, n=n, options=options)
def minimize_openmx(f, x0, jac, qn_method=None, qn_kwargs=None, sd_method=None, sd_kwargs=None,
                    kwargs=None, stability_checker=None, sd_steps=200, fun_budget=None):
    """
    A minimizer that alternates quasi-Newton and steepest descent dynamics.
    Useful in cases when quasi-Newton methods end up at a local saddle point
    rather than a local minimum. Peeked from OpenMX.

    Parameters
    ----------
    f : Callable
        Function value.
    x0 : np.ndarray
        The starting point.
    jac : Callable
        Function gradient.
    qn_method : Callable
        The quasi-Newton method to use. Defaults to ASE BFGSLineSearch.
    qn_kwargs : dict
        Options for the quasi-Newton method.
    sd_method : Callable
        The steepest descent method to use. Defaults to ASE FIRE.
    sd_kwargs : dict
        Options for the steepest descent method.
    kwargs : dict
        Common options to both minimizers.
    stability_checker : Callable
        A callable ``f(f_history) -> bool`` that returns True as long as
        the quasi-Newton process is still stable. Once False is returned,
        the minimization algorithm is switched towards steepest descent.
        Defaults to ``descending_trend(10)``.
    sd_steps : int
        A fixed number of steepest descent steps. This is a shortcut to
        ``sd_kwargs['fun_budget']``.
    fun_budget : int
        Total calls to the function.

    Returns
    -------
    result : OptimizeResult
        The result of the optimization.
    """
    if qn_kwargs is None:
        qn_kwargs = {}
    if sd_kwargs is None:
        sd_kwargs = {}
    if kwargs is None:
        kwargs = {}
    # Common options serve as defaults; method-specific options win.
    qn_kwargs = {**kwargs, **qn_kwargs}
    if qn_method is None:
        qn_method = minimize_ase
        qn_kwargs = {"method": "BFGSLineSearch", **qn_kwargs}
    sd_kwargs = {"fun_budget": sd_steps, **kwargs, **sd_kwargs}
    if sd_method is None:
        sd_method = minimize_ase
        sd_kwargs = {"method": "FIRE", **sd_kwargs}
    if stability_checker is None:
        stability_checker = descending_trend(10)
    with count_calls(f, fun_budget) as counting_f:
        while True:
            # Quasi-Newton stage: aborts (Unstable caught by the @catches
            # wrapper of the method) as soon as the value history stops
            # descending per stability_checker.
            with ensure_trend(counting_f, stability_checker) as stable_f:
                result = qn_method(stable_f, x0, jac, **qn_kwargs)
            if result.success or counting_f.out_of_budget:  # success or out of budget
                return result
            x0 = result.x
            # Steepest-descent stage: a fixed budget of stabilizing steps.
            result = sd_method(counting_f, x0, jac, **sd_kwargs)
            if result.success or counting_f.out_of_budget:
                return result
            # Fix: restart quasi-Newton from the steepest-descent endpoint;
            # previously the SD progress was silently discarded here and the
            # next QN stage re-started from its own previous endpoint.
            x0 = result.x
from . import kernel, potentials, dyn, util, units
from . import kernel, potentials, dyn, util, units, minimize
import numpy as np
from numpy import testing
......@@ -78,7 +78,7 @@ class EKMatchMixin:
class ASEMixin:
    def _test_ase(self, optimizer):
        # Relax with the given ASE optimizer and compare against reference
        # tolerances (positions, forces, energy).
        # NOTE(review): two "driver" keys appear below (artifact of the
        # dyn -> minimize move shown in this diff); the later key wins at
        # runtime, so minimize.minimize_ase is used — verify against the
        # actual file.
        self.test_relax(relax_kwargs={
            "driver": dyn.minimize_ase,
            "driver": minimize.minimize_ase,
            "method": optimizer,
            "options": {"gtol": 1e-3}
        }, xtol=1e-3, ftol=1.1e-3, etol=1e-5)
......@@ -123,6 +123,13 @@ class ASEMixin:
    def test_relax_ase_ode12r(self):
        # Relaxation via ASE's ODE12r optimizer.
        self._test_ase("ODE12r")
    @ase_only
    def test_relax_openmx(self):
        # Relax via the hybrid OpenMX-style minimizer (quasi-Newton
        # alternating with steepest descent); gtol is forwarded to both
        # underlying ASE optimizers through the shared `kwargs` options.
        self.test_relax(relax_kwargs={
            "driver": minimize.minimize_openmx,
            "kwargs": {"options": {"gtol": 1e-3}}
        }, xtol=1e-3, ftol=1.1e-3, etol=1e-5)
class MoleculeTests(ASEMixin, UpdateTestMixin, EKMatchMixin, TestCase):
@classmethod
......
......@@ -446,6 +446,32 @@ class MainTest(TestCase):
cells = load_cell_list(out["cells_direct"])
assert len(cells) == 10
@pytest.mark.skip_if(no_ase, "skip ase tests")
def test_openmx_relax(self):
out = self.__run_yaml__("""
test-relax:
init:
units: default
prepare:
random_cells:
n: 10
density: !nu 0.0222 / Å ** 3
atoms:
bi: 26
se: 39
potentials:
- tag: harmonic repulsion
parameters:
a: !nu 2*Å
epsilon: !nu 5*eV
run:
driver: openmx
fun_budget: 10
save: {cells_direct}
""", cells=amorphous_bise_json_sample, cells_direct_out=True)
cells = load_cell_list(out["cells_direct"])
assert len(cells) == 10
def test_parallel_descriptors(self):
out = self.__run_yaml__("""
fit:
......
from . import minimize
import pytest
import numpy as np
from numpy import testing
@pytest.fixture()
def no_ase(pytestconfig):
    # Reflects the "no_ase" command-line option as a plain bool.
    return bool(pytestconfig.getoption("no_ase"))


# NOTE(review): `no_ase` here is the fixture function object, not its value;
# presumably the project's custom `skip_if` mark resolves it at collection
# time — verify against conftest.
ase_only = pytest.mark.skip_if(no_ase, "skip ase tests")
def f_xsq(x):
    """Quadratic test function: the square of its argument."""
    return x * x
def f_xsq_d(x):
    """Derivative of ``f_xsq``."""
    return x + x
def f_fermi_d(x):
    """Non-convex radial test function: -1 / (2 + exp(r) + exp(-r)), r = |x|."""
    r = np.linalg.norm(x)
    return -1. / (2. + np.exp(r) + np.exp(-r))
def f_fermi_dd(x):
    """Gradient of ``f_fermi_d`` with respect to the vector argument."""
    r = np.linalg.norm(x)
    direction = x / r
    # Chain rule: d/dr [-1/(2+e^r+e^-r)] = f(r)^2 * (e^r - e^-r).
    return f_fermi_d(r) ** 2 * (np.exp(r) - np.exp(-r)) * direction
def test_budget():
    # A budget of a single evaluation cannot converge: the wrapper must
    # report failure with an OutOfBudget exception attached to the result.
    result = minimize.minimize_scipy(f_xsq, 1, f_xsq_d, fun_budget=1)
    assert result.success is False
    assert isinstance(result.exception, minimize.OutOfBudget)
def test_ase():
    # FIRE should find the global minimum of the radial Fermi-like function
    # at the origin, where f = -1/(2 + e^0 + e^0) = -0.25.
    result = minimize.minimize_ase(f_fermi_d, (3, 2, 1), f_fermi_dd, method="FIRE", options={"gtol": 1e-8})
    assert result.success is True
    testing.assert_allclose(result.fun, -.25)
    testing.assert_allclose(result.x, (0, 0, 0), atol=1e-7)
def test_openmx():
invocations = []
def _tracking_ase(*args, **kwargs):
invocations.append(kwargs)
return minimize.minimize_ase(*args, **kwargs)
result = minimize.minimize_openmx(
f_fermi_d, np.array((3, 2, 1)), f_fermi_dd,