Commit b207ef86 authored by Mathias Diez's avatar Mathias Diez Committed by Anton Akhmerov
Browse files

Finished and tested integration of Oracle into loop_step().

parent f8ebcbcc
......@@ -551,14 +551,6 @@ def exec_job_list_ipython(ipycluster, module, jobs):
spec[0][spec[1]].kwargs[spec[2]] = point
return launch(spec[0])
def split_interval(interval, value, spec, n_points=1):
points = np.linspace(interval[0], interval[1], n_points + 2)
values = [value[0]] + [make_job(spec, point) for point in
points[1: -1]] + [value[1]]
result = dict(((points[i], points[i + 1]), [values[i], values[i + 1]])
for i in xrange(len(points) - 1))
return result, set((v for v in values if hasattr(v, 'msg_ids')))
# Initialize the data for the adaptive jobs.
adaptive_tasks = []
for job in commands:
......@@ -583,101 +575,35 @@ def exec_job_list_ipython(ipycluster, module, jobs):
# Launch initial sampling points.
spec = job, func, key
o = Oracle(a)
launched = dict([(x, make_job(spec, x)) for x in o.request_points()])
#start, end = a.interval
#intervals, launched = split_interval((start, end),
# [make_job(spec, start),
# make_job(spec, end)],
# spec, a.init_points)
#res._running_jobs = res._running_jobs | launched
res._running_jobs = res._running_jobs | set((v for v in launched.itervalues() if hasattr(v, 'msg_ids')))
#adaptive_tasks.append([intervals, spec, a, None, None,
# 0])
launched = {make_job(spec, x) for x in o.request_points()}
res._running_jobs = res._running_jobs | launched
adaptive_tasks.append([launched, spec, a, o])
# Convenience functions to run the adaptive loop.
def sq_norm(origin, end, sq_scales):
return sum((i - j) ** 2 / k for i, j, k in itertools.izip(origin, end,
sq_scales))
def max_min(points):
point_ar = np.array(points)
max_vec, min_vec = np.apply_along_axis(np.max, 0, point_ar), \
np.apply_along_axis(np.min, 0, point_ar)
return max_vec, min_vec
def get_sq_scales(max_vec, min_vec):
return [max(i - j, 1e-4 * i, 1e-8) for i, j in itertools.izip(max_vec,
min_vec)]
def loop_step(task):
launched, spec, a, o = task
if len(launched) == 0:
return False
# Update the calculations and sort them out if finished
done = lambda v: not any((hasattr(i, 'msg_ids') for i in v))
finished = []
for k, v in launched.iteritems():
v = a.get_result(v)
if done(v):
finished.append((k, v))
for k, v in finished:
del launched[k]
for j in launched:
if j.ready():
finished.append(j)
for j in finished:
launched.remove(j)
# Submit to oracle finished and start new requests.
o.add_data(finished)
launched.update([(x, make_job(spec, x)) for x in o.request_points()])
return True
def loop_step_old(task):
intervals, spec, a, max_vec, min_vec, n_done = task
if len(intervals) == 0:
return False
# Update the calculations if they have finished.
for v in intervals.itervalues():
v[0], v[1] = a.get_result(v[0]), a.get_result(v[1])
# Get the available numerical values of data.
y_data = [i[0] for i in intervals.itervalues()
if not hasattr(i[0], 'msg_ids')]
if max_vec is not None:
y_data += [max_vec, min_vec]
# Update extremal values (no need to do it too often)
if len(y_data) > 7:
max_vec, min_vec = max_min(y_data)
o.add_data([(j.get(0)[spec[0][spec[1]].name][spec[2]],
j.get(0)[spec[0][spec[1]].name]['_out']
if hasattr(j.get(0)[spec[0][spec[1]].name]['_out'], '__iter__')
else [j.get(0)[spec[0][spec[1]].name]['_out']]) for j in finished])
new_jobs = {make_job(spec, x) for x in o.request_points()}
for j in new_jobs:
launched.add(j)
res.queue.put(new_jobs)
new_intervals = {}
finished_intervals = []
done = lambda v: not any((hasattr(i, 'msg_ids') for i in v))
if n_done < max(a.init_points * .75, 7):
# We're in the beginning -> split all finished.
for k, v in intervals.iteritems():
if done(v):
new_int, new_jobs = split_interval(k, v, spec, a.n_split)
new_intervals.update(new_int)
res.queue.put(new_jobs)
finished_intervals.append(k)
n_done += 1
else:
# Main loop, split only if not meeting precision goal.
sq_scales = get_sq_scales(max_vec, min_vec)
for k, v in intervals.iteritems():
if done(v):
finished_intervals.append(k)
n_done += 1
dx = (k[0] - k[1]) ** 2 / a.sq_x_scale
dist = dx + sq_norm(v[0], v[1], sq_scales)
if dist > a.precision_goal_sq and dx > a.min_sq_x_scale:
new_int, new_jobs = split_interval(k, v, spec,
a.n_split)
new_intervals.update(new_int)
res.queue.put(new_jobs)
for k in finished_intervals:
del intervals[k]
intervals.update(new_intervals)
task[0] = intervals
task[-3:] = max_vec, min_vec, n_done
return True
def threaded_loop(tasks):
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment