Skip to content

Commit 69fa701

Browse files
committed
mirror tests
1 parent 0f066c0 commit 69fa701

File tree

2 files changed

+386
-0
lines changed

2 files changed

+386
-0
lines changed
Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
import asyncio
2+
from typing import List, Union
3+
4+
5+
async def async_sorter(lst: List[Union[int, float]]) -> List[Union[int, float]]:
6+
"""
7+
Async bubble sort implementation for testing.
8+
"""
9+
print("codeflash stdout: Async sorting list")
10+
11+
await asyncio.sleep(0.01)
12+
13+
n = len(lst)
14+
for i in range(n):
15+
for j in range(0, n - i - 1):
16+
if lst[j] > lst[j + 1]:
17+
lst[j], lst[j + 1] = lst[j + 1], lst[j]
18+
19+
result = lst.copy()
20+
print(f"result: {result}")
21+
return result
22+
23+
24+
class AsyncBubbleSorter:
25+
"""Class with async sorting method for testing."""
26+
27+
async def sorter(self, lst: List[Union[int, float]]) -> List[Union[int, float]]:
28+
"""
29+
Async bubble sort implementation within a class.
30+
"""
31+
print("codeflash stdout: AsyncBubbleSorter.sorter() called")
32+
33+
# Add some async delay
34+
await asyncio.sleep(0.005)
35+
36+
n = len(lst)
37+
for i in range(n):
38+
for j in range(0, n - i - 1):
39+
if lst[j] > lst[j + 1]:
40+
lst[j], lst[j + 1] = lst[j + 1], lst[j]
41+
42+
result = lst.copy()
43+
return result
Lines changed: 343 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,343 @@
1+
from __future__ import annotations
2+
3+
import os
4+
from argparse import Namespace
5+
from pathlib import Path
6+
7+
from codeflash.code_utils.instrument_existing_tests import instrument_source_module_with_async_decorators
8+
from codeflash.discovery.functions_to_optimize import FunctionToOptimize
9+
from codeflash.models.models import FunctionParent, TestFile, TestFiles, TestingMode, TestType
10+
from codeflash.optimization.optimizer import Optimizer
11+
from codeflash.verification.instrument_codeflash_capture import instrument_codeflash_capture
12+
13+
14+
def test_async_bubble_sort_behavior_results() -> None:
15+
test_code = """import asyncio
16+
import pytest
17+
from code_to_optimize.async_bubble_sort import async_sorter
18+
19+
20+
@pytest.mark.asyncio
21+
async def test_async_sort():
22+
input = [5, 4, 3, 2, 1, 0]
23+
output = await async_sorter(input)
24+
assert output == [0, 1, 2, 3, 4, 5]
25+
26+
input = [5.0, 4.0, 3.0, 2.0, 1.0, 0.0]
27+
output = await async_sorter(input)
28+
assert output == [0.0, 1.0, 2.0, 3.0, 4.0, 5.0]"""
29+
30+
test_path = (
31+
Path(__file__).parent.resolve() / "../code_to_optimize/tests/pytest/test_async_bubble_sort_temp.py"
32+
).resolve()
33+
test_path_perf = (
34+
Path(__file__).parent.resolve() / "../code_to_optimize/tests/pytest/test_async_bubble_sort_perf_temp.py"
35+
).resolve()
36+
fto_path = (Path(__file__).parent.resolve() / "../code_to_optimize/async_bubble_sort.py").resolve()
37+
original_code = fto_path.read_text("utf-8")
38+
39+
try:
40+
# Write test file
41+
with test_path.open("w") as f:
42+
f.write(test_code)
43+
44+
tests_root = (Path(__file__).parent.resolve() / "../code_to_optimize/tests/pytest/").resolve()
45+
project_root_path = (Path(__file__).parent / "..").resolve()
46+
47+
# Create async function to optimize
48+
func = FunctionToOptimize(function_name="async_sorter", parents=[], file_path=Path(fto_path), is_async=True)
49+
50+
# For async functions, instrument the source module directly with decorators
51+
source_success, instrumented_source = instrument_source_module_with_async_decorators(
52+
fto_path, func, TestingMode.BEHAVIOR
53+
)
54+
55+
assert source_success
56+
assert instrumented_source is not None
57+
assert '''import asyncio\nfrom typing import List, Union\n\nfrom codeflash.code_utils.codeflash_wrap_decorator import \\\n codeflash_behavior_async\n\n\n@codeflash_behavior_async\nasync def async_sorter(lst: List[Union[int, float]]) -> List[Union[int, float]]:\n """\n Async bubble sort implementation for testing.\n """\n print("codeflash stdout: Async sorting list")\n \n # Add some async delay to simulate async work\n await asyncio.sleep(0.01)\n \n n = len(lst)\n for i in range(n):\n for j in range(0, n - i - 1):\n if lst[j] > lst[j + 1]:\n lst[j], lst[j + 1] = lst[j + 1], lst[j]\n \n result = lst.copy()\n print(f"result: {result}")\n return result\n\n\nclass AsyncBubbleSorter:\n """Class with async sorting method for testing."""\n \n async def sorter(self, lst: List[Union[int, float]]) -> List[Union[int, float]]:\n """\n Async bubble sort implementation within a class.\n """\n print("codeflash stdout: AsyncBubbleSorter.sorter() called")\n \n # Add some async delay\n await asyncio.sleep(0.005)\n \n n = len(lst)\n for i in range(n):\n for j in range(0, n - i - 1):\n if lst[j] > lst[j + 1]:\n lst[j], lst[j + 1] = lst[j + 1], lst[j]\n \n result = lst.copy()\n return result\n''' in instrumented_source
58+
59+
# Write the instrumented source back
60+
fto_path.write_text(instrumented_source, "utf-8")
61+
62+
# Add codeflash capture
63+
instrument_codeflash_capture(func, {}, tests_root)
64+
65+
# Create optimizer
66+
opt = Optimizer(
67+
Namespace(
68+
project_root=project_root_path,
69+
disable_telemetry=True,
70+
tests_root=tests_root,
71+
test_framework="pytest",
72+
pytest_cmd="pytest",
73+
experiment_id=None,
74+
test_project_root=project_root_path,
75+
)
76+
)
77+
78+
test_env = os.environ.copy()
79+
test_env["CODEFLASH_TEST_ITERATION"] = "0"
80+
test_env["CODEFLASH_LOOP_INDEX"] = "1"
81+
test_type = TestType.EXISTING_UNIT_TEST
82+
83+
# Create function optimizer and set up test files
84+
func_optimizer = opt.create_function_optimizer(func)
85+
func_optimizer.test_files = TestFiles(
86+
test_files=[
87+
TestFile(
88+
instrumented_behavior_file_path=test_path,
89+
test_type=test_type,
90+
original_file_path=test_path,
91+
benchmarking_file_path=test_path_perf,
92+
)
93+
]
94+
)
95+
96+
test_results, coverage_data = func_optimizer.run_and_parse_tests(
97+
testing_type=TestingMode.BEHAVIOR,
98+
test_env=test_env,
99+
test_files=func_optimizer.test_files,
100+
optimization_iteration=0,
101+
pytest_min_loops=1,
102+
pytest_max_loops=1,
103+
testing_time=0.1,
104+
)
105+
106+
assert test_results is not None
107+
assert test_results.test_results is not None
108+
109+
results_list = test_results.test_results
110+
assert results_list[0].id.function_getting_tested == "async_sorter"
111+
assert results_list[0].id.test_class_name == "PytestPluginManager"
112+
assert results_list[0].id.test_function_name == "test_async_sort"
113+
assert results_list[0].did_pass
114+
assert results_list[0].runtime is None or results_list[0].runtime >= 0
115+
116+
expected_stdout = "codeflash stdout: Async sorting list\nresult: [0, 1, 2, 3, 4, 5]\n"
117+
assert expected_stdout == results_list[0].stdout
118+
119+
120+
if len(results_list) > 1:
121+
assert results_list[1].id.function_getting_tested == "async_sorter"
122+
assert results_list[1].id.test_function_name == "test_async_sort"
123+
assert results_list[1].did_pass
124+
125+
expected_stdout2 = "codeflash stdout: Async sorting list\nresult: [0.0, 1.0, 2.0, 3.0, 4.0, 5.0]\n"
126+
assert expected_stdout2 == results_list[1].stdout
127+
128+
finally:
129+
# Restore original code
130+
fto_path.write_text(original_code, "utf-8")
131+
# Clean up test files
132+
if test_path.exists():
133+
test_path.unlink()
134+
if test_path_perf.exists():
135+
test_path_perf.unlink()
136+
137+
138+
def test_async_class_method_behavior_results() -> None:
139+
"""Test async class method behavior with run_and_parse_tests."""
140+
test_code = """import asyncio
141+
import pytest
142+
from code_to_optimize.async_bubble_sort import AsyncBubbleSorter
143+
144+
145+
@pytest.mark.asyncio
146+
async def test_async_class_sort():
147+
sorter = AsyncBubbleSorter()
148+
input = [3, 1, 4, 1, 5]
149+
output = await sorter.sorter(input)
150+
assert output == [1, 1, 3, 4, 5]"""
151+
152+
test_path = (
153+
Path(__file__).parent.resolve() / "../code_to_optimize/tests/pytest/test_async_class_bubble_sort_temp.py"
154+
).resolve()
155+
test_path_perf = (
156+
Path(__file__).parent.resolve() / "../code_to_optimize/tests/pytest/test_async_class_bubble_sort_perf_temp.py"
157+
).resolve()
158+
fto_path = (Path(__file__).parent.resolve() / "../code_to_optimize/async_bubble_sort.py").resolve()
159+
original_code = fto_path.read_text("utf-8")
160+
161+
try:
162+
with test_path.open("w") as f:
163+
f.write(test_code)
164+
165+
tests_root = (Path(__file__).parent.resolve() / "../code_to_optimize/tests/pytest/").resolve()
166+
project_root_path = (Path(__file__).parent / "..").resolve()
167+
168+
func = FunctionToOptimize(
169+
function_name="sorter",
170+
parents=[FunctionParent("AsyncBubbleSorter", "ClassDef")],
171+
file_path=Path(fto_path),
172+
is_async=True,
173+
)
174+
175+
source_success, instrumented_source = instrument_source_module_with_async_decorators(
176+
fto_path, func, TestingMode.BEHAVIOR
177+
)
178+
179+
assert source_success
180+
assert instrumented_source is not None
181+
assert "@codeflash_behavior_async" in instrumented_source
182+
183+
fto_path.write_text(instrumented_source, "utf-8")
184+
185+
instrument_codeflash_capture(func, {}, tests_root)
186+
187+
opt = Optimizer(
188+
Namespace(
189+
project_root=project_root_path,
190+
disable_telemetry=True,
191+
tests_root=tests_root,
192+
test_framework="pytest",
193+
pytest_cmd="pytest",
194+
experiment_id=None,
195+
test_project_root=project_root_path,
196+
)
197+
)
198+
199+
test_env = os.environ.copy()
200+
test_env["CODEFLASH_TEST_ITERATION"] = "0"
201+
test_env["CODEFLASH_LOOP_INDEX"] = "1"
202+
test_type = TestType.EXISTING_UNIT_TEST
203+
204+
func_optimizer = opt.create_function_optimizer(func)
205+
func_optimizer.test_files = TestFiles(
206+
test_files=[
207+
TestFile(
208+
instrumented_behavior_file_path=test_path,
209+
test_type=test_type,
210+
original_file_path=test_path,
211+
benchmarking_file_path=test_path_perf,
212+
)
213+
]
214+
)
215+
216+
test_results, coverage_data = func_optimizer.run_and_parse_tests(
217+
testing_type=TestingMode.BEHAVIOR,
218+
test_env=test_env,
219+
test_files=func_optimizer.test_files,
220+
optimization_iteration=0,
221+
pytest_min_loops=1,
222+
pytest_max_loops=1,
223+
testing_time=0.1,
224+
)
225+
226+
227+
assert test_results is not None
228+
assert test_results.test_results is not None
229+
230+
results_list = test_results.test_results
231+
assert len(results_list) == 2, f"Expected 2 results but got {len(results_list)}: {[r.id.function_getting_tested for r in results_list]}"
232+
233+
init_result = results_list[0]
234+
sorter_result = results_list[1]
235+
236+
237+
assert sorter_result.id.function_getting_tested == "sorter"
238+
assert sorter_result.id.test_class_name == "PytestPluginManager"
239+
assert sorter_result.id.test_function_name == "test_async_class_sort"
240+
assert sorter_result.did_pass
241+
assert sorter_result.runtime is None or sorter_result.runtime >= 0
242+
243+
expected_stdout = "codeflash stdout: AsyncBubbleSorter.sorter() called\n"
244+
assert expected_stdout == sorter_result.stdout
245+
246+
assert ".__init__" in init_result.id.function_getting_tested
247+
assert init_result.did_pass
248+
249+
finally:
250+
fto_path.write_text(original_code, "utf-8")
251+
if test_path.exists():
252+
test_path.unlink()
253+
if test_path_perf.exists():
254+
test_path_perf.unlink()
255+
256+
257+
def test_async_function_performance_mode() -> None:
258+
test_code = """import asyncio
259+
import pytest
260+
from code_to_optimize.async_bubble_sort import async_sorter
261+
262+
263+
@pytest.mark.asyncio
264+
async def test_async_perf():
265+
input = [8, 7, 6, 5, 4, 3, 2, 1]
266+
output = await async_sorter(input)
267+
assert output == [1, 2, 3, 4, 5, 6, 7, 8]"""
268+
269+
test_path = (Path(__file__).parent.resolve() / "../code_to_optimize/tests/pytest/test_async_perf_temp.py").resolve()
270+
fto_path = (Path(__file__).parent.resolve() / "../code_to_optimize/async_bubble_sort.py").resolve()
271+
original_code = fto_path.read_text("utf-8")
272+
273+
try:
274+
with test_path.open("w") as f:
275+
f.write(test_code)
276+
277+
tests_root = (Path(__file__).parent.resolve() / "../code_to_optimize/tests/pytest/").resolve()
278+
project_root_path = (Path(__file__).parent / "..").resolve()
279+
280+
# Create async function to optimize
281+
func = FunctionToOptimize(function_name="async_sorter", parents=[], file_path=Path(fto_path), is_async=True)
282+
283+
# Instrument the source module with async performance decorators
284+
source_success, instrumented_source = instrument_source_module_with_async_decorators(
285+
fto_path, func, TestingMode.PERFORMANCE
286+
)
287+
288+
assert source_success
289+
assert instrumented_source is not None
290+
assert '''import asyncio\nfrom typing import List, Union\n\nfrom codeflash.code_utils.codeflash_wrap_decorator import \\\n codeflash_performance_async\n\n\n@codeflash_performance_async\nasync def async_sorter(lst: List[Union[int, float]]) -> List[Union[int, float]]:\n """\n Async bubble sort implementation for testing.\n """\n print("codeflash stdout: Async sorting list")\n \n # Add some async delay to simulate async work\n await asyncio.sleep(0.01)\n \n n = len(lst)\n for i in range(n):\n for j in range(0, n - i - 1):\n if lst[j] > lst[j + 1]:\n lst[j], lst[j + 1] = lst[j + 1], lst[j]\n \n result = lst.copy()\n print(f"result: {result}")\n return result\n\n\nclass AsyncBubbleSorter:\n """Class with async sorting method for testing."""\n \n async def sorter(self, lst: List[Union[int, float]]) -> List[Union[int, float]]:\n """\n Async bubble sort implementation within a class.\n """\n print("codeflash stdout: AsyncBubbleSorter.sorter() called")\n \n # Add some async delay\n await asyncio.sleep(0.005)\n \n n = len(lst)\n for i in range(n):\n for j in range(0, n - i - 1):\n if lst[j] > lst[j + 1]:\n lst[j], lst[j + 1] = lst[j + 1], lst[j]\n \n result = lst.copy()\n return result\n''' == instrumented_source
291+
292+
fto_path.write_text(instrumented_source, "utf-8")
293+
294+
instrument_codeflash_capture(func, {}, tests_root)
295+
296+
opt = Optimizer(
297+
Namespace(
298+
project_root=project_root_path,
299+
disable_telemetry=True,
300+
tests_root=tests_root,
301+
test_framework="pytest",
302+
pytest_cmd="pytest",
303+
experiment_id=None,
304+
test_project_root=project_root_path,
305+
)
306+
)
307+
308+
test_env = os.environ.copy()
309+
test_env["CODEFLASH_TEST_ITERATION"] = "0"
310+
test_env["CODEFLASH_LOOP_INDEX"] = "1"
311+
test_type = TestType.EXISTING_UNIT_TEST
312+
313+
func_optimizer = opt.create_function_optimizer(func)
314+
func_optimizer.test_files = TestFiles(
315+
test_files=[
316+
TestFile(
317+
instrumented_behavior_file_path=test_path,
318+
test_type=test_type,
319+
original_file_path=test_path,
320+
benchmarking_file_path=test_path, # Same file for perf
321+
)
322+
]
323+
)
324+
325+
test_results, coverage_data = func_optimizer.run_and_parse_tests(
326+
testing_type=TestingMode.PERFORMANCE,
327+
test_env=test_env,
328+
test_files=func_optimizer.test_files,
329+
optimization_iteration=0,
330+
pytest_min_loops=1,
331+
pytest_max_loops=1,
332+
testing_time=0.1,
333+
)
334+
335+
assert test_results is not None
336+
assert test_results.test_results is not None
337+
338+
finally:
339+
# Restore original code
340+
fto_path.write_text(original_code, "utf-8")
341+
# Clean up test files
342+
if test_path.exists():
343+
test_path.unlink()

0 commit comments

Comments
 (0)