2020import  sys 
2121import  threading 
2222import  warnings 
23+ from  collections  import  deque 
2324
2425from  . import  spawn 
2526from  . import  util 
@@ -62,6 +63,7 @@ def __init__(self):
6263        self ._fd  =  None 
6364        self ._pid  =  None 
6465        self ._exitcode  =  None 
66+         self ._reentrant_messages  =  deque ()
6567
6668    def  _reentrant_call_error (self ):
6769        # gh-109629: this happens if an explicit call to the ResourceTracker 
@@ -98,7 +100,7 @@ def _stop_locked(
98100        # This shouldn't happen (it might when called by a finalizer) 
99101        # so we check for it anyway. 
100102        if  self ._lock ._recursion_count () >  1 :
101-             return  self ._reentrant_call_error ()
103+             raise  self ._reentrant_call_error ()
102104        if  self ._fd  is  None :
103105            # not running 
104106            return 
@@ -128,69 +130,99 @@ def ensure_running(self):
128130
129131        This can be run from any process.  Usually a child process will use 
130132        the resource created by its parent.''' 
133+         return  self ._ensure_running_and_write ()
134+ 
135+     def  _teardown_dead_process (self ):
136+         os .close (self ._fd )
137+ 
138+         # Clean-up to avoid dangling processes. 
139+         try :
140+             # _pid can be None if this process is a child from another 
141+             # python process, which has started the resource_tracker. 
142+             if  self ._pid  is  not None :
143+                 os .waitpid (self ._pid , 0 )
144+         except  ChildProcessError :
145+             # The resource_tracker has already been terminated. 
146+             pass 
147+         self ._fd  =  None 
148+         self ._pid  =  None 
149+         self ._exitcode  =  None 
150+ 
151+         warnings .warn ('resource_tracker: process died unexpectedly, ' 
152+                       'relaunching.  Some resources might leak.' )
153+ 
154+     def  _launch (self ):
155+         fds_to_pass  =  []
156+         try :
157+             fds_to_pass .append (sys .stderr .fileno ())
158+         except  Exception :
159+             pass 
160+         r , w  =  os .pipe ()
161+         try :
162+             fds_to_pass .append (r )
163+             # process will out live us, so no need to wait on pid 
164+             exe  =  spawn .get_executable ()
165+             args  =  [
166+                 exe ,
167+                 * util ._args_from_interpreter_flags (),
168+                 '-c' ,
169+                 f'from multiprocessing.resource_tracker import main;main({ r }  ,
170+             ]
171+             # bpo-33613: Register a signal mask that will block the signals. 
172+             # This signal mask will be inherited by the child that is going 
173+             # to be spawned and will protect the child from a race condition 
174+             # that can make the child die before it registers signal handlers 
175+             # for SIGINT and SIGTERM. The mask is unregistered after spawning 
176+             # the child. 
177+             prev_sigmask  =  None 
178+             try :
179+                 if  _HAVE_SIGMASK :
180+                     prev_sigmask  =  signal .pthread_sigmask (signal .SIG_BLOCK , _IGNORED_SIGNALS )
181+                 pid  =  util .spawnv_passfds (exe , args , fds_to_pass )
182+             finally :
183+                 if  prev_sigmask  is  not None :
184+                     signal .pthread_sigmask (signal .SIG_SETMASK , prev_sigmask )
185+         except :
186+             os .close (w )
187+             raise 
188+         else :
189+             self ._fd  =  w 
190+             self ._pid  =  pid 
191+         finally :
192+             os .close (r )
193+ 
194+     def  _ensure_running_and_write (self , msg = None ):
131195        with  self ._lock :
132196            if  self ._lock ._recursion_count () >  1 :
133197                # The code below is certainly not reentrant-safe, so bail out 
134-                 return  self ._reentrant_call_error ()
198+                 if  msg  is  None :
199+                     raise  self ._reentrant_call_error ()
200+                 return  self ._reentrant_messages .append (msg )
201+ 
135202            if  self ._fd  is  not None :
136203                # resource tracker was launched before, is it still running? 
137-                 if  self ._check_alive ():
138-                     # => still alive 
139-                     return 
140-                 # => dead, launch it again 
141-                 os .close (self ._fd )
142- 
143-                 # Clean-up to avoid dangling processes. 
204+                 if  msg  is  None :
205+                     to_send  =  b'PROBE:0:noop\n ' 
206+                 else :
207+                     to_send  =  msg 
144208                try :
145-                     # _pid can be None if this process is a child from another 
146-                     # python process, which has started the resource_tracker. 
147-                     if  self ._pid  is  not None :
148-                         os .waitpid (self ._pid , 0 )
149-                 except  ChildProcessError :
150-                     # The resource_tracker has already been terminated. 
151-                     pass 
152-                 self ._fd  =  None 
153-                 self ._pid  =  None 
154-                 self ._exitcode  =  None 
209+                     self ._write (to_send )
210+                 except  OSError :
211+                     self ._teardown_dead_process ()
212+                     self ._launch ()
155213
156-                 warnings .warn ('resource_tracker: process died unexpectedly, ' 
157-                               'relaunching.  Some resources might leak.' )
214+                 msg  =  None   # message was sent in probe 
215+             else :
216+                 self ._launch ()
158217
159-              fds_to_pass   =  [] 
218+         while   True : 
160219            try :
161-                 fds_to_pass .append (sys .stderr .fileno ())
162-             except  Exception :
163-                 pass 
164-             cmd  =  'from multiprocessing.resource_tracker import main;main(%d)' 
165-             r , w  =  os .pipe ()
166-             try :
167-                 fds_to_pass .append (r )
168-                 # process will out live us, so no need to wait on pid 
169-                 exe  =  spawn .get_executable ()
170-                 args  =  [exe ] +  util ._args_from_interpreter_flags ()
171-                 args  +=  ['-c' , cmd  %  r ]
172-                 # bpo-33613: Register a signal mask that will block the signals. 
173-                 # This signal mask will be inherited by the child that is going 
174-                 # to be spawned and will protect the child from a race condition 
175-                 # that can make the child die before it registers signal handlers 
176-                 # for SIGINT and SIGTERM. The mask is unregistered after spawning 
177-                 # the child. 
178-                 prev_sigmask  =  None 
179-                 try :
180-                     if  _HAVE_SIGMASK :
181-                         prev_sigmask  =  signal .pthread_sigmask (signal .SIG_BLOCK , _IGNORED_SIGNALS )
182-                     pid  =  util .spawnv_passfds (exe , args , fds_to_pass )
183-                 finally :
184-                     if  prev_sigmask  is  not None :
185-                         signal .pthread_sigmask (signal .SIG_SETMASK , prev_sigmask )
186-             except :
187-                 os .close (w )
188-                 raise 
189-             else :
190-                 self ._fd  =  w 
191-                 self ._pid  =  pid 
192-             finally :
193-                 os .close (r )
220+                 reentrant_msg  =  self ._reentrant_messages .popleft ()
221+             except  IndexError :
222+                 break 
223+             self ._write (reentrant_msg )
224+         if  msg  is  not None :
225+             self ._write (msg )
194226
195227    def  _check_alive (self ):
196228        '''Check that the pipe has not been closed by sending a probe.''' 
@@ -211,27 +243,18 @@ def unregister(self, name, rtype):
211243        '''Unregister name of resource with resource tracker.''' 
212244        self ._send ('UNREGISTER' , name , rtype )
213245
246+     def  _write (self , msg ):
247+         nbytes  =  os .write (self ._fd , msg )
248+         assert  nbytes  ==  len (msg ), f"{ nbytes = } { len (msg )= }  
249+ 
214250    def  _send (self , cmd , name , rtype ):
215-         try :
216-             self .ensure_running ()
217-         except  ReentrantCallError :
218-             # The code below might or might not work, depending on whether 
219-             # the resource tracker was already running and still alive. 
220-             # Better warn the user. 
221-             # (XXX is warnings.warn itself reentrant-safe? :-) 
222-             warnings .warn (
223-                 f"ResourceTracker called reentrantly for resource cleanup, " 
224-                 f"which is unsupported. " 
225-                 f"The { rtype } { name !r}  )
226-         msg  =  '{0}:{1}:{2}\n ' .format (cmd , name , rtype ).encode ('ascii' )
251+         msg  =  f"{ cmd } { name } { rtype } \n " .encode ("ascii" )
227252        if  len (msg ) >  512 :
228253            # posix guarantees that writes to a pipe of less than PIPE_BUF 
229254            # bytes are atomic, and that PIPE_BUF >= 512 
230255            raise  ValueError ('msg too long' )
231-         nbytes  =  os .write (self ._fd , msg )
232-         assert  nbytes  ==  len (msg ), "nbytes {0:n} but len(msg) {1:n}" .format (
233-             nbytes , len (msg ))
234256
257+         self ._ensure_running_and_write (msg )
235258
236259_resource_tracker  =  ResourceTracker ()
237260ensure_running  =  _resource_tracker .ensure_running 
0 commit comments