Skip to content

Commit 35b2802

Browse files
authored
Merge pull request Pyomo#3679 from jsiirola/tee-deadlock-win
Resolve `capture_output(capture_fd=True)` deadlock on Windows
2 parents a780912 + 07b8716 commit 35b2802

File tree

2 files changed

+116
-27
lines changed

2 files changed

+116
-27
lines changed

pyomo/common/tee.py

Lines changed: 103 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -37,14 +37,17 @@
3737
# ~(13.1 * #threads) seconds
3838
_poll_timeout = 1 # 14 rounds: 0.0001 * 2**14 == 1.6384
3939
_poll_timeout_deadlock = 100 # seconds
40-
40+
_pipe_buffersize = 1 << 16 # 65536
4141
_noop = lambda: None
4242
_mswindows = sys.platform.startswith('win')
4343
try:
4444
if _mswindows:
4545
from msvcrt import get_osfhandle
46-
from win32pipe import PeekNamedPipe
4746
from win32file import ReadFile
47+
from win32pipe import FdCreatePipe, PeekNamedPipe, SetNamedPipeHandleState
48+
49+
# This constant from Microsoft SetNamedPipeHandleState documentation:
50+
PIPE_NOWAIT = 1
4851
else:
4952
from select import select
5053
_peek_available = True
@@ -59,9 +62,47 @@ def __init__(self, ostream, handle):
5962
super().__setattr__('_ostream', ostream)
6063
super().__setattr__('_handle', handle)
6164

62-
def flush(self):
63-
self._ostream.flush()
64-
self._handle.flush = True
65+
if _mswindows:
66+
# Because we are setting the pipe to be non-blocking in Windows,
67+
# it is possible that calls to flush() and write() will raise
68+
# BlockingIOError. We will catch and retry. In addition, we
69+
# will chunk the data into pieces that should be well below the
70+
# pipe buffer size (so we should avoid deadlock)
71+
72+
def _retry(self, fcn, *args, retries=10):
73+
# Attempting to write to the pipe in the testing harness
74+
# occasionally raises OSError ("No space left on disk") or
75+
# BlockingIOError (when the write would be truncated). We
76+
# will re-try after a brief pause.
77+
failCount = 0
78+
while 1:
79+
try:
80+
fcn(*args)
81+
break
82+
except (OSError, BlockingIOError):
83+
failCount += 1
84+
if failCount >= retries:
85+
raise
86+
time.sleep(_poll_rampup_limit / (retries - 1))
87+
88+
def flush(self):
89+
self._retry(self._ostream.flush)
90+
self._handle.flush = True
91+
92+
def write(self, data):
93+
chunksize = _pipe_buffersize >> 1 # 1/2 the buffer size
94+
for i in range(0, len(data), chunksize):
95+
self._retry(self._ostream.write, data[i : i + chunksize])
96+
97+
def writelines(self, data):
98+
for line in data:
99+
self.write(line)
100+
101+
else:
102+
103+
def flush(self):
104+
self._ostream.flush()
105+
self._handle.flush = True
65106

66107
def __getattr__(self, attr):
67108
return getattr(self._ostream, attr)
@@ -71,13 +112,27 @@ def __setattr__(self, attr, val):
71112

72113

73114
class _AutoFlush(_SignalFlush):
74-
def write(self, data):
75-
self._ostream.write(data)
76-
self.flush()
115+
if _mswindows:
116+
# Because we define write() and writelines() under windows, we
117+
# need to make sure that _AutoFlush calls them
77118

78-
def writelines(self, data):
79-
self._ostream.writelines(data)
80-
self.flush()
119+
def write(self, data):
120+
super().write(data)
121+
self.flush()
122+
123+
def writelines(self, data):
124+
super().writelines(data)
125+
self.flush()
126+
127+
else:
128+
129+
def write(self, data):
130+
self._ostream.write(data)
131+
self.flush()
132+
133+
def writelines(self, data):
134+
self._ostream.writelines(data)
135+
self.flush()
81136

82137

