Skip to content
Snippets Groups Projects
Commit 8d0d6fb0 authored by Christoph Groth's avatar Christoph Groth
Browse files

add missing remove_reader() calls

parent 0d3d3270
No related branches found
No related tags found
No related merge requests found
...@@ -135,7 +135,7 @@ def _process_worker(call_queue, result_queue): ...@@ -135,7 +135,7 @@ def _process_worker(call_queue, result_queue):
else: else:
result_queue.put(_ResultItem(call_item.work_id, None, r)) result_queue.put(_ResultItem(call_item.work_id, None, r))
def _terminate_pool(executor, processes, pending_work_items): def _terminate_pool(executor, processes, pending_work_items, loop):
# Mark the process pool broken so that submits fail right now. # Mark the process pool broken so that submits fail right now.
executor._broken = True executor._broken = True
executor._shutdown_thread = True executor._shutdown_thread = True
...@@ -153,6 +153,7 @@ def _terminate_pool(executor, processes, pending_work_items): ...@@ -153,6 +153,7 @@ def _terminate_pool(executor, processes, pending_work_items):
# Terminate remaining workers forcibly: the queues or their # Terminate remaining workers forcibly: the queues or their
# locks may be in a dirty state and block forever. # locks may be in a dirty state and block forever.
for p in processes.values(): for p in processes.values():
loop.remove_reader(p.sentinel)
p.terminate() p.terminate()
def _add_call_item_to_queue(pending_work_items, def _add_call_item_to_queue(pending_work_items,
...@@ -220,6 +221,7 @@ async def _queue_management_worker(executor, ...@@ -220,6 +221,7 @@ async def _queue_management_worker(executor,
# If .join() is not called on the created processes then # If .join() is not called on the created processes then
# some multiprocessing.Queue methods may deadlock on Mac OS X. # some multiprocessing.Queue methods may deadlock on Mac OS X.
for p in processes.values(): for p in processes.values():
loop.remove_reader(p.sentinel)
p.join() p.join()
loop.remove_reader(reader._handle) loop.remove_reader(reader._handle)
...@@ -387,7 +389,8 @@ class ProcessPoolExecutor(_base.Executor): ...@@ -387,7 +389,8 @@ class ProcessPoolExecutor(_base.Executor):
self._loop.add_reader(p.sentinel, _terminate_pool, self._loop.add_reader(p.sentinel, _terminate_pool,
self, self,
self._processes, self._processes,
self._pending_work_items) self._pending_work_items,
self._loop)
def prioritize(self, priority, fn, *args, **kwargs): def prioritize(self, priority, fn, *args, **kwargs):
if self._broken: if self._broken:
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment