Skip to content
Snippets Groups Projects
Commit 9290504e authored by Christoph Groth's avatar Christoph Groth
Browse files

fill overlong lines

parent a81da3af
No related branches found
No related tags found
No related merge requests found
Pipeline #
...@@ -272,7 +272,8 @@ class ProcessPoolExecutor(_base.Executor): ...@@ -272,7 +272,8 @@ class ProcessPoolExecutor(_base.Executor):
""" """
def shutdown_worker(): def shutdown_worker():
# This is an upper bound # This is an upper bound
nb_children_alive = sum(p.is_alive() for p in self._processes.values()) nb_children_alive = sum(p.is_alive()
for p in self._processes.values())
for i in range(0, nb_children_alive): for i in range(0, nb_children_alive):
self._call_queue.put_nowait(None) self._call_queue.put_nowait(None)
# Release the queue's resources as soon as possible. # Release the queue's resources as soon as possible.
...@@ -303,7 +304,8 @@ class ProcessPoolExecutor(_base.Executor): ...@@ -303,7 +304,8 @@ class ProcessPoolExecutor(_base.Executor):
result_item = reader.recv() result_item = reader.recv()
self._reader_event.clear() self._reader_event.clear()
if result_item is not None: if result_item is not None:
work_item = self._pending_work_items.pop(result_item.work_id, None) work_item = self._pending_work_items.pop(result_item.work_id,
None)
# work_item can be None if another process terminated (see above) # work_item can be None if another process terminated (see above)
if work_item is not None: if work_item is not None:
if not work_item.future.cancelled(): if not work_item.future.cancelled():
...@@ -313,18 +315,19 @@ class ProcessPoolExecutor(_base.Executor): ...@@ -313,18 +315,19 @@ class ProcessPoolExecutor(_base.Executor):
work_item.future.set_result(result_item.result) work_item.future.set_result(result_item.result)
# Delete references to object. See issue16284 # Delete references to object. See issue16284
del work_item del work_item
# No more work items can be added if the executor that owns this worker # No more work items can be added if the executor that owns this
# has been shutdown. # worker has been shutdown.
if self._shutdown_thread: if self._shutdown_thread:
try: try:
# Since no new work items can be added, it is safe to shutdown # Since no new work items can be added, it is safe to
# this thread if there are no pending work items. # shutdown this thread if there are no pending work items.
if not self._pending_work_items: if not self._pending_work_items:
shutdown_worker() shutdown_worker()
return return
except Full: except Full:
# This is not a problem: we will eventually be woken up (in # This is not a problem: we will eventually be woken up (in
# self._result_queue.get()) and be able to send a sentinel again. # self._result_queue.get()) and be able to send a sentinel
# again.
pass pass
def _start_queue_management_thread(self): def _start_queue_management_thread(self):
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment