Skip to content

Commit 24ec572

Browse files
committed
fix long stdout full pipe buffer deadlock error
1 parent 0fff8bb commit 24ec572

File tree

1 file changed

+80
-18
lines changed

1 file changed

+80
-18
lines changed

src/_nebari/utils.py

Lines changed: 80 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
import os
66
import re
77
import secrets
8+
import selectors
89
import signal
910
import string
1011
import subprocess
@@ -46,6 +47,77 @@ def change_directory(directory):
4647
os.chdir(current_directory)
4748

4849

50+
def strip_ansi_errors(line):
51+
"""Strips ANSI escape codes from a string."""
52+
ansi_escape = re.compile(rb"\x1b\[[0-9;]*[mK]")
53+
return ansi_escape.sub(b"", line)
54+
55+
56+
def process_streams(
57+
process, line_prefix, strip_errors, print_stdout=True, print_stderr=True
58+
):
59+
sel = selectors.DefaultSelector()
60+
sel.register(process.stdout, selectors.EVENT_READ, data="stdout")
61+
if process.stderr and process.stderr != process.stdout:
62+
sel.register(process.stderr, selectors.EVENT_READ, data="stderr")
63+
64+
outputs = {"stdout": [], "stderr": []}
65+
partial = {"stdout": b"", "stderr": b""}
66+
67+
try:
68+
while True:
69+
events = sel.select(timeout=0.1)
70+
if not events and process.poll() is not None:
71+
# Handle any remaining partial output
72+
for stream_name in ["stdout", "stderr"]:
73+
if partial[stream_name]:
74+
line = partial[stream_name]
75+
if strip_errors:
76+
line = strip_ansi_errors(line)
77+
outputs[stream_name].append(line)
78+
break
79+
80+
for key, _ in events:
81+
data = key.fileobj.read1(8192)
82+
if not data:
83+
sel.unregister(key.fileobj)
84+
continue
85+
86+
stream_name = key.data
87+
chunk = partial[stream_name] + data
88+
lines = chunk.split(b"\n")
89+
partial[stream_name] = lines[-1]
90+
91+
for line in lines[:-1]:
92+
line_w_newline = line + b"\n"
93+
if strip_errors:
94+
line_w_newline = strip_ansi_errors(line_w_newline)
95+
96+
# Handle stdout
97+
if stream_name == "stdout":
98+
if print_stdout:
99+
sys.stdout.buffer.write(line_prefix + line_w_newline)
100+
sys.stdout.flush()
101+
else:
102+
outputs["stdout"].append(line_w_newline)
103+
104+
# Handle stderr
105+
if stream_name == "stderr":
106+
if print_stderr:
107+
sys.stderr.buffer.write(line_prefix + line_w_newline)
108+
sys.stderr.flush()
109+
else:
110+
outputs["stderr"].append(line_w_newline)
111+
finally:
112+
sel.close()
113+
if process.stdout:
114+
process.stdout.close()
115+
if process.stderr:
116+
process.stderr.close()
117+
118+
return outputs["stdout"], outputs["stderr"]
119+
120+
49121
def run_subprocess_cmd(processargs, prefix=b"", capture_output=False, **kwargs):
50122
"""Runs subprocess command with realtime stdout logging with optional line prefix."""
51123
if prefix:
@@ -71,6 +143,7 @@ def run_subprocess_cmd(processargs, prefix=b"", capture_output=False, **kwargs):
71143
stderr=stderr_stream,
72144
preexec_fn=os.setsid,
73145
)
146+
74147
# Set timeout thread
75148
timeout_timer = None
76149
if timeout > 0:
@@ -84,25 +157,14 @@ def kill_process():
84157
timeout_timer = threading.Timer(timeout, kill_process)
85158
timeout_timer.start()
86159

87-
print_stream = process.stderr if capture_output else process.stdout
88-
for line in iter(lambda: print_stream.readline(), b""):
89-
full_line = line_prefix + line
90-
if strip_errors:
91-
full_line = full_line.decode("utf-8")
92-
full_line = re.sub(
93-
r"\x1b\[31m", "", full_line
94-
) # Remove red ANSI escape code
95-
full_line = full_line.encode("utf-8")
96-
97-
sys.stdout.buffer.write(full_line)
98-
sys.stdout.flush()
99-
print_stream.close()
100-
101-
output = []
102160
if capture_output:
103-
for line in iter(lambda: process.stdout.readline(), b""):
104-
output.append(line)
105-
process.stdout.close()
161+
output, _ = process_streams(
162+
process, line_prefix, strip_errors, print_stdout=False, print_stderr=True
163+
)
164+
else:
165+
process_streams(
166+
process, line_prefix, strip_errors, print_stdout=True, print_stderr=True
167+
)
106168

107169
if timeout_timer is not None:
108170
timeout_timer.cancel()

0 commit comments

Comments
 (0)