Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
90 changes: 84 additions & 6 deletions src/specify_cli/workflows/steps/gate/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,19 @@

from __future__ import annotations

import re
import sys
from pathlib import Path
from typing import Any

from specify_cli.workflows.base import StepBase, StepContext, StepResult, StepStatus
from specify_cli.workflows.expressions import evaluate_expression

#: C0 control characters except tab — stripped from ``show_file`` content so a
#: file containing ANSI escapes (e.g. ``\x1b[2J``) cannot clear the screen or
#: spoof the prompt/options when its lines are printed to the terminal.
_CONTROL_CHARS = re.compile(r"[\x00-\x08\x0b-\x1f\x7f]")


class GateStep(StepBase):
"""Interactive review gate.
Expand All @@ -23,6 +30,10 @@ class GateStep(StepBase):

type_key = "gate"

#: Maximum number of ``show_file`` lines rendered at the prompt, so a
#: large file cannot flood the terminal before the choice.
MAX_SHOW_FILE_LINES = 200

def execute(self, config: dict[str, Any], context: StepContext) -> StepResult:
message = config.get("message", "Review required.")
if isinstance(message, str) and "{{" in message:
Expand All @@ -32,8 +43,14 @@ def execute(self, config: dict[str, Any], context: StepContext) -> StepResult:
on_reject = config.get("on_reject", "abort")

show_file = config.get("show_file")
if show_file and isinstance(show_file, str) and "{{" in show_file:
if isinstance(show_file, str) and "{{" in show_file:
show_file = evaluate_expression(show_file, context)
# ``evaluate_expression`` can return a non-string for a single
# expression (e.g. a number from a prior step), and a literal
# non-string is also possible; coerce so it is rendered rather
# than silently skipped at the prompt.
if show_file is not None:
show_file = str(show_file)

output = {
"message": message,
Expand All @@ -43,12 +60,16 @@ def execute(self, config: dict[str, Any], context: StepContext) -> StepResult:
"choice": None,
}

# Non-interactive: pause for later resume
# Non-interactive: pause for later resume (the file is not read here)
if not sys.stdin.isatty():
return StepResult(status=StepStatus.PAUSED, output=output)

# Interactive: prompt the user
choice = self._prompt(message, options)
# Interactive: prompt the user. ``show_file`` contents are folded
# into the displayed message so the operator can review the
# referenced material before choosing. Composing the prompt text
# here keeps ``_prompt`` to its ``(message, options)`` contract, so
# adding review material never widens the interactive seam.
Comment on lines +67 to +71
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You're right that the claim needs refining. To render show_file lines inside the gate box, _prompt now boxes every line of the message ( prefix per line). For the common single-line message this is byte-identical to the previous print(f" │ {message}"). It differs only when a message itself contains a newline: previously the continuation lines were printed unboxed (they fell outside the frame), now they're boxed too — i.e. the multi-line case is corrected, not regressed. So 'byte-identical for gates without show_file' holds for single-line messages (the normal case); multi-line messages are now rendered correctly within the box. Happy to spell this out in the PR description.

choice = self._prompt(self._compose_prompt(message, show_file), options)
output["choice"] = choice

if choice in ("reject", "abort"):
Expand All @@ -67,11 +88,35 @@ def execute(self, config: dict[str, Any], context: StepContext) -> StepResult:

return StepResult(status=StepStatus.COMPLETED, output=output)

@classmethod
def _compose_prompt(cls, message: object, show_file: str | None) -> str:
"""Build the gate's display text.

``message`` may be a non-string (e.g. a YAML numeric literal that
``execute`` does not coerce), so it is rendered through ``str``.
When ``show_file`` names a file, its contents (read safely, see
``_read_show_file``) are appended below the message so the operator
can review the referenced material before choosing. Always returns a
``str`` — possibly multi-line — for ``_prompt`` to render in the box.
"""
text = str(message)
if not show_file:
return text
body = "\n".join(
[f"{show_file}:", *(f" {line}" for line in cls._read_show_file(show_file))]
)
return f"{text}\n\n{body}"

@staticmethod
def _prompt(message: str, options: list[str]) -> str:
"""Display gate message and prompt for a choice."""
"""Display the gate message and prompt for a choice.

``message`` may span multiple lines (e.g. when review material has
been folded in); each line is rendered inside the gate box.
"""
print("\n ┌─ Gate ─────────────────────────────────────")
print(f" │ {message}")
for line in message.split("\n"):
print(f" │ {line}" if line else " │")
Comment on lines +118 to +119
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch — fixed in 4d52dd9: _read_show_file now strips C0 control characters (except tab) from each line, so a show_file containing ANSI escapes (e.g. \x1b[2J) can't clear the screen or spoof the prompt/options. Sanitizing at the read boundary covers every rendering of the file. Added test_read_show_file_strips_control_chars.

print(" │")
for i, opt in enumerate(options, 1):
print(f" │ [{i}] {opt}")
Expand All @@ -90,6 +135,39 @@ def _prompt(message: str, options: list[str]) -> str:
return next(o for o in options if o.lower() == raw.lower())
print(f" Invalid choice. Enter 1-{len(options)} or an option name.")

@staticmethod
def _read_show_file(show_file: str) -> list[str]:
"""Return the lines of ``show_file`` for display.

