Skip to content
Snippets Groups Projects
Commit d5b9cac6 authored by Christoph Groth's avatar Christoph Groth
Browse files

use PriorityQueue, add prioritize()

parent 4476ba66
No related branches found
No related tags found
No related merge requests found
......@@ -179,7 +179,7 @@ def _add_call_item_to_queue(pending_work_items,
while True:
if call_queue.full() or work_ids.empty():
return
work_id = work_ids.get_nowait()
work_id = work_ids.get_nowait()[1]
work_item = pending_work_items[work_id]
if not work_item.future.cancelled():
......@@ -354,7 +354,7 @@ class ProcessPoolExecutor(_base.Executor):
# processes anyway, so silence the tracebacks.
self._call_queue._ignore_epipe = True
self._result_queue = multiprocessing.SimpleQueue()
self._work_ids = asyncio.Queue()
self._work_ids = asyncio.PriorityQueue()
self._queue_management_thread = None
# Map of pids to processes
self._processes = {}
......@@ -401,7 +401,7 @@ class ProcessPoolExecutor(_base.Executor):
self._processes,
self._pending_work_items)
def submit(self, fn, *args, **kwargs):
def prioritize(self, priority, fn, *args, **kwargs):
if self._broken:
raise BrokenProcessPool('A child process terminated '
'abruptly, the process pool is not usable anymore')
......@@ -412,11 +412,14 @@ class ProcessPoolExecutor(_base.Executor):
w = _WorkItem(f, fn, args, kwargs)
self._pending_work_items[self._queue_count] = w
self._work_ids.put_nowait(self._queue_count)
self._work_ids.put_nowait((priority, self._queue_count))
self._queue_count += 1
self._start_queue_management_thread()
return f
def submit(self, fn, *args, **kwargs):
return self.prioritize(0, fn, *args, **kwargs)
submit.__doc__ = _base.Executor.submit.__doc__
def map(self, fn, *iterables, timeout=None, chunksize=1):
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment