Skip to content

Commit c023586

Browse files
committed
[stream-refactor] don't abort Connection until all buffers are empty
1 parent 93342ba commit c023586

File tree

2 files changed

+64
-34
lines changed

2 files changed

+64
-34
lines changed

mitogen/core.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1554,7 +1554,7 @@ def on_receive(self, broker):
15541554
"""
15551555
buf = self.receive_side.read(self.protocol.read_size)
15561556
if not buf:
1557-
LOG.debug('%r: empty read, disconnecting', self)
1557+
LOG.debug('%r: empty read, disconnecting', self.receive_side)
15581558
return self.on_disconnect(broker)
15591559

15601560
self.protocol.on_receive(broker, buf)

mitogen/parent.py

Lines changed: 63 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -1227,8 +1227,8 @@ class Connection(object):
12271227
#: :class:`Process`
12281228
proc = None
12291229

1230-
#: :class:`mitogen.core.Stream`
1231-
stream = None
1230+
#: :class:`mitogen.core.Stream` with sides connected to stdin/stdout.
1231+
stdio_stream = None
12321232

12331233
#: If `proc.stderr` is set, referencing either a plain pipe or the
12341234
#: controlling TTY, this references the corresponding
@@ -1264,7 +1264,7 @@ def __init__(self, options, router):
12641264
self._router = router
12651265

12661266
def __repr__(self):
1267-
return 'Connection(%r)' % (self.stream,)
1267+
return 'Connection(%r)' % (self.stdio_stream,)
12681268

12691269
# Minimised, gzipped, base64'd and passed to 'python -c'. It forks, dups
12701270
# file descriptor 0 as 100, creates a pipe, then execs a new interpreter
@@ -1405,8 +1405,8 @@ def _adorn_eof_error(self, e):
14051405
def _complete_connection(self):
14061406
self.timer.cancel()
14071407
if not self.exception:
1408-
self._router.register(self.context, self.stream)
1409-
self.stream.set_protocol(
1408+
self._router.register(self.context, self.stdio_stream)
1409+
self.stdio_stream.set_protocol(
14101410
mitogen.core.MitogenProtocol(
14111411
router=self._router,
14121412
remote_id=self.context.context_id,
@@ -1419,11 +1419,11 @@ def _fail_connection(self, exc):
14191419
Fail the connection attempt.
14201420
"""
14211421
LOG.debug('%s: failing connection due to %r',
1422-
self.stream.name, exc)
1422+
self.stdio_stream.name, exc)
14231423
if self.exception is None:
14241424
self._adorn_eof_error(exc)
14251425
self.exception = exc
1426-
for stream in self.stream, self.stderr_stream:
1426+
for stream in self.stdio_stream, self.stderr_stream:
14271427
if stream and not stream.receive_side.closed:
14281428
stream.on_disconnect(self._router.broker)
14291429
self._complete_connection()
@@ -1433,24 +1433,52 @@ def on_stream_shutdown(self):
14331433
Request the slave gracefully shut itself down.
14341434
"""
14351435
LOG.debug('%r: requesting child shutdown', self)
1436-
self.stream.protocol._send(
1436+
self.stdio_stream.protocol._send(
14371437
mitogen.core.Message(
14381438
src_id=mitogen.context_id,
1439-
dst_id=self.stream.protocol.remote_id,
1439+
dst_id=self.stdio_stream.protocol.remote_id,
14401440
handle=mitogen.core.SHUTDOWN,
14411441
)
14421442
)
14431443

14441444
eof_error_msg = 'EOF on stream; last 100 lines received:\n'
14451445

1446-
def on_stream_disconnect(self):
1447-
if self.stderr_stream is not None:
1448-
self.stderr_stream.on_disconnect(self._router.broker)
1446+
def on_stdio_disconnect(self):
1447+
"""
1448+
Handle stdio stream disconnection by failing the Connection if the
1449+
stderr stream has already been closed. Otherwise, wait for it to close
1450+
(or time out), to allow buffered diagnostic logs to be consumed.
1451+
1452+
It is normal that when a subprocess aborts, stdio has nothing buffered
1453+
when it is closed, thus signalling readability, causing an empty read
1454+
(interpreted as indicating disconnection) on the next loop iteration,
1455+
even if its stderr pipe has lots of diagnostic logs still buffered in
1456+
the kernel. Therefore we must wait for both pipes to indicate they are
1457+
empty before triggering connection failure.
1458+
"""
1459+
stderr = self.stderr_stream
1460+
if stderr is None or stderr.receive_side.closed:
1461+
self._on_streams_disconnected()
1462+
1463+
def on_stderr_disconnect(self):
1464+
"""
1465+
Inverse of :meth:`on_stdio_disconnect`.
1466+
"""
1467+
if self.stdio_stream.receive_side.closed:
1468+
self._on_streams_disconnected()
1469+
1470+
def _on_streams_disconnected(self):
1471+
"""
1472+
When disconnection has been detected for both our streams, cancel the
1473+
connection timer, mark the connection failed, and reap the child
1474+
process. Do nothing if the timer has already been cancelled, indicating
1475+
some existing failure has already been noticed.
1476+
"""
14491477
if not self.timer.cancelled:
14501478
self.timer.cancel()
14511479
self._fail_connection(EofError(
14521480
self.eof_error_msg + get_history(
1453-
[self.stream, self.stderr_stream]
1481+
[self.stdio_stream, self.stderr_stream]
14541482
)
14551483
))
14561484
self.proc._async_reap(self, self._router)
@@ -1477,33 +1505,35 @@ def stream_factory(self):
14771505
def stderr_stream_factory(self):
14781506
return self.diag_protocol_class.build_stream()
14791507

1480-
def _setup_stream(self):
1481-
self.stream = self.stream_factory()
1482-
self.stream.conn = self
1483-
self.stream.name = self.options.name or self._get_name()
1484-
self.stream.accept(self.proc.stdout, self.proc.stdin)
1508+
def _setup_stdio_stream(self):
1509+
stream = self.stream_factory()
1510+
stream.conn = self
1511+
stream.name = self.options.name or self._get_name()
1512+
stream.accept(self.proc.stdout, self.proc.stdin)
14851513

1486-
mitogen.core.listen(self.stream, 'shutdown',
1487-
self.on_stream_shutdown)
1488-
mitogen.core.listen(self.stream, 'disconnect',
1489-
self.on_stream_disconnect)
1490-
self._router.broker.start_receive(self.stream)
1514+
mitogen.core.listen(stream, 'shutdown', self.on_stream_shutdown)
1515+
mitogen.core.listen(stream, 'disconnect', self.on_stdio_disconnect)
1516+
self._router.broker.start_receive(stream)
1517+
return stream
14911518

14921519
def _setup_stderr_stream(self):
1493-
self.stderr_stream = self.stderr_stream_factory()
1494-
self.stderr_stream.conn = self
1495-
self.stderr_stream.name = self.options.name or self._get_name()
1496-
self.stderr_stream.accept(self.proc.stderr, self.proc.stderr)
1497-
self._router.broker.start_receive(self.stderr_stream)
1520+
stream = self.stderr_stream_factory()
1521+
stream.conn = self
1522+
stream.name = self.options.name or self._get_name()
1523+
stream.accept(self.proc.stderr, self.proc.stderr)
1524+
1525+
mitogen.core.listen(stream, 'disconnect', self.on_stderr_disconnect)
1526+
self._router.broker.start_receive(stream)
1527+
return stream
14981528

14991529
def _async_connect(self):
15001530
self._start_timer()
1501-
self._setup_stream()
1531+
self.stdio_stream = self._setup_stdio_stream()
15021532
if self.context.name is None:
1503-
self.context.name = self.stream.name
1504-
self.proc.name = self.stream.name
1533+
self.context.name = self.stdio_stream.name
1534+
self.proc.name = self.stdio_stream.name
15051535
if self.proc.stderr:
1506-
self._setup_stderr_stream()
1536+
self.stderr_stream = self._setup_stderr_stream()
15071537

15081538
def connect(self, context):
15091539
LOG.debug('%r.connect()', self)
@@ -2181,7 +2211,7 @@ def _connect(self, klass, **kwargs):
21812211
except mitogen.core.TimeoutError:
21822212
raise mitogen.core.StreamError(self.connection_timeout_msg)
21832213

2184-
self.route_monitor.notice_stream(conn.stream)
2214+
self.route_monitor.notice_stream(conn.stdio_stream)
21852215
return context
21862216

21872217
def connect(self, method_name, name=None, **kwargs):

0 commit comments

Comments
 (0)