Skip to content

Commit 7927622

Browse files
authored
Merge pull request #4755 from lexming/non-block-reads
enable non-blocking reads for streaming outputs
2 parents 79dfbde + a3a8b33 commit 7927622

File tree

3 files changed

+17
-40
lines changed

3 files changed

+17
-40
lines changed

easybuild/tools/run.py

Lines changed: 16 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,6 @@
3535
* Toon Willems (Ghent University)
3636
* Ward Poelmans (Ghent University)
3737
"""
38-
import fcntl
3938
import functools
4039
import inspect
4140
import locale
@@ -64,7 +63,7 @@
6463

6564
from easybuild.base import fancylogger
6665
from easybuild.tools.build_log import EasyBuildError, EasyBuildExit, CWD_NOTFOUND_ERROR
67-
from easybuild.tools.build_log import dry_run_msg, print_msg, time_str_since
66+
from easybuild.tools.build_log import dry_run_msg, time_str_since
6867
from easybuild.tools.config import build_option
6968
from easybuild.tools.hooks import RUN_SHELL_CMD, load_hooks, run_hook
7069
from easybuild.tools.output import COLOR_RED, COLOR_YELLOW, colorize, print_error
@@ -482,9 +481,6 @@ def to_cmd_str(cmd):
482481
if not hidden:
483482
_cmd_trace_msg(cmd_str, start_time, work_dir, stdin, tmpdir, thread_id, interactive=interactive)
484483

485-
if stream_output:
486-
print_msg(f"(streaming) output for command '{cmd_str}':")
487-
488484
# use bash as shell instead of the default /bin/sh used by subprocess.run
489485
# (which could be dash instead of bash, like on Ubuntu, see https://wiki.ubuntu.com/DashAsBinSh)
490486
# stick to None (default value) when not running command via a shell
@@ -511,17 +507,10 @@ def to_cmd_str(cmd):
511507
stdin = stdin.encode()
512508

513509
if stream_output or qa_patterns:
514-
515-
if qa_patterns:
516-
# make stdout, stderr, stdin non-blocking files
517-
channels = [proc.stdout, proc.stdin]
518-
if split_stderr:
519-
channels.append(proc.stderr)
520-
521-
for channel in channels:
522-
fd = channel.fileno()
523-
flags = fcntl.fcntl(fd, fcntl.F_GETFL)
524-
fcntl.fcntl(fd, fcntl.F_SETFL, flags | os.O_NONBLOCK)
510+
# enable non-blocking access to stdout, stderr, stdin
511+
for channel in (proc.stdout, proc.stdin, proc.stderr):
512+
if channel is not None:
513+
os.set_blocking(channel.fileno(), False)
525514

526515
if stdin:
527516
proc.stdin.write(stdin)
@@ -535,28 +524,16 @@ def to_cmd_str(cmd):
535524
time_no_match = 0
536525
prev_stdout = ''
537526

538-
# collect output piece-wise, while checking for questions to answer (if qa_patterns is provided)
539527
while exit_code is None:
540-
541-
# use small read size (128 bytes) when streaming output, to make it stream more fluently
542-
# -1 means reading until EOF
543-
read_size = 128 if exit_code is None else -1
544-
545-
# get output as long as output is available;
546-
# note: can't use proc.stdout.read without read_size argument,
547-
# since that will always wait until EOF
548-
more_stdout = True
549-
while more_stdout:
550-
more_stdout = proc.stdout.read(read_size) or b''
551-
_log.debug(f"Obtained more stdout: {more_stdout}")
552-
stdout += more_stdout
528+
# collect output line by line, while checking for questions to answer (if qa_patterns is provided)
529+
for line in iter(proc.stdout.readline, b''):
530+
_log.debug(f"Captured stdout: {line.decode(errors='ignore').rstrip()}")
531+
stdout += line
553532

554533
# note: we assume that there won't be any questions in stderr output
555534
if split_stderr:
556-
more_stderr = True
557-
while more_stderr:
558-
more_stderr = proc.stderr.read(read_size) or b''
559-
stderr += more_stderr
535+
for line in iter(proc.stderr.readline, b''):
536+
stderr += line
560537

561538
if qa_patterns:
562539
# only check for question patterns if additional output is available
@@ -575,17 +552,18 @@ def to_cmd_str(cmd):
575552
error_msg = "No matching questions found for current command output, "
576553
error_msg += f"giving up after {qa_timeout} seconds!"
577554
raise EasyBuildError(error_msg)
578-
else:
579-
_log.debug(f"{time_no_match:0.1f} seconds without match in output of interactive shell command")
555+
_log.debug(f"{time_no_match:0.1f} seconds without match in output of interactive shell command")
580556

581557
time.sleep(check_interval_secs)
582558

583559
exit_code = proc.poll()
584560

585561
# collect last bit of output once processed has exited
586-
stdout += proc.stdout.read()
562+
for line in iter(proc.stdout.readline, b''):
563+
_log.debug(f"Captured stdout: {line.decode(errors='ignore').rstrip()}")
564+
stdout += line
587565
if split_stderr:
588-
stderr += proc.stderr.read()
566+
stderr += proc.stderr.read() or b''
589567
else:
590568
(stdout, stderr) = proc.communicate(input=stdin)
591569

test/framework/options.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -576,7 +576,6 @@ def test_zzz_logtostdout(self):
576576
self.mock_stdout(False)
577577

578578
self.assertIn("Auto-enabling streaming output", stdout)
579-
self.assertIn("== (streaming) output for command 'gcc toy.c -o toy':", stdout)
580579

581580
if os.path.exists(dummylogfn):
582581
os.remove(dummylogfn)

test/framework/run.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1703,7 +1703,7 @@ def test_run_shell_cmd_stream(self):
17031703
self.assertEqual(res.output, expected_output)
17041704

17051705
self.assertEqual(stderr, '')
1706-
expected = ("== (streaming) output for command 'echo hello" + '\n' + expected_output).split('\n')
1706+
expected = ("running shell command:\n\techo hello" + '\n' + expected_output).split('\n')
17071707
for line in expected:
17081708
self.assertIn(line, stdout)
17091709

0 commit comments

Comments
 (0)