2020import sys
2121import threading
2222import warnings
23- from collections import deque
2423
2524from . import spawn
2625from . import util
@@ -67,7 +66,6 @@ def __init__(self):
6766 self ._fd = None
6867 self ._pid = None
6968 self ._exitcode = None
70- self ._reentrant_messages = deque ()
7169
7270 def _reentrant_call_error (self ):
7371 # gh-109629: this happens if an explicit call to the ResourceTracker
@@ -104,7 +102,7 @@ def _stop_locked(
104102 # This shouldn't happen (it might when called by a finalizer)
105103 # so we check for it anyway.
106104 if self ._lock ._recursion_count () > 1 :
107- raise self ._reentrant_call_error ()
105+ return self ._reentrant_call_error ()
108106 if self ._fd is None :
109107 # not running
110108 return
@@ -134,99 +132,69 @@ def ensure_running(self):
134132
135133 This can be run from any process. Usually a child process will use
136134 the resource created by its parent.'''
137- return self ._ensure_running_and_write ()
138-
139- def _teardown_dead_process (self ):
140- os .close (self ._fd )
141-
142- # Clean-up to avoid dangling processes.
143- try :
144- # _pid can be None if this process is a child from another
145- # python process, which has started the resource_tracker.
146- if self ._pid is not None :
147- os .waitpid (self ._pid , 0 )
148- except ChildProcessError :
149- # The resource_tracker has already been terminated.
150- pass
151- self ._fd = None
152- self ._pid = None
153- self ._exitcode = None
154-
155- warnings .warn ('resource_tracker: process died unexpectedly, '
156- 'relaunching. Some resources might leak.' )
157-
158- def _launch (self ):
159- fds_to_pass = []
160- try :
161- fds_to_pass .append (sys .stderr .fileno ())
162- except Exception :
163- pass
164- r , w = os .pipe ()
165- try :
166- fds_to_pass .append (r )
167- # process will out live us, so no need to wait on pid
168- exe = spawn .get_executable ()
169- args = [
170- exe ,
171- * util ._args_from_interpreter_flags (),
172- '-c' ,
173- f'from multiprocessing.resource_tracker import main;main({ r } )' ,
174- ]
175- # bpo-33613: Register a signal mask that will block the signals.
176- # This signal mask will be inherited by the child that is going
177- # to be spawned and will protect the child from a race condition
178- # that can make the child die before it registers signal handlers
179- # for SIGINT and SIGTERM. The mask is unregistered after spawning
180- # the child.
181- prev_sigmask = None
182- try :
183- if _HAVE_SIGMASK :
184- prev_sigmask = signal .pthread_sigmask (signal .SIG_BLOCK , _IGNORED_SIGNALS )
185- pid = util .spawnv_passfds (exe , args , fds_to_pass )
186- finally :
187- if prev_sigmask is not None :
188- signal .pthread_sigmask (signal .SIG_SETMASK , prev_sigmask )
189- except :
190- os .close (w )
191- raise
192- else :
193- self ._fd = w
194- self ._pid = pid
195- finally :
196- os .close (r )
197-
198- def _ensure_running_and_write (self , msg = None ):
199135 with self ._lock :
200136 if self ._lock ._recursion_count () > 1 :
201137 # The code below is certainly not reentrant-safe, so bail out
202- if msg is None :
203- raise self ._reentrant_call_error ()
204- return self ._reentrant_messages .append (msg )
205-
138+ return self ._reentrant_call_error ()
206139 if self ._fd is not None :
207140 # resource tracker was launched before, is it still running?
208- if msg is None :
209- to_send = b'PROBE:0:noop\n '
210- else :
211- to_send = msg
141+ if self ._check_alive ():
142+ # => still alive
143+ return
144+ # => dead, launch it again
145+ os .close (self ._fd )
146+
147+ # Clean-up to avoid dangling processes.
212148 try :
213- self ._write (to_send )
214- except OSError :
215- self ._teardown_dead_process ()
216- self ._launch ()
149+ # _pid can be None if this process is a child from another
150+ # python process, which has started the resource_tracker.
151+ if self ._pid is not None :
152+ os .waitpid (self ._pid , 0 )
153+ except ChildProcessError :
154+ # The resource_tracker has already been terminated.
155+ pass
156+ self ._fd = None
157+ self ._pid = None
158+ self ._exitcode = None
217159
218- msg = None # message was sent in probe
219- else :
220- self ._launch ()
160+ warnings .warn ('resource_tracker: process died unexpectedly, '
161+ 'relaunching. Some resources might leak.' )
221162
222- while True :
163+ fds_to_pass = []
223164 try :
224- reentrant_msg = self ._reentrant_messages .popleft ()
225- except IndexError :
226- break
227- self ._write (reentrant_msg )
228- if msg is not None :
229- self ._write (msg )
165+ fds_to_pass .append (sys .stderr .fileno ())
166+ except Exception :
167+ pass
168+ cmd = 'from multiprocessing.resource_tracker import main;main(%d)'
169+ r , w = os .pipe ()
170+ try :
171+ fds_to_pass .append (r )
172+ # process will out live us, so no need to wait on pid
173+ exe = spawn .get_executable ()
174+ args = [exe ] + util ._args_from_interpreter_flags ()
175+ args += ['-c' , cmd % r ]
176+ # bpo-33613: Register a signal mask that will block the signals.
177+ # This signal mask will be inherited by the child that is going
178+ # to be spawned and will protect the child from a race condition
179+ # that can make the child die before it registers signal handlers
180+ # for SIGINT and SIGTERM. The mask is unregistered after spawning
181+ # the child.
182+ prev_sigmask = None
183+ try :
184+ if _HAVE_SIGMASK :
185+ prev_sigmask = signal .pthread_sigmask (signal .SIG_BLOCK , _IGNORED_SIGNALS )
186+ pid = util .spawnv_passfds (exe , args , fds_to_pass )
187+ finally :
188+ if prev_sigmask is not None :
189+ signal .pthread_sigmask (signal .SIG_SETMASK , prev_sigmask )
190+ except :
191+ os .close (w )
192+ raise
193+ else :
194+ self ._fd = w
195+ self ._pid = pid
196+ finally :
197+ os .close (r )
230198
231199 def _check_alive (self ):
232200 '''Check that the pipe has not been closed by sending a probe.'''
@@ -247,18 +215,27 @@ def unregister(self, name, rtype):
247215 '''Unregister name of resource with resource tracker.'''
248216 self ._send ('UNREGISTER' , name , rtype )
249217
250- def _write (self , msg ):
251- nbytes = os .write (self ._fd , msg )
252- assert nbytes == len (msg ), f"{ nbytes = } != { len (msg )= } "
253-
254218 def _send (self , cmd , name , rtype ):
255- msg = f"{ cmd } :{ name } :{ rtype } \n " .encode ("ascii" )
219+ try :
220+ self .ensure_running ()
221+ except ReentrantCallError :
222+ # The code below might or might not work, depending on whether
223+ # the resource tracker was already running and still alive.
224+ # Better warn the user.
225+ # (XXX is warnings.warn itself reentrant-safe? :-)
226+ warnings .warn (
227+ f"ResourceTracker called reentrantly for resource cleanup, "
228+ f"which is unsupported. "
229+ f"The { rtype } object { name !r} might leak." )
230+ msg = '{0}:{1}:{2}\n ' .format (cmd , name , rtype ).encode ('ascii' )
256231 if len (msg ) > 512 :
257232 # posix guarantees that writes to a pipe of less than PIPE_BUF
258233 # bytes are atomic, and that PIPE_BUF >= 512
259234 raise ValueError ('msg too long' )
235+ nbytes = os .write (self ._fd , msg )
236+ assert nbytes == len (msg ), "nbytes {0:n} but len(msg) {1:n}" .format (
237+ nbytes , len (msg ))
260238
261- self ._ensure_running_and_write (msg )
262239
263240_resource_tracker = ResourceTracker ()
264241ensure_running = _resource_tracker .ensure_running
0 commit comments