diff --git a/CHANGELOG.md b/CHANGELOG.md index 27cdb5b19..09f27ec98 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -15,6 +15,10 @@ - Added `Cmd.macro_arg_complete()` which tab completes arguments to a macro. Its default behavior is to perform path completion, but it can be overridden as needed. + - Bug Fixes + - No longer redirecting `sys.stdout` if it's a different stream than `self.stdout`. This + fixes issue where we overwrote an application's `sys.stdout` while redirecting. + ## 2.7.0 (June 30, 2025) - Enhancements diff --git a/cmd2/cmd2.py b/cmd2/cmd2.py index d9bc4abc6..5337f1449 100644 --- a/cmd2/cmd2.py +++ b/cmd2/cmd2.py @@ -13,8 +13,8 @@ Easy transcript-based testing of applications (see examples/example.py) Bash-style ``select`` available -Note that redirection with > and | will only work if `self.poutput()` -is used in place of `print`. +Note, if self.stdout is different than sys.stdout, then redirection with > and | +will only work if `self.poutput()` is used in place of `print`. - Catherine Devlin, Jan 03 2008 - catherinedevlin.blogspot.com @@ -200,8 +200,6 @@ def __init__(self) -> None: self.readline_settings = _SavedReadlineSettings() self.readline_module: Optional[ModuleType] = None self.history: list[str] = [] - self.sys_stdout: Optional[TextIO] = None - self.sys_stdin: Optional[TextIO] = None # Contains data about a disabled command which is used to restore its original functions when the command is enabled @@ -2854,9 +2852,12 @@ def _redirect_output(self, statement: Statement) -> utils.RedirectionSavedState: """ import subprocess + # Only redirect sys.stdout if it's the same as self.stdout + stdouts_match = self.stdout == sys.stdout + # Initialize the redirection saved state redir_saved_state = utils.RedirectionSavedState( - cast(TextIO, self.stdout), sys.stdout, self._cur_pipe_proc_reader, self._redirecting + cast(TextIO, self.stdout), stdouts_match, self._cur_pipe_proc_reader, self._redirecting ) # The ProcReader for this command @@ -2912,7 +2913,10 @@ def _redirect_output(self, statement: Statement) -> utils.RedirectionSavedState: raise RedirectionError(f'Pipe process exited with code {proc.returncode} before command could run') redir_saved_state.redirecting = True # type: ignore[unreachable] cmd_pipe_proc_reader = utils.ProcReader(proc, cast(TextIO, self.stdout), sys.stderr) - sys.stdout = self.stdout = new_stdout + + self.stdout = new_stdout + if stdouts_match: + sys.stdout = self.stdout elif statement.output: if statement.output_to: @@ -2926,7 +2930,10 @@ def _redirect_output(self, statement: Statement) -> utils.RedirectionSavedState: raise RedirectionError('Failed to redirect output') from ex redir_saved_state.redirecting = True - sys.stdout = self.stdout = new_stdout + + self.stdout = new_stdout + if stdouts_match: + sys.stdout = self.stdout else: # Redirecting to a paste buffer @@ -2944,7 +2951,10 @@ def _redirect_output(self, statement: Statement) -> utils.RedirectionSavedState: # create a temporary file to store output new_stdout = cast(TextIO, tempfile.TemporaryFile(mode="w+")) # noqa: SIM115 redir_saved_state.redirecting = True - sys.stdout = self.stdout = new_stdout + + self.stdout = new_stdout + if stdouts_match: + sys.stdout = self.stdout if statement.output == constants.REDIRECTION_APPEND: self.stdout.write(current_paste_buffer) @@ -2974,7 +2984,8 @@ def _restore_output(self, statement: Statement, saved_redir_state: utils.Redirec # Restore the stdout values self.stdout = cast(TextIO, saved_redir_state.saved_self_stdout) - sys.stdout = cast(TextIO, saved_redir_state.saved_sys_stdout) + if saved_redir_state.stdouts_match: + sys.stdout = self.stdout # Check if we need to wait for the process being piped to if self._cur_pipe_proc_reader is not None: @@ -4449,12 +4460,6 @@ def _set_up_py_shell_env(self, interp: InteractiveConsole) -> _SavedCmd2Env: # Set up sys module for the Python console self._reset_py_display() - cmd2_env.sys_stdout = sys.stdout - sys.stdout = self.stdout # type: ignore[assignment] - - cmd2_env.sys_stdin = sys.stdin - sys.stdin = self.stdin # type: ignore[assignment] - return cmd2_env def _restore_cmd2_env(self, cmd2_env: _SavedCmd2Env) -> None: @@ -4462,9 +4467,6 @@ def _restore_cmd2_env(self, cmd2_env: _SavedCmd2Env) -> None: :param cmd2_env: the environment settings to restore """ - sys.stdout = cmd2_env.sys_stdout # type: ignore[assignment] - sys.stdin = cmd2_env.sys_stdin # type: ignore[assignment] - # Set up readline for cmd2 if rl_type != RlType.NONE: # Save py's history diff --git a/cmd2/py_bridge.py b/cmd2/py_bridge.py index 2a147583c..fe3405239 100644 --- a/cmd2/py_bridge.py +++ b/cmd2/py_bridge.py @@ -4,10 +4,7 @@ """ import sys -from contextlib import ( - redirect_stderr, - redirect_stdout, -) +from contextlib import redirect_stderr from typing import ( IO, TYPE_CHECKING, @@ -113,6 +110,9 @@ def __call__(self, command: str, *, echo: Optional[bool] = None) -> CommandResul if echo is None: echo = self.cmd_echo + # Only capture sys.stdout if it's the same stream as self.stdout + stdouts_match = self._cmd2_app.stdout == sys.stdout + # This will be used to capture _cmd2_app.stdout and sys.stdout copy_cmd_stdout = StdSim(cast(Union[TextIO, StdSim], self._cmd2_app.stdout), echo=echo) @@ -126,8 +126,12 @@ def __call__(self, command: str, *, echo: Optional[bool] = None) -> CommandResul stop = False try: - self._cmd2_app.stdout = cast(TextIO, copy_cmd_stdout) - with redirect_stdout(cast(IO[str], copy_cmd_stdout)), redirect_stderr(cast(IO[str], copy_stderr)): + with self._cmd2_app.sigint_protection: + self._cmd2_app.stdout = cast(TextIO, copy_cmd_stdout) + if stdouts_match: + sys.stdout = self._cmd2_app.stdout + + with redirect_stderr(cast(IO[str], copy_stderr)): stop = self._cmd2_app.onecmd_plus_hooks( command, add_to_history=self._add_to_history, @@ -136,6 +140,9 @@ def __call__(self, command: str, *, echo: Optional[bool] = None) -> CommandResul finally: with self._cmd2_app.sigint_protection: self._cmd2_app.stdout = cast(IO[str], copy_cmd_stdout.inner_stream) + if stdouts_match: + sys.stdout = self._cmd2_app.stdout + self.stop = stop or self.stop # Save the result diff --git a/cmd2/utils.py b/cmd2/utils.py index 1c3506e6b..da01ff390 100644 --- a/cmd2/utils.py +++ b/cmd2/utils.py @@ -683,23 +683,23 @@ class RedirectionSavedState: def __init__( self, self_stdout: Union[StdSim, TextIO], - sys_stdout: Union[StdSim, TextIO], + stdouts_match: bool, pipe_proc_reader: Optional[ProcReader], saved_redirecting: bool, ) -> None: """RedirectionSavedState initializer. :param self_stdout: saved value of Cmd.stdout - :param sys_stdout: saved value of sys.stdout + :param stdouts_match: True if Cmd.stdout is equal to sys.stdout :param pipe_proc_reader: saved value of Cmd._cur_pipe_proc_reader :param saved_redirecting: saved value of Cmd._redirecting. """ # Tells if command is redirecting self.redirecting = False - # Used to restore values after redirection ends + # Used to restore stdout values after redirection ends self.saved_self_stdout = self_stdout - self.saved_sys_stdout = sys_stdout + self.stdouts_match = stdouts_match # Used to restore values after command ends regardless of whether the command redirected self.saved_pipe_proc_reader = pipe_proc_reader diff --git a/tests/conftest.py b/tests/conftest.py index 253d9fcd4..35bb90e6f 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -2,10 +2,7 @@ import argparse import sys -from contextlib import ( - redirect_stderr, - redirect_stdout, -) +from contextlib import redirect_stderr from typing import ( Optional, Union, @@ -116,8 +113,9 @@ def normalize(block): def run_cmd(app, cmd): """Clear out and err StdSim buffers, run the command, and return out and err""" - saved_sysout = sys.stdout - sys.stdout = app.stdout + + # Only capture sys.stdout if it's the same stream as self.stdout + stdouts_match = app.stdout == sys.stdout # This will be used to capture app.stdout and sys.stdout copy_cmd_stdout = StdSim(app.stdout) @@ -127,11 +125,14 @@ def run_cmd(app, cmd): try: app.stdout = copy_cmd_stdout - with redirect_stdout(copy_cmd_stdout), redirect_stderr(copy_stderr): + if stdouts_match: + sys.stdout = app.stdout + with redirect_stderr(copy_stderr): app.onecmd_plus_hooks(cmd) finally: app.stdout = copy_cmd_stdout.inner_stream - sys.stdout = saved_sysout + if stdouts_match: + sys.stdout = app.stdout out = copy_cmd_stdout.getvalue() err = copy_stderr.getvalue() diff --git a/tests/pyscript/stdout_capture.py b/tests/pyscript/stdout_capture.py index 5cc0cf3a4..7cc6641c6 100644 --- a/tests/pyscript/stdout_capture.py +++ b/tests/pyscript/stdout_capture.py @@ -1,25 +1,4 @@ -# This script demonstrates when output of a command finalization hook is captured by a pyscript app() call -import sys - -# The unit test framework passes in the string being printed by the command finalization hook -hook_output = sys.argv[1] - -# Run a help command which results in 1 call to onecmd_plus_hooks -res = app('help') - -# hook_output will not be captured because there are no nested calls to onecmd_plus_hooks -if hook_output not in res.stdout: - print("PASSED") -else: - print("FAILED") - -# Run the last command in the history -res = app('history -r -1') - -# All output of the history command will be captured. This includes all output of the commands -# started in do_history() using onecmd_plus_hooks(), including any output in those commands' hooks. -# Therefore we expect the hook_output to show up this time. -if hook_output in res.stdout: - print("PASSED") -else: - print("FAILED") +# This script demonstrates that cmd2 can capture sys.stdout and self.stdout when both point to the same stream. +# Set base_app.self_in_py to True before running this script. +print("print") +self.poutput("poutput") diff --git a/tests/test_argparse_completer.py b/tests/test_argparse_completer.py index f6561321a..b6713e879 100644 --- a/tests/test_argparse_completer.py +++ b/tests/test_argparse_completer.py @@ -15,10 +15,7 @@ argparse_custom, with_argparser, ) -from cmd2.utils import ( - StdSim, - align_right, -) +from cmd2.utils import align_right from .conftest import ( complete_tester, @@ -334,9 +331,7 @@ def do_standalone(self, args: argparse.Namespace) -> None: @pytest.fixture def ac_app(): - app = ArgparseCompleterTester() - app.stdout = StdSim(app.stdout) - return app + return ArgparseCompleterTester() @pytest.mark.parametrize('command', ['music', 'music create', 'music create rock', 'music create jazz']) diff --git a/tests/test_cmd2.py b/tests/test_cmd2.py index fa9ee561a..751797231 100644 --- a/tests/test_cmd2.py +++ b/tests/test_cmd2.py @@ -627,38 +627,85 @@ def do_passthrough(self, _) -> NoReturn: base_app.onecmd_plus_hooks('passthrough') -def test_output_redirection(base_app) -> None: +class RedirectionApp(cmd2.Cmd): + def __init__(self, *args, **kwargs) -> None: + super().__init__(*args, **kwargs) + + def do_print_output(self, _: str) -> None: + """Print output to sys.stdout and self.stdout..""" + print("print") + self.poutput("poutput") + + def do_print_feedback(self, _: str) -> None: + """Call pfeedback.""" + self.pfeedback("feedback") + + +@pytest.fixture +def redirection_app(): + return RedirectionApp() + + +def test_output_redirection(redirection_app) -> None: fd, filename = tempfile.mkstemp(prefix='cmd2_test', suffix='.txt') os.close(fd) try: # Verify that writing to a file works - run_cmd(base_app, f'help > {filename}') + run_cmd(redirection_app, f'print_output > {filename}') + with open(filename) as f: + lines = f.read().splitlines() + assert lines[0] == "print" + assert lines[1] == "poutput" + + # Verify that appending to a file also works + run_cmd(redirection_app, f'print_output >> {filename}') + with open(filename) as f: + lines = f.read().splitlines() + assert lines[0] == "print" + assert lines[1] == "poutput" + assert lines[2] == "print" + assert lines[3] == "poutput" + finally: + os.remove(filename) + + +def test_output_redirection_custom_stdout(redirection_app) -> None: + """sys.stdout should not redirect if it's different than self.stdout.""" + fd, filename = tempfile.mkstemp(prefix='cmd2_test', suffix='.txt') + os.close(fd) + + redirection_app.stdout = io.StringIO() + try: + # Verify that we only see output written to self.stdout + run_cmd(redirection_app, f'print_output > {filename}') with open(filename) as f: - content = f.read() - verify_help_text(base_app, content) + lines = f.read().splitlines() + assert "print" not in lines + assert lines[0] == "poutput" # Verify that appending to a file also works - run_cmd(base_app, f'help history >> {filename}') + run_cmd(redirection_app, f'print_output >> {filename}') with open(filename) as f: - appended_content = f.read() - assert appended_content.startswith(content) - assert len(appended_content) > len(content) + lines = f.read().splitlines() + assert "print" not in lines + assert lines[0] == "poutput" + assert lines[1] == "poutput" finally: os.remove(filename) -def test_output_redirection_to_nonexistent_directory(base_app) -> None: +def test_output_redirection_to_nonexistent_directory(redirection_app) -> None: filename = '~/fakedir/this_does_not_exist.txt' - out, err = run_cmd(base_app, f'help > {filename}') + out, err = run_cmd(redirection_app, f'print_output > {filename}') assert 'Failed to redirect' in err[0] - out, err = run_cmd(base_app, f'help >> {filename}') + out, err = run_cmd(redirection_app, f'print_output >> {filename}') assert 'Failed to redirect' in err[0] -def test_output_redirection_to_too_long_filename(base_app) -> None: +def test_output_redirection_to_too_long_filename(redirection_app) -> None: filename = ( '~/sdkfhksdjfhkjdshfkjsdhfkjsdhfkjdshfkjdshfkjshdfkhdsfkjhewfuihewiufhweiufhiweufhiuewhiuewhfiuwehfia' 'ewhfiuewhfiuewhfiuewhiuewhfiuewhfiuewfhiuwehewiufhewiuhfiweuhfiuwehfiuewfhiuwehiuewfhiuewhiewuhfiueh' @@ -667,93 +714,86 @@ def test_output_redirection_to_too_long_filename(base_app) -> None: 'whfieuwfhieufhiuewhfeiuwfhiuefhueiwhfw' ) - out, err = run_cmd(base_app, f'help > {filename}') + out, err = run_cmd(redirection_app, f'print_output > {filename}') assert 'Failed to redirect' in err[0] - out, err = run_cmd(base_app, f'help >> {filename}') + out, err = run_cmd(redirection_app, f'print_output >> {filename}') assert 'Failed to redirect' in err[0] -def test_feedback_to_output_true(base_app) -> None: - base_app.feedback_to_output = True - base_app.timing = True +def test_feedback_to_output_true(redirection_app) -> None: + redirection_app.feedback_to_output = True f, filename = tempfile.mkstemp(prefix='cmd2_test', suffix='.txt') os.close(f) try: - run_cmd(base_app, f'help > {filename}') + run_cmd(redirection_app, f'print_feedback > {filename}') with open(filename) as f: - content = f.readlines() - assert content[-1].startswith('Elapsed: ') + content = f.read().splitlines() + assert "feedback" in content finally: os.remove(filename) -def test_feedback_to_output_false(base_app) -> None: - base_app.feedback_to_output = False - base_app.timing = True +def test_feedback_to_output_false(redirection_app) -> None: + redirection_app.feedback_to_output = False f, filename = tempfile.mkstemp(prefix='feedback_to_output', suffix='.txt') os.close(f) try: - out, err = run_cmd(base_app, f'help > {filename}') + out, err = run_cmd(redirection_app, f'print_feedback > {filename}') with open(filename) as f: - content = f.readlines() - assert not content[-1].startswith('Elapsed: ') - assert err[0].startswith('Elapsed') + content = f.read().splitlines() + assert not content + assert "feedback" in err finally: os.remove(filename) -def test_disallow_redirection(base_app) -> None: +def test_disallow_redirection(redirection_app) -> None: # Set allow_redirection to False - base_app.allow_redirection = False + redirection_app.allow_redirection = False filename = 'test_allow_redirect.txt' # Verify output wasn't redirected - out, err = run_cmd(base_app, f'help > {filename}') - verify_help_text(base_app, out) + out, err = run_cmd(redirection_app, f'print_output > {filename}') + assert "print" in out + assert "poutput" in out # Verify that no file got created assert not os.path.exists(filename) -def test_pipe_to_shell(base_app) -> None: - if sys.platform == "win32": - # Windows - command = 'help | sort' - else: - # Mac and Linux - # Get help on help and pipe it's output to the input of the word count shell command - command = 'help help | wc' +def test_pipe_to_shell(redirection_app) -> None: + out, err = run_cmd(redirection_app, "print_output | sort") + assert "print" in out + assert "poutput" in out + assert not err + - out, err = run_cmd(base_app, command) - assert out +def test_pipe_to_shell_custom_stdout(redirection_app) -> None: + """sys.stdout should not redirect if it's different than self.stdout.""" + redirection_app.stdout = io.StringIO() + out, err = run_cmd(redirection_app, "print_output | sort") + assert "print" not in out + assert "poutput" in out assert not err -def test_pipe_to_shell_and_redirect(base_app) -> None: +def test_pipe_to_shell_and_redirect(redirection_app) -> None: filename = 'out.txt' - if sys.platform == "win32": - # Windows - command = f'help | sort > {filename}' - else: - # Mac and Linux - # Get help on help and pipe it's output to the input of the word count shell command - command = f'help help | wc > {filename}' - - out, err = run_cmd(base_app, command) + out, err = run_cmd(redirection_app, f"print_output | sort > {filename}") assert not out assert not err assert os.path.exists(filename) os.remove(filename) -def test_pipe_to_shell_error(base_app) -> None: +def test_pipe_to_shell_error(redirection_app) -> None: # Try to pipe command output to a shell command that doesn't exist in order to produce an error - out, err = run_cmd(base_app, 'help | foobarbaz.this_does_not_exist') + out, err = run_cmd(redirection_app, 'print_output | foobarbaz.this_does_not_exist') assert not out assert "Pipe process exited with code" in err[0] @@ -773,26 +813,48 @@ def test_pipe_to_shell_error(base_app) -> None: @pytest.mark.skipif(not can_paste, reason="Pyperclip could not find a copy/paste mechanism for your system") -def test_send_to_paste_buffer(base_app) -> None: +def test_send_to_paste_buffer(redirection_app) -> None: # Test writing to the PasteBuffer/Clipboard - run_cmd(base_app, 'help >') - paste_contents = cmd2.cmd2.get_paste_buffer() - verify_help_text(base_app, paste_contents) + run_cmd(redirection_app, 'print_output >') + lines = cmd2.cmd2.get_paste_buffer().splitlines() + assert lines[0] == "print" + assert lines[1] == "poutput" + + # Test appending to the PasteBuffer/Clipboard + run_cmd(redirection_app, 'print_output >>') + lines = cmd2.cmd2.get_paste_buffer().splitlines() + assert lines[0] == "print" + assert lines[1] == "poutput" + assert lines[2] == "print" + assert lines[3] == "poutput" + + +@pytest.mark.skipif(not can_paste, reason="Pyperclip could not find a copy/paste mechanism for your system") +def test_send_to_paste_buffer_custom_stdout(redirection_app) -> None: + """sys.stdout should not redirect if it's different than self.stdout.""" + redirection_app.stdout = io.StringIO() + + # Verify that we only see output written to self.stdout + run_cmd(redirection_app, 'print_output >') + lines = cmd2.cmd2.get_paste_buffer().splitlines() + assert "print" not in lines + assert lines[0] == "poutput" # Test appending to the PasteBuffer/Clipboard - run_cmd(base_app, 'help history >>') - appended_contents = cmd2.cmd2.get_paste_buffer() - assert appended_contents.startswith(paste_contents) - assert len(appended_contents) > len(paste_contents) + run_cmd(redirection_app, 'print_output >>') + lines = cmd2.cmd2.get_paste_buffer().splitlines() + assert "print" not in lines + assert lines[0] == "poutput" + assert lines[1] == "poutput" -def test_get_paste_buffer_exception(base_app, mocker, capsys) -> None: +def test_get_paste_buffer_exception(redirection_app, mocker, capsys) -> None: # Force get_paste_buffer to throw an exception pastemock = mocker.patch('pyperclip.paste') pastemock.side_effect = ValueError('foo') # Redirect command output to the clipboard - base_app.onecmd_plus_hooks('help > ') + redirection_app.onecmd_plus_hooks('print_output > ') # Make sure we got the exception output out, err = capsys.readouterr() @@ -802,8 +864,8 @@ def test_get_paste_buffer_exception(base_app, mocker, capsys) -> None: assert 'foo' in err -def test_allow_clipboard_initializer(base_app) -> None: - assert base_app.allow_clipboard is True +def test_allow_clipboard_initializer(redirection_app) -> None: + assert redirection_app.allow_clipboard is True noclipcmd = cmd2.Cmd(allow_clipboard=False) assert noclipcmd.allow_clipboard is False diff --git a/tests/test_run_pyscript.py b/tests/test_run_pyscript.py index a64f77ba9..78739dc4a 100644 --- a/tests/test_run_pyscript.py +++ b/tests/test_run_pyscript.py @@ -8,24 +8,13 @@ import pytest -from cmd2 import ( - plugin, - utils, -) +from cmd2 import utils from .conftest import ( odd_file_names, run_cmd, ) -HOOK_OUTPUT = "TEST_OUTPUT" - - -def cmdfinalization_hook(data: plugin.CommandFinalizationData) -> plugin.CommandFinalizationData: - """A cmdfinalization_hook hook which requests application exit""" - print(HOOK_OUTPUT) - return data - def test_run_pyscript(base_app, request) -> None: test_dir = os.path.dirname(request.module.__file__) @@ -133,14 +122,29 @@ def test_run_pyscript_dir(base_app, request) -> None: assert out[0] == "['cmd_echo']" -def test_run_pyscript_stdout_capture(base_app, request) -> None: - base_app.register_cmdfinalization_hook(cmdfinalization_hook) +def test_run_pyscript_capture(base_app, request) -> None: + base_app.self_in_py = True test_dir = os.path.dirname(request.module.__file__) python_script = os.path.join(test_dir, 'pyscript', 'stdout_capture.py') - out, err = run_cmd(base_app, f'run_pyscript {python_script} {HOOK_OUTPUT}') + out, err = run_cmd(base_app, f'run_pyscript {python_script}') - assert out[0] == "PASSED" - assert out[1] == "PASSED" + assert out[0] == "print" + assert out[1] == "poutput" + + +def test_run_pyscript_capture_custom_stdout(base_app, request) -> None: + """sys.stdout will not be captured if it's different than self.stdout.""" + import io + + base_app.stdout = io.StringIO() + + base_app.self_in_py = True + test_dir = os.path.dirname(request.module.__file__) + python_script = os.path.join(test_dir, 'pyscript', 'stdout_capture.py') + out, err = run_cmd(base_app, f'run_pyscript {python_script}') + + assert "print" not in out + assert out[0] == "poutput" def test_run_pyscript_stop(base_app, request) -> None: