Commit fde1774b authored by Jorn Hoofwijk, committed by Bas Nijholt

change loss function signature

parent 27331280
Merge request !131: Resolve "(Learner1D) add possibility to use the direct neighbors in the loss"
Pipeline #13516 passed
@@ -21,44 +21,42 @@ def uses_nth_neighbors(n):
Wraps loss functions to indicate that they expect intervals together
with ``n`` nearest neighbors.
The loss function is then guaranteed to receive the data of at least the
N nearest neighbors (``nth_neighbors``) in a dict that tells you what the
neighboring points of these are. And the `~adaptive.Learner1D` will
then make sure that the loss is updated whenever one of the
``nth_neighbors`` changes.
The loss function will then receive the data of the N nearest neighbors
(``nth_neighbors``) along with the data of the interval itself, as the
tuples ``xs`` and ``ys``. The `~adaptive.Learner1D` will also make sure
that the loss is updated whenever one of the ``nth_neighbors`` changes.
Examples
--------
The next function is a part of the `get_curvature_loss` function.
The next function is part of the `curvature_loss_function` factory.
>>> @uses_nth_neighbors(1)
... def triangle_loss(interval, scale, data, neighbors):
... x_left, x_right = interval
... xs = [neighbors[x_left][0], x_left, x_right, neighbors[x_right][1]]
... # at the boundary, neighbors[<left boundary x>] is (None, <some other x>)
... xs = [x for x in xs if x is not None]
... if len(xs) <= 2:
... return (x_right - x_left) / scale[0]
... def triangle_loss(xs, ys):
... xs = [x for x in xs if x is not None]
... ys = [y for y in ys if y is not None]
...
... y_scale = scale[1] or 1
... ys_scaled = [data[x] / y_scale for x in xs]
... xs_scaled = [x / scale[0] for x in xs]
... N = len(xs) - 2
... pts = [(x, y) for x, y in zip(xs_scaled, ys_scaled)]
... return sum(volume(pts[i:i+3]) for i in range(N)) / N
Or you may define a loss that favours the (local) minima of a function.
... if len(xs) == 2: # we do not have enough points for a triangle
... return xs[1] - xs[0]
...
... N = len(xs) - 2 # number of constructed triangles
... if isinstance(ys[0], Iterable):
... pts = [(x, *y) for x, y in zip(xs, ys)]
... vol = simplex_volume_in_embedding
... else:
... pts = [(x, y) for x, y in zip(xs, ys)]
... vol = volume
... return sum(vol(pts[i:i+3]) for i in range(N)) / N
Or you may define a loss that favours the (local) minima of a function,
assuming that you know your function will have a single float as output.
>>> @uses_nth_neighbors(1)
... def local_minima_resolving_loss(interval, scale, data, neighbors):
... x_left, x_right = interval
... n_left = neighbors[x_left][0]
... n_right = neighbors[x_right][1]
... loss = (x_right - x_left) / scale[0]
... def local_minima_resolving_loss(xs, ys):
... dx = xs[2] - xs[1] # the width of the interval of interest
...
... if not ((n_left is not None and data[x_left] > data[n_left])
... or (n_right is not None and data[x_right] > data[n_right])):
... if not ((ys[0] is not None and ys[0] > ys[1])
... or (ys[3] is not None and ys[3] > ys[2])):
... return dx * 100
...
... return dx
@@ -68,9 +66,8 @@ def uses_nth_neighbors(n):
return loss_per_interval
return _wrapped
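Since the diff elides most of the decorator body, here is a minimal sketch of
what the surviving tail (``return loss_per_interval`` / ``return _wrapped``)
suggests it reduces to, assuming its only job is to record ``n`` on the
wrapped function:

def uses_nth_neighbors(n):
    # sketch: mark the loss function with how many neighbors per side it needs
    def _wrapped(loss_per_interval):
        loss_per_interval.nth_neighbors = n
        return loss_per_interval
    return _wrapped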
@uses_nth_neighbors(0)
def uniform_loss(interval, scale, data, neighbors):
def uniform_loss(xs, ys):
"""Loss function that samples the domain uniformly.
Works with `~adaptive.Learner1D` only.
@@ -85,38 +82,36 @@ def uniform_loss(interval, scale, data, neighbors):
... loss_per_interval=uniform_loss)
>>>
"""
x_left, x_right = interval
x_scale, _ = scale
dx = (x_right - x_left) / x_scale
dx = xs[1] - xs[0]
return dx
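As a quick check of the new signature (values invented for illustration), the
loss depends only on the x-coordinates:

# the loss is just the width of the (already rescaled) interval
uniform_loss((0.0, 0.25), (1.0, 3.0))  # -> 0.25, independent of the ys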
@uses_nth_neighbors(0)
def default_loss(interval, scale, data, neighbors):
def default_loss(xs, ys):
"""Calculate loss on a single interval.
Currently returns the rescaled length of the interval. If one of the
y-values is missing, returns 0 (so the intervals with missing data are
never touched). This behavior should be improved later.
"""
x_left, x_right = interval
y_right, y_left = data[x_right], data[x_left]
x_scale, y_scale = scale
dx = (x_right - x_left) / x_scale
if y_scale == 0:
loss = dx
dx = xs[1] - xs[0]
if isinstance(ys[0], Iterable):
dy = [abs(a-b) for a, b in zip(*ys)]
return np.hypot(dx, dy).max()
else:
dy = (y_right - y_left) / y_scale
try:
len(dy)
loss = np.hypot(dx, dy).max()
except TypeError:
loss = math.hypot(dx, dy)
return loss
dy = ys[1] - ys[0]
return np.hypot(dx, dy)
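And a sanity check of the scalar branch of ``default_loss`` (values invented
for illustration):

# interval of width 0.5 whose y-values differ by 1.0; the loss is the
# Euclidean length of the segment over the rescaled interval
default_loss((0.0, 0.5), (0.0, 1.0))  # np.hypot(0.5, 1.0) ≈ 1.118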
def _loss_of_multi_interval(xs, ys):
N = len(xs) - 2
@uses_nth_neighbors(1)
def triangle_loss(xs, ys):
xs = [x for x in xs if x is not None]
ys = [y for y in ys if y is not None]
if len(xs) == 2: # we do not have enough points for a triangle
return xs[1] - xs[0]
N = len(xs) - 2 # number of constructed triangles
if isinstance(ys[0], Iterable):
pts = [(x, *y) for x, y in zip(xs, ys)]
vol = simplex_volume_in_embedding
@@ -126,27 +121,15 @@ def _loss_of_multi_interval(xs, ys):
return sum(vol(pts[i:i+3]) for i in range(N)) / N
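To see the ``None`` padding at a domain edge in action (numbers invented),
note that filtering leaves three real points and hence a single triangle:

# missing left neighbor at the boundary; after filtering out None,
# xs == [0.0, 0.5, 1.0] and ys == [0.0, 1.0, 0.0], so N == 1 and the
# loss is the area of the triangle (0, 0), (0.5, 1), (1, 0)
triangle_loss([None, 0.0, 0.5, 1.0], [None, 0.0, 1.0, 0.0])  # -> 0.5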
@uses_nth_neighbors(1)
def triangle_loss(interval, scale, data, neighbors):
x_left, x_right = interval
xs = [neighbors[x_left][0], x_left, x_right, neighbors[x_right][1]]
xs = [x for x in xs if x is not None]
if len(xs) <= 2:
return (x_right - x_left) / scale[0]
else:
y_scale = scale[1] or 1
ys_scaled = [data[x] / y_scale for x in xs]
xs_scaled = [x / scale[0] for x in xs]
return _loss_of_multi_interval(xs_scaled, ys_scaled)
def get_curvature_loss(area_factor=1, euclid_factor=0.02, horizontal_factor=0.02):
def curvature_loss_function(area_factor=1, euclid_factor=0.02, horizontal_factor=0.02):
@uses_nth_neighbors(1)
def curvature_loss(interval, scale, data, neighbors):
triangle_loss_ = triangle_loss(interval, scale, data, neighbors)
default_loss_ = default_loss(interval, scale, data, neighbors)
dx = (interval[1] - interval[0]) / scale[0]
def curvature_loss(xs, ys):
xs_middle = xs[1:3]
ys_middle = ys[1:3]
triangle_loss_ = triangle_loss(xs, ys)
default_loss_ = default_loss(xs_middle, ys_middle)
dx = xs_middle[1] - xs_middle[0]
return (area_factor * (triangle_loss_**0.5)
+ euclid_factor * default_loss_
+ horizontal_factor * dx)
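Hypothetical usage of the factory (the keyword values below are simply its
defaults, and the target function is borrowed from the tests; assumes
``adaptive`` and ``numpy as np`` are imported):

loss = curvature_loss_function(area_factor=1, euclid_factor=0.02,
                               horizontal_factor=0.02)
assert loss.nth_neighbors == 1  # set by the uses_nth_neighbors decorator
learner = adaptive.Learner1D(np.tanh, bounds=(-1, 1), loss_per_interval=loss)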
@@ -209,29 +192,24 @@ class Learner1D(BaseLearner):
Notes
-----
`loss_per_interval` takes 4 parameters: ``interval``, ``scale``,
``data``, and ``neighbors``, and returns a scalar; the loss over
the interval.
interval : (float, float)
The bounds of the interval.
scale : (float, float)
The x and y scale over all the intervals, useful for rescaling the
interval loss.
data : dict(float → float)
A map containing evaluated function values. It is guaranteed
to have values for both of the points in 'interval'.
neighbors : dict(float → (float, float))
A map containing points as keys to its neighbors as a tuple.
At the leftmost boundary ``x_left`` and the rightmost boundary ``x_right``
it has ``x_left: (None, float)`` and ``x_right: (float, None)``.
The `loss_per_interval` function should also have
an attribute `nth_neighbors` that indicates how many of the neighboring
intervals to `interval` are used. If `loss_per_interval` doesn't
have such an attribute, it's assumed that it uses **no** neighboring
intervals. Also see the `uses_nth_neighbors` decorator.
**WARNING**: When modifying the `data` and `neighbors` datastructures
the learner will behave in an undefined way.
`loss_per_interval` takes 2 parameters: ``xs`` and ``ys``, and returns a
scalar; the loss over the interval.
xs : tuple of floats
The x values of the interval. If `nth_neighbors` is greater than zero, it
also contains the x-values of the neighbors of the interval, in ascending
order; the interval we want to know the loss of is then the middle
interval. If no neighbor is available (at the edges of the domain), then
`None` will take the place of the x-value of the neighbor.
ys : tuple of function values
The output values of the function when evaluated at the `xs`. This is
either a float or a tuple of floats in the case of vector output.
The `loss_per_interval` function may also have an attribute `nth_neighbors`
that indicates how many of the neighboring intervals to `interval` are used.
If `loss_per_interval` doesn't have such an attribute, it's assumed that it
uses **no** neighboring intervals. Also see the `uses_nth_neighbors`
decorator for more information.
"""
def __init__(self, function, bounds, loss_per_interval=None):
@@ -300,16 +278,41 @@ class Learner1D(BaseLearner):
losses = self.losses if real else self.losses_combined
return max(losses.values()) if len(losses) > 0 else float('inf')
def _scale_x(self, x):
if x is None:
return None
return x / self._scale[0]
def _scale_y(self, y):
if y is None:
return None
y_scale = self._scale[1] or 1
return y / y_scale
def _get_point_by_index(self, ind):
if ind < 0 or ind >= len(self.neighbors):
return None
return self.neighbors.keys()[ind]
def _get_loss_in_interval(self, x_left, x_right):
assert x_left is not None and x_right is not None
if x_right - x_left < self._dx_eps:
return 0
# we need to compute the loss for this interval
return self.loss_per_interval(
(x_left, x_right), self._scale, self.data, self.neighbors)
nn = self.nth_neighbors
i = self.neighbors.index(x_left)
start = i - nn
end = i + nn + 2
xs = [self._get_point_by_index(i) for i in range(start, end)]
ys = [self.data.get(x, None) for x in xs]
xs_scaled = tuple(self._scale_x(x) for x in xs)
ys_scaled = tuple(self._scale_y(y) for y in ys)
# we need to compute the loss for this interval
return self.loss_per_interval(xs_scaled, ys_scaled)
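A hypothetical trace of the index arithmetic above for ``nth_neighbors == 1``:

# points [0.0, 0.25, 0.5, 1.0]; the interval (0.25, 0.5) has x_left at
# index i == 1, so start == 0, end == 4 and xs == [0.0, 0.25, 0.5, 1.0].
# For the leftmost interval (0.0, 0.25), i == 0 gives start == -1, so
# _get_point_by_index(-1) returns None and xs == [None, 0.0, 0.25, 0.5];
# the matching ys entry is then None as well.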
def _update_interpolated_loss_in_interval(self, x_left, x_right):
if x_left is None or x_right is None:
@@ -419,6 +422,9 @@ class Learner1D(BaseLearner):
if x in self.data:
# The point is already evaluated before
return
if y is None:
raise TypeError("Y-value may not be None, use learner.tell_pending(x)"
"to indicate that this value is currently being calculated")
# either y is a float/int; if not, try casting it to a np.array
if not isinstance(y, (float, int)):
@@ -4,7 +4,7 @@ import random
import numpy as np
from ..learner import Learner1D
from ..learner.learner1D import get_curvature_loss
from ..learner.learner1D import curvature_loss_function
from ..runner import simple
@@ -347,7 +347,7 @@ def test_curvature_loss():
def f(x):
return np.tanh(20*x)
loss = get_curvature_loss()
loss = curvature_loss_function()
assert loss.nth_neighbors == 1
learner = Learner1D(f, (-1, 1), loss_per_interval=loss)
simple(learner, goal=lambda l: l.npoints > 100)
@@ -358,7 +358,7 @@ def test_curvature_loss_vectors():
def f(x):
return np.tanh(20*x), np.tanh(20*(x-0.4))
loss = get_curvature_loss()
loss = curvature_loss_function()
assert loss.nth_neighbors == 1
learner = Learner1D(f, (-1, 1), loss_per_interval=loss)
simple(learner, goal=lambda l: l.npoints > 100)
@@ -17,4 +17,4 @@ Custom loss functions
.. autofunction:: adaptive.learner.learner1D.triangle_loss
.. autofunction:: adaptive.learner.learner1D.get_curvature_loss
.. autofunction:: adaptive.learner.learner1D.curvature_loss_function
@@ -150,10 +150,10 @@ by specifying ``loss_per_interval``.
.. jupyter-execute::
from adaptive.learner.learner1D import (get_curvature_loss,
from adaptive.learner.learner1D import (curvature_loss_function,
uniform_loss,
default_loss)
curvature_loss = get_curvature_loss()
curvature_loss = curvature_loss_function()
learner = adaptive.Learner1D(f, bounds=(-1, 1), loss_per_interval=curvature_loss)
runner = adaptive.Runner(learner, goal=lambda l: l.loss() < 0.01)