Skip to content
Snippets Groups Projects

Resolve "(Learner1D) add possibility to use the direct neighbors in the loss"

Merged Jorn Hoofwijk requested to merge 119-add-second-order-loss-to-adaptive into master
Compare and Show latest version
9 files
+ 269
117
Compare changes
  • Side-by-side
  • Inline
Files
9
+ 163
92
@@ -3,17 +3,70 @@ from copy import deepcopy
import heapq
import itertools
import math
from collections import Iterable
import numpy as np
import sortedcontainers
from .base_learner import BaseLearner
from .learnerND import volume
from .triangulation import simplex_volume_in_embedding
from ..notebook_integration import ensure_holoviews
from ..utils import cache_latest
def uniform_loss(interval, scale, function_values):
def uses_nth_neighbors(n):
    """Decorator to specify how many neighboring intervals the loss function uses.

    Wraps loss functions to indicate that they expect intervals together
    with ``n`` nearest neighbors.

    The loss function will then receive the data of the N nearest neighbors
    (``nth_neighbors``) along with the data of the interval itself.
    The `~adaptive.Learner1D` will also make sure that the loss is updated
    whenever one of the ``nth_neighbors`` changes.

    Examples
    --------
    The next function is a part of the `get_curvature_loss` function.

    >>> @uses_nth_neighbors(1)
    ... def triangle_loss(xs, ys):
    ...     xs = [x for x in xs if x is not None]
    ...     ys = [y for y in ys if y is not None]
    ...
    ...     if len(xs) == 2:  # we do not have enough points for a triangle
    ...         return xs[1] - xs[0]
    ...
    ...     N = len(xs) - 2  # number of constructed triangles
    ...     if isinstance(ys[0], Iterable):
    ...         pts = [(x, *y) for x, y in zip(xs, ys)]
    ...         vol = simplex_volume_in_embedding
    ...     else:
    ...         pts = [(x, y) for x, y in zip(xs, ys)]
    ...         vol = volume
    ...     return sum(vol(pts[i:i+3]) for i in range(N)) / N

    Or you may define a loss that favours the (local) minima of a function.

    >>> @uses_nth_neighbors(1)
    ... def local_minima_resolving_loss(xs, ys):
    ...     dx = xs[2] - xs[1]  # the width of the interval of interest
    ...
    ...     if not ((ys[0] is not None and ys[0] > ys[1])
    ...             or (ys[3] is not None and ys[3] > ys[2])):
    ...         return dx * 100
    ...
    ...     return dx
    """
    def _wrapped(loss_per_interval):
        # Tag the loss function; Learner1D reads this attribute to know
        # how much neighbor data to pass in.
        loss_per_interval.nth_neighbors = n
        return loss_per_interval
    return _wrapped
@uses_nth_neighbors(0)
def uniform_loss(xs, ys):
    """Loss function that samples the domain uniformly.

    Works with `~adaptive.Learner1D` only.

    Examples
    --------
    >>> def f(x):
    ...     return x**2
    >>> learner = adaptive.Learner1D(f, bounds=(-1, 1),
    ...                              loss_per_interval=uniform_loss)
    >>>
    """
    # The loss is simply the (rescaled) interval width; the function
    # values are ignored, so sampling is uniform in x.
    dx = xs[1] - xs[0]
    return dx
@uses_nth_neighbors(0)
def default_loss(xs, ys):
    """Calculate loss on a single interval.

    Returns the rescaled Euclidean length of the interval in the
    (x, y) plane. For vector-valued functions the maximum over the
    output components is used.
    """
    dx = xs[1] - xs[0]
    dy = ys[1] - ys[0]
    if isinstance(dy, Iterable):
        # Vector output: take the largest per-component hypotenuse.
        loss = np.hypot(dx, dy).max()
    else:
        loss = np.hypot(dx, dy)
    return loss
def _loss_of_multi_interval(xs, ys):
    """Average the areas of the triangles spanned by each run of three
    consecutive (x, y) points."""
    points = list(zip(xs, ys))
    n_triangles = len(points) - 2
    total = sum(volume(points[k:k + 3]) for k in range(n_triangles))
    return total / n_triangles
