2020import sys
2121import threading
2222import warnings
23+ from collections import deque
2324
2425from . import spawn
2526from . import util
@@ -62,6 +63,7 @@ def __init__(self):
6263 self ._fd = None
6364 self ._pid = None
6465 self ._exitcode = None
66+ self ._reentrant_messages = deque ()
6567
6668 def _reentrant_call_error (self ):
6769 # gh-109629: this happens if an explicit call to the ResourceTracker
@@ -98,7 +100,7 @@ def _stop_locked(
98100 # This shouldn't happen (it might when called by a finalizer)
99101 # so we check for it anyway.
100102 if self ._lock ._recursion_count () > 1 :
101- return self ._reentrant_call_error ()
103+ raise self ._reentrant_call_error ()
102104 if self ._fd is None :
103105 # not running
104106 return
@@ -128,69 +130,99 @@ def ensure_running(self):
128130
129131 This can be run from any process. Usually a child process will use
130132 the resource created by its parent.'''
133+ return self ._ensure_running_and_write ()
134+
def _teardown_dead_process(self):
    '''Forget a resource tracker process that was found dead and reap it.

    Clears ``_fd``, ``_pid`` and ``_exitcode``, closes the pipe descriptor
    that was stored in ``_fd``, waits for the child when ``_pid`` was set,
    and emits the "process died unexpectedly, relaunching" warning.
    Exceptions from os.close() or os.waitpid() other than
    ChildProcessError propagate to the caller.
    '''
    fd, pid = self._fd, self._pid
    # Drop the recorded state before touching the OS: if close() or
    # waitpid() raises, a later call must not keep using a descriptor
    # number that has already been closed.
    self._fd = None
    self._pid = None
    self._exitcode = None

    os.close(fd)

    # Clean-up to avoid dangling processes.
    try:
        # _pid can be None if this process is a child from another
        # python process, which has started the resource_tracker.
        if pid is not None:
            os.waitpid(pid, 0)
    except ChildProcessError:
        # The resource_tracker has already been terminated.
        pass

    warnings.warn('resource_tracker: process died unexpectedly, '
                  'relaunching. Some resources might leak.')
153+
def _launch(self):
    '''Spawn a resource tracker process and record its pipe and pid.

    A new pipe is created; its read end is passed to the child (together
    with stderr when available) and closed in this process afterwards.
    On success ``_fd`` is set to the write end and ``_pid`` to the value
    returned by util.spawnv_passfds().  On any failure the write end is
    closed too and the exception is re-raised.
    '''
    fds_to_pass = []
    try:
        fds_to_pass.append(sys.stderr.fileno())
    except Exception:
        # Best effort: stderr is only passed on when it has a usable
        # fileno().
        pass
    r, w = os.pipe()
    try:
        fds_to_pass.append(r)
        # process will outlive us, so no need to wait on pid
        exe = spawn.get_executable()
        args = [
            exe,
            *util._args_from_interpreter_flags(),
            '-c',
            f'from multiprocessing.resource_tracker import main;main({r})',
        ]
        # bpo-33613: Register a signal mask that will block the signals.
        # This signal mask will be inherited by the child that is going
        # to be spawned and will protect the child from a race condition
        # that can make the child die before it registers signal handlers
        # for SIGINT and SIGTERM. The mask is unregistered after spawning
        # the child.
        prev_sigmask = None
        try:
            if _HAVE_SIGMASK:
                prev_sigmask = signal.pthread_sigmask(
                    signal.SIG_BLOCK, _IGNORED_SIGNALS)
            pid = util.spawnv_passfds(exe, args, fds_to_pass)
        finally:
            if prev_sigmask is not None:
                signal.pthread_sigmask(signal.SIG_SETMASK, prev_sigmask)
    except BaseException:
        # Deliberately catches everything (including KeyboardInterrupt)
        # so the write end never leaks; the exception is re-raised.
        os.close(w)
        raise
    else:
        self._fd = w
        self._pid = pid
    finally:
        os.close(r)
193+
def _ensure_running_and_write(self, msg=None):
    '''Make sure a tracker process is running and deliver *msg* to it.

    msg is an already encoded protocol line (bytes), or None to only
    check that the tracker is alive (a PROBE line is written instead).

    If a tracker was launched before, the write doubles as the liveness
    check: an OSError tears the dead process down and launches a new one.
    If none was launched yet, one is launched first.

    When called reentrantly (the lock is already held by this thread),
    a non-None msg is queued in ``_reentrant_messages`` and the call
    returns; with msg=None the error built by _reentrant_call_error()
    is raised.
    '''
    with self._lock:
        if self._lock._recursion_count() > 1:
            # The code below is certainly not reentrant-safe, so bail out
            if msg is None:
                raise self._reentrant_call_error()
            # Queue the message; a non-reentrant call drains the queue
            # in the loop at the end of this method.
            self._reentrant_messages.append(msg)
            return

        if self._fd is not None:
            # resource tracker was launched before, is it still running?
            to_send = b'PROBE:0:noop\n' if msg is None else msg
            try:
                self._write(to_send)
            except OSError:
                # The write failed, so nothing was delivered.  Keep msg
                # so it is sent to the relaunched tracker below.
                self._teardown_dead_process()
                self._launch()
            else:
                msg = None  # message was sent in probe
        else:
            self._launch()

    # Drain messages queued by reentrant calls, oldest first.
    while True:
        try:
            reentrant_msg = self._reentrant_messages.popleft()
        except IndexError:
            break
        self._write(reentrant_msg)
    if msg is not None:
        self._write(msg)
194226
195227 def _check_alive (self ):
196228 '''Check that the pipe has not been closed by sending a probe.'''
@@ -211,27 +243,18 @@ def unregister(self, name, rtype):
211243 '''Unregister name of resource with resource tracker.'''
212244 self ._send ('UNREGISTER' , name , rtype )
213245
def _write(self, msg):
    '''Write the bytes *msg* to the tracker pipe with one os.write() call.

    Raises AssertionError if fewer than len(msg) bytes were written;
    OSError from os.write() propagates unchanged.
    '''
    nbytes = os.write(self._fd, msg)
    # Explicit raise instead of an assert statement so the short-write
    # check is still performed when Python runs with -O.
    if nbytes != len(msg):
        raise AssertionError(f"{nbytes=} != {len(msg)=}")
249+
214250 def _send (self , cmd , name , rtype ):
215- try :
216- self .ensure_running ()
217- except ReentrantCallError :
218- # The code below might or might not work, depending on whether
219- # the resource tracker was already running and still alive.
220- # Better warn the user.
221- # (XXX is warnings.warn itself reentrant-safe? :-)
222- warnings .warn (
223- f"ResourceTracker called reentrantly for resource cleanup, "
224- f"which is unsupported. "
225- f"The { rtype } object { name !r} might leak." )
226- msg = '{0}:{1}:{2}\n ' .format (cmd , name , rtype ).encode ('ascii' )
251+ msg = f"{ cmd } :{ name } :{ rtype } \n " .encode ("ascii" )
227252 if len (msg ) > 512 :
228253 # posix guarantees that writes to a pipe of less than PIPE_BUF
229254 # bytes are atomic, and that PIPE_BUF >= 512
230255 raise ValueError ('msg too long' )
231- nbytes = os .write (self ._fd , msg )
232- assert nbytes == len (msg ), "nbytes {0:n} but len(msg) {1:n}" .format (
233- nbytes , len (msg ))
234256
257+ self ._ensure_running_and_write (msg )
235258
236259_resource_tracker = ResourceTracker ()
237260ensure_running = _resource_tracker .ensure_running
0 commit comments