2020import sys
2121import threading
2222import warnings
23+ from collections import deque
2324
2425from . import spawn
2526from . import util
@@ -66,6 +67,7 @@ def __init__(self):
6667 self ._fd = None
6768 self ._pid = None
6869 self ._exitcode = None
70+ self ._reentrant_messages = deque ()
6971
7072 def _reentrant_call_error (self ):
7173 # gh-109629: this happens if an explicit call to the ResourceTracker
@@ -132,80 +134,108 @@ def ensure_running(self):
132134
133135 This can be run from any process. Usually a child process will use
134136 the resource created by its parent.'''
137+ return self ._ensure_running_and_write ()
138+
139+ def _teardown_dead_process (self ):
140+ os .close (self ._fd )
141+
142+ # Clean-up to avoid dangling processes.
143+ try :
144+ # _pid can be None if this process is a child from another
145+ # python process, which has started the resource_tracker.
146+ if self ._pid is not None :
147+ os .waitpid (self ._pid , 0 )
148+ except ChildProcessError :
149+ # The resource_tracker has already been terminated.
150+ pass
151+ self ._fd = None
152+ self ._pid = None
153+ self ._exitcode = None
154+
155+ warnings .warn ('resource_tracker: process died unexpectedly, '
156+ 'relaunching. Some resources might leak.' )
157+
def _launch(self):
    '''Spawn a resource tracker process and record its pipe and pid.

    On success ``_fd`` is the write end of a fresh pipe whose read end
    was passed to the child, and ``_pid`` is the value returned by
    ``util.spawnv_passfds``.  If anything raises, the write end is
    closed and the exception propagates.  This process's copy of the
    read end is closed in every case.
    '''
    # Pass stderr along when it exposes a usable file descriptor.
    try:
        fds_to_pass = [sys.stderr.fileno()]
    except Exception:
        fds_to_pass = []

    read_fd, write_fd = os.pipe()
    try:
        fds_to_pass.append(read_fd)
        # process will out live us, so no need to wait on pid
        exe = spawn.get_executable()
        bootstrap = (
            f'from multiprocessing.resource_tracker import main;'
            f'main({read_fd})'
        )
        args = [exe, *util._args_from_interpreter_flags(), '-c', bootstrap]
        # bpo-33613: Register a signal mask that will block the signals.
        # This signal mask will be inherited by the child that is going
        # to be spawned and will protect the child from a race condition
        # that can make the child die before it registers signal handlers
        # for SIGINT and SIGTERM. The mask is unregistered after spawning
        # the child.
        prev_sigmask = None
        try:
            if _HAVE_SIGMASK:
                prev_sigmask = signal.pthread_sigmask(
                    signal.SIG_BLOCK, _IGNORED_SIGNALS)
            pid = util.spawnv_passfds(exe, args, fds_to_pass)
        finally:
            if prev_sigmask is not None:
                signal.pthread_sigmask(signal.SIG_SETMASK, prev_sigmask)
    except BaseException:
        # Nothing will ever read from the pipe; do not leak its write end.
        os.close(write_fd)
        raise
    else:
        self._fd = write_fd
        self._pid = pid
    finally:
        os.close(read_fd)
194+
195+ def _ensure_running_and_write (self , msg = None ):
135196 with self ._lock :
136197 if self ._lock ._recursion_count () > 1 :
137198 # The code below is certainly not reentrant-safe, so bail out
138- return self ._reentrant_call_error ()
199+ if msg is not None :
200+ self ._reentrant_messages .append (msg )
201+ return
139202 if self ._fd is not None :
140203 # resource tracker was launched before, is it still running?
141- if self ._check_alive ():
142- # => still alive
143- return
144- # => dead, launch it again
145- os .close (self ._fd )
146-
147- # Clean-up to avoid dangling processes.
204+ if msg is None :
205+ to_send = b'PROBE:0:noop\n '
206+ else :
207+ to_send = msg
148208 try :
149- # _pid can be None if this process is a child from another
150- # python process, which has started the resource_tracker.
151- if self ._pid is not None :
152- os .waitpid (self ._pid , 0 )
153- except ChildProcessError :
154- # The resource_tracker has already been terminated.
155- pass
156- self ._fd = None
157- self ._pid = None
158- self ._exitcode = None
159-
160- warnings .warn ('resource_tracker: process died unexpectedly, '
161- 'relaunching. Some resources might leak.' )
162-
163- fds_to_pass = []
164- try :
165- fds_to_pass .append (sys .stderr .fileno ())
166- except Exception :
167- pass
168- cmd = 'from multiprocessing.resource_tracker import main;main(%d)'
169- r , w = os .pipe ()
170- try :
171- fds_to_pass .append (r )
172- # process will out live us, so no need to wait on pid
173- exe = spawn .get_executable ()
174- args = [exe ] + util ._args_from_interpreter_flags ()
175- args += ['-c' , cmd % r ]
176- # bpo-33613: Register a signal mask that will block the signals.
177- # This signal mask will be inherited by the child that is going
178- # to be spawned and will protect the child from a race condition
179- # that can make the child die before it registers signal handlers
180- # for SIGINT and SIGTERM. The mask is unregistered after spawning
181- # the child.
182- prev_sigmask = None
183- try :
184- if _HAVE_SIGMASK :
185- prev_sigmask = signal .pthread_sigmask (signal .SIG_BLOCK , _IGNORED_SIGNALS )
186- pid = util .spawnv_passfds (exe , args , fds_to_pass )
187- finally :
188- if prev_sigmask is not None :
189- signal .pthread_sigmask (signal .SIG_SETMASK , prev_sigmask )
190- except :
191- os .close (w )
192- raise
209+ self ._write (to_send )
210+ except OSError :
211+ dead = True
212+ else :
213+ dead = False
214+ if dead :
215+ self ._teardown_dead_process ()
216+ self ._launch ()
217+
218+ msg = None # message was sent in probe
193219 else :
194- self ._fd = w
195- self ._pid = pid
196- finally :
197- os .close (r )
220+ self ._launch ()
198221
199- def _check_alive (self ):
222+ while True :
223+ try :
224+ reentrant_msg = self ._reentrant_messages .popleft ()
225+ except IndexError :
226+ break
227+ self ._write (reentrant_msg )
228+ if msg is not None :
229+ self ._write (msg )
230+
231+ def _check_alive (self , msg = b'PROBE:0:noop\n ' ):
200232 '''Check that the pipe has not been closed by sending a probe.'''
201233 try :
202234 # We cannot use send here as it calls ensure_running, creating
203235 # a cycle.
204- os .write (self ._fd , b'PROBE:0:noop\n ' )
236+ return os .write (self ._fd , b'PROBE:0:noop\n ' )
205237 except OSError :
206- return False
207- else :
208- return True
238+ return None
209239
210240 def register (self , name , rtype ):
211241 '''Register name of resource with resource tracker.'''
@@ -215,27 +245,19 @@ def unregister(self, name, rtype):
215245 '''Unregister name of resource with resource tracker.'''
216246 self ._send ('UNREGISTER' , name , rtype )
217247
248+ def _write (self , msg ):
249+ nbytes = os .write (self ._fd , msg )
250+ assert nbytes == len (msg ), "nbytes {0:n} but len(msg) {1:n}" .format (
251+ nbytes , len (msg ))
252+
218253 def _send (self , cmd , name , rtype ):
219- try :
220- self .ensure_running ()
221- except ReentrantCallError :
222- # The code below might or might not work, depending on whether
223- # the resource tracker was already running and still alive.
224- # Better warn the user.
225- # (XXX is warnings.warn itself reentrant-safe? :-)
226- warnings .warn (
227- f"ResourceTracker called reentrantly for resource cleanup, "
228- f"which is unsupported. "
229- f"The { rtype } object { name !r} might leak." )
230254 msg = '{0}:{1}:{2}\n ' .format (cmd , name , rtype ).encode ('ascii' )
231255 if len (msg ) > 512 :
232256 # posix guarantees that writes to a pipe of less than PIPE_BUF
233257 # bytes are atomic, and that PIPE_BUF >= 512
234258 raise ValueError ('msg too long' )
235- nbytes = os .write (self ._fd , msg )
236- assert nbytes == len (msg ), "nbytes {0:n} but len(msg) {1:n}" .format (
237- nbytes , len (msg ))
238259
260+ self ._ensure_running_and_write (msg )
239261
240262_resource_tracker = ResourceTracker ()
241263ensure_running = _resource_tracker .ensure_running
0 commit comments