
Conversation

@KRRT7 (Contributor) commented Oct 26, 2025

PR Type

Enhancement, Tests


Description

  • Add Hypothesis test generation pipeline

  • Discover and track Hypothesis tests

  • Compare Hypothesis results semantically

  • Cleanup Hypothesis temp directories


Diagram Walkthrough

flowchart LR
  gen["generate_hypothesis_tests()"] -- "creates temp suite" --> hypoDir["hypothesis_tests_dir"]
  gen -- "returns map + code" --> f2t["function_to_hypothesis_tests"]
  fo["FunctionOptimizer"] -- "merge tests" --> f2tAll["function_to_all_tests"]
  discover["discover_unit_tests()"] -- "HYPOTHESIS_TEST type" --> mapping["function_to_tests map"]
  equiv["compare_test_results()"] -- "semantic compare" --> hypo["Hypothesis results"]
  opt["Optimizer"] -- "track dirs" --> cleanup["cleanup_temporary_paths()"]

File Walkthrough

Relevant files

Enhancement (8 files)

  code_utils.py: Add helper to build qualified function path (+5/-0)
  discover_unit_tests.py: AST-based discovery and Hypothesis test support (+153/-1)
  test_type.py: Introduce Hypothesis test type and label (+2/-0)
  function_optimizer.py: Integrate Hypothesis generation, merge, and cleanup (+72/-15)
  optimizer.py: Track and clean up Hypothesis test directories (+8/-1)
  concolic_testing.py: Use helper for qualified function path; filter discovery (+9/-10)
  equivalence.py: Semantic comparison for Hypothesis test results (+106/-1)
  hypothesis_testing.py: Implement Hypothesis ghostwriter generation and filtering (+280/-0)

Formatting (1 file)

  env_utils.py: No-op whitespace change in is_pr_draft (+1/-1)

Tests (1 file)

  test_hypothesis_testing.py: Unit tests for deterministic Hypothesis adjustments (+158/-0)

Dependencies (1 file)

  pyproject.toml: Add Hypothesis dependency (+1/-0)

KRRT7 and others added 13 commits October 22, 2025 03:54
- Modified generate_hypothesis_tests() to return the temp directory Path
- Added hypothesis_tests_dir tracking in FunctionOptimizer
- Extended cleanup_generated_files() to remove hypothesis test directories
- Added hypothesis_tests_dirs list in Optimizer to track all directories
- Updated cleanup_temporary_paths() to cleanup hypothesis test directories
- Ensures cleanup on success, errors, and KeyboardInterrupt
- Changed temp dir prefix to 'codeflash_hypothesis_' for clarity
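A minimal sketch of the cleanup pattern these commits describe (the attribute and method names come from the commit messages; the bodies are assumptions, not the PR's code):

import shutil
from pathlib import Path

class Optimizer:
    def __init__(self) -> None:
        # Every Hypothesis temp suite created during a run is tracked here.
        self.hypothesis_tests_dirs: list[Path] = []

    def cleanup_temporary_paths(self) -> None:
        # ignore_errors=True so cleanup also succeeds on errors or KeyboardInterrupt.
        for d in self.hypothesis_tests_dirs:
            shutil.rmtree(d, ignore_errors=True)
        self.hypothesis_tests_dirs.clear()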
@github-actions

PR Reviewer Guide 🔍

Here are some key observations to aid the review process:

⏱️ Estimated effort to review: 4 🔵🔵🔵🔵⚪
🧪 PR contains tests
🔒 No security concerns identified
⚡ Recommended focus areas for review

AST Resolution Robustness

The AST-based call discovery resolves imports via a simple alias map and a split on the first dot; this may miss relative imports, star imports, or nested aliasing, and it can fail on calls made through factory wrappers. Validate coverage of common import patterns in generated tests and ensure there are no false negatives.
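For instance, a hypothetical generated test like the following (module names invented for illustration) would defeat a first-dot alias split; the flagged discovery code appears after this example:

# tests/unit/test_sample.py (hypothetical)
from ..helpers import target as t   # level-2 relative import: a naive alias map
                                    # records "helpers.target", missing the package prefix
from pkg.api import *               # star import: names never enter the alias map

def test_target():
    t(1)                  # resolves to "helpers.target", not "tests.helpers.target"
    api_function(2)       # came from the star import; no alias entry exists at all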

is_generated_test_file = (
    any(
        tf.test_type in (TestType.HYPOTHESIS_TEST, TestType.CONCOLIC_COVERAGE_TEST) for tf in test_functions
    )
    if test_functions
    else any(
        func.test_type in (TestType.HYPOTHESIS_TEST, TestType.CONCOLIC_COVERAGE_TEST) for func in functions
    )
)

# For generated tests, use AST-based discovery since Jedi often fails
if is_generated_test_file and functions_to_optimize:
    logger.debug(f"Using AST-based discovery for generated test file: {test_file.name}")
    target_qualified_names = {
        func.qualified_name_with_modules_from_root(project_root_path) for func in functions_to_optimize
    }

    if not test_functions:
        logger.debug("Jedi found no functions, building test_functions from collected functions")
        test_functions = {
            TestFunction(
                function_name=func.test_function,
                test_class=func.test_class,
                parameters=None,
                test_type=func.test_type,
            )
            for func in functions
        }

    ast_results = _discover_calls_via_ast(test_file, test_functions, target_qualified_names)

    for qualified_name, matches in ast_results.items():
        for test_func, position in matches:
            if test_func.parameters is not None:
                if test_framework == "pytest":
                    scope_test_function = f"{test_func.function_name}[{test_func.parameters}]"
                else:  # unittest
                    scope_test_function = f"{test_func.function_name}_{test_func.parameters}"
            else:
                scope_test_function = test_func.function_name

            function_to_test_map[qualified_name].add(
                FunctionCalledInTest(
                    tests_in_file=TestsInFile(
                        test_file=test_file,
                        test_class=test_func.test_class,
                        test_function=scope_test_function,
                        test_type=test_func.test_type,
                    ),
                    position=position,
                )
            )
            tests_cache.insert_test(
                file_path=str(test_file),
                file_hash=file_hash,
                qualified_name_with_modules_from_root=qualified_name,
AST Unparse Compatibility

Functions rely on ast.unparse, which requires Python 3.9+; ensure the runtime environment matches and that formatting round-trips do not change semantics. Consider a fallback for older Python versions or for cases where unparse fails; a sketch of one appears after the snippet below.

            new_body.append(node)

    new_tree = ast.Module(body=new_body, type_ignores=[])
    return ast.unparse(new_tree)


def filter_hypothesis_tests_by_function_name(code: str, function_name: str) -> str:
    """Filter hypothesis tests to only include tests matching the function name.

    Preserves all imports, module-level assignments, and only test functions
    that contain the target function name.

    Args:
        code: The hypothesis test code to filter
        function_name: The name of the function being tested

    Returns:
        Filtered code with only matching tests

    """
    tree = ast.parse(code)

    class TestFunctionRemover(ast.NodeTransformer):
        def visit_Module(self, node):  # noqa: ANN001, ANN202
            # Filter body to keep imports, module-level assignments, and matching test functions
            new_body = []
            for item in node.body:
                if isinstance(item, (ast.Import, ast.ImportFrom, ast.Assign)):
                    # Keep all imports and module-level assignments
                    new_body.append(item)
                elif isinstance(item, ast.FunctionDef) and item.name.startswith("test_") and function_name in item.name:
                    # Only keep test functions that match the function name
                    new_body.append(item)
            node.body = new_body
            return node

    modified_tree = TestFunctionRemover().visit(tree)
    ast.fix_missing_locations(modified_tree)
    return ast.unparse(modified_tree)
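A minimal sketch of the version-guarded fallback suggested here (returning the untouched source on failure is an assumption about acceptable behavior; astor is an optional third-party backport, not a dependency of this PR):

import ast
import sys

def safe_unparse(tree: ast.AST, original_source: str) -> str:
    """Unparse tree, falling back to the original source when unparse is unavailable or fails."""
    if sys.version_info >= (3, 9):
        try:
            return ast.unparse(tree)
        except Exception:
            return original_source
    try:
        import astor  # third-party unparser for Python < 3.9
        return astor.to_source(tree)
    except ImportError:
        return original_source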
Hypothesis Compare Semantics

Semantic comparison checks pass/fail per test function but ignores mismatches where only one side produced a given test function; the current flow skips keys missing from the candidate (the `continue  # Already handled above` below, with no earlier handling visible in this snippet). Verify the behavior when Hypothesis generates a test function that appears on only one side; a sketch of an explicit check follows the snippet.

def _compare_hypothesis_tests_semantic(original_hypothesis: list, candidate_hypothesis: list) -> bool:
    """Compare Hypothesis tests by test function, not by example count.

    Hypothesis can generate different numbers of examples between runs due to:
    - Timing differences
    - Early stopping
    - Shrinking behavior
    - Performance differences

    What matters is whether the test functions themselves pass or fail,
    not how many examples Hypothesis generated.
    """

    # Group by test function (excluding loop index and iteration_id from comparison)
    def get_test_key(test_result: FunctionTestInvocation) -> tuple[str, str, str, str]:
        """Get unique key for a Hypothesis test function."""
        return (
            test_result.id.test_module_path,
            test_result.id.test_class_name,
            test_result.id.test_function_name,
            test_result.id.function_getting_tested,
        )

    # Group original results by test function
    original_by_func = defaultdict(list)
    for result in original_hypothesis:
        original_by_func[get_test_key(result)].append(result)

    # Group candidate results by test function
    candidate_by_func = defaultdict(list)
    for result in candidate_hypothesis:
        candidate_by_func[get_test_key(result)].append(result)

    # Log summary statistics
    orig_total_examples = sum(len(examples) for examples in original_by_func.values())
    cand_total_examples = sum(len(examples) for examples in candidate_by_func.values())

    logger.debug(
        f"Hypothesis comparison: Original={len(original_by_func)} test functions ({orig_total_examples} examples), "
        f"Candidate={len(candidate_by_func)} test functions ({cand_total_examples} examples)"
    )

    for test_key in original_by_func:
        if test_key not in candidate_by_func:
            continue  # Already handled above

        orig_examples = original_by_func[test_key]
        cand_examples = candidate_by_func[test_key]

        # Check if any original example failed
        orig_had_failure = any(not ex.did_pass for ex in orig_examples)
        cand_had_failure = any(not ex.did_pass for ex in cand_examples)

        # If original had failures, candidate must also have failures (or be missing, already handled)
        # If original passed, candidate must pass (but can have different example counts)
        if orig_had_failure != cand_had_failure:
            logger.debug(
                f"Hypothesis test function behavior mismatch: {test_key} "
                f"(original_failed={orig_had_failure}, candidate_failed={cand_had_failure})"
            )
            return False
    return True
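A hedged sketch of an explicit presence check that would close that gap, reusing the names from the snippet above (treating a one-sided test function as a mismatch is an assumption about the desired semantics):

# Before the per-key loop: fail fast if either side produced a test
# function the other side never ran.
orig_keys = set(original_by_func)
cand_keys = set(candidate_by_func)
if orig_keys != cand_keys:
    logger.debug(
        f"Hypothesis test function set mismatch: "
        f"only_original={orig_keys - cand_keys}, only_candidate={cand_keys - orig_keys}"
    )
    return False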

@github-actions

PR Code Suggestions ✨

Explore these optional code suggestions:

Category: Possible issue
Make module path resolution robust

Guard against paths outside project_root_path to avoid ValueError and wrong module
paths on symlinked or resolved paths. Use .resolve() on both paths and fall back to
the existing module_name_from_file_path traversal logic when direct relative_to
fails.

codeflash/code_utils/code_utils.py [257-259]

 def get_qualified_function_path(file_path: Path, project_root_path: Path, qualified_name: str) -> str:
-    module_path = file_path.relative_to(project_root_path).with_suffix("").as_posix().replace("/", ".")
+    try:
+        module_path = file_path.resolve().relative_to(project_root_path.resolve()).with_suffix("").as_posix().replace("/", ".")
+    except ValueError:
+        # Fall back to robust traversal if file is not directly under project_root_path
+        module_path = module_name_from_file_path(file_path, project_root_path)
     return f"{module_path}.{qualified_name}"
Suggestion importance[1-10]: 7

Why: The enhancement correctly guards Path.relative_to with resolve() and sensibly falls back to the existing module_name_from_file_path helper, improving robustness for symlink and out-of-root cases. Moderate impact, and accurate to the PR context where this helper is newly added.

Impact: Medium
Avoid NameError for settings decorator

Ensure the settings decorator is fully qualified when only hypothesis.settings is
available. Before inserting, detect existing from hypothesis import settings or add
from hypothesis import settings; otherwise, wrap as hypothesis.settings(...) if only
import hypothesis exists to avoid NameError.

codeflash/verification/hypothesis_testing.py [101-180]

 def make_hypothesis_tests_deterministic(code: str) -> str:
     """Add @settings(derandomize=True) decorator and constrain strategies to make Hypothesis tests deterministic."""
     try:
         tree = ast.parse(code)
     except SyntaxError:
         return code
-...
-        if settings_decorator:
-            if not any(k.arg == "derandomize" for k in settings_decorator.keywords):
-                settings_decorator.keywords.append(ast.keyword(arg="derandomize", value=ast.Constant(value=True)))
-        else:
-            node.decorator_list.append(
-                ast.Call(
-                    func=ast.Name(id="settings", ctx=ast.Load()),
-                    args=[],
-                    keywords=[ast.keyword(arg="derandomize", value=ast.Constant(value=True))],
+
+    has_from_import_settings = any(
+        isinstance(node, ast.ImportFrom)
+        and node.module == "hypothesis"
+        and any(alias.name == "settings" for alias in node.names)
+        for node in tree.body
+    )
+    has_import_hypothesis = any(isinstance(node, ast.Import) and any(a.name == "hypothesis" for a in node.names) for node in tree.body)
+
+    # Insert import if needed
+    if not has_from_import_settings and not has_import_hypothesis:
+        tree.body.insert(0, ast.parse("from hypothesis import settings").body[0])
+
+    class StrategyConstrainer(ast.NodeTransformer):
+        def visit_Call(self, node: ast.Call) -> ast.Call:
+            self.generic_visit(node)
+            if (
+                isinstance(node.func, ast.Attribute)
+                and isinstance(node.func.value, ast.Name)
+                and node.func.value.id == "st"
+            ):
+                if node.func.attr == "floats" and not any(
+                    k.arg in ["min_value", "max_value", "allow_nan", "allow_infinity"] for k in node.keywords
+                ):
+                    node.keywords.extend(
+                        [
+                            ast.keyword(arg="min_value", value=ast.UnaryOp(op=ast.USub(), operand=ast.Constant(value=1e6))),
+                            ast.keyword(arg="max_value", value=ast.Constant(value=1e6)),
+                            ast.keyword(arg="allow_nan", value=ast.Constant(value=False)),
+                            ast.keyword(arg="allow_infinity", value=ast.Constant(value=False)),
+                        ]
+                    )
+                elif node.func.attr == "integers" and not any(k.arg in ["min_value", "max_value"] for k in node.keywords):
+                    node.keywords.extend(
+                        [
+                            ast.keyword(arg="min_value", value=ast.Constant(value=-10000)),
+                            ast.keyword(arg="max_value", value=ast.Constant(value=10000)),
+                        ]
+                    )
+            return node
+
+    tree = StrategyConstrainer().visit(tree)
+    ast.fix_missing_locations(tree)
+
+    def settings_call() -> ast.expr:
+        if has_from_import_settings or not has_import_hypothesis:
+            return ast.Name(id="settings", ctx=ast.Load())
+        # Use fully-qualified call if only `import hypothesis` exists
+        return ast.Attribute(value=ast.Name(id="hypothesis", ctx=ast.Load()), attr="settings", ctx=ast.Load())
+
+    for node in ast.walk(tree):
+        if isinstance(node, ast.FunctionDef):
+            settings_decorator = next(
+                (d for d in node.decorator_list if isinstance(d, ast.Call) and (
+                    (isinstance(d.func, ast.Name) and d.func.id == "settings") or
+                    (isinstance(d.func, ast.Attribute) and isinstance(d.func.value, ast.Name) and d.func.value.id == "hypothesis" and d.func.attr == "settings")
+                )),
+                None,
+            )
+            if settings_decorator:
+                if not any(k.arg == "derandomize" for k in settings_decorator.keywords):
+                    settings_decorator.keywords.append(ast.keyword(arg="derandomize", value=ast.Constant(value=True)))
+            else:
+                node.decorator_list.append(
+                    ast.Call(func=settings_call(), args=[], keywords=[ast.keyword(arg="derandomize", value=ast.Constant(value=True))])
                 )
-            )
 
+    return ast.unparse(tree)
+
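Illustration of the intended effect when only `import hypothesis` is present (input is hypothetical):

# Before:
#     import hypothesis
#     from hypothesis import strategies as st
#
#     @hypothesis.given(st.floats())
#     def test_sorter(x):
#         ...
#
# After the suggested transform, the strategy is bounded and the settings
# decorator is fully qualified, so no bare `settings` NameError can occur:
#     @hypothesis.given(st.floats(min_value=-1000000.0, max_value=1000000.0,
#                                 allow_nan=False, allow_infinity=False))
#     @hypothesis.settings(derandomize=True)
#     def test_sorter(x):
#         ...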
Suggestion importance[1-10]: 6

Why: The change strengthens make_hypothesis_tests_deterministic to work when only import hypothesis exists, preventing a potential NameError while maintaining existing functionality. It's contextually accurate and beneficial, though not critical.

Impact: Low
Normalize import resolution in AST

Normalize resolved names to avoid false negatives caused by relative imports and
aliasing. Expand leading dots in ast.ImportFrom and normalize both resolved and
target_qualified_names by stripping redundant dots.

codeflash/discovery/discover_unit_tests.py [83-136]

 def _discover_calls_via_ast(
     test_file: Path, test_functions: set[TestFunction], target_qualified_names: set[str]
 ) -> dict[str, list[tuple[TestFunction, CodePosition]]]:
     try:
         with test_file.open("r", encoding="utf-8") as f:
             source = f.read()
         tree = ast.parse(source, filename=str(test_file))
     except (SyntaxError, FileNotFoundError) as e:
         logger.debug(f"AST parsing failed for {test_file}: {e}")
         return {}
-...
-        if parts[0] in import_map:
-            resolved = f"{import_map[parts[0]]}.{parts[1]}" if len(parts) == 2 else import_map[parts[0]]
 
-            if resolved in target_qualified_names:
-                result[resolved].append((test_func, CodePosition(line_no=child.lineno, col_no=child.col_offset)))
+    import_map: dict[str, str] = {}
+    module_pkg: str | None = None
+    # Attempt to infer current module path for relative imports
+    try:
+        rel = test_file.with_suffix("").as_posix().replace("/", ".")
+        module_pkg = rel.rsplit(".", 1)[0] if "." in rel else None
+    except Exception:
+        module_pkg = None
 
+    for node in ast.walk(tree):
+        if isinstance(node, ast.Import):
+            for alias in node.names:
+                name = alias.asname or alias.name
+                import_map[name] = alias.name
+        elif isinstance(node, ast.ImportFrom) and node.module is not None:
+            base = node.module
+            # Handle relative levels (e.g., from .sub import x)
+            if getattr(node, "level", 0):
+                if module_pkg:
+                    parts = module_pkg.split(".")
+                    level = int(node.level)
+                    base = ".".join(parts[: max(0, len(parts) - level)]) + (("." + base) if base else "")
+                    base = base.strip(".")
+            for alias in node.names:
+                if alias.name != "*":
+                    full_name = f"{base}.{alias.name}" if base else alias.name
+                    name = alias.asname or alias.name
+                    import_map[name] = full_name
+
+    test_funcs_by_name = {tf.function_name: tf for tf in test_functions}
+    result = defaultdict(list)
+    normalized_targets = {t.strip(".") for t in target_qualified_names}
+
+    for node in ast.walk(tree):
+        if not isinstance(node, ast.FunctionDef) or node.name not in test_funcs_by_name:
+            continue
+        test_func = test_funcs_by_name[node.name]
+        for child in ast.walk(node):
+            if not isinstance(child, ast.Call):
+                continue
+            call_name = _extract_dotted_call_name(child.func)
+            if not call_name:
+                continue
+            # direct match
+            if call_name.strip(".") in normalized_targets:
+                result[call_name.strip(".")].append(
+                    (test_func, CodePosition(line_no=child.lineno, col_no=child.col_offset))
+                )
+                continue
+            parts = call_name.split(".", 1)
+            if parts[0] in import_map:
+                resolved = f"{import_map[parts[0]]}.{parts[1]}" if len(parts) == 2 else import_map[parts[0]]
+                norm = resolved.strip(".")
+                if norm in normalized_targets:
+                    result[norm].append((test_func, CodePosition(line_no=child.lineno, col_no=child.col_offset)))
+
+    return dict(result)
+
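A quick worked example of the relative-import expansion above (module path hypothetical):

# module_pkg = "tests.unit.test_sample", node.level = 2, base = "helpers"
# parts[: len(parts) - 2] -> ["tests"], so base becomes "tests.helpers"
# "from ..helpers import target" then resolves "target" -> "tests.helpers.target"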
Suggestion importance[1-10]: 5

Why: Accounting for relative imports and normalizing names can reduce false negatives, and the proposed logic aligns with the added AST discovery block. It's a reasonable improvement, but it is not critical and introduces heuristic complexity without clear evidence in the PR of such issues.

Impact: Low

The optimized code achieves a **32% speedup** by eliminating redundant data structures and reducing iteration overhead through two key optimizations:

**1. Single-pass aggregation instead of list accumulation:**
- **Original**: Uses `defaultdict(list)` to collect all `FunctionTestInvocation` objects per test function, then later iterates through these lists to compute failure flags with `any(not ex.did_pass for ex in orig_examples)`
- **Optimized**: Uses plain dicts with 2-element lists `[count, had_failure]` to track both example count and failure status in a single pass, eliminating the need to store individual test objects or re-scan them

**2. Reduced memory allocation and access patterns:**
- **Original**: Creates and stores complete lists of test objects (up to 9,458 objects in large test cases), then performs expensive `any()` operations over these lists
- **Optimized**: Uses compact 2-item lists per test function, avoiding object accumulation and expensive linear scans

The line profiler shows the key performance gains:
- Lines with `any(not ex.did_pass...)` in original (10.1% and 10.2% of total time) are completely eliminated
- The `setdefault()` operations replace the more expensive `defaultdict(list).append()` calls
- Overall reduction from storing ~9,458 objects to just tracking summary statistics

**Best performance gains** occur in test cases with:
- **Large numbers of examples per test function** (up to 105% faster for `test_large_scale_all_fail`)
- **Many distinct test functions** (up to 75% faster for `test_large_scale_some_failures`) 
- **Mixed pass/fail scenarios** where the original's `any()` operations were most expensive

The optimization maintains identical behavior while dramatically reducing both memory usage and computational complexity from O(examples) to O(1) per test function group.
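A minimal sketch of the single-pass aggregation described above (get_test_key as in the earlier snippet; the actual optimized code lives in the dependent PR):

# One pass: track [example_count, had_failure] per test key instead of
# accumulating full FunctionTestInvocation lists and re-scanning them.
original_stats: dict[tuple, list] = {}
for result in original_hypothesis:
    stats = original_stats.setdefault(get_test_key(result), [0, False])
    stats[0] += 1                                   # example count
    stats[1] = stats[1] or (not result.did_pass)    # sticky failure flag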
codeflash-ai bot (Contributor) commented Oct 26, 2025

⚡️ Codeflash found optimizations for this PR

📄 32% (0.32x) speedup for _compare_hypothesis_tests_semantic in codeflash/verification/equivalence.py

⏱️ Runtime: 4.67 milliseconds → 3.53 milliseconds (best of 284 runs)

A dependent PR with the suggested changes has been created. Please review:

If you approve, it will be merged into this PR (branch feat/hypothesis-tests).

Co-authored-by: codeflash-ai[bot] <148906541+codeflash-ai[bot]@users.noreply.github.com>
codeflash-ai bot (Contributor) commented Oct 26, 2025

This PR is now faster! 🚀 Kevin Turcios accepted my code suggestion above.

…25-10-26T20.37.41

⚡️ Speed up function `_compare_hypothesis_tests_semantic` by 32% in PR #857 (`feat/hypothesis-tests`)
codeflash-ai bot (Contributor) commented Oct 26, 2025

This PR is now faster! 🚀 @KRRT7 accepted my optimizations from:

@KRRT7 KRRT7 requested a review from misrasaurabh1 October 26, 2025 23:37
@KRRT7 KRRT7 marked this pull request as draft October 30, 2025 21:30
2 participants