Skip to content

Commit 17bee70

Browse files
committed
mitogen: Fix BlockingIOError & EAGAIN in subprocess stdio
Mitogen was leaving the stdout and stderr of subprocesses in non-blocking mode. When Python code ran in the remote process created by Mitogen calls such as `print(long_string)` or `os.stout.write(bigger_than_the_buffer)` sometimes raised `BlockingIOError`, or similar. This change - Removes code in `mitogen.core.Side` that set blocking/non-blocking mode - Adds blocking/non-blocking control to `os.mitogen.pipe()` and a new function `mitogen.core.socketpair()` - Replaces `mitogen.core.set_block` and `mitogen.core.set_nonblock` with `mitogen.core.set_blocking`, mirroring `os.set_blocking` - Updates call sites as appropriate - Adds tests for new functions and arguments - Adds a regression test for subprocess stdio blocking/non-blocking fixes #712
1 parent 4529a21 commit 17bee70

File tree

15 files changed

+211
-50
lines changed

15 files changed

+211
-50
lines changed

ansible_mitogen/process.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -426,7 +426,7 @@ def __init__(self, _init_logging=True):
426426

427427
common_setup(_init_logging=_init_logging)
428428

429-
self.parent_sock, self.child_sock = socket.socketpair()
429+
self.parent_sock, self.child_sock = mitogen.core.socketpair()
430430
mitogen.core.set_cloexec(self.parent_sock.fileno())
431431
mitogen.core.set_cloexec(self.child_sock.fileno())
432432

docs/changelog.rst

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,8 @@ In progress (unreleased)
2323

2424
* :gh:issue:`1318` CI: Abbreviate Github Actions job names
2525
* :gh:issue:`1309` :mod:`ansible_mitogen`: Fix ``become_method: doas``
26+
* :gh:issue:`712` :mod:`mitogen`: Fix :exc:`BlockingIOError` & ``EAGAIN``
27+
errors in subprocesses that write to stdio
2628

2729

2830
v0.3.25 (2025-07-29)

mitogen/core.py

Lines changed: 32 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,17 @@ def _path_importer_cache(cls, path):
8787
import weakref
8888
import zlib
8989

90+
if sys.version_info > (3,5):
91+
from os import get_blocking, set_blocking
92+
else:
93+
def get_blocking(fd):
94+
return not fcntl.fcntl(fd, fcntl.F_GETFL) & os.O_NONBLOCK
95+
96+
def set_blocking(fd, blocking):
97+
fl = fcntl.fcntl(fd, fcntl.F_GETFL)
98+
if blocking: fcntl.fcntl(fd, fcntl.F_SETFL, fl & ~os.O_NONBLOCK)
99+
else: fcntl.fcntl(fd, fcntl.F_SETFL, fl | os.O_NONBLOCK)
100+
90101
try:
91102
# Python >= 3.4, PEP 451 ModuleSpec API
92103
import importlib.machinery
@@ -559,26 +570,6 @@ def set_cloexec(fd):
559570
fcntl.fcntl(fd, fcntl.F_SETFD, flags | fcntl.FD_CLOEXEC)
560571

561572

562-
def set_nonblock(fd):
563-
"""
564-
Set the file descriptor `fd` to non-blocking mode. For most underlying file
565-
types, this causes :func:`os.read` or :func:`os.write` to raise
566-
:class:`OSError` with :data:`errno.EAGAIN` rather than block the thread
567-
when the underlying kernel buffer is exhausted.
568-
"""
569-
flags = fcntl.fcntl(fd, fcntl.F_GETFL)
570-
fcntl.fcntl(fd, fcntl.F_SETFL, flags | os.O_NONBLOCK)
571-
572-
573-
def set_block(fd):
574-
"""
575-
Inverse of :func:`set_nonblock`, i.e. cause `fd` to block the thread when
576-
the underlying kernel buffer is exhausted.
577-
"""
578-
flags = fcntl.fcntl(fd, fcntl.F_GETFL)
579-
fcntl.fcntl(fd, fcntl.F_SETFL, flags & ~os.O_NONBLOCK)
580-
581-
582573
def io_op(func, *args):
583574
"""
584575
Wrap `func(*args)` that may raise :class:`select.error`, :class:`IOError`,
@@ -720,20 +711,30 @@ def import_module(modname):
720711
return __import__(modname, None, None, [''])
721712

722713

723-
def pipe():
714+
def pipe(blocking=None):
724715
"""
725716
Create a UNIX pipe pair using :func:`os.pipe`, wrapping the returned
726717
descriptors in Python file objects in order to manage their lifetime and
727718
ensure they are closed when their last reference is discarded and they have
728719
not been closed explicitly.
729720
"""
730721
rfd, wfd = os.pipe()
722+
for fd in rfd, wfd:
723+
if blocking is not None: set_blocking(fd, blocking) # noqa: E701
731724
return (
732725
os.fdopen(rfd, 'rb', 0),
733726
os.fdopen(wfd, 'wb', 0)
734727
)
735728

736729

730+
def socketpair(blocking=None):
731+
fp1, fp2 = socket.socketpair()
732+
for fp in fp1, fp2:
733+
fd = fp.fileno()
734+
if blocking is not None: set_blocking(fd, blocking) # noqa: E701
735+
return fp1, fp2
736+
737+
737738
def iter_split(buf, delim, func):
738739
"""
739740
Invoke `func(s)` for each `delim`-delimited chunk in the potentially large
@@ -1879,8 +1880,7 @@ def accept(self, rfp, wfp):
18791880
"""
18801881
Attach a pair of file objects to :attr:`receive_side` and
18811882
:attr:`transmit_side`, after wrapping them in :class:`Side` instances.
1882-
:class:`Side` will call :func:`set_nonblock` and :func:`set_cloexec`
1883-
on the underlying file descriptors during construction.
1883+
:class:`Side` will call :func:`set_cloexec` on them.
18841884
18851885
The same file object may be used for both sides. The default
18861886
:meth:`on_disconnect` is handles the possibility that only one
@@ -2155,14 +2155,11 @@ class Side(object):
21552155
:param bool keep_alive:
21562156
If :data:`True`, the continued existence of this side will extend the
21572157
shutdown grace period until it has been unregistered from the broker.
2158-
:param bool blocking:
2159-
If :data:`False`, the descriptor has its :data:`os.O_NONBLOCK` flag
2160-
enabled using :func:`fcntl.fcntl`.
21612158
"""
21622159
_fork_refs = weakref.WeakValueDictionary()
21632160
closed = False
21642161

2165-
def __init__(self, stream, fp, cloexec=True, keep_alive=True, blocking=False):
2162+
def __init__(self, stream, fp, cloexec=True, keep_alive=True):
21662163
#: The :class:`Stream` for which this is a read or write side.
21672164
self.stream = stream
21682165
# File or socket object responsible for the lifetime of its underlying
@@ -2180,8 +2177,6 @@ def __init__(self, stream, fp, cloexec=True, keep_alive=True, blocking=False):
21802177
self._fork_refs[id(self)] = self
21812178
if cloexec:
21822179
set_cloexec(self.fd)
2183-
if not blocking:
2184-
set_nonblock(self.fd)
21852180

21862181
def __repr__(self):
21872182
return '<Side of %s fd %s>' % (
@@ -2785,7 +2780,7 @@ def _get_socketpair(self):
27852780
try:
27862781
return self._cls_idle_socketpairs.pop() # pop() must be atomic
27872782
except IndexError:
2788-
rsock, wsock = socket.socketpair()
2783+
rsock, wsock = socketpair()
27892784
rsock.setblocking(False)
27902785
set_cloexec(rsock.fileno())
27912786
set_cloexec(wsock.fileno())
@@ -2958,7 +2953,8 @@ class Waker(Protocol):
29582953
@classmethod
29592954
def build_stream(cls, broker):
29602955
stream = super(Waker, cls).build_stream(broker)
2961-
stream.accept(*pipe())
2956+
rfp, wfp = pipe(blocking=False)
2957+
stream.accept(rfp, wfp)
29622958
return stream
29632959

29642960
def __init__(self, broker):
@@ -3056,7 +3052,8 @@ def build_stream(cls, name, dest_fd):
30563052
prevent break :meth:`on_shutdown` from calling :meth:`shutdown()
30573053
<socket.socket.shutdown>` on it.
30583054
"""
3059-
rsock, wsock = socket.socketpair()
3055+
# Leave wsock & dest_fd blocking, so the subprocess will have sane stdio
3056+
rsock, wsock = socketpair()
30603057
os.dup2(wsock.fileno(), dest_fd)
30613058
stream = super(IoLoggerProtocol, cls).build_stream(name)
30623059
stream.name = name
@@ -4038,6 +4035,9 @@ def _setup_master(self):
40384035
local_id=self.config['context_id'],
40394036
parent_ids=self.config['parent_ids']
40404037
)
4038+
for f in in_fp, out_fp:
4039+
fd = f.fileno()
4040+
set_blocking(fd, False)
40414041
self.stream.accept(in_fp, out_fp)
40424042
self.stream.name = 'parent'
40434043
self.stream.receive_side.keep_alive = False

mitogen/fakessh.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -179,6 +179,9 @@ def __init__(self, router, stdin, stdout, proc=None):
179179
self.control_handle = router.add_handler(self._on_control)
180180
self.stdin_handle = router.add_handler(self._on_stdin)
181181
self.pump = IoPump.build_stream(router.broker)
182+
for fp in stdin, stdout:
183+
fd = fp.fileno()
184+
mitogen.core.set_blocking(fd, False)
182185
self.pump.accept(stdin, stdout)
183186
self.stdin = None
184187
self.control = None
@@ -419,10 +422,11 @@ def run(dest, router, args, deadline=None, econtext=None):
419422
fakessh = mitogen.parent.Context(router, context_id)
420423
fakessh.name = u'fakessh.%d' % (context_id,)
421424

422-
sock1, sock2 = socket.socketpair()
425+
sock1, sock2 = mitogen.core.socketpair()
423426

424427
stream = mitogen.core.Stream(router, context_id)
425428
stream.name = u'fakessh'
429+
mitogen.core.set_blocking(sock1.fileno(), False)
426430
stream.accept(sock1, sock1)
427431
router.register(fakessh, stream)
428432

mitogen/fork.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -211,7 +211,7 @@ def _child_main(self, childfp):
211211
on_fork()
212212
if self.options.on_fork:
213213
self.options.on_fork()
214-
mitogen.core.set_block(childfp.fileno())
214+
mitogen.core.set_blocking(childfp.fileno(), True)
215215

216216
childfp.send(b('MITO002\n'))
217217

mitogen/os_fork.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@
3838
import weakref
3939

4040
import mitogen.core
41+
import mitogen.parent
4142

4243

4344
# List of weakrefs. On Python 2.4, mitogen.core registers its Broker on this
@@ -131,9 +132,9 @@ def _cork_one(self, s, obj):
131132
`obj` to be written to by one of its threads.
132133
"""
133134
rsock, wsock = mitogen.parent.create_socketpair(size=4096)
135+
mitogen.core.set_blocking(wsock.fileno(), True) # gevent
134136
mitogen.core.set_cloexec(rsock.fileno())
135137
mitogen.core.set_cloexec(wsock.fileno())
136-
mitogen.core.set_block(wsock) # gevent
137138
self._rsocks.append(rsock)
138139
obj.defer(self._do_cork, s, wsock)
139140

mitogen/parent.py

Lines changed: 17 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -265,7 +265,7 @@ def disable_echo(fd):
265265
termios.tcsetattr(fd, flags, new)
266266

267267

268-
def create_socketpair(size=None):
268+
def create_socketpair(size=None, blocking=None):
269269
"""
270270
Create a :func:`socket.socketpair` for use as a child's UNIX stdio
271271
channels. As socketpairs are bidirectional, they are economical on file
@@ -276,14 +276,14 @@ def create_socketpair(size=None):
276276
if size is None:
277277
size = mitogen.core.CHUNK_SIZE
278278

279-
parentfp, childfp = socket.socketpair()
279+
parentfp, childfp = mitogen.core.socketpair(blocking)
280280
for fp in parentfp, childfp:
281281
fp.setsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF, size)
282282

283283
return parentfp, childfp
284284

285285

286-
def create_best_pipe(escalates_privilege=False):
286+
def create_best_pipe(escalates_privilege=False, blocking=None):
287287
"""
288288
By default we prefer to communicate with children over a UNIX socket, as a
289289
single file descriptor can represent bidirectional communication, and a
@@ -301,16 +301,19 @@ def create_best_pipe(escalates_privilege=False):
301301
:param bool escalates_privilege:
302302
If :data:`True`, the target program may escalate privileges, causing
303303
SELinux to disconnect AF_UNIX sockets, so avoid those.
304+
:param None|bool blocking:
305+
If :data:`False` or :data:`True`, set non-blocking or blocking mode.
306+
If :data:`None` (default), use default.
304307
:returns:
305308
`(parent_rfp, child_wfp, child_rfp, parent_wfp)`
306309
"""
307310
if (not escalates_privilege) or (not SELINUX_ENABLED):
308-
parentfp, childfp = create_socketpair()
311+
parentfp, childfp = create_socketpair(blocking=blocking)
309312
return parentfp, childfp, childfp, parentfp
310313

311-
parent_rfp, child_wfp = mitogen.core.pipe()
314+
parent_rfp, child_wfp = mitogen.core.pipe(blocking)
312315
try:
313-
child_rfp, parent_wfp = mitogen.core.pipe()
316+
child_rfp, parent_wfp = mitogen.core.pipe(blocking)
314317
return parent_rfp, child_wfp, child_rfp, parent_wfp
315318
except:
316319
parent_rfp.close()
@@ -481,7 +484,7 @@ def openpty():
481484
if not IS_SOLARIS:
482485
disable_echo(master_fd)
483486
disable_echo(slave_fd)
484-
mitogen.core.set_block(slave_fd)
487+
mitogen.core.set_blocking(slave_fd, True)
485488
return master_fp, slave_fp
486489

487490

@@ -547,8 +550,8 @@ def hybrid_tty_create_child(args, escalates_privilege=False):
547550
escalates_privilege=escalates_privilege,
548551
)
549552
try:
550-
mitogen.core.set_block(child_rfp)
551-
mitogen.core.set_block(child_wfp)
553+
mitogen.core.set_blocking(child_rfp.fileno(), True)
554+
mitogen.core.set_blocking(child_wfp.fileno(), True)
552555
proc = popen(
553556
args=args,
554557
stdin=child_rfp,
@@ -1643,6 +1646,9 @@ def _setup_stdio_stream(self):
16431646
stream = self.stream_factory()
16441647
stream.conn = self
16451648
stream.name = self.options.name or self._get_name()
1649+
for fp in self.proc.stdout, self.proc.stdin:
1650+
fd = fp.fileno()
1651+
mitogen.core.set_blocking(fd, False)
16461652
stream.accept(self.proc.stdout, self.proc.stdin)
16471653

16481654
mitogen.core.listen(stream, 'disconnect', self.on_stdio_disconnect)
@@ -1653,6 +1659,8 @@ def _setup_stderr_stream(self):
16531659
stream = self.stderr_stream_factory()
16541660
stream.conn = self
16551661
stream.name = self.options.name or self._get_name()
1662+
fd = self.proc.stderr.fileno()
1663+
mitogen.core.set_blocking(fd, False)
16561664
stream.accept(self.proc.stderr, self.proc.stderr)
16571665

16581666
mitogen.core.listen(stream, 'disconnect', self.on_stderr_disconnect)

mitogen/unix.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -111,6 +111,7 @@ def build_stream(cls, router, path=None, backlog=100):
111111
sock.listen(backlog)
112112

113113
stream = super(Listener, cls).build_stream(router, path)
114+
mitogen.core.set_blocking(sock.fileno(), False)
114115
stream.accept(sock, sock)
115116
router.broker.start_receive(stream)
116117
return stream
@@ -169,6 +170,7 @@ def on_accept_client(self, sock):
169170
auth_id=mitogen.context_id,
170171
)
171172
stream.name = u'unix_client.%d' % (pid,)
173+
mitogen.core.set_blocking(sock.fileno(), False)
172174
stream.accept(sock, sock)
173175
LOG.debug('listener: accepted connection from PID %d: %s',
174176
pid, stream.name)

tests/blocking_test.py

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
import os
2+
import tempfile
3+
4+
import mitogen.core
5+
6+
import testlib
7+
8+
class BlockingIOTest(testlib.TestCase):
9+
def setUp(self):
10+
super(BlockingIOTest, self).setUp()
11+
self.fp = tempfile.TemporaryFile()
12+
self.fd = self.fp.fileno()
13+
14+
def tearDown(self):
15+
self.fp.close()
16+
super(BlockingIOTest, self).tearDown()
17+
18+
def test_get_blocking(self):
19+
if hasattr(os, 'get_blocking'):
20+
self.assertEqual(
21+
os.get_blocking(self.fd), mitogen.core.get_blocking(self.fd),
22+
)
23+
self.assertTrue(mitogen.core.get_blocking(self.fd) is True)
24+
25+
def test_set_blocking(self):
26+
mitogen.core.set_blocking(self.fd, False)
27+
if hasattr(os, 'get_blocking'):
28+
self.assertEqual(
29+
os.get_blocking(self.fd), mitogen.core.get_blocking(self.fd),
30+
)
31+
self.assertTrue(mitogen.core.get_blocking(self.fd) is False)
32+

tests/create_child_test.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -190,7 +190,7 @@ def test_dev_tty_open_succeeds(self):
190190
proc = self.func([
191191
'bash', '-c', 'exec 2>%s; echo hi > /dev/tty' % (tf.name,)
192192
])
193-
mitogen.core.set_block(proc.stdin.fileno())
193+
mitogen.core.set_blocking(proc.stdin.fileno(), True)
194194
# read(3) below due to https://bugs.python.org/issue37696
195195
self.assertEqual(mitogen.core.b('hi\n'), proc.stdin.read(3))
196196
waited_pid, status = os.waitpid(proc.pid, 0)

0 commit comments

Comments
 (0)