Skip to content

Commit f4cee16

Browse files
committed
parent: zombie reaping v3
Improvements: - Refactored off Process, separately testable without a connection - Don't delay Broker shutdown indefinitely for detached children
1 parent 709a0c0 commit f4cee16

File tree

4 files changed

+206
-86
lines changed

4 files changed

+206
-86
lines changed

docs/internals.rst

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -218,6 +218,10 @@ Child Implementation
218218
Process Management
219219
==================
220220

221+
.. currentmodule:: mitogen.parent
222+
.. autoclass:: Reaper
223+
:members:
224+
221225
.. currentmodule:: mitogen.parent
222226
.. autoclass:: Process
223227
:members:

mitogen/parent.py

Lines changed: 169 additions & 82 deletions
Original file line numberDiff line numberDiff line change
@@ -1364,7 +1364,22 @@ class Connection(object):
13641364
#: Prefix given to default names generated by :meth:`connect`.
13651365
name_prefix = u'local'
13661366

1367-
timer = None
1367+
#: :class:`Timer` that runs :meth:`_on_timer_expired` when connection
1368+
#: timeout occurs.
1369+
_timer = None
1370+
1371+
#: When disconnection completes, instance of :class:`Reaper` used to wait
1372+
#: on the exit status of the subprocess.
1373+
_reaper = None
1374+
1375+
#: On failure, the exception object that should be propagated back to the
1376+
#: user.
1377+
exception = None
1378+
1379+
#: Extra text appended to :class:`EofError` if that exception is raised on
1380+
#: a failed connection attempt. May be used in subclasses to hint at common
1381+
#: problems with a particular connection method.
1382+
eof_error_hint = None
13681383

13691384
def __init__(self, options, router):
13701385
#: :class:`Options`
@@ -1499,8 +1514,6 @@ def start_child(self):
14991514
msg = 'Child start failed: %s. Command was: %s' % (e, Argv(args))
15001515
raise mitogen.core.StreamError(msg)
15011516