@uses_nth_neighbors(1)
def triangle_loss(xs, ys):
    """Loss based on the average area of the triangles formed by the
    interval and its nearest neighboring points.

    Neighbors that do not exist (at the boundary of the domain) are
    passed in as ``None`` and filtered out here.
    """
    xs = [x for x in xs if x is not None]
    ys = [y for y in ys if y is not None]

    if len(xs) == 2:  # we do not have enough points for a triangle
        return xs[1] - xs[0]

    N = len(xs) - 2  # number of constructed triangles
    if isinstance(ys[0], Iterable):
        # Vector output: embed the points in a higher dimension.
        pts = [(x, *y) for x, y in zip(xs, ys)]
        vol = simplex_volume_in_embedding
    else:
        pts = [(x, y) for x, y in zip(xs, ys)]
        vol = volume
    return sum(vol(pts[i:i + 3]) for i in range(N)) / N
def get_curvature_loss(area_factor=1, euclid_factor=0.02, horizontal_factor=0.02):
    """Return a loss function that favours regions of high curvature.

    The loss is a weighted sum of the triangle (area) loss over the
    interval and its neighbors, the default (Euclidean) loss of the
    middle interval, and the width of the middle interval.
    """
    @uses_nth_neighbors(1)
    def curvature_loss(xs, ys):
        xs_middle = xs[1:3]
        ys_middle = ys[1:3]  # bug fix: was taken from `xs`

        triangle_loss_ = triangle_loss(xs, ys)
        default_loss_ = default_loss(xs_middle, ys_middle)
        # bug fix: was `xs_middle[0] - xs_middle[0]`, which is always 0
        dx = xs_middle[1] - xs_middle[0]
        # bug fix: dropped stray `old_loss` term (undefined name left
        # over from the previous signature)
        return (area_factor * (triangle_loss_**0.5)
                + euclid_factor * default_loss_
                + horizontal_factor * dx)
    return curvature_loss
@@ -116,6 +158,15 @@ def _get_neighbors_from_list(xs):
return sortedcontainers.SortedDict(neighbors)
def _get_intervals(x, neighbors, nth_neighbors):
    """Return the intervals whose loss depends on the point ``x``: the
    intervals adjacent to ``x`` plus ``nth_neighbors`` intervals on
    either side, clipped to the domain."""
    idx = neighbors.index(x)
    lo = max(0, idx - nth_neighbors - 1)
    hi = min(len(neighbors), idx + nth_neighbors + 2)
    pts = neighbors.keys()[lo:hi]
    return list(zip(pts, pts[1:]))
class Learner1D(BaseLearner):
"""Learns and predicts a function 'f:ℝ → ℝ^N'.
@@ -140,27 +191,35 @@ class Learner1D(BaseLearner):
Notes
-----
`loss_per_interval` takes 3 parameters: ``interval``, ``scale``, and
``function_values``, and returns a scalar; the loss over the interval.
interval : (float, float)
The bounds of the interval.
scale : (float, float)
The x and y scale over all the intervals, useful for rescaling the
interval loss.
function_values : dict(float → float)
A map containing evaluated function values. It is guaranteed
to have values for both of the points in 'interval'.
`loss_per_interval` takes 2 parameters: ``xs`` and ``ys``, and returns a
scalar; the loss over the interval.
xs : tuple of floats
The x values of the interval, if `nth_neighbors` is greater than zero it
also contains the x-values of the neighbors of the interval, in ascending
order. The interval we want to know the loss of is then the middle
interval. If no neighbor is available (at the edges of the domain) then
`None` will take the place of the x-value of the neighbor.
ys : tuple of function values
The output values of the function when evaluated at the `xs`. This is
either a float or a tuple of floats in the case of vector output.
The `loss_per_interval` function should also have
an attribute `nth_neighbors` that indicates how many of the neighboring
intervals to `interval` are used. If `loss_per_interval` doesn't
have such an attribute, it's assumed that it uses **no** neighboring
intervals. Also see the `uses_nth_neighbors` decorator.
"""
def __init__(self, function, bounds, loss_per_interval=None):
    self.function = function

    # Loss functions declare how many neighboring intervals they need
    # via the `uses_nth_neighbors` decorator; plain functions without
    # the attribute are assumed to use none.
    self.nth_neighbors = getattr(loss_per_interval, 'nth_neighbors', 0)

    self.loss_per_interval = loss_per_interval or default_loss

    # A dict storing the loss function for each interval x_n.
    self.losses = {}
    # NOTE(review): the remainder of __init__ (bounds handling, data
    # containers) lies outside this diff hunk.
@@ -218,37 +277,55 @@ class Learner1D(BaseLearner):
losses = self.losses if real else self.losses_combined
return max(losses.values()) if len(losses) > 0 else float('inf')
def _get_loss_in_interval(self, x_left, x_right):
if x_left is None or x_right is None:
def _scale_x(self, x):
    """Rescale ``x`` by the learner's x-scale; ``None`` passes through."""
    return None if x is None else x / self._scale[0]
