Commit 2a424d39 authored by Mathias Diez's avatar Mathias Diez Committed by Anton Akhmerov
Browse files

ImportError on cluster_run now passed. Added ignore_version to...

ImportError on cluster_run now passed. Added ignore_version to exec_job_list_ipython. Improvements on the code of loop_step (sets and readability).
parent 22388e2d
......@@ -17,7 +17,10 @@ from functools import reduce
from IPython.parallel import TimeoutError
import numpy as np
import numpy.lib.recfunctions as rfn
#import cluster_run
import cluster_run
except ImportError:
import utils
prod = itertools.product
......@@ -423,7 +426,7 @@ class Oracle(object):
self._unrequested_x = set()
return points
def exec_job_list_ipython(ipycluster, module, jobs):
def exec_job_list_ipython(ipycluster, module, jobs, ignore_version=False):
"""Execute a series of calculations using an ipython cluster client.
This function allows for a flexible definition of the calculation to be
......@@ -444,6 +447,9 @@ def exec_job_list_ipython(ipycluster, module, jobs):
names of the functions to be executed, and arguments to these
functions. The values of the arguments define the sequence of
calculations to be performed, with the format specified below.
ignore_version : bool
Whether to ignore the availability of git version or raise an error.
Only set to True for testing purposes.
Calculation format
......@@ -540,7 +546,7 @@ def exec_job_list_ipython(ipycluster, module, jobs):
res = IPyResult()
launch = lambda job: ipycluster.apply_async(exec_job, module,
'return', False, *job)
'return', ignore_version, *job)
# Function returning a submitted job from the adaptive variable value.
def make_job(spec, point):
......@@ -576,28 +582,27 @@ def exec_job_list_ipython(ipycluster, module, jobs):
res._running_jobs = res._running_jobs | launched
adaptive_tasks.append([launched, spec, a, o])
# Helper functions to extract x and y for finished adaptive jobs:
extract_x = lambda job, spec: job.get(0)[spec[0][spec[1]].name][spec[2]]
def extract_y(job, spec):
y = job.get(0)[spec[0][spec[1]].name]['_out']
if hasattr(y, '__iter__'):
return y
return [y]
# Convenience functions to run the adaptive loop.
def loop_step(task):
launched, spec, a, o = task
if len(launched) == 0:
return False
# Update the calculations and sort them out if finished
finished = []
for j in launched:
if j.ready():
for j in finished:
finished = {j for j in launched if j.ready()}
launched -= finished
# Submit to oracle finished and start new requests.
if hasattr(j.get(0)[spec[0][spec[1]].name]['_out'], '__iter__')
else [j.get(0)[spec[0][spec[1]].name]['_out']]) for j in finished])
o.add_data([(extract_x(j, spec), extract_y(j, spec)) for j in finished])
new_jobs = {make_job(spec, x) for x in o.request_points()}
for j in new_jobs:
launched |= new_jobs
return True
......@@ -671,8 +676,8 @@ def exec_job(module, output_mode='print', ignore_version=False, *job):
"'print_raw', 'return'.")
t = - time.time()
git_version = get_version_from_git(module)
#if git_version == 'unknown' and not ignore_version:
# raise RuntimeError('Unknown source code version.')
if git_version == 'unknown' and not ignore_version:
raise RuntimeError('Unknown source code version.')
module = realpath(module)
m = imp.load_source('m', module)
result = {}
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment