diff --git a/codetracer-python-recorder/codetracer_python_recorder/trace_balance.py b/codetracer-python-recorder/codetracer_python_recorder/trace_balance.py new file mode 100644 index 0000000..b9a8edf --- /dev/null +++ b/codetracer-python-recorder/codetracer_python_recorder/trace_balance.py @@ -0,0 +1,104 @@ +"""Utilities for validating the balance of Codetracer JSON trace files. + +A trace is considered *balanced* when it contains the same number of +``Call`` and ``Return`` events and the stream of events never dips below +zero (i.e. we never see a return without a preceding call). This helper +module provides small, importable utilities that can be reused by the +CLI helper script as well as unit tests. +""" + +from __future__ import annotations + +import json +from dataclasses import dataclass +from pathlib import Path +from typing import Any, Iterable, Mapping, Sequence + + +class TraceBalanceError(RuntimeError): + """Raised when the trace file is malformed or cannot be processed.""" + + +@dataclass(frozen=True) +class TraceBalanceResult: + """Summary information about call/return balance within a trace.""" + + call_count: int + return_count: int + first_negative_index: int | None + + @property + def is_balanced(self) -> bool: + """Return True when the trace has matching call/return counts and never underflows.""" + return self.call_count == self.return_count and self.first_negative_index is None + + @property + def delta(self) -> int: + """Number of call events minus return events.""" + return self.call_count - self.return_count + + +def load_trace_events(trace_path: Path) -> Sequence[Mapping[str, Any]]: + """Load and validate the JSON event stream from ``trace.json``.""" + try: + raw_text = trace_path.read_text(encoding="utf-8") + except FileNotFoundError as exc: + raise TraceBalanceError(f"trace file not found: {trace_path}") from exc + except OSError as exc: # pragma: no cover - surfaced in tests via mocked IO + raise TraceBalanceError(f"unable to read trace file: {trace_path}") from exc + + try: + data = json.loads(raw_text) + except json.JSONDecodeError as exc: + raise TraceBalanceError(f"invalid JSON in trace file: {trace_path}: {exc}") from exc + + if not isinstance(data, list): + raise TraceBalanceError(f"trace root must be a JSON array, got {type(data).__name__}") + + for index, event in enumerate(data): + if not isinstance(event, dict): + raise TraceBalanceError( + f"event #{index} is not a JSON object (found {type(event).__name__})" + ) + + return data + + +def summarize_trace_balance(events: Iterable[Mapping[str, Any]]) -> TraceBalanceResult: + """Return a balance summary for the provided event sequence.""" + calls = 0 + returns = 0 + active_calls = 0 + first_negative_index: int | None = None + + for index, event in enumerate(events): + is_call = "Call" in event + is_return = "Return" in event + + if is_call and is_return: + raise TraceBalanceError( + f"event #{index} contains both Call and Return payloads, which is unsupported" + ) + + if is_call: + calls += 1 + active_calls += 1 + if is_return: + returns += 1 + active_calls -= 1 + if active_calls < 0 and first_negative_index is None: + first_negative_index = index + + return TraceBalanceResult( + call_count=calls, + return_count=returns, + first_negative_index=first_negative_index, + ) + + +__all__ = [ + "TraceBalanceError", + "TraceBalanceResult", + "load_trace_events", + "summarize_trace_balance", +] diff --git a/codetracer-python-recorder/scripts/check_trace_balance.py b/codetracer-python-recorder/scripts/check_trace_balance.py new file mode 100644 index 0000000..78d8890 --- /dev/null +++ b/codetracer-python-recorder/scripts/check_trace_balance.py @@ -0,0 +1,69 @@ +"""CLI helper to verify that a Codetracer ``trace.json`` stream is balanced.""" + +from __future__ import annotations + +import argparse +import sys +from pathlib import Path +from typing import Sequence + +from codetracer_python_recorder.trace_balance import ( + TraceBalanceError, + load_trace_events, + summarize_trace_balance, +) + + +def build_parser() -> argparse.ArgumentParser: + parser = argparse.ArgumentParser( + description="Check whether a Codetracer trace.json has matching call and return events.", + ) + parser.add_argument( + "trace", + type=Path, + help="Path to trace.json emitted by Codetracer", + ) + return parser + + +def main(argv: Sequence[str] | None = None) -> int: + parser = build_parser() + args = parser.parse_args(argv) + + trace_path = args.trace + + try: + events = load_trace_events(trace_path) + result = summarize_trace_balance(events) + except TraceBalanceError as exc: + print(f"error: {exc}", file=sys.stderr) + return 2 + + if result.is_balanced: + print( + f"Balanced trace: {result.call_count} call events, " + f"{result.return_count} return events." + ) + return 0 + + print("Unbalanced trace detected.") + print(f"Call events : {result.call_count}") + print(f"Return events : {result.return_count}") + + delta = result.delta + if delta > 0: + print(f"Missing {delta} return event(s).") + elif delta < 0: + print(f"Unexpected {-delta} extra return event(s).") + + if result.first_negative_index is not None: + print( + "The first unmatched return appears at event " + f"#{result.first_negative_index} (0-based index)." + ) + + return 1 + + +if __name__ == "__main__": + sys.exit(main()) diff --git a/codetracer-python-recorder/tests/python/test_trace_balance.py b/codetracer-python-recorder/tests/python/test_trace_balance.py new file mode 100644 index 0000000..65c4680 --- /dev/null +++ b/codetracer-python-recorder/tests/python/test_trace_balance.py @@ -0,0 +1,101 @@ +import json +import importlib.util +from pathlib import Path +from typing import List, Mapping + +import pytest + +from codetracer_python_recorder.trace_balance import ( + TraceBalanceError, + TraceBalanceResult, + load_trace_events, + summarize_trace_balance, +) + + +def _write_trace(tmp_path: Path, events: List[Mapping[str, object]]) -> Path: + path = tmp_path / "trace.json" + path.write_text(json.dumps(events)) + return path + + +def _load_cli() -> object: + module_path = Path(__file__).parents[2] / "scripts" / "check_trace_balance.py" + spec = importlib.util.spec_from_file_location("check_trace_balance", module_path) + if spec is None or spec.loader is None: + raise RuntimeError("Unable to load check_trace_balance.py module spec") + module = importlib.util.module_from_spec(spec) + spec.loader.exec_module(module) + return module + + +def test_summarize_trace_balance_identifies_balanced_trace() -> None: + events = [{"Call": {}}, {"Return": {}}] + result = summarize_trace_balance(events) + + assert isinstance(result, TraceBalanceResult) + assert result.call_count == 1 + assert result.return_count == 1 + assert result.is_balanced + assert result.delta == 0 + assert result.first_negative_index is None + + +def test_summarize_trace_balance_detects_missing_return() -> None: + events = [{"Call": {}}, {"Call": {}}, {"Return": {}}] + result = summarize_trace_balance(events) + + assert result.call_count == 2 + assert result.return_count == 1 + assert not result.is_balanced + assert result.delta == 1 + assert result.first_negative_index is None + + +def test_summarize_trace_balance_detects_unmatched_return() -> None: + events = [{"Return": {}}, {"Call": {}}] + result = summarize_trace_balance(events) + + assert result.call_count == 1 + assert result.return_count == 1 + assert not result.is_balanced + assert result.first_negative_index == 0 + + +def test_load_trace_events_validates_structure(tmp_path: Path) -> None: + trace_path = _write_trace(tmp_path, [{"Call": {}}]) + + events = load_trace_events(trace_path) + assert isinstance(events, list) + assert events[0]["Call"] == {} + + +def test_load_trace_events_raises_on_non_array(tmp_path: Path) -> None: + trace_path = tmp_path / "trace.json" + trace_path.write_text("{}") + + with pytest.raises(TraceBalanceError): + load_trace_events(trace_path) + + +def test_cli_returns_non_zero_on_unbalanced(tmp_path: Path, capsys: pytest.CaptureFixture[str]) -> None: + trace_path = _write_trace(tmp_path, [{"Call": {}}, {"Return": {}}, {"Return": {}}]) + cli = _load_cli() + + exit_code = cli.main([str(trace_path)]) + captured = capsys.readouterr() + + assert exit_code == 1 + assert "Unbalanced trace detected." in captured.out + assert "Unexpected 1 extra return event(s)." in captured.out + + +def test_cli_reports_success_for_balanced_trace(tmp_path: Path, capsys: pytest.CaptureFixture[str]) -> None: + trace_path = _write_trace(tmp_path, [{"Call": {}}, {"Return": {}}]) + cli = _load_cli() + + exit_code = cli.main([str(trace_path)]) + captured = capsys.readouterr() + + assert exit_code == 0 + assert "Balanced trace" in captured.out