-
-
Notifications
You must be signed in to change notification settings - Fork 33.3k
gh-66587: Fix deadlock from pool worker death without communication #16103
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from 3 commits
c8f4896
315ec3d
bcbd7d3
e1a9eb5
6459284
fa54afb
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||||||||
|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -121,6 +121,7 @@ def worker(inqueue, outqueue, initializer=None, initargs=(), maxtasks=None, | |||||||||
| break | ||||||||||
|
|
||||||||||
| job, i, func, args, kwds = task | ||||||||||
| put((job, i, (None, os.getpid()))) # Provide info on who took job | ||||||||||
| try: | ||||||||||
| result = (True, func(*args, **kwds)) | ||||||||||
| except Exception as e: | ||||||||||
|
|
@@ -220,12 +221,14 @@ def __init__(self, processes=None, initializer=None, initargs=(), | |||||||||
|
|
||||||||||
| sentinels = self._get_sentinels() | ||||||||||
|
|
||||||||||
| self._job_assignments = {} | ||||||||||
| self._worker_handler = threading.Thread( | ||||||||||
| target=Pool._handle_workers, | ||||||||||
| args=(self._cache, self._taskqueue, self._ctx, self.Process, | ||||||||||
| self._processes, self._pool, self._inqueue, self._outqueue, | ||||||||||
| self._initializer, self._initargs, self._maxtasksperchild, | ||||||||||
| self._wrap_exception, sentinels, self._change_notifier) | ||||||||||
| self._wrap_exception, sentinels, self._change_notifier, | ||||||||||
| self._job_assignments) | ||||||||||
| ) | ||||||||||
| self._worker_handler.daemon = True | ||||||||||
| self._worker_handler._state = RUN | ||||||||||
|
|
@@ -243,7 +246,8 @@ def __init__(self, processes=None, initializer=None, initargs=(), | |||||||||
|
|
||||||||||
| self._result_handler = threading.Thread( | ||||||||||
| target=Pool._handle_results, | ||||||||||
| args=(self._outqueue, self._quick_get, self._cache) | ||||||||||
| args=(self._outqueue, self._quick_get, self._cache, | ||||||||||
| self._job_assignments) | ||||||||||
| ) | ||||||||||
| self._result_handler.daemon = True | ||||||||||
| self._result_handler._state = RUN | ||||||||||
|
|
@@ -264,8 +268,6 @@ def __del__(self, _warn=warnings.warn, RUN=RUN): | |||||||||
| if self._state == RUN: | ||||||||||
| _warn(f"unclosed running multiprocessing pool {self!r}", | ||||||||||
| ResourceWarning, source=self) | ||||||||||
| if getattr(self, '_change_notifier', None) is not None: | ||||||||||
| self._change_notifier.put(None) | ||||||||||
|
|
||||||||||
| def __repr__(self): | ||||||||||
| cls = self.__class__ | ||||||||||
|
|
@@ -284,7 +286,7 @@ def _get_worker_sentinels(workers): | |||||||||
| workers if hasattr(worker, "sentinel")] | ||||||||||
|
|
||||||||||
| @staticmethod | ||||||||||
| def _join_exited_workers(pool): | ||||||||||
| def _join_exited_workers(pool, outqueue, job_assignments): | ||||||||||
| """Cleanup after any worker processes which have exited due to reaching | ||||||||||
| their specified lifetime. Returns True if any workers were cleaned up. | ||||||||||
| """ | ||||||||||
|
|
@@ -294,8 +296,15 @@ def _join_exited_workers(pool): | |||||||||
| if worker.exitcode is not None: | ||||||||||
| # worker exited | ||||||||||
| util.debug('cleaning up worker %d' % i) | ||||||||||
| pid = worker.ident | ||||||||||
| worker.join() | ||||||||||
| cleaned = True | ||||||||||
| if pid in job_assignments: | ||||||||||
|
||||||||||
| if pid in job_assignments: | |
| job = job_assignments.pop(pid, None) | |
| if job: | |
| outqueue.put((job, i, (False, RuntimeError("Worker died")))) |
And some additional simplification below, of course.
ambv marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
ambv marked this conversation as resolved.
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why don't you remove the job from job_assignement here? It would avoid unecessary operation when a worker died gracefully.
ambv marked this conversation as resolved.
Show resolved
Hide resolved
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -2712,6 +2712,27 @@ def errback(exc): | |
| p.close() | ||
| p.join() | ||
|
|
||
| def test_pool_worker_died_without_communicating(self): | ||
| # Issue22393: test fix of indefinite hang caused by worker processes | ||
| # exiting abruptly (such as via os._exit()) without communicating | ||
| # back to the pool at all. | ||
| prog = ( | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This can be written much more clearly using a multi-line string. See for example a very similar case in |
||
| "import os, multiprocessing as mp; " | ||
| "is_main = (__name__ == '__main__'); " | ||
| "p = mp.Pool(1) if is_main else print('worker'); " | ||
| "p.map(os._exit, [1]) if is_main else None; " | ||
| "(p.close() or p.join()) if is_main else None" | ||
| ) | ||
| # Only if there is a regression will this ever trigger a | ||
| # subprocess.TimeoutExpired. | ||
| completed_process = subprocess.run( | ||
| [sys.executable, '-E', '-S', '-O', '-c', prog], | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The '-O' flag probably shouldn't be used here, but '-S' and '-E' seem fine. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Also, consider calling Or just use |
||
| check=False, | ||
| timeout=100, | ||
| capture_output=True | ||
| ) | ||
| self.assertNotEqual(0, completed_process.returncode) | ||
|
|
||
| class _TestPoolWorkerLifetime(BaseTestCase): | ||
| ALLOWED_TYPES = ('processes', ) | ||
|
|
||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1 @@ | ||
| Fix deadlock from multiprocessing.Pool worker death without communication. |
Uh oh!
There was an error while loading. Please reload this page.