bpo-39104: Fix hanging ProcessPoolExecutor on shutdown nowait with pickling failure · python/cpython@a5cbab5
@@ -80,18 +80,23 @@
80808181class _ThreadWakeup:
8282def __init__(self):
83+self._closed = False
8384self._reader, self._writer = mp.Pipe(duplex=False)
84858586def close(self):
86-self._writer.close()
87-self._reader.close()
87+if not self._closed:
88+self._closed = True
89+self._writer.close()
90+self._reader.close()
88918992def wakeup(self):
90-self._writer.send_bytes(b"")
93+if not self._closed:
94+self._writer.send_bytes(b"")
91959296def clear(self):
93-while self._reader.poll():
94-self._reader.recv_bytes()
97+if not self._closed:
98+while self._reader.poll():
99+self._reader.recv_bytes()
951009610197102def _python_exit():
@@ -160,15 +165,17 @@ def __init__(self, work_id, fn, args, kwargs):
160165161166class _SafeQueue(Queue):
162167"""Safe Queue set exception to the future object linked to a job"""
163-def __init__(self, max_size=0, *, ctx, pending_work_items):
168+def __init__(self, max_size=0, *, ctx, pending_work_items, thread_wakeup):
164169self.pending_work_items = pending_work_items
170+self.thread_wakeup = thread_wakeup
165171super().__init__(max_size, ctx=ctx)
166172167173def _on_queue_feeder_error(self, e, obj):
168174if isinstance(obj, _CallItem):
169175tb = traceback.format_exception(type(e), e, e.__traceback__)
170176e.__cause__ = _RemoteTraceback('\n"""\n{}"""'.format(''.join(tb)))
171177work_item = self.pending_work_items.pop(obj.work_id, None)
178+self.thread_wakeup.wakeup()
172179# work_item can be None if another process terminated. In this case,
173180# the queue_manager_thread fails all work_items with BrokenProcessPool
174181if work_item is not None:
@@ -339,6 +346,8 @@ def shutdown_worker():
339346340347# Release the queue's resources as soon as possible.
341348call_queue.close()
349+call_queue.join_thread()
350+thread_wakeup.close()
342351# If .join() is not called on the created processes then
343352# some ctx.Queue methods may deadlock on Mac OS X.
344353for p in processes.values():
@@ -566,29 +575,30 @@ def __init__(self, max_workers=None, mp_context=None,
566575self._pending_work_items = {}
567576self._cancel_pending_futures = False
568577578+# _ThreadWakeup is a communication channel used to interrupt the wait
579+# of the main loop of queue_manager_thread from another thread (e.g.
580+# when calling executor.submit or executor.shutdown). We do not use the
581+# _result_queue to send the wakeup signal to the queue_manager_thread
582+# as it could result in a deadlock if a worker process dies with the
583+# _result_queue write lock still acquired.
584+self._queue_management_thread_wakeup = _ThreadWakeup()
585+569586# Create communication channels for the executor
570587# Make the call queue slightly larger than the number of processes to
571588# prevent the worker processes from idling. But don't make it too big
572589# because futures in the call queue cannot be cancelled.
573590queue_size = self._max_workers + EXTRA_QUEUED_CALLS
574591self._call_queue = _SafeQueue(
575592max_size=queue_size, ctx=self._mp_context,
576-pending_work_items=self._pending_work_items)
593+pending_work_items=self._pending_work_items,
594+thread_wakeup=self._queue_management_thread_wakeup)
577595# Killed worker processes can produce spurious "broken pipe"
578596# tracebacks in the queue's own worker thread. But we detect killed
579597# processes anyway, so silence the tracebacks.
580598self._call_queue._ignore_epipe = True
581599self._result_queue = mp_context.SimpleQueue()
582600self._work_ids = queue.Queue()
583601584-# _ThreadWakeup is a communication channel used to interrupt the wait
585-# of the main loop of queue_manager_thread from another thread (e.g.
586-# when calling executor.submit or executor.shutdown). We do not use the
587-# _result_queue to send the wakeup signal to the queue_manager_thread
588-# as it could result in a deadlock if a worker process dies with the
589-# _result_queue write lock still acquired.
590-self._queue_management_thread_wakeup = _ThreadWakeup()
591-592602def _start_queue_management_thread(self):
593603if self._queue_management_thread is None:
594604# When the executor gets garbage collected, the weakref callback
@@ -692,16 +702,11 @@ def shutdown(self, wait=True, *, cancel_futures=False):
692702# To reduce the risk of opening too many files, remove references to
693703# objects that use file descriptors.
694704self._queue_management_thread = None
695-if self._call_queue is not None:
696-self._call_queue.close()
697-if wait:
698-self._call_queue.join_thread()
699-self._call_queue = None
705+self._call_queue = None
700706self._result_queue = None
701707self._processes = None
702708703709if self._queue_management_thread_wakeup:
704-self._queue_management_thread_wakeup.close()
705710self._queue_management_thread_wakeup = None
706711707712shutdown.__doc__ = _base.Executor.shutdown.__doc__