1502-
eof_error_hint = None
1503-
15041517
def _adorn_eof_error(self, e):
15051518
"""
15061519
Subclasses may provide additional information in the case of a failed
@@ -1509,10 +1522,8 @@ def _adorn_eof_error(self, e):
15091522
if self.eof_error_hint:
15101523
e.args = ('%s\n\n%s' % (e.args[0], self.eof_error_hint),)
15111524

1512-
exception = None
1513-
15141525
def _complete_connection(self):
1515-
self.timer.cancel()
1526+
self._timer.cancel()
15161527
if not self.exception:
15171528
mitogen.core.unlisten(self._router.broker, 'shutdown',
15181529
self._on_broker_shutdown)
@@ -1569,19 +1580,36 @@ def on_stderr_disconnect(self):
15691580

15701581
def _on_streams_disconnected(self):
15711582
"""
1572-
When disconnection has been detected for both our streams, cancel the
1583+
When disconnection has been detected for both streams, cancel the
15731584
connection timer, mark the connection failed, and reap the child
15741585
process. Do nothing if the timer has already been cancelled, indicating
15751586
some existing failure has already been noticed.
15761587
"""
1577-
if not self.timer.cancelled:
1578-
self.timer.cancel()
1588+
if self._timer.active:
1589+
self._timer.cancel()
15791590
self._fail_connection(EofError(
15801591
self.eof_error_msg + get_history(
15811592
[self.stdio_stream, self.stderr_stream]
15821593
)
15831594
))
1584-
self.proc._async_reap(self, self._router)
1595+
1596+
if self._reaper:
1597+
return
1598+
1599+
self._reaper = Reaper(
1600+
broker=self._router.broker,
1601+
proc=self.proc,
1602+
kill=not (
1603+
(self.detached and self.child_is_immediate_subprocess) or
1604+
# Avoid killing so child has chance to write cProfile data
1605+
self._router.profiling
1606+
),
1607+
# Don't delay shutdown waiting for a detached child, since the
1608+
# detached child may expect to live indefinitely after its parent
1609+
# exited.
1610+
wait_on_shutdown=(not self.detached),
1611+
)
1612+
self._reaper.reap()
15851613

15861614
def _on_broker_shutdown(self):
15871615
"""
@@ -1590,20 +1618,6 @@ def _on_broker_shutdown(self):
15901618
"""
15911619
self._fail_connection(CancelledError(BROKER_SHUTDOWN_MSG))
15921620

1593-
def _start_timer(self):
1594-
self.timer = self._router.broker.timers.schedule(
1595-
when=self.options.connect_deadline,
1596-
func=self._on_timer_expired,
1597-
)
1598-
1599-
def _on_timer_expired(self):
1600-
self._fail_connection(
1601-
mitogen.core.TimeoutError(
1602-
'Failed to setup connection after %.2f seconds',
1603-
self.options.connect_timeout,
1604-
)
1605-
)
1606-
16071621
def stream_factory(self):
16081622
return self.stream_protocol_class.build_stream(
16091623
broker=self._router.broker,
@@ -1632,12 +1646,23 @@ def _setup_stderr_stream(self):
16321646
self._router.broker.start_receive(stream)
16331647
return stream
16341648

1649+
def _on_timer_expired(self):
1650+
self._fail_connection(
1651+
mitogen.core.TimeoutError(
1652+
'Failed to setup connection after %.2f seconds',
1653+
self.options.connect_timeout,
1654+
)
1655+
)
1656+
16351657
def _async_connect(self):
16361658
LOG.debug('creating connection to context %d using %s',
16371659
self.context.context_id, self.__class__.__module__)
16381660
mitogen.core.listen(self._router.broker, 'shutdown',
16391661
self._on_broker_shutdown)
1640-
self._start_timer()
1662+
self._timer = self._router.broker.timers.schedule(
1663+
when=self.options.connect_deadline,
1664+
func=self._on_timer_expired,
1665+
)
16411666

16421667
try:
16431668
self.proc = self.start_child()
@@ -2464,12 +2489,121 @@ def ssh(self, **kwargs):
24642489
return self.connect(u'ssh', **kwargs)
24652490

24662491

2467-
class Process(object):
2492+
class Reaper(object):
2493+
"""
2494+
Asynchronous logic for reaping :class:`Process` objects. This is necessary
2495+
to prevent uncontrolled buildup of zombie processes in long-lived parents
2496+
that will eventually reach an OS limit, preventing creation of new threads
2497+
and processes, and to log the exit status of the child in the case of an
2498+
error.
2499+
2500+
To avoid modifying process-global state such as with
2501+
:func:`signal.set_wakeup_fd` or installing a :data:`signal.SIGCHLD` handler
2502+
that might interfere with the user's ability to use those facilities,
2503+
Reaper polls for exit with backoff using timers installed on an associated
2504+
:class:`Broker`.
2505+
2506+
:param mitogen.core.Broker broker:
2507+
The :class:`Broker` on which to install timers
2508+
:param Process proc:
2509+
The process to reap.
2510+
:param bool kill:
2511+
If :data:`True`, send ``SIGTERM`` and ``SIGKILL`` to the process.
2512+
:param bool wait_on_shutdown:
2513+
If :data:`True`, delay :class:`Broker` shutdown if child has not yet
2514+
exited. If :data:`False` simply forget the child.
24682515
"""
2469-
Process objects contain asynchronous logic for reaping children, and
2470-
keeping track of their stdio descriptors.
2516+
#: :class:`Timer` that invokes :meth:`reap` after some polling delay.
2517+
_timer = None
2518+
2519+
def __init__(self, broker, proc, kill, wait_on_shutdown):
2520+
self.broker = broker
2521+
self.proc = proc
2522+
self.kill = kill
2523+
self.wait_on_shutdown = wait_on_shutdown
2524+
self._tries = 0
2525+
2526+
def _signal_child(self, signum):
2527+
# For processes like sudo we cannot actually send sudo a signal,
2528+
# because it is setuid, so this is best-effort only.
2529+
LOG.debug('%r: sending %s', self.proc, SIGNAL_BY_NUM[signum])
2530+
try:
2531+
os.kill(self.proc.pid, signum)
2532+
except OSError:
2533+
e = sys.exc_info()[1]
2534+
if e.args[0] != errno.EPERM:
2535+
raise
24712536

2472-
This base class is extended by :class:`PopenProcess` and
2537+
def _calc_delay(self, count):
2538+
"""
2539+
Calculate a poll delay given `count` attempts have already been made.
2540+
These constants have no principle, they just produce rapid but still
2541+
relatively conservative retries.
2542+
"""
2543+
delay = 0.05
2544+
for _ in xrange(count):
2545+
delay *= 1.72
2546+
return delay
2547+
2548+
def _on_broker_shutdown(self):
2549+
"""
2550+
Respond to :class:`Broker` shutdown by cancelling the reap timer if
2551+
:attr:`Router.await_children_at_shutdown` is disabled. Otherwise
2552+
shutdown is delayed for up to :attr:`Broker.shutdown_timeout` for
2553+
subprocesses may have no intention of exiting any time soon.
2554+
"""
2555+
if not self.wait_on_shutdown:
2556+
self._timer.cancel()
2557+
2558+
def _install_timer(self, delay):
2559+
new = self._timer is None
2560+
self._timer = self.broker.timers.schedule(
2561+
when=time.time() + delay,
2562+
func=self.reap,
2563+
)
2564+
if new:
2565+
mitogen.core.listen(self.broker, 'shutdown',
2566+
self._on_broker_shutdown)
2567+
2568+
def _remove_timer(self):
2569+
if self._timer and self._timer.active:
2570+
self._timer.cancel()
2571+
mitogen.core.unlisten(self.broker, 'shutdown',
2572+
self._on_broker_shutdown)
2573+
2574+
def reap(self):
2575+
"""
2576+
Reap the child process during disconnection.
2577+
"""
2578+
status = self.proc.poll()
2579+
if status is not None:
2580+
LOG.debug('%r: %s', self.proc, returncode_to_str(status))
2581+
self._remove_timer()
2582+
return
2583+
2584+
self._tries += 1
2585+
if self._tries > 20:
2586+
LOG.warning('%r: child will not exit, giving up', self)
2587+
self._remove_timer()
2588+
return
2589+
2590+
delay = self._calc_delay(self._tries - 1)
2591+
LOG.debug('%r still running after IO disconnect, recheck in %.03fs',
2592+
self.proc, delay)
2593+
self._install_timer(delay)
2594+
2595+
if not self.kill:
2596+
pass
2597+
elif self._tries == 1:
2598+
self._signal_child(signal.SIGTERM)
2599+
elif self._tries == 5: # roughly 4 seconds
2600+
self._signal_child(signal.SIGKILL)
2601+
2602+
2603+
class Process(object):
2604+
"""
2605+
Process objects provide a uniform interface to the :mod:`subprocess` and
2606+
:mod:`mitogen.fork`. This class is extended by :class:`PopenProcess` and
24732607
:class:`mitogen.fork.Process`.
24742608
24752609
:param int pid:
@@ -2481,16 +2615,19 @@ class Process(object):
24812615
:param file stderr:
24822616
File object attached to standard error, or :data:`None`.
24832617
"""
2484-
_delays = [0.05, 0.15, 0.3, 1.0, 5.0, 10.0]
2618+
#: Name of the process used in logs. Set to the stream/context name by
2619+
#: :class:`Connection`.
24852620
name = None
24862621

24872622
def __init__(self, pid, stdin, stdout, stderr=None):
2623+
#: The process ID.
24882624
self.pid = pid
2625+
#: File object attached to standard input.
24892626
self.stdin = stdin
2627+
#: File object attached to standard output.
24902628
self.stdout = stdout
2629+
#: File object attached to standard error.
24912630
self.stderr = stderr
2492-
self._returncode = None
2493-
self._reap_count = 0
24942631

24952632
def __repr__(self):
24962633
return '%s %s pid %d' % (
@@ -2510,56 +2647,6 @@ def poll(self):
25102647
"""
25112648
raise NotImplementedError()
25122649

2513-
def _signal_child(self, signum):
2514-
# For processes like sudo we cannot actually send sudo a signal,
2515-
# because it is setuid, so this is best-effort only.
2516-
LOG.debug('%r: child process still alive, sending %s',
2517-
self, SIGNAL_BY_NUM[signum])
2518-
try:
2519-
os.kill(self.pid, signum)
2520-
except OSError:
2521-
e = sys.exc_info()[1]
2522-
if e.args[0] != errno.EPERM:
2523-
raise
2524-
2525-
def _async_reap(self, conn, router):
2526-
"""
2527-
Reap the child process during disconnection.
2528-
"""
2529-
if self._returncode is not None:
2530-
# on_disconnect() may be invoked more than once, for example, if
2531-
# there is still a pending message to be sent after the first
2532-
# on_disconnect() call.
2533-
return
2534-
2535-
if conn.detached and conn.child_is_immediate_subprocess:
2536-
LOG.debug('%r: immediate child is detached, won\'t reap it', self)
2537-
return
2538-
2539-
if router.profiling:
2540-
LOG.info('%r: wont kill child because profiling=True', self)
2541-
return
2542-
2543-
self._reap_count += 1
2544-
status = self.poll()
2545-
if status is not None:
2546-
LOG.debug('%r: %s', self, returncode_to_str(status))
2547-
return
2548-
2549-
i = self._reap_count - 1
2550-
if i >= len(self._delays):
2551-
LOG.warning('%r: child will not die, abandoning it', self)
2552-
return
2553-
elif i == 0:
2554-
self._signal_child(signal.SIGTERM)
2555-
elif i == 1:
2556-
self._signal_child(signal.SIGKILL)
2557-
2558-
router.broker.timers.schedule(
2559-
when=time.time() + self._delays[i],
2560-
func=lambda: self._async_reap(conn, router),
2561-
)
2562-
25632650

