[3.6] Revert "bpo-34172: multiprocessing.Pool leaks resources after b… · python/cpython@eb38ee0
@@ -147,9 +147,8 @@ class Pool(object):
147147 '''
148148_wrap_exception = True
149149150-@staticmethod
151-def Process(ctx, *args, **kwds):
152-return ctx.Process(*args, **kwds)
150+def Process(self, *args, **kwds):
151+return self._ctx.Process(*args, **kwds)
153152154153def __init__(self, processes=None, initializer=None, initargs=(),
155154maxtasksperchild=None, context=None):
@@ -176,15 +175,13 @@ def __init__(self, processes=None, initializer=None, initargs=(),
176175177176self._worker_handler = threading.Thread(
178177target=Pool._handle_workers,
179-args=(self._cache, self._taskqueue, self._ctx, self.Process,
180-self._processes, self._pool, self._inqueue, self._outqueue,
181-self._initializer, self._initargs, self._maxtasksperchild,
182-self._wrap_exception)
178+args=(self, )
183179 )
184180self._worker_handler.daemon = True
185181self._worker_handler._state = RUN
186182self._worker_handler.start()
187183184+188185self._task_handler = threading.Thread(
189186target=Pool._handle_tasks,
190187args=(self._taskqueue, self._quick_put, self._outqueue,
@@ -210,62 +207,43 @@ def __init__(self, processes=None, initializer=None, initargs=(),
210207exitpriority=15
211208 )
212209213-@staticmethod
214-def _join_exited_workers(pool):
210+def _join_exited_workers(self):
215211"""Cleanup after any worker processes which have exited due to reaching
216212 their specified lifetime. Returns True if any workers were cleaned up.
217213 """
218214cleaned = False
219-for i in reversed(range(len(pool))):
220-worker = pool[i]
215+for i in reversed(range(len(self._pool))):
216+worker = self._pool[i]
221217if worker.exitcode is not None:
222218# worker exited
223219util.debug('cleaning up worker %d' % i)
224220worker.join()
225221cleaned = True
226-del pool[i]
222+del self._pool[i]
227223return cleaned
228224229225def _repopulate_pool(self):
230-return self._repopulate_pool_static(self._ctx, self.Process,
231-self._processes,
232-self._pool, self._inqueue,
233-self._outqueue, self._initializer,
234-self._initargs,
235-self._maxtasksperchild,
236-self._wrap_exception)
237-238-@staticmethod
239-def _repopulate_pool_static(ctx, Process, processes, pool, inqueue,
240-outqueue, initializer, initargs,
241-maxtasksperchild, wrap_exception):
242226"""Bring the number of pool processes up to the specified number,
243227 for use after reaping workers which have exited.
244228 """
245-for i in range(processes - len(pool)):
246-w = Process(ctx, target=worker,
247-args=(inqueue, outqueue,
248-initializer,
249-initargs, maxtasksperchild,
250-wrap_exception)
251- )
252-pool.append(w)
229+for i in range(self._processes - len(self._pool)):
230+w = self.Process(target=worker,
231+ args=(self._inqueue, self._outqueue,
232+ self._initializer,
233+ self._initargs, self._maxtasksperchild,
234+ self._wrap_exception)
235+ )
236+self._pool.append(w)
253237w.name = w.name.replace('Process', 'PoolWorker')
254238w.daemon = True
255239w.start()
256240util.debug('added worker')
257241258-@staticmethod
259-def _maintain_pool(ctx, Process, processes, pool, inqueue, outqueue,
260-initializer, initargs, maxtasksperchild,
261-wrap_exception):
242+def _maintain_pool(self):
262243"""Clean up any exited workers and start replacements for them.
263244 """
264-if Pool._join_exited_workers(pool):
265-Pool._repopulate_pool_static(ctx, Process, processes, pool,
266-inqueue, outqueue, initializer,
267-initargs, maxtasksperchild,
268-wrap_exception)
245+if self._join_exited_workers():
246+self._repopulate_pool()
269247270248def _setup_queues(self):
271249self._inqueue = self._ctx.SimpleQueue()
@@ -418,20 +396,16 @@ def _map_async(self, func, iterable, mapper, chunksize=None, callback=None,
418396return result
419397420398@staticmethod
421-def _handle_workers(cache, taskqueue, ctx, Process, processes, pool,
422-inqueue, outqueue, initializer, initargs,
423-maxtasksperchild, wrap_exception):
399+def _handle_workers(pool):
424400thread = threading.current_thread()
425401426402# Keep maintaining workers until the cache gets drained, unless the pool
427403# is terminated.
428-while thread._state == RUN or (cache and thread._state != TERMINATE):
429-Pool._maintain_pool(ctx, Process, processes, pool, inqueue,
430-outqueue, initializer, initargs,
431-maxtasksperchild, wrap_exception)
404+while thread._state == RUN or (pool._cache and thread._state != TERMINATE):
405+pool._maintain_pool()
432406time.sleep(0.1)
433407# send sentinel to stop workers
434-taskqueue.put(None)
408+pool._taskqueue.put(None)
435409util.debug('worker handler exiting')
436410437411@staticmethod
@@ -807,7 +781,7 @@ class ThreadPool(Pool):
807781_wrap_exception = False
808782809783@staticmethod
810-def Process(ctx, *args, **kwds):
784+def Process(*args, **kwds):
811785from .dummy import Process
812786return Process(*args, **kwds)
813787