Reads at most ``MAX_SHOW_FILE_LINES`` lines so a large file cannot
flood the prompt, and returns a short notice instead of raising
when the file is missing, undecodable, or names an invalid path,
so a misconfigured ``show_file`` never breaks the interactive
prompt. ``ValueError`` covers paths the OS rejects outright (e.g.
an embedded NUL byte), which ``Path.open`` raises before any I/O.

Control characters are stripped from each line so file content
cannot inject ANSI escape sequences into the terminal.
"""
lines: list[str] = []
truncated = False
try:
with Path(show_file).open(encoding="utf-8") as handle:
for line in handle:
if len(lines) >= GateStep.MAX_SHOW_FILE_LINES:
truncated = True
break
lines.append(_CONTROL_CHARS.sub("", line.rstrip("\n")))
except (OSError, UnicodeDecodeError, ValueError) as exc:
return [f"(could not read file: {exc})"]
Comment on lines +154 to +162
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Addressed in 017e84d: _read_show_file now catches ValueError as well (the error Path.open raises for an embedded NUL byte, before any I/O), so such a path degrades to the (could not read file: …) notice instead of crashing. Covered by test_read_show_file_invalid_path_does_not_raise.

if not lines and not truncated:
return ["(file is empty)"]
if truncated:
lines.append(
f"… (output truncated at {GateStep.MAX_SHOW_FILE_LINES} lines)"
)
return lines

def validate(self, config: dict[str, Any]) -> list[str]:
errors = super().validate(config)
if "message" not in config:
Expand Down
188 changes: 188 additions & 0 deletions tests/test_workflows.py
Original file line number Diff line number Diff line change
Expand Up @@ -784,9 +784,40 @@ def test_validate_missing_run(self):
assert any("missing 'run'" in e for e in errors)


class _StubStdin:
"""Stdin stub with a fixed ``isatty`` result.

Swapped in via the gate module's ``sys.stdin`` rather than
``setattr(sys.stdin, "isatty", …)`` because ``TextIOWrapper.isatty``
is not assignable under some runners (e.g. pytest with capture
disabled), and because forcing the value keeps interactive/non-
interactive tests deterministic regardless of how the suite is run.
"""

def __init__(self, tty: bool):
self._tty = tty

def isatty(self) -> bool:
return self._tty


def _force_gate_stdin(monkeypatch, *, tty: bool):
from specify_cli.workflows.steps import gate as gate_module

monkeypatch.setattr(gate_module.sys, "stdin", _StubStdin(tty=tty))


class TestGateStep:
"""Test the gate step type."""

@pytest.fixture(autouse=True)
def _non_tty_stdin_by_default(self, monkeypatch):
# Default every gate test to a non-TTY stdin so none can drop into
# the interactive prompt and block on input() when the suite runs
# with a real TTY. Interactive tests opt back in with
# _force_gate_stdin(monkeypatch, tty=True).
_force_gate_stdin(monkeypatch, tty=False)

def test_execute_returns_paused(self):
from specify_cli.workflows.steps.gate import GateStep
from specify_cli.workflows.base import StepContext, StepStatus
Expand Down Expand Up @@ -822,6 +853,163 @@ def test_validate_invalid_on_reject(self):
})
assert any("on_reject" in e for e in errors)

def test_interactive_prompt_renders_show_file(self, tmp_path, monkeypatch, capsys):
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done in 4d52dd9: added an autouse fixture on TestGateStep that defaults every gate test to a non-TTY stdin (via _force_gate_stdin(monkeypatch, tty=False)), so test_execute_returns_paused and any other non-stubbed test can't enter the interactive prompt or hang. Interactive tests opt back in with tty=True.

from specify_cli.workflows.steps.gate import GateStep
from specify_cli.workflows.base import StepContext, StepStatus

review = tmp_path / "spec.md"
review.write_text("LINE-ONE\nLINE-TWO\n", encoding="utf-8")

_force_gate_stdin(monkeypatch, tty=True)
monkeypatch.setattr("builtins.input", lambda _prompt="": "1")

step = GateStep()
config = {
"id": "review",
"message": "Review the spec.",
"show_file": str(review),
"options": ["approve", "reject"],
}
result = step.execute(config, StepContext())
out = capsys.readouterr().out

assert "LINE-ONE" in out and "LINE-TWO" in out
assert str(review) in out
assert result.status == StepStatus.COMPLETED
assert result.output["choice"] == "approve"

def test_interactive_prompt_missing_show_file_does_not_crash(
self, tmp_path, monkeypatch, capsys
):
from specify_cli.workflows.steps.gate import GateStep
from specify_cli.workflows.base import StepContext, StepStatus

missing = tmp_path / "does-not-exist.md"

_force_gate_stdin(monkeypatch, tty=True)
monkeypatch.setattr("builtins.input", lambda _prompt="": "1")

step = GateStep()
config = {
"id": "review",
"message": "Review.",
"show_file": str(missing),
"options": ["approve", "reject"],
}
result = step.execute(config, StepContext())
out = capsys.readouterr().out

assert "could not read file" in out
assert result.status == StepStatus.COMPLETED

def test_non_interactive_show_file_still_pauses_without_reading(
self, tmp_path, monkeypatch
):
from specify_cli.workflows.steps.gate import GateStep
from specify_cli.workflows.base import StepContext, StepStatus

review = tmp_path / "spec.md"
review.write_text("CONTENT\n", encoding="utf-8")

# stdin defaults to non-TTY via the autouse fixture.
# The non-interactive path must not read the file; hard-fail if it does.
monkeypatch.setattr(
GateStep,
"_read_show_file",
staticmethod(
lambda _p: (_ for _ in ()).throw(
AssertionError("show_file read on the non-interactive path")
)
),
)

step = GateStep()
config = {
"id": "review",
"message": "Review.",
"show_file": str(review),
"options": ["approve", "reject"],
}
result = step.execute(config, StepContext())
assert result.status == StepStatus.PAUSED
assert result.output["show_file"] == str(review)

Comment on lines +905 to +936
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done in 06bcd43: switched to _force_gate_stdin(monkeypatch, tty=False), and the test now monkeypatches _read_show_file to raise, so it hard-fails if the file is read on the non-interactive (PAUSED) path — proving the 'without reading' part.

def test_read_show_file_empty(self, tmp_path):
from specify_cli.workflows.steps.gate import GateStep

empty = tmp_path / "empty.md"
empty.write_text("", encoding="utf-8")
assert GateStep._read_show_file(str(empty)) == ["(file is empty)"]

def test_read_show_file_truncates_large_file(self, tmp_path):
from specify_cli.workflows.steps.gate import GateStep

big = tmp_path / "big.md"
big.write_text(
"\n".join(f"line{i}" for i in range(GateStep.MAX_SHOW_FILE_LINES + 50)),
encoding="utf-8",
)
rendered = GateStep._read_show_file(str(big))
# MAX_SHOW_FILE_LINES content lines + one truncation notice line.
assert len(rendered) == GateStep.MAX_SHOW_FILE_LINES + 1
assert "truncated" in rendered[-1]

def test_read_show_file_invalid_path_does_not_raise(self):
from specify_cli.workflows.steps.gate import GateStep

# An embedded NUL byte makes the OS reject the path with ValueError
# before any I/O; it must degrade to a notice, not crash the prompt.
rendered = GateStep._read_show_file("bad\x00path.md")
assert len(rendered) == 1
assert rendered[0].startswith("(could not read file:")

def test_read_show_file_strips_control_chars(self, tmp_path):
from specify_cli.workflows.steps.gate import GateStep

# A file with ANSI/control bytes must not inject escapes into the
# terminal; ESC and other C0 controls are stripped, tab is kept.
f = tmp_path / "ansi.md"
f.write_text("a\x1b[2Jb\tc\x07d\n", encoding="utf-8")
rendered = GateStep._read_show_file(str(f))
assert rendered == ["a[2Jb\tcd"]
assert "\x1b" not in rendered[0] and "\x07" not in rendered[0]

def test_interactive_non_string_message_renders(self, monkeypatch, capsys):
from specify_cli.workflows.steps.gate import GateStep
from specify_cli.workflows.base import StepContext, StepStatus

# A YAML numeric literal reaches the prompt as a non-string; it must
# render rather than crash on the multi-line split.
_force_gate_stdin(monkeypatch, tty=True)
monkeypatch.setattr("builtins.input", lambda _prompt="": "1")

step = GateStep()
config = {"id": "review", "message": 123, "options": ["approve", "reject"]}
result = step.execute(config, StepContext())
out = capsys.readouterr().out
assert "123" in out
assert result.status == StepStatus.COMPLETED

def test_templated_show_file_resolving_to_non_string_is_coerced(self):
from specify_cli.workflows.steps.gate import GateStep
from specify_cli.workflows.base import StepContext, StepStatus

# A single-expression template can resolve to a non-string (e.g. a
# number from a prior step); it must be coerced to str, not skipped.
# stdin defaults to non-TTY via the autouse fixture, so the path
# stays non-interactive (-> PAUSED) and cannot block on input.
step = GateStep()
ctx = StepContext(steps={"prev": {"output": {"ref": 123}}})
config = {
"id": "review",
"message": "Review.",
"show_file": "{{ steps.prev.output.ref }}",
"options": ["approve", "reject"],
}
result = step.execute(config, ctx) # non-interactive -> PAUSED
assert result.status == StepStatus.PAUSED
assert result.output["show_file"] == "123"


class TestIfThenStep:
"""Test the if/then/else step type."""
Expand Down