@@ -399,6 +399,18 @@ def close(self):
399399
400400
401401class ClassicWorkerModel (WorkerModel ):
402+ #: In the top-level process, this references one end of a socketpair(),
403+ #: whose other end child MuxProcesses block reading from to determine when
404+ #: the master process dies. When the top-level exits abnormally, or
405+ #: normally but where :func:`_on_process_exit` has been called, this socket
406+ #: will be closed, causing all the children to wake.
407+ parent_sock = None
408+
409+ #: In the mux process, this is the other end of :attr:`cls_parent_sock`.
410+ #: The main thread blocks on a read from it until :attr:`cls_parent_sock`
411+ #: is closed.
412+ child_sock = None
413+
402414 #: mitogen.master.Router for this worker.
403415 router = None
404416
@@ -414,8 +426,34 @@ class ClassicWorkerModel(WorkerModel):
414426 parent = None
415427
416428 def __init__ (self , _init_logging = True ):
417- self ._init_logging = _init_logging
418- self .initialized = False
429+ """
430+ Arrange for classic model multiplexers to be started, if they are not
431+ already running.
432+
433+ The parent process picks a UNIX socket path each child will use prior
434+ to fork, creates a socketpair used essentially as a semaphore, then
435+ blocks waiting for the child to indicate the UNIX socket is ready for
436+ use.
437+
438+ :param bool _init_logging:
439+ For testing, if :data:`False`, don't initialize logging.
440+ """
441+ common_setup (_init_logging = _init_logging )
442+
443+ self .parent_sock , self .child_sock = socket .socketpair ()
444+ mitogen .core .set_cloexec (self .parent_sock .fileno ())
445+ mitogen .core .set_cloexec (self .child_sock .fileno ())
446+
447+ self ._muxes = [
448+ MuxProcess (self , index )
449+ for index in range (get_cpu_count (default = 1 ))
450+ ]
451+ for mux in self ._muxes :
452+ mux .start ()
453+
454+ atexit .register (self ._on_process_exit , self .parent_sock )
455+ self .child_sock .close ()
456+ self .child_sock = None
419457
420458 def _listener_for_name (self , name ):
421459 """
@@ -449,7 +487,7 @@ def _reconnect(self, path):
449487
450488 self .listener_path = path
451489
452- def on_process_exit (self , sock ):
490+ def _on_process_exit (self , sock ):
453491 """
454492 This is an :mod:`atexit` handler installed in the top-level process.
455493
@@ -467,7 +505,7 @@ def on_process_exit(self, sock):
467505 sock .shutdown (socket .SHUT_WR )
468506 except socket .error :
469507 # Already closed. This is possible when tests are running.
470- LOG .debug ('on_process_exit : ignoring duplicate call' )
508+ LOG .debug ('_on_process_exit : ignoring duplicate call' )
471509 return
472510
473511 mitogen .core .io_op (sock .recv , 1 )
@@ -479,46 +517,15 @@ def on_process_exit(self, sock):
479517 LOG .debug ('mux %d PID %d %s' , mux .index , mux .pid ,
480518 mitogen .parent .returncode_to_str (status ))
481519
482- def _initialize (self ):
483- """
484- Arrange for classic model multiplexers to be started, if they are not
485- already running.
486-
487- The parent process picks a UNIX socket path each child will use prior
488- to fork, creates a socketpair used essentially as a semaphore, then
489- blocks waiting for the child to indicate the UNIX socket is ready for
490- use.
491-
492- :param bool _init_logging:
493- For testing, if :data:`False`, don't initialize logging.
494- """
495- common_setup (_init_logging = self ._init_logging )
496-
497- MuxProcess .cls_parent_sock , \
498- MuxProcess .cls_child_sock = socket .socketpair ()
499- mitogen .core .set_cloexec (MuxProcess .cls_parent_sock .fileno ())
500- mitogen .core .set_cloexec (MuxProcess .cls_child_sock .fileno ())
501-
502- self ._muxes = [
503- MuxProcess (index )
504- for index in range (get_cpu_count (default = 1 ))
505- ]
506- for mux in self ._muxes :
507- mux .start ()
508-
509- atexit .register (self .on_process_exit , MuxProcess .cls_parent_sock )
510- MuxProcess .cls_child_sock .close ()
511- MuxProcess .cls_child_sock = None
512-
513520 def _test_reset (self ):
514521 """
515522 Used to clean up in unit tests.
516523 """
517524 # TODO: split this up a bit.
518525 global _classic_worker_model
519- assert MuxProcess . cls_parent_sock is not None
520- MuxProcess . cls_parent_sock .close ()
521- MuxProcess . cls_parent_sock = None
526+ assert self . parent_sock is not None
527+ self . parent_sock .close ()
528+ self . parent_sock = None
522529 self .listener_path = None
523530 self .router = None
524531 self .parent = None
@@ -536,9 +543,6 @@ def on_strategy_start(self):
536543 """
537544 See WorkerModel.on_strategy_start().
538545 """
539- if not self .initialized :
540- self ._initialize ()
541- self .initialized = True
542546
543547 def on_strategy_complete (self ):
544548 """
@@ -567,7 +571,6 @@ def on_binding_close(self):
567571 self .router = None
568572 self .broker = None
569573 self .listener_path = None
570- self .initialized = False
571574
572575 # #420: Ansible executes "meta" actions in the top-level process,
573576 # meaning "reset_connection" will cause :class:`mitogen.core.Latch` FDs
@@ -598,25 +601,16 @@ class MuxProcess(object):
598601 See https://bugs.python.org/issue6721 for a thorough description of the
599602 class of problems this worker is intended to avoid.
600603 """
601- #: In the top-level process, this references one end of a socketpair(),
602- #: whose other end child MuxProcesses block reading from to determine when
603- #: the master process dies. When the top-level exits abnormally, or
604- #: normally but where :func:`on_process_exit` has been called, this socket
605- #: will be closed, causing all the children to wake.
606- cls_parent_sock = None
607-
608- #: In the mux process, this is the other end of :attr:`cls_parent_sock`.
609- #: The main thread blocks on a read from it until :attr:`cls_parent_sock`
610- #: is closed.
611- cls_child_sock = None
612-
613604 #: A copy of :data:`os.environ` at the time the multiplexer process was
614605 #: started. It's used by mitogen_local.py to find changes made to the
615606 #: top-level environment (e.g. vars plugins -- issue #297) that must be
616607 #: applied to locally executed commands and modules.
617608 cls_original_env = None
618609
619- def __init__ (self , index ):
610+ def __init__ (self , model , index ):
611+ #: :class:`ClassicWorkerModel` instance we were created by.
612+ self .model = model
613+ #: MuxProcess CPU index.
620614 self .index = index
621615 #: Individual path of this process.
622616 self .path = mitogen .unix .make_socket_path ()
@@ -625,7 +619,7 @@ def start(self):
625619 self .pid = os .fork ()
626620 if self .pid :
627621 # Wait for child to boot before continuing.
628- mitogen .core .io_op (MuxProcess . cls_parent_sock .recv , 1 )
622+ mitogen .core .io_op (self . model . parent_sock .recv , 1 )
629623 return
630624
631625 ansible_mitogen .logging .set_process_name ('mux:' + str (self .index ))
@@ -635,8 +629,8 @@ def start(self):
635629 os .path .basename (self .path ),
636630 ))
637631
638- MuxProcess . cls_parent_sock .close ()
639- MuxProcess . cls_parent_sock = None
632+ self . model . parent_sock .close ()
633+ self . model . parent_sock = None
640634 try :
641635 try :
642636 self .worker_main ()
@@ -660,9 +654,9 @@ def worker_main(self):
660654
661655 try :
662656 # Let the parent know our listening socket is ready.
663- mitogen .core .io_op (self .cls_child_sock .send , b ('1' ))
657+ mitogen .core .io_op (self .model . child_sock .send , b ('1' ))
664658 # Block until the socket is closed, which happens on parent exit.
665- mitogen .core .io_op (self .cls_child_sock .recv , 1 )
659+ mitogen .core .io_op (self .model . child_sock .recv , 1 )
666660 finally :
667661 self .broker .shutdown ()
668662 self .broker .join ()
0 commit comments