
Resolve "(Learner1D) add possibility to use the direct neighbors in the loss"

Merged Jorn Hoofwijk requested to merge 119-add-second-order-loss-to-adaptive into master
1 unresolved thread
@@ -15,15 +15,59 @@ from ..notebook_integration import ensure_holoviews
from ..utils import cache_latest
-def annotate_with_nn_neighbors(nn_neighbors):
+def uses_nth_neighbors(n):
"""Decorator to specify how many neighboring intervals the loss function uses.
Wraps loss functions to indicate that they expect intervals together
with ``n`` nearest neighbors
The loss function will then receive the data of the N nearest neighbors
(``nth_neighbors``) aling with the data of the interval itself in a dict.
The `~adaptive.Learner1D` will also make sure that the loss is updated
whenever one of the ``nth_neighbors`` changes.
Examples
--------
    The following function is part of the `curvature_loss_function` function.
    >>> @uses_nth_neighbors(1)
    ... def triangle_loss(xs, ys):
    ...     xs = [x for x in xs if x is not None]
    ...     ys = [y for y in ys if y is not None]
    ...
    ...     if len(xs) == 2:  # we do not have enough points for a triangle
    ...         return xs[1] - xs[0]
    ...
    ...     N = len(xs) - 2  # number of constructed triangles
    ...     if isinstance(ys[0], Iterable):
    ...         pts = [(x, *y) for x, y in zip(xs, ys)]
    ...         vol = simplex_volume_in_embedding
    ...     else:
    ...         pts = [(x, y) for x, y in zip(xs, ys)]
    ...         vol = volume
    ...     return sum(vol(pts[i:i+3]) for i in range(N)) / N
    Or you may define a loss that favours the (local) minima of a function,
    assuming that you know your function will have a single float as output.

    >>> @uses_nth_neighbors(1)
    ... def local_minima_resolving_loss(xs, ys):
    ...     dx = xs[2] - xs[1]  # the width of the interval of interest
    ...
    ...     if ((ys[0] is not None and ys[0] > ys[1])
    ...             or (ys[3] is not None and ys[3] > ys[2])):
    ...         return dx * 100  # the interval sits in a dip: boost its loss
    ...
    ...     return dx
"""
    def _wrapped(loss_per_interval):
-        loss_per_interval.nn_neighbors = nn_neighbors
+        loss_per_interval.nth_neighbors = n
        return loss_per_interval
    return _wrapped
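For clarity: the decorator only attaches an attribute, which `Learner1D.__init__` reads later in this diff. A minimal sketch of that contract, with `my_loss` as a hypothetical example:

    @uses_nth_neighbors(1)
    def my_loss(xs, ys):  # hypothetical loss function
        return xs[2] - xs[1]

    assert my_loss.nth_neighbors == 1
    # Learner1D will therefore call my_loss with 2 * 1 + 2 = 4 x-values:
    # the interval itself plus one neighbor on each side
    # (None at the domain edges).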
-@annotate_with_nn_neighbors(0)
-def uniform_loss(interval, scale, data, neighbors):
+@uses_nth_neighbors(0)
+def uniform_loss(xs, ys):
"""Loss function that samples the domain uniformly.
Works with `~adaptive.Learner1D` only.
@@ -38,38 +82,36 @@ def uniform_loss(interval, scale, data, neighbors):
... loss_per_interval=uniform_sampling_1d)
>>>
"""
-    x_left, x_right = interval
-    x_scale, _ = scale
-    dx = (x_right - x_left) / x_scale
+    dx = xs[1] - xs[0]
return dx
-@annotate_with_nn_neighbors(0)
-def default_loss(interval, scale, data, neighbors):
+@uses_nth_neighbors(0)
+def default_loss(xs, ys):
"""Calculate loss on a single interval.
    Currently returns the rescaled length of the interval. If one of the
    y-values is missing, returns 0 (so the intervals with missing data are
    never touched). This behavior should be improved later.
    """
-    x_left, x_right = interval
-    y_right, y_left = data[x_right], data[x_left]
-    x_scale, y_scale = scale
-    dx = (x_right - x_left) / x_scale
-    if y_scale == 0:
-        loss = dx
+    dx = xs[1] - xs[0]
+    if isinstance(ys[0], Iterable):
+        dy = [abs(a - b) for a, b in zip(*ys)]
+        return np.hypot(dx, dy).max()
     else:
-        dy = (y_right - y_left) / y_scale
-        try:
-            len(dy)
-            loss = np.hypot(dx, dy).max()
-        except TypeError:
-            loss = math.hypot(dx, dy)
-    return loss
+        dy = ys[1] - ys[0]
+        return np.hypot(dx, dy)
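As a quick sanity check of the new scalar branch (hypothetical, already-scaled values; this mirrors the body of `default_loss` rather than importing it):

    import numpy as np

    xs, ys = (0.0, 0.5), (0.0, 1.0)        # one interval, scalar output
    dx, dy = xs[1] - xs[0], ys[1] - ys[0]
    # the loss is the Euclidean length of the interval in the x-y plane
    assert np.isclose(np.hypot(dx, dy), 1.25 ** 0.5)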
-def _loss_of_multi_interval(xs, ys):
-    N = len(xs) - 2
+@uses_nth_neighbors(1)
+def triangle_loss(xs, ys):
+    xs = [x for x in xs if x is not None]
+    ys = [y for y in ys if y is not None]
+
+    if len(xs) == 2:  # we do not have enough points for a triangle
+        return xs[1] - xs[0]
+
+    N = len(xs) - 2  # number of constructed triangles
     if isinstance(ys[0], Iterable):
         pts = [(x, *y) for x, y in zip(xs, ys)]
         vol = simplex_volume_in_embedding
@@ -79,27 +121,15 @@ def _loss_of_multi_interval(xs, ys):
return sum(vol(pts[i:i+3]) for i in range(N)) / N
-@annotate_with_nn_neighbors(1)
-def triangle_loss(interval, scale, data, neighbors):
-    x_left, x_right = interval
-    xs = [neighbors[x_left][0], x_left, x_right, neighbors[x_right][1]]
-    xs = [x for x in xs if x is not None]
+def curvature_loss_function(area_factor=1, euclid_factor=0.02, horizontal_factor=0.02):
+    @uses_nth_neighbors(1)
+    def curvature_loss(xs, ys):
+        xs_middle = xs[1:3]
+        ys_middle = ys[1:3]
-    if len(xs) <= 2:
-        return (x_right - x_left) / scale[0]
-    else:
-        y_scale = scale[1] or 1
-        ys_scaled = [data[x] / y_scale for x in xs]
-        xs_scaled = [x / scale[0] for x in xs]
-        return _loss_of_multi_interval(xs_scaled, ys_scaled)
-
-def get_curvature_loss(area_factor=1, euclid_factor=0.02, horizontal_factor=0.02):
-    @annotate_with_nn_neighbors(1)
-    def curvature_loss(interval, scale, data, neighbors):
-        triangle_loss_ = triangle_loss(interval, scale, data, neighbors)
-        default_loss_ = default_loss(interval, scale, data, neighbors)
-        dx = (interval[1] - interval[0]) / scale[0]
+        triangle_loss_ = triangle_loss(xs, ys)
+        default_loss_ = default_loss(xs_middle, ys_middle)
+        dx = xs_middle[1] - xs_middle[0]
        return (area_factor * (triangle_loss_**0.5)
                + euclid_factor * default_loss_
                + horizontal_factor * dx)
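A usage sketch for the new factory introduced in this diff (the import path assumes `curvature_loss_function` is exposed from `adaptive.learner.learner1D`; `f` is a stand-in function):

    from adaptive import Learner1D
    from adaptive.learner.learner1D import curvature_loss_function

    def f(x):
        return x**3 - x

    # defaults: area_factor=1, euclid_factor=0.02, horizontal_factor=0.02
    loss = curvature_loss_function()
    learner = Learner1D(f, bounds=(-1, 1), loss_per_interval=loss)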
@@ -129,8 +159,8 @@ def _get_neighbors_from_list(xs):
return sortedcontainers.SortedDict(neighbors)
-def _get_intervals(x, neighbors, nn_neighbors):
-    nn = nn_neighbors
+def _get_intervals(x, neighbors, nth_neighbors):
+    nn = nth_neighbors
    i = neighbors.index(x)
    start = max(0, i - nn - 1)
    end = min(len(neighbors), i + nn + 2)
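A worked example of this helper, assuming it returns the consecutive (left, right) pairs of the selected slice of points, as its caller below suggests (the values stored in the SortedDict are irrelevant here):

    import sortedcontainers

    neighbors = sortedcontainers.SortedDict({x: None for x in [0, 1, 2, 3, 4]})
    _get_intervals(2, neighbors, 0)  # -> [(1, 2), (2, 3)]
    _get_intervals(2, neighbors, 1)  # -> [(0, 1), (1, 2), (2, 3), (3, 4)]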
@@ -152,12 +182,6 @@ class Learner1D(BaseLearner):
A function that returns the loss for a single interval of the domain.
If not provided, then a default is used, which uses the scaled distance
in the x-y plane as the loss. See the notes for more details.
-    nn_neighbors : int, optional, default: None
-        The number of neighboring intervals that the loss function
-        takes into account. If ``loss_per_interval`` doesn't use the neighbors
-        at all, then it should be 0. By default we try to access the
-        ``loss_per_interval.nn_neighbors`` attribute which is set for all
-        implemented loss functions.
Attributes
----------
@@ -168,28 +192,33 @@ class Learner1D(BaseLearner):
Notes
-----
-    `loss_per_interval` takes 4 parameters: ``interval``, ``scale``,
-    ``data``, and ``neighbors``, and returns a scalar; the loss over
-    the interval.
-    interval : (float, float)
-        The bounds of the interval.
-    scale : (float, float)
-        The x and y scale over all the intervals, useful for rescaling the
-        interval loss.
-    data : dict(float → float)
-        A map containing evaluated function values. It is guaranteed
-        to have values for both of the points in 'interval'.
-    neighbors : dict(float → (float, float))
-        A map containing points as keys to its neighbors as a tuple.
+    `loss_per_interval` takes 2 parameters: ``xs`` and ``ys``, and returns a
+    scalar; the loss over the interval.
+    xs : tuple of floats
+        The x-values of the interval. If `nth_neighbors` is greater than zero,
+        it also contains the x-values of the neighbors of the interval, in
+        ascending order. The interval we want to know the loss of is then the
+        middle interval. If no neighbor is available (at the edges of the
+        domain), then `None` will take the place of the x-value of the
+        neighbor.
+    ys : tuple of function values
+        The output values of the function when evaluated at the `xs`. This is
+        either a float or a tuple of floats in the case of vector output.
+
+    The `loss_per_interval` function may also have an attribute `nth_neighbors`
+    that indicates how many neighboring intervals of the interval are used.
+    If `loss_per_interval` doesn't have such an attribute, it is assumed that
+    it uses **no** neighboring intervals. Also see the `uses_nth_neighbors`
+    decorator for more information.
"""
def __init__(self, function, bounds, loss_per_interval=None):
self.function = function
-        if hasattr(loss_per_interval, 'nn_neighbors'):
-            self.nn_neighbors = loss_per_interval.nn_neighbors
+        if hasattr(loss_per_interval, 'nth_neighbors'):
+            self.nth_neighbors = loss_per_interval.nth_neighbors
         else:
-            self.nn_neighbors = 0
+            self.nth_neighbors = 0
self.loss_per_interval = loss_per_interval or default_loss
@@ -249,16 +278,41 @@ class Learner1D(BaseLearner):
losses = self.losses if real else self.losses_combined
return max(losses.values()) if len(losses) > 0 else float('inf')
+    def _scale_x(self, x):
+        if x is None:
+            return None
+        return x / self._scale[0]
+
+    def _scale_y(self, y):
+        if y is None:
+            return None
+        y_scale = self._scale[1] or 1
+        return y / y_scale
+
+    def _get_point_by_index(self, ind):
+        if ind < 0 or ind >= len(self.neighbors):
+            return None
+        return self.neighbors.keys()[ind]
+
    def _get_loss_in_interval(self, x_left, x_right):
        assert x_left is not None and x_right is not None

        if x_right - x_left < self._dx_eps:
            return 0

-        # we need to compute the loss for this interval
-        return self.loss_per_interval(
-            (x_left, x_right), self._scale, self.data, self.neighbors)
+        nn = self.nth_neighbors
+        i = self.neighbors.index(x_left)
+        start = i - nn
+        end = i + nn + 2
+
+        xs = [self._get_point_by_index(j) for j in range(start, end)]
+        ys = [self.data.get(x, None) for x in xs]
+
+        xs_scaled = tuple(self._scale_x(x) for x in xs)
+        ys_scaled = tuple(self._scale_y(y) for y in ys)
+
+        # we need to compute the loss for this interval
+        return self.loss_per_interval(xs_scaled, ys_scaled)
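Note that at the domain edges the index range above runs past the known points, so a loss with `nth_neighbors = 1` receives `None` padding; schematically, for the leftmost interval of a hypothetical dataset:

    xs = (None, 0.0, 0.1, 0.2)    # _get_point_by_index(-1) -> None
    ys = (None, 0.0, 0.01, 0.04)  # self.data.get(None, None) -> None
    # this is why e.g. triangle_loss filters out the None values first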
def _update_interpolated_loss_in_interval(self, x_left, x_right):
if x_left is None or x_right is None:
@@ -290,10 +344,10 @@ class Learner1D(BaseLearner):
        if real:
            # We need to update all interpolated losses in the interval
-            # (x_left, x), (x, x_right) and the nn_neighbors nearest
+            # (x_left, x), (x, x_right) and the nth_neighbors nearest
            # neighboring intervals. Since the addition of the
            # point 'x' could change their loss.
-            for ival in _get_intervals(x, self.neighbors, self.nn_neighbors):
+            for ival in _get_intervals(x, self.neighbors, self.nth_neighbors):
                self._update_interpolated_loss_in_interval(*ival)
# Since 'x' is in between (x_left, x_right),
@@ -368,6 +422,9 @@ class Learner1D(BaseLearner):
        if x in self.data:
            # The point is already evaluated before
            return
+        if y is None:
+            raise TypeError("Y-value may not be None, use learner.tell_pending(x) "
+                            "to indicate that this value is currently being calculated")
+
        # either it is a float/int, if not, try casting to a np.array
        if not isinstance(y, (float, int)):