From c7b3d3950bc60f4f9640499f0545fc7de308d446 Mon Sep 17 00:00:00 2001 From: Duane Griffin Date: Thu, 27 Mar 2025 12:17:37 +1300 Subject: [PATCH 01/10] gh-83371: handle exceptions from user-supplied callbacks in process pools User-supplied callbacks are called from an internal pool management thread. At present any exceptions they raise are not caught and so propagate out and kill the thread. This then causes problems for subsequent pool operations, including joining the pool hanging. As a QoL improvement, catch and handle any such exceptions using the system exception hook. Thus by default details of the exception will be printed to stderr, but the pool's integrity will remain intact. --- Lib/multiprocessing/pool.py | 16 ++++++++++++---- 1 file changed, 12 insertions(+), 4 deletions(-) diff --git a/Lib/multiprocessing/pool.py b/Lib/multiprocessing/pool.py index f979890170b1a1..c6eb5996a2d59e 100644 --- a/Lib/multiprocessing/pool.py +++ b/Lib/multiprocessing/pool.py @@ -17,6 +17,7 @@ import itertools import os import queue +import sys import threading import time import traceback @@ -776,13 +777,20 @@ def get(self, timeout=None): def _set(self, i, obj): self._success, self._value = obj if self._callback and self._success: - self._callback(self._value) + self._handle_exceptions(self._callback, self._value) if self._error_callback and not self._success: - self._error_callback(self._value) + self._handle_exceptions(self._error_callback, self._value) self._event.set() del self._cache[self._job] self._pool = None + @staticmethod + def _handle_exceptions(callback, args): + try: + return callback(args) + except Exception as e: + sys.excepthook(*sys.exc_info()) + __class_getitem__ = classmethod(types.GenericAlias) AsyncResult = ApplyResult # create alias -- see #17805 @@ -813,7 +821,7 @@ def _set(self, i, success_result): self._value[i*self._chunksize:(i+1)*self._chunksize] = result if self._number_left == 0: if self._callback: - self._callback(self._value) + self._handle_exceptions(self._callback, self._value) del self._cache[self._job] self._event.set() self._pool = None @@ -825,7 +833,7 @@ def _set(self, i, success_result): if self._number_left == 0: # only consider the result ready once all jobs are done if self._error_callback: - self._error_callback(self._value) + self._handle_exceptions(self._error_callback, self._value) del self._cache[self._job] self._event.set() self._pool = None From 3582925ed6c184442d72a3ffebcf8e4eb057e70a Mon Sep 17 00:00:00 2001 From: Duane Griffin Date: Sun, 30 Mar 2025 14:24:37 +1300 Subject: [PATCH 02/10] Use threading.excepthook instead of sys.excepthook --- Lib/multiprocessing/pool.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/Lib/multiprocessing/pool.py b/Lib/multiprocessing/pool.py index c6eb5996a2d59e..09f1db657c85f7 100644 --- a/Lib/multiprocessing/pool.py +++ b/Lib/multiprocessing/pool.py @@ -17,7 +17,6 @@ import itertools import os import queue -import sys import threading import time import traceback @@ -789,7 +788,9 @@ def _handle_exceptions(callback, args): try: return callback(args) except Exception as e: - sys.excepthook(*sys.exc_info()) + args = threading.ExceptHookArgs([type(e), e, e.__traceback__, None]) + threading.excepthook(args) + del args __class_getitem__ = classmethod(types.GenericAlias) From 6526fa318560e744a3f5132710742bf1ed91016d Mon Sep 17 00:00:00 2001 From: Duane Griffin Date: Tue, 1 Apr 2025 15:54:14 +1300 Subject: [PATCH 03/10] Raise new group exceptions on callback failure instead of just processing them with the default exception hook. This is a breaking change to the API, but only in cases where the existing code would be completely broken anyway, so hopefully it isn't a problem. TODO: docs need updating --- Lib/multiprocessing/managers.py | 3 +- Lib/multiprocessing/pool.py | 63 +++++++++++++++++------ Lib/test/_test_multiprocessing.py | 83 ++++++++++++++++++++++++++++++- 3 files changed, 132 insertions(+), 17 deletions(-) diff --git a/Lib/multiprocessing/managers.py b/Lib/multiprocessing/managers.py index c1f09d2b409052..1ffe278eabd16e 100644 --- a/Lib/multiprocessing/managers.py +++ b/Lib/multiprocessing/managers.py @@ -1232,7 +1232,7 @@ def __isub__(self, value): BasePoolProxy = MakeProxyType('PoolProxy', ( 'apply', 'apply_async', 'close', 'imap', 'imap_unordered', 'join', - 'map', 'map_async', 'starmap', 'starmap_async', 'terminate', + 'map', 'map_async', 'starmap', 'starmap_async', 'terminate', '_check_error' )) BasePoolProxy._method_to_typeid_ = { 'apply_async': 'AsyncResult', @@ -1246,6 +1246,7 @@ def __enter__(self): return self def __exit__(self, exc_type, exc_val, exc_tb): self.terminate() + self._check_error(exc_val) # # Definition of SyncManager diff --git a/Lib/multiprocessing/pool.py b/Lib/multiprocessing/pool.py index 09f1db657c85f7..b210f97c12188f 100644 --- a/Lib/multiprocessing/pool.py +++ b/Lib/multiprocessing/pool.py @@ -7,7 +7,7 @@ # Licensed to PSF under a Contributor Agreement. # -__all__ = ['Pool', 'ThreadPool'] +__all__ = ['BrokenPoolError', 'CallbackError', 'Pool', 'ThreadPool'] # # Imports @@ -69,6 +69,14 @@ def __init__(self, exc, tb): def __reduce__(self): return rebuild_exc, (self.exc, self.tb) +class BrokenPoolError(ExceptionGroup): + def __init__(self, msg, exc): + super().__init__(msg, exc) + +class CallbackError(ExceptionGroup): + def __init__(self, msg, exc): + super().__init__(msg, exc) + def rebuild_exc(exc, tb): exc.__cause__ = RemoteTraceback(tb) return exc @@ -198,6 +206,7 @@ def __init__(self, processes=None, initializer=None, initargs=(), self._maxtasksperchild = maxtasksperchild self._initializer = initializer self._initargs = initargs + self._errors = [] if processes is None: processes = os.process_cpu_count() or 1 @@ -349,9 +358,17 @@ def _setup_queues(self): self._quick_get = self._outqueue._reader.recv def _check_running(self): + self._check_error() if self._state != RUN: raise ValueError("Pool not running") + def _check_error(self, exc=None): + if self._errors: + errs = list(self._errors) + if exc is not None and not isinstance(exc, CallbackError): + errs.append(exc) + raise BrokenPoolError("Callback(s) failed", errs) from None + def apply(self, func, args=(), kwds={}): ''' Equivalent of `func(*args, **kwds)`. @@ -737,6 +754,11 @@ def __enter__(self): def __exit__(self, exc_type, exc_val, exc_tb): self.terminate() + self._check_error(exc_val) + + def _error(self, error): + util.debug('callback error', exc_info=error) + self._errors.append(error) # # Class whose instances are returned by `Pool.apply_async()` @@ -751,6 +773,7 @@ def __init__(self, pool, callback, error_callback): self._cache = pool._cache self._callback = callback self._error_callback = error_callback + self._cb_error = None self._cache[self._job] = self def ready(self): @@ -768,7 +791,9 @@ def get(self, timeout=None): self.wait(timeout) if not self.ready(): raise TimeoutError - if self._success: + if self._cb_error: + raise self._cb_error + elif self._success: return self._value else: raise self._value @@ -776,22 +801,21 @@ def get(self, timeout=None): def _set(self, i, obj): self._success, self._value = obj if self._callback and self._success: - self._handle_exceptions(self._callback, self._value) + try: + self._callback(self._value) + except Exception as e: + self._cb_error = CallbackError("apply callback", [e]) + self._pool._error(self._cb_error) if self._error_callback and not self._success: - self._handle_exceptions(self._error_callback, self._value) + try: + self._error_callback(self._value) + except Exception as e: + self._cb_error = CallbackError("apply error callback", [e, self._value]) + self._pool._error(self._cb_error) self._event.set() del self._cache[self._job] self._pool = None - @staticmethod - def _handle_exceptions(callback, args): - try: - return callback(args) - except Exception as e: - args = threading.ExceptHookArgs([type(e), e, e.__traceback__, None]) - threading.excepthook(args) - del args - __class_getitem__ = classmethod(types.GenericAlias) AsyncResult = ApplyResult # create alias -- see #17805 @@ -822,7 +846,11 @@ def _set(self, i, success_result): self._value[i*self._chunksize:(i+1)*self._chunksize] = result if self._number_left == 0: if self._callback: - self._handle_exceptions(self._callback, self._value) + try: + self._callback(self._value) + except Exception as e: + self._cb_error = CallbackError("map callback", [e]) + self._pool._error(self._cb_error) del self._cache[self._job] self._event.set() self._pool = None @@ -834,7 +862,12 @@ def _set(self, i, success_result): if self._number_left == 0: # only consider the result ready once all jobs are done if self._error_callback: - self._handle_exceptions(self._error_callback, self._value) + try: + self._error_callback(self._value) + except Exception as e: + self._cb_error = CallbackError("map error callback", + [e, self._value]) + self._pool._error(self._cb_error) del self._cache[self._job] self._event.set() self._pool = None diff --git a/Lib/test/_test_multiprocessing.py b/Lib/test/_test_multiprocessing.py index dcce57629efe5b..6fe44a35e46785 100644 --- a/Lib/test/_test_multiprocessing.py +++ b/Lib/test/_test_multiprocessing.py @@ -54,6 +54,7 @@ import multiprocessing.pool import multiprocessing.queues from multiprocessing.connection import wait +from multiprocessing.pool import BrokenPoolError, CallbackError from multiprocessing import util @@ -3083,9 +3084,89 @@ def test_resource_warning(self): pool = None support.gc_collect() -def raising(): + def test_callback_errors(self): + def _apply(pool, target, **kwargs): + return pool.apply_async(target, **kwargs) + + def _map(pool, target, **kwargs): + return pool.map_async(target, range(5), **kwargs) + + for func in [_apply, _map]: + with self.subTest(func=func): + + # Fail upon trying to reuse a broken pool after callback failure: + # - BrokenPoolError containing: + # - CallbackError containing: + # - Error thrown from the callback + with self.assertRaises(BrokenPoolError) as pool_ctx: + with self.Pool(1) as pool: + res = func(pool, noop, callback=raising) + with self.assertRaises(CallbackError) as res_ctx: + res.get() + self._check_subexceptions(res_ctx.exception, [KeyError]) + pool.apply_async(noop) + self._check_subexceptions(pool_ctx.exception, [CallbackError]) + self._check_subexceptions(pool_ctx.exception.exceptions[0], [KeyError]) + + # Fail upon trying to reuse a broken pool after error callback failures: + # - BrokenPoolError containing: + # - 3x CallbackError each containing: + # - Error thrown from the callback + # - Original error + with self.assertRaises(BrokenPoolError) as pool_ctx: + with self.Pool(3) as pool: + res = [func(pool, raising2, error_callback=raising) + for _ in range(3)] + for r in res: + with self.assertRaises(CallbackError) as res_ctx: + r.get() + self._check_subexceptions(res_ctx.exception, + [KeyError, IndexError]) + pool.apply_async(noop) + self._check_subexceptions(pool_ctx.exception, [CallbackError] * 3) + for se in pool_ctx.exception.exceptions: + self._check_subexceptions(se, [KeyError, IndexError]) + + # Exiting the context manager with a "normal" error and a failed callback + # - BrokenPoolError containing: + # - CallbackError containing: + # - Error thrown from the callback + # - Exception that caused the context manager to exit + with self.assertRaises(BrokenPoolError) as pool_ctx: + with self.Pool(1) as pool: + res = func(pool, noop, callback=raising) + with self.assertRaises(CallbackError) as res_ctx: + res.get() + raise IndexError() + self._check_subexceptions(pool_ctx.exception, + [CallbackError, IndexError]) + + # Exiting the context manager directly with a callback failure error + # - BrokenPoolError containing: + # - CallbackError instance containing: + # - Error thrown from the callback + # Note that only one instance of the error is present: it was + # *not* added again as it was above, since it is a CallbackError + with self.assertRaises(BrokenPoolError) as pool_ctx: + with self.Pool(1) as pool: + func(pool, noop, callback=raising).get() + self._check_subexceptions(pool_ctx.exception, [CallbackError]) + self._check_subexceptions(pool_ctx.exception.exceptions[0], [KeyError]) + + def _check_subexceptions(self, group, sub_types): + self.assertEqual(len(group.exceptions), len(sub_types)) + for sub_exc, sub_type in zip(group.exceptions, sub_types): + self.assertIsInstance(sub_exc, sub_type) + +def noop(*args): + pass + +def raising(*args): raise KeyError("key") +def raising2(*args): + raise IndexError() + def unpickleable_result(): return lambda: 42 From e22ce74aa5f3e480a1337700aed4369e93a0ad00 Mon Sep 17 00:00:00 2001 From: Duane Griffin Date: Wed, 2 Apr 2025 11:00:38 +1300 Subject: [PATCH 04/10] Correct silly mistake --- Lib/multiprocessing/pool.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Lib/multiprocessing/pool.py b/Lib/multiprocessing/pool.py index b210f97c12188f..b59d4b056bbb23 100644 --- a/Lib/multiprocessing/pool.py +++ b/Lib/multiprocessing/pool.py @@ -757,7 +757,7 @@ def __exit__(self, exc_type, exc_val, exc_tb): self._check_error(exc_val) def _error(self, error): - util.debug('callback error', exc_info=error) + util.debug('callback error: %s', error) self._errors.append(error) # From 89b8c56beef98922e99ebf6d719f4126c7e4811f Mon Sep 17 00:00:00 2001 From: Duane Griffin Date: Wed, 2 Apr 2025 11:44:57 +1300 Subject: [PATCH 05/10] Move timing dependent test (of multiple callback failures) into separate sub-test and use a barrier to ensure it is reliable. We must ensure all tasks are submitted before any of them run and mark the pool as broken. Since barriers can't be shared between processes, do not run that sub-test with process-based parallelism. --- Lib/test/_test_multiprocessing.py | 46 +++++++++++++++++++++---------- 1 file changed, 32 insertions(+), 14 deletions(-) diff --git a/Lib/test/_test_multiprocessing.py b/Lib/test/_test_multiprocessing.py index 6fe44a35e46785..170f58d293b374 100644 --- a/Lib/test/_test_multiprocessing.py +++ b/Lib/test/_test_multiprocessing.py @@ -3110,22 +3110,20 @@ def _map(pool, target, **kwargs): # Fail upon trying to reuse a broken pool after error callback failures: # - BrokenPoolError containing: - # - 3x CallbackError each containing: + # - CallbackError containing: # - Error thrown from the callback # - Original error with self.assertRaises(BrokenPoolError) as pool_ctx: - with self.Pool(3) as pool: - res = [func(pool, raising2, error_callback=raising) - for _ in range(3)] - for r in res: - with self.assertRaises(CallbackError) as res_ctx: - r.get() - self._check_subexceptions(res_ctx.exception, - [KeyError, IndexError]) + with self.Pool(1) as pool: + res = func(pool, raising2, error_callback=raising) + with self.assertRaises(CallbackError) as res_ctx: + res.get() + self._check_subexceptions(res_ctx.exception, + [KeyError, IndexError]) pool.apply_async(noop) - self._check_subexceptions(pool_ctx.exception, [CallbackError] * 3) - for se in pool_ctx.exception.exceptions: - self._check_subexceptions(se, [KeyError, IndexError]) + self._check_subexceptions(pool_ctx.exception, [CallbackError]) + self._check_subexceptions(pool_ctx.exception.exceptions[0], + [KeyError, IndexError]) # Exiting the context manager with a "normal" error and a failed callback # - BrokenPoolError containing: @@ -3153,13 +3151,33 @@ def _map(pool, target, **kwargs): self._check_subexceptions(pool_ctx.exception, [CallbackError]) self._check_subexceptions(pool_ctx.exception.exceptions[0], [KeyError]) + # Skip this test for process-based parallelism as sharing the barrier will fail + if self.TYPE != 'processes': + with self.subTest(name="Multiple callback failures"): + # Fail with 3x callback failure: + # - BrokenPoolError containing: + # - 3x CallbackError containing: + # - Error thrown from the callback + with self.assertRaises(BrokenPoolError) as pool_ctx: + kwds = {'barrier': self.Barrier(3)} + with self.Pool(3) as pool: + res = [pool.apply_async(noop, kwds=kwds, callback=raising) + for _ in range(3)] + for r in res: + with self.assertRaises(CallbackError) as res_ctx: + r.get() + self._check_subexceptions(pool_ctx.exception, [CallbackError] * 3) + for se in pool_ctx.exception.exceptions: + self._check_subexceptions(se, [KeyError]) + def _check_subexceptions(self, group, sub_types): self.assertEqual(len(group.exceptions), len(sub_types)) for sub_exc, sub_type in zip(group.exceptions, sub_types): self.assertIsInstance(sub_exc, sub_type) -def noop(*args): - pass +def noop(*args, barrier=None): + if barrier: + barrier.wait() def raising(*args): raise KeyError("key") From eda0acdf108eca2d875a2d39fd49e652ec999c08 Mon Sep 17 00:00:00 2001 From: Duane Griffin Date: Wed, 2 Apr 2025 14:51:34 +1300 Subject: [PATCH 06/10] CI tests on free-threading builds are failing due to leaked barrier resources. Attempt to fix that by not creating the CallbackError inside the except clause where the exceptions that cause them are caught, but just storing those exceptions and creating the exception group later when required. Also ensure if we are exiting a pool's context manager due to a BrokenPoolError that we don't re-raise it with the same error added again. --- Lib/multiprocessing/pool.py | 28 ++++++++++++++-------------- Lib/test/_test_multiprocessing.py | 16 ++++++++++++++++ 2 files changed, 30 insertions(+), 14 deletions(-) diff --git a/Lib/multiprocessing/pool.py b/Lib/multiprocessing/pool.py index b59d4b056bbb23..ca25d00ad18f52 100644 --- a/Lib/multiprocessing/pool.py +++ b/Lib/multiprocessing/pool.py @@ -364,8 +364,9 @@ def _check_running(self): def _check_error(self, exc=None): if self._errors: - errs = list(self._errors) - if exc is not None and not isinstance(exc, CallbackError): + errs = [CallbackError("callback raised", errs) for errs in self._errors] + if exc is not None and not isinstance(exc, CallbackError) \ + and not isinstance(exc, BrokenPoolError): errs.append(exc) raise BrokenPoolError("Callback(s) failed", errs) from None @@ -773,7 +774,7 @@ def __init__(self, pool, callback, error_callback): self._cache = pool._cache self._callback = callback self._error_callback = error_callback - self._cb_error = None + self._cb_errors = [] self._cache[self._job] = self def ready(self): @@ -791,8 +792,8 @@ def get(self, timeout=None): self.wait(timeout) if not self.ready(): raise TimeoutError - if self._cb_error: - raise self._cb_error + if self._cb_errors: + raise CallbackError("callback raised", self._cb_errors) elif self._success: return self._value else: @@ -804,14 +805,14 @@ def _set(self, i, obj): try: self._callback(self._value) except Exception as e: - self._cb_error = CallbackError("apply callback", [e]) - self._pool._error(self._cb_error) + self._cb_errors = (e,) + self._pool._error(self._cb_errors) if self._error_callback and not self._success: try: self._error_callback(self._value) except Exception as e: - self._cb_error = CallbackError("apply error callback", [e, self._value]) - self._pool._error(self._cb_error) + self._cb_errors = (e, self._value) + self._pool._error(self._cb_errors) self._event.set() del self._cache[self._job] self._pool = None @@ -849,8 +850,8 @@ def _set(self, i, success_result): try: self._callback(self._value) except Exception as e: - self._cb_error = CallbackError("map callback", [e]) - self._pool._error(self._cb_error) + self._cb_errors = (e,) + self._pool._error(self._cb_errors) del self._cache[self._job] self._event.set() self._pool = None @@ -865,9 +866,8 @@ def _set(self, i, success_result): try: self._error_callback(self._value) except Exception as e: - self._cb_error = CallbackError("map error callback", - [e, self._value]) - self._pool._error(self._cb_error) + self._cb_errors = (e, self._value) + self._pool._error(self._cb_errors) del self._cache[self._job] self._event.set() self._pool = None diff --git a/Lib/test/_test_multiprocessing.py b/Lib/test/_test_multiprocessing.py index 170f58d293b374..bfdb02adc69f77 100644 --- a/Lib/test/_test_multiprocessing.py +++ b/Lib/test/_test_multiprocessing.py @@ -3151,6 +3151,22 @@ def _map(pool, target, **kwargs): self._check_subexceptions(pool_ctx.exception, [CallbackError]) self._check_subexceptions(pool_ctx.exception.exceptions[0], [KeyError]) + # Exiting the context manager directly with a broken pool error + # - BrokenPoolError containing: + # - CallbackError instance containing: + # - Error thrown from the callback + # Note that only one instance of the error is present: it was + # *not* added again as it was above, since it is a BrokenPoolError + with self.assertRaises(BrokenPoolError) as pool_ctx: + with self.Pool(1) as pool: + try: + func(pool, noop, callback=raising).get() + except CallbackError: + pass + func(pool, noop, callback=raising) + self._check_subexceptions(pool_ctx.exception, [CallbackError]) + self._check_subexceptions(pool_ctx.exception.exceptions[0], [KeyError]) + # Skip this test for process-based parallelism as sharing the barrier will fail if self.TYPE != 'processes': with self.subTest(name="Multiple callback failures"): From 7125fc8cc3664f28483fd93315edf2ee72e9f6d5 Mon Sep 17 00:00:00 2001 From: Duane Griffin Date: Mon, 7 Apr 2025 14:33:28 +1200 Subject: [PATCH 07/10] Shared objects are being leaked via what *looks like* a reference cycle: try harder to collect them. This doesn't seem like it should be necessary or make a difference, but let's give it a try... --- Lib/test/_test_multiprocessing.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/Lib/test/_test_multiprocessing.py b/Lib/test/_test_multiprocessing.py index bfdb02adc69f77..616a2c749cab0e 100644 --- a/Lib/test/_test_multiprocessing.py +++ b/Lib/test/_test_multiprocessing.py @@ -6967,7 +6967,8 @@ def tearDownClass(cls): f"{multiprocessing.active_children()} " f"active children after {dt:.1f} seconds") - gc.collect() # do garbage collection + # Garbage collect to ensure otherwise unreferenced cycles are cleaned up + support.gc_collect() if cls.manager._number_of_objects() != 0: # This is not really an error since some tests do not # ensure that all processes which hold a reference to a From 51a831064703c1b30efde8e562b6489440ca5519 Mon Sep 17 00:00:00 2001 From: Duane Griffin Date: Mon, 26 May 2025 23:58:15 +1200 Subject: [PATCH 08/10] Revert all commits after "Use threading.excepthook instead of sys.excepthook" This reverts this branch back to just calling `threading.excepthook`, instead of trying to raise the exception when the user gets the result or exits the context manager. I would like to get the immediate bug fixed, and I think this is a simple, low-risk fix that does that. From what I understand it isn't possible to prevent the reference cycle which is causing all those test failures. We need to store the exception to raise it later, which implicitly creates one. We can always add raising the exception as an additional improvement later, but in the meantime if any user code wants to handle the exceptions it can do so by installing a default exception handler. --- Lib/multiprocessing/managers.py | 3 +- Lib/multiprocessing/pool.py | 63 ++++------------ Lib/test/_test_multiprocessing.py | 120 +----------------------------- 3 files changed, 18 insertions(+), 168 deletions(-) diff --git a/Lib/multiprocessing/managers.py b/Lib/multiprocessing/managers.py index 9e09072470c7a3..91bcf243e78e5b 100644 --- a/Lib/multiprocessing/managers.py +++ b/Lib/multiprocessing/managers.py @@ -1234,7 +1234,7 @@ def __isub__(self, value): BasePoolProxy = MakeProxyType('PoolProxy', ( 'apply', 'apply_async', 'close', 'imap', 'imap_unordered', 'join', - 'map', 'map_async', 'starmap', 'starmap_async', 'terminate', '_check_error' + 'map', 'map_async', 'starmap', 'starmap_async', 'terminate', )) BasePoolProxy._method_to_typeid_ = { 'apply_async': 'AsyncResult', @@ -1248,7 +1248,6 @@ def __enter__(self): return self def __exit__(self, exc_type, exc_val, exc_tb): self.terminate() - self._check_error(exc_val) # # Definition of SyncManager diff --git a/Lib/multiprocessing/pool.py b/Lib/multiprocessing/pool.py index ca25d00ad18f52..09f1db657c85f7 100644 --- a/Lib/multiprocessing/pool.py +++ b/Lib/multiprocessing/pool.py @@ -7,7 +7,7 @@ # Licensed to PSF under a Contributor Agreement. # -__all__ = ['BrokenPoolError', 'CallbackError', 'Pool', 'ThreadPool'] +__all__ = ['Pool', 'ThreadPool'] # # Imports @@ -69,14 +69,6 @@ def __init__(self, exc, tb): def __reduce__(self): return rebuild_exc, (self.exc, self.tb) -class BrokenPoolError(ExceptionGroup): - def __init__(self, msg, exc): - super().__init__(msg, exc) - -class CallbackError(ExceptionGroup): - def __init__(self, msg, exc): - super().__init__(msg, exc) - def rebuild_exc(exc, tb): exc.__cause__ = RemoteTraceback(tb) return exc @@ -206,7 +198,6 @@ def __init__(self, processes=None, initializer=None, initargs=(), self._maxtasksperchild = maxtasksperchild self._initializer = initializer self._initargs = initargs - self._errors = [] if processes is None: processes = os.process_cpu_count() or 1 @@ -358,18 +349,9 @@ def _setup_queues(self): self._quick_get = self._outqueue._reader.recv def _check_running(self): - self._check_error() if self._state != RUN: raise ValueError("Pool not running") - def _check_error(self, exc=None): - if self._errors: - errs = [CallbackError("callback raised", errs) for errs in self._errors] - if exc is not None and not isinstance(exc, CallbackError) \ - and not isinstance(exc, BrokenPoolError): - errs.append(exc) - raise BrokenPoolError("Callback(s) failed", errs) from None - def apply(self, func, args=(), kwds={}): ''' Equivalent of `func(*args, **kwds)`. @@ -755,11 +737,6 @@ def __enter__(self): def __exit__(self, exc_type, exc_val, exc_tb): self.terminate() - self._check_error(exc_val) - - def _error(self, error): - util.debug('callback error: %s', error) - self._errors.append(error) # # Class whose instances are returned by `Pool.apply_async()` @@ -774,7 +751,6 @@ def __init__(self, pool, callback, error_callback): self._cache = pool._cache self._callback = callback self._error_callback = error_callback - self._cb_errors = [] self._cache[self._job] = self def ready(self): @@ -792,9 +768,7 @@ def get(self, timeout=None): self.wait(timeout) if not self.ready(): raise TimeoutError - if self._cb_errors: - raise CallbackError("callback raised", self._cb_errors) - elif self._success: + if self._success: return self._value else: raise self._value @@ -802,21 +776,22 @@ def get(self, timeout=None): def _set(self, i, obj): self._success, self._value = obj if self._callback and self._success: - try: - self._callback(self._value) - except Exception as e: - self._cb_errors = (e,) - self._pool._error(self._cb_errors) + self._handle_exceptions(self._callback, self._value) if self._error_callback and not self._success: - try: - self._error_callback(self._value) - except Exception as e: - self._cb_errors = (e, self._value) - self._pool._error(self._cb_errors) + self._handle_exceptions(self._error_callback, self._value) self._event.set() del self._cache[self._job] self._pool = None + @staticmethod + def _handle_exceptions(callback, args): + try: + return callback(args) + except Exception as e: + args = threading.ExceptHookArgs([type(e), e, e.__traceback__, None]) + threading.excepthook(args) + del args + __class_getitem__ = classmethod(types.GenericAlias) AsyncResult = ApplyResult # create alias -- see #17805 @@ -847,11 +822,7 @@ def _set(self, i, success_result): self._value[i*self._chunksize:(i+1)*self._chunksize] = result if self._number_left == 0: if self._callback: - try: - self._callback(self._value) - except Exception as e: - self._cb_errors = (e,) - self._pool._error(self._cb_errors) + self._handle_exceptions(self._callback, self._value) del self._cache[self._job] self._event.set() self._pool = None @@ -863,11 +834,7 @@ def _set(self, i, success_result): if self._number_left == 0: # only consider the result ready once all jobs are done if self._error_callback: - try: - self._error_callback(self._value) - except Exception as e: - self._cb_errors = (e, self._value) - self._pool._error(self._cb_errors) + self._handle_exceptions(self._error_callback, self._value) del self._cache[self._job] self._event.set() self._pool = None diff --git a/Lib/test/_test_multiprocessing.py b/Lib/test/_test_multiprocessing.py index ee59bd08d89fbc..75f31d858d3306 100644 --- a/Lib/test/_test_multiprocessing.py +++ b/Lib/test/_test_multiprocessing.py @@ -54,7 +54,6 @@ import multiprocessing.pool import multiprocessing.queues from multiprocessing.connection import wait -from multiprocessing.pool import BrokenPoolError, CallbackError from multiprocessing import util @@ -3173,123 +3172,9 @@ def test_resource_warning(self): pool = None support.gc_collect() - def test_callback_errors(self): - def _apply(pool, target, **kwargs): - return pool.apply_async(target, **kwargs) - - def _map(pool, target, **kwargs): - return pool.map_async(target, range(5), **kwargs) - - for func in [_apply, _map]: - with self.subTest(func=func): - - # Fail upon trying to reuse a broken pool after callback failure: - # - BrokenPoolError containing: - # - CallbackError containing: - # - Error thrown from the callback - with self.assertRaises(BrokenPoolError) as pool_ctx: - with self.Pool(1) as pool: - res = func(pool, noop, callback=raising) - with self.assertRaises(CallbackError) as res_ctx: - res.get() - self._check_subexceptions(res_ctx.exception, [KeyError]) - pool.apply_async(noop) - self._check_subexceptions(pool_ctx.exception, [CallbackError]) - self._check_subexceptions(pool_ctx.exception.exceptions[0], [KeyError]) - - # Fail upon trying to reuse a broken pool after error callback failures: - # - BrokenPoolError containing: - # - CallbackError containing: - # - Error thrown from the callback - # - Original error - with self.assertRaises(BrokenPoolError) as pool_ctx: - with self.Pool(1) as pool: - res = func(pool, raising2, error_callback=raising) - with self.assertRaises(CallbackError) as res_ctx: - res.get() - self._check_subexceptions(res_ctx.exception, - [KeyError, IndexError]) - pool.apply_async(noop) - self._check_subexceptions(pool_ctx.exception, [CallbackError]) - self._check_subexceptions(pool_ctx.exception.exceptions[0], - [KeyError, IndexError]) - - # Exiting the context manager with a "normal" error and a failed callback - # - BrokenPoolError containing: - # - CallbackError containing: - # - Error thrown from the callback - # - Exception that caused the context manager to exit - with self.assertRaises(BrokenPoolError) as pool_ctx: - with self.Pool(1) as pool: - res = func(pool, noop, callback=raising) - with self.assertRaises(CallbackError) as res_ctx: - res.get() - raise IndexError() - self._check_subexceptions(pool_ctx.exception, - [CallbackError, IndexError]) - - # Exiting the context manager directly with a callback failure error - # - BrokenPoolError containing: - # - CallbackError instance containing: - # - Error thrown from the callback - # Note that only one instance of the error is present: it was - # *not* added again as it was above, since it is a CallbackError - with self.assertRaises(BrokenPoolError) as pool_ctx: - with self.Pool(1) as pool: - func(pool, noop, callback=raising).get() - self._check_subexceptions(pool_ctx.exception, [CallbackError]) - self._check_subexceptions(pool_ctx.exception.exceptions[0], [KeyError]) - - # Exiting the context manager directly with a broken pool error - # - BrokenPoolError containing: - # - CallbackError instance containing: - # - Error thrown from the callback - # Note that only one instance of the error is present: it was - # *not* added again as it was above, since it is a BrokenPoolError - with self.assertRaises(BrokenPoolError) as pool_ctx: - with self.Pool(1) as pool: - try: - func(pool, noop, callback=raising).get() - except CallbackError: - pass - func(pool, noop, callback=raising) - self._check_subexceptions(pool_ctx.exception, [CallbackError]) - self._check_subexceptions(pool_ctx.exception.exceptions[0], [KeyError]) - - # Skip this test for process-based parallelism as sharing the barrier will fail - if self.TYPE != 'processes': - with self.subTest(name="Multiple callback failures"): - # Fail with 3x callback failure: - # - BrokenPoolError containing: - # - 3x CallbackError containing: - # - Error thrown from the callback - with self.assertRaises(BrokenPoolError) as pool_ctx: - kwds = {'barrier': self.Barrier(3)} - with self.Pool(3) as pool: - res = [pool.apply_async(noop, kwds=kwds, callback=raising) - for _ in range(3)] - for r in res: - with self.assertRaises(CallbackError) as res_ctx: - r.get() - self._check_subexceptions(pool_ctx.exception, [CallbackError] * 3) - for se in pool_ctx.exception.exceptions: - self._check_subexceptions(se, [KeyError]) - - def _check_subexceptions(self, group, sub_types): - self.assertEqual(len(group.exceptions), len(sub_types)) - for sub_exc, sub_type in zip(group.exceptions, sub_types): - self.assertIsInstance(sub_exc, sub_type) - -def noop(*args, barrier=None): - if barrier: - barrier.wait() - -def raising(*args): +def raising(): raise KeyError("key") -def raising2(*args): - raise IndexError() - def unpickleable_result(): return lambda: 42 @@ -7080,8 +6965,7 @@ def tearDownClass(cls): f"{multiprocessing.active_children()} " f"active children after {dt:.1f} seconds") - # Garbage collect to ensure otherwise unreferenced cycles are cleaned up - support.gc_collect() + gc.collect() # do garbage collection if cls.manager._number_of_objects() != 0: # This is not really an error since some tests do not # ensure that all processes which hold a reference to a From 4c4674bc2e83135ae7db45f2d84600219dec1c53 Mon Sep 17 00:00:00 2001 From: Duane Griffin Date: Tue, 27 May 2025 01:06:34 +1200 Subject: [PATCH 09/10] Add a test and a blurb --- Lib/multiprocessing/pool.py | 3 +- Lib/test/_test_multiprocessing.py | 34 ++++++++++++++++++- ...5-05-27-01-06-25.gh-issue-83371.-oeZI3.rst | 3 ++ 3 files changed, 38 insertions(+), 2 deletions(-) create mode 100644 Misc/NEWS.d/next/Library/2025-05-27-01-06-25.gh-issue-83371.-oeZI3.rst diff --git a/Lib/multiprocessing/pool.py b/Lib/multiprocessing/pool.py index 09f1db657c85f7..a643e7757022fd 100644 --- a/Lib/multiprocessing/pool.py +++ b/Lib/multiprocessing/pool.py @@ -788,7 +788,8 @@ def _handle_exceptions(callback, args): try: return callback(args) except Exception as e: - args = threading.ExceptHookArgs([type(e), e, e.__traceback__, None]) + args = threading.ExceptHookArgs([type(e), e, e.__traceback__, + threading.current_thread()]) threading.excepthook(args) del args diff --git a/Lib/test/_test_multiprocessing.py b/Lib/test/_test_multiprocessing.py index 75f31d858d3306..f8744550a87191 100644 --- a/Lib/test/_test_multiprocessing.py +++ b/Lib/test/_test_multiprocessing.py @@ -3172,7 +3172,39 @@ def test_resource_warning(self): pool = None support.gc_collect() -def raising(): + def test_callback_errors(self): + if self.TYPE == 'manager': + self.skipTest("cannot intercept excepthook in manager") + + def _apply(pool, target, **kwargs): + return pool.apply_async(target, **kwargs) + + def _map(pool, target, **kwargs): + return pool.map_async(target, range(1), **kwargs) + + def record_exceptions(errs): + def record(args): + errs.append(args.exc_type) + return record + + errs = [] + for func in [_apply, _map]: + with self.subTest(func=func): + saved_hook = threading.excepthook + threading.excepthook = record_exceptions(errs) + try: + with self.Pool(1) as pool: + res = func(pool, noop, callback=raising) + res.get() + finally: + threading.excepthook = saved_hook + + self.assertEqual(errs, [KeyError, KeyError]) + +def noop(*args): + pass + +def raising(*args): raise KeyError("key") def unpickleable_result(): diff --git a/Misc/NEWS.d/next/Library/2025-05-27-01-06-25.gh-issue-83371.-oeZI3.rst b/Misc/NEWS.d/next/Library/2025-05-27-01-06-25.gh-issue-83371.-oeZI3.rst new file mode 100644 index 00000000000000..576b8d8abeceaf --- /dev/null +++ b/Misc/NEWS.d/next/Library/2025-05-27-01-06-25.gh-issue-83371.-oeZI3.rst @@ -0,0 +1,3 @@ +Handle exceptions thrown by callbacks passed to +:class:`multiprocessing.Pool` ``*_async`` methods, preventing them from +breaking the pool. From e321047a473ebc09bd7a2278d7e1124b4d506c77 Mon Sep 17 00:00:00 2001 From: Duane Griffin Date: Tue, 27 May 2025 10:40:34 +1200 Subject: [PATCH 10/10] Correct reference in blurb --- .../next/Library/2025-05-27-01-06-25.gh-issue-83371.-oeZI3.rst | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Misc/NEWS.d/next/Library/2025-05-27-01-06-25.gh-issue-83371.-oeZI3.rst b/Misc/NEWS.d/next/Library/2025-05-27-01-06-25.gh-issue-83371.-oeZI3.rst index 576b8d8abeceaf..ecb61103d15c0c 100644 --- a/Misc/NEWS.d/next/Library/2025-05-27-01-06-25.gh-issue-83371.-oeZI3.rst +++ b/Misc/NEWS.d/next/Library/2025-05-27-01-06-25.gh-issue-83371.-oeZI3.rst @@ -1,3 +1,3 @@ Handle exceptions thrown by callbacks passed to -:class:`multiprocessing.Pool` ``*_async`` methods, preventing them from +:class:`multiprocessing.pool.Pool` ``*_async`` methods, preventing them from breaking the pool.