1
1
from __future__ import annotations
2
2
3
+ import re
3
4
import shlex
4
5
import subprocess
6
+ import tempfile
5
7
from pathlib import Path
6
8
from typing import TYPE_CHECKING
7
9
10
+ from crosshair .auditwall import SideEffectDetected
11
+
8
12
from codeflash .cli_cmds .console import logger
9
13
from codeflash .code_utils .code_utils import get_run_tmp_file
10
14
from codeflash .code_utils .compat import IS_POSIX , SAFE_SYS_EXECUTABLE
11
15
from codeflash .code_utils .config_consts import TOTAL_LOOPING_TIME
12
16
from codeflash .code_utils .coverage_utils import prepare_coverage_files
13
17
from codeflash .models .models import TestFiles
18
+ from codeflash .verification .codeflash_auditwall import transform_code
14
19
from codeflash .verification .test_results import TestType
15
20
16
21
if TYPE_CHECKING :
@@ -36,78 +41,97 @@ def run_behavioral_tests(
36
41
pytest_target_runtime_seconds : int = TOTAL_LOOPING_TIME ,
37
42
enable_coverage : bool = False ,
38
43
) -> tuple [Path , subprocess .CompletedProcess , Path | None ]:
39
- if test_framework == "pytest" :
40
- test_files : list [str ] = []
41
- for file in test_paths .test_files :
42
- if file .test_type == TestType .REPLAY_TEST :
43
- # TODO: Does this work for unittest framework?
44
- test_files .extend (
45
- [
46
- str (file .instrumented_behavior_file_path ) + "::" + test .test_function
47
- for test in file .tests_in_file
48
- ]
49
- )
50
- else :
51
- test_files .append (str (file .instrumented_behavior_file_path ))
52
- test_files = list (set (test_files )) # remove multiple calls in the same test function
53
- pytest_cmd_list = shlex .split (pytest_cmd , posix = IS_POSIX )
54
-
55
- common_pytest_args = [
56
- "--capture=tee-sys" ,
57
- f"--timeout={ pytest_timeout } " ,
58
- "-q" ,
59
- "--codeflash_loops_scope=session" ,
60
- "--codeflash_min_loops=1" ,
61
- "--codeflash_max_loops=1" ,
62
- f"--codeflash_seconds={ pytest_target_runtime_seconds } " , # TODO :This is unnecessary, update the plugin to not ask for this
63
- ]
64
-
65
- result_file_path = get_run_tmp_file (Path ("pytest_results.xml" ))
66
- result_args = [f"--junitxml={ result_file_path .as_posix ()} " , "-o" , "junit_logging=all" ]
67
-
68
- pytest_test_env = test_env .copy ()
69
- pytest_test_env ["PYTEST_PLUGINS" ] = "codeflash.verification.pytest_plugin"
44
+ if test_framework not in ["pytest" , "unittest" ]:
45
+ raise ValueError (f"Unsupported test framework: { test_framework } " )
70
46
71
- if enable_coverage :
72
- coverage_database_file , coveragercfile = prepare_coverage_files ()
73
-
74
- cov_erase = execute_test_subprocess (
75
- shlex .split (f"{ SAFE_SYS_EXECUTABLE } -m coverage erase" ), cwd = cwd , env = pytest_test_env
76
- ) # this cleanup is necessary to avoid coverage data from previous runs, if there are any,
77
- # then the current run will be appended to the previous data, which skews the results
78
- logger .debug (cov_erase )
79
-
80
- results = execute_test_subprocess (
81
- shlex .split (f"{ SAFE_SYS_EXECUTABLE } -m coverage run --rcfile={ coveragercfile .as_posix ()} -m" )
82
- + pytest_cmd_list
83
- + common_pytest_args
84
- + result_args
85
- + test_files ,
86
- cwd = cwd ,
87
- env = pytest_test_env ,
88
- timeout = 600 ,
47
+ test_files : list [str ] = []
48
+ for file in test_paths .test_files :
49
+ if file .test_type == TestType .REPLAY_TEST :
50
+ # TODO: Does this work for unittest framework?
51
+ test_files .extend (
52
+ [str (file .instrumented_behavior_file_path ) + "::" + test .test_function for test in file .tests_in_file ]
89
53
)
90
- logger .debug (
91
- f"""Result return code: { results .returncode } , { "Result stderr:" + str (results .stderr ) if results .stderr else '' } """ )
92
54
else :
93
- results = execute_test_subprocess (
94
- pytest_cmd_list + common_pytest_args + result_args + test_files ,
95
- cwd = cwd ,
96
- env = pytest_test_env ,
97
- timeout = 600 , # TODO: Make this dynamic
55
+ test_files .append (str (file .instrumented_behavior_file_path ))
56
+
57
+ source_code = next ((file .original_source for file in test_paths .test_files if file .original_source ), None )
58
+ if not source_code :
59
+ raise ValueError ("No source code found for auditing" )
60
+
61
+ audit_code = transform_code (source_code )
62
+ pytest_cmd_list = shlex .split (pytest_cmd , posix = IS_POSIX )
63
+ common_pytest_args = [
64
+ "--capture=tee-sys" ,
65
+ f"--timeout={ pytest_timeout } " ,
66
+ "-q" ,
67
+ "--codeflash_loops_scope=session" ,
68
+ "--codeflash_min_loops=1" ,
69
+ "--codeflash_max_loops=1" ,
70
+ f"--codeflash_seconds={ pytest_target_runtime_seconds } " ,
71
+ "-p" ,
72
+ "no:cacheprovider" ,
73
+ ]
74
+
75
+ result_file_path = get_run_tmp_file (Path ("pytest_results.xml" ))
76
+ result_args = [f"--junitxml={ result_file_path .as_posix ()} " , "-o" , "junit_logging=all" ]
77
+
78
+ pytest_test_env = test_env .copy ()
79
+ pytest_test_env ["PYTEST_PLUGINS" ] = "codeflash.verification.pytest_plugin"
80
+
81
+ with tempfile .TemporaryDirectory (
82
+ dir = Path (test_paths .test_files [0 ].instrumented_behavior_file_path ).parent
83
+ ) as temp_dir :
84
+ audited_file_path = Path (temp_dir ) / "audited_code.py"
85
+ audited_file_path .write_text (audit_code , encoding = "utf8" )
86
+
87
+ auditing_res = execute_test_subprocess (
88
+ pytest_cmd_list + common_pytest_args + [audited_file_path .as_posix ()],
89
+ cwd = cwd ,
90
+ env = pytest_test_env ,
91
+ timeout = 600 ,
92
+ )
93
+
94
+ if auditing_res .returncode != 0 :
95
+ line_co = next (
96
+ (
97
+ line
98
+ for line in auditing_res .stderr .splitlines () + auditing_res .stdout .splitlines ()
99
+ if "crosshair.auditwall.SideEffectDetected" in line
100
+ ),
101
+ None ,
98
102
)
99
- logger .debug (
100
- f"""Result return code: { results .returncode } , { "Result stderr:" + str (results .stderr ) if results .stderr else '' } """ )
101
- elif test_framework == "unittest" :
103
+
104
+ if line_co :
105
+ match = re .search (r"crosshair\.auditwall\.SideEffectDetected: A(.*) operation was detected\." , line_co )
106
+ if match :
107
+ msg = match .group (1 )
108
+ raise SideEffectDetected (msg )
109
+ logger .debug (auditing_res .stderr )
110
+ logger .debug (auditing_res .stdout )
111
+
112
+ if test_framework == "pytest" :
113
+ coverage_database_file , coveragercfile = prepare_coverage_files ()
114
+ cov_erase = execute_test_subprocess (
115
+ shlex .split (f"{ SAFE_SYS_EXECUTABLE } -m coverage erase" ), cwd = cwd , env = pytest_test_env
116
+ )
117
+ logger .debug (cov_erase )
118
+
119
+ results = execute_test_subprocess (
120
+ shlex .split (f"{ SAFE_SYS_EXECUTABLE } -m coverage run --rcfile={ coveragercfile .as_posix ()} -m" )
121
+ + pytest_cmd_list
122
+ + common_pytest_args
123
+ + result_args
124
+ + list (set (test_files )), # remove duplicates
125
+ cwd = cwd ,
126
+ env = pytest_test_env ,
127
+ timeout = 600 ,
128
+ )
129
+ else : # unittest
102
130
if enable_coverage :
103
131
raise ValueError ("Coverage is not supported yet for unittest framework" )
104
132
test_env ["CODEFLASH_LOOP_INDEX" ] = "1"
105
133
test_files = [file .instrumented_behavior_file_path for file in test_paths .test_files ]
106
134
result_file_path , results = run_unittest_tests (verbose , test_files , test_env , cwd )
107
- logger .debug (
108
- f"""Result return code: { results .returncode } , { "Result stderr:" + str (results .stderr ) if results .stderr else '' } """ )
109
- else :
110
- raise ValueError (f"Unsupported test framework: { test_framework } " )
111
135
112
136
return result_file_path , results , coverage_database_file if enable_coverage else None
113
137
0 commit comments