Skip to content
Snippets Groups Projects
Commit 290ad878 authored by Bas Nijholt's avatar Bas Nijholt
Browse files

introduce self.futures

parent cdbbf9d8
No related branches found
No related tags found
No related merge requests found
%% Cell type:markdown id: tags: %% Cell type:markdown id: tags:
# Adaptive # Adaptive
%% Cell type:code id: tags: %% Cell type:code id: tags:
``` python ``` python
import holoviews as hv import holoviews as hv
hv.notebook_extension() hv.notebook_extension()
``` ```
%% Cell type:code id: tags: %% Cell type:code id: tags:
``` python ``` python
import numpy as np import numpy as np
import learner1D import learner1D
from time import sleep from time import sleep
from random import randint from random import randint
from functools import partial from functools import partial
import importlib import importlib
importlib.reload(learner1D) importlib.reload(learner1D)
def func(x, wait=False): def func(x, wait=False):
"""Function with a sharp peak on a smooth background""" """Function with a sharp peak on a smooth background"""
x = np.asarray(x) x = np.asarray(x)
a = 0.001 a = 0.001
if wait: if wait:
sleep(np.random.rand(1)/10) sleep(np.random.rand(1)/10)
return x + a**2/(a**2 + x**2) #+ np.random.rand(1) return x + a**2/(a**2 + x**2) #+ np.random.rand(1)
def plot(learner, nan_is_zero=False, show_interp=False): def plot(learner, nan_is_zero=False, show_interp=False):
if show_interp: if show_interp:
learner.interpolate() learner.interpolate()
d = learner.interp_data d = learner.interp_data
else: else:
d = learner.data d = learner.data
xy = [(k, d[k]) for k in sorted(d)] xy = [(k, d[k]) for k in sorted(d)]
x, y = np.array(xy, dtype=float).T x, y = np.array(xy, dtype=float).T
return hv.Scatter((x, y)) return hv.Scatter((x, y))
``` ```
%% Cell type:markdown id: tags: %% Cell type:markdown id: tags:
$$x + a^2/(a^2 + x^2)$$ $$x + a^2/(a^2 + x^2)$$
$$a = 0.001$$ $$a = 0.001$$
%% Cell type:code id: tags: %% Cell type:code id: tags:
``` python ``` python
xs = np.linspace(-1, 1, 10) xs = np.linspace(-1, 1, 10)
ys = func(xs) ys = func(xs)
learner = learner1D.Learner1D(xs, ys) learner = learner1D.Learner1D(xs, ys)
plot(learner)[-1.1:1.1, -1.1:1.1] plot(learner)[-1.1:1.1, -1.1:1.1]
``` ```
%% Cell type:code id: tags: %% Cell type:code id: tags:
``` python ``` python
xs = learner.choose_points(n=10) xs = learner.choose_points(n=10)
ys = func(xs) ys = func(xs)
learner.add_data(xs, ys) learner.add_data(xs, ys)
plot(learner)[-1.1:1.1, -1.1:1.1] plot(learner)[-1.1:1.1, -1.1:1.1]
``` ```
%% Cell type:code id: tags: %% Cell type:code id: tags:
``` python ``` python
xs = learner.choose_points(n=30) xs = learner.choose_points(n=30)
# Do not calculate ys here. # Do not calculate ys here.
plot(learner, show_interp=True)[-1.1:1.1, -1.1:1.1] plot(learner, show_interp=True)[-1.1:1.1, -1.1:1.1]
``` ```
%% Cell type:markdown id: tags: %% Cell type:markdown id: tags:
# Parallel # Parallel
%% Cell type:code id: tags: %% Cell type:code id: tags:
``` python ``` python
from dask import delayed from dask import delayed
from distributed import Client from distributed import Client
client = Client() client = Client()
num_cores = sum(client.ncores().values()) num_cores = sum(client.ncores().values())
num_cores num_cores
``` ```
%% Cell type:code id: tags: %% Cell type:code id: tags:
``` python ``` python
func2 = partial(func, wait=True) func2 = partial(func, wait=True)
learner = learner1D.Learner1D(client=client) learner = learner1D.Learner1D(client=client)
learner.initialize(func2, -1, 1) learner.initialize(func2, -1, 1)
while True: while True:
if len(client.futures) < num_cores: if len(learner.futures) < num_cores:
xs = learner.choose_points(n=1) xs = learner.choose_points(n=1)
learner.map(func, xs) learner.map(func, xs)
if len(learner.data) > 100: # bad criterion if len(learner.data) > 100: # bad criterion
break break
plot(learner) plot(learner)
``` ```
%% Cell type:code id: tags: %% Cell type:code id: tags:
``` python ``` python
func2 = partial(func, wait=True) func2 = partial(func, wait=True)
learner = learner1D.Learner1D(client=client) learner = learner1D.Learner1D(client=client)
learner.initialize(func2, -1, 1) learner.initialize(func2, -1, 1)
while True: while True:
if len(client.futures) < num_cores: if len(learner.futures) < num_cores:
xs = learner.choose_points(n=1) xs = learner.choose_points(n=1)
learner.map(func, xs) learner.map(func, xs)
if len(learner.get_done()) > 150: # bad criterion if len(learner.get_done()) > 150: # bad criterion
break break
plot(learner) plot(learner)
``` ```
%% Cell type:code id: tags: %% Cell type:code id: tags:
``` python ``` python
func_wait = partial(func, wait=True) func_wait = partial(func, wait=True)
learner = learner1D.Learner1D(client=client) learner = learner1D.Learner1D(client=client)
learner.initialize(func_wait, -1, 1) learner.initialize(func_wait, -1, 1)
while True: while True:
if len(client.futures) < num_cores: if len(learner.futures) < num_cores:
xs = learner.choose_points(n=1) xs = learner.choose_points(n=1)
learner.map(func_wait, xs) learner.map(func_wait, xs)
if learner.get_largest_interval() < 0.01 * learner.x_range: if learner.get_largest_interval() < 0.01 * learner.x_range:
break break
print(len(learner.data), len(client.futures)) print(len(learner.data), len(learner.futures))
plot(learner) plot(learner)
``` ```
%% Cell type:markdown id: tags: %% Cell type:markdown id: tags:
## Desired interface ## Desired interface
```python ```python
async = learner.async_map(funcs, xs, tol=0.01) async = learner.async_map(funcs, xs, tol=0.01)
if async.done(): if async.done():
print('done') print('done')
``` ```
%% Cell type:code id: tags: %% Cell type:code id: tags:
``` python ``` python
xs = np.linspace(-1, 1, 5000) xs = np.linspace(-1, 1, 5000)
ys = func(xs) ys = func(xs)
learner = learner1D.Learner1D(xs, ys) learner = learner1D.Learner1D(xs, ys)
plot(learner)[-1.1:1.1, -1.1:1.1] plot(learner)[-1.1:1.1, -1.1:1.1]
``` ```
......
...@@ -68,6 +68,8 @@ class Learner1D(object): ...@@ -68,6 +68,8 @@ class Learner1D(object):
self.num_done = 0 self.num_done = 0
self.futures = {}
def loss(self, x_left, x_right): def loss(self, x_left, x_right):
"""Calculate loss in the interval x_left, x_right. """Calculate loss in the interval x_left, x_right.
...@@ -152,10 +154,6 @@ class Learner1D(object): ...@@ -152,10 +154,6 @@ class Learner1D(object):
self.largest_interval = np.diff(xs).max() self.largest_interval = np.diff(xs).max()
return self.largest_interval return self.largest_interval
def get_done(self):
done = {x: y for x, y in self.data.items() if y is not None}
return done
def interpolate(self): def interpolate(self):
xdata = [] xdata = []
ydata = [] ydata = []
...@@ -199,10 +197,23 @@ class Learner1D(object): ...@@ -199,10 +197,23 @@ class Learner1D(object):
except KeyError: except KeyError:
pass pass
def get_done(self):
done = {x: y for x, y in self.data.items() if y is not None}
return done
def add_futures(self, xs, ys):
"""Add concurrent.futures to the self.futures dict."""
try:
for x, y in zip(xs, ys):
self.futures[x] = y
except TypeError:
self.futures[xs] = ys
def done_callback(self, n, tol): def done_callback(self, n, tol):
@synchronized @synchronized
def wrapped(future): def wrapped(future):
x, y = future.result() x, y = future.result()
self.futures.pop(x)
return self.add_data(x, y) return self.add_data(x, y)
return wrapped return wrapped
...@@ -210,6 +221,7 @@ class Learner1D(object): ...@@ -210,6 +221,7 @@ class Learner1D(object):
ys = self.client.map(add_arg(func), xs) ys = self.client.map(add_arg(func), xs)
for y in ys: for y in ys:
y.add_done_callback(self.done_callback(tol, n)) y.add_done_callback(self.done_callback(tol, n))
self.add_futures(xs, ys)
def initialize(self, func, xmin, xmax): def initialize(self, func, xmin, xmax):
self.map(func, [xmin, xmax]) self.map(func, [xmin, xmax])
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment