diff --git a/codegen-on-oss/codegen_on_oss/analyzers/analyzer.py b/codegen-on-oss/codegen_on_oss/analyzers/analyzer.py index 4337bba5b..3471380d8 100644 --- a/codegen-on-oss/codegen_on_oss/analyzers/analyzer.py +++ b/codegen-on-oss/codegen_on_oss/analyzers/analyzer.py @@ -7,35 +7,37 @@ It serves as the primary API entry point for the analyzer backend. """ -import os -import sys import json import logging +import sys from datetime import datetime -from pathlib import Path -from typing import Dict, List, Set, Tuple, Any, Optional, Union, Type, Callable -from enum import Enum +from typing import Any try: - from codegen.sdk.core.codebase import Codebase from codegen.configs.models.codebase import CodebaseConfig from codegen.configs.models.secrets import SecretsConfig - from codegen.sdk.codebase.config import ProjectConfig - from codegen.git.schemas.repo_config import RepoConfig from codegen.git.repo_operator.repo_operator import RepoOperator + from codegen.git.schemas.repo_config import RepoConfig + from codegen.sdk.codebase.config import ProjectConfig + from codegen.sdk.core.codebase import Codebase from codegen.shared.enums.programming_language import ProgrammingLanguage except ImportError: print("Codegen SDK not found. Please install it first.") sys.exit(1) # Import internal modules - these will be replaced with actual imports once implemented -from codegen_on_oss.analyzers.issues import Issue, IssueSeverity, AnalysisType, IssueCategory +from codegen_on_oss.analyzers.issues import ( + AnalysisType, + Issue, + IssueCategory, + IssueSeverity, +) # Configure logging logging.basicConfig( level=logging.INFO, - format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', - handlers=[logging.StreamHandler()] + format="%(asctime)s - %(name)s - %(levelname)s - %(message)s", + handlers=[logging.StreamHandler()], ) logger = logging.getLogger(__name__) @@ -56,108 +58,115 @@ ".vscode", ] + class AnalyzerRegistry: """Registry of analyzer plugins.""" - + _instance = None - + def __new__(cls): if cls._instance is None: - cls._instance = super(AnalyzerRegistry, cls).__new__(cls) - cls._instance._analyzers = {} + cls._instance = super().__new__(cls) + cls._analyzers = {} return cls._instance - - def register(self, analysis_type: AnalysisType, analyzer_class: Type['AnalyzerPlugin']): + + def register( + self, analysis_type: AnalysisType, analyzer_class: type["AnalyzerPlugin"] + ): """Register an analyzer plugin.""" self._analyzers[analysis_type] = analyzer_class - - def get_analyzer(self, analysis_type: AnalysisType) -> Optional[Type['AnalyzerPlugin']]: + + def get_analyzer( + self, analysis_type: AnalysisType + ) -> type["AnalyzerPlugin"] | None: """Get the analyzer plugin for a specific analysis type.""" return self._analyzers.get(analysis_type) - - def list_analyzers(self) -> Dict[AnalysisType, Type['AnalyzerPlugin']]: + + def list_analyzers(self) -> dict[AnalysisType, type["AnalyzerPlugin"]]: """Get all registered analyzers.""" return self._analyzers.copy() + class AnalyzerPlugin: """Base class for analyzer plugins.""" - - def __init__(self, manager: 'AnalyzerManager'): + + def __init__(self, manager: "AnalyzerManager"): """Initialize the analyzer plugin.""" self.manager = manager self.issues = [] - - def analyze(self) -> Dict[str, Any]: + + def analyze(self) -> dict[str, Any]: """Perform analysis using this plugin.""" raise NotImplementedError("Analyzer plugins must implement analyze()") - + def add_issue(self, issue: Issue): """Add an issue to the list.""" self.manager.add_issue(issue) self.issues.append(issue) + class CodeQualityPlugin(AnalyzerPlugin): """Plugin for code quality analysis.""" - - def analyze(self) -> Dict[str, Any]: + + def analyze(self) -> dict[str, Any]: """Perform code quality analysis.""" # This is a simplified placeholder - would import and use code_quality.py result = { "dead_code": self._find_dead_code(), "complexity": self._analyze_complexity(), "maintainability": self._analyze_maintainability(), - "style_issues": self._analyze_style_issues() + "style_issues": self._analyze_style_issues(), } return result - - def _find_dead_code(self) -> Dict[str, Any]: + + def _find_dead_code(self) -> dict[str, Any]: """Find unused code in the codebase.""" # This is a placeholder return {"unused_functions": [], "unused_classes": [], "unused_variables": []} - - def _analyze_complexity(self) -> Dict[str, Any]: + + def _analyze_complexity(self) -> dict[str, Any]: """Analyze code complexity.""" # This is a placeholder return {"complex_functions": [], "average_complexity": 0} - - def _analyze_maintainability(self) -> Dict[str, Any]: + + def _analyze_maintainability(self) -> dict[str, Any]: """Analyze code maintainability.""" # This is a placeholder return {"maintainability_index": {}} - - def _analyze_style_issues(self) -> Dict[str, Any]: + + def _analyze_style_issues(self) -> dict[str, Any]: """Analyze code style issues.""" # This is a placeholder return {"style_violations": []} + class DependencyPlugin(AnalyzerPlugin): """Plugin for dependency analysis.""" - def analyze(self) -> Dict[str, Any]: + def analyze(self) -> dict[str, Any]: """Perform dependency analysis using the DependencyAnalyzer.""" - from codegen_on_oss.analyzers.dependencies import DependencyAnalyzer from codegen_on_oss.analyzers.codebase_context import CodebaseContext + from codegen_on_oss.analyzers.dependencies import DependencyAnalyzer # Create context if needed - context = getattr(self.manager, 'base_context', None) - if not context and hasattr(self.manager, 'base_codebase'): + context = getattr(self.manager, "base_context", None) + if not context and hasattr(self.manager, "base_codebase"): try: context = CodebaseContext( codebase=self.manager.base_codebase, base_path=self.manager.repo_path, pr_branch=None, - base_branch=self.manager.base_branch + base_branch=self.manager.base_branch, ) # Save context for future use self.manager.base_context = context - except Exception as e: - logger.error(f"Error initializing context: {e}") + except Exception: + logger.exception("Error initializing context") # Initialize and run the dependency analyzer if context: dependency_analyzer = DependencyAnalyzer( - codebase=self.manager.base_codebase, - context=context + codebase=self.manager.base_codebase, context=context ) # Run analysis @@ -173,43 +182,44 @@ def analyze(self) -> Dict[str, Any]: result = { "import_dependencies": self._analyze_imports(), "circular_dependencies": self._find_circular_dependencies(), - "module_coupling": self._analyze_module_coupling() + "module_coupling": self._analyze_module_coupling(), } return result - def _analyze_imports(self) -> Dict[str, Any]: + def _analyze_imports(self) -> dict[str, Any]: """Fallback import analysis if context initialization failed.""" return {"module_dependencies": [], "external_dependencies": []} - def _find_circular_dependencies(self) -> Dict[str, Any]: + def _find_circular_dependencies(self) -> dict[str, Any]: """Fallback circular dependencies analysis if context initialization failed.""" return {"circular_imports": []} - def _analyze_module_coupling(self) -> Dict[str, Any]: + def _analyze_module_coupling(self) -> dict[str, Any]: """Fallback module coupling analysis if context initialization failed.""" return {"high_coupling_modules": []} + class AnalyzerManager: """ Unified manager for codebase analysis. - + This class serves as the main entry point for all analysis operations, coordinating different analyzer plugins and managing results. """ - + def __init__( self, - repo_url: Optional[str] = None, - repo_path: Optional[str] = None, + repo_url: str | None = None, + repo_path: str | None = None, base_branch: str = "main", - pr_number: Optional[int] = None, - language: Optional[str] = None, - file_ignore_list: Optional[List[str]] = None, - config: Optional[Dict[str, Any]] = None + pr_number: int | None = None, + language: str | None = None, + file_ignore_list: list[str] | None = None, + config: dict[str, Any] | None = None, ): """ Initialize the analyzer manager. - + Args: repo_url: URL of the repository to analyze repo_path: Local path to the repository to analyze @@ -224,88 +234,89 @@ def __init__( self.base_branch = base_branch self.pr_number = pr_number self.language = language - + # Use custom ignore list or default global list self.file_ignore_list = file_ignore_list or GLOBAL_FILE_IGNORE_LIST - + # Configuration options self.config = config or {} - + # Codebase and context objects self.base_codebase = None self.pr_codebase = None - + # Analysis results self.issues = [] self.results = {} - + # PR comparison data self.pr_diff = None self.commit_shas = None self.modified_symbols = None self.pr_branch = None - + # Initialize codebase(s) based on provided parameters if repo_url: self._init_from_url(repo_url, language) elif repo_path: self._init_from_path(repo_path, language) - + # If PR number is provided, initialize PR-specific data if self.pr_number is not None and self.base_codebase is not None: self._init_pr_data(self.pr_number) - + # Register default analyzers self._register_default_analyzers() - - def _init_from_url(self, repo_url: str, language: Optional[str] = None): + + def _init_from_url(self, repo_url: str, language: str | None = None): """Initialize codebase from a repository URL.""" try: # Extract repository information - if repo_url.endswith('.git'): + if repo_url.endswith(".git"): repo_url = repo_url[:-4] - - parts = repo_url.rstrip('/').split('/') + + parts = repo_url.rstrip("/").split("/") repo_name = parts[-1] owner = parts[-2] repo_full_name = f"{owner}/{repo_name}" - + # Create temporary directory for cloning import tempfile + tmp_dir = tempfile.mkdtemp(prefix="analyzer_") - + # Set up configuration config = CodebaseConfig( debug=False, allow_external=True, py_resolve_syspath=True, ) - + secrets = SecretsConfig() - + # Determine programming language prog_lang = None if language: prog_lang = ProgrammingLanguage(language.upper()) - + # Initialize the codebase logger.info(f"Initializing codebase from {repo_url}") - + self.base_codebase = Codebase.from_github( repo_full_name=repo_full_name, tmp_dir=tmp_dir, language=prog_lang, config=config, - secrets=secrets + secrets=secrets, ) - + logger.info(f"Successfully initialized codebase from {repo_url}") - - except Exception as e: - logger.error(f"Error initializing codebase from URL: {e}") + + except Exception: + logger.exception("Error initializing codebase from URL") raise - - def _init_from_path(self, repo_path: str, language: Optional[str] = None): + + def _init_from_path(self, repo_path: str, language: str | None = None): """Initialize codebase from a local repository path.""" try: # Set up configuration @@ -314,163 +325,163 @@ def _init_from_path(self, repo_path: str, language: Optional[str] = None): allow_external=True, py_resolve_syspath=True, ) - + secrets = SecretsConfig() - + # Initialize the codebase logger.info(f"Initializing codebase from {repo_path}") - + # Determine programming language prog_lang = None if language: prog_lang = ProgrammingLanguage(language.upper()) - + # Set up repository configuration repo_config = RepoConfig.from_repo_path(repo_path) repo_config.respect_gitignore = False repo_operator = RepoOperator(repo_config=repo_config, bot_commit=False) - + # Create project configuration project_config = ProjectConfig( repo_operator=repo_operator, - programming_language=prog_lang if prog_lang else None + programming_language=prog_lang if prog_lang else None, ) - + # Initialize codebase self.base_codebase = Codebase( - projects=[project_config], - config=config, - secrets=secrets + projects=[project_config], config=config, secrets=secrets ) - + logger.info(f"Successfully initialized codebase from {repo_path}") - - except Exception as e: - logger.error(f"Error initializing codebase from path: {e}") + + except Exception: + logger.exception("Error initializing codebase from path") raise - + def _init_pr_data(self, pr_number: int): """Initialize PR-specific data.""" try: logger.info(f"Fetching PR #{pr_number} data") result = self.base_codebase.get_modified_symbols_in_pr(pr_number) - + # Unpack the result tuple if len(result) >= 3: self.pr_diff, self.commit_shas, self.modified_symbols = result[:3] if len(result) >= 4: self.pr_branch = result[3] - + logger.info(f"Found {len(self.modified_symbols)} modified symbols in PR") - + # Initialize PR codebase self._init_pr_codebase() - - except Exception as e: - logger.error(f"Error initializing PR data: {e}") + + except Exception: + logger.exception("Error initializing PR data") raise - + def _init_pr_codebase(self): """Initialize PR codebase by checking out the PR branch.""" if not self.base_codebase or not self.pr_number: logger.error("Base codebase or PR number not initialized") return - + try: # Get PR data if not already fetched if not self.pr_branch: self._init_pr_data(self.pr_number) - + if not self.pr_branch: logger.error("Failed to get PR branch") return - + # Clone the base codebase self.pr_codebase = self.base_codebase - + # Checkout PR branch logger.info(f"Checking out PR branch: {self.pr_branch}") self.pr_codebase.checkout(self.pr_branch) - + logger.info("Successfully initialized PR codebase") - - except Exception as e: - logger.error(f"Error initializing PR codebase: {e}") + + except Exception: + logger.exception("Error initializing PR codebase") raise - + def _register_default_analyzers(self): """Register default analyzers.""" registry = AnalyzerRegistry() registry.register(AnalysisType.CODE_QUALITY, CodeQualityPlugin) registry.register(AnalysisType.DEPENDENCY, DependencyPlugin) - + def add_issue(self, issue: Issue): """Add an issue to the list.""" # Check if issue should be skipped if self._should_skip_issue(issue): return - + self.issues.append(issue) - + def _should_skip_issue(self, issue: Issue) -> bool: """Check if an issue should be skipped.""" # Skip issues in ignored files file_path = issue.file - + # Check against ignore list for pattern in self.file_ignore_list: if pattern in file_path: return True - - # Check if the file is a test file - if "test" in file_path.lower() or "tests" in file_path.lower(): - # Skip low-severity issues in test files - if issue.severity in [IssueSeverity.INFO, IssueSeverity.WARNING]: - return True - - return False - - def get_issues(self, severity: Optional[IssueSeverity] = None, category: Optional[IssueCategory] = None) -> List[Issue]: + + # Check if the file is a test file and skip low-severity issues in test files + return bool( + ("test" in file_path.lower() or "tests" in file_path.lower()) + and issue.severity in [IssueSeverity.INFO, IssueSeverity.WARNING] + ) + + def get_issues( + self, + severity: IssueSeverity | None = None, + category: IssueCategory | None = None, + ) -> list[Issue]: """ Get all issues matching the specified criteria. - + Args: severity: Optional severity level to filter by category: Optional category to filter by - + Returns: List of matching issues """ filtered_issues = self.issues - + if severity: filtered_issues = [i for i in filtered_issues if i.severity == severity] - + if category: filtered_issues = [i for i in filtered_issues if i.category == category] - + return filtered_issues - + def analyze( - self, - analysis_types: Optional[List[Union[AnalysisType, str]]] = None, - output_file: Optional[str] = None, - output_format: str = "json" - ) -> Dict[str, Any]: + self, + analysis_types: list[AnalysisType | str] | None = None, + output_file: str | None = None, + output_format: str = "json", + ) -> dict[str, Any]: """ Perform analysis on the codebase. - + Args: analysis_types: List of analysis types to perform output_file: Path to save results to output_format: Format of the output file - + Returns: Dictionary containing analysis results """ if not self.base_codebase: - raise ValueError("Codebase not initialized") - + raise ValueError("Base codebase is missing") + # Convert string analysis types to enums if analysis_types: analysis_types = [ @@ -480,78 +491,97 @@ def analyze( else: # Default to code quality and dependency analysis analysis_types = [AnalysisType.CODE_QUALITY, AnalysisType.DEPENDENCY] - + # Initialize results self.results = { "metadata": { "analysis_time": datetime.now().isoformat(), "analysis_types": [t.value for t in analysis_types], - "repo_name": getattr(self.base_codebase.ctx, 'repo_name', None), - "language": str(getattr(self.base_codebase.ctx, 'programming_language', None)), + "repo_name": getattr(self.base_codebase.ctx, "repo_name", None), + "language": str( + getattr(self.base_codebase.ctx, "programming_language", None) + ), }, "summary": {}, - "results": {} + "results": {}, } - + # Reset issues self.issues = [] - + # Run each analyzer registry = AnalyzerRegistry() - + for analysis_type in analysis_types: analyzer_class = registry.get_analyzer(analysis_type) - + if analyzer_class: logger.info(f"Running {analysis_type.value} analysis") analyzer = analyzer_class(self) analysis_result = analyzer.analyze() - + # Add results to unified results self.results["results"][analysis_type.value] = analysis_result else: logger.warning(f"No analyzer found for {analysis_type.value}") - + # Add issues to results self.results["issues"] = [issue.to_dict() for issue in self.issues] - + # Add issue statistics self.results["issue_stats"] = { "total": len(self.issues), "by_severity": { - "critical": sum(1 for issue in self.issues if issue.severity == IssueSeverity.CRITICAL), - "error": sum(1 for issue in self.issues if issue.severity == IssueSeverity.ERROR), - "warning": sum(1 for issue in self.issues if issue.severity == IssueSeverity.WARNING), - "info": sum(1 for issue in self.issues if issue.severity == IssueSeverity.INFO), - } + "critical": sum( + 1 + for issue in self.issues + if issue.severity == IssueSeverity.CRITICAL + ), + "error": sum( + 1 for issue in self.issues if issue.severity == IssueSeverity.ERROR + ), + "warning": sum( + 1 + for issue in self.issues + if issue.severity == IssueSeverity.WARNING + ), + "info": sum( + 1 for issue in self.issues if issue.severity == IssueSeverity.INFO + ), + }, } - + # Save results if output file is specified if output_file: self.save_results(output_file, output_format) - + return self.results - - def save_results(self, output_file: str, format: str = "json"): + + def save_results(self, output_file: str, output_format: str = "json"): """ Save analysis results to a file. - + Args: output_file: Path to the output file - format: Output format (json, html) + output_format: Format of the output file (json or yaml) """ - if format == "json": - with open(output_file, 'w') as f: + if output_format == "json": + with open(output_file, "w") as f: json.dump(self.results, f, indent=2) - elif format == "html": - self._generate_html_report(output_file) + elif output_format == "yaml": + import yaml + + with open(output_file, "w") as f: + yaml.dump(self.results, f, default_flow_style=False) + elif output_format == "console": + print(json.dumps(self.results, indent=2)) else: # Default to JSON - with open(output_file, 'w') as f: + with open(output_file, "w") as f: json.dump(self.results, f, indent=2) - + logger.info(f"Results saved to {output_file}") - + def _generate_html_report(self, output_file: str): """Generate an HTML report of the analysis results.""" html_content = f""" @@ -578,85 +608,93 @@ def _generate_html_report(self, output_file: str):
Repository: {self.results['metadata'].get('repo_name', 'Unknown')}
-Language: {self.results['metadata'].get('language', 'Unknown')}
-Analysis Time: {self.results['metadata'].get('analysis_time', 'Unknown')}
-Analysis Types: {', '.join(self.results['metadata'].get('analysis_types', []))}
+Repository: {self.results["metadata"].get("repo_name", "Unknown")}
+Language: {self.results["metadata"].get("language", "Unknown")}
+Analysis Time: {self.results["metadata"].get("analysis_time", "Unknown")}
+Analysis Types: {", ".join(self.results["metadata"].get("analysis_types", []))}
Total Issues: {len(self.issues)}
{location} {category} {issue.message}
-{issue.suggestion if hasattr(issue, 'suggestion') else ""}
+{issue.suggestion if hasattr(issue, "suggestion") else ""}
{json.dumps(results, indent=2)}
"""
-
+
html_content += """