Silence error output in test_concurrent_futures (bpo-21423) (#4347) · python/cpython@0a2ff23
@@ -9,7 +9,10 @@
991010import contextlib
1111import itertools
12+import logging
13+from logging.handlers import QueueHandler
1214import os
15+import queue
1316import sys
1417import threading
1518import time
@@ -61,7 +64,12 @@ def init(x):
6164def get_init_status():
6265return INITIALIZER_STATUS
636664-def init_fail():
67+def init_fail(log_queue=None):
68+if log_queue is not None:
69+logger = logging.getLogger('concurrent.futures')
70+logger.addHandler(QueueHandler(log_queue))
71+logger.setLevel('CRITICAL')
72+logger.propagate = False
6573time.sleep(0.1) # let some futures be scheduled
6674raise ValueError('error in initializer')
6775@@ -101,18 +109,15 @@ def setUp(self):
101109super().setUp()
102110103111self.t1 = time.time()
104-try:
105-if hasattr(self, "ctx"):
106-self.executor = self.executor_type(
107-max_workers=self.worker_count,
108-mp_context=get_context(self.ctx),
109-**self.executor_kwargs)
110-else:
111-self.executor = self.executor_type(
112-max_workers=self.worker_count,
113-**self.executor_kwargs)
114-except NotImplementedError as e:
115-self.skipTest(str(e))
112+if hasattr(self, "ctx"):
113+self.executor = self.executor_type(
114+max_workers=self.worker_count,
115+mp_context=self.get_context(),
116+**self.executor_kwargs)
117+else:
118+self.executor = self.executor_type(
119+max_workers=self.worker_count,
120+**self.executor_kwargs)
116121self._prime_executor()
117122118123def tearDown(self):
@@ -126,6 +131,9 @@ def tearDown(self):
126131127132super().tearDown()
128133134+def get_context(self):
135+return get_context(self.ctx)
136+129137def _prime_executor(self):
130138# Make sure that the executor is ready to do work before running the
131139# tests. This should reduce the probability of timeouts in the tests.
@@ -143,10 +151,10 @@ class ProcessPoolForkMixin(ExecutorMixin):
143151executor_type = futures.ProcessPoolExecutor
144152ctx = "fork"
145153146-def setUp(self):
154+def get_context(self):
147155if sys.platform == "win32":
148156self.skipTest("require unix system")
149-super().setUp()
157+return super().get_context()
150158151159152160class ProcessPoolSpawnMixin(ExecutorMixin):
@@ -158,10 +166,10 @@ class ProcessPoolForkserverMixin(ExecutorMixin):
158166executor_type = futures.ProcessPoolExecutor
159167ctx = "forkserver"
160168161-def setUp(self):
169+def get_context(self):
162170if sys.platform == "win32":
163171self.skipTest("require unix system")
164-super().setUp()
172+return super().get_context()
165173166174167175def create_executor_tests(mixin, bases=(BaseTestCase,),
@@ -206,7 +214,18 @@ class FailingInitializerMixin(ExecutorMixin):
206214worker_count = 2
207215208216def setUp(self):
209-self.executor_kwargs = dict(initializer=init_fail)
217+if hasattr(self, "ctx"):
218+# Pass a queue to redirect the child's logging output
219+self.mp_context = self.get_context()
220+self.log_queue = self.mp_context.Queue()
221+self.executor_kwargs = dict(initializer=init_fail,
222+initargs=(self.log_queue,))
223+else:
224+# In a thread pool, the child shares our logging setup
225+# (see _assert_logged())
226+self.mp_context = None
227+self.log_queue = None
228+self.executor_kwargs = dict(initializer=init_fail)
210229super().setUp()
211230212231def test_initializer(self):
@@ -234,14 +253,20 @@ def _prime_executor(self):
234253235254@contextlib.contextmanager
236255def _assert_logged(self, msg):
237-if self.executor_type is futures.ProcessPoolExecutor:
238-# No easy way to catch the child processes' stderr
256+if self.log_queue is not None:
239257yield
258+output = []
259+try:
260+while True:
261+output.append(self.log_queue.get_nowait().getMessage())
262+except queue.Empty:
263+pass
240264else:
241265with self.assertLogs('concurrent.futures', 'CRITICAL') as cm:
242266yield
243-self.assertTrue(any(msg in line for line in cm.output),
244-cm.output)
267+output = cm.output
268+self.assertTrue(any(msg in line for line in output),
269+output)
245270246271247272create_executor_tests(InitializerMixin)