def _scale_y(self, y):
    """Rescale ``y`` by the learner's y-scale; ``None`` passes through.

    A zero y-scale (no vertical spread in the data yet) falls back to 1
    to avoid division by zero.
    """
    if y is None:
        return None
    divisor = self._scale[1] or 1
    return y / divisor
def _get_point_by_index(self, ind):
    """Return the ``ind``-th evaluated point, or ``None`` when ``ind``
    falls outside the range of known points."""
    if 0 <= ind < len(self.neighbors):
        return self.neighbors.keys()[ind]
    return None
def _get_loss_in_interval(self, x_left, x_right):
    """Compute `loss_per_interval` on (x_left, x_right), passing along
    the data of the ``nth_neighbors`` neighboring intervals.

    Both end points must be real (evaluated) points.
    """
    assert x_left is not None and x_right is not None

    if x_right - x_left < self._dx_eps:
        # Interval is numerically too small to be subdivided further.
        return 0

    nn = self.nth_neighbors
    i = self.neighbors.index(x_left)
    start = i - nn
    end = i + nn + 2

    # Indices outside the domain yield None; the loss function is
    # responsible for handling missing neighbors.
    xs = [self._get_point_by_index(j) for j in range(start, end)]
    ys = [self.data.get(x, None) for x in xs]
    xs_scaled = [self._scale_x(x) for x in xs]
    ys_scaled = [self._scale_y(y) for y in ys]

    # bug fix: removed leftover debug `print(xs_scaled, ys_scaled)`
    return self.loss_per_interval(xs_scaled, ys_scaled)
def _update_interpolated_loss_in_interval(self, x_left, x_right):
    """Compute the loss of (x_left, x_right) and distribute it over the
    interpolated sub-intervals lying inside it, proportional to width."""
    if x_left is None or x_right is None:
        return

    dx = x_right - x_left
    loss = self._get_loss_in_interval(x_left, x_right)
    self.losses[x_left, x_right] = loss

    # Iterate over all interpolated intervals in between
    # x_left and x_right and set the newly interpolated loss.
    a, b = x_left, None
    while b != x_right:
        b = self.neighbors_combined[a][1]
        self.losses_combined[a, b] = (b - a) * loss / dx
        a = b  # advance; without this the loop never terminates
@@ -268,17 +345,11 @@ class Learner1D(BaseLearner):
if real:
    # We need to update all interpolated losses in the interval
    # (x_left, x), (x, x_right) and the nth_neighbors nearest
    # neighboring intervals. Since the addition of the
    # point 'x' could change their loss.
    for ival in _get_intervals(x, self.neighbors, self.nth_neighbors):
        self._update_interpolated_loss_in_interval(*ival)

# Since 'x' is in between (x_left, x_right),
# we get rid of the interval.
@@ -287,6 +358,7 @@ class Learner1D(BaseLearner):
elif x_left is not None and x_right is not None:
# 'x' happens to be in between two real points,
# so we can interpolate the losses.
print(x_left, x_right)
dx = x_right - x_left
loss = self.losses[x_left, x_right]
self.losses_combined[a, x] = (x - a) * loss / dx
@@ -307,9 +379,7 @@ class Learner1D(BaseLearner):
@staticmethod
def _find_neighbors(x, neighbors):
    """Return the (left, right) neighbors of ``x`` in the sorted dict
    ``neighbors``; ``None`` marks a missing neighbor at a boundary."""
    if x is None:
        return None, None
    if x in neighbors:
        return neighbors[x]
    pos = neighbors.bisect_left(x)
    keys = neighbors.keys()
    x_left = keys[pos - 1] if pos != 0 else None
    # NOTE(review): the tail of this method is cut off by the diff hunk;
    # the right neighbor mirrors the left one.
    x_right = keys[pos] if pos != len(neighbors) else None
    return x_left, x_right
@@ -354,6 +424,9 @@ class Learner1D(BaseLearner):
if x in self.data:
# The point is already evaluated before
return
if y is None:
raise TypeError("Y-value may not be None, use learner.tell_pending(x)"
"to indicate that this value is currently being calculated")
# either it is a float/int, if not, try casting to a np.array
if not isinstance(y, (float, int)):
@@ -426,10 +499,8 @@ class Learner1D(BaseLearner):
# Recompute the losses for the "real" intervals.
self.losses = {}
for ival in intervals:
    self.losses[ival] = self._get_loss_in_interval(*ival)

# List with "real" intervals that have interpolated intervals inside
to_interpolate = []
Loading