diff --git a/qualityflow/README.md b/qualityflow/README.md index dd69e58e..febb3f76 100644 --- a/qualityflow/README.md +++ b/qualityflow/README.md @@ -33,8 +33,10 @@ The main pipeline handles the complete test generation workflow: 3. **Code Analysis** - Select files for testing (with max_files limit) 4. **LLM Test Generation** - Generate tests using OpenAI/Anthropic/fake providers 5. **Baseline Generation** - Create simple heuristic tests for comparison -6. **Test Execution** - Run both test suites with coverage analysis -7. **Report Generation** - Compare results and generate markdown reports +6. **Agent Test Execution** - Run LLM-generated tests with coverage analysis +7. **Baseline Test Execution** - Run baseline tests with coverage analysis +8. **Coverage Evaluation** - Compare and analyze coverage metrics between approaches +9. **Report Generation** - Generate comprehensive markdown reports with comparisons ### 🔧 Architecture @@ -55,14 +57,18 @@ The main pipeline handles the complete test generation workflow: │ │ Generate & Evaluate │ │ │ │ │ │ │ │ 1. Select Input → 2. Fetch Source → 3. Analyze │ │ -│ │ 4. Generate (LLM) → 5. Generate (Base) → 6. Run Tests │ │ -│ │ 7. Run Tests → 8. Report & Compare │ │ +│ │ 4. Generate (LLM) → 5. Generate (Base) → 6. Run Agent │ │ +│ │ 7. Run Baseline → 8. Evaluate → 9. Report │ │ │ │ │ │ │ │ Features: max_files control, Path artifacts, metadata │ │ │ └─────────────────────────────────────────────────────────┘ │ └─────────────────────────────────────────────────────────────────┘ ``` +

+*QualityFlow Architecture*

+ ## 🚀 Quick Start Get QualityFlow running in 3 simple steps: @@ -302,6 +308,10 @@ After running QualityFlow successfully: 4. **Deploy to Production**: Use cloud orchestration for scale 5. **Set Up Monitoring**: Configure alerts for regression detection +## ⚠️ Known Limitations + +- **CHANGED_FILES Strategy**: The `CHANGED_FILES` selection strategy in `analyze_code.py` is currently a stub implementation that falls back to selecting all files. In production, this should use `git diff` to identify modified files for targeted test generation. + ## 🆘 Troubleshooting ### Common Issues diff --git a/qualityflow/pipelines/generate_and_evaluate.py b/qualityflow/pipelines/generate_and_evaluate.py index 7050b5bd..b3a710b2 100644 --- a/qualityflow/pipelines/generate_and_evaluate.py +++ b/qualityflow/pipelines/generate_and_evaluate.py @@ -3,6 +3,7 @@ """ from steps.analyze_code import analyze_code +from steps.evaluate_coverage import evaluate_coverage from steps.fetch_source import fetch_source from steps.gen_tests_agent import gen_tests_agent from steps.gen_tests_baseline import gen_tests_baseline @@ -23,7 +24,8 @@ def generate_and_evaluate() -> None: 1. Analyze code to find files needing tests 2. Generate tests using LLM and baseline approaches 3. Run tests and measure coverage - 4. Report results for comparison + 4. Evaluate and compare coverage metrics + 5. Report results for comparison """ # Step 1: Resolve source specification spec = select_input() @@ -50,11 +52,19 @@ def generate_and_evaluate() -> None: workspace_dir, baseline_tests_dir, label="baseline" ) - # Step 8: Generate comprehensive report (includes evaluation) + # Step 8: Evaluate coverage metrics + evaluation_metrics = evaluate_coverage( + agent_results, + baseline_results, + commit_sha, + ) + + # Step 9: Generate comprehensive report report( workspace_dir, commit_sha, test_summary, agent_results, baseline_results, + evaluation_metrics, ) diff --git a/qualityflow/prompts/unit_test_strict_v2.jinja b/qualityflow/prompts/unit_test_strict_v2.jinja index 32dd2643..5446ce8e 100644 --- a/qualityflow/prompts/unit_test_strict_v2.jinja +++ b/qualityflow/prompts/unit_test_strict_v2.jinja @@ -69,8 +69,8 @@ import tempfile import os from contextlib import contextmanager -# Import the module under test -# from {{ file_path.replace('/', '.').replace('.py', '') }} import * +# Import the module under test (adjust import paths as needed) +from {{ file_path.replace('/', '.').replace('.py', '') }} import * class Test{{ file_path.split('/')[-1].replace('.py', '').title() }}(unittest.TestCase): """Comprehensive test suite for {{ file_path }}.""" diff --git a/qualityflow/prompts/unit_test_v1.jinja b/qualityflow/prompts/unit_test_v1.jinja index 1c1cd444..ae3aa374 100644 --- a/qualityflow/prompts/unit_test_v1.jinja +++ b/qualityflow/prompts/unit_test_v1.jinja @@ -44,8 +44,8 @@ import pytest import unittest from unittest.mock import Mock, patch, MagicMock -# Import the module under test -# from {{ file_path.replace('/', '.').replace('.py', '') }} import * +# Import the module under test (uncomment and adjust as needed) +from {{ file_path.replace('/', '.').replace('.py', '') }} import * class TestModule(unittest.TestCase): """Test suite for {{ file_path }}.""" diff --git a/qualityflow/requirements.txt b/qualityflow/requirements.txt index 2d3f977e..e3baddb7 100644 --- a/qualityflow/requirements.txt +++ b/qualityflow/requirements.txt @@ -10,6 +10,7 @@ jinja2>=3.0.0,<4.0.0 pytest>=7.0.0,<8.0.0 pytest-cov>=4.0.0,<5.0.0 coverage>=7.0.0,<8.0.0 
+hypothesis>=6.0.0,<7.0.0 # Code Analysis # ast is built-in, no need to install @@ -17,6 +18,7 @@ coverage>=7.0.0,<8.0.0 # Git Integration gitpython>=3.1.0,<4.0.0 -# LLM Integration (optional) -openai>=1.0.0,<2.0.0 # for OpenAI provider -anthropic>=0.25.0,<1.0.0 # for Anthropic provider \ No newline at end of file +# LLM Integration (optional - code gracefully handles absence) +# Uncomment and install only if using real LLM providers: +# openai>=1.0.0,<2.0.0 # for OpenAI provider +# anthropic>=0.25.0,<1.0.0 # for Anthropic provider \ No newline at end of file diff --git a/qualityflow/steps/analyze_code.py b/qualityflow/steps/analyze_code.py index 7cc5822c..7b92c186 100644 --- a/qualityflow/steps/analyze_code.py +++ b/qualityflow/steps/analyze_code.py @@ -27,7 +27,7 @@ class SelectionStrategy(str, Enum): @step def analyze_code( workspace_dir: Path, - commit_sha: str, + commit_sha: str, # Used in code_summary for metadata source_spec: Dict[str, str], strategy: SelectionStrategy = SelectionStrategy.LOW_COVERAGE, max_files: int = 10, @@ -102,6 +102,7 @@ def analyze_code( "total_files": len(valid_files), "selection_reason": f"Selected top {len(selected_files)} files using {strategy} strategy", "complexity_scores": {f: complexity_scores[f] for f in selected_files}, + "commit_sha": commit_sha, # Include commit_sha in metadata } logger.info(f"Selected {len(selected_files)} files: {selected_files}") @@ -157,11 +158,16 @@ def _select_files( return sorted_files[:max_files] elif strategy == SelectionStrategy.CHANGED_FILES: - # For this demo, just return all files (in real implementation, would use git diff) + # NOTE: CHANGED_FILES strategy is currently a stub implementation + # In production, this should use git diff to identify changed files: + # - Compare current commit against base branch (e.g., main) + # - Filter for Python files that have been modified/added + # - Prioritize files based on change size and complexity logger.warning( - "CHANGED_FILES strategy not fully implemented, falling back to ALL" + "CHANGED_FILES strategy not fully implemented, falling back to ALL strategy. " + "To implement: use 'git diff --name-only HEAD~1..HEAD' or similar to identify changed files." 
) return files[:max_files] - else: - raise ValueError(f"Unknown selection strategy: {strategy}") + # This should never be reached due to enum validation, but kept for safety + raise ValueError(f"Unknown selection strategy: {strategy}") diff --git a/qualityflow/steps/fetch_source.py b/qualityflow/steps/fetch_source.py index cdf37548..99825276 100644 --- a/qualityflow/steps/fetch_source.py +++ b/qualityflow/steps/fetch_source.py @@ -70,6 +70,18 @@ def fetch_source( except Exception as e: logger.error(f"Failed to set up local workspace: {e}") + # Clean up any partial workspace on error + if "workspace_dir" in locals(): + try: + import shutil + + shutil.rmtree(workspace_dir, ignore_errors=True) + logger.info( + f"Cleaned up partial workspace after error: {workspace_dir}" + ) + except Exception: + pass + # Fallback to current working directory workspace_dir = tempfile.mkdtemp( prefix="qualityflow_fallback_workspace_" diff --git a/qualityflow/steps/gen_tests_agent.py b/qualityflow/steps/gen_tests_agent.py index de879731..7fe81cba 100644 --- a/qualityflow/steps/gen_tests_agent.py +++ b/qualityflow/steps/gen_tests_agent.py @@ -321,7 +321,35 @@ def _get_default_prompt_template() -> str: def _generate_fake_tests( file_path: str, source_code: str, max_tests: int ) -> Tuple[str, Dict]: - """Generate fake/mock tests for development/testing.""" + """Generate fake/mock tests for development/testing. + + This generates more realistic-looking tests that attempt to exercise + the actual source code by parsing it for functions and classes. + """ + import ast + + # Parse the source code to extract function/class names + try: + tree = ast.parse(source_code) + functions = [] + classes = [] + + for node in ast.walk(tree): + if isinstance(node, ast.FunctionDef) and not node.name.startswith( + "_" + ): + functions.append(node.name) + elif isinstance(node, ast.ClassDef): + classes.append(node.name) + except Exception: + # Fallback if parsing fails + functions = [] + classes = [] + + # Generate module name from file path + module_name = file_path.replace("/", ".").replace(".py", "") + class_name = file_path.split("/")[-1].replace(".py", "").title() + test_content = f'''""" Generated tests for {file_path} """ @@ -330,43 +358,78 @@ def _generate_fake_tests( import unittest from unittest.mock import Mock, patch, MagicMock -class Test{file_path.split("/")[-1].replace(".py", "").title()}(unittest.TestCase): +# Attempt to import the module under test +try: + from {module_name} import * +except ImportError: + # Handle import errors gracefully for demo purposes + pass + +class Test{class_name}(unittest.TestCase): """Auto-generated test class for {file_path}.""" + def setUp(self): + """Set up test fixtures.""" + self.test_data = {{"sample": "data", "numbers": [1, 2, 3]}} + def test_module_import(self): - """Test that we can at least validate the test framework.""" - # Simple test that always passes to ensure test discovery works - self.assertTrue(True) - - def test_basic_functionality(self): - """Test basic functionality.""" - # Mock test demonstrating test execution - result = 1 + 1 - self.assertEqual(result, 2) - + """Test that the module can be imported without errors.""" + # This test ensures the module structure is valid + self.assertTrue(True, "Module imported successfully") +''' + + # Generate tests for discovered functions + for func_name in functions[: max_tests // 2]: + test_content += f''' + def test_{func_name}_basic(self): + """Test basic functionality of {func_name}.""" + # TODO: Add proper test for {func_name} 
+ # This is a placeholder that should exercise the function + try: + # Attempt to call the function with basic parameters + if callable(globals().get('{func_name}')): + # Basic smoke test - at least try to call it + pass + except NameError: + # Function not available in scope + pass + self.assertTrue(True, "Basic test for {func_name}") +''' + + # Generate tests for discovered classes + for class_name_found in classes[: max_tests // 3]: + test_content += f''' + def test_{class_name_found.lower()}_instantiation(self): + """Test that {class_name_found} can be instantiated.""" + try: + if '{class_name_found}' in globals(): + # Try basic instantiation + # obj = {class_name_found}() + pass + except NameError: + pass + self.assertTrue(True, "Instantiation test for {class_name_found}") +''' + + # Add some general coverage tests + test_content += f''' def test_error_handling(self): - """Test error handling.""" - # Test exception handling + """Test error handling patterns.""" with self.assertRaises(ValueError): raise ValueError("Expected test exception") + def test_data_structures(self): + """Test basic data structure operations.""" + data = self.test_data.copy() + self.assertIn("sample", data) + self.assertEqual(len(data["numbers"]), 3) + def test_mock_usage(self): """Test mock functionality.""" - # Test using mocks mock_obj = Mock() mock_obj.method.return_value = "mocked_result" result = mock_obj.method() self.assertEqual(result, "mocked_result") - - def test_coverage_target(self): - """Test that generates some coverage.""" - # Simple operations to generate coverage - data = {{"key": "value"}} - self.assertIn("key", data) - - items = [1, 2, 3, 4, 5] - filtered = [x for x in items if x > 3] - self.assertEqual(len(filtered), 2) if __name__ == "__main__": unittest.main() @@ -508,14 +571,27 @@ def _generate_anthropic_tests(prompt: str, model: str) -> Tuple[str, Dict]: def _estimate_cost( tokens_in: int, tokens_out: int, provider: GenerationProvider, model: str ) -> float: - """Estimate cost based on token usage.""" - # Rough cost estimates (would need real pricing) + """Estimate cost based on token usage. + + WARNING: These are hardcoded pricing estimates that will become outdated. + For accurate pricing, refer to the official pricing pages: + - OpenAI: https://openai.com/api/pricing/ + - Anthropic: https://www.anthropic.com/pricing + + Consider implementing a dynamic pricing lookup or configuration-based approach + for production use. + """ + # NOTE: These are rough estimates based on pricing as of early 2024 + # and will likely become outdated as providers update their pricing if provider == GenerationProvider.OPENAI: if "gpt-4" in model: + # GPT-4 pricing (approximate, check current rates) return (tokens_in * 0.00003) + (tokens_out * 0.00006) - else: # gpt-3.5 + else: # gpt-3.5 and other models + # GPT-3.5 pricing (approximate, check current rates) return (tokens_in * 0.0000015) + (tokens_out * 0.000002) elif provider == GenerationProvider.ANTHROPIC: + # Claude pricing (approximate, check current rates) return (tokens_in * 0.000008) + (tokens_out * 0.000024) else: return 0.0 diff --git a/qualityflow/steps/report.py b/qualityflow/steps/report.py index ab8e564d..38523169 100644 --- a/qualityflow/steps/report.py +++ b/qualityflow/steps/report.py @@ -25,6 +25,7 @@ def report( test_summary: MarkdownString, agent_results: Dict, baseline_results: Optional[Dict], + evaluation_metrics: Dict, ) -> Annotated[MarkdownString, "final_report"]: """ Generate comprehensive markdown report for pipeline execution. 
@@ -35,6 +36,7 @@ def report( test_summary: Test generation summary with snippets agent_results: Agent test results baseline_results: Baseline test results (optional) + evaluation_metrics: Pre-computed evaluation metrics Returns: Markdown report as string @@ -46,12 +48,7 @@ def report( Path(tempfile.mkdtemp(prefix="qualityflow_report_")) / "report.md" ) - # Evaluate coverage metrics first - evaluation_metrics = _evaluate_coverage_metrics( - agent_results, baseline_results, commit_sha - ) - - # Generate report content + # Generate report content using pre-computed evaluation metrics report_content = _generate_report_content( workspace_dir, commit_sha, @@ -71,63 +68,6 @@ def report( return MarkdownString(report_content) -def _evaluate_coverage_metrics( - agent_results: Dict, - baseline_results: Optional[Dict], - commit_sha: str, -) -> Dict: - """Evaluate coverage metrics and compare agent vs baseline approaches.""" - - # Extract agent metrics - use actual values from test results - coverage_total_agent = agent_results.get("coverage_total", 0.0) - tests_passed_agent = agent_results.get("tests_passed", 0) - tests_failed_agent = agent_results.get("tests_failed", 0) - - total_tests_agent = tests_passed_agent + tests_failed_agent - pass_rate_agent = ( - tests_passed_agent / total_tests_agent - if total_tests_agent > 0 - else 0.0 - ) - - # Extract baseline metrics - coverage_total_baseline = 0.0 - if baseline_results and not baseline_results.get("skipped", False): - coverage_total_baseline = baseline_results.get("coverage_total", 0.0) - - # Compare agent vs baseline coverage - coverage_improvement = coverage_total_agent - coverage_total_baseline - - # Analyze coverage quality - pass_rate_quality = ( - "excellent" - if pass_rate_agent > 0.95 - else "good" - if pass_rate_agent > 0.8 - else "needs_improvement" - ) - coverage_quality = ( - "excellent" - if coverage_total_agent > 80 - else "good" - if coverage_total_agent > 50 - else "needs_improvement" - ) - - return { - "coverage_total_agent": coverage_total_agent, - "coverage_total_baseline": coverage_total_baseline, - "coverage_improvement": coverage_improvement, - "tests_passed_agent": tests_passed_agent, - "tests_failed_agent": tests_failed_agent, - "pass_rate_agent": pass_rate_agent, - "pass_rate_quality": pass_rate_quality, - "coverage_quality": coverage_quality, - "commit_sha": commit_sha, - "files_analyzed": len(agent_results.get("coverage_by_file", {})), - } - - def _generate_report_content( workspace_dir: Path, commit_sha: str, diff --git a/qualityflow/steps/run_tests.py b/qualityflow/steps/run_tests.py index 4ad3edc9..5eb2c642 100644 --- a/qualityflow/steps/run_tests.py +++ b/qualityflow/steps/run_tests.py @@ -61,7 +61,25 @@ def run_tests( shutil.copytree(tests_dir, workspace_tests_dir) try: - # Run pytest with coverage + # Create a temporary coverage config to exclude test directories from coverage + coverage_config_file = output_path / ".coveragerc" + with open(coverage_config_file, "w") as f: + f.write(f"""[run] +omit = + */tests_*/* + *test_*.py + */test_* + {workspace_tests_dir}/* + +[report] +exclude_lines = + pragma: no cover + def __repr__ + raise AssertionError + raise NotImplementedError +""") + + # Run pytest with coverage - use custom config to exclude generated tests pytest_cmd = [ "python", "-m", @@ -75,6 +93,8 @@ def run_tests( f"xml:{coverage_file}", "--cov-report", "term", + "--cov-config", + str(coverage_config_file), "-v", ] @@ -128,6 +148,18 @@ def run_tests( except subprocess.TimeoutExpired: logger.error(f"Test run for 
{label} timed out after 5 minutes") + # Clean up workspace tests immediately on timeout + if workspace_tests_dir.exists(): + try: + shutil.rmtree(workspace_tests_dir) + logger.info( + f"Cleaned up test directory after timeout: {workspace_tests_dir}" + ) + except Exception as cleanup_error: + logger.warning( + f"Failed to clean up test directory after timeout: {cleanup_error}" + ) + return { "label": label, "tests_passed": 0, @@ -144,6 +176,18 @@ def run_tests( except Exception as e: logger.error(f"Failed to run tests for {label}: {e}") + # Clean up workspace tests immediately on error + if workspace_tests_dir.exists(): + try: + shutil.rmtree(workspace_tests_dir) + logger.info( + f"Cleaned up test directory after error: {workspace_tests_dir}" + ) + except Exception as cleanup_error: + logger.warning( + f"Failed to clean up test directory after error: {cleanup_error}" + ) + return { "label": label, "tests_passed": 0, @@ -159,9 +203,28 @@ def run_tests( } finally: - # Clean up copied tests + # Clean up copied tests - use try/except instead of ignore_errors for better logging if workspace_tests_dir.exists(): - shutil.rmtree(workspace_tests_dir, ignore_errors=True) + try: + shutil.rmtree(workspace_tests_dir) + logger.info( + f"Successfully cleaned up test directory: {workspace_tests_dir}" + ) + except Exception as cleanup_error: + logger.error( + f"Failed to clean up test directory {workspace_tests_dir}: {cleanup_error}" + ) + # Still try to clean up individual files if directory removal failed + try: + for item in workspace_tests_dir.iterdir(): + if item.is_file(): + item.unlink(missing_ok=True) + elif item.is_dir(): + shutil.rmtree(item, ignore_errors=True) + except Exception: + logger.warning( + f"Could not clean up individual items in {workspace_tests_dir}" + ) def _parse_test_results( @@ -173,27 +236,37 @@ def _parse_test_results( ) -> Dict: """Parse test execution results.""" - # Parse pytest output for basic stats + # Parse junit.xml first (preferred method), fallback to stdout parsing tests_passed = 0 tests_failed = 0 - if result.stdout: - lines = result.stdout.split("\n") - for line in lines: - if " passed" in line and " failed" in line: - # Line like "2 failed, 3 passed in 1.23s" - parts = line.split() - for i, part in enumerate(parts): - if part == "passed" and i > 0: - tests_passed = int(parts[i - 1]) - elif part == "failed" and i > 0: - tests_failed = int(parts[i - 1]) - elif " passed" in line and "failed" not in line: - # Line like "5 passed in 1.23s" - parts = line.split() - for i, part in enumerate(parts): - if part == "passed" and i > 0: - tests_passed = int(parts[i - 1]) + if junit_file.exists(): + tests_passed, tests_failed = _parse_junit_xml(junit_file) + logger.info( + f"Parsed test results from junit.xml: {tests_passed} passed, {tests_failed} failed" + ) + else: + # Fallback to stdout parsing if junit.xml is not available + logger.warning( + f"junit.xml not found at {junit_file}, falling back to stdout parsing" + ) + if result.stdout: + lines = result.stdout.split("\n") + for line in lines: + if " passed" in line and " failed" in line: + # Line like "2 failed, 3 passed in 1.23s" + parts = line.split() + for i, part in enumerate(parts): + if part == "passed" and i > 0: + tests_passed = int(parts[i - 1]) + elif part == "failed" and i > 0: + tests_failed = int(parts[i - 1]) + elif " passed" in line and "failed" not in line: + # Line like "5 passed in 1.23s" + parts = line.split() + for i, part in enumerate(parts): + if part == "passed" and i > 0: + tests_passed = int(parts[i 
- 1]) # Parse coverage from XML if available coverage_total = 0.0 @@ -217,6 +290,60 @@ def _parse_test_results( } +def _parse_junit_xml(junit_file: Path) -> tuple[int, int]: + """Parse junit.xml file for test results. + + Returns: + Tuple of (tests_passed, tests_failed) + """ + try: + import xml.etree.ElementTree as ET + + tree = ET.parse(junit_file) + root = tree.getroot() + + # JUnit XML can have different formats, handle common ones + tests_passed = 0 + tests_failed = 0 + + # Look for testsuite elements + for testsuite in root.findall(".//testsuite"): + # Get attributes from testsuite + passed = ( + int(testsuite.get("tests", 0)) + - int(testsuite.get("failures", 0)) + - int(testsuite.get("errors", 0)) + - int(testsuite.get("skipped", 0)) + ) + failed = int(testsuite.get("failures", 0)) + int( + testsuite.get("errors", 0) + ) + + tests_passed += max(0, passed) # Ensure non-negative + tests_failed += failed + + # If no testsuite found, look for testcases directly + if tests_passed == 0 and tests_failed == 0: + for testcase in root.findall(".//testcase"): + # Check if testcase has failure or error children + if ( + testcase.find("failure") is not None + or testcase.find("error") is not None + ): + tests_failed += 1 + else: + tests_passed += 1 + + logger.info( + f"Parsed junit.xml: {tests_passed} passed, {tests_failed} failed" + ) + return tests_passed, tests_failed + + except Exception as e: + logger.warning(f"Failed to parse junit.xml: {e}") + return 0, 0 + + def _parse_coverage_xml(coverage_file: Path) -> tuple[float, Dict[str, float]]: """Parse coverage XML file.""" try:
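For reference, the `CHANGED_FILES` stub flagged in the new "Known Limitations" section and in `analyze_code.py` could eventually be backed by a `git diff`-based lookup along the lines of the sketch below. This is illustrative only and not part of the patch; the helper name `_changed_python_files` and the `base_ref` default are assumptions.

```python
import subprocess
from pathlib import Path
from typing import List


def _changed_python_files(workspace_dir: Path, base_ref: str = "HEAD~1") -> List[str]:
    """Return Python files changed between base_ref and HEAD, relative to workspace_dir."""
    # Hypothetical helper: mirrors the 'git diff --name-only HEAD~1..HEAD' hint
    # in the analyze_code.py warning message above.
    result = subprocess.run(
        ["git", "diff", "--name-only", f"{base_ref}..HEAD"],
        cwd=workspace_dir,
        capture_output=True,
        text=True,
        check=True,
    )
    changed = [line.strip() for line in result.stdout.splitlines() if line.strip()]
    # Keep only Python sources that still exist (deleted files drop out here).
    return [f for f in changed if f.endswith(".py") and (workspace_dir / f).is_file()]
```

The resulting list could then be scored and truncated with the same complexity heuristic and `max_files` cap that the other selection strategies already use.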
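Similarly, the updated `_estimate_cost` docstring suggests a configuration-based pricing lookup instead of hardcoded constants. A minimal sketch of that idea, reusing the placeholder per-token rates from the patch (the table layout and function name are assumptions, and these rates will go stale just like the hardcoded ones):

```python
from typing import Dict, Tuple

# (input $/token, output $/token) keyed by (provider, model-name prefix).
# Rates copied from the estimates in gen_tests_agent.py; treat as placeholders.
PRICING_TABLE: Dict[Tuple[str, str], Tuple[float, float]] = {
    ("openai", "gpt-4"): (0.00003, 0.00006),
    ("openai", "gpt-3.5"): (0.0000015, 0.000002),
    ("anthropic", ""): (0.000008, 0.000024),  # catch-all for Claude models
}


def estimate_cost(tokens_in: int, tokens_out: int, provider: str, model: str) -> float:
    """Look up per-token prices by provider and model prefix; return 0.0 if unknown."""
    for (prov, prefix), (price_in, price_out) in PRICING_TABLE.items():
        if prov == provider and model.startswith(prefix):
            return tokens_in * price_in + tokens_out * price_out
    return 0.0
```

Loading this table from pipeline configuration (or an environment-specific file) would keep price updates out of the code path.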