Skip to content
Snippets Groups Projects

Resolve "(Learner1D) add possibility to use the direct neighbors in the loss"

Merged Jorn Hoofwijk requested to merge 119-add-second-order-loss-to-adaptive into master
Compare and
8 files
+ 310
70
Compare changes
  • Side-by-side
  • Inline
Files
8
+ 192
53
@@ -3,16 +3,71 @@ from copy import deepcopy
import heapq
import itertools
import math
from collections import Iterable
import numpy as np
import sortedcontainers
from .base_learner import BaseLearner
from .learnerND import volume
from .triangulation import simplex_volume_in_embedding
from ..notebook_integration import ensure_holoviews
from ..utils import cache_latest
def uniform_loss(interval, scale, function_values):
def uses_nth_neighbors(n):
"""Decorator to specify how many neighboring intervals the loss function uses.
Wraps loss functions to indicate that they expect intervals together
with ``n`` nearest neighbors
The loss function will then receive the data of the N nearest neighbors
(``nth_neighbors``) aling with the data of the interval itself in a dict.
The `~adaptive.Learner1D` will also make sure that the loss is updated
whenever one of the ``nth_neighbors`` changes.
Examples
--------
The next function is a part of the `curvature_loss_function` function.
>>> @uses_nth_neighbors(1)
...def triangle_loss(xs, ys):
... xs = [x for x in xs if x is not None]
... ys = [y for y in ys if y is not None]
...
... if len(xs) == 2: # we do not have enough points for a triangle
... return xs[1] - xs[0]
...
... N = len(xs) - 2 # number of constructed triangles
... if isinstance(ys[0], Iterable):
... pts = [(x, *y) for x, y in zip(xs, ys)]
... vol = simplex_volume_in_embedding
... else:
... pts = [(x, y) for x, y in zip(xs, ys)]
... vol = volume
... return sum(vol(pts[i:i+3]) for i in range(N)) / N
Or you may define a loss that favours the (local) minima of a function,
assuming that you know your function will have a single float as output.
>>> @uses_nth_neighbors(1)
... def local_minima_resolving_loss(xs, ys):
... dx = xs[2] - xs[1] # the width of the interval of interest
...
... if not ((ys[0] is not None and ys[0] > ys[1])
... or (ys[3] is not None and ys[3] > ys[2])):
... return loss * 100
...
... return loss
"""
def _wrapped(loss_per_interval):
loss_per_interval.nth_neighbors = n
return loss_per_interval
return _wrapped
@uses_nth_neighbors(0)
def uniform_loss(xs, ys):
"""Loss function that samples the domain uniformly.
Works with `~adaptive.Learner1D` only.
@@ -27,33 +82,58 @@ def uniform_loss(interval, scale, function_values):
... loss_per_interval=uniform_sampling_1d)
>>>
"""
x_left, x_right = interval
x_scale, _ = scale
dx = (x_right - x_left) / x_scale
dx = xs[1] - xs[0]
return dx
def default_loss(interval, scale, function_values):
@uses_nth_neighbors(0)
def default_loss(xs, ys):
"""Calculate loss on a single interval.
Currently returns the rescaled length of the interval. If one of the
y-values is missing, returns 0 (so the intervals with missing data are
never touched. This behavior should be improved later.
"""
x_left, x_right = interval
y_right, y_left = function_values[x_right], function_values[x_left]
x_scale, y_scale = scale
dx = (x_right - x_left) / x_scale
if y_scale == 0:
loss = dx
dx = xs[1] - xs[0]
if isinstance(ys[0], Iterable):
dy = [abs(a-b) for a, b in zip(*ys)]
return np.hypot(dx, dy).max()
else:
dy = ys[1] - ys[0]
return np.hypot(dx, dy)
@uses_nth_neighbors(1)
def triangle_loss(xs, ys):
xs = [x for x in xs if x is not None]
ys = [y for y in ys if y is not None]
if len(xs) == 2: # we do not have enough points for a triangle
return xs[1] - xs[0]
N = len(xs) - 2 # number of constructed triangles
if isinstance(ys[0], Iterable):
pts = [(x, *y) for x, y in zip(xs, ys)]
vol = simplex_volume_in_embedding
else:
dy = (y_right - y_left) / y_scale
try:
len(dy)
loss = np.hypot(dx, dy).max()
except TypeError:
loss = math.hypot(dx, dy)
return loss
pts = [(x, y) for x, y in zip(xs, ys)]
vol = volume
return sum(vol(pts[i:i+3]) for i in range(N)) / N
def curvature_loss_function(area_factor=1, euclid_factor=0.02, horizontal_factor=0.02):
@uses_nth_neighbors(1)
def curvature_loss(xs, ys):
xs_middle = xs[1:3]
ys_middle = xs[1:3]
triangle_loss_ = triangle_loss(xs, ys)
default_loss_ = default_loss(xs_middle, ys_middle)
dx = xs_middle[0] - xs_middle[0]
return (area_factor * (triangle_loss_**0.5)
+ euclid_factor * default_loss_
+ horizontal_factor * dx)
return curvature_loss
def linspace(x_left, x_right, n):
@@ -79,6 +159,15 @@ def _get_neighbors_from_list(xs):
return sortedcontainers.SortedDict(neighbors)
def _get_intervals(x, neighbors, nth_neighbors):
nn = nth_neighbors
i = neighbors.index(x)
start = max(0, i - nn - 1)
end = min(len(neighbors), i + nn + 2)
points = neighbors.keys()[start:end]
return list(zip(points, points[1:]))
class Learner1D(BaseLearner):
"""Learns and predicts a function 'f:ℝ → ℝ^N'.
@@ -103,21 +192,34 @@ class Learner1D(BaseLearner):
Notes
-----
`loss_per_interval` takes 3 parameters: ``interval``, ``scale``, and
``function_values``, and returns a scalar; the loss over the interval.
interval : (float, float)
The bounds of the interval.
scale : (float, float)
The x and y scale over all the intervals, useful for rescaling the
interval loss.
function_values : dict(float → float)
A map containing evaluated function values. It is guaranteed
to have values for both of the points in 'interval'.
`loss_per_interval` takes 2 parameters: ``xs`` and ``ys``, and returns a
scalar; the loss over the interval.
xs : tuple of floats
The x values of the interval, if `nth_neighbors` is greater than zero it
also contains the x-values of the neighbors of the interval, in ascending
order. The interval we want to know the loss of is then the middle
interval. If no neighbor is available (at the edges of the domain) then
`None` will take the place of the x-value of the neighbor.
ys : tuple of function values
The output values of the function when evaluated at the `xs`. This is
either a float or a tuple of floats in the case of vector output.
The `loss_per_interval` function may also have an attribute `nth_neighbors`
that indicates how many of the neighboring intervals to `interval` are used.
If `loss_per_interval` doesn't have such an attribute, it's assumed that is
uses **no** neighboring intervals. Also see the `uses_nth_neighbors`
decorator for more information.
"""
def __init__(self, function, bounds, loss_per_interval=None):
self.function = function
if hasattr(loss_per_interval, 'nth_neighbors'):
self.nth_neighbors = loss_per_interval.nth_neighbors
else:
self.nth_neighbors = 0
self.loss_per_interval = loss_per_interval or default_loss
# A dict storing the loss function for each interval x_n.
@@ -176,25 +278,60 @@ class Learner1D(BaseLearner):
losses = self.losses if real else self.losses_combined
return max(losses.values()) if len(losses) > 0 else float('inf')
def _scale_x(self, x):
if x is None:
return None
return x / self._scale[0]
def _scale_y(self, y):
if y is None:
return None
y_scale = self._scale[1] or 1
return y / y_scale
def _get_point_by_index(self, ind):
if ind < 0 or ind >= len(self.neighbors):
return None
return self.neighbors.keys()[ind]
def _get_loss_in_interval(self, x_left, x_right):
assert x_left is not None and x_right is not None
if x_right - x_left < self._dx_eps:
return 0
nn = self.nth_neighbors
i = self.neighbors.index(x_left)
start = i - nn
end = i + nn + 2
xs = [self._get_point_by_index(i) for i in range(start, end)]
ys = [self.data.get(x, None) for x in xs]
xs_scaled = tuple(self._scale_x(x) for x in xs)
ys_scaled = tuple(self._scale_y(y) for y in ys)
# we need to compute the loss for this interval
return self.loss_per_interval(xs_scaled, ys_scaled)
def _update_interpolated_loss_in_interval(self, x_left, x_right):
if x_left is not None and x_right is not None:
dx = x_right - x_left
if dx < self._dx_eps:
loss = 0
else:
loss = self.loss_per_interval((x_left, x_right),
self._scale, self.data)
self.losses[x_left, x_right] = loss
# Iterate over all interpolated intervals in between
# x_left and x_right and set the newly interpolated loss.
a, b = x_left, None
while b != x_right:
b = self.neighbors_combined[a][1]
self.losses_combined[a, b] = (b - a) * loss / dx
a = b
if x_left is None or x_right is None:
return
loss = self._get_loss_in_interval(x_left, x_right)
self.losses[x_left, x_right] = loss
# Iterate over all interpolated intervals in between
# x_left and x_right and set the newly interpolated loss.
a, b = x_left, None
dx = x_right - x_left
while b != x_right:
b = self.neighbors_combined[a][1]
self.losses_combined[a, b] = (b - a) * loss / dx
a = b
def _update_losses(self, x, real=True):
"""Update all losses that depend on x"""
# When we add a new point x, we should update the losses
# (x_left, x_right) are the "real" neighbors of 'x'.
x_left, x_right = self._find_neighbors(x, self.neighbors)
@@ -207,10 +344,11 @@ class Learner1D(BaseLearner):
if real:
# We need to update all interpolated losses in the interval
# (x_left, x) and (x, x_right). Since the addition of the point
# 'x' could change their loss.
self._update_interpolated_loss_in_interval(x_left, x)
self._update_interpolated_loss_in_interval(x, x_right)
# (x_left, x), (x, x_right) and the nth_neighbors nearest
# neighboring intervals. Since the addition of the
# point 'x' could change their loss.
for ival in _get_intervals(x, self.neighbors, self.nth_neighbors):
self._update_interpolated_loss_in_interval(*ival)
# Since 'x' is in between (x_left, x_right),
# we get rid of the interval.
@@ -284,6 +422,9 @@ class Learner1D(BaseLearner):
if x in self.data:
# The point is already evaluated before
return
if y is None:
raise TypeError("Y-value may not be None, use learner.tell_pending(x)"
"to indicate that this value is currently being calculated")
# either it is a float/int, if not, try casting to a np.array
if not isinstance(y, (float, int)):
@@ -356,10 +497,8 @@ class Learner1D(BaseLearner):
# The the losses for the "real" intervals.
self.losses = {}
for x_left, x_right in intervals:
self.losses[x_left, x_right] = (
self.loss_per_interval((x_left, x_right), self._scale, self.data)
if x_right - x_left >= self._dx_eps else 0)
for ival in intervals:
self.losses[ival] = self._get_loss_in_interval(*ival)
# List with "real" intervals that have interpolated intervals inside
to_interpolate = []
Loading