|
1 | 1 | import functools |
2 | 2 | import inspect |
3 | | -import re |
4 | 3 | from collections.abc import Callable |
5 | | -from pathlib import Path |
6 | | -from typing import Any |
7 | 4 |
|
8 | | -from tests.infra.trace.models import CLASS_NAME_MAP, NON_SSZ_FIXTURES |
9 | 5 | from tests.infra.trace.traced_spec import RecordingSpec |
10 | 6 |
|
11 | | -DEFAULT_TRACE_DIR = Path("traces").resolve() |
12 | 7 |
|
13 | | -TRACE_PATH_EXCLUDED_FIXTURES = { |
14 | | - "spec", |
15 | | - "state", |
16 | | - "phases", |
17 | | - "post_spec", |
18 | | - "pre_tag", |
19 | | - "post_tag", |
20 | | - "fork_epoch", |
21 | | -} |
22 | | - |
23 | | - |
24 | | -# these helpers are slightly verbose but it's just for filename generation |
25 | | -def _sanitize_value_for_path(value: Any) -> str: |
26 | | - """Converts a parameter value into a filesystem-friendly string.""" |
27 | | - if isinstance(value, (int, bool)): |
28 | | - s_val = str(value) |
29 | | - elif isinstance(value, str): |
30 | | - s_val = value |
31 | | - elif isinstance(value, bytes): |
32 | | - s_val = value.hex() |
33 | | - elif hasattr(value, "__name__"): |
34 | | - s_val = value.__name__ |
35 | | - else: |
36 | | - s_val = str(value) |
37 | | - |
38 | | - # Replace invalid chars |
39 | | - s_val = re.sub(r'[<>:"/\\|?*]', "_", s_val) |
40 | | - s_val = re.sub(r"[^a-zA-Z0-9_-]", "-", s_val) |
41 | | - return s_val[:50] |
42 | | - |
43 | | - |
44 | | -def _get_trace_output_dir( |
45 | | - base_output_dir: str | None, |
46 | | - fn: Callable, |
47 | | - bound_args: inspect.BoundArguments, |
48 | | - fork_name: str, |
49 | | - preset_name: str, |
50 | | -) -> str: |
51 | | - """Calculates the output directory path for the trace artifacts.""" |
52 | | - if base_output_dir: |
53 | | - return base_output_dir |
54 | | - |
55 | | - test_module = fn.__module__.split(".")[-1] |
56 | | - test_name = fn.__name__ |
57 | | - |
58 | | - # Generate a suffix based on test parameters (e.g., param_a=True -> param_a_True) |
59 | | - param_parts = [] |
60 | | - for name, value in bound_args.arguments.items(): |
61 | | - if name in TRACE_PATH_EXCLUDED_FIXTURES: |
62 | | - continue |
63 | | - sanitized_val = _sanitize_value_for_path(value) |
64 | | - param_parts.append(f"{name}_{sanitized_val}") |
65 | | - |
66 | | - path = DEFAULT_TRACE_DIR / fork_name / preset_name / test_module / test_name |
67 | | - |
68 | | - if param_parts: |
69 | | - path /= "__".join(param_parts) |
70 | | - |
71 | | - return path |
72 | | - |
73 | | - |
74 | | -def record_spec_trace(_fn: Callable | None = None, *, output_dir: str | None = None): |
| 8 | +def spec_trace(fn: Callable) -> Callable: |
75 | 9 | """ |
76 | 10 | Decorator to wrap a pyspec test and record execution traces. |
77 | | - Can be used with or without arguments: |
78 | | - @record_spec_trace |
79 | | - @record_spec_trace(output_dir="...") |
| 11 | + Usage: |
| 12 | + @with_all_phases # or other decorators |
| 13 | + @spec_state_test # still needed as before |
| 14 | + @spec_trace # new decorator to record trace |
| 15 | + def test_my_feature(spec, ...): |
| 16 | + ... |
80 | 17 | """ |
81 | 18 |
|
82 | | - def decorator(fn: Callable): |
83 | | - @functools.wraps(fn) |
84 | | - def wrapper(*args, **kwargs): |
85 | | - # 1. Bind arguments to find 'spec' and fixtures |
86 | | - try: |
87 | | - bound_args = inspect.signature(fn).bind(*args, **kwargs) |
88 | | - bound_args.apply_defaults() |
89 | | - except TypeError: |
90 | | - # Fallback for non-test invocations |
91 | | - return fn(*args, **kwargs) |
92 | | - |
93 | | - if "spec" not in bound_args.arguments: |
94 | | - return fn(*args, **kwargs) |
95 | | - |
96 | | - real_spec = bound_args.arguments["spec"] |
97 | | - |
98 | | - # 2. Prepare context for recording |
99 | | - initial_fixtures = { |
100 | | - k: v |
101 | | - for k, v in bound_args.arguments.items() |
102 | | - if k != "spec" and (k in NON_SSZ_FIXTURES or CLASS_NAME_MAP.get(type(v).__name__)) |
103 | | - } |
104 | | - |
105 | | - metadata = { |
106 | | - "fork": real_spec.fork, |
107 | | - "preset": real_spec.config.PRESET_BASE, |
108 | | - } |
109 | | - |
110 | | - parameters = { |
111 | | - k: v |
112 | | - for k, v in bound_args.arguments.items() |
113 | | - if isinstance(v, (int, str, bool, type(None))) |
114 | | - } |
115 | | - |
116 | | - # 3. Inject the recorder |
117 | | - recorder = RecordingSpec( |
118 | | - real_spec, initial_fixtures, metadata=metadata, parameters=parameters |
119 | | - ) |
120 | | - bound_args.arguments["spec"] = recorder |
121 | | - |
122 | | - # 4. Run test & Save trace |
123 | | - try: |
124 | | - return fn(*bound_args.args, **bound_args.kwargs) |
125 | | - finally: |
126 | | - try: |
127 | | - # Use the *original* spec's fork name for the path |
128 | | - artifact_dir = _get_trace_output_dir( |
129 | | - output_dir, fn, bound_args, real_spec.fork, real_spec.config.PRESET_BASE |
130 | | - ) |
131 | | - print(f"\n[Trace Recorder] Saving trace for {fn.__name__} to: {artifact_dir}") |
132 | | - recorder.save_trace(artifact_dir) |
133 | | - except Exception as e: |
134 | | - print(f"ERROR: [Trace Recorder] FAILED to save trace for {fn.__name__}: {e}") |
135 | | - |
136 | | - return wrapper |
137 | | - |
138 | | - if _fn is None: |
139 | | - return decorator |
140 | | - elif callable(_fn): |
141 | | - return decorator(_fn) |
142 | | - else: |
143 | | - raise TypeError("Invalid use of @record_spec_trace decorator.") |
| 19 | + @functools.wraps(fn) |
| 20 | + def wrapper(*args, **kwargs): |
| 21 | + # this might be somewhat overcomplicated just for figuring out if the first arg is a spec of not |
| 22 | + # 1. Bind arguments to find 'spec' and fixtures |
| 23 | + try: |
| 24 | + bound_args = inspect.signature(fn).bind(*args, **kwargs) |
| 25 | + bound_args.apply_defaults() |
| 26 | + except TypeError: |
| 27 | + # Fallback for non-test invocations |
| 28 | + fn(*args, **kwargs) |
| 29 | + |
| 30 | + if "spec" not in bound_args.arguments: |
| 31 | + fn(*args, **kwargs) |
| 32 | + |
| 33 | + # 2. Get the actual spec instance |
| 34 | + real_spec = bound_args.arguments["spec"] |
| 35 | + |
| 36 | + # 3. Inject the recorder |
| 37 | + recorder = RecordingSpec(real_spec) |
| 38 | + bound_args.arguments["spec"] = recorder |
| 39 | + |
| 40 | + # 4. Run test & Save trace |
| 41 | + try: |
| 42 | + fn(*bound_args.args, **bound_args.kwargs) |
| 43 | + finally: |
| 44 | + # we need to do this after execution is done before returning data |
| 45 | + recorder.finalize() |
| 46 | + |
| 47 | + # yield data so that runner can pick it up and dump |
| 48 | + yield from [ |
| 49 | + # trace to be dumped as yaml |
| 50 | + ("trace", "data", recorder._model.model_dump(mode="json", exclude_none=True)), |
| 51 | + ] + [ |
| 52 | + (name, "ssz", value) |
| 53 | + # ssz artifacts are already serialized and will be compressed by the dumper |
| 54 | + for name, value in recorder._model._artifacts.items() |
| 55 | + ] |
| 56 | + |
| 57 | + return wrapper |
0 commit comments