Skip to content

Commit 89b8c56

Browse files
committed
Move timing dependent test (of multiple callback failures) into separate
sub-test and use a barrier to ensure it is reliable. We must ensure all tasks are submitted before any of them run and mark the pool as broken. Since barriers can't be shared between processes, do not run that sub-test with process-based parallelism.
1 parent e22ce74 commit 89b8c56

File tree

1 file changed

+32
-14
lines changed

1 file changed

+32
-14
lines changed

Lib/test/_test_multiprocessing.py

Lines changed: 32 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -3110,22 +3110,20 @@ def _map(pool, target, **kwargs):
31103110

31113111
# Fail upon trying to reuse a broken pool after error callback failures:
31123112
# - BrokenPoolError containing:
3113-
# - 3x CallbackError each containing:
3113+
# - CallbackError containing:
31143114
# - Error thrown from the callback
31153115
# - Original error
31163116
with self.assertRaises(BrokenPoolError) as pool_ctx:
3117-
with self.Pool(3) as pool:
3118-
res = [func(pool, raising2, error_callback=raising)
3119-
for _ in range(3)]
3120-
for r in res:
3121-
with self.assertRaises(CallbackError) as res_ctx:
3122-
r.get()
3123-
self._check_subexceptions(res_ctx.exception,
3124-
[KeyError, IndexError])
3117+
with self.Pool(1) as pool:
3118+
res = func(pool, raising2, error_callback=raising)
3119+
with self.assertRaises(CallbackError) as res_ctx:
3120+
res.get()
3121+
self._check_subexceptions(res_ctx.exception,
3122+
[KeyError, IndexError])
31253123
pool.apply_async(noop)
3126-
self._check_subexceptions(pool_ctx.exception, [CallbackError] * 3)
3127-
for se in pool_ctx.exception.exceptions:
3128-
self._check_subexceptions(se, [KeyError, IndexError])
3124+
self._check_subexceptions(pool_ctx.exception, [CallbackError])
3125+
self._check_subexceptions(pool_ctx.exception.exceptions[0],
3126+
[KeyError, IndexError])
31293127

31303128
# Exiting the context manager with a "normal" error and a failed callback
31313129
# - BrokenPoolError containing:
@@ -3153,13 +3151,33 @@ def _map(pool, target, **kwargs):
31533151
self._check_subexceptions(pool_ctx.exception, [CallbackError])
31543152
self._check_subexceptions(pool_ctx.exception.exceptions[0], [KeyError])
31553153

3154+
# Skip this test for process-based parallelism as sharing the barrier will fail
3155+
if self.TYPE != 'processes':
3156+
with self.subTest(name="Multiple callback failures"):
3157+
# Fail with 3x callback failure:
3158+
# - BrokenPoolError containing:
3159+
# - 3x CallbackError containing:
3160+
# - Error thrown from the callback
3161+
with self.assertRaises(BrokenPoolError) as pool_ctx:
3162+
kwds = {'barrier': self.Barrier(3)}
3163+
with self.Pool(3) as pool:
3164+
res = [pool.apply_async(noop, kwds=kwds, callback=raising)
3165+
for _ in range(3)]
3166+
for r in res:
3167+
with self.assertRaises(CallbackError) as res_ctx:
3168+
r.get()
3169+
self._check_subexceptions(pool_ctx.exception, [CallbackError] * 3)
3170+
for se in pool_ctx.exception.exceptions:
3171+
self._check_subexceptions(se, [KeyError])
3172+
31563173
def _check_subexceptions(self, group, sub_types):
31573174
self.assertEqual(len(group.exceptions), len(sub_types))
31583175
for sub_exc, sub_type in zip(group.exceptions, sub_types):
31593176
self.assertIsInstance(sub_exc, sub_type)
31603177

3161-
def noop(*args):
3162-
pass
3178+
def noop(*args, barrier=None):
3179+
if barrier:
3180+
barrier.wait()
31633181

31643182
def raising(*args):
31653183
raise KeyError("key")

0 commit comments

Comments
 (0)