[3.6] Revert "bpo-34172: multiprocessing.Pool leaks resources after b… · python/cpython@eb38ee0

@@ -147,9 +147,8 @@ class Pool(object):

147147

'''

148148

_wrap_exception = True

149149150-

@staticmethod

151-

def Process(ctx, *args, **kwds):

152-

return ctx.Process(*args, **kwds)

150+

def Process(self, *args, **kwds):

151+

return self._ctx.Process(*args, **kwds)

153152154153

def __init__(self, processes=None, initializer=None, initargs=(),

155154

maxtasksperchild=None, context=None):

@@ -176,15 +175,13 @@ def __init__(self, processes=None, initializer=None, initargs=(),

176175177176

self._worker_handler = threading.Thread(

178177

target=Pool._handle_workers,

179-

args=(self._cache, self._taskqueue, self._ctx, self.Process,

180-

self._processes, self._pool, self._inqueue, self._outqueue,

181-

self._initializer, self._initargs, self._maxtasksperchild,

182-

self._wrap_exception)

178+

args=(self, )

183179

)

184180

self._worker_handler.daemon = True

185181

self._worker_handler._state = RUN

186182

self._worker_handler.start()

187183184+188185

self._task_handler = threading.Thread(

189186

target=Pool._handle_tasks,

190187

args=(self._taskqueue, self._quick_put, self._outqueue,

@@ -210,62 +207,43 @@ def __init__(self, processes=None, initializer=None, initargs=(),

210207

exitpriority=15

211208

)

212209213-

@staticmethod

214-

def _join_exited_workers(pool):

210+

def _join_exited_workers(self):

215211

"""Cleanup after any worker processes which have exited due to reaching

216212

their specified lifetime. Returns True if any workers were cleaned up.

217213

"""

218214

cleaned = False

219-

for i in reversed(range(len(pool))):

220-

worker = pool[i]

215+

for i in reversed(range(len(self._pool))):

216+

worker = self._pool[i]

221217

if worker.exitcode is not None:

222218

# worker exited

223219

util.debug('cleaning up worker %d' % i)

224220

worker.join()

225221

cleaned = True

226-

del pool[i]

222+

del self._pool[i]

227223

return cleaned

228224229225

def _repopulate_pool(self):

230-

return self._repopulate_pool_static(self._ctx, self.Process,

231-

self._processes,

232-

self._pool, self._inqueue,

233-

self._outqueue, self._initializer,

234-

self._initargs,

235-

self._maxtasksperchild,

236-

self._wrap_exception)

237-238-

@staticmethod

239-

def _repopulate_pool_static(ctx, Process, processes, pool, inqueue,

240-

outqueue, initializer, initargs,

241-

maxtasksperchild, wrap_exception):

242226

"""Bring the number of pool processes up to the specified number,

243227

for use after reaping workers which have exited.

244228

"""

245-

for i in range(processes - len(pool)):

246-

w = Process(ctx, target=worker,

247-

args=(inqueue, outqueue,

248-

initializer,

249-

initargs, maxtasksperchild,

250-

wrap_exception)

251-

)

252-

pool.append(w)

229+

for i in range(self._processes - len(self._pool)):

230+

w = self.Process(target=worker,

231+

args=(self._inqueue, self._outqueue,

232+

self._initializer,

233+

self._initargs, self._maxtasksperchild,

234+

self._wrap_exception)

235+

)

236+

self._pool.append(w)

253237

w.name = w.name.replace('Process', 'PoolWorker')

254238

w.daemon = True

255239

w.start()

256240

util.debug('added worker')

257241258-

@staticmethod

259-

def _maintain_pool(ctx, Process, processes, pool, inqueue, outqueue,

260-

initializer, initargs, maxtasksperchild,

261-

wrap_exception):

242+

def _maintain_pool(self):

262243

"""Clean up any exited workers and start replacements for them.

263244

"""

264-

if Pool._join_exited_workers(pool):

265-

Pool._repopulate_pool_static(ctx, Process, processes, pool,

266-

inqueue, outqueue, initializer,

267-

initargs, maxtasksperchild,

268-

wrap_exception)

245+

if self._join_exited_workers():

246+

self._repopulate_pool()

269247270248

def _setup_queues(self):

271249

self._inqueue = self._ctx.SimpleQueue()

@@ -418,20 +396,16 @@ def _map_async(self, func, iterable, mapper, chunksize=None, callback=None,

418396

return result

419397420398

@staticmethod

421-

def _handle_workers(cache, taskqueue, ctx, Process, processes, pool,

422-

inqueue, outqueue, initializer, initargs,

423-

maxtasksperchild, wrap_exception):

399+

def _handle_workers(pool):

424400

thread = threading.current_thread()

425401426402

# Keep maintaining workers until the cache gets drained, unless the pool

427403

# is terminated.

428-

while thread._state == RUN or (cache and thread._state != TERMINATE):

429-

Pool._maintain_pool(ctx, Process, processes, pool, inqueue,

430-

outqueue, initializer, initargs,

431-

maxtasksperchild, wrap_exception)

404+

while thread._state == RUN or (pool._cache and thread._state != TERMINATE):

405+

pool._maintain_pool()

432406

time.sleep(0.1)

433407

# send sentinel to stop workers

434-

taskqueue.put(None)

408+

pool._taskqueue.put(None)

435409

util.debug('worker handler exiting')

436410437411

@staticmethod

@@ -807,7 +781,7 @@ class ThreadPool(Pool):

807781

_wrap_exception = False

808782809783

@staticmethod

810-

def Process(ctx, *args, **kwds):

784+

def Process(*args, **kwds):

811785

from .dummy import Process

812786

return Process(*args, **kwds)

813787