|
| 1 | +import json |
| 2 | +import runpy |
| 3 | +import tempfile |
| 4 | +from dataclasses import dataclass |
| 5 | +from pathlib import Path |
| 6 | +from typing import Any, Dict, List, Tuple |
| 7 | + |
| 8 | +import pytest |
| 9 | + |
| 10 | +import codetracer_python_recorder as codetracer |
| 11 | + |
| 12 | + |
| 13 | +@dataclass |
| 14 | +class ParsedTrace: |
| 15 | + paths: List[str] |
| 16 | + functions: List[Dict[str, Any]] # index is function_id |
| 17 | + calls: List[int] # sequence of function_id values |
| 18 | + returns: List[Dict[str, Any]] # raw Return payloads (order preserved) |
| 19 | + steps: List[Tuple[int, int]] # (path_id, line) |
| 20 | + |
| 21 | + |
| 22 | +def _parse_trace(out_dir: Path) -> ParsedTrace: |
| 23 | + events_path = out_dir / "trace.json" |
| 24 | + paths_path = out_dir / "trace_paths.json" |
| 25 | + |
| 26 | + events = json.loads(events_path.read_text()) |
| 27 | + paths: List[str] = json.loads(paths_path.read_text()) |
| 28 | + |
| 29 | + functions: List[Dict[str, Any]] = [] |
| 30 | + calls: List[int] = [] |
| 31 | + returns: List[Dict[str, Any]] = [] |
| 32 | + steps: List[Tuple[int, int]] = [] |
| 33 | + |
| 34 | + for item in events: |
| 35 | + if "Function" in item: |
| 36 | + functions.append(item["Function"]) |
| 37 | + elif "Call" in item: |
| 38 | + calls.append(int(item["Call"]["function_id"])) |
| 39 | + elif "Return" in item: |
| 40 | + returns.append(item["Return"]) # keep raw payload for value checks |
| 41 | + elif "Step" in item: |
| 42 | + s = item["Step"] |
| 43 | + steps.append((int(s["path_id"]), int(s["line"]))) |
| 44 | + |
| 45 | + return ParsedTrace(paths=paths, functions=functions, calls=calls, returns=returns, steps=steps) |
| 46 | + |
| 47 | + |
| 48 | +def _write_script(tmp: Path) -> Path: |
| 49 | + # Keep lines compact and predictable to assert step line numbers |
| 50 | + code = ( |
| 51 | + "# simple script\n\n" |
| 52 | + "def foo():\n" |
| 53 | + " x = 1\n" |
| 54 | + " y = 2\n" |
| 55 | + " return x + y\n\n" |
| 56 | + "if __name__ == '__main__':\n" |
| 57 | + " r = foo()\n" |
| 58 | + " print(r)\n" |
| 59 | + ) |
| 60 | + p = tmp / "script.py" |
| 61 | + p.write_text(code) |
| 62 | + return p |
| 63 | + |
| 64 | + |
| 65 | +def test_py_start_line_and_return_events_are_recorded(tmp_path: Path) -> None: |
| 66 | + # Arrange: create a script and start tracing with activation restricted to that file |
| 67 | + script = _write_script(tmp_path) |
| 68 | + out_dir = tmp_path / "trace_out" |
| 69 | + out_dir.mkdir() |
| 70 | + |
| 71 | + session = codetracer.start(out_dir, format=codetracer.TRACE_JSON, start_on_enter=script) |
| 72 | + |
| 73 | + try: |
| 74 | + # Act: execute the script as __main__ under tracing |
| 75 | + runpy.run_path(str(script), run_name="__main__") |
| 76 | + finally: |
| 77 | + # Ensure files are flushed and tracer is stopped even on error |
| 78 | + codetracer.flush() |
| 79 | + codetracer.stop() |
| 80 | + |
| 81 | + # Assert: expected files exist and contain valid JSON |
| 82 | + assert (out_dir / "trace.json").exists() |
| 83 | + assert (out_dir / "trace_metadata.json").exists() |
| 84 | + assert (out_dir / "trace_paths.json").exists() |
| 85 | + |
| 86 | + parsed = _parse_trace(out_dir) |
| 87 | + |
| 88 | + # The script path must be present (activation gating starts there, but |
| 89 | + # other helper modules like codecs may also appear during execution). |
| 90 | + assert str(script) in parsed.paths |
| 91 | + script_path_id = parsed.paths.index(str(script)) |
| 92 | + |
| 93 | + # One function named 'foo' should be registered for the script |
| 94 | + foo_fids = [i for i, f in enumerate(parsed.functions) if f["name"] == "foo" and f["path_id"] == script_path_id] |
| 95 | + assert foo_fids, "Expected function entry for foo()" |
| 96 | + foo_fid = foo_fids[0] |
| 97 | + |
| 98 | + # A call to foo() must be present (PY_START) and matched by a later return (PY_RETURN) |
| 99 | + assert foo_fid in parsed.calls, "Expected a call to foo() to be recorded" |
| 100 | + |
| 101 | + # Returns are emitted in order; the first Return in this script should be the result of foo() |
| 102 | + # and carry the concrete integer value 3 encoded by the writer |
| 103 | + first_return = parsed.returns[0] |
| 104 | + rv = first_return.get("return_value", {}) |
| 105 | + assert rv.get("kind") == "Int" and rv.get("i") == 3 |
| 106 | + |
| 107 | + # LINE events: confirm that the key lines within foo() were stepped |
| 108 | + # Compute concrete line numbers by scanning the file content |
| 109 | + lines = script.read_text().splitlines() |
| 110 | + want_lines = { |
| 111 | + next(i + 1 for i, t in enumerate(lines) if t.strip() == "x = 1"), |
| 112 | + next(i + 1 for i, t in enumerate(lines) if t.strip() == "y = 2"), |
| 113 | + next(i + 1 for i, t in enumerate(lines) if t.strip() == "return x + y"), |
| 114 | + } |
| 115 | + seen_lines = {ln for pid, ln in parsed.steps if pid == script_path_id} |
| 116 | + assert want_lines.issubset(seen_lines), f"Missing expected step lines: {want_lines - seen_lines}" |
| 117 | + |
| 118 | + |
| 119 | +def test_start_while_active_raises(tmp_path: Path) -> None: |
| 120 | + out_dir = tmp_path / "trace_out" |
| 121 | + out_dir.mkdir() |
| 122 | + session = codetracer.start(out_dir, format=codetracer.TRACE_JSON) |
| 123 | + try: |
| 124 | + with pytest.raises(RuntimeError): |
| 125 | + codetracer.start(out_dir, format=codetracer.TRACE_JSON) |
| 126 | + finally: |
| 127 | + codetracer.stop() |
0 commit comments