|
7 | 7 | import json |
8 | 8 | import os |
9 | 9 | import pickle |
| 10 | +import re |
10 | 11 | import sqlite3 |
11 | 12 | import sys |
12 | 13 | import threading |
@@ -112,7 +113,7 @@ def __init__( |
112 | 113 | console.rule(f"Project Root: {self.project_root}", style="bold blue") |
113 | 114 | self.ignored_functions = {"<listcomp>", "<genexpr>", "<dictcomp>", "<setcomp>", "<lambda>", "<module>"} |
114 | 115 |
|
115 | | - self.file_being_called_from: str = str(Path(sys._getframe().f_back.f_code.co_filename).name).replace(".", "_") # noqa: SLF001 |
| 116 | + self.sanitized_filename = self.sanitize_to_filename(command) |
116 | 117 | self.result_pickle_file_path = result_pickle_file_path |
117 | 118 |
|
118 | 119 | assert timeout is None or timeout > 0, "Timeout should be greater than 0" |
@@ -167,7 +168,7 @@ def __enter__(self) -> None: |
167 | 168 |
|
168 | 169 | # Store command metadata |
169 | 170 | cur.execute("INSERT INTO metadata VALUES (?, ?)", ("command", self.command)) |
170 | | - cur.execute("INSERT INTO metadata VALUES (?, ?)", ("program_name", self.file_being_called_from)) |
| 171 | + cur.execute("INSERT INTO metadata VALUES (?, ?)", ("program_name", self.sanitized_filename)) |
171 | 172 | cur.execute( |
172 | 173 | "INSERT INTO metadata VALUES (?, ?)", |
173 | 174 | ("functions_filter", json.dumps(self.functions) if self.functions else None), |
@@ -263,7 +264,7 @@ def __exit__( |
263 | 264 | test_framework=self.config["test_framework"], |
264 | 265 | max_run_count=self.max_function_count, |
265 | 266 | ) |
266 | | - function_path = "_".join(self.functions) if self.functions else self.file_being_called_from |
| 267 | + function_path = "_".join(self.functions) if self.functions else self.sanitized_filename |
267 | 268 | test_file_path = get_test_file_path( |
268 | 269 | test_dir=Path(self.config["tests_root"]), function_name=function_path, test_type="replay" |
269 | 270 | ) |
@@ -783,6 +784,22 @@ def snapshot_stats(self) -> None: |
783 | 784 | nc += callcnt |
784 | 785 | self.stats[func] = cc, nc, tt, ct, callers |
785 | 786 |
|
| 787 | + def sanitize_to_filename(self, arg: str) -> str: |
| 788 | + # Replace newlines with underscores |
| 789 | + arg = arg.replace("\n", "_").replace("\r", "_") |
| 790 | + |
| 791 | + # Replace contiguous whitespace (including tabs and multiple spaces) with a single underscore |
| 792 | + arg = re.sub(r"\s+", "_", arg) |
| 793 | + |
| 794 | + # Remove all characters that are not alphanumeric, underscore, or dot |
| 795 | + arg = re.sub(r"[^\w._]", "", arg) |
| 796 | + |
| 797 | + # Avoid filenames starting or ending with a dot or underscore |
| 798 | + arg = arg.strip("._") |
| 799 | + |
| 800 | + # Fallback if resulting name is empty |
| 801 | + return arg or "untitled" |
| 802 | + |
786 | 803 | def runctx(self, cmd: str, global_vars: dict[str, Any], local_vars: dict[str, Any]) -> Tracer | None: |
787 | 804 | self.__enter__() |
788 | 805 | try: |
|
0 commit comments