Add robust error context analysis system #22
Conversation
Create fully interconnected analysis module with comprehensive metrics integration
Enhance analysis.py with better CodebaseContext integration
@CodiumAI-Agent /review

@sourcery-ai review
Reviewer's Guide (Sourcery)

This pull request introduces an error context analysis system. Sequence diagram for file error analysis API calls:

```mermaid
sequenceDiagram
    actor Client
    participant FastAPIApp
    participant CodeAnalyzer
    participant ErrorContextAnalyzer
    Client->>+FastAPIApp: POST /analyze_errors (repo_url, file_path)
    FastAPIApp->>+CodeAnalyzer: get_file_error_context(file_path)
    CodeAnalyzer->>+ErrorContextAnalyzer: get_file_error_context(file_path)
    ErrorContextAnalyzer->>ErrorContextAnalyzer: analyze_file(file)
    ErrorContextAnalyzer-->>-CodeAnalyzer: file_context_with_errors
    CodeAnalyzer-->>-FastAPIApp: file_context_with_errors
    FastAPIApp-->>-Client: JSON response with errors
```
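The delegation chain in the diagram (FastAPI app → `CodeAnalyzer` → `ErrorContextAnalyzer`) can be sketched in plain Python; the class and method names come from the diagram, while everything inside them is a stubbed assumption:

```python
class ErrorContextAnalyzer:
    """Innermost layer: performs the per-file error analysis (stubbed here)."""

    def analyze_file(self, file_path: str) -> list:
        # A real implementation would parse the file and collect error records.
        return []

    def get_file_error_context(self, file_path: str) -> dict:
        return {"file_path": file_path, "errors": self.analyze_file(file_path)}


class CodeAnalyzer:
    """Middle layer: forwards error-context requests to ErrorContextAnalyzer."""

    def __init__(self):
        self.error_analyzer = ErrorContextAnalyzer()

    def get_file_error_context(self, file_path: str) -> dict:
        return self.error_analyzer.get_file_error_context(file_path)


# The POST /analyze_errors handler would serialize this dict as the JSON response.
result = CodeAnalyzer().get_file_error_context("src/example.py")
```

Each layer exposes the same `get_file_error_context` signature, so the API handler never touches the analyzer internals directly.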
/gemini review
/review

Warning: You have reached your daily quota limit. Please wait up to 24 hours and I will start processing your requests again!

/improve

/korbit-review

@codecov-ai-reviewer review

@codegen Implement and upgrade this PR with the above considerations and suggestions from other AI bots.

On it! We are reviewing the PR and will provide feedback shortly.
PR Reviewer Guide 🔍 (review updated until commit e233dfa). Here are some key observations to aid the review process:

PR Code Suggestions ✨ — latest suggestions up to e233dfa

Previous suggestions:
- Suggestions up to commit e233dfa
- Suggestions up to commit ccff720
- Suggestions up to commit 36a38a1
Persistent review updated to latest commit 88cf208

Hey! 👋 I see one of the checks failed. I am on it! 🫡

I'll review this PR and suggest improvements right away! Let me examine the code changes and provide a thorough analysis.
PR Description

This pull request introduces a comprehensive code analysis module with a focus on error detection and contextual error information. The goal is to provide developers with robust tools for identifying and resolving issues in their codebase, leading to improved code quality and maintainability.

Key Technical Changes

The key technical changes include the introduction of the

Architecture Decisions

The architectural decisions involve a layered approach, with the

Dependencies and Interactions

This module depends on the

Risk Considerations

Potential risks include performance bottlenecks when analyzing large codebases, especially with the AST parsing and graph analysis. Input validation is crucial to prevent malicious repository URLs or file paths from causing security vulnerabilities. The accuracy of error detection relies on the effectiveness of the AST parsing and pattern-matching techniques, which may require ongoing refinement. The API endpoints should be secured with appropriate authentication and authorization mechanisms.

Notable Implementation Details

The
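On the malicious-URL risk called out above, a minimal allow-list check is one hedge (a sketch only; the accepted scheme and hosts are assumptions, not part of this PR):

```python
from urllib.parse import urlparse

# Hypothetical allow-list; real deployments would make this configurable.
ALLOWED_HOSTS = {"github.com", "gitlab.com"}


def validate_repo_url(url: str) -> bool:
    """Reject anything that is not an https URL on an allowed host."""
    parsed = urlparse(url)
    return parsed.scheme == "https" and parsed.netloc in ALLOWED_HOSTS


ok = validate_repo_url("https://github.com/org/repo")
blocked_scheme = validate_repo_url("file:///etc/passwd")
blocked_host = validate_repo_url("https://evil.example.com/repo")
```

Path traversal in `file_path` would need a separate check (e.g. resolving against the repo root and rejecting escapes).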
```python
class CodeError:
    """Represents an error in code with context."""

    def __init__(
        self,
        error_type: str,
        message: str,
        file_path: Optional[str] = None,
        line_number: Optional[int] = None,
        column: Optional[int] = None,
        severity: str = ErrorSeverity.MEDIUM,
        symbol_name: Optional[str] = None,
        context_lines: Optional[Dict[int, str]] = None,
        suggested_fix: Optional[str] = None,
    ):
        self.error_type = error_type
        self.message = message
        self.file_path = file_path
        self.line_number = line_number
        self.column = column
        self.severity = severity
        self.symbol_name = symbol_name
        self.context_lines = context_lines or {}
        self.suggested_fix = suggested_fix

    def to_dict(self) -> Dict[str, Any]:
        """Convert the error to a dictionary representation."""
        return {
            "error_type": self.error_type,
            "message": self.message,
            "file_path": self.file_path,
            "line_number": self.line_number,
            "column": self.column,
            "severity": self.severity,
            "symbol_name": self.symbol_name,
            "context_lines": self.context_lines,
            "suggested_fix": self.suggested_fix,
        }
```
Consider using a frozen dataclass for `CodeError` to make it immutable and more efficient. This would also provide automatic `__init__`, `__eq__`, and other dunder methods. Also consider adding parameter validation to ensure all fields meet requirements.
Suggested change:

```python
from dataclasses import dataclass, field
from typing import Dict, Optional


@dataclass(frozen=True)
class CodeError:
    """Represents an error in code with context."""

    error_type: str
    message: str
    file_path: Optional[str] = None
    line_number: Optional[int] = None
    column: Optional[int] = None
    severity: str = ErrorSeverity.MEDIUM
    symbol_name: Optional[str] = None
    context_lines: Dict[int, str] = field(default_factory=dict)
    suggested_fix: Optional[str] = None

    def __post_init__(self):
        if self.line_number is not None and not isinstance(self.line_number, int):
            raise ValueError("line_number must be an integer")
        if self.severity not in [ErrorSeverity.CRITICAL, ErrorSeverity.HIGH,
                                 ErrorSeverity.MEDIUM, ErrorSeverity.LOW,
                                 ErrorSeverity.INFO]:
            raise ValueError("Invalid severity level")
```
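As a standalone illustration of what `frozen=True` buys (a toy class, not the PR's `CodeError`): attribute assignment is rejected at runtime, and `__eq__`/`__hash__` come for free:

```python
from dataclasses import FrozenInstanceError, dataclass


@dataclass(frozen=True)
class Point:
    x: int
    y: int


p = Point(1, 2)
try:
    p.x = 99  # mutation is blocked on frozen instances
except FrozenInstanceError:
    blocked = True

# __eq__ and __hash__ are generated automatically
same = (p == Point(1, 2))
```

Being hashable means frozen error records can be deduplicated in a `set`, which is handy when several analyzers report the same issue.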
```python
                # Function has return type but no return statements
                errors.append(CodeError(
                    error_type=ErrorType.TYPE_ERROR,
                    message=f"Function '{function.name}' has return type '{function.return_type}' but no return statements",
                    file_path=function.file.name if hasattr(function, "file") else None,
                    line_number=function.line_number if hasattr(function, "line_number") else None,
                    severity=ErrorSeverity.MEDIUM,
                    symbol_name=function.name,
                    context_lines=self.get_context_lines(function.file.name, function.line_number) if hasattr(function, "file") and hasattr(function, "line_number") else None,
                    suggested_fix="Add return statement or change return type to 'None'"
                ))
                return errors

            # Check if return statements match the declared return type
            return_type = function.return_type
            for ret_stmt in function.return_statements:
                # This is a simplified check and would need more sophisticated type inference in practice
                if hasattr(ret_stmt, "expression") and hasattr(ret_stmt.expression, "source"):
                    expr_source = ret_stmt.expression.source

                    # Simple pattern matching for potential type errors
                    if "int" in return_type and re.search(r"\".*\"", expr_source):
                        line_number = self._find_line_number(function.code_block.source, rf"return\s+{re.escape(expr_source)}")
                        errors.append(CodeError(
                            error_type=ErrorType.TYPE_ERROR,
                            message=f"Function '{function.name}' returns string but declares return type '{return_type}'",
                            file_path=function.file.name if hasattr(function, "file") else None,
                            line_number=line_number,
                            severity=ErrorSeverity.HIGH,
                            symbol_name=function.name,
                            context_lines=self.get_context_lines(function.file.name, line_number) if hasattr(function, "file") else None,
                            suggested_fix=f"Convert return value to {return_type} or update return type annotation"
                        ))

            return errors

        def analyze_unused_imports(self, file: SourceFile) -> List[CodeError]:
            """
            Find unused imports in a file.
            Args:
                file: The file to analyze
            Returns:
                A list of unused import errors
            """
            errors = []

            if not hasattr(file, "imports") or not hasattr(file, "symbols"):
                return errors

            # Get all imported symbols
            imported_symbols = set()
            for imp in file.imports:
                if hasattr(imp, "imported_symbol") and imp.imported_symbol:
                    imported_symbols.add(imp.imported_symbol.name)

            # Get all used symbols
            used_symbols = set()
            for symbol in file.symbols:
                if hasattr(symbol, "dependencies"):
                    for dep in symbol.dependencies:
                        if isinstance(dep, Symbol):
                            used_symbols.add(dep.name)

            # Find unused imports
            unused_imports = imported_symbols - used_symbols
            for unused in unused_imports:
                # Find the import statement
                for imp in file.imports:
                    if hasattr(imp, "imported_symbol") and imp.imported_symbol and imp.imported_symbol.name == unused:
                        errors.append(CodeError(
                            error_type=ErrorType.UNUSED_IMPORT,
                            message=f"Unused import: '{unused}'",
```
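The same unused-import idea can also be demonstrated against plain source text with the standard-library `ast` module (a simplified sketch: it ignores `__all__`, re-exports, and string-based uses):

```python
import ast


def find_unused_imports(source: str) -> list:
    """Return names that are imported but never loaded in the module body."""
    tree = ast.parse(source)
    imported, used = set(), set()
    for node in ast.walk(tree):
        if isinstance(node, ast.Import):
            for alias in node.names:
                # "import a.b" binds the top-level name "a" unless aliased
                imported.add(alias.asname or alias.name.split(".")[0])
        elif isinstance(node, ast.ImportFrom):
            for alias in node.names:
                imported.add(alias.asname or alias.name)
        elif isinstance(node, ast.Name) and isinstance(node.ctx, ast.Load):
            used.add(node.id)
    return sorted(imported - used)


code = "import os\nimport sys\nprint(sys.argv)\n"
unused = find_unused_imports(code)
```

Unlike the PR's symbol-graph approach, this works on raw text, so it can run before the codebase model is built.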
The `analyze_function_parameters` method could benefit from caching to avoid re-analyzing the same function multiple times. Consider implementing an LRU cache. Also, the regex pattern matching could be compiled once for better performance.
Suggested change:

```python
import re
from functools import lru_cache

# Compile the pattern once instead of on every call
_TYPE_PATTERN = re.compile(r"\b([a-zA-Z_][a-zA-Z0-9_]*)\s*\+\s*\d+\b")


@lru_cache(maxsize=128)
def analyze_function_parameters(self, function: Function) -> List[CodeError]:
    errors = []
    if not hasattr(function, "parameters") or not hasattr(function, "function_calls"):
        return errors
    for param in function.parameters:
        if not hasattr(param, "type_annotation") or not param.type_annotation:
            continue
        param_name = param.name
        param_type = param.type_annotation
        if hasattr(function, "code_block") and hasattr(function.code_block, "source"):
            source = function.code_block.source
            match = _TYPE_PATTERN.search(source)
            if match and "str" in param_type:
                line_number = self._find_line_number(source, rf"\b{param_name}\s*\+\s*\d+\b")
                # Rest of the code...
```
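One caveat with the `lru_cache` suggestion above: cached call arguments must be hashable, so it only helps if `Function` objects hash reliably. A standalone illustration of the failure mode:

```python
from functools import lru_cache


@lru_cache(maxsize=128)
def length(value):
    return len(value)


ok = length((1, 2, 3))      # tuples are hashable, so this call is cached
try:
    length([1, 2, 3])       # lists are not hashable: lru_cache raises TypeError
    unhashable_rejected = False
except TypeError:
    unhashable_rejected = True
```

If `Function` instances are mutable or unhashable, keying the cache on a stable identifier (e.g. the function's qualified name) would be the safer variant.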
```diff
         Returns:
             A dictionary containing import analysis results
         """
-        graph = create_graph_from_codebase(self.codebase.repo_name)
+        graph = create_graph_from_codebase(self.codebase)
         cycles = find_import_cycles(graph)
         problematic_loops = find_problematic_import_loops(graph, cycles)

         return {
-            "import_cycles": cycles,
+            "import_graph": graph,
+            "cycles": cycles,
             "problematic_loops": problematic_loops
         }

-    def convert_args_to_kwargs(self) -> None:
-        """
-        Convert all function call arguments to keyword arguments.
-        """
-        convert_all_calls_to_kwargs(self.codebase)
-
-    def visualize_module_dependencies(self) -> None:
-        """
-        Visualize module dependencies in the codebase.
-        """
-        module_dependencies_run(self.codebase)
-
-    def generate_mdx_documentation(self, class_name: str) -> str:
-        """
-        Generate MDX documentation for a class.
-        Args:
-            class_name: Name of the class to document
-        Returns:
-            MDX documentation as a string
-        """
-        for cls in self.codebase.classes:
-            if cls.name == class_name:
-                return render_mdx_page_for_class(cls)
-        return f"Class not found: {class_name}"
-
-    def print_symbol_attribution(self) -> None:
-        """
-        Print attribution information for symbols in the codebase.
-        """
-        print_symbol_attribution(self.codebase)
-
-    def get_extended_symbol_context(self, symbol_name: str, degree: int = 2) -> Dict[str, List[str]]:
-        """
-        Get extended context (dependencies and usages) for a symbol.
-        Args:
-            symbol_name: Name of the symbol to analyze
-            degree: How many levels deep to collect dependencies and usages
-        Returns:
-            A dictionary containing dependencies and usages
-        """
-        symbol = self.find_symbol_by_name(symbol_name)
-        if symbol:
-            dependencies, usages = get_extended_context(symbol, degree)
-            return {
-                "dependencies": [dep.name for dep in dependencies],
-                "usages": [usage.name for usage in usages]
-            }
-        return {"dependencies": [], "usages": []}
+    def get_dependency_graph(self) -> nx.DiGraph:
+        """
+        Get a dependency graph for the codebase files.
+        Returns:
+            A directed graph representing file dependencies
+        """
+        G = nx.DiGraph()
+
+        # Add nodes for all files
+        for file in self.codebase.files:
+            G.add_node(file.name, type="file")
+
+        # Add edges for imports
+        for file in self.codebase.files:
+            for imp in file.imports:
+                if imp.imported_symbol and hasattr(imp.imported_symbol, "file"):
+                    imported_file = imp.imported_symbol.file
+                    if imported_file and imported_file.name != file.name:
+                        G.add_edge(file.name, imported_file.name)
+
+        return G

-    def get_symbol_dependencies(self, symbol_name: str) -> List[str]:
+    def get_symbol_attribution(self, symbol_name: str) -> str:
         """
-        Get direct dependencies of a symbol.
+        Get attribution information for a symbol.
         Args:
             symbol_name: Name of the symbol to analyze
         Returns:
-            A list of dependency symbol names
+            A string containing attribution information
         """
         symbol = self.find_symbol_by_name(symbol_name)
-        if symbol and hasattr(symbol, "dependencies"):
-            return [dep.name for dep in symbol.dependencies]
-        return []
+        if symbol is None:
+            return f"Symbol not found: {symbol_name}"
+
+        return print_symbol_attribution(symbol)

-    def get_symbol_usages(self, symbol_name: str) -> List[str]:
+    def get_context_for_symbol(self, symbol_name: str) -> Dict[str, Any]:
         """
-        Get direct usages of a symbol.
+        Get context information for a symbol.
         Args:
             symbol_name: Name of the symbol to analyze
         Returns:
-            A list of usage symbol names
+            A dictionary containing context information
         """
         symbol = self.find_symbol_by_name(symbol_name)
-        if symbol and hasattr(symbol, "symbol_usages"):
-            return [usage.name for usage in symbol.symbol_usages]
-        return []
-
-    def get_file_imports(self, file_path: str) -> List[str]:
-        """
-        Get all imports in a file.
-        Args:
-            file_path: Path to the file to analyze
-        Returns:
-            A list of import statements
-        """
-        file = self.find_file_by_path(file_path)
-        if file and hasattr(file, "imports"):
-            return [imp.source for imp in file.imports]
-        return []
+        if symbol is None:
+            return {"error": f"Symbol not found: {symbol_name}"}
+
+        # Use the context to get more information about the symbol
+        ctx = self.context
+
+        # Get symbol node ID in the context graph
+        node_id = None
+        for n_id, node in enumerate(ctx.nodes):
+            if isinstance(node, Symbol) and node.name == symbol_name:
+                node_id = n_id
+                break
+
+        if node_id is None:
+            return {"error": f"Symbol not found in context: {symbol_name}"}
+
+        # Get predecessors (symbols that use this symbol)
+        predecessors = []
+        for pred in ctx.predecessors(node_id):
+            if isinstance(pred, Symbol):
+                predecessors.append({
+                    "name": pred.name,
+                    "type": pred.symbol_type.name if hasattr(pred, "symbol_type") else "Unknown"
+                })
+
+        # Get successors (symbols used by this symbol)
+        successors = []
+        for succ in ctx.successors(node_id):
+            if isinstance(succ, Symbol):
+                successors.append({
+                    "name": succ.name,
+                    "type": succ.symbol_type.name if hasattr(succ, "symbol_type") else "Unknown"
+                })
+
+        return {
+            "symbol": {
+                "name": symbol.name,
+                "type": symbol.symbol_type.name if hasattr(symbol, "symbol_type") else "Unknown",
+                "file": symbol.file.name if hasattr(symbol, "file") else "Unknown"
+            },
+            "predecessors": predecessors,
+            "successors": successors
+        }

-    def get_file_exports(self, file_path: str) -> List[str]:
+    def get_file_dependencies(self, file_path: str) -> Dict[str, Any]:
         """
-        Get all exports from a file.
+        Get dependency information for a file using CodebaseContext.
         Args:
             file_path: Path to the file to analyze
         Returns:
-            A list of exported symbol names
+            A dictionary containing dependency information
         """
         file = self.find_file_by_path(file_path)
         if file is None:
-            return []
-
-        exports = []
-        for symbol in file.symbols:
-            # Check if this symbol is exported
-            if hasattr(symbol, "is_exported") and symbol.is_exported:
-                exports.append(symbol.name)
-            # For TypeScript/JavaScript, check for export keyword
-            elif hasattr(symbol, "modifiers") and "export" in symbol.modifiers:
-                exports.append(symbol.name)
-
-        return exports
-
-    def analyze_complexity(self) -> Dict[str, Any]:
-        """
-        Analyze code complexity metrics for the codebase.
-        Returns:
-            A dictionary containing complexity metrics
-        """
-        results = {}
-
-        # Analyze cyclomatic complexity
-        complexity_results = []
-        for func in self.codebase.functions:
-            if hasattr(func, "code_block"):
-                complexity = calculate_cyclomatic_complexity(func)
-                complexity_results.append({
-                    "name": func.name,
-                    "complexity": complexity,
-                    "rank": cc_rank(complexity)
-                })
-
-        # Calculate average complexity
-        if complexity_results:
-            avg_complexity = sum(item["complexity"] for item in complexity_results) / len(complexity_results)
-        else:
-            avg_complexity = 0
-
-        results["cyclomatic_complexity"] = {
-            "functions": complexity_results,
-            "average": avg_complexity
-        }
-
-        # Analyze line metrics
-        line_metrics = {}
-        total_loc = 0
-        total_lloc = 0
-        total_sloc = 0
-        total_comments = 0
-
-        for file in self.codebase.files:
-            if hasattr(file, "source"):
-                loc, lloc, sloc, comments = count_lines(file.source)
-                line_metrics[file.name] = {
-                    "loc": loc,
-                    "lloc": lloc,
-                    "sloc": sloc,
-                    "comments": comments,
-                    "comment_ratio": comments / loc if loc > 0 else 0
-                }
-                total_loc += loc
-                total_lloc += lloc
-                total_sloc += sloc
-                total_comments += comments
-
-        results["line_metrics"] = {
-            "files": line_metrics,
-            "total": {
-                "loc": total_loc,
-                "lloc": total_lloc,
-                "sloc": total_sloc,
-                "comments": total_comments,
-                "comment_ratio": total_comments / total_loc if total_loc > 0 else 0
-            }
-        }
-
-        # Analyze Halstead metrics
-        halstead_results = []
-        total_volume = 0
-
-        for func in self.codebase.functions:
-            if hasattr(func, "code_block"):
-                operators, operands = get_operators_and_operands(func)
-                volume, N1, N2, n1, n2 = calculate_halstead_volume(operators, operands)
-
-                # Calculate maintainability index
-                loc = len(func.code_block.source.splitlines())
-                complexity = calculate_cyclomatic_complexity(func)
-                mi_score = calculate_maintainability_index(volume, complexity, loc)
-
-                halstead_results.append({
-                    "name": func.name,
-                    "volume": volume,
-                    "unique_operators": n1,
-                    "unique_operands": n2,
-                    "total_operators": N1,
-                    "total_operands": N2,
-                    "maintainability_index": mi_score,
-                    "maintainability_rank": get_maintainability_rank(mi_score)
-                })
-
-                total_volume += volume
-
-        results["halstead_metrics"] = {
-            "functions": halstead_results,
-            "total_volume": total_volume,
-            "average_volume": total_volume / len(halstead_results) if halstead_results else 0
+            return {"error": f"File not found: {file_path}"}
+
+        # Use the context to get more information about the file
+        ctx = self.context
+
+        # Get file node ID in the context graph
+        node_id = None
+        for n_id, node in enumerate(ctx.nodes):
+            if isinstance(node, SourceFile) and node.name == file.name:
+                node_id = n_id
+                break
+
+        if node_id is None:
+            return {"error": f"File not found in context: {file_path}"}
+
+        # Get files that import this file
+        importers = []
+        for pred in ctx.predecessors(node_id, edge_type=EdgeType.IMPORT):
+            if isinstance(pred, SourceFile):
+                importers.append(pred.name)
+
+        # Get files imported by this file
+        imported = []
+        for succ in ctx.successors(node_id, edge_type=EdgeType.IMPORT):
+            if isinstance(succ, SourceFile):
+                imported.append(succ.name)
+
+        return {
+            "file": file.name,
+            "importers": importers,
+            "imported": imported
+        }
```
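The diff's `find_import_cycles` helper isn't shown, but cycle detection over the file graph that `get_dependency_graph` builds can be sketched with a plain depth-first search (pure standard library; the file names are hypothetical):

```python
def find_cycle(edges):
    """Return one dependency cycle as a list of nodes, or None if acyclic."""
    graph = {}
    for src, dst in edges:
        graph.setdefault(src, []).append(dst)
        graph.setdefault(dst, [])

    WHITE, GRAY, BLACK = 0, 1, 2  # unvisited / on current path / finished
    color = {node: WHITE for node in graph}

    def dfs(node, path):
        color[node] = GRAY
        path.append(node)
        for nxt in graph[node]:
            if color[nxt] == GRAY:            # back edge: cycle found
                return path[path.index(nxt):]
            if color[nxt] == WHITE:
                found = dfs(nxt, path)
                if found:
                    return found
        color[node] = BLACK
        path.pop()
        return None

    for node in graph:
        if color[node] == WHITE:
            found = dfs(node, [])
            if found:
                return found
    return None


cycle = find_cycle([("a.py", "b.py"), ("b.py", "c.py"),
                    ("c.py", "a.py"), ("c.py", "d.py")])
```

With networkx already a dependency here, `nx.simple_cycles` would give the same answer; the DFS just shows the mechanics.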
The API endpoints lack proper input validation and rate limiting. Consider adding request validation middleware and rate limiting to prevent abuse. Also, add error handling for cases where the repository URL is invalid or inaccessible.
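A dependency-free sketch of the kind of rate limiting suggested here (sliding-window counter per client key; the limit, window, and keying scheme are assumptions, and production setups would typically use middleware backed by a shared store):

```python
import time
from collections import defaultdict


class SlidingWindowRateLimiter:
    """Allow at most `limit` requests per `window` seconds per client key."""

    def __init__(self, limit: int = 10, window: float = 60.0):
        self.limit = limit
        self.window = window
        self.hits = defaultdict(list)  # key -> timestamps of recent requests

    def allow(self, key: str) -> bool:
        now = time.monotonic()
        # Drop timestamps that have aged out of the window
        recent = [t for t in self.hits[key] if now - t < self.window]
        self.hits[key] = recent
        if len(recent) >= self.limit:
            return False
        self.hits[key].append(now)
        return True


limiter = SlidingWindowRateLimiter(limit=2, window=60.0)
results = [limiter.allow("1.2.3.4") for _ in range(3)]
```

An API handler would call `allow(client_ip)` before doing any work and return HTTP 429 when it comes back `False`.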
| complexity_results = [] | |
| for func in self.codebase.functions: | |
| if hasattr(func, "code_block"): | |
| complexity = calculate_cyclomatic_complexity(func) | |
| complexity_results.append({ | |
| "name": func.name, | |
| "complexity": complexity, | |
| "rank": cc_rank(complexity) | |
| }) | |
| # Use the context to get more information about the file | |
| ctx = self.context | |
| # Calculate average complexity | |
| if complexity_results: | |
| avg_complexity = sum(item["complexity"] for item in complexity_results) / len(complexity_results) | |
| else: | |
| avg_complexity = 0 | |
| # Get file node ID in the context graph | |
| node_id = None | |
| for n_id, node in enumerate(ctx.nodes): | |
| if isinstance(node, SourceFile) and node.name == file.name: | |
| node_id = n_id | |
| break | |
| results["cyclomatic_complexity"] = { | |
| "functions": complexity_results, | |
| "average": avg_complexity | |
| } | |
| if node_id is None: | |
| return {"error": f"File not found in context: {file_path}"} | |
| # Analyze line metrics | |
| line_metrics = {} | |
| total_loc = 0 | |
| total_lloc = 0 | |
| total_sloc = 0 | |
| total_comments = 0 | |
| # Get files that import this file | |
| importers = [] | |
| for pred in ctx.predecessors(node_id, edge_type=EdgeType.IMPORT): | |
| if isinstance(pred, SourceFile): | |
| importers.append(pred.name) | |
| for file in self.codebase.files: | |
| if hasattr(file, "source"): | |
| loc, lloc, sloc, comments = count_lines(file.source) | |
| line_metrics[file.name] = { | |
| "loc": loc, | |
| "lloc": lloc, | |
| "sloc": sloc, | |
| "comments": comments, | |
| "comment_ratio": comments / loc if loc > 0 else 0 | |
| } | |
| total_loc += loc | |
| total_lloc += lloc | |
| total_sloc += sloc | |
| total_comments += comments | |
| results["line_metrics"] = { | |
| "files": line_metrics, | |
| "total": { | |
| "loc": total_loc, | |
| "lloc": total_lloc, | |
| "sloc": total_sloc, | |
| "comments": total_comments, | |
| "comment_ratio": total_comments / total_loc if total_loc > 0 else 0 | |
| } | |
| } | |
| imported = [] | |
| for succ in ctx.successors(node_id, edge_type=EdgeType.IMPORT): | |
| if isinstance(succ, SourceFile): | |
| imported.append(succ.name) | |
| # Analyze Halstead metrics | |
| halstead_results = [] | |
| total_volume = 0 | |
| for func in self.codebase.functions: | |
| if hasattr(func, "code_block"): | |
| operators, operands = get_operators_and_operands(func) | |
| volume, N1, N2, n1, n2 = calculate_halstead_volume(operators, operands) | |
| # Calculate maintainability index | |
| loc = len(func.code_block.source.splitlines()) | |
| complexity = calculate_cyclomatic_complexity(func) | |
| mi_score = calculate_maintainability_index(volume, complexity, loc) | |
| halstead_results.append({ | |
| "name": func.name, | |
| "volume": volume, | |
| "unique_operators": n1, | |
| "unique_operands": n2, | |
| "total_operators": N1, | |
| "total_operands": N2, | |
| "maintainability_index": mi_score, | |
| "maintainability_rank": get_maintainability_rank(mi_score) | |
| }) | |
| total_volume += volume | |
| results["halstead_metrics"] = { | |
| "functions": halstead_results, | |
| "total_volume": total_volume, | |
| "average_volume": total_volume / len(halstead_results) if halstead_results else 0 | |
| return { | |
| "file": file.name, | |
| "importers": importers, | |
| "imported": imported | |
| } | |
```python
from fastapi import FastAPI, HTTPException, Depends
# NOTE: FastAPI ships no built-in throttling middleware; a third-party rate
# limiter (e.g. slowapi) would be needed to back the add_middleware call below.
from fastapi.middleware.throttling import ThrottlingMiddleware
from pydantic import BaseModel, HttpUrl, validator
import re

class RepoRequest(BaseModel):
    repo_url: str

    @validator('repo_url')
    def validate_repo_url(cls, v):
        if not re.match(r'^[\w.-]+/[\w.-]+$', v):
            raise ValueError('Invalid repository format. Expected: owner/repo')
        return v

async def verify_repo_access(repo_url: str):
    try:
        # Add repository access verification logic
        if not is_repo_accessible(repo_url):
            raise HTTPException(status_code=404, detail="Repository not found or inaccessible")
        return repo_url
    except HTTPException:
        raise  # preserve the 404 instead of rewrapping it as a 500
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))

app.add_middleware(
    ThrottlingMiddleware,
    rate_limit=100,  # requests
    time_window=60,  # seconds
)

@app.post("/analyze_repo")
async def analyze_repo(
    request: RepoRequest,
    repo_url: str = Depends(verify_repo_access)
) -> Dict[str, Any]:
    try:
        codebase = Codebase.from_repo(repo_url)
        analyzer = CodeAnalyzer(codebase)
        # Rest of the code...
```
```python
        return f"{self.error_type.upper()} ({self.severity}): {self.message} at {location}"


class ErrorContextAnalyzer:
```
Consider adding error telemetry and metrics collection to track error patterns over time. This would help identify common issues and prioritize improvements. Also, consider adding structured error categories for better aggregation.
```python
from dataclasses import dataclass
from typing import Optional

@dataclass
class ErrorMetrics:
    error_type: str
    occurrence_count: int
    avg_severity: float
    common_files: List[str]

class ErrorContextAnalyzer:
    def collect_error_metrics(self) -> Dict[str, ErrorMetrics]:
        metrics = {}
        for error in self.analyze_codebase():
            if error.error_type not in metrics:
                metrics[error.error_type] = ErrorMetrics(
                    error_type=error.error_type,
                    occurrence_count=0,
                    avg_severity=0.0,
                    common_files=[]
                )
            # Update metrics
            metrics[error.error_type].occurrence_count += 1
        return metrics
```
Persistent review updated to latest commit e233dfa
Review by Korbit AI
Korbit automatically attempts to detect when you fix issues in new commits.
| Category | Issue | Status |
|---|---|---|
| | High Coupling in Type Analysis Logic ▹ view | |
| | Incomplete Operation Type Inference ▹ view | |
Files scanned
| File Path | Reviewed |
|---|---|
| codegen-on-oss/codegen_on_oss/analysis/enhanced_type_detection.py | ✅ |
Explore our documentation to understand the languages and file types we support and the files we ignore.
Check out our docs on how you can make Korbit work best for you and your team.
```python
class TypeAnalyzer:
    """
    Analyzes code for type-related errors using AST analysis and type inference.
    """

    def __init__(self):
        """Initialize the TypeAnalyzer."""
        # Map of known Python types
        self.python_types = {
            'str': str,
            'int': int,
            'float': float,
            'bool': bool,
            'list': list,
            'dict': dict,
            'tuple': tuple,
            'set': set,
            'None': type(None),
        }

        # Map of compatible binary operations
        self.compatible_ops = {
```
High Coupling in Type Analysis Logic 
What is the issue?
The TypeAnalyzer class has high coupling by directly embedding type compatibility rules and type mappings.
Why this matters
This tight coupling makes it difficult to modify or extend type compatibility rules without changing the main analyzer class.
Suggested change ∙ Feature Preview
Extract type compatibility logic into separate strategy classes:
```python
class TypeCompatibilityStrategy:
    def get_compatible_types(self, op_type: Type[ast.operator]) -> Dict[Type, List[Type]]:
        pass

class DefaultTypeCompatibility(TypeCompatibilityStrategy):
    def get_compatible_types(self, op_type: Type[ast.operator]) -> Dict[Type, List[Type]]:
        return {...}

class TypeAnalyzer:
    def __init__(self, compatibility_strategy: TypeCompatibilityStrategy = None):
        self.compatibility_strategy = compatibility_strategy or DefaultTypeCompatibility()
```
```python
if isinstance(node.op, ast.Add):
    if left_type == str or right_type == str:
        return str
    elif left_type in (int, float) and right_type in (int, float):
        return float if float in (left_type, right_type) else int
    elif left_type == list and right_type == list:
        return list
    elif left_type == tuple and right_type == tuple:
        return tuple
```
Incomplete Operation Type Inference 
What is the issue?
The type inference in _get_expression_type method only handles the Add operation, missing other operations defined in compatible_ops.
Why this matters
Lack of type inference for other operations will result in missing type errors for valid Python operations like subtraction, multiplication, and division.
Suggested change ∙ Feature Preview
Add type inference for all supported operations:
```python
for op_type, type_map in self.compatible_ops.items():
    if isinstance(node.op, op_type):
        if left_type in type_map and right_type in type_map[left_type]:
            # Special case for str concatenation
            if left_type == str and isinstance(node.op, ast.Add):
                return str
            # Special case for numeric operations
            if left_type in (int, float) and right_type in (int, float):
                return float if float in (left_type, right_type) else int
            # For other types, return the left type
            return left_type
```
PR Description

This pull request introduces a comprehensive code analysis module with a focus on error detection and contextual error information. The goal is to provide developers with tools to identify and understand potential issues in their codebase, leading to improved code quality and maintainability.

Key Technical Changes

The key technical changes include the addition of an ErrorContextAnalyzer class for detecting and analyzing errors, a CodeError class to represent errors with detailed context, and FastAPI endpoints for accessing the analysis functionality. Enhanced type detection using AST analysis has also been implemented. The analysis.py file has been refactored to integrate these new components.

Architecture Decisions

The architectural decisions involve creating a modular design with separate classes for code analysis (CodeAnalyzer) and error context analysis (ErrorContextAnalyzer). Dependency injection is considered for future enhancements. The use of networkx for dependency graph creation and AST parsing for type analysis are also notable.

Dependencies and Interactions

This module depends on the codegen library for codebase representation and interacts with FastAPI for providing API endpoints. It also utilizes networkx for graph analysis and the ast module for static code analysis. There are no immediate dependencies on other parts of the system, but future integrations with code metrics and documentation generation are possible.

Risk Considerations

Potential risks include performance overhead due to AST parsing and graph analysis, especially for large codebases. The accuracy of type inference and error detection is limited by the static analysis approach. There is also a risk of false positives or negatives in error detection. Thorough testing and performance monitoring are recommended.
Notable Implementation Details

Notable implementation details include the use of regular expressions for pattern matching in error detection, the creation of detailed error context information (e.g., context lines, suggested fixes), and the handling of circular dependencies. The enhanced_type_detection.py file provides more robust type analysis capabilities using AST parsing and type inference.
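The FastAPI endpoints described in the PR are consumed as plain JSON-over-HTTP calls. A minimal client sketch, assuming the `/analyze_errors` route and the `repo_url`/`file_path` fields shown in the sequence diagram (the base URL and helper names here are hypothetical, not part of the PR):

```python
import json
import urllib.request


def build_analyze_payload(repo_url: str, file_path: str) -> bytes:
    """Serialize the request body the /analyze_errors endpoint expects."""
    return json.dumps({"repo_url": repo_url, "file_path": file_path}).encode("utf-8")


def fetch_file_errors(base_url: str, repo_url: str, file_path: str) -> dict:
    """POST the repo/file pair and return the parsed error-context JSON."""
    req = urllib.request.Request(
        f"{base_url}/analyze_errors",
        data=build_analyze_payload(repo_url, file_path),
        headers={"Content-Type": "application/json"},
        method="POST",
    )
    with urllib.request.urlopen(req) as resp:
        return json.load(resp)
```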
| Args: | ||
| function_name: Name of the function to find | ||
| Returns: | ||
| The Function object if found, None otherwise | ||
| """ | ||
| for func in self.codebase.functions: | ||
| if func.name == function_name: | ||
| return func | ||
| return None | ||
|
|
||
| def document_functions(self) -> None: | ||
| """ | ||
| Generate documentation for functions in the codebase. | ||
| """ | ||
| document_functions_run(self.codebase) | ||
| def analyze_imports(self) -> Dict[str, Any]: | ||
|
|
||
| def analyze_imports(self) -> dict[str, Any]: | ||
| """ | ||
| Analyze import relationships in the codebase. | ||
| Returns: | ||
| A dictionary containing import analysis results | ||
| """ | ||
| graph = create_graph_from_codebase(self.codebase.repo_name) | ||
| graph = create_graph_from_codebase(self.codebase) | ||
| cycles = find_import_cycles(graph) | ||
| problematic_loops = find_problematic_import_loops(graph, cycles) | ||
|
|
||
| return { | ||
| "import_cycles": cycles, | ||
| "problematic_loops": problematic_loops | ||
| "import_graph": graph, | ||
| "cycles": cycles, | ||
| "problematic_loops": problematic_loops, | ||
| } | ||
|
|
||
| def convert_args_to_kwargs(self) -> None: | ||
| """ | ||
| Convert all function call arguments to keyword arguments. | ||
| """ | ||
| convert_all_calls_to_kwargs(self.codebase) | ||
|
|
||
| def visualize_module_dependencies(self) -> None: | ||
| """ | ||
| Visualize module dependencies in the codebase. | ||
| """ | ||
| module_dependencies_run(self.codebase) | ||
|
|
||
| def generate_mdx_documentation(self, class_name: str) -> str: | ||
| """ | ||
| Generate MDX documentation for a class. | ||
| Args: | ||
| class_name: Name of the class to document | ||
| Returns: | ||
| MDX documentation as a string | ||
| """ | ||
| for cls in self.codebase.classes: | ||
| if cls.name == class_name: | ||
| return render_mdx_page_for_class(cls) | ||
| return f"Class not found: {class_name}" | ||
|
|
||
| def print_symbol_attribution(self) -> None: | ||
| """ | ||
| Print attribution information for symbols in the codebase. | ||
| """ | ||
| print_symbol_attribution(self.codebase) | ||
|
|
||
| def get_extended_symbol_context(self, symbol_name: str, degree: int = 2) -> Dict[str, List[str]]: | ||
|
|
||
| def get_dependency_graph(self) -> nx.DiGraph: | ||
| """ | ||
| Get extended context (dependencies and usages) for a symbol. | ||
| Args: | ||
| symbol_name: Name of the symbol to analyze | ||
| degree: How many levels deep to collect dependencies and usages | ||
| Get a dependency graph for the codebase files. | ||
| Returns: | ||
| A dictionary containing dependencies and usages | ||
| A directed graph representing file dependencies | ||
| """ | ||
| symbol = self.find_symbol_by_name(symbol_name) | ||
| if symbol: | ||
| dependencies, usages = get_extended_context(symbol, degree) | ||
| return { | ||
| "dependencies": [dep.name for dep in dependencies], | ||
| "usages": [usage.name for usage in usages] | ||
| } | ||
| return {"dependencies": [], "usages": []} | ||
|
|
||
| def get_symbol_dependencies(self, symbol_name: str) -> List[str]: | ||
| G = nx.DiGraph() | ||
|
|
||
| # Add nodes for all files | ||
| for file in self.codebase.files: | ||
| G.add_node(file.name, type="file") | ||
|
|
||
| # Add edges for imports | ||
| for file in self.codebase.files: | ||
| for imp in file.imports: | ||
| if imp.imported_symbol and hasattr(imp.imported_symbol, "file"): | ||
| imported_file = imp.imported_symbol.file | ||
| if imported_file and imported_file.name != file.name: | ||
| G.add_edge(file.name, imported_file.name) | ||
|
|
||
| return G | ||
|
|
||
| def get_symbol_attribution(self, symbol_name: str) -> str: | ||
| """ | ||
| Get direct dependencies of a symbol. | ||
| Get attribution information for a symbol. | ||
| Args: | ||
| symbol_name: Name of the symbol to analyze | ||
| Returns: | ||
| A list of dependency symbol names | ||
| A string containing attribution information | ||
| """ | ||
| symbol = self.find_symbol_by_name(symbol_name) | ||
| if symbol and hasattr(symbol, "dependencies"): | ||
| return [dep.name for dep in symbol.dependencies] | ||
| return [] | ||
|
|
||
| def get_symbol_usages(self, symbol_name: str) -> List[str]: | ||
| if symbol is None: | ||
| return f"Symbol not found: {symbol_name}" | ||
|
|
||
| return print_symbol_attribution(symbol) | ||
|
|
||
| def get_context_for_symbol(self, symbol_name: str) -> dict[str, Any]: | ||
| """ | ||
| Get direct usages of a symbol. | ||
| Get context information for a symbol. | ||
| Args: | ||
| symbol_name: Name of the symbol to analyze | ||
| Returns: | ||
| A list of usage symbol names | ||
| A dictionary containing context information | ||
| """ | ||
| symbol = self.find_symbol_by_name(symbol_name) | ||
| if symbol and hasattr(symbol, "symbol_usages"): | ||
| return [usage.name for usage in symbol.symbol_usages] | ||
| return [] | ||
|
|
||
| def get_file_imports(self, file_path: str) -> List[str]: | ||
| """ | ||
| Get all imports in a file. | ||
| Args: | ||
| file_path: Path to the file to analyze | ||
| Returns: | ||
| A list of import statements | ||
| """ | ||
| file = self.find_file_by_path(file_path) | ||
| if file and hasattr(file, "imports"): | ||
| return [imp.source for imp in file.imports] | ||
| return [] | ||
|
|
||
| def get_file_exports(self, file_path: str) -> List[str]: | ||
| if symbol is None: | ||
| return {"error": f"Symbol not found: {symbol_name}"} | ||
|
|
||
| # Use the context to get more information about the symbol | ||
| ctx = self.context | ||
|
|
||
| # Get symbol node ID in the context graph | ||
| node_id = None | ||
| for n_id, node in enumerate(ctx.nodes): | ||
| if isinstance(node, Symbol) and node.name == symbol_name: | ||
| node_id = n_id | ||
| break | ||
|
|
||
| if node_id is None: | ||
| return {"error": f"Symbol not found in context: {symbol_name}"} | ||
|
|
||
| # Get predecessors (symbols that use this symbol) | ||
| predecessors = [] | ||
| for pred in ctx.predecessors(node_id): | ||
| if isinstance(pred, Symbol): | ||
| predecessors.append({ | ||
| "name": pred.name, | ||
| "type": pred.symbol_type.name | ||
| if hasattr(pred, "symbol_type") | ||
| else "Unknown", | ||
| }) | ||
|
|
||
| # Get successors (symbols used by this symbol) |
The CodeAnalyzer class has grown quite large with multiple responsibilities. Consider breaking it down into smaller, more focused classes following the Single Responsibility Principle. For example, you could extract the dependency analysis and error analysis into separate classes.
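One way to act on this suggestion is a thin facade that delegates to focused analyzer classes, each owning a single concern. A sketch under that assumption (class and method names here are illustrative, not part of the PR):

```python
class DependencyAnalyzer:
    """Owns only import/dependency queries."""

    def __init__(self, codebase):
        self.codebase = codebase

    def file_imports(self, file_path: str) -> list:
        # Find the file by name and return its import sources, if any.
        file = next((f for f in self.codebase.files if f.name == file_path), None)
        if file is None:
            return []
        return [imp.source for imp in getattr(file, "imports", [])]


class CodeAnalyzerFacade:
    """Keeps the existing public surface while delegating to focused analyzers."""

    def __init__(self, codebase):
        self.dependencies = DependencyAnalyzer(codebase)

    def get_file_imports(self, file_path: str) -> list:
        return self.dependencies.file_imports(file_path)
```

Error analysis and metrics collection would get their own analyzer classes in the same pattern, so each can be tested in isolation.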
```python
errors = self.analyze_file(file)

# Get import graph information
import_graph = self.build_import_graph()
importers = []
imported = []

if file.name in import_graph:
    importers = [{"name": importer} for importer in import_graph.predecessors(file.name)]
    imported = [{"name": imp} for imp in import_graph.successors(file.name)]

# Get function information
functions = []
```
The ErrorContextAnalyzer._find_line_number method uses a simple regex pattern matching that could fail in complex cases. Consider using AST parsing for more reliable line number detection. Also, add error handling for malformed source code.
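A sketch of the AST-based alternative the comment asks for: parse the source once, walk for the first matching call, and degrade gracefully on malformed input (the function and parameter names here are illustrative):

```python
import ast


def find_call_line(source: str, func_name: str) -> "int | None":
    """Return the 1-based line of the first call to func_name, or None."""
    try:
        tree = ast.parse(source)
    except SyntaxError:
        # Malformed source: signal "not found" instead of crashing the analyzer.
        return None
    for node in ast.walk(tree):
        if (
            isinstance(node, ast.Call)
            and isinstance(node.func, ast.Name)
            and node.func.id == func_name
        ):
            return node.lineno
    return None
```

Unlike a regex, this cannot match calls inside strings or comments, and `node.lineno` is exact even when the call spans several lines.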
```python
                    suggested_fix=f"Ensure the return value matches the declared return type {self._type_name(declared_return_type)}"
                ))

        return errors

    def _get_expression_type(self, node: ast.AST, variable_types: Dict[str, Any]) -> Optional[Any]:
        """
        Get the type of an expression.

        Args:
            node: The AST node representing the expression
            variable_types: Dictionary mapping variable names to their types

        Returns:
            The type of the expression, or None if it cannot be determined
        """
        if isinstance(node, ast.Name):
            # Variable reference
            return variable_types.get(node.id)
        elif isinstance(node, ast.Constant):
            # Literal value
            return type(node.value)
        elif isinstance(node, ast.List):
            # List literal
            return list
        elif isinstance(node, ast.Dict):
            # Dict literal
            return dict
        elif isinstance(node, ast.Tuple):
            # Tuple literal
            return tuple
        elif isinstance(node, ast.Set):
            # Set literal
            return set
        elif isinstance(node, ast.BinOp):
            # Binary operation
            left_type = self._get_expression_type(node.left, variable_types)
            right_type = self._get_expression_type(node.right, variable_types)

            # Determine result type based on operation and operand types
            # This is a simplification - in a real implementation, we would need more sophisticated type inference
            if isinstance(node.op, ast.Add):
                if left_type == str or right_type == str:
                    return str
                elif left_type in (int, float) and right_type in (int, float):
                    return float if float in (left_type, right_type) else int
                elif left_type == list and right_type == list:
                    return list
                elif left_type == tuple and right_type == tuple:
                    return tuple

            # Add more operation type inference as needed

        # For other expression types, we can't determine the type
        return None

    def _are_types_compatible(self, left_type: Any, right_type: Any, op: ast.operator) -> bool:
        """
```
The TypeAnalyzer._get_expression_type method contains a complex series of if-elif statements. Consider using a pattern matching approach or strategy pattern to make this more maintainable and extensible.
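A table-driven version of the idea: map AST node classes to inferred types and keep only the genuinely special cases as explicit branches. This is a sketch of the suggested refactor, not the PR's implementation; the `infer_type` name and `_LITERAL_TYPES` table are illustrative:

```python
import ast
from typing import Any, Dict, Optional, Type

# Dispatch table: literal AST node class -> inferred Python type.
_LITERAL_TYPES: Dict[Type[ast.AST], type] = {
    ast.List: list,
    ast.Dict: dict,
    ast.Tuple: tuple,
    ast.Set: set,
}


def infer_type(node: ast.AST, variable_types: Dict[str, Any]) -> Optional[type]:
    """Table-driven replacement for the if/elif chain."""
    if isinstance(node, ast.Name):
        return variable_types.get(node.id)
    if isinstance(node, ast.Constant):
        return type(node.value)
    for node_cls, inferred in _LITERAL_TYPES.items():
        if isinstance(node, node_cls):
            return inferred
    return None
```

Adding support for a new literal form then means adding one table entry rather than another `elif` branch.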
```python
    def analyze_function_parameters(self, function: Function) -> List[CodeError]:
        """
        Analyze function parameters for errors.

        Args:
            function: The function to analyze

        Returns:
            A list of parameter-related errors
        """
        errors = []

        if not hasattr(function, "parameters") or not hasattr(function, "function_calls"):
            return errors

        # Check for parameter type mismatches
        for param in function.parameters:
            if not hasattr(param, "type_annotation") or not param.type_annotation:
                continue

            # Check if parameter is used with correct type
            # This is a simplified check and would need more sophisticated type inference in practice
            param_name = param.name
            param_type = param.type_annotation

            # Look for usage of this parameter in the function body
            if hasattr(function, "code_block") and hasattr(function.code_block, "source"):
                source = function.code_block.source

                # Simple pattern matching for potential type errors
                # This is a simplified approach and would need more sophisticated analysis in practice
                if re.search(rf"\b{param_name}\s*\+\s*\d+\b", source) and "str" in param_type:
                    line_number = self._find_line_number(function.code_block.source, rf"\b{param_name}\s*\+\s*\d+\b")
                    errors.append(CodeError(
                        error_type=ErrorType.TYPE_ERROR,
                        message=f"Potential type error: adding integer to string parameter '{param_name}'",
                        file_path=function.file.name if hasattr(function, "file") else None,
                        line_number=line_number,
                        severity=ErrorSeverity.HIGH,
                        symbol_name=function.name,
                        context_lines=self.get_context_lines(function.file.name, line_number) if hasattr(function, "file") else None,
                        suggested_fix=f"Ensure '{param_name}' is converted to int before addition or use string concatenation"
                    ))

        # Check for call parameter mismatches
        call_graph = self.build_call_graph()
        for call in function.function_calls:
            called_func_name = call.name

            # Find the called function
            called_func = None
            for func in self.codebase.functions:
                if func.name == called_func_name:
                    called_func = func
                    break

            if not called_func or not hasattr(called_func, "parameters"):
                continue

            # Check if number of arguments matches
            if hasattr(call, "args") and len(call.args) != len(called_func.parameters):
                # Find the line number of the call
                line_number = self._find_line_number(function.code_block.source, rf"\b{called_func_name}\s*\(")
```
The analyze_function_parameters method could benefit from dependency injection to make it more testable and configurable. Consider extracting the type checking logic into a separate injectable service.
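A minimal sketch of that injection, under the assumption that the type-checking heuristic is pulled out behind a small protocol; the class and method names here are illustrative, not part of the PR:

```python
import re
from typing import List, Optional, Protocol


class TypeChecker(Protocol):
    def check(self, param_name: str, param_type: str, source: str) -> List[str]: ...


class RegexTypeChecker:
    """Default heuristic checker; a stricter AST-based one can be injected in tests."""

    def check(self, param_name: str, param_type: str, source: str) -> List[str]:
        issues = []
        if "str" in param_type and re.search(rf"\b{re.escape(param_name)}\s*\+\s*\d+\b", source):
            issues.append(f"adding integer to string parameter '{param_name}'")
        return issues


class ParameterAnalyzer:
    def __init__(self, checker: Optional[TypeChecker] = None):
        # The checking strategy is injected, so tests can substitute a fake.
        self.checker = checker or RegexTypeChecker()

    def analyze(self, param_name: str, param_type: str, source: str) -> List[str]:
        return self.checker.check(param_name, param_type, source)
```

Tests can then pass a stub checker and assert on the analyzer's orchestration alone.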
Code Review: Error Context Analysis System

I've completed a thorough review of this PR, which adds a robust error context analysis system to the codebase. Overall, this is a well-structured and valuable addition that will significantly enhance the code analysis capabilities.

Strengths
Suggestions for Improvement
Questions
Overall, this is a valuable addition to the codebase that will help developers identify and fix issues in their code more efficiently. The detailed context information and suggested fixes are particularly useful features.
| return self.error_analyzer.get_function_error_context(function_name) | ||
|
|
||
| def get_file_error_context(self, file_path: str) -> dict[str, Any]: | ||
| """ | ||
| Get the most frequently changed files in the codebase. | ||
| Get detailed error context for a specific file. | ||
| Args: | ||
| limit: Maximum number of files to return | ||
| file_path: The path of the file to analyze | ||
| Returns: | ||
| A dictionary mapping file paths to change counts | ||
| A dictionary with detailed error context | ||
| """ | ||
| if not hasattr(self.codebase, "repo_operator") or not self.codebase.repo_operator: | ||
| return {} | ||
|
|
||
| try: | ||
| # Get commits from the last year | ||
| end_date = datetime.now(UTC) | ||
| start_date = end_date - timedelta(days=365) | ||
|
|
||
| # Get all commits in the date range | ||
| commits = self.codebase.repo_operator.get_commits(since=start_date, until=end_date) | ||
|
|
||
| # Count file changes | ||
| file_changes = {} | ||
| for commit in commits: | ||
| for file in commit.stats.files: | ||
| if file in file_changes: | ||
| file_changes[file] += 1 | ||
| else: | ||
| file_changes[file] = 1 | ||
|
|
||
| # Sort by change count and limit results | ||
| sorted_files = sorted(file_changes.items(), key=lambda x: x[1], reverse=True)[:limit] | ||
| return dict(sorted_files) | ||
| except Exception as e: | ||
| return {"error": str(e)} | ||
|
|
||
| def get_monthly_commits(repo_path: str) -> Dict[str, int]: | ||
| """ | ||
| Get the number of commits per month for the last 12 months. | ||
| return self.error_analyzer.get_file_error_context(file_path) | ||
|
|
||
| Args: | ||
| repo_path: Path to the git repository | ||
| def get_error_context(self, error: CodeError) -> dict[str, Any]: | ||
| """ | ||
| Get detailed context information for an error. | ||
| Returns: | ||
| Dictionary with month-year as key and number of commits as value | ||
| """ | ||
| end_date = datetime.now(UTC) | ||
| start_date = end_date - timedelta(days=365) | ||
| Args: | ||
| error: The error to get context for | ||
| date_format = "%Y-%m-%d" | ||
| since_date = start_date.strftime(date_format) | ||
| until_date = end_date.strftime(date_format) | ||
| Returns: | ||
| A dictionary with detailed context information | ||
| """ | ||
| return self.error_analyzer.get_error_context(error) | ||
|
|
||
| # Validate repo_path format (should be owner/repo) | ||
| if not re.match(r"^[a-zA-Z0-9_.-]+/[a-zA-Z0-9_.-]+$", repo_path): | ||
| print(f"Invalid repository path format: {repo_path}") | ||
| return {} | ||
|
|
||
| repo_url = f"https://github.com/{repo_path}" | ||
| # Request models for API endpoints | ||
| class RepoRequest(BaseModel): | ||
| """Request model for repository analysis.""" | ||
|
|
||
| # Validate URL | ||
| try: | ||
| parsed_url = urlparse(repo_url) | ||
| if not all([parsed_url.scheme, parsed_url.netloc]): | ||
| print(f"Invalid URL: {repo_url}") | ||
| return {} | ||
| except Exception: | ||
| print(f"Invalid URL: {repo_url}") | ||
| return {} | ||
| repo_url: str | ||
|
|
||
| try: | ||
| original_dir = os.getcwd() | ||
|
|
||
| with tempfile.TemporaryDirectory() as temp_dir: | ||
| # Using a safer approach with a list of arguments and shell=False | ||
| subprocess.run( | ||
| ["git", "clone", repo_url, temp_dir], | ||
| check=True, | ||
| capture_output=True, | ||
| shell=False, | ||
| text=True, | ||
| ) | ||
| os.chdir(temp_dir) | ||
|
|
||
| # Using a safer approach with a list of arguments and shell=False | ||
| result = subprocess.run( | ||
| [ | ||
| "git", | ||
| "log", | ||
| f"--since={since_date}", | ||
| f"--until={until_date}", | ||
| "--format=%aI", | ||
| ], | ||
| capture_output=True, | ||
| text=True, | ||
| check=True, | ||
| shell=False, | ||
| ) | ||
| commit_dates = result.stdout.strip().split("\n") | ||
|
|
||
| monthly_counts = {} | ||
| current_date = start_date | ||
| while current_date <= end_date: | ||
| month_key = current_date.strftime("%Y-%m") | ||
| monthly_counts[month_key] = 0 | ||
| current_date = ( | ||
| current_date.replace(day=1) + timedelta(days=32) | ||
| ).replace(day=1) | ||
|
|
||
| for date_str in commit_dates: | ||
| if date_str: # Skip empty lines | ||
| commit_date = datetime.fromisoformat(date_str.strip()) | ||
| month_key = commit_date.strftime("%Y-%m") | ||
| if month_key in monthly_counts: | ||
| monthly_counts[month_key] += 1 | ||
|
|
||
| return dict(sorted(monthly_counts.items())) | ||
|
|
||
| except subprocess.CalledProcessError as e: | ||
| print(f"Error executing git command: {e}") | ||
| return {} | ||
| except Exception as e: | ||
| print(f"Error processing git commits: {e}") | ||
| return {} | ||
| finally: | ||
| with contextlib.suppress(Exception): | ||
| os.chdir(original_dir) | ||
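The month-stepping idiom in the loop above (jump past the end of the month with a 32-day offset, then snap back to day 1) can be checked in isolation. This is a standalone sketch, not code from the PR:

```python
from datetime import datetime, timedelta

def next_month(d: datetime) -> datetime:
    # day=1 plus 32 days always lands somewhere in the following month,
    # regardless of whether the current month has 28, 29, 30, or 31 days;
    # the final replace(day=1) snaps back to the first of that month.
    return (d.replace(day=1) + timedelta(days=32)).replace(day=1)
```

This is why the loop never skips a month, which a naive `+ timedelta(days=30)` could do for 31-day months.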
|
|
||
| class SymbolRequest(BaseModel): | ||
| """Request model for symbol analysis.""" | ||
|
|
||
| def calculate_cyclomatic_complexity(function): | ||
| """ | ||
| Calculate the cyclomatic complexity of a function. | ||
| Args: | ||
| function: The function to analyze | ||
| Returns: | ||
| The cyclomatic complexity score | ||
| """ | ||
| def analyze_statement(statement): | ||
| complexity = 0 | ||
| repo_url: str | ||
| symbol_name: str | ||
|
|
||
| if isinstance(statement, IfBlockStatement): | ||
| complexity += 1 | ||
| if hasattr(statement, "elif_statements"): | ||
| complexity += len(statement.elif_statements) | ||
|
|
||
| elif isinstance(statement, ForLoopStatement | WhileStatement): | ||
| complexity += 1 | ||
| class FileRequest(BaseModel): | ||
| """Request model for file analysis.""" | ||
|
|
||
| elif isinstance(statement, TryCatchStatement): | ||
| complexity += len(getattr(statement, "except_blocks", [])) | ||
| repo_url: str |
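The statement walk in calculate_cyclomatic_complexity above relies on the SDK's statement classes; the same counting idea can be sketched with only the stdlib ast module (an approximation, not the PR's implementation):

```python
import ast

def cyclomatic_complexity(source: str) -> int:
    """Approximate McCabe complexity: 1 plus one per branch point."""
    complexity = 1
    for node in ast.walk(ast.parse(source)):
        if isinstance(node, (ast.If, ast.For, ast.While, ast.IfExp)):
            complexity += 1
        elif isinstance(node, ast.Try):
            # Mirror the SDK version: one per except block.
            complexity += len(node.handlers)
        elif isinstance(node, ast.BoolOp):
            # `and`/`or` chains add decision points too.
            complexity += len(node.values) - 1
    return complexity
```

Note that an elif chain parses as nested ast.If nodes, so each branch is counted without the explicit elif_statements handling the SDK version needs.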
The FastAPI endpoints could benefit from more robust error handling and input validation. Consider using Pydantic models for request validation and adding structured error responses.
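A minimal sketch of that structured-validation idea, using only the stdlib rather than Pydantic (all names here are illustrative, not the PR's API):

```python
from dataclasses import dataclass
from typing import Optional, Tuple
from urllib.parse import urlparse

@dataclass
class RepoRequest:
    repo_url: str

def validate_repo_request(data: dict) -> Tuple[Optional[RepoRequest], Optional[dict]]:
    """Return (request, None) on success, or (None, structured_error) on failure."""
    url = data.get("repo_url")
    if not isinstance(url, str):
        return None, {"error": "validation_error", "field": "repo_url",
                      "detail": "repo_url must be a string"}
    parsed = urlparse(url)
    if not (parsed.scheme and parsed.netloc):
        return None, {"error": "validation_error", "field": "repo_url",
                      "detail": f"invalid URL: {url!r}"}
    return RepoRequest(repo_url=url), None
```

With Pydantic, the isinstance and URL checks collapse into field types (e.g. HttpUrl), and FastAPI returns the structured 422 body automatically.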
| """ | ||
| Tests for the error context analysis functionality. | ||
| This module contains unit tests for the ErrorContextAnalyzer and related classes. | ||
| """ | ||
|
|
||
| import ast | ||
| import unittest | ||
| from unittest.mock import MagicMock, patch | ||
|
|
||
| from codegen.sdk.core.codebase import Codebase | ||
| from codegen.sdk.core.file import SourceFile | ||
| from codegen.sdk.core.function import Function | ||
| from codegen.sdk.core.symbol import Symbol | ||
| from codegen_on_oss.analysis.error_context import ( | ||
| CodeError, | ||
| ErrorContextAnalyzer, | ||
| ErrorSeverity, | ||
| ErrorType | ||
| ) | ||
|
|
||
|
|
||
| class TestCodeError(unittest.TestCase): | ||
| """Tests for the CodeError class.""" | ||
|
|
||
| def test_code_error_initialization(self): | ||
| """Test that a CodeError can be initialized with all parameters.""" | ||
| error = CodeError( | ||
| error_type=ErrorType.SYNTAX_ERROR, | ||
| message="Invalid syntax", | ||
| file_path="test.py", | ||
| line_number=10, | ||
| column=5, | ||
| severity=ErrorSeverity.CRITICAL, | ||
| symbol_name="test_function", | ||
| context_lines={9: "def test_function():", 10: " print('Hello world'"}, | ||
| suggested_fix="Fix the syntax error" | ||
| ) | ||
|
|
||
| self.assertEqual(error.error_type, ErrorType.SYNTAX_ERROR) | ||
| self.assertEqual(error.message, "Invalid syntax") | ||
| self.assertEqual(error.file_path, "test.py") | ||
| self.assertEqual(error.line_number, 10) | ||
| self.assertEqual(error.column, 5) | ||
| self.assertEqual(error.severity, ErrorSeverity.CRITICAL) | ||
| self.assertEqual(error.symbol_name, "test_function") | ||
| self.assertEqual(error.context_lines, {9: "def test_function():", 10: " print('Hello world'"}) | ||
| self.assertEqual(error.suggested_fix, "Fix the syntax error") | ||
|
|
||
| def test_code_error_to_dict(self): | ||
| """Test that a CodeError can be converted to a dictionary.""" | ||
| error = CodeError( | ||
| error_type=ErrorType.SYNTAX_ERROR, | ||
| message="Invalid syntax", | ||
| file_path="test.py", | ||
| line_number=10, | ||
| severity=ErrorSeverity.CRITICAL | ||
| ) | ||
|
|
||
| error_dict = error.to_dict() | ||
|
|
||
| self.assertEqual(error_dict["error_type"], ErrorType.SYNTAX_ERROR) | ||
| self.assertEqual(error_dict["message"], "Invalid syntax") | ||
| self.assertEqual(error_dict["file_path"], "test.py") | ||
| self.assertEqual(error_dict["line_number"], 10) | ||
| self.assertEqual(error_dict["severity"], ErrorSeverity.CRITICAL) | ||
|
|
||
| def test_code_error_str(self): | ||
| """Test the string representation of a CodeError.""" | ||
| error = CodeError( | ||
| error_type=ErrorType.SYNTAX_ERROR, | ||
| message="Invalid syntax", | ||
| file_path="test.py", | ||
| line_number=10, | ||
| severity=ErrorSeverity.CRITICAL | ||
| ) | ||
|
|
||
| error_str = str(error) | ||
|
|
||
| self.assertIn(ErrorType.SYNTAX_ERROR.upper(), error_str) | ||
| self.assertIn("Invalid syntax", error_str) | ||
| self.assertIn("test.py:10", error_str) | ||
| self.assertIn(ErrorSeverity.CRITICAL, error_str) | ||
|
|
||
|
|
||
| class TestErrorContextAnalyzer(unittest.TestCase): | ||
| """Tests for the ErrorContextAnalyzer class.""" | ||
|
|
||
| def setUp(self): | ||
| """Set up test fixtures.""" | ||
| # Create a mock codebase | ||
| self.codebase = MagicMock(spec=Codebase) | ||
|
|
||
| # Create a mock file | ||
| self.file = MagicMock(spec=SourceFile) | ||
| self.file.name = "test.py" | ||
| self.file.source = "def test_function():\n x = 'hello' + 5\n return x" | ||
|
|
||
| # Create a mock function | ||
| self.function = MagicMock(spec=Function) | ||
| self.function.name = "test_function" | ||
| self.function.file = self.file | ||
| self.function.line_number = 1 | ||
| self.function.code_block = MagicMock() | ||
| self.function.code_block.source = "def test_function():\n x = 'hello' + 5\n return x" | ||
|
|
||
| # Set up the codebase with the file and function | ||
| self.codebase.files = [self.file] | ||
| self.codebase.functions = [self.function] | ||
| self.codebase.get_file.return_value = self.file | ||
|
|
||
| # Create the analyzer | ||
| self.analyzer = ErrorContextAnalyzer(self.codebase) | ||
|
|
||
| def test_get_context_lines(self): | ||
| """Test getting context lines around a specific line.""" | ||
| context_lines = self.analyzer.get_context_lines("test.py", 2, context_size=1) | ||
|
|
||
| self.assertEqual(context_lines, { | ||
| 1: "def test_function():", | ||
| 2: " x = 'hello' + 5", | ||
| 3: " return x" | ||
| }) | ||
|
|
||
| def test_analyze_function(self): | ||
| """Test analyzing a function for errors.""" | ||
| errors = self.analyzer.analyze_function(self.function) | ||
|
|
||
| # We should find at least one error (type error) | ||
| self.assertGreaterEqual(len(errors), 1) | ||
|
|
||
| # Check that we found a type error | ||
| type_errors = [e for e in errors if e.error_type == ErrorType.TYPE_ERROR] | ||
| self.assertGreaterEqual(len(type_errors), 1) | ||
|
|
||
| # Check the error details | ||
| error = type_errors[0] | ||
| self.assertEqual(error.file_path, "test.py") | ||
| self.assertEqual(error.symbol_name, "test_function") | ||
| self.assertEqual(error.severity, ErrorSeverity.HIGH) | ||
| self.assertIn("'hello' + 5", str(error.context_lines)) | ||
|
|
||
| def test_analyze_file(self): | ||
| """Test analyzing a file for errors.""" | ||
| errors = self.analyzer.analyze_file(self.file) | ||
|
|
||
| # We should find at least one error (type error) | ||
| self.assertGreaterEqual(len(errors), 1) | ||
|
|
||
| # Check that we found a type error | ||
| type_errors = [e for e in errors if e.error_type == ErrorType.TYPE_ERROR] | ||
| self.assertGreaterEqual(len(type_errors), 1) | ||
|
|
||
| def test_analyze_codebase(self): | ||
| """Test analyzing the entire codebase for errors.""" | ||
| error_dict = self.analyzer.analyze_codebase() | ||
|
|
||
| # We should have errors for our test file | ||
| self.assertIn("test.py", error_dict) | ||
| self.assertGreaterEqual(len(error_dict["test.py"]), 1) | ||
|
|
||
| def test_find_circular_imports(self): | ||
| """Test finding circular imports.""" | ||
| # Mock the build_import_graph method to return a graph with a cycle | ||
| import networkx as nx | ||
| G = nx.DiGraph() | ||
| G.add_edge("a.py", "b.py") | ||
| G.add_edge("b.py", "c.py") | ||
| G.add_edge("c.py", "a.py") | ||
|
|
||
| with patch.object(self.analyzer, 'build_import_graph', return_value=G): | ||
| cycles = self.analyzer.find_circular_imports() | ||
|
|
||
| # We should find one cycle | ||
| self.assertEqual(len(cycles), 1) | ||
|
|
||
| # The cycle should contain a.py, b.py, and c.py | ||
| cycle = cycles[0] | ||
| self.assertIn("a.py", cycle) | ||
| self.assertIn("b.py", cycle) | ||
| self.assertIn("c.py", cycle) | ||
|
|
||
| def test_get_function_error_context(self): | ||
| """Test getting detailed error context for a function.""" | ||
| # Mock the analyze_function method to return a specific error | ||
| error = CodeError( | ||
| error_type=ErrorType.TYPE_ERROR, | ||
| message="Cannot add string and integer", | ||
| file_path="test.py", | ||
| line_number=2, | ||
| severity=ErrorSeverity.HIGH, | ||
| symbol_name="test_function", | ||
| context_lines={1: "def test_function():", 2: " x = 'hello' + 5", 3: " return x"}, | ||
| suggested_fix="Convert the integer to a string: 'hello' + str(5)" | ||
| ) | ||
|
|
||
| with patch.object(self.analyzer, 'analyze_function', return_value=[error]): | ||
| context = self.analyzer.get_function_error_context("test_function") | ||
|
|
||
| # Check the context | ||
| self.assertEqual(context["function_name"], "test_function") | ||
| self.assertEqual(context["file_path"], "test.py") | ||
| self.assertEqual(len(context["errors"]), 1) | ||
|
|
||
| # Check the error details | ||
| error_dict = context["errors"][0] | ||
| self.assertEqual(error_dict["error_type"], ErrorType.TYPE_ERROR) | ||
| self.assertEqual(error_dict["message"], "Cannot add string and integer") | ||
| self.assertEqual(error_dict["line_number"], 2) | ||
| self.assertEqual(error_dict["severity"], ErrorSeverity.HIGH) | ||
| self.assertEqual(error_dict["suggested_fix"], "Convert the integer to a string: 'hello' + str(5)") | ||
|
|
||
| def test_get_file_error_context(self): | ||
| """Test getting detailed error context for a file.""" | ||
| # Mock the analyze_file method to return a specific error | ||
| error = CodeError( | ||
| error_type=ErrorType.TYPE_ERROR, | ||
| message="Cannot add string and integer", | ||
| file_path="test.py", | ||
| line_number=2, | ||
| severity=ErrorSeverity.HIGH, | ||
| symbol_name="test_function", | ||
| context_lines={1: "def test_function():", 2: " x = 'hello' + 5", 3: " return x"}, | ||
| suggested_fix="Convert the integer to a string: 'hello' + str(5)" | ||
| ) | ||
|
|
||
| with patch.object(self.analyzer, 'analyze_file', return_value=[error]): | ||
| context = self.analyzer.get_file_error_context("test.py") | ||
|
|
||
| # Check the context | ||
| self.assertEqual(context["file_path"], "test.py") | ||
| self.assertEqual(len(context["errors"]), 1) | ||
|
|
||
| # Check the error details | ||
| error_dict = context["errors"][0] | ||
| self.assertEqual(error_dict["error_type"], ErrorType.TYPE_ERROR) | ||
| self.assertEqual(error_dict["message"], "Cannot add string and integer") | ||
| self.assertEqual(error_dict["line_number"], 2) | ||
| self.assertEqual(error_dict["severity"], ErrorSeverity.HIGH) | ||
| self.assertEqual(error_dict["suggested_fix"], "Convert the integer to a string: 'hello' + str(5)") | ||
|
|
||
|
|
||
| if __name__ == '__main__': | ||
| unittest.main() | ||
|
|
The test coverage could be more comprehensive. Consider adding property-based testing using hypothesis for more thorough testing of type analysis and error detection. Also add edge cases and error conditions.
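As a rough illustration of that property-based idea (using a plain random loop instead of hypothesis, and a simplified stand-in for the analyzer's context-window method):

```python
import random

def get_context_lines(lines, line_number, context_size=3):
    # Simplified stand-in for ErrorContextAnalyzer.get_context_lines:
    # 1-based line numbers, window clamped to the file boundaries.
    start = max(0, line_number - context_size - 1)
    end = min(len(lines), line_number + context_size)
    return {i + 1: lines[i] for i in range(start, end)}

# Properties: the requested line is always in the window, and the window
# never holds more than 2 * context_size + 1 lines.
for _ in range(200):
    n = random.randint(1, 50)
    lines = [f"line {i}" for i in range(n)]
    target = random.randint(1, n)
    size = random.randint(0, 5)
    ctx = get_context_lines(lines, target, size)
    assert target in ctx
    assert len(ctx) <= 2 * size + 1
```

hypothesis would generate the same cases via @given(...) strategies and shrink any failure to a minimal example.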
Code Quality Review

Here are some specific code quality observations and suggestions:

1. Error Detection Methods

In the error detection code:

# Simple pattern matching for potential type errors
if re.search(rf"\b{param_name}\s*\+\s*\d+\b", source) and "str" in param_type:
line_number = self._find_line_number(function.code_block.source, rf"\b{param_name}\s*\+\s*\d+\b")
errors.append(CodeError(
error_type=ErrorType.TYPE_ERROR,
message=f"Potential type error: adding integer to string parameter '{param_name}'",
# ...
            ))

This approach might lead to false positives. Consider using AST-based analysis for more accurate type checking, similar to how local variables are already extracted elsewhere in the analyzer.

2. Memory Management

The system builds multiple graphs (call graph, dependency graph, import graph) which could be memory-intensive for large codebases:

def build_call_graph(self) -> nx.DiGraph:
if self._call_graph is not None:
return self._call_graph
G = nx.DiGraph()
# ...
self._call_graph = G
    return G

While you're already implementing lazy initialization (only building when needed), consider adding options to limit the scope of analysis or implement incremental graph building for very large codebases.

3. Error Context Collection

The context line collection is well-implemented:

def get_context_lines(self, file_path: str, line_number: int, context_size: int = 3) -> Dict[int, str]:
# ...
lines = file.source.splitlines()
start_line = max(0, line_number - context_size - 1)
end_line = min(len(lines), line_number + context_size)
    return {i + 1: lines[i] for i in range(start_line, end_line)}

However, for very large files, consider adding a parameter to limit the total number of context lines returned to prevent memory issues.

4. API Response Structure

The API responses are well-structured, but consider adding pagination or filtering options for large result sets:

@app.post("/analyze_errors")
async def analyze_errors(request: ErrorRequest) -> dict[str, Any]:
# ...
if function_name:
# Analyze errors in a specific function
return analyzer.get_function_error_context(function_name)
elif file_path:
# Analyze errors in a specific file
return analyzer.get_file_error_context(file_path)
else:
# Analyze errors in the entire repository
        return {"error_analysis": analyzer.analyze_errors()}

For repositories with many errors, the response could be very large. Consider adding pagination or filtering by error type/severity.

Overall, the code quality is high with good separation of concerns, clear method names, and comprehensive documentation. These suggestions are primarily focused on robustness and scalability for larger codebases.
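The AST-based alternative suggested in point 1 could start as small as this (a stdlib sketch; the real analyzer would track inferred parameter types rather than literals):

```python
import ast

def find_str_int_additions(source: str):
    """Flag `<str literal> + <int literal>`, which raises TypeError at runtime."""
    findings = []
    for node in ast.walk(ast.parse(source)):
        if isinstance(node, ast.BinOp) and isinstance(node.op, ast.Add):
            left, right = node.left, node.right
            if (isinstance(left, ast.Constant) and isinstance(left.value, str)
                    and isinstance(right, ast.Constant)
                    and isinstance(right.value, int)
                    and not isinstance(right.value, bool)):
                findings.append((node.lineno, ast.unparse(node)))
    return findings
```

Unlike the regex, this cannot be fooled by matches inside strings or comments, and node.lineno replaces the separate line-number search pass.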
Hey @codegen-sh[bot] - I've reviewed your changes - here's some feedback:
- Consider splitting the introduction of the new error analysis system and the removal/refactoring of the complexity analysis features into separate pull requests for easier review.
- Review the new API endpoints for potential overlap (e.g., /analyze_repo vs /analyze_errors, /analyze_file and /analyze_function vs /analyze_errors) and consider consolidating them.
Here's what I looked at during the review
- 🟢 General issues: all looks good
- 🟢 Security: all looks good
- 🟡 Testing: 3 issues found
- 🟡 Complexity: 1 issue found
- 🟢 Documentation: all looks good
Help me be more useful! Please click 👍 or 👎 on each comment and I'll use the feedback to improve your reviews.
| self.assertIn(ErrorSeverity.CRITICAL, error_str) | ||
|
|
||
|
|
||
| class TestErrorContextAnalyzer(unittest.TestCase): |
suggestion (testing): Missing tests for various error types supported by ErrorContextAnalyzer.
Add tests for each supported error type: syntax errors (invalid code), parameter errors (wrong arg count/types), undefined variables, unused imports, and symbol-level circular dependencies.
Suggested implementation:
class TestErrorContextAnalyzer(unittest.TestCase):
"""Tests for the ErrorContextAnalyzer class."""
def test_syntax_error(self):
# Simulate file with a syntax error (invalid code)
code = "def foo(:\n pass"
analyzer = ErrorContextAnalyzer()
error = analyzer.analyze(code)
error_str = str(error)
self.assertIn(ErrorType.SYNTAX_ERROR.upper(), error_str)
self.assertIn("Invalid syntax", error_str)
def test_parameter_error(self):
# Simulate file with parameter error (wrong argument count or type)
code = "def foo(a, b):\n pass\n\nfoo(1)"
analyzer = ErrorContextAnalyzer()
error = analyzer.analyze(code)
error_str = str(error)
self.assertIn(ErrorType.PARAMETER_ERROR.upper(), error_str)
self.assertIn("wrong number of arguments", error_str)
def test_undefined_variable(self):
# Simulate file with undefined variable access error
code = "def foo():\n return x"
analyzer = ErrorContextAnalyzer()
error = analyzer.analyze(code)
error_str = str(error)
self.assertIn(ErrorType.UNDEFINED_VARIABLE.upper(), error_str)
self.assertIn("undefined variable", error_str)
def test_unused_import(self):
# Simulate file with an unused import error
code = "import os\n\ndef foo():\n pass"
analyzer = ErrorContextAnalyzer()
error = analyzer.analyze(code)
error_str = str(error)
self.assertIn(ErrorType.UNUSED_IMPORT.upper(), error_str)
self.assertIn("unused import", error_str)
def test_circular_dependency(self):
# Simulate file with symbol-level circular dependency error
code = "# Simulated circular dependency error"
analyzer = ErrorContextAnalyzer()
error = analyzer.analyze(code)
error_str = str(error)
self.assertIn(ErrorType.CIRCULAR_DEPENDENCY.upper(), error_str)
self.assertIn("circular dependency", error_str)Ensure that ErrorContextAnalyzer, ErrorType, and any related error constants are imported at the top of this file. For example:
from codegen.sdk.core.errors import ErrorContextAnalyzer, ErrorType, ErrorSeverity
Also verify that the analyzer.analyze() method behaves as expected for your test cases.
| def test_analyze_function(self): | ||
| """Test analyzing a function for errors.""" | ||
| errors = self.analyzer.analyze_function(self.function) | ||
|
|
||
| # We should find at least one error (type error) | ||
| self.assertGreaterEqual(len(errors), 1) | ||
|
|
||
| # Check that we found a type error | ||
| type_errors = [e for e in errors if e.error_type == ErrorType.TYPE_ERROR] | ||
| self.assertGreaterEqual(len(type_errors), 1) |
suggestion (testing): Assertions in analysis tests could be more specific.
Replace the assertGreaterEqual(len(errors), 1) checks with assertions on the exact number and type of errors—e.g. expect exactly one TypeError with the specific message.
Suggested implementation:
def test_analyze_function(self):
"""Test analyzing a function for errors."""
errors = self.analyzer.analyze_function(self.function)
# Expect exactly one error overall
self.assertEqual(len(errors), 1)
# Expect exactly one type error
type_errors = [e for e in errors if e.error_type == ErrorType.TYPE_ERROR]
        self.assertEqual(len(type_errors), 1)

        # Check the error details
error = type_errors[0]
self.assertEqual(error.file_path, "test.py")
self.assertEqual(error.symbol_name, "test_function")
self.assertEqual(error.severity, ErrorSeverity.HIGH)
self.assertEqual(error.message, "Unsupported operand type(s) for +: 'str' and 'int'")
        self.assertIn("'hello' + 5", str(error.context_lines))

    def test_analyze_file(self):
"""Test analyzing a file for errors."""
errors = self.analyzer.analyze_file(self.file)
# Expect exactly one error overall
self.assertEqual(len(errors), 1)
# Expect exactly one type error
type_errors = [e for e in errors if e.error_type == ErrorType.TYPE_ERROR]
        self.assertEqual(len(type_errors), 1)

If the expected error message in error.message differs from "Unsupported operand type(s) for +: 'str' and 'int'", please update the string accordingly.
Also, ensure that the analyzer and error objects are prepared to only generate a single error for the given input or adjust the test inputs to match these new assertions.
| def test_get_function_error_context(self): | ||
| """Test getting detailed error context for a function.""" | ||
| # Mock the analyze_function method to return a specific error | ||
| error = CodeError( | ||
| error_type=ErrorType.TYPE_ERROR, | ||
| message="Cannot add string and integer", | ||
| file_path="test.py", | ||
| line_number=2, | ||
| severity=ErrorSeverity.HIGH, | ||
| symbol_name="test_function", |
suggestion (testing): Tests for context retrieval mock the analysis step.
These tests mock analyze_function and analyze_file, so they only cover formatting. Please add integration-style tests that run the real analysis and verify the returned context.
Suggested implementation:
import os
from codegen_on_oss.analysis.analyzer import analyze_file # Adjust the import based on your code base
# Existing test functions...
def test_get_function_error_context(self):
"""Test getting detailed error context for a function."""
# Mock the analyze_function method to return a specific error
error = CodeError(
error_type=ErrorType.TYPE_ERROR,
message="Cannot add string and integer",
file_path="test.py",
line_number=2,
severity=ErrorSeverity.HIGH,
symbol_name="test_function",
            context_lines={1: "def test_function():", 2: " x = 'hello' + 5", 3: " return x"},

    def test_get_function_error_context_integration(self):
"""Integration test for retrieving detailed error context for a function."""
# Create a temporary source file with code that triggers a type error in a real analysis.
source_code = (
"def test_function():\n"
" x = 'hello' + 5\n"
" return x\n"
)
tmp_filename = "temp_test_file.py"
with open(tmp_filename, "w") as f:
f.write(source_code)
try:
# Run the real analysis on the temporary file.
errors = analyze_file(tmp_filename)
# Find the error associated with test_function.
error = next((e for e in errors if e.symbol_name == "test_function"), None)
self.assertIsNotNone(error, "Expected error for test_function not found.")
# Verify that the error context contains the expected lines.
self.assertEqual(error.context_lines.get(1), "def test_function():")
self.assertEqual(error.context_lines.get(2), " x = 'hello' + 5")
self.assertEqual(error.context_lines.get(3), " return x")
finally:
            os.remove(tmp_filename)

Ensure that:
- The analyze_file function is imported from the appropriate module.
- If any cleanup or configuration is required for the analyzer to run as an integration test, these changes are applied.
- You may need to adjust the temporary file path or content to match your project's conventions.
| # Use the context to get more information about the symbol | ||
| ctx = self.context | ||
|
|
||
| # Get symbol node ID in the context graph |
issue (complexity): Consider building an index on context nodes to avoid repeating nested loops in methods like get_context_for_symbol and get_file_dependencies.
Consider building an index (e.g., a dictionary) on context nodes instead of iterating over the entire node list inside every analysis method. This avoids repeating nested loops in methods like get_context_for_symbol and get_file_dependencies. For example, you can create a helper method that indexes nodes by name during context creation:
def _build_context_mapping(self) -> Dict[str, int]:
"""
Build a mapping of node names to their indices in the context.
"""
    return {node.name: idx for idx, node in enumerate(self.context.nodes) if hasattr(node, "name")}

Then in your methods, use the mapping for a direct lookup:
def get_context_for_symbol(self, symbol_name: str) -> Dict[str, Any]:
mapping = self._build_context_mapping()
node_id = mapping.get(symbol_name)
if node_id is None:
return {"error": f"Symbol not found: {symbol_name}"}
# Optionally reuse node_id to fetch predecessors and successors as needed
    # ... your remaining code here ...

You can apply a similar change to get_file_dependencies to reduce repeated nested iterations. This approach centralizes the mapping logic and simplifies each lookup, reducing overall complexity while preserving functionality.
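One refinement worth considering (a toy sketch with illustrative names, not the analyzer's real class): cache the mapping instead of rebuilding it, since _build_context_mapping as written rescans every node on each call:

```python
class IndexedContext:
    """Toy stand-in demonstrating a cached name-to-index mapping."""

    def __init__(self, nodes):
        self.nodes = nodes
        self._mapping = None  # built lazily, reused afterwards

    def node_id(self, name):
        if self._mapping is None:
            self._mapping = {
                n["name"]: i for i, n in enumerate(self.nodes) if "name" in n
            }
        return self._mapping.get(name)
```

The cache would need to be invalidated if the nodes change, just like the analyzer's existing _call_graph cache.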
| if isinstance(node, ast.Call): | ||
| # Check if we're calling a function we know about | ||
| if isinstance(node.func, ast.Name) and node.func.id in variable_types: | ||
| # This is a simplification - in a real implementation, we would need to | ||
| # track function signatures and parameter types | ||
| pass |
suggestion (code-quality): Merge nested if conditions (merge-nested-ifs)
| if isinstance(node, ast.Call): | |
| # Check if we're calling a function we know about | |
| if isinstance(node.func, ast.Name) and node.func.id in variable_types: | |
| # This is a simplification - in a real implementation, we would need to | |
| # track function signatures and parameter types | |
| pass | |
| if isinstance(node, ast.Call) and (isinstance(node.func, ast.Name) and node.func.id in variable_types): | |
| pass | |
Explanation
Too much nesting can make code difficult to understand, and this is especially true in Python, where there are no brackets to help out with the delineation of
different nesting levels.
Reading deeply nested code is confusing, since you have to keep track of which
conditions relate to which levels. We therefore strive to reduce nesting where
possible, and the situation where two if conditions can be combined using
and is an easy win.
| return None | ||
|
|
||
| def get_function_error_context(self, function_name: str) -> Dict[str, Any]: | ||
| """ |
issue (code-quality): We've found these issues:
- Use the built-in function next instead of a for-loop (use-next)
- Inline variable that is immediately returned (inline-immediately-returned-variable)
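The two refactors combined look roughly like this (hypothetical before/after; the actual bodies in get_function_error_context may differ):

```python
from types import SimpleNamespace

def find_function(functions, name):
    # Before (the pattern Sourcery flags):
    #     found = None
    #     for fn in functions:
    #         if fn.name == name:
    #             found = fn
    #             break
    #     return found
    # After: `next` with a default replaces the loop, and the result is
    # returned directly instead of being bound to a temporary variable.
    return next((fn for fn in functions if fn.name == name), None)
```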
| return result | ||
|
|
||
| def get_file_error_context(self, file_path: str) -> Dict[str, Any]: | ||
| """ |
issue (code-quality): We've found these issues:
- Convert for loop into list comprehension (list-comprehension)
- Inline variable that is immediately returned (inline-immediately-returned-variable)
|
|
||
| # Analyze a specific file | ||
| if files_with_errors: | ||
| file_to_analyze = files_with_errors[0][0] |
issue (code-quality): Extract code out into function [×2] (extract-method)
|
|
||
| try: | ||
| # Create a codebase from the repository | ||
| codebase = Codebase.from_repo(repo_url) |
issue (code-quality): Extract code out into function (extract-method)
|
|
||
| # Print function information | ||
| if 'function_name' in function_error_context: | ||
| print(f"\nFunction: {function_error_context['function_name']}") |
issue (code-quality): We've found these issues:
- Extract code out into function (extract-method)
- Extract duplicate code into function (extract-duplicate-method)
Documentation and Examples Review

The documentation and example scripts are well-written and provide good guidance on how to use the new functionality. Here are some observations and suggestions:

README.md Updates

The README updates are comprehensive and clearly explain the new error context analysis system. The sections on error types, severity levels, and context information are particularly helpful.

Suggestion: Consider adding a "Quick Start" section at the beginning with a minimal example to help users get started quickly.

Example Script
Suggestions:
# Example of finding only circular dependencies
circular_deps = [error for file_path, errors in error_analysis.items()
for error in errors if error["error_type"] == ErrorType.CIRCULAR_DEPENDENCY]
print(f"Found {len(circular_deps)} circular dependencies")
# Example of finding critical errors
critical_errors = [error for file_path, errors in error_analysis.items()
for error in errors if error["severity"] == ErrorSeverity.CRITICAL]
print(f"Found {len(critical_errors)} critical errors")
# Example of applying suggested fixes
for file_path, errors in error_analysis.items():
for error in errors:
if error["suggested_fix"]:
            print(f"Fix for {error['error_type']} in {file_path}: {error['suggested_fix']}")

API Documentation

The API endpoints are well-documented in the code, but consider adding more detailed API documentation in the README or a separate API.md file. This could include:
Overall, the documentation is thorough and will help users understand and utilize the new error context analysis system effectively.
PR #22 Review: Add robust error context analysis system

I've completed a comprehensive review of PR #22 and left detailed comments on the pull request. Here's a summary of my findings:

Overall Assessment

This is a well-structured and valuable addition to the codebase that significantly enhances the code analysis capabilities. The error context analysis system provides detailed information about errors in code, including location, type, severity, and suggested fixes.

Key Strengths
Improvement Suggestions

I've provided detailed feedback in the PR comments, focusing on:
The PR is well-implemented and ready for merging after considering the suggested improvements. The error context analysis system will be a valuable tool for developers to identify and fix issues in their code more efficiently. You can view my detailed comments here: #22
Hey @codegen-sh[bot] - I've reviewed your changes - here's some feedback:
- Consider splitting the removal of existing analysis features (complexity, commits, MDX) from the addition of the new error context system into separate PRs.
- The new enhanced_type_detection.py module appears to be unused in this PR.
Here's what I looked at during the review
- 🟡 General issues: 1 issue found
- 🟢 Security: all looks good
- 🟡 Testing: 9 issues found
- 🟡 Complexity: 2 issues found
- 🟢 Documentation: all looks good
| class TestCodeError(unittest.TestCase): | ||
| """Tests for the CodeError class.""" | ||
|
|
||
| def test_code_error_initialization(self): | ||
| """Test that a CodeError can be initialized with all parameters.""" | ||
| error = CodeError( | ||
| error_type=ErrorType.SYNTAX_ERROR, | ||
| message="Invalid syntax", | ||
| file_path="test.py", | ||
| line_number=10, |
suggestion (testing): Consider testing default values for CodeError
Add tests that verify the default values of severity, context_lines, and suggested_fix when initializing CodeError without those parameters.
Suggested implementation:

```python
def test_code_error_defaults(self):
    """Test that CodeError default values are set correctly when not provided."""
    error = CodeError(
        error_type=ErrorType.SYNTAX_ERROR,
        message="Missing parameters",
        file_path="test.py",
        line_number=1,
        column=1,
    )
    self.assertEqual(error.severity, ErrorSeverity.ERROR)
    self.assertEqual(error.context_lines, [])
    self.assertIsNone(error.suggested_fix)
```

Insert this new test method within the TestCodeError class, ideally after the existing test_code_error_initialization method. Adjust default expectations (ErrorSeverity.ERROR, empty list, None) if your CodeError class implements different defaults.
```python
def test_get_context_lines(self):
    """Test getting context lines around a specific line."""
    context_lines = self.analyzer.get_context_lines("test.py", 2, context_size=1)

    self.assertEqual(context_lines, {
        1: "def test_function():",
        2: "    x = 'hello' + 5",
        3: "    return x"
    })
```
suggestion (testing): Add tests for edge cases in get_context_lines
Consider adding tests for edge cases: requesting context near the start or end of the file, with context_size=0, and when the file is missing or has no source.
Suggested implementation:

```python
def test_analyze_function(self):
    """Test analyzing a function for errors."""
    errors = self.analyzer.analyze_function(self.function)

    # We should find at least one error (type error)
    self.assertGreaterEqual(len(errors), 1)

    # Check that we found a type error
    type_errors = [e for e in errors if e.error_type == ErrorType.TYPE_ERROR]
    self.assertGreaterEqual(len(type_errors), 1)

def test_get_context_lines_edge_cases(self):
    """Test get_context_lines for edge cases."""
    # Edge case: context near the start of the file
    context_near_start = self.analyzer.get_context_lines("test.py", 1, context_size=1)
    self.assertEqual(context_near_start, {
        1: "def test_function():",
        2: "    x = 'hello' + 5"
    })

    # Edge case: context near the end of the file
    context_near_end = self.analyzer.get_context_lines("test.py", 3, context_size=1)
    self.assertEqual(context_near_end, {
        2: "    x = 'hello' + 5",
        3: "    return x"
    })

    # Edge case: context_size=0 returns only the requested line
    context_zero = self.analyzer.get_context_lines("test.py", 2, context_size=0)
    self.assertEqual(context_zero, {
        2: "    x = 'hello' + 5"
    })

    # Edge case: file missing or file has no source
    # Simulate a missing file by returning None
    self.codebase.get_file.return_value = None
    context_missing = self.analyzer.get_context_lines("nonexistent.py", 1, context_size=1)
    self.assertEqual(context_missing, {})
```

Ensure that the testing framework resets or correctly simulates the file retrieval behavior (i.e. codebase.get_file) for individual tests, if needed.
```python
def test_analyze_function(self):
    """Test analyzing a function for errors."""
    errors = self.analyzer.analyze_function(self.function)

    # We should find at least one error (type error)
    self.assertGreaterEqual(len(errors), 1)

    # Check that we found a type error
    type_errors = [e for e in errors if e.error_type == ErrorType.TYPE_ERROR]
    self.assertGreaterEqual(len(type_errors), 1)
```
suggestion (testing): Enhance error assertion specificity and coverage in test_analyze_function
Assert the exact number and types of errors for the sample input instead of only verifying at least one TYPE_ERROR. Also add tests for functions with no errors and for each error category your analyzer supports (parameter errors, undefined variables, return-type mismatches).
Suggested implementation:

```python
def test_analyze_function(self):
    """Test analyzing a function for errors."""
    errors = self.analyzer.analyze_function(self.function)

    # Expect exactly one error for this sample function
    self.assertEqual(len(errors), 1)

    # Check that the error is a type error with expected details
    error = errors[0]
    self.assertEqual(error.error_type, ErrorType.TYPE_ERROR)
    self.assertEqual(error.file_path, "test.py")
    self.assertEqual(error.symbol_name, "test_function")
    self.assertEqual(error.severity, ErrorSeverity.HIGH)
    self.assertIn("'hello' + 5", str(error.context_lines))

def test_analyze_file(self):
    """Test analyzing a file for errors."""
    errors = self.analyzer.analyze_file(self.file)

    # Expect exactly one error in this file (type error)
    self.assertEqual(len(errors), 1)

    # Check that the error is a type error
    self.assertEqual(errors[0].error_type, ErrorType.TYPE_ERROR)
```

Ensure that the analyzer supports the error types: ErrorType.PARAMETER_ERROR, ErrorType.UNDEFINED_VARIABLE, and ErrorType.RETURN_TYPE_MISMATCH. If these error categories are not yet implemented, you will need to update the analyzer or adjust the tests accordingly.
```python
# Check that we found a type error
type_errors = [e for e in errors if e.error_type == ErrorType.TYPE_ERROR]
self.assertGreaterEqual(len(type_errors), 1)
```
suggestion (testing): Enhance error assertion specificity and coverage in test_analyze_file
Match the specificity in test_analyze_function by asserting exact errors. Also add tests for:
- files with no errors
- syntax errors (ast.parse)
- unused imports (analyze_unused_imports)
- multiple functions/classes with errors
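Of the cases listed, the syntax-error one is the easiest to pin down, since `ast.parse` raises `SyntaxError` directly. A minimal hypothetical sketch of the check such a test would assert on (the helper name and the broken-source fixture are illustrative, not taken from the PR):

```python
# Hypothetical sketch: surfacing SyntaxError from ast.parse, the mechanism a
# syntax-error test for the analyzer would ultimately exercise.
import ast


def find_syntax_error(source: str):
    """Return (line, message) for the first syntax error, or None if it parses."""
    try:
        ast.parse(source)
        return None
    except SyntaxError as exc:
        return (exc.lineno, exc.msg)


# A deliberately broken def line is reported on line 1:
assert find_syntax_error("def f(:\n    pass\n")[0] == 1
# Valid source yields no error:
assert find_syntax_error("x = 1\n") is None
```

A test built on this needs no mocks at all, which keeps the syntax-error case independent of the codebase fixture.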
```python
def test_analyze_codebase(self):
    """Test analyzing the entire codebase for errors."""
    error_dict = self.analyzer.analyze_codebase()

    # We should have errors for our test file
    self.assertIn("test.py", error_dict)
    self.assertGreaterEqual(len(error_dict["test.py"]), 1)
```
suggestion (testing): Expand test_analyze_codebase coverage
Cover multi-file codebases (mixing error-free and error-prone files), circular imports (exercising find_circular_imports), and circular symbol dependencies (exercising find_circular_dependencies).
Suggested implementation:

```python
def test_find_circular_imports(self):
    """Test finding circular imports."""
    # Mock the build_import_graph method to return a graph with a cycle
    import networkx as nx
    G = nx.DiGraph()
    G.add_edge("a.py", "b.py")
    G.add_edge("b.py", "c.py")
    G.add_edge("c.py", "a.py")
    with patch.object(self.analyzer, 'build_import_graph', return_value=G):
        cycles = self.analyzer.find_circular_imports()
        self.assertGreaterEqual(len(cycles), 1)

def test_multi_file_analysis(self):
    """Test analyzing a multi-file codebase with error-free and error-prone files."""
    # Simulate a multi-file error dictionary.
    # We create a fake error object to mimic errors returned by the analyzer.
    FakeError = type("FakeError", (), {})  # Dummy error class
    fake_error = FakeError()
    fake_error.error_type = "TYPE_ERROR"
    multi_file_errors = {
        "test.py": [fake_error],
        "good.py": [],
        "bad.py": [fake_error, fake_error]
    }
    with patch.object(self.analyzer, 'analyze_codebase', return_value=multi_file_errors):
        error_dict = self.analyzer.analyze_codebase()
        # Ensure errors exist for expected files.
        self.assertIn("test.py", error_dict)
        self.assertIn("bad.py", error_dict)
        self.assertEqual(error_dict["good.py"], [])
        self.assertGreaterEqual(len(error_dict["bad.py"]), 1)

def test_find_circular_dependencies(self):
    """Test finding circular symbol dependencies."""
    # Mock the build_symbol_dependency_graph method to return a graph with a cycle
    import networkx as nx
    G = nx.DiGraph()
    G.add_edge("a.py", "b.py")
    G.add_edge("b.py", "a.py")
    with patch.object(self.analyzer, 'build_symbol_dependency_graph', return_value=G):
        cycles = self.analyzer.find_circular_dependencies()
        self.assertGreaterEqual(len(cycles), 1)
```

Implementing these changes assumes that:

- The analyzer instance in the tests provides the methods build_import_graph, analyze_codebase, build_symbol_dependency_graph, find_circular_imports, and find_circular_dependencies.
- The FakeError object is used only to simulate minimal error objects with an error_type attribute. Adjust this stub to better match the actual error class if needed.
- Make sure that the necessary modules, particularly networkx and patch from unittest.mock, are imported in your file.
```python
# Analyze each file
for file in self.codebase.files:
    file_errors = self.analyze_file(file)
```
issue (code-quality): We've found these issues:

- Use named expression to simplify assignment and conditional (`use-named-expression`)
- Use the built-in function `next` instead of a for-loop (`use-next`)
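For reference, the two rewrites named above look like this in isolation — a generic sketch, where `lines` and `pattern` are stand-in data rather than the PR's variables:

```python
# Generic illustration of the use-next and use-named-expression lint rewrites.
import re

lines = ["alpha", "beta match", "gamma"]
pattern = r"match"

# Flagged shape: a for-loop whose only job is to find the first hit.
line_no = None
for i, line in enumerate(lines):
    if re.search(pattern, line):
        line_no = i + 1
        break

# use-next: the same search as a single expression with a default.
line_no_next = next(
    (i + 1 for i, line in enumerate(lines) if re.search(pattern, line)),
    None,
)
assert line_no == line_no_next == 2

# use-named-expression: bind the match and test it in one step.
if (m := re.search(pattern, lines[1])):
    assert m.group(0) == "match"
```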
```python
# Add additional context based on error type
if error.error_type == ErrorType.PARAMETER_ERROR and error.symbol_name:
    # Get information about the function
    func = None
```
issue (code-quality): We've found these issues:

- Use the built-in function `next` instead of a for-loop [×2] (`use-next`)
- Replace identity comprehension with call to collection constructor [×2] (`identity-comprehension`)

Explanation

Convert list/set/tuple comprehensions that do not change the input elements into calls to the collection constructor.

Before

```python
# List comprehensions
[item for item in coll]
[item for item in friends.names()]

# Dict comprehensions
{k: v for k, v in coll}
{k: v for k, v in coll.items()}  # Only if we know coll is a `dict`

# Unneeded call to `.items()`
dict(coll.items())  # Only if we know coll is a `dict`

# Set comprehensions
{item for item in coll}
```

After

```python
# List comprehensions
list(iter(coll))
list(iter(friends.names()))

# Dict comprehensions
dict(coll)
dict(coll)

# Unneeded call to `.items()`
dict(coll)

# Set comprehensions
set(coll)
```

All these comprehensions are just creating a copy of the original collection. They can all be simplified by simply constructing a new collection directly. The resulting code is easier to read and shows the intent more clearly.
```python
for target in node.targets:
    if isinstance(target, ast.Name):
        local_vars.append(target.id)
```
issue (code-quality): Replace a for append loop with list extend (for-append-to-extend)
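The `for-append-to-extend` rewrite, shown on a small stand-alone example whose `ast.Assign` node mirrors the flagged code — a sketch of the lint rule's suggestion, not the PR's actual change:

```python
# Illustration of replacing a filtered for-append loop with a single extend.
import ast

node = ast.parse("x = y = 1").body[0]  # an ast.Assign with two Name targets

# Flagged: appending inside a loop.
local_vars = []
for target in node.targets:
    if isinstance(target, ast.Name):
        local_vars.append(target.id)

# Suggested: one extend over a filtered generator expression.
local_vars_2 = []
local_vars_2.extend(t.id for t in node.targets if isinstance(t, ast.Name))

assert local_vars == local_vars_2 == ["x", "y"]
```

Both forms filter non-`Name` targets; the extend version states the transformation in one expression.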
```python
for i, line in enumerate(lines):
    if re.search(pattern, line):
        return i + 1
return None
```
suggestion (code-quality): Use the built-in function `next` instead of a for-loop (`use-next`)

Before:

```python
for i, line in enumerate(lines):
    if re.search(pattern, line):
        return i + 1
return None
```

After:

```python
return next(
    (i + 1 for i, line in enumerate(lines) if re.search(pattern, line)),
    None,
)
```
```python
    return None

def get_function_error_context(self, function_name: str) -> Dict[str, Any]:
    """
```
issue (code-quality): We've found these issues:

- Use the built-in function `next` instead of a for-loop (`use-next`)
- Inline variable that is immediately returned (`inline-immediately-returned-variable`)
27f0eca to f4656a2
Overview
This PR adds a robust and dynamic error context analysis system to the codebase. The system provides detailed contextual information about errors in code, including the error location, type, severity, and suggested fixes.
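The shape of a single error record, as exercised by the unit tests quoted in the review above. The dataclass below is a hypothetical reconstruction from those test fixtures (field names come from `test_code_error_initialization` and the suggested defaults test) — the PR's actual class and enum members may differ:

```python
# Hypothetical sketch of the error record; not the PR's actual implementation.
from dataclasses import dataclass, field
from enum import Enum, auto
from typing import Dict, Optional


class ErrorType(Enum):
    SYNTAX_ERROR = auto()
    TYPE_ERROR = auto()
    PARAMETER_ERROR = auto()


class ErrorSeverity(Enum):
    WARNING = auto()
    ERROR = auto()


@dataclass
class CodeError:
    error_type: ErrorType
    message: str
    file_path: str
    line_number: int
    severity: ErrorSeverity = ErrorSeverity.ERROR  # assumed default
    context_lines: Dict[int, str] = field(default_factory=dict)
    suggested_fix: Optional[str] = None


err = CodeError(ErrorType.SYNTAX_ERROR, "Invalid syntax", "test.py", 10)
assert err.severity is ErrorSeverity.ERROR
assert err.suggested_fix is None
```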
Features
Error Types Detected
The system can detect the following types of errors:
Context Information
For each error, the system provides detailed context information:
Implementation Details
The implementation uses various techniques to detect errors:
Documentation
Added comprehensive documentation in the README.md file, including:
Example Usage
Summary by Sourcery
Add a robust error context analysis system to the codebase, providing detailed insights into code errors, their types, severity, and potential fixes
New Features:
Enhancements:
Documentation:
Tests: