Skip to content

Commit 7ca2d00

Browse files
committed
Merge remote-tracking branch 'origin/osx-ci-fixes'
* origin/osx-ci-fixes: issue #573: guard against a forked top-level Ansible process [linear2] simplify ClassicWorkerModel and fix repeat initialization issue #549 / [stream-refactor]: fix close/poller deregister crash on OSX issue #549: skip Docker tests if Docker is unavailable issue #549: remove Linux-specific assumptions from create_child_test issue #549: fix setrlimit() crash and hard-wire OS X default
2 parents c9d890b + d408cac commit 7ca2d00

File tree

4 files changed

+143
-84
lines changed

4 files changed

+143
-84
lines changed

ansible_mitogen/process.py

Lines changed: 82 additions & 68 deletions
Original file line numberDiff line numberDiff line change
@@ -246,11 +246,22 @@ def increase_open_file_limit():
246246
limit is much higher.
247247
"""
248248
soft, hard = resource.getrlimit(resource.RLIMIT_NOFILE)
249-
if soft < hard:
250-
LOG.debug('raising soft open file limit from %d to %d', soft, hard)
251-
resource.setrlimit(resource.RLIMIT_NOFILE, (hard, hard))
252-
else:
253-
LOG.debug('cannot increase open file limit; existing limit is %d', hard)
249+
LOG.debug('inherited open file limits: soft=%d hard=%d', soft, hard)
250+
if soft >= hard:
251+
LOG.debug('max open files already set to hard limit: %d', hard)
252+
return
253+
254+
# OS X is limited by kern.maxfilesperproc sysctl, rather than the
255+
# advertised unlimited hard RLIMIT_NOFILE. Just hard-wire known defaults
256+
# for that sysctl, to avoid the mess of querying it.
257+
for value in (hard, 10240):
258+
try:
259+
resource.setrlimit(resource.RLIMIT_NOFILE, (value, hard))
260+
LOG.debug('raised soft open file limit from %d to %d', soft, value)
261+
break
262+
except ValueError as e:
263+
LOG.debug('could not raise soft open file limit from %d to %d: %s',
264+
soft, value, e)
254265

255266

256267
def common_setup(enable_affinity=True, _init_logging=True):
@@ -388,6 +399,18 @@ def close(self):
388399

389400

390401
class ClassicWorkerModel(WorkerModel):
402+
#: In the top-level process, this references one end of a socketpair(),
403+
#: whose other end child MuxProcesses block reading from to determine when
404+
#: the master process dies. When the top-level exits abnormally, or
405+
#: normally but where :func:`_on_process_exit` has been called, this socket
406+
#: will be closed, causing all the children to wake.
407+
parent_sock = None
408+
409+
#: In the mux process, this is the other end of :attr:`cls_parent_sock`.
410+
#: The main thread blocks on a read from it until :attr:`cls_parent_sock`
411+
#: is closed.
412+
child_sock = None
413+
391414
#: mitogen.master.Router for this worker.
392415
router = None
393416

@@ -403,8 +426,40 @@ class ClassicWorkerModel(WorkerModel):
403426
parent = None
404427

405428
def __init__(self, _init_logging=True):
406-
self._init_logging = _init_logging
407-
self.initialized = False
429+
"""
430+
Arrange for classic model multiplexers to be started, if they are not
431+
already running.
432+
433+
The parent process picks a UNIX socket path each child will use prior
434+
to fork, creates a socketpair used essentially as a semaphore, then
435+
blocks waiting for the child to indicate the UNIX socket is ready for
436+
use.
437+
438+
:param bool _init_logging:
439+
For testing, if :data:`False`, don't initialize logging.
440+
"""
441+
# #573: The process ID that installed the :mod:`atexit` handler. If
442+
# some unknown Ansible plug-in forks the Ansible top-level process and
443+
# later performs a graceful Python exit, it may try to wait for child
444+
# PIDs it never owned, causing a crash. We want to avoid that.
445+
self._pid = os.getpid()
446+
447+
common_setup(_init_logging=_init_logging)
448+
449+
self.parent_sock, self.child_sock = socket.socketpair()
450+
mitogen.core.set_cloexec(self.parent_sock.fileno())
451+
mitogen.core.set_cloexec(self.child_sock.fileno())
452+
453+
self._muxes = [
454+
MuxProcess(self, index)
455+
for index in range(get_cpu_count(default=1))
456+
]
457+
for mux in self._muxes:
458+
mux.start()
459+
460+
atexit.register(self._on_process_exit)
461+
self.child_sock.close()
462+
self.child_sock = None
408463

409464
def _listener_for_name(self, name):
410465
"""
@@ -438,7 +493,7 @@ def _reconnect(self, path):
438493

439494
self.listener_path = path
440495

441-
def on_process_exit(self, sock):
496+
def _on_process_exit(self):
442497
"""
443498
This is an :mod:`atexit` handler installed in the top-level process.
444499
@@ -452,62 +507,34 @@ def on_process_exit(self, sock):
452507
MuxProcess, debug logs may appear on the user's terminal *after* the
453508
prompt has been printed.
454509
"""
510+
if self._pid != os.getpid():
511+
return
512+
455513
try:
456-
sock.shutdown(socket.SHUT_WR)
514+
self.parent_sock.shutdown(socket.SHUT_WR)
457515
except socket.error:
458516
# Already closed. This is possible when tests are running.
459-
LOG.debug('on_process_exit: ignoring duplicate call')
517+
LOG.debug('_on_process_exit: ignoring duplicate call')
460518
return
461519

462-
mitogen.core.io_op(sock.recv, 1)
463-
sock.close()
520+
mitogen.core.io_op(self.parent_sock.recv, 1)
521+
self.parent_sock.close()
464522

465523
for mux in self._muxes:
466524
_, status = os.waitpid(mux.pid, 0)
467525
status = mitogen.fork._convert_exit_status(status)
468526
LOG.debug('mux %d PID %d %s', mux.index, mux.pid,
469527
mitogen.parent.returncode_to_str(status))
470528

471-
def _initialize(self):
472-
"""
473-
Arrange for classic model multiplexers to be started, if they are not
474-
already running.
475-
476-
The parent process picks a UNIX socket path each child will use prior
477-
to fork, creates a socketpair used essentially as a semaphore, then
478-
blocks waiting for the child to indicate the UNIX socket is ready for
479-
use.
480-
481-
:param bool _init_logging:
482-
For testing, if :data:`False`, don't initialize logging.
483-
"""
484-
common_setup(_init_logging=self._init_logging)
485-
486-
MuxProcess.cls_parent_sock, \
487-
MuxProcess.cls_child_sock = socket.socketpair()
488-
mitogen.core.set_cloexec(MuxProcess.cls_parent_sock.fileno())
489-
mitogen.core.set_cloexec(MuxProcess.cls_child_sock.fileno())
490-
491-
self._muxes = [
492-
MuxProcess(index)
493-
for index in range(get_cpu_count(default=1))
494-
]
495-
for mux in self._muxes:
496-
mux.start()
497-
498-
atexit.register(self.on_process_exit, MuxProcess.cls_parent_sock)
499-
MuxProcess.cls_child_sock.close()
500-
MuxProcess.cls_child_sock = None
501-
502529
def _test_reset(self):
503530
"""
504531
Used to clean up in unit tests.
505532
"""
506533
# TODO: split this up a bit.
507534
global _classic_worker_model
508-
assert MuxProcess.cls_parent_sock is not None
509-
MuxProcess.cls_parent_sock.close()
510-
MuxProcess.cls_parent_sock = None
535+
assert self.parent_sock is not None
536+
self.parent_sock.close()
537+
self.parent_sock = None
511538
self.listener_path = None
512539
self.router = None
513540
self.parent = None
@@ -525,9 +552,6 @@ def on_strategy_start(self):
525552
"""
526553
See WorkerModel.on_strategy_start().
527554
"""
528-
if not self.initialized:
529-
self._initialize()
530-
self.initialized = True
531555

532556
def on_strategy_complete(self):
533557
"""
@@ -556,7 +580,6 @@ def on_binding_close(self):
556580
self.router = None
557581
self.broker = None
558582
self.listener_path = None
559-
self.initialized = False
560583

561584
# #420: Ansible executes "meta" actions in the top-level process,
562585
# meaning "reset_connection" will cause :class:`mitogen.core.Latch` FDs
@@ -587,25 +610,16 @@ class MuxProcess(object):
587610
See https://bugs.python.org/issue6721 for a thorough description of the
588611
class of problems this worker is intended to avoid.
589612
"""
590-
#: In the top-level process, this references one end of a socketpair(),
591-
#: whose other end child MuxProcesses block reading from to determine when
592-
#: the master process dies. When the top-level exits abnormally, or
593-
#: normally but where :func:`on_process_exit` has been called, this socket
594-
#: will be closed, causing all the children to wake.
595-
cls_parent_sock = None
596-
597-
#: In the mux process, this is the other end of :attr:`cls_parent_sock`.
598-
#: The main thread blocks on a read from it until :attr:`cls_parent_sock`
599-
#: is closed.
600-
cls_child_sock = None
601-
602613
#: A copy of :data:`os.environ` at the time the multiplexer process was
603614
#: started. It's used by mitogen_local.py to find changes made to the
604615
#: top-level environment (e.g. vars plugins -- issue #297) that must be
605616
#: applied to locally executed commands and modules.
606617
cls_original_env = None
607618

608-
def __init__(self, index):
619+
def __init__(self, model, index):
620+
#: :class:`ClassicWorkerModel` instance we were created by.
621+
self.model = model
622+
#: MuxProcess CPU index.
609623
self.index = index
610624
#: Individual path of this process.
611625
self.path = mitogen.unix.make_socket_path()
@@ -614,7 +628,7 @@ def start(self):
614628
self.pid = os.fork()
615629
if self.pid:
616630
# Wait for child to boot before continuing.
617-
mitogen.core.io_op(MuxProcess.cls_parent_sock.recv, 1)
631+
mitogen.core.io_op(self.model.parent_sock.recv, 1)
618632
return
619633

620634
ansible_mitogen.logging.set_process_name('mux:' + str(self.index))
@@ -624,8 +638,8 @@ def start(self):
624638
os.path.basename(self.path),
625639
))
626640

627-
MuxProcess.cls_parent_sock.close()
628-
MuxProcess.cls_parent_sock = None
641+
self.model.parent_sock.close()
642+
self.model.parent_sock = None
629643
try:
630644
try:
631645
self.worker_main()
@@ -649,9 +663,9 @@ def worker_main(self):
649663

650664
try:
651665
# Let the parent know our listening socket is ready.
652-
mitogen.core.io_op(self.cls_child_sock.send, b('1'))
666+
mitogen.core.io_op(self.model.child_sock.send, b('1'))
653667
# Block until the socket is closed, which happens on parent exit.
654-
mitogen.core.io_op(self.cls_child_sock.recv, 1)
668+
mitogen.core.io_op(self.model.child_sock.recv, 1)
655669
finally:
656670
self.broker.shutdown()
657671
self.broker.join()

mitogen/core.py

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1628,12 +1628,17 @@ def on_shutdown(self, broker):
16281628
self.stream.on_disconnect(broker)
16291629

16301630
def on_disconnect(self, broker):
1631+
# Normally both sides an FD, so it is important that tranmit_side is
1632+
# deregistered from Poller before closing the receive side, as pollers
1633+
# like epoll and kqueue unregister all events on FD close, causing
1634+
# subsequent attempt to unregister the transmit side to fail.
16311635
LOG.debug('%r: disconnecting', self)
1632-
if self.stream.receive_side:
1633-
broker.stop_receive(self.stream)
1634-
self.stream.receive_side.close()
1636+
broker.stop_receive(self.stream)
16351637
if self.stream.transmit_side:
16361638
broker._stop_transmit(self.stream)
1639+
1640+
self.stream.receive_side.close()
1641+
if self.stream.transmit_side:
16371642
self.stream.transmit_side.close()
16381643

16391644

0 commit comments

Comments
 (0)