Silence error output in test_concurrent_futures (bpo-21423) (#4347) · python/cpython@0a2ff23

@@ -9,7 +9,10 @@

991010

import contextlib

1111

import itertools

12+

import logging

13+

from logging.handlers import QueueHandler

1214

import os

15+

import queue

1316

import sys

1417

import threading

1518

import time

@@ -61,7 +64,12 @@ def init(x):

6164

def get_init_status():

6265

return INITIALIZER_STATUS

636664-

def init_fail():

67+

def init_fail(log_queue=None):

68+

if log_queue is not None:

69+

logger = logging.getLogger('concurrent.futures')

70+

logger.addHandler(QueueHandler(log_queue))

71+

logger.setLevel('CRITICAL')

72+

logger.propagate = False

6573

time.sleep(0.1) # let some futures be scheduled

6674

raise ValueError('error in initializer')

6775

@@ -101,18 +109,15 @@ def setUp(self):

101109

super().setUp()

102110103111

self.t1 = time.time()

104-

try:

105-

if hasattr(self, "ctx"):

106-

self.executor = self.executor_type(

107-

max_workers=self.worker_count,

108-

mp_context=get_context(self.ctx),

109-

**self.executor_kwargs)

110-

else:

111-

self.executor = self.executor_type(

112-

max_workers=self.worker_count,

113-

**self.executor_kwargs)

114-

except NotImplementedError as e:

115-

self.skipTest(str(e))

112+

if hasattr(self, "ctx"):

113+

self.executor = self.executor_type(

114+

max_workers=self.worker_count,

115+

mp_context=self.get_context(),

116+

**self.executor_kwargs)

117+

else:

118+

self.executor = self.executor_type(

119+

max_workers=self.worker_count,

120+

**self.executor_kwargs)

116121

self._prime_executor()

117122118123

def tearDown(self):

@@ -126,6 +131,9 @@ def tearDown(self):

126131127132

super().tearDown()

128133134+

def get_context(self):

135+

return get_context(self.ctx)

136+129137

def _prime_executor(self):

130138

# Make sure that the executor is ready to do work before running the

131139

# tests. This should reduce the probability of timeouts in the tests.

@@ -143,10 +151,10 @@ class ProcessPoolForkMixin(ExecutorMixin):

143151

executor_type = futures.ProcessPoolExecutor

144152

ctx = "fork"

145153146-

def setUp(self):

154+

def get_context(self):

147155

if sys.platform == "win32":

148156

self.skipTest("require unix system")

149-

super().setUp()

157+

return super().get_context()

150158151159152160

class ProcessPoolSpawnMixin(ExecutorMixin):

@@ -158,10 +166,10 @@ class ProcessPoolForkserverMixin(ExecutorMixin):

158166

executor_type = futures.ProcessPoolExecutor

159167

ctx = "forkserver"

160168161-

def setUp(self):

169+

def get_context(self):

162170

if sys.platform == "win32":

163171

self.skipTest("require unix system")

164-

super().setUp()

172+

return super().get_context()

165173166174167175

def create_executor_tests(mixin, bases=(BaseTestCase,),

@@ -206,7 +214,18 @@ class FailingInitializerMixin(ExecutorMixin):

206214

worker_count = 2

207215208216

def setUp(self):

209-

self.executor_kwargs = dict(initializer=init_fail)

217+

if hasattr(self, "ctx"):

218+

# Pass a queue to redirect the child's logging output

219+

self.mp_context = self.get_context()

220+

self.log_queue = self.mp_context.Queue()

221+

self.executor_kwargs = dict(initializer=init_fail,

222+

initargs=(self.log_queue,))

223+

else:

224+

# In a thread pool, the child shares our logging setup

225+

# (see _assert_logged())

226+

self.mp_context = None

227+

self.log_queue = None

228+

self.executor_kwargs = dict(initializer=init_fail)

210229

super().setUp()

211230212231

def test_initializer(self):

@@ -234,14 +253,20 @@ def _prime_executor(self):

234253235254

@contextlib.contextmanager

236255

def _assert_logged(self, msg):

237-

if self.executor_type is futures.ProcessPoolExecutor:

238-

# No easy way to catch the child processes' stderr

256+

if self.log_queue is not None:

239257

yield

258+

output = []

259+

try:

260+

while True:

261+

output.append(self.log_queue.get_nowait().getMessage())

262+

except queue.Empty:

263+

pass

240264

else:

241265

with self.assertLogs('concurrent.futures', 'CRITICAL') as cm:

242266

yield

243-

self.assertTrue(any(msg in line for line in cm.output),

244-

cm.output)

267+

output = cm.output

268+

self.assertTrue(any(msg in line for line in output),

269+

output)

245270246271247272

create_executor_tests(InitializerMixin)