83138
class _fd_closer(object):
@@ -163,7 +218,7 @@ def __enter__(self):
163218
# inheritable if it is stdout/stderr
164219
os.dup2(out_fd, self.fd, inheritable=bool(self.std))
165220

166-
# We no longer need this original file descriptor
221+
# We no longer need the original file descriptor
167222
if not isinstance(self.target, int):
168223
os.close(out_fd)
169224

@@ -283,7 +338,9 @@ def _exit_context_stack(self, et, ev, tb):
283338
cm.__exit__(et, ev, tb)
284339
except:
285340
_stack = self.context_stack
286-
FAIL.append(f"{sys.exc_info()[1]} ({len(_stack)+1}: {cm}@{id(cm):x})")
341+
FAIL.append(
342+
f"{sys.exc_info()[0].__name__}: {sys.exc_info()[1]} ({len(_stack)+1}: {cm}@{id(cm):x})"
343+
)
287344
return FAIL
288345

289346
def __enter__(self):
@@ -492,7 +549,33 @@ def __init__(self, mode, buffering, encoding, newline):
492549
self.buffering = buffering
493550
self.newlines = newline
494551
self.flush = False
495-
self.read_pipe, self.write_pipe = os.pipe()
552+
if _peek_available and _mswindows:
553+
# This is a re-implementation of os.pipe() on Windows so
554+
# that we can explicitly request a larger pipe buffer (64k;
555+
# matching *NIX). Per the docs: on Windows, the pipe buffer
556+
# should automatically grow if needed. However, we have
557+
# observed (see #3658) that if it happens, it can cause
558+
# deadlock when clients write directly to the underlying
559+
# file descriptor. By explicitly requesting a larger buffer
560+
# from the outset, we reduce the likelihood of needing to
561+
# reallocate the buffer.
562+
self.read_pipe, self.write_pipe = FdCreatePipe(
563+
None, _pipe_buffersize, os.O_BINARY if 'b' in mode else os.O_TEXT
564+
)
565+
self.read_pyhandle = get_osfhandle(self.read_pipe)
566+
self.write_pyhandle = get_osfhandle(self.write_pipe)
567+
# Because reallocating the pipe buffer can cause deadlock
568+
# (at least in the context in which we are using pipes
569+
# here), we will set the write pipe to NOWAIT. This will
570+
# guarantee that we don't deadlock, but at the cost of
571+
# possibly losing some output (the fprintf() to the FD will
572+
# return a number of bytes written less than the string that
573+
# was passed. If the client is ignoring the return value,
574+
# then *poof*: the output is truncated)
575+
SetNamedPipeHandleState(self.write_pyhandle, PIPE_NOWAIT, None, None)
576+
else:
577+
self.read_pipe, self.write_pipe = os.pipe()
578+
496579
if not buffering and 'b' not in mode:
497580
# While we support "unbuffered" behavior in text mode,
498581
# python does not
@@ -535,9 +618,11 @@ def close(self):
535618
self.write_file = None
536619

