Skip to content
Snippets Groups Projects

Resolve "(Learner1D) add possibility to use the direct neighbors in the loss"

Merged Jorn Hoofwijk requested to merge 119-add-second-order-loss-to-adaptive into master
1 unresolved thread
Compare and Show latest version
7 files
+ 219
88
Compare changes
  • Side-by-side
  • Inline
Files
7
+ 129
65
@@ -3,16 +3,78 @@ from copy import deepcopy
import heapq
import itertools
import math
from collections import Iterable
import numpy as np
import sortedcontainers
from .base_learner import BaseLearner
from .learnerND import volume
from .triangulation import simplex_volume_in_embedding
from ..notebook_integration import ensure_holoviews
from ..utils import cache_latest
def uniform_loss(interval, scale, function_values):
def use_nn_neighbors(n):
"""Decorator to specify how many neighboring intervals the loss function uses.
This decorator can wrap around a loss function to let `~adaptive.Learner1D`
know that you would like to look at the N-nearest neighboring intervals.
The loss function is then guaranteed to receive the data of at least the
nn-neighbors and a dict that tells you what the neighboring points of these
are. And the Learner1D will then make sure that the loss is updated whenever
on of the nn-neighbours changes.
Examples
--------
This is a part of the curvature loss function
>>> @use_nn_neighbors(1)
...def triangle_loss(interval, scale, data, neighbors):
... x_left, x_right = interval
... xs = [neighbors[x_left][0], x_left, x_right, neighbors[x_right][1]]
... # at the boundary, neighbours[<left boundary x>] is (None, <some other x>)
... xs = [x for x in xs if x is not None]
... if len(xs) <= 2:
... return (x_right - x_left) / scale[0]
...
... y_scale = scale[1] or 1
... ys_scaled = [data[x] / y_scale for x in xs]
... xs_scaled = [x / scale[0] for x in xs]
... N = len(xs) - 2
... pts = [(x, y) for x, y in zip(xs_scaled, ys_scaled)]
... return sum(volume(pts[i:i+3]) for i in range(N)) / N
Or you may define a loss that favours the (local) minima of a function.
>>>@use_nn_neighbors(1)
...def loss(interval, scale, data, neighbors):
... x_left, x_right = interval
... n_left = neighbors[x_left][0]
... n_right = neighbors[x_right][1]
... is_min = True
...
... if n_left is not None and data[x_left] > data[n_left]:
... is_min = False
... if n_right is not None and data[x_right] > data[n_right]:
... is_min = False
...
... loss = (x_right - x_left) / scale[0]
...
... if is_min:
... return loss * 100
... return loss
"""
def _wrapped(loss_per_interval):
loss_per_interval.nn_neighbors = n
return loss_per_interval
return _wrapped
@use_nn_neighbors(0)
def uniform_loss(interval, scale, data, neighbors):
"""Loss function that samples the domain uniformly.
Works with `~adaptive.Learner1D` only.
@@ -33,7 +95,8 @@ def uniform_loss(interval, scale, function_values):
return dx
def default_loss(interval, scale, function_values):
@use_nn_neighbors(0)
def default_loss(interval, scale, data, neighbors):
"""Calculate loss on a single interval.
Currently returns the rescaled length of the interval. If one of the
@@ -41,7 +104,7 @@ def default_loss(interval, scale, function_values):
never touched. This behavior should be improved later.
"""
x_left, x_right = interval
y_right, y_left = function_values[x_right], function_values[x_left]
y_right, y_left = data[x_right], data[x_left]
x_scale, y_scale = scale
dx = (x_right - x_left) / x_scale
if y_scale == 0:
@@ -56,38 +119,40 @@ def default_loss(interval, scale, function_values):
return loss
def loss_of_multi_interval(xs, ys):
pts = list(zip(xs, ys))
vols = [volume(pts[i:i+3]) for i in range(len(pts)-2)]
return np.average(vols)
def _loss_of_multi_interval(xs, ys):
N = len(xs) - 2
if isinstance(ys[0], Iterable):
pts = [(x, *y) for x, y in zip(xs, ys)]
vol = simplex_volume_in_embedding
else:
pts = [(x, y) for x, y in zip(xs, ys)]
vol = volume
return sum(vol(pts[i:i+3]) for i in range(N)) / N
def triangle_loss(interval, neighbours, scale, function_values):
@use_nn_neighbors(1)
def triangle_loss(interval, scale, data, neighbors):
x_left, x_right = interval
neighbour_left, neighbour_right = neighbours
x_scale, y_scale = scale
dx = (x_right - x_left) / x_scale
xs = [neighbour_left, x_left, x_right, neighbour_right]
# The neighbours could be None if we are at the boundary, in that case we
# have to filter this out
xs = [neighbors[x_left][0], x_left, x_right, neighbors[x_right][1]]
xs = [x for x in xs if x is not None]
y_scale = y_scale or 1
ys = [function_values[x] / y_scale for x in xs]
xs = [x / x_scale for x in xs]
if len(xs) <= 2:
return dx
return (x_right - x_left) / scale[0]
else:
return loss_of_multi_interval(xs, ys)
y_scale = scale[1] or 1
ys_scaled = [data[x] / y_scale for x in xs]
xs_scaled = [x / scale[0] for x in xs]
return _loss_of_multi_interval(xs_scaled, ys_scaled)
def get_curvature_loss(area_factor=1, euclid_factor=0.02, horizontal_factor=0.02):
def curvature_loss(interval, neighbours, scale, function_values):
triangle_loss_ = triangle_loss(interval, neighbours, scale, function_values)
default_loss_ = default_loss(interval, scale, function_values)
dx = interval[1] - interval[0]
@use_nn_neighbors(1)
def curvature_loss(interval, scale, data, neighbors):
triangle_loss_ = triangle_loss(interval, scale, data, neighbors)
default_loss_ = default_loss(interval, scale, data, neighbors)
dx = (interval[1] - interval[0]) / scale[0]
return (area_factor * (triangle_loss_**0.5)
+ euclid_factor * old_loss
+ euclid_factor * default_loss_
+ horizontal_factor * dx)
return curvature_loss
@@ -115,6 +180,15 @@ def _get_neighbors_from_list(xs):
return sortedcontainers.SortedDict(neighbors)
def _get_intervals(x, neighbors, nn_neighbors):
nn = nn_neighbors
i = neighbors.index(x)
start = max(0, i - nn - 1)
end = min(len(neighbors), i + nn + 2)
points = neighbors.keys()[start:end]
return list(zip(points, points[1:]))
class Learner1D(BaseLearner):
"""Learns and predicts a function 'f:ℝ → ℝ^N'.
@@ -129,6 +203,12 @@ class Learner1D(BaseLearner):
A function that returns the loss for a single interval of the domain.
If not provided, then a default is used, which uses the scaled distance
in the x-y plane as the loss. See the notes for more details.
nn_neighbors : int, optional, default: None
The number of neighboring intervals that the loss function
takes into account. If ``loss_per_interval`` doesn't use the neighbors
at all, then it should be 0. By default we try to access the
``loss_per_interval.nn_neighbors`` attribute which is set for all
implemented loss functions.
Attributes
----------
@@ -139,27 +219,30 @@ class Learner1D(BaseLearner):
Notes
-----
`loss_per_interval` takes 3 parameters: ``interval``, ``scale``, and
``function_values``, and returns a scalar; the loss over the interval.
`loss_per_interval` takes 4 parameters: ``interval``, ``scale``,
``data``, and ``neighbors``, and returns a scalar; the loss over
the interval.
interval : (float, float)
The bounds of the interval.
scale : (float, float)
The x and y scale over all the intervals, useful for rescaling the
interval loss.
function_values : dict(float → float)
data : dict(float → float)
A map containing evaluated function values. It is guaranteed
to have values for both of the points in 'interval'.
neighbors : dict(float → (float, float))
A map containing points as keys to its neighbors as a tuple.
"""
def __init__(self, function, bounds, loss_per_interval=None, loss_depends_on_neighbours=False):
def __init__(self, function, bounds, loss_per_interval=None):
self.function = function
self._loss_depends_on_neighbours = loss_depends_on_neighbours
if loss_depends_on_neighbours:
self.loss_per_interval = loss_per_interval or curvature_loss
if hasattr(loss_per_interval, 'nn_neighbors'):
self.nn_neighbors = loss_per_interval.nn_neighbors
else:
self.loss_per_interval = loss_per_interval or default_loss
self.nn_neighbors = 0
self.loss_per_interval = loss_per_interval or default_loss
# A dict storing the loss function for each interval x_n.
self.losses = {}
@@ -218,36 +301,27 @@ class Learner1D(BaseLearner):
return max(losses.values()) if len(losses) > 0 else float('inf')
def _get_loss_in_interval(self, x_left, x_right):
if x_left is None or x_right is None:
return None
assert x_left is not None and x_right is not None
dx = x_right - x_left
if dx < self._dx_eps:
if x_right - x_left < self._dx_eps:
return 0
# we need to compute the loss for this interval
interval = (x_left, x_right)
if self._loss_depends_on_neighbours:
neighbour_left = self._find_neighbors(x_left , self.neighbors)[0]
neighbour_right = self._find_neighbors(x_right, self.neighbors)[1]
neighbours = neighbour_left, neighbour_right
return self.loss_per_interval(interval, neighbours,
self._scale, self.data)
else:
return self.loss_per_interval(interval, self._scale, self.data)
return self.loss_per_interval(
(x_left, x_right), self._scale, self.data, self.neighbors)
def _update_interpolated_loss_in_interval(self, x_left, x_right):
if x_left is None or x_right is None:
return None
return
dx = x_right - x_left
loss = self._get_loss_in_interval(x_left, x_right)
self.losses[x_left, x_right] = loss
# Iterate over all interpolated intervals in between
# x_left and x_right and set the newly interpolated loss.
a, b = x_left, None
dx = x_right - x_left
while b != x_right:
b = self.neighbors_combined[a][1]
self.losses_combined[a, b] = (b - a) * loss / dx
@@ -267,17 +341,11 @@ class Learner1D(BaseLearner):
if real:
# We need to update all interpolated losses in the interval
# (x_left, x) and (x, x_right). Since the addition of the point
# 'x' could change their loss.
self._update_interpolated_loss_in_interval(x_left, x)
self._update_interpolated_loss_in_interval(x, x_right)
# if the loss depends on the neighbors we should also update those losses
if self._loss_depends_on_neighbours:
neighbour_left = self._find_neighbors(x_left , self.neighbors)[0]
neighbour_right = self._find_neighbors(x_right, self.neighbors)[1]
self._update_interpolated_loss_in_interval(neighbour_left, x_left)
self._update_interpolated_loss_in_interval(x_right, neighbour_right)
# (x_left, x), (x, x_right) and the nn_neighbors nearest
# neighboring intervals. Since the addition of the
# point 'x' could change their loss.
for ival in _get_intervals(x, self.neighbors, self.nn_neighbors):
self._update_interpolated_loss_in_interval(*ival)
# Since 'x' is in between (x_left, x_right),
# we get rid of the interval.
@@ -307,8 +375,6 @@ class Learner1D(BaseLearner):
def _find_neighbors(x, neighbors):
if x in neighbors:
return neighbors[x]
if x is None:
return None, None
pos = neighbors.bisect_left(x)
keys = neighbors.keys()
x_left = keys[pos - 1] if pos != 0 else None
@@ -425,10 +491,8 @@ class Learner1D(BaseLearner):
# The the losses for the "real" intervals.
self.losses = {}
for x_left, x_right in intervals:
self.losses[x_left, x_right] = (
self.loss_per_interval((x_left, x_right), self._scale, self.data)
if x_right - x_left >= self._dx_eps else 0)
for ival in intervals:
self.losses[ival] = self._get_loss_in_interval(*ival)
# List with "real" intervals that have interpolated intervals inside
to_interpolate = []
Loading