diff --git a/src/specify_cli/workflows/steps/gate/__init__.py b/src/specify_cli/workflows/steps/gate/__init__.py index d4d32d763c..e6526a634d 100644 --- a/src/specify_cli/workflows/steps/gate/__init__.py +++ b/src/specify_cli/workflows/steps/gate/__init__.py @@ -2,12 +2,19 @@ from __future__ import annotations +import re import sys +from pathlib import Path from typing import Any from specify_cli.workflows.base import StepBase, StepContext, StepResult, StepStatus from specify_cli.workflows.expressions import evaluate_expression +#: C0 control characters except tab — stripped from ``show_file`` content so a +#: file containing ANSI escapes (e.g. ``\x1b[2J``) cannot clear the screen or +#: spoof the prompt/options when its lines are printed to the terminal. +_CONTROL_CHARS = re.compile(r"[\x00-\x08\x0b-\x1f\x7f]") + class GateStep(StepBase): """Interactive review gate. @@ -23,6 +30,10 @@ class GateStep(StepBase): type_key = "gate" + #: Maximum number of ``show_file`` lines rendered at the prompt, so a + #: large file cannot flood the terminal before the choice. + MAX_SHOW_FILE_LINES = 200 + def execute(self, config: dict[str, Any], context: StepContext) -> StepResult: message = config.get("message", "Review required.") if isinstance(message, str) and "{{" in message: @@ -32,8 +43,14 @@ def execute(self, config: dict[str, Any], context: StepContext) -> StepResult: on_reject = config.get("on_reject", "abort") show_file = config.get("show_file") - if show_file and isinstance(show_file, str) and "{{" in show_file: + if isinstance(show_file, str) and "{{" in show_file: show_file = evaluate_expression(show_file, context) + # ``evaluate_expression`` can return a non-string for a single + # expression (e.g. a number from a prior step), and a literal + # non-string is also possible; coerce so it is rendered rather + # than silently skipped at the prompt. + if show_file is not None: + show_file = str(show_file) output = { "message": message, @@ -43,12 +60,16 @@ def execute(self, config: dict[str, Any], context: StepContext) -> StepResult: "choice": None, } - # Non-interactive: pause for later resume + # Non-interactive: pause for later resume (the file is not read here) if not sys.stdin.isatty(): return StepResult(status=StepStatus.PAUSED, output=output) - # Interactive: prompt the user - choice = self._prompt(message, options) + # Interactive: prompt the user. ``show_file`` contents are folded + # into the displayed message so the operator can review the + # referenced material before choosing. Composing the prompt text + # here keeps ``_prompt`` to its ``(message, options)`` contract, so + # adding review material never widens the interactive seam. + choice = self._prompt(self._compose_prompt(message, show_file), options) output["choice"] = choice if choice in ("reject", "abort"): @@ -67,11 +88,35 @@ def execute(self, config: dict[str, Any], context: StepContext) -> StepResult: return StepResult(status=StepStatus.COMPLETED, output=output) + @classmethod + def _compose_prompt(cls, message: object, show_file: str | None) -> str: + """Build the gate's display text. + + ``message`` may be a non-string (e.g. a YAML numeric literal that + ``execute`` does not coerce), so it is rendered through ``str``. + When ``show_file`` names a file, its contents (read safely, see + ``_read_show_file``) are appended below the message so the operator + can review the referenced material before choosing. Always returns a + ``str`` — possibly multi-line — for ``_prompt`` to render in the box. + """ + text = str(message) + if not show_file: + return text + body = "\n".join( + [f"{show_file}:", *(f" {line}" for line in cls._read_show_file(show_file))] + ) + return f"{text}\n\n{body}" + @staticmethod def _prompt(message: str, options: list[str]) -> str: - """Display gate message and prompt for a choice.""" + """Display the gate message and prompt for a choice. + + ``message`` may span multiple lines (e.g. when review material has + been folded in); each line is rendered inside the gate box. + """ print("\n ┌─ Gate ─────────────────────────────────────") - print(f" │ {message}") + for line in message.split("\n"): + print(f" │ {line}" if line else " │") print(" │") for i, opt in enumerate(options, 1): print(f" │ [{i}] {opt}") @@ -90,6 +135,39 @@ def _prompt(message: str, options: list[str]) -> str: return next(o for o in options if o.lower() == raw.lower()) print(f" Invalid choice. Enter 1-{len(options)} or an option name.") + @staticmethod + def _read_show_file(show_file: str) -> list[str]: + """Return the lines of ``show_file`` for display. + + Reads at most ``MAX_SHOW_FILE_LINES`` lines so a large file cannot + flood the prompt, and returns a short notice instead of raising + when the file is missing, undecodable, or names an invalid path, + so a misconfigured ``show_file`` never breaks the interactive + prompt. ``ValueError`` covers paths the OS rejects outright (e.g. + an embedded NUL byte), which ``Path.open`` raises before any I/O. + + Control characters are stripped from each line so file content + cannot inject ANSI escape sequences into the terminal. + """ + lines: list[str] = [] + truncated = False + try: + with Path(show_file).open(encoding="utf-8") as handle: + for line in handle: + if len(lines) >= GateStep.MAX_SHOW_FILE_LINES: + truncated = True + break + lines.append(_CONTROL_CHARS.sub("", line.rstrip("\n"))) + except (OSError, UnicodeDecodeError, ValueError) as exc: + return [f"(could not read file: {exc})"] + if not lines and not truncated: + return ["(file is empty)"] + if truncated: + lines.append( + f"… (output truncated at {GateStep.MAX_SHOW_FILE_LINES} lines)" + ) + return lines + def validate(self, config: dict[str, Any]) -> list[str]: errors = super().validate(config) if "message" not in config: diff --git a/tests/test_workflows.py b/tests/test_workflows.py index f7c7f588d1..bf0135826e 100644 --- a/tests/test_workflows.py +++ b/tests/test_workflows.py @@ -784,9 +784,40 @@ def test_validate_missing_run(self): assert any("missing 'run'" in e for e in errors) +class _StubStdin: + """Stdin stub with a fixed ``isatty`` result. + + Swapped in via the gate module's ``sys.stdin`` rather than + ``setattr(sys.stdin, "isatty", …)`` because ``TextIOWrapper.isatty`` + is not assignable under some runners (e.g. pytest with capture + disabled), and because forcing the value keeps interactive/non- + interactive tests deterministic regardless of how the suite is run. + """ + + def __init__(self, tty: bool): + self._tty = tty + + def isatty(self) -> bool: + return self._tty + + +def _force_gate_stdin(monkeypatch, *, tty: bool): + from specify_cli.workflows.steps import gate as gate_module + + monkeypatch.setattr(gate_module.sys, "stdin", _StubStdin(tty=tty)) + + class TestGateStep: """Test the gate step type.""" + @pytest.fixture(autouse=True) + def _non_tty_stdin_by_default(self, monkeypatch): + # Default every gate test to a non-TTY stdin so none can drop into + # the interactive prompt and block on input() when the suite runs + # with a real TTY. Interactive tests opt back in with + # _force_gate_stdin(monkeypatch, tty=True). + _force_gate_stdin(monkeypatch, tty=False) + def test_execute_returns_paused(self): from specify_cli.workflows.steps.gate import GateStep from specify_cli.workflows.base import StepContext, StepStatus @@ -822,6 +853,163 @@ def test_validate_invalid_on_reject(self): }) assert any("on_reject" in e for e in errors) + def test_interactive_prompt_renders_show_file(self, tmp_path, monkeypatch, capsys): + from specify_cli.workflows.steps.gate import GateStep + from specify_cli.workflows.base import StepContext, StepStatus + + review = tmp_path / "spec.md" + review.write_text("LINE-ONE\nLINE-TWO\n", encoding="utf-8") + + _force_gate_stdin(monkeypatch, tty=True) + monkeypatch.setattr("builtins.input", lambda _prompt="": "1") + + step = GateStep() + config = { + "id": "review", + "message": "Review the spec.", + "show_file": str(review), + "options": ["approve", "reject"], + } + result = step.execute(config, StepContext()) + out = capsys.readouterr().out + + assert "LINE-ONE" in out and "LINE-TWO" in out + assert str(review) in out + assert result.status == StepStatus.COMPLETED + assert result.output["choice"] == "approve" + + def test_interactive_prompt_missing_show_file_does_not_crash( + self, tmp_path, monkeypatch, capsys + ): + from specify_cli.workflows.steps.gate import GateStep + from specify_cli.workflows.base import StepContext, StepStatus + + missing = tmp_path / "does-not-exist.md" + + _force_gate_stdin(monkeypatch, tty=True) + monkeypatch.setattr("builtins.input", lambda _prompt="": "1") + + step = GateStep() + config = { + "id": "review", + "message": "Review.", + "show_file": str(missing), + "options": ["approve", "reject"], + } + result = step.execute(config, StepContext()) + out = capsys.readouterr().out + + assert "could not read file" in out + assert result.status == StepStatus.COMPLETED + + def test_non_interactive_show_file_still_pauses_without_reading( + self, tmp_path, monkeypatch + ): + from specify_cli.workflows.steps.gate import GateStep + from specify_cli.workflows.base import StepContext, StepStatus + + review = tmp_path / "spec.md" + review.write_text("CONTENT\n", encoding="utf-8") + + # stdin defaults to non-TTY via the autouse fixture. + # The non-interactive path must not read the file; hard-fail if it does. + monkeypatch.setattr( + GateStep, + "_read_show_file", + staticmethod( + lambda _p: (_ for _ in ()).throw( + AssertionError("show_file read on the non-interactive path") + ) + ), + ) + + step = GateStep() + config = { + "id": "review", + "message": "Review.", + "show_file": str(review), + "options": ["approve", "reject"], + } + result = step.execute(config, StepContext()) + assert result.status == StepStatus.PAUSED + assert result.output["show_file"] == str(review) + + def test_read_show_file_empty(self, tmp_path): + from specify_cli.workflows.steps.gate import GateStep + + empty = tmp_path / "empty.md" + empty.write_text("", encoding="utf-8") + assert GateStep._read_show_file(str(empty)) == ["(file is empty)"] + + def test_read_show_file_truncates_large_file(self, tmp_path): + from specify_cli.workflows.steps.gate import GateStep + + big = tmp_path / "big.md" + big.write_text( + "\n".join(f"line{i}" for i in range(GateStep.MAX_SHOW_FILE_LINES + 50)), + encoding="utf-8", + ) + rendered = GateStep._read_show_file(str(big)) + # MAX_SHOW_FILE_LINES content lines + one truncation notice line. + assert len(rendered) == GateStep.MAX_SHOW_FILE_LINES + 1 + assert "truncated" in rendered[-1] + + def test_read_show_file_invalid_path_does_not_raise(self): + from specify_cli.workflows.steps.gate import GateStep + + # An embedded NUL byte makes the OS reject the path with ValueError + # before any I/O; it must degrade to a notice, not crash the prompt. + rendered = GateStep._read_show_file("bad\x00path.md") + assert len(rendered) == 1 + assert rendered[0].startswith("(could not read file:") + + def test_read_show_file_strips_control_chars(self, tmp_path): + from specify_cli.workflows.steps.gate import GateStep + + # A file with ANSI/control bytes must not inject escapes into the + # terminal; ESC and other C0 controls are stripped, tab is kept. + f = tmp_path / "ansi.md" + f.write_text("a\x1b[2Jb\tc\x07d\n", encoding="utf-8") + rendered = GateStep._read_show_file(str(f)) + assert rendered == ["a[2Jb\tcd"] + assert "\x1b" not in rendered[0] and "\x07" not in rendered[0] + + def test_interactive_non_string_message_renders(self, monkeypatch, capsys): + from specify_cli.workflows.steps.gate import GateStep + from specify_cli.workflows.base import StepContext, StepStatus + + # A YAML numeric literal reaches the prompt as a non-string; it must + # render rather than crash on the multi-line split. + _force_gate_stdin(monkeypatch, tty=True) + monkeypatch.setattr("builtins.input", lambda _prompt="": "1") + + step = GateStep() + config = {"id": "review", "message": 123, "options": ["approve", "reject"]} + result = step.execute(config, StepContext()) + out = capsys.readouterr().out + assert "123" in out + assert result.status == StepStatus.COMPLETED + + def test_templated_show_file_resolving_to_non_string_is_coerced(self): + from specify_cli.workflows.steps.gate import GateStep + from specify_cli.workflows.base import StepContext, StepStatus + + # A single-expression template can resolve to a non-string (e.g. a + # number from a prior step); it must be coerced to str, not skipped. + # stdin defaults to non-TTY via the autouse fixture, so the path + # stays non-interactive (-> PAUSED) and cannot block on input. + step = GateStep() + ctx = StepContext(steps={"prev": {"output": {"ref": 123}}}) + config = { + "id": "review", + "message": "Review.", + "show_file": "{{ steps.prev.output.ref }}", + "options": ["approve", "reject"], + } + result = step.execute(config, ctx) # non-interactive -> PAUSED + assert result.status == StepStatus.PAUSED + assert result.output["show_file"] == "123" + class TestIfThenStep: """Test the if/then/else step type."""