537620
if self.write_pipe is not None:
538-
# If someone else has closed the file descriptor, then
539-
# python raises an OSError
621+
# Close the write side of the pipe: the reader thread is
622+
# waiting for the EOF so that it can shut down.
540623
try:
624+
# If someone else has closed the file descriptor, then
625+
# python raises an OSError
541626
os.close(self.write_pipe)
542627
except OSError:
543628
pass
@@ -550,8 +635,8 @@ def finalize(self, ostreams):
550635
self.decodeIncomingBuffer()
551636
if ostreams:
552637
self.writeOutputBuffer(ostreams, True)
638+
# Close the read side of the pipe.
553639
os.close(self.read_pipe)
554-
555640
if self.decoder_buffer:
556641
logger.error(
557642
"Stream handle closed with un-decoded characters "
@@ -827,7 +912,7 @@ def _mergedReader(self):
827912
if _mswindows:
828913
for handle in list(handles):
829914
try:
830-
pipe = get_osfhandle(handle.read_pipe)
915+
pipe = handle.read_pyhandle
831916
numAvail = PeekNamedPipe(pipe, 0)[1]
832917
if numAvail:
833918
result, new_data = ReadFile(pipe, numAvail, None)

pyomo/common/tests/test_tee.py

Lines changed: 13 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -51,16 +51,16 @@ def flush(self):
5151

5252
def check(self, *bases):
5353
"""Map the recorded times to {0, 1} based on the range of times
54-
recorded: anything in the first half of the range is mapped to
55-
0, and anything in the second half is mapped to 1. This
54+
recorded: anything in the first two-thirds of the range is mapped to
55+
0, and anything in the last third is mapped to 1. This
5656
"discretizes" the times so that we can reliably compare to
5757
baselines.
5858
5959
"""
6060

6161
n = list(itertools.chain(*self.buf))
62-
mid = (min(n) + max(n)) / 2.0
63-
result = [tuple(0 if i < mid else 1 for i in _) for _ in self.buf]
62+
cutoff = min(n) + (max(n) - min(n)) * 2.0 / 3.0
63+
result = [tuple(0 if i < cutoff else 1 for i in _) for _ in self.buf]
6464
if result not in bases:
6565
base = ' or '.join(str(_) for _ in bases)
6666
self.error = f"result {result} != baseline {base}\nRaw timing: {self.buf}"
@@ -303,6 +303,7 @@ def test_handle_prematurely_closed(self):
303303
class TestCapture(unittest.TestCase):
304304
def setUp(self):
305305
self.streams = sys.stdout, sys.stderr
306+
self.fd = [os.dup(stream.fileno()) for stream in self.streams]
306307
self.reenable_gc = gc.isenabled()
307308
gc.disable()
308309
gc.collect()
@@ -313,6 +314,8 @@ def setUp(self):
313314

314315
def tearDown(self):
315316
sys.stdout, sys.stderr = self.streams
317+
os.dup2(self.fd[0], self.streams[0].fileno())
318+
os.dup2(self.fd[1], self.streams[1].fileno())
316319
sys.setswitchinterval(self.switchinterval)
317320
if self.reenable_gc:
318321
gc.enable()
@@ -531,14 +534,14 @@ def test_no_fileno_stdout(self):
531534
self.assertEqual(len(T.context_stack), 2)
532535
T = tee.capture_output(capture_fd=True)
533536
# out & err point to something other than fd 1 and 2
534-
sys.stdout = os.fdopen(os.dup(1), closefd=True)
535-
sys.stderr = os.fdopen(os.dup(2), closefd=True)
537+
sys.stdout = os.fdopen(os.dup(1), 'w', closefd=True)
538+
sys.stderr = os.fdopen(os.dup(2), 'w', closefd=True)
536539
with sys.stdout, sys.stderr:
537540
with T:
538541
self.assertEqual(len(T.context_stack), 8)
539542
# out & err point to fd 1 and 2
540-
sys.stdout = os.fdopen(1, closefd=False)
541-
sys.stderr = os.fdopen(2, closefd=False)
543+
sys.stdout = os.fdopen(1, 'w', closefd=False)
544+
sys.stderr = os.fdopen(2, 'w', closefd=False)
542545
with sys.stdout, sys.stderr:
543546
with T:
544547
self.assertEqual(len(T.context_stack), 6)
@@ -781,7 +784,8 @@ def test_buffered_stdout_long_message(self):
781784
ts = timestamper()
782785
ts.write(f"{time.time()}")
783786
with tee.TeeStream(ts, ts) as t, tee.capture_output(t.STDOUT, capture_fd=fd):
784-
sys.stdout.write(f"{time.time()}" + ' ' * 4096 + "\n")
787+
# Note: bigger than the buffer we allocate on Windows.
788+
sys.stdout.write(f"{time.time()}" + ' ' * tee._pipe_buffersize + "\n")
785789
time.sleep(self.dt)
786790
ts.write(f"{time.time()}")
787791
if not ts.check([(0, 0), (0, 0), (0, 0), (1, 1)]):

0 commit comments

Comments
 (0)