-
-
Notifications
You must be signed in to change notification settings - Fork 33.1k
gh-131788: make resource_tracker re-entrant safe #131787
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
gpshead
merged 18 commits into
python:main
from
graingert:make-resource-tracker-reentrant-safe
Aug 13, 2025
+94
−70
Merged
Changes from 3 commits
Commits
Show all changes
18 commits
Select commit
Hold shift + click to select a range
9114b16
make resource_tracker re-entrant safe
graingert 60ac5f9
📜🤖 Added by blurb_it.
blurb-it[bot] 1565c28
Merge branch 'main' into make-resource-tracker-reentrant-safe
graingert 52b2100
Update Lib/multiprocessing/resource_tracker.py
graingert ce82968
Update Lib/multiprocessing/resource_tracker.py
graingert 15e73a9
Update Lib/multiprocessing/resource_tracker.py
graingert 174a3ff
trim trailing whitespace
graingert 54008b1
Merge branch 'main' into make-resource-tracker-reentrant-safe
graingert 5d6406c
use f-string and args = [x, *y, z]
graingert 395d706
raise self._reentrant_call_error
graingert 3331162
Update Lib/multiprocessing/resource_tracker.py
graingert 544d02e
Merge branch 'main' into make-resource-tracker-reentrant-safe
graingert a2c537d
Update Lib/multiprocessing/resource_tracker.py
graingert 5351b10
Update Misc/NEWS.d/next/Library/2025-03-27-08-13-32.gh-issue-131788.0…
graingert efe1c92
Merge branch 'main' into make-resource-tracker-reentrant-safe
graingert 8e8f9f7
Merge branch 'main' into make-resource-tracker-reentrant-safe
graingert 2b7d1fa
Merge branch 'main' into make-resource-tracker-reentrant-safe
gpshead 7ded9c1
Merge branch 'main' into make-resource-tracker-reentrant-safe
colesbury File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -20,6 +20,7 @@ | |
import sys | ||
import threading | ||
import warnings | ||
from collections import deque | ||
|
||
from . import spawn | ||
from . import util | ||
|
@@ -66,6 +67,7 @@ def __init__(self): | |
self._fd = None | ||
self._pid = None | ||
self._exitcode = None | ||
self._reentrant_messages = deque() | ||
|
||
def _reentrant_call_error(self): | ||
# gh-109629: this happens if an explicit call to the ResourceTracker | ||
|
@@ -132,69 +134,100 @@ def ensure_running(self): | |
|
||
This can be run from any process. Usually a child process will use | ||
the resource created by its parent.''' | ||
return self._ensure_running_and_write() | ||
|
||
def _teardown_dead_process(self): | ||
os.close(self._fd) | ||
|
||
# Clean-up to avoid dangling processes. | ||
try: | ||
# _pid can be None if this process is a child from another | ||
# python process, which has started the resource_tracker. | ||
if self._pid is not None: | ||
os.waitpid(self._pid, 0) | ||
except ChildProcessError: | ||
# The resource_tracker has already been terminated. | ||
pass | ||
self._fd = None | ||
self._pid = None | ||
self._exitcode = None | ||
|
||
warnings.warn('resource_tracker: process died unexpectedly, ' | ||
'relaunching. Some resources might leak.') | ||
|
||
def _launch(self): | ||
fds_to_pass = [] | ||
try: | ||
fds_to_pass.append(sys.stderr.fileno()) | ||
except Exception: | ||
pass | ||
cmd = 'from multiprocessing.resource_tracker import main;main(%d)' | ||
r, w = os.pipe() | ||
try: | ||
fds_to_pass.append(r) | ||
# process will out live us, so no need to wait on pid | ||
exe = spawn.get_executable() | ||
args = [exe] + util._args_from_interpreter_flags() | ||
args += ['-c', cmd % r] | ||
graingert marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
# bpo-33613: Register a signal mask that will block the signals. | ||
# This signal mask will be inherited by the child that is going | ||
# to be spawned and will protect the child from a race condition | ||
# that can make the child die before it registers signal handlers | ||
# for SIGINT and SIGTERM. The mask is unregistered after spawning | ||
# the child. | ||
prev_sigmask = None | ||
try: | ||
if _HAVE_SIGMASK: | ||
prev_sigmask = signal.pthread_sigmask(signal.SIG_BLOCK, _IGNORED_SIGNALS) | ||
pid = util.spawnv_passfds(exe, args, fds_to_pass) | ||
finally: | ||
if prev_sigmask is not None: | ||
signal.pthread_sigmask(signal.SIG_SETMASK, prev_sigmask) | ||
except: | ||
os.close(w) | ||
raise | ||
else: | ||
self._fd = w | ||
self._pid = pid | ||
finally: | ||
os.close(r) | ||
|
||
def _ensure_running_and_write(self, msg=None): | ||
with self._lock: | ||
if self._lock._recursion_count() > 1: | ||
# The code below is certainly not reentrant-safe, so bail out | ||
return self._reentrant_call_error() | ||
if msg is None: | ||
return self._reentrant_call_error() | ||
graingert marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
return self._reentrant_messages.append(msg) | ||
|
||
if self._fd is not None: | ||
# resource tracker was launched before, is it still running? | ||
if self._check_alive(): | ||
# => still alive | ||
return | ||
# => dead, launch it again | ||
os.close(self._fd) | ||
|
||
# Clean-up to avoid dangling processes. | ||
if msg is None: | ||
to_send = b'PROBE:0:noop\n' | ||
else: | ||
to_send = msg | ||
try: | ||
# _pid can be None if this process is a child from another | ||
# python process, which has started the resource_tracker. | ||
if self._pid is not None: | ||
os.waitpid(self._pid, 0) | ||
except ChildProcessError: | ||
# The resource_tracker has already been terminated. | ||
pass | ||
self._fd = None | ||
self._pid = None | ||
self._exitcode = None | ||
|
||
warnings.warn('resource_tracker: process died unexpectedly, ' | ||
'relaunching. Some resources might leak.') | ||
self._write(to_send) | ||
except OSError: | ||
dead = True | ||
else: | ||
dead = False | ||
if dead: | ||
self._teardown_dead_process() | ||
self._launch() | ||
graingert marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
msg = None # message was sent in probe | ||
else: | ||
self._launch() | ||
|
||
fds_to_pass = [] | ||
try: | ||
fds_to_pass.append(sys.stderr.fileno()) | ||
except Exception: | ||
pass | ||
cmd = 'from multiprocessing.resource_tracker import main;main(%d)' | ||
r, w = os.pipe() | ||
while True: | ||
try: | ||
fds_to_pass.append(r) | ||
# process will out live us, so no need to wait on pid | ||
exe = spawn.get_executable() | ||
args = [exe] + util._args_from_interpreter_flags() | ||
args += ['-c', cmd % r] | ||
# bpo-33613: Register a signal mask that will block the signals. | ||
# This signal mask will be inherited by the child that is going | ||
# to be spawned and will protect the child from a race condition | ||
# that can make the child die before it registers signal handlers | ||
# for SIGINT and SIGTERM. The mask is unregistered after spawning | ||
# the child. | ||
prev_sigmask = None | ||
try: | ||
if _HAVE_SIGMASK: | ||
prev_sigmask = signal.pthread_sigmask(signal.SIG_BLOCK, _IGNORED_SIGNALS) | ||
pid = util.spawnv_passfds(exe, args, fds_to_pass) | ||
finally: | ||
if prev_sigmask is not None: | ||
signal.pthread_sigmask(signal.SIG_SETMASK, prev_sigmask) | ||
except: | ||
os.close(w) | ||
raise | ||
else: | ||
self._fd = w | ||
self._pid = pid | ||
finally: | ||
os.close(r) | ||
reentrant_msg = self._reentrant_messages.popleft() | ||
except IndexError: | ||
break | ||
self._write(reentrant_msg) | ||
if msg is not None: | ||
self._write(msg) | ||
|
||
def _check_alive(self): | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This code is not called anymore, the method can be clean up. |
||
'''Check that the pipe has not been closed by sending a probe.''' | ||
|
@@ -215,27 +248,19 @@ def unregister(self, name, rtype): | |
'''Unregister name of resource with resource tracker.''' | ||
self._send('UNREGISTER', name, rtype) | ||
|
||
def _write(self, msg): | ||
nbytes = os.write(self._fd, msg) | ||
assert nbytes == len(msg), "nbytes {0:n} but len(msg) {1:n}".format( | ||
nbytes, len(msg)) | ||
graingert marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
|
||
def _send(self, cmd, name, rtype): | ||
try: | ||
self.ensure_running() | ||
except ReentrantCallError: | ||
# The code below might or might not work, depending on whether | ||
# the resource tracker was already running and still alive. | ||
# Better warn the user. | ||
# (XXX is warnings.warn itself reentrant-safe? :-) | ||
warnings.warn( | ||
f"ResourceTracker called reentrantly for resource cleanup, " | ||
f"which is unsupported. " | ||
f"The {rtype} object {name!r} might leak.") | ||
msg = '{0}:{1}:{2}\n'.format(cmd, name, rtype).encode('ascii') | ||
graingert marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
if len(msg) > 512: | ||
# posix guarantees that writes to a pipe of less than PIPE_BUF | ||
# bytes are atomic, and that PIPE_BUF >= 512 | ||
raise ValueError('msg too long') | ||
nbytes = os.write(self._fd, msg) | ||
assert nbytes == len(msg), "nbytes {0:n} but len(msg) {1:n}".format( | ||
nbytes, len(msg)) | ||
|
||
self._ensure_running_and_write(msg) | ||
|
||
_resource_tracker = ResourceTracker() | ||
ensure_running = _resource_tracker.ensure_running | ||
|
1 change: 1 addition & 0 deletions
1
Misc/NEWS.d/next/Library/2025-03-27-08-13-32.gh-issue-131788.0RWiFc.rst
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1 @@ | ||
Make ``ResourceTracker.send`` re-entrant safe | ||
graingert marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.