bpo-39104: Fix hanging ProcessPoolExecutor on shutdown nowait with pi… · python/cpython@a5cbab5

@@ -80,18 +80,23 @@

80808181

class _ThreadWakeup:

8282

def __init__(self):

83+

self._closed = False

8384

self._reader, self._writer = mp.Pipe(duplex=False)

84858586

def close(self):

86-

self._writer.close()

87-

self._reader.close()

87+

if not self._closed:

88+

self._closed = True

89+

self._writer.close()

90+

self._reader.close()

88918992

def wakeup(self):

90-

self._writer.send_bytes(b"")

93+

if not self._closed:

94+

self._writer.send_bytes(b"")

91959296

def clear(self):

93-

while self._reader.poll():

94-

self._reader.recv_bytes()

97+

if not self._closed:

98+

while self._reader.poll():

99+

self._reader.recv_bytes()

951009610197102

def _python_exit():

@@ -160,15 +165,17 @@ def __init__(self, work_id, fn, args, kwargs):

160165161166

class _SafeQueue(Queue):

162167

"""Safe Queue set exception to the future object linked to a job"""

163-

def __init__(self, max_size=0, *, ctx, pending_work_items):

168+

def __init__(self, max_size=0, *, ctx, pending_work_items, thread_wakeup):

164169

self.pending_work_items = pending_work_items

170+

self.thread_wakeup = thread_wakeup

165171

super().__init__(max_size, ctx=ctx)

166172167173

def _on_queue_feeder_error(self, e, obj):

168174

if isinstance(obj, _CallItem):

169175

tb = traceback.format_exception(type(e), e, e.__traceback__)

170176

e.__cause__ = _RemoteTraceback('\n"""\n{}"""'.format(''.join(tb)))

171177

work_item = self.pending_work_items.pop(obj.work_id, None)

178+

self.thread_wakeup.wakeup()

172179

# work_item can be None if another process terminated. In this case,

173180

# the queue_manager_thread fails all work_items with BrokenProcessPool

174181

if work_item is not None:

@@ -339,6 +346,8 @@ def shutdown_worker():

339346340347

# Release the queue's resources as soon as possible.

341348

call_queue.close()

349+

call_queue.join_thread()

350+

thread_wakeup.close()

342351

# If .join() is not called on the created processes then

343352

# some ctx.Queue methods may deadlock on Mac OS X.

344353

for p in processes.values():

@@ -566,29 +575,30 @@ def __init__(self, max_workers=None, mp_context=None,

566575

self._pending_work_items = {}

567576

self._cancel_pending_futures = False

568577578+

# _ThreadWakeup is a communication channel used to interrupt the wait

579+

# of the main loop of queue_manager_thread from another thread (e.g.

580+

# when calling executor.submit or executor.shutdown). We do not use the

581+

# _result_queue to send the wakeup signal to the queue_manager_thread

582+

# as it could result in a deadlock if a worker process dies with the

583+

# _result_queue write lock still acquired.

584+

self._queue_management_thread_wakeup = _ThreadWakeup()

585+569586

# Create communication channels for the executor

570587

# Make the call queue slightly larger than the number of processes to

571588

# prevent the worker processes from idling. But don't make it too big

572589

# because futures in the call queue cannot be cancelled.

573590

queue_size = self._max_workers + EXTRA_QUEUED_CALLS

574591

self._call_queue = _SafeQueue(

575592

max_size=queue_size, ctx=self._mp_context,

576-

pending_work_items=self._pending_work_items)

593+

pending_work_items=self._pending_work_items,

594+

thread_wakeup=self._queue_management_thread_wakeup)

577595

# Killed worker processes can produce spurious "broken pipe"

578596

# tracebacks in the queue's own worker thread. But we detect killed

579597

# processes anyway, so silence the tracebacks.

580598

self._call_queue._ignore_epipe = True

581599

self._result_queue = mp_context.SimpleQueue()

582600

self._work_ids = queue.Queue()

583601584-

# _ThreadWakeup is a communication channel used to interrupt the wait

585-

# of the main loop of queue_manager_thread from another thread (e.g.

586-

# when calling executor.submit or executor.shutdown). We do not use the

587-

# _result_queue to send the wakeup signal to the queue_manager_thread

588-

# as it could result in a deadlock if a worker process dies with the

589-

# _result_queue write lock still acquired.

590-

self._queue_management_thread_wakeup = _ThreadWakeup()

591-592602

def _start_queue_management_thread(self):

593603

if self._queue_management_thread is None:

594604

# When the executor gets garbage collected, the weakref callback

@@ -692,16 +702,11 @@ def shutdown(self, wait=True, *, cancel_futures=False):

692702

# To reduce the risk of opening too many files, remove references to

693703

# objects that use file descriptors.

694704

self._queue_management_thread = None

695-

if self._call_queue is not None:

696-

self._call_queue.close()

697-

if wait:

698-

self._call_queue.join_thread()

699-

self._call_queue = None

705+

self._call_queue = None

700706

self._result_queue = None

701707

self._processes = None

702708703709

if self._queue_management_thread_wakeup:

704-

self._queue_management_thread_wakeup.close()

705710

self._queue_management_thread_wakeup = None

706711707712

shutdown.__doc__ = _base.Executor.shutdown.__doc__