25642651
class PopenProcess(Process):
25652652
"""

tests/connection_test.py

Lines changed: 32 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,10 @@
11

2-
import time
3-
import tempfile
4-
import sys
52
import os
3+
import signal
4+
import sys
5+
import tempfile
66
import threading
7+
import time
78

89
import unittest2
910
import testlib
@@ -44,5 +45,33 @@ def thread():
4445
self.assertEquals(mitogen.parent.BROKER_SHUTDOWN_MSG, exc.args[0])
4546

4647

48+
@mitogen.core.takes_econtext
49+
def do_detach(econtext):
50+
econtext.detach()
51+
while 1:
52+
time.sleep(1)
53+
logging.getLogger('mitogen').error('hi')
54+
55+
56+
class DetachReapTest(testlib.RouterMixin, testlib.TestCase):
57+
def test_subprocess_preserved_on_shutdown(self):
58+
c1 = self.router.local()
59+
pid = c1.call(os.getpid)
60+
61+
l = mitogen.core.Latch()
62+
mitogen.core.listen(c1, 'disconnect', l.put)
63+
c1.call_no_reply(do_detach)
64+
l.get()
65+
66+
self.broker.shutdown()
67+
self.broker.join()
68+
69+
os.kill(pid, 0) # succeeds if process still alive
70+
71+
# now clean up
72+
os.kill(pid, signal.SIGTERM)
73+
os.waitpid(pid, 0)
74+
75+
4776
if __name__ == '__main__':
4877
unittest2.main()

0 commit comments

Comments
 (0)