Enhance Analysis Module with Detailed Error Detection and Validation #20
Conversation
Create fully interconnected analysis module with comprehensive metrics integration
Enhance analysis.py with better CodebaseContext integration
@CodiumAI-Agent /review
Reviewer's Guide

This pull request refactors the analysis module by splitting the original monolithic …

Sequence Diagram for Error Detection Process

sequenceDiagram
participant C as Caller
participant DE as detect_errors()
participant PV as ParameterValidator
participant CV as CallValidator
participant RV as ReturnValidator
participant QC as CodeQualityChecker
C->>DE: detect_errors(codebase, context)
activate DE
DE->>PV: detect_errors()
activate PV
PV-->>DE: parameter_errors
deactivate PV
DE->>CV: detect_errors()
activate CV
CV-->>DE: call_errors
deactivate CV
DE->>RV: detect_errors()
activate RV
RV-->>DE: return_errors
deactivate RV
DE->>QC: detect_errors()
activate QC
QC-->>DE: quality_errors
deactivate QC
DE-->>C: combined_error_results
deactivate DE
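As a rough illustration of the orchestration in this sequence, the top-level function might look like the sketch below; the validator class names come from the diagrams, but the constructor signatures and the exact shape of the combined result are assumptions rather than the PR's actual code.

```python
def detect_errors(codebase, context=None) -> dict[str, list]:
    """Run each detector in turn and combine the results, mirroring the sequence above."""
    detectors = {
        "parameter_errors": ParameterValidator(codebase, context),
        "call_errors": CallValidator(codebase, context),
        "return_errors": ReturnValidator(codebase, context),
        "quality_errors": CodeQualityChecker(codebase, context),
    }
    # Each detector returns a list of DetectedError instances; the caller gets one dict.
    return {name: detector.detect_errors() for name, detector in detectors.items()}
```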
Class Diagram for New Error Detection Module

classDiagram
namespace ErrorDetection {
class ErrorSeverity {
<<enumeration>>
INFO
WARNING
ERROR
CRITICAL
}
class ErrorCategory {
<<enumeration>>
PARAMETER_ERROR
CALL_ERROR
RETURN_ERROR
CODE_QUALITY
SECURITY
PERFORMANCE
}
class DetectedError {
+ErrorCategory category
+ErrorSeverity severity
+string message
+string file_path
+int line_number
+string function_name
+string code_snippet
}
class ErrorDetector {
<<Abstract>>
+Codebase codebase
+CodebaseContext context
+List~DetectedError~ errors
+detect_errors()* List~DetectedError~
}
class ParameterValidator {
+detect_errors() List~DetectedError~
-_check_unused_parameters() None
-_check_parameter_count_mismatches() None
-_check_missing_required_parameters() None
}
class CallValidator {
+detect_errors() List~DetectedError~
-_check_circular_dependencies() None
-_check_potential_exceptions() None
}
class ReturnValidator {
+detect_errors() List~DetectedError~
-_check_inconsistent_return_types() None
-_check_missing_return_statements() None
}
class CodeQualityChecker {
+detect_errors() List~DetectedError~
-_check_unreachable_code() None
-_check_complex_functions() None
}
class detect_errors {
<<function>>
+detect_errors(codebase, context) dict
}
}
ErrorDetector <|-- ParameterValidator
ErrorDetector <|-- CallValidator
ErrorDetector <|-- ReturnValidator
ErrorDetector <|-- CodeQualityChecker
DetectedError "1" --* "many" ErrorDetector : contains
ErrorCategory "1" --o "1" DetectedError : uses
ErrorSeverity "1" --o "1" DetectedError : uses
detect_errors ..> ParameterValidator : uses
detect_errors ..> CallValidator : uses
detect_errors ..> ReturnValidator : uses
detect_errors ..> CodeQualityChecker : uses
note for ErrorDetector "Base class for specific error detectors."
note for detect_errors "Top-level function coordinating detectors."
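To make the inheritance relationships above concrete, here is a minimal sketch of one detector subclass. The ErrorDetector base class, DetectedError fields, and codebase attributes follow the diagram; the unused-parameter heuristic itself is purely illustrative and not the PR's implementation.

```python
class ParameterValidator(ErrorDetector):
    """Flags parameters that never appear in the body of their function."""

    def detect_errors(self) -> list[DetectedError]:
        self.errors = []
        self._check_unused_parameters()
        return self.errors

    def _check_unused_parameters(self) -> None:
        for function in self.codebase.functions:
            source = getattr(function, "source", "") or ""
            for param in getattr(function, "parameters", []):
                # Naive heuristic: the name only occurs once (in the signature itself).
                if not param.name.startswith("_") and source.count(param.name) <= 1:
                    self.errors.append(
                        DetectedError(
                            category=ErrorCategory.PARAMETER_ERROR,
                            severity=ErrorSeverity.WARNING,
                            message=f"Parameter '{param.name}' appears unused in '{function.name}'",
                            file_path=function.filepath,
                            line_number=function.line_number,
                            function_name=function.name,
                        )
                    )
```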
Class Diagram for Refactored CodebaseContext

classDiagram
class CodebaseContext {
<<Dataclass>>
+List~Any~ projects
+Optional~Any~ config
#Dict~str, Symbol~ _symbol_cache
#Dict~str, Import~ _import_cache
#Dict~str, Set~str~~ _dependency_graph
+__post_init__() None
-_build_caches() None
-_cache_symbols(codebase) None
-_cache_imports(codebase) None
-_build_dependency_graph(codebase) None
+get_symbol(name) Optional~Symbol~
+get_import(source) Optional~Import~
+get_dependencies(symbol_name) Set~str~
+get_dependents(symbol_name) Set~str~
+get_function(name) Optional~Function~
+get_class(name) Optional~Class~
+get_symbols_by_type(symbol_type) List~Symbol~
+get_symbols_by_file(file_path) List~Symbol~
+get_imports_by_file(file_path) List~Import~
+find_symbol_usages(symbol_name) List~Symbol~
+find_import_usages(import_source) List~Symbol~
+find_related_symbols(symbol_name, max_depth) Tuple~Set~Symbol~, Set~Symbol~~
+get_import_graph() Dict~str, Set~str~~
+get_all_files() List~Any~
+get_all_symbols() List~Symbol~
+get_all_imports() List~Import~
+get_symbol_dependencies(symbol_name) List~Symbol~
+get_symbol_dependents(symbol_name) List~Symbol~
}
note for CodebaseContext "Refactored CodebaseContext focusing on caching and providing context."
class Symbol
class Import
class Function
class Class
CodebaseContext --> Symbol : uses/returns
CodebaseContext --> Import : uses/returns
CodebaseContext --> Function : uses/returns
CodebaseContext --> Class : uses/returns
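A hedged sketch of how the caching half of this refactored CodebaseContext could hang together; the Codebase attribute names (symbols) and the cache shapes are assumptions based on the method list above, not the PR's exact code.

```python
from dataclasses import dataclass, field
from typing import Any, Optional


@dataclass
class CodebaseContext:
    projects: list[Any]
    config: Optional[Any] = None
    _symbol_cache: dict[str, Any] = field(default_factory=dict, init=False)
    _dependency_graph: dict[str, set[str]] = field(default_factory=dict, init=False)

    def __post_init__(self) -> None:
        self._build_caches()

    def _build_caches(self) -> None:
        # Walk every project's codebase once and index its symbols by name.
        for project in self.projects:
            codebase = getattr(project, "codebase", None)
            if codebase is None:
                continue
            for symbol in getattr(codebase, "symbols", []):
                self._symbol_cache[symbol.name] = symbol

    def get_symbol(self, name: str) -> Optional[Any]:
        return self._symbol_cache.get(name)

    def get_dependencies(self, symbol_name: str) -> set[str]:
        return self._dependency_graph.get(symbol_name, set())
```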
Class Diagram for Updated Analysis Import Module

classDiagram
namespace AnalysisImport {
class functions {
<<module>>
+create_graph_from_codebase(repo_name) DiGraph
+find_import_cycles(graph) List~List~str~~
+find_problematic_import_loops(graph, cycles) List~List~str~~
+convert_all_calls_to_kwargs(codebase) None
+convert_function_calls_to_kwargs(function) None
+analyze_imports(codebase) Dict~str, Any~
}
}
note for AnalysisImport.functions "Functions related to import analysis and call conversion."
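Since these helpers return a DiGraph and lists of cycles, a networkx-based implementation is the natural reading; the sketch below shows that reading, though the actual module may build and filter the graph differently, and the "longer than two modules" criterion for problematic loops is only an assumption.

```python
import networkx as nx


def find_import_cycles(graph: nx.DiGraph) -> list[list[str]]:
    """Return every simple cycle of module imports in the graph."""
    return list(nx.simple_cycles(graph))


def find_problematic_import_loops(graph: nx.DiGraph, cycles: list[list[str]]) -> list[list[str]]:
    """Keep only the longer cycles, which are usually the hardest to break."""
    return [cycle for cycle in cycles if len(cycle) > 2]
```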
File-Level Changes
@sourcery-ai review

/gemini review
Important: Review skipped. Bot user detected.
|
|
/review |
|
Warning You have reached your daily quota limit. Please wait up to 24 hours and I will start processing your requests again! |
|
/improve |
|
/korbit-review |
|
@codecov-ai-reviewer review |
|
@codegen Implement and upgrade this PR with above Considerations and suggestions from other AI bots |
|
On it! We are reviewing the PR and will provide feedback shortly. |
PR Reviewer Guide 🔍 (Review updated until commit 7417b17)
Here are some key observations to aid the review process:
Persistent review updated to latest commit d8b0f88

I'll review and suggest improvements for this PR right away! Let me examine the changes in detail.

Hey! 👋 I see one of the checks failed. I am on it! 🫡
PR Code Suggestions ✨
Latest suggestions up to 7417b17

Previous suggestions
Suggestions up to commit 7417b17
Suggestions up to commit f5ec675
Suggestions up to commit f5ec675
Suggestions up to commit e82c99b
Suggestions up to commit 034b4d0
PR Description

This pull request introduces a comprehensive code analysis module to the Codegen-on-OSS project. The primary goal is to provide automated and detailed insights into code quality, potential errors, and architectural dependencies within Python codebases. This will enable developers to proactively identify and address issues, improve code maintainability, and enhance overall software reliability.

Key Technical Changes
The key technical changes include the addition of three new modules: …

Architecture Decisions
A layered architecture is adopted, separating concerns into distinct modules for error detection, function call analysis, and type validation. The …

Dependencies and Interactions
This pull request introduces a dependency on the …

Risk Considerations
The new analysis modules may introduce performance overhead, especially for large codebases. The type inference engine is currently limited and may not handle all type inference scenarios accurately. The API endpoints lack authentication and authorization, posing a potential security risk. Thorough testing and performance optimization are required to mitigate these risks.

Notable Implementation Details
The …
Hey @codegen-sh[bot] - I've reviewed your changes - here's some feedback:
- This PR significantly refactors the CodeAnalyzer class, removing several existing analysis methods alongside adding the new ones.
- Consider splitting the introduction of error detection, type validation, and function call analysis into separate pull requests for easier review.
Here's what I looked at during the review
- 🟢 General issues: all looks good
- 🟢 Security: all looks good
- 🟢 Testing: all looks good
- 🟡 Complexity: 1 issue found
- 🟢 Documentation: all looks good
Help me be more useful! Please click 👍 or 👎 on each comment and I'll use the feedback to improve your reviews.
| return f"[{self.error_type.name}] {self.message} at {location}{context}" | ||
|
|
||
|
|
||
| class TypeValidator: |
issue (complexity): Consider decomposing the monolithic TypeValidator class into smaller, focused components like FunctionTypeValidator and shared helper functions.
Consider decomposing the monolithic validator into smaller, focused components. For example, you could extract function- and class-specific validations into separate validator classes or helper methods. This would isolate concerns and improve testability and readability.
Actionable Steps:
- Extract Function and Class Validators:

  class FunctionTypeValidator:
      def __init__(self, function, issues, builtin_types):
          self.function = function
          self.issues = issues
          self.builtin_types = builtin_types

      def validate(self):
          self._validate_return_annotation()
          self._validate_parameters()
          self._check_return_consistency()

      def _validate_return_annotation(self):
          if (not hasattr(self.function, "return_type") or not self.function.return_type) and (
              not self.function.name.startswith("__") or self.function.name == "__call__"
          ):
              self.issues.append(
                  TypeIssue(
                      error_type=TypeValidationError.MISSING_TYPE_ANNOTATION,
                      message=f"Function '{self.function.name}' is missing a return type annotation",
                      file_path=self.function.filepath,
                      line_number=self.function.line_number,
                      function_name=self.function.name,
                      class_name=getattr(self.function, "class_name", None),
                  )
              )

      def _validate_parameters(self):
          for param in self.function.parameters:
              if not param.type_annotation and not param.name.startswith("_"):
                  self.issues.append(
                      TypeIssue(
                          error_type=TypeValidationError.MISSING_TYPE_ANNOTATION,
                          message=f"Parameter '{param.name}' in function '{self.function.name}' is missing a type annotation",
                          file_path=self.function.filepath,
                          line_number=self.function.line_number,
                          function_name=self.function.name,
                          class_name=getattr(self.function, "class_name", None),
                      )
                  )
              elif param.type_annotation:
                  # Reuse shared helper for type annotation validation
                  validate_type_annotation(param.type_annotation, self.function, self.issues, self.builtin_types)

      def _check_return_consistency(self):
          # Place the return consistency logic here, as originally in _check_return_type_consistency
          pass  # Implementation remains similar
- Create Shared Helpers for Type Annotation Validation:

  def validate_type_annotation(type_annotation: str, context_symbol: Union[Function, Class], issues: List[TypeIssue], builtin_types: Set[str]) -> None:
      if type_annotation not in builtin_types:
          if not is_valid_user_type(type_annotation, context_symbol):
              issues.append(
                  TypeIssue(
                      error_type=TypeValidationError.INVALID_TYPE_ANNOTATION,
                      message=f"Type annotation '{type_annotation}' may not be a valid type",
                      file_path=context_symbol.filepath,
                      line_number=context_symbol.line_number,
                      function_name=getattr(context_symbol, "name", None),
                      class_name=getattr(context_symbol, "class_name", None),
                  )
              )
      if has_incorrect_generic_usage(type_annotation):
          issues.append(
              TypeIssue(
                  error_type=TypeValidationError.INCORRECT_GENERIC_USAGE,
                  message=f"Incorrect generic usage in type annotation '{type_annotation}'",
                  file_path=context_symbol.filepath,
                  line_number=context_symbol.line_number,
                  function_name=getattr(context_symbol, "name", None),
                  class_name=getattr(context_symbol, "class_name", None),
              )
          )

  def is_valid_user_type(type_name: str, context_symbol: Union[Function, Class]) -> bool:
      base_type = type_name.split("[")[0].split(".")[-1]
      # Include codebase lookup logic here; you may pass codebase if required.
      return True

  def has_incorrect_generic_usage(type_annotation: str) -> bool:
      if type_annotation.count("[") != type_annotation.count("]"):
          return True
      # Refactor rules for each generic into separate checks if needed
      generic_types = ["List", "Dict", "Tuple", "Set", "FrozenSet", "Optional", "Union", "Callable"]
      for generic in generic_types:
          if type_annotation.startswith(f"{generic}[") and type_annotation.endswith("]"):
              if generic == "Dict" and "," not in type_annotation:
                  return True
              if generic == "Tuple" and not ("," in type_annotation or "..." in type_annotation):
                  return True
              if generic == "Callable" and "[" in type_annotation and "]" in type_annotation:
                  if type_annotation.count("[") < 2 or type_annotation.count("]") < 2:
                      return True
      return False
- Refactor the TypeValidator class to utilize these new helpers:

  class TypeValidator:
      def __init__(self, codebase: Codebase, context: Optional[CodebaseContext] = None):
          self.codebase = codebase
          self.context = context
          self.issues: List[TypeIssue] = []
          self.builtin_types = { ... }  # as previously defined

      def validate_types(self) -> List[TypeIssue]:
          self.issues = []
          for function in self.codebase.functions:
              FunctionTypeValidator(function, self.issues, self.builtin_types).validate()
          for cls in self.codebase.classes:
              # similar extraction for class validations
              self._validate_class_types(cls)
          return self.issues
      # ...
By introducing specialized validators and shared helper functions, you reduce nested complexity and make each component easier to understand and test while preserving all functionality.
| if not hasattr(function, "return_type") or not function.return_type: | ||
| # Skip if it's a special method like __init__ | ||
| if not function.name.startswith("__") or function.name == "__call__": | ||
| self.issues.append(TypeIssue( | ||
| error_type=TypeValidationError.MISSING_TYPE_ANNOTATION, | ||
| message=f"Function '{function.name}' is missing a return type annotation", | ||
| file_path=function.filepath, | ||
| line_number=function.line_number, | ||
| function_name=function.name, | ||
| class_name=function.class_name if hasattr(function, "class_name") else None | ||
| )) |
suggestion (code-quality): Merge nested if conditions (merge-nested-ifs)
| if not hasattr(function, "return_type") or not function.return_type: | |
| # Skip if it's a special method like __init__ | |
| if not function.name.startswith("__") or function.name == "__call__": | |
| self.issues.append(TypeIssue( | |
| error_type=TypeValidationError.MISSING_TYPE_ANNOTATION, | |
| message=f"Function '{function.name}' is missing a return type annotation", | |
| file_path=function.filepath, | |
| line_number=function.line_number, | |
| function_name=function.name, | |
| class_name=function.class_name if hasattr(function, "class_name") else None | |
| )) | |
| if (not hasattr(function, "return_type") or not function.return_type) and (not function.name.startswith("__") or function.name == "__call__"): | |
| self.issues.append(TypeIssue( | |
| error_type=TypeValidationError.MISSING_TYPE_ANNOTATION, | |
| message=f"Function '{function.name}' is missing a return type annotation", | |
| file_path=function.filepath, | |
| line_number=function.line_number, | |
| function_name=function.name, | |
| class_name=function.class_name if hasattr(function, "class_name") else None | |
| )) | |
Explanation
Too much nesting can make code difficult to understand, and this is especially true in Python, where there are no brackets to help out with the delineation of different nesting levels.
Reading deeply nested code is confusing, since you have to keep track of which
conditions relate to which levels. We therefore strive to reduce nesting where
possible, and the situation where two if conditions can be combined using
and is an easy win.
| if type_annotation not in self.builtin_types: | ||
| # Check if it's a valid user-defined type | ||
| if not self._is_valid_user_type(type_annotation): | ||
| self.issues.append(TypeIssue( | ||
| error_type=TypeValidationError.INVALID_TYPE_ANNOTATION, | ||
| message=f"Type annotation '{type_annotation}' may not be a valid type", | ||
| file_path=context_symbol.filepath, | ||
| line_number=context_symbol.line_number, | ||
| function_name=context_symbol.name if isinstance(context_symbol, Function) else None, | ||
| class_name=context_symbol.name if isinstance(context_symbol, Class) else getattr(context_symbol, "class_name", None) | ||
| )) |
suggestion (code-quality): Merge nested if conditions (merge-nested-ifs)
| if type_annotation not in self.builtin_types: | |
| # Check if it's a valid user-defined type | |
| if not self._is_valid_user_type(type_annotation): | |
| self.issues.append(TypeIssue( | |
| error_type=TypeValidationError.INVALID_TYPE_ANNOTATION, | |
| message=f"Type annotation '{type_annotation}' may not be a valid type", | |
| file_path=context_symbol.filepath, | |
| line_number=context_symbol.line_number, | |
| function_name=context_symbol.name if isinstance(context_symbol, Function) else None, | |
| class_name=context_symbol.name if isinstance(context_symbol, Class) else getattr(context_symbol, "class_name", None) | |
| )) | |
| if type_annotation not in self.builtin_types and not self._is_valid_user_type(type_annotation): | |
| self.issues.append(TypeIssue( | |
| error_type=TypeValidationError.INVALID_TYPE_ANNOTATION, | |
| message=f"Type annotation '{type_annotation}' may not be a valid type", | |
| file_path=context_symbol.filepath, | |
| line_number=context_symbol.line_number, | |
| function_name=context_symbol.name if isinstance(context_symbol, Function) else None, | |
| class_name=context_symbol.name if isinstance(context_symbol, Class) else getattr(context_symbol, "class_name", None) | |
| )) | |
Explanation
Too much nesting can make code difficult to understand, and this is especially true in Python, where there are no brackets to help out with the delineation of different nesting levels.
Reading deeply nested code is confusing, since you have to keep track of which
conditions relate to which levels. We therefore strive to reduce nesting where
possible, and the situation where two if conditions can be combined using
and is an easy win.
| if generic == "Callable" and "[" in type_annotation and "]" in type_annotation: | ||
| # Callable[[arg1, arg2], return_type] | ||
| if type_annotation.count("[") < 2 or type_annotation.count("]") < 2: | ||
| return True |
suggestion (code-quality): Merge nested if conditions (merge-nested-ifs)
| if generic == "Callable" and "[" in type_annotation and "]" in type_annotation: | |
| # Callable[[arg1, arg2], return_type] | |
| if type_annotation.count("[") < 2 or type_annotation.count("]") < 2: | |
| return True | |
| if generic == "Callable" and "[" in type_annotation and "]" in type_annotation and (type_annotation.count("[") < 2 or type_annotation.count("]") < 2): | |
| return True | |
Explanation
Too much nesting can make code difficult to understand, and this is especially true in Python, where there are no brackets to help out with the delineation of different nesting levels.
Reading deeply nested code is confusing, since you have to keep track of which
conditions relate to which levels. We therefore strive to reduce nesting where
possible, and the situation where two if conditions can be combined using
and is an easy win.
| if hasattr(stmt, "type") and stmt.type == "assignment": | ||
| if hasattr(stmt, "left") and hasattr(stmt, "right"): | ||
| # Infer type from right side | ||
| right_type = self._infer_expression_type(stmt.right, file_path) | ||
| if right_type and hasattr(stmt.left, "name"): | ||
| var_key = f"{function.name}.{stmt.left.name}" | ||
| self.type_map[file_path][var_key] = right_type |
suggestion (code-quality): Merge nested if conditions (merge-nested-ifs)
| if hasattr(stmt, "type") and stmt.type == "assignment": | |
| if hasattr(stmt, "left") and hasattr(stmt, "right"): | |
| # Infer type from right side | |
| right_type = self._infer_expression_type(stmt.right, file_path) | |
| if right_type and hasattr(stmt.left, "name"): | |
| var_key = f"{function.name}.{stmt.left.name}" | |
| self.type_map[file_path][var_key] = right_type | |
| if hasattr(stmt, "type") and stmt.type == "assignment" and (hasattr(stmt, "left") and hasattr(stmt, "right")): | |
| right_type = self._infer_expression_type(stmt.right, file_path) | |
| if right_type and hasattr(stmt.left, "name"): | |
| var_key = f"{function.name}.{stmt.left.name}" | |
| self.type_map[file_path][var_key] = right_type | |
Explanation
Too much nesting can make code difficult to understand, and this is especially true in Python, where there are no brackets to help out with the delineation of different nesting levels.
Reading deeply nested code is confusing, since you have to keep track of which
conditions relate to which levels. We therefore strive to reduce nesting where
possible, and the situation where two if conditions can be combined using
and is an easy win.
| "parameter_details": parameters | ||
| } | ||
|
|
||
| def _analyze_statement_for_parameters(self, statement: Any, parameters: Dict[str, Dict[str, Any]]) -> None: |
issue (code-quality): Low code quality found in ParameterAnalysis._analyze_statement_for_parameters - 20% (low-code-quality)
Explanation
The quality score for this function is below the quality threshold of 25%. This score is a combination of the method length, cognitive complexity and working memory.
How can you solve this?
It might be worth refactoring this function to make it shorter and more readable.
- Reduce the function length by extracting pieces of functionality out into their own functions. This is the most important thing you can do - ideally a function should be less than 10 lines.
- Reduce nesting, perhaps by introducing guard clauses to return early.
- Ensure that variables are tightly scoped, so that code using related concepts sits together within the function rather than being scattered.
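For the "guard clauses" point in particular, the kind of rewrite being suggested looks roughly like this; the statement attributes used here are illustrative, not code from this PR.

```python
# Nested version: the interesting work sits two conditions deep.
def describe(stmt):
    if hasattr(stmt, "type"):
        if stmt.type == "assignment":
            return f"assignment to {stmt.left.name}"
    return "unknown statement"


# Guard-clause version: bail out early and keep the main path flat.
def describe_flat(stmt):
    if not hasattr(stmt, "type") or stmt.type != "assignment":
        return "unknown statement"
    return f"assignment to {stmt.left.name}"
```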
| results = {} | ||
|
|
||
| for function in self.codebase.functions: | ||
| results[function.name] = self.analyze_parameter_usage(function) | ||
|
|
||
| return results |
suggestion (code-quality): We've found these issues:
- Convert for loop into dictionary comprehension (dict-comprehension)
- Inline variable that is immediately returned (inline-immediately-returned-variable)
| results = {} | |
| for function in self.codebase.functions: | |
| results[function.name] = self.analyze_parameter_usage(function) | |
| return results | |
| return { | |
| function.name: self.analyze_parameter_usage(function) | |
| for function in self.codebase.functions | |
| } |
| for imp in self.codebase.imports: | ||
| if imp.imported_name == base_type: | ||
| return True |
issue (code-quality): Use the built-in function next instead of a for-loop (use-next)
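The rewrite this suggestion points at would look roughly like the sketch below; the method names are hypothetical, and it assumes the loop above only needs a yes/no answer (any()) or the first matching import (next()).

```python
# Methods on the same class as the loop above (hypothetical names):

def _name_is_imported(self, base_type: str) -> bool:
    # any() covers the simple membership test the original loop performs.
    return any(imp.imported_name == base_type for imp in self.codebase.imports)


def _find_import(self, base_type: str):
    # next() with a default returns the first match, or None when nothing matches.
    return next(
        (imp for imp in self.codebase.imports if imp.imported_name == base_type),
        None,
    )
```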
| if generic == "Tuple" and not ("," in type_annotation or "..." in type_annotation): | ||
| return True |
issue (code-quality): Simplify logical expression using De Morgan identities (de-morgan)
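Applying De Morgan's law to the condition above just distributes the negation; behaviour is unchanged. The throwaway helper below exists only so the condition stands alone, and its name is hypothetical.

```python
def tuple_annotation_lacks_args(generic: str, type_annotation: str) -> bool:
    # not ("," in s or "..." in s)  is equivalent to  ("," not in s) and ("..." not in s)
    return generic == "Tuple" and "," not in type_annotation and "..." not in type_annotation
```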
| if hasattr(stmt, "finally_block"): | ||
| self._infer_types_in_block(stmt.finally_block, function, file_path) | ||
|
|
||
| def _infer_expression_type(self, expr: Any, file_path: str) -> Optional[str]: |
issue (code-quality): Low code quality found in TypeInferenceEngine._infer_expression_type - 18% (low-code-quality)
Explanation
The quality score for this function is below the quality threshold of 25%. This score is a combination of the method length, cognitive complexity and working memory.
How can you solve this?
It might be worth refactoring this function to make it shorter and more readable.
- Reduce the function length by extracting pieces of functionality out into their own functions. This is the most important thing you can do - ideally a function should be less than 10 lines.
- Reduce nesting, perhaps by introducing guard clauses to return early.
- Ensure that variables are tightly scoped, so that code using related concepts sits together within the function rather than being scattered.
Review by Korbit AI
Korbit automatically attempts to detect when you fix issues in new commits.
| Category | Issue | Status |
|---|---|---|
| Inefficient Issue Grouping and Conversion ▹ view | ✅ Fix detected | |
| Hardcoded operator lists should be constants ▹ view | ✅ Fix detected | |
| Missing specific type hint for block parameter ▹ view | ✅ Fix detected | |
| Unexplained Complexity Threshold ▹ view | | |
Files scanned
| File Path | Reviewed |
|---|---|
| codegen-on-oss/codegen_on_oss/analysis/function_call_analysis.py | ✅ |
| codegen-on-oss/codegen_on_oss/analysis/type_validation.py | ✅ |
| codegen-on-oss/codegen_on_oss/analysis/error_detection.py | ✅ |
| codegen-on-oss/codegen_on_oss/analysis/analysis.py | ✅ |
Explore our documentation to understand the languages and file types we support and the files we ignore.
Check out our docs on how you can make Korbit work best for you and your team.
| # Group issues by type | ||
| issues_by_type = {} | ||
| for issue in issues: | ||
| error_type = issue.error_type.name | ||
| if error_type not in issues_by_type: | ||
| issues_by_type[error_type] = [] | ||
| issues_by_type[error_type].append(issue.to_dict()) | ||
|
|
||
| # Group issues by file | ||
| issues_by_file = {} | ||
| for issue in issues: | ||
| file_path = issue.file_path | ||
| if file_path not in issues_by_file: | ||
| issues_by_file[file_path] = [] | ||
| issues_by_file[file_path].append(issue.to_dict()) |
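The grouping above can be tightened with collections.defaultdict, which is the usual fix for this pattern; the sketch assumes the same issue objects (error_type, file_path, to_dict()) as the snippet above.

```python
from collections import defaultdict

issues_by_type: dict[str, list[dict]] = defaultdict(list)
issues_by_file: dict[str, list[dict]] = defaultdict(list)

for issue in issues:
    as_dict = issue.to_dict()  # convert once instead of once per grouping
    issues_by_type[issue.error_type.name].append(as_dict)
    issues_by_file[issue.file_path].append(as_dict)
```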
This comment was marked as resolved.
| op = expr.operators[0].source if hasattr(expr.operators[0], "source") else None | ||
| if op in ["+", "-", "*", "/", "%", "**"]: | ||
| # Numeric operations | ||
| return "float" | ||
| elif op in ["==", "!=", "<", ">", "<=", ">=", "and", "or", "not"]: | ||
| # Boolean operations | ||
| return "bool" |
This comment was marked as resolved.
|
|
||
| return self.type_map | ||
|
|
||
| def _infer_types_in_block(self, block: Any, function: Function, file_path: str) -> None: |
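One way to address the "missing specific type hint for block parameter" finding is a typing.Protocol that narrows block: Any to the attributes the method actually uses; the attribute name below is an assumption, not the SDK's real interface.

```python
from typing import Any, Protocol, Sequence


class StatementBlock(Protocol):
    """Structural stand-in for the SDK's code-block objects."""

    statements: Sequence[Any]


# The signature above would then become:
#   def _infer_types_in_block(self, block: StatementBlock, function: Function, file_path: str) -> None:
```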
This comment was marked as resolved.
| for function in self.codebase.functions: | ||
| complexity = calculate_cyclomatic_complexity(function) | ||
|
|
||
| if complexity > 15: # Threshold for high complexity |
Unexplained Complexity Threshold 
What is the issue?
Magic number 15 used as a complexity threshold without clear explanation of why this value was chosen.
Why this matters
Future developers won't understand the significance of this threshold or know when it should be adjusted.
Suggested change ∙ Feature Preview
Define a named constant with explanatory comment:
# Based on industry standard cyclomatic complexity limits for maintainable code
MAX_ACCEPTABLE_COMPLEXITY = 15

def _detect_complex_functions(self) -> None:
    for function in self.codebase.functions:
        complexity = calculate_cyclomatic_complexity(function)
        if complexity > MAX_ACCEPTABLE_COMPLEXITY:
💬 Looking for more details? Reply to this comment to chat with Korbit.
PR Review: Enhanced Analysis Module

I've conducted a thorough review of this PR, which enhances the analysis module with detailed error detection, function call analysis, and type validation features. Overall, this is a significant and valuable addition to the codebase that will greatly improve the analysis capabilities.

Strengths
Suggestions for Improvement
Minor Issues
Conclusion
This PR represents a significant enhancement to the analysis capabilities of the codebase. With a few improvements to testing, error handling, and performance optimization, it will be a valuable addition. I recommend addressing the testing gap as a priority before merging.
I'm continuing with my review of PR #20. I'll provide a comprehensive analysis of the enhanced analysis module.

I'll provide a more comprehensive review of PR #20, focusing on code quality, architecture, and potential improvements.

Warning: You have reached your daily quota limit. Please wait up to 24 hours and I will start processing your requests again!

Qodo Merge was enabled for this repository. To continue using it, please link your Git account with your Qodo account here.

Persistent review updated to latest commit 7417b17

Persistent review updated to latest commit 7417b17
PR Description

This pull request introduces a comprehensive code analysis module for Python codebases within the Codegen-on-OSS project. The primary goal is to provide detailed error detection, function call analysis, and type validation capabilities to improve code quality and maintainability.

Key Technical Changes
The key changes include: …

Architecture Decisions
The architectural decisions include: …

Dependencies and Interactions
This module depends on the …

Risk Considerations
Potential risks include: …

Notable Implementation Details
Notable implementation details include: …
|
|
||
|
|
||
| class ErrorCategory(Enum): | ||
| """Categories of errors that can be detected.""" | ||
|
|
||
| PARAMETER_ERROR = auto() | ||
| CALL_ERROR = auto() | ||
| RETURN_ERROR = auto() | ||
| CODE_QUALITY = auto() |
The ErrorCategory enum is potentially missing some important error categories:
- MEMORY_LEAK: For memory-related issues
- CONCURRENCY: For thread-safety and async issues
- TYPE_ERROR: For typing-related issues
These categories are important for catching critical runtime issues in Python applications. Consider adding these additional categories.
| class ErrorCategory(Enum): | |
| """Categories of errors that can be detected.""" | |
| PARAMETER_ERROR = auto() | |
| CALL_ERROR = auto() | |
| RETURN_ERROR = auto() | |
| CODE_QUALITY = auto() | |
| class ErrorCategory(Enum): | |
| """Categories of errors that can be detected.""" | |
| PARAMETER_ERROR = auto() | |
| CALL_ERROR = auto() | |
| RETURN_ERROR = auto() | |
| CODE_QUALITY = auto() | |
| SECURITY = auto() | |
| PERFORMANCE = auto() | |
| MEMORY_LEAK = auto() | |
| CONCURRENCY = auto() | |
| TYPE_ERROR = auto() |
|
|
||
|
|
||
| @dataclass | ||
| class DetectedError: | ||
| """ | ||
| Represents an error detected in the code. | ||
| Attributes: | ||
| category: The category of the error | ||
| severity: The severity of the error | ||
| message: A descriptive message about the error | ||
| file_path: Path to the file containing the error | ||
| line_number: Line number where the error occurs (optional) | ||
| function_name: Name of the function containing the error (optional) | ||
| code_snippet: Snippet of code containing the error (optional) | ||
| """ | ||
|
|
The DetectedError class has no validation logic for line_number. Line numbers should always be positive integers. Consider adding property validation. Additionally, the code_snippet length should be reasonably limited to avoid memory issues with very large snippets.
| @dataclass | |
| class DetectedError: | |
| """ | |
| Represents an error detected in the code. | |
| Attributes: | |
| category: The category of the error | |
| severity: The severity of the error | |
| message: A descriptive message about the error | |
| file_path: Path to the file containing the error | |
| line_number: Line number where the error occurs (optional) | |
| function_name: Name of the function containing the error (optional) | |
| code_snippet: Snippet of code containing the error (optional) | |
| """ | |
| @dataclass | |
| class DetectedError: | |
| category: ErrorCategory | |
| severity: ErrorSeverity | |
| message: str | |
| file_path: str | |
| line_number: int | None = field(default=None) | |
| function_name: str | None = None | |
| code_snippet: str | None = field(default=None) | |
| def __post_init__(self): | |
| if self.line_number is not None and self.line_number < 1: | |
| raise ValueError("Line number must be positive") | |
| if self.code_snippet is not None and len(self.code_snippet) > 1000: | |
| self.code_snippet = f"{self.code_snippet[:1000]}...[truncated]" |
| ) # function -> set of functions it calls | ||
| self._build_graph() | ||
|
|
||
| def _build_graph(self) -> None: | ||
| """Build the function call graph.""" | ||
| # Initialize all functions as nodes in the graph | ||
| for function in self.codebase.functions: | ||
| self.callers[function.name] = set() | ||
| self.callees[function.name] = set() | ||
|
|
||
| # Add edges for function calls | ||
| for function in self.codebase.functions: | ||
| if not hasattr(function, "code_block"): | ||
| continue | ||
|
|
||
| for call in function.code_block.function_calls: | ||
| # Skip calls to functions not in the codebase | ||
| if call.name not in self.callees: | ||
| continue | ||
|
|
||
| self.callees[function.name].add(call.name) |
The _build_graph method in FunctionCallGraph lacks crucial error handling and performance optimization. Consider:
- Adding error handling for circular references
- Implementing a cache for repeated lookups
- Adding cycle detection
- Including function call frequency tracking
| ) # function -> set of functions it calls | |
| self._build_graph() | |
| def _build_graph(self) -> None: | |
| """Build the function call graph.""" | |
| # Initialize all functions as nodes in the graph | |
| for function in self.codebase.functions: | |
| self.callers[function.name] = set() | |
| self.callees[function.name] = set() | |
| # Add edges for function calls | |
| for function in self.codebase.functions: | |
| if not hasattr(function, "code_block"): | |
| continue | |
| for call in function.code_block.function_calls: | |
| # Skip calls to functions not in the codebase | |
| if call.name not in self.callees: | |
| continue | |
| self.callees[function.name].add(call.name) | |
| def _build_graph(self) -> None: | |
| """Build the function call graph.""" | |
| self._call_frequency = defaultdict(int) | |
| self._cache = {} | |
| # Initialize all functions as nodes | |
| for function in self.codebase.functions: | |
| self.callers[function.name] = set() | |
| self.callees[function.name] = set() | |
| try: | |
| if not hasattr(function, "code_block"): | |
| continue | |
| for call in function.code_block.function_calls: | |
| if call.name not in self.callees: | |
| continue | |
| # Track call frequency | |
| self._call_frequency[call.name] += 1 | |
| # Detect potential cycles | |
| if call.name in self.callees[function.name]: | |
| logger.warning(f"Circular call detected between {function.name} and {call.name}") | |
| self.callees[function.name].add(call.name) | |
| self.callers[call.name].add(function.name) | |
| except Exception as e: | |
| logger.error(f"Error processing function {function.name}: {str(e)}") |
| """ | ||
| return [ | ||
| error | ||
| for error in self.errors | ||
| if error.function_name == function_name | ||
| ] | ||
|
|
||
|
|
||
| class TypeInferenceEngine: | ||
| """ | ||
| Infers types for variables and expressions. | ||
| This class provides methods for inferring types based on usage patterns | ||
| and context. | ||
| """ | ||
|
|
||
| def __init__( | ||
| self, codebase: Codebase, context: CodebaseContext | None = None | ||
| ): | ||
| """ | ||
| Initialize the type inference engine. | ||
| Args: | ||
| codebase: The codebase to analyze | ||
| context: Optional context for the analysis | ||
| """ | ||
| self.codebase = codebase | ||
| self.context = context | ||
| self.inferred_types: dict[str, dict[str, str]] = ( | ||
| {} | ||
| ) # function_name -> {variable_name: type} | ||
|
|
||
| def infer_types(self) -> dict[str, dict[str, str]]: | ||
| """ | ||
| Infer types for variables in the codebase. | ||
| Returns: | ||
| A dictionary mapping function names to dictionaries mapping | ||
| variable names to inferred types | ||
| """ | ||
| self.inferred_types = {} | ||
|
|
||
| for function in self.codebase.functions: | ||
| if not hasattr(function, "code_block"): | ||
| continue | ||
|
|
||
| self.inferred_types[function.name] = {} |
The TypeInferenceEngine lacks handling of important Python type features:
- Union types
- Optional types
- Generic types
- Type aliases
Add support for these Python typing features to make the type inference more robust and accurate.
| """ | |
| return [ | |
| error | |
| for error in self.errors | |
| if error.function_name == function_name | |
| ] | |
| class TypeInferenceEngine: | |
| """ | |
| Infers types for variables and expressions. | |
| This class provides methods for inferring types based on usage patterns | |
| and context. | |
| """ | |
| def __init__( | |
| self, codebase: Codebase, context: CodebaseContext | None = None | |
| ): | |
| """ | |
| Initialize the type inference engine. | |
| Args: | |
| codebase: The codebase to analyze | |
| context: Optional context for the analysis | |
| """ | |
| self.codebase = codebase | |
| self.context = context | |
| self.inferred_types: dict[str, dict[str, str]] = ( | |
| {} | |
| ) # function_name -> {variable_name: type} | |
| def infer_types(self) -> dict[str, dict[str, str]]: | |
| """ | |
| Infer types for variables in the codebase. | |
| Returns: | |
| A dictionary mapping function names to dictionaries mapping | |
| variable names to inferred types | |
| """ | |
| self.inferred_types = {} | |
| for function in self.codebase.functions: | |
| if not hasattr(function, "code_block"): | |
| continue | |
| self.inferred_types[function.name] = {} | |
| class TypeInferenceEngine: | |
| def _normalize_type(self, type_hint: str) -> str: | |
| """Normalize type hints to a canonical form.""" | |
| # Handle Optional[] -> Union[None, T] | |
| if type_hint.startswith('Optional['): | |
| inner = type_hint[9:-1] | |
| return f"Union[None, {inner}]" | |
| # Handle Union types | |
| if type_hint.startswith('Union['): | |
| types = [t.strip() for t in type_hint[6:-1].split(',')] | |
| return f"Union[{', '.join(sorted(types))}]" | |
| # Handle generics | |
| if '[' in type_hint: | |
| base, params = type_hint.split('[', 1) | |
| params = [self._normalize_type(p.strip()) for p in params[:-1].split(',')] | |
| return f"{base}[{', '.join(params)}]" | |
| return type_hint | |
| def infer_types(self) -> dict[str, dict[str, str]]: | |
| self.inferred_types = {} | |
| self.type_aliases = {} | |
| # First pass - collect type aliases | |
| for function in self.codebase.functions: | |
| if not hasattr(function, "code_block"): | |
| continue | |
| # Look for type alias assignments | |
| for stmt in function.code_block.statements: | |
| if self._is_type_alias(stmt): | |
| alias_name = stmt.left.name | |
| alias_type = self._normalize_type(stmt.right.type) | |
| self.type_aliases[alias_name] = alias_type |
| class CodebaseContext: | ||
| """MultiDiGraph Wrapper with TransactionManager""" | ||
|
|
||
| # =====[ __init__ attributes ]===== | ||
| node_classes: NodeClasses | ||
| programming_language: ProgrammingLanguage | ||
| repo_path: str | ||
| repo_name: str | ||
| codeowners_parser: CodeOwnersParser | None | ||
| config: CodebaseConfig | ||
| secrets: SecretsConfig | ||
|
|
||
| # =====[ computed attributes ]===== | ||
| transaction_manager: TransactionManager | ||
| pending_syncs: list[DiffLite] # Diffs that have been applied to disk, but not the graph (to be used for sync graph) | ||
| all_syncs: list[DiffLite] # All diffs that have been applied to the graph (to be used for graph reset) | ||
| _autocommit: AutoCommit | ||
| generation: int | ||
| parser: Parser[Expression] | ||
| synced_commit: GitCommit | None | ||
| directories: dict[Path, Directory] | ||
| base_url: str | None | ||
| extensions: list[str] | ||
| config_parser: ConfigParser | None | ||
| dependency_manager: DependencyManager | None | ||
| language_engine: LanguageEngine | None | ||
| _computing = False | ||
| filepath_idx: dict[str, NodeId] | ||
| _ext_module_idx: dict[str, NodeId] | ||
| flags: Flags | ||
| session_options: SessionOptions = SessionOptions() | ||
| projects: list[ProjectConfig] | ||
| unapplied_diffs: list[DiffLite] | ||
| io: IO | ||
| progress: Progress | ||
|
|
||
| def __init__( | ||
| self, | ||
| projects: list[ProjectConfig], | ||
| config: CodebaseConfig | None = None, | ||
| secrets: SecretsConfig | None = None, | ||
| io: IO | None = None, | ||
| progress: Progress | None = None, | ||
| ) -> None: | ||
| """Initializes codebase graph and TransactionManager""" | ||
| from codegen.sdk.core.parser import Parser | ||
|
|
||
| self.progress = progress or StubProgress() | ||
| self.filepath_idx = {} | ||
| self._ext_module_idx = {} | ||
| self.generation = 0 | ||
|
|
||
| # NOTE: The differences between base_path, repo_name, and repo_path | ||
| # /home/codegen/projects/my-project/src | ||
| # ^^^ <- Base Path (Optional) | ||
| # ^^^^^^^^^^ <----- Repo Name | ||
| # ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ <----- Repo Path | ||
| # ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ <- Full Path | ||
| # (full_path is unused for CGB, but is used elsewhere.) | ||
|
|
||
| # =====[ __init__ attributes ]===== | ||
| self.projects = projects | ||
| context = projects[0] | ||
| self.node_classes = get_node_classes(context.programming_language) | ||
| self.config = config or CodebaseConfig() | ||
| self.secrets = secrets or SecretsConfig() | ||
| self.repo_name = context.repo_operator.repo_name | ||
| self.repo_path = str(Path(context.repo_operator.repo_path).resolve()) | ||
| self.full_path = os.path.join(self.repo_path, context.base_path) if context.base_path else self.repo_path | ||
| self.codeowners_parser = context.repo_operator.codeowners_parser | ||
| self.base_url = context.repo_operator.base_url | ||
| if not self.config.allow_external: | ||
| # TODO: Fix this to be more robust with multiple projects | ||
| self.io = io or FileIO(allowed_paths=[Path(self.repo_path).resolve()]) | ||
| else: | ||
| self.io = io or FileIO() | ||
| # =====[ computed attributes ]===== | ||
| self.transaction_manager = TransactionManager() | ||
| self._autocommit = AutoCommit(self) | ||
| self.init_nodes = None | ||
| self.init_edges = None | ||
| self.directories = dict() | ||
| self.parser = Parser.from_node_classes(self.node_classes, log_parse_warnings=self.config.debug) | ||
| self.extensions = self.node_classes.file_cls.get_extensions() | ||
| # ORDER IS IMPORTANT HERE! | ||
| self.config_parser = get_config_parser_for_language(context.programming_language, self) | ||
| self.dependency_manager = get_dependency_manager(context.programming_language, self) | ||
| self.language_engine = get_language_engine(context.programming_language, self) | ||
| self.programming_language = context.programming_language | ||
|
|
||
| # Raise warning if language is not supported | ||
| if self.programming_language is ProgrammingLanguage.UNSUPPORTED or self.programming_language is ProgrammingLanguage.OTHER: | ||
| logger.warning("WARNING: The codebase is using an unsupported language!") | ||
| logger.warning("Some features may not work as expected. Advanced static analysis will be disabled but simple file IO will still work.") | ||
|
|
||
| # Assert config assertions | ||
| # External import resolution must be enabled if syspath is enabled | ||
| if self.config.py_resolve_syspath: | ||
| if not self.config.allow_external: | ||
| msg = "allow_external must be set to True when py_resolve_syspath is enabled" | ||
| raise ValueError(msg) | ||
|
|
||
| # Build the graph | ||
| if not self.config.exp_lazy_graph and self.config.use_pink != PinkMode.ALL_FILES: | ||
| self.build_graph(context.repo_operator) | ||
| try: | ||
| self.synced_commit = context.repo_operator.head_commit | ||
| except ValueError as e: | ||
| logger.exception("Error getting commit head %s", e) | ||
| self.synced_commit = None | ||
| self.pending_syncs = [] | ||
| self.all_syncs = [] | ||
| self.unapplied_diffs = [] | ||
| self.flags = Flags() | ||
|
|
||
| def __repr__(self): | ||
| return self.__class__.__name__ | ||
|
|
||
| @property | ||
| def _graph(self) -> PyDiGraph[Importable, Edge]: | ||
| if not self.__graph_ready: | ||
| logger.info("Lazily Computing Graph") | ||
| self.build_graph(self.projects[0].repo_operator) | ||
| return self.__graph | ||
|
|
||
| @_graph.setter | ||
| def _graph(self, value: PyDiGraph[Importable, Edge]) -> None: | ||
| self.__graph = value | ||
|
|
||
| @stopwatch_with_sentry(name="build_graph") | ||
| @commiter | ||
| def build_graph(self, repo_operator: RepoOperator) -> None: | ||
| """Builds a codebase graph based on the current file state of the given repo operator""" | ||
| self.__graph_ready = True | ||
| self._graph.clear() | ||
|
|
||
| # =====[ Add all files to the graph in parallel ]===== | ||
| syncs = defaultdict(lambda: []) | ||
| if self.config.disable_file_parse: | ||
| logger.warning("WARNING: File parsing is disabled!") | ||
| else: | ||
| for filepath, _ in repo_operator.iter_files(subdirs=self.projects[0].subdirectories, extensions=self.extensions, ignore_list=GLOBAL_FILE_IGNORE_LIST): | ||
| syncs[SyncType.ADD].append(self.to_absolute(filepath)) | ||
| logger.info(f"> Parsing {len(syncs[SyncType.ADD])} files in {self.projects[0].subdirectories or 'ALL'} subdirectories with {self.extensions} extensions") | ||
| self._process_diff_files(syncs, incremental=False) | ||
| files: list[SourceFile] = self.get_nodes(NodeType.FILE) | ||
| logger.info(f"> Found {len(files)} files") | ||
| logger.info(f"> Found {len(self.nodes)} nodes and {len(self.edges)} edges") | ||
| if self.config.track_graph: | ||
| self.old_graph = self._graph.copy() | ||
|
|
||
| @stopwatch | ||
| @commiter | ||
| def apply_diffs(self, diff_list: list[DiffLite]) -> None: | ||
| """Applies the given set of diffs to the graph in order to match the current file system content""" | ||
| if self.session_options: | ||
| self.session_options = self.session_options.model_copy(update={"max_seconds": None}) | ||
| logger.info(f"Applying {len(diff_list)} diffs to graph") | ||
| files_to_sync: dict[Path, SyncType] = {} | ||
| # Gather list of deleted files, new files to add, and modified files to reparse | ||
| file_cls = self.node_classes.file_cls | ||
| extensions = file_cls.get_extensions() | ||
| for diff in diff_list: | ||
| filepath = Path(diff.path) | ||
| if extensions is not None and filepath.suffix not in extensions: | ||
| continue | ||
| if self.projects[0].subdirectories is not None and not any(filepath.relative_to(subdir) for subdir in self.projects[0].subdirectories): | ||
| continue | ||
|
|
||
| if diff.change_type == ChangeType.Added: | ||
| # Sync by adding the added file to the graph | ||
| files_to_sync[filepath] = SyncType.ADD | ||
| elif diff.change_type == ChangeType.Modified: | ||
| files_to_sync[filepath] = SyncType.REPARSE | ||
| elif diff.change_type == ChangeType.Renamed: | ||
| files_to_sync[diff.rename_from] = SyncType.DELETE | ||
| files_to_sync[diff.rename_to] = SyncType.ADD | ||
| elif diff.change_type == ChangeType.Removed: | ||
| files_to_sync[filepath] = SyncType.DELETE | ||
| else: | ||
| logger.warning(f"Unhandled diff change type: {diff.change_type}") | ||
| by_sync_type = defaultdict(lambda: []) | ||
| if self.config.disable_file_parse: | ||
| logger.warning("WARNING: File parsing is disabled!") | ||
| else: | ||
| for filepath, sync_type in files_to_sync.items(): | ||
| if self.get_file(filepath) is None: | ||
| if sync_type is SyncType.DELETE: | ||
| # SourceFile is already deleted, nothing to do here | ||
| continue | ||
| elif sync_type is SyncType.REPARSE: | ||
| # SourceFile needs to be parsed for the first time | ||
| sync_type = SyncType.ADD | ||
| elif sync_type is SyncType.ADD: | ||
| # If the file was deleted earlier, we need to reparse so we can remove old edges | ||
| sync_type = SyncType.REPARSE | ||
|
|
||
| by_sync_type[sync_type].append(filepath) | ||
| self.generation += 1 | ||
| self._process_diff_files(by_sync_type) | ||
|
|
||
| def _reset_files(self, syncs: list[DiffLite]) -> None: | ||
| files_to_write = [] | ||
| files_to_remove = [] | ||
| modified_files = set() | ||
| for sync in syncs: | ||
| if sync.path in modified_files: | ||
| continue | ||
| if sync.change_type == ChangeType.Removed: | ||
| files_to_write.append((sync.path, sync.old_content)) | ||
| modified_files.add(sync.path) | ||
| logger.info(f"Removing {sync.path} from disk") | ||
| elif sync.change_type == ChangeType.Modified: | ||
| files_to_write.append((sync.path, sync.old_content)) | ||
| modified_files.add(sync.path) | ||
| elif sync.change_type == ChangeType.Renamed: | ||
| files_to_write.append((sync.rename_from, sync.old_content)) | ||
| files_to_remove.append(sync.rename_to) | ||
| modified_files.add(sync.rename_from) | ||
| modified_files.add(sync.rename_to) | ||
| elif sync.change_type == ChangeType.Added: | ||
| files_to_remove.append(sync.path) | ||
| modified_files.add(sync.path) | ||
| logger.info(f"Writing {len(files_to_write)} files to disk and removing {len(files_to_remove)} files") | ||
| for file in files_to_remove: | ||
| self.io.delete_file(file) | ||
| to_save = set() | ||
| for file, content in files_to_write: | ||
| self.io.write_file(file, content) | ||
| to_save.add(file) | ||
| self.io.save_files(to_save) | ||
|
|
||
| @stopwatch | ||
| def reset_codebase(self) -> None: | ||
| self._reset_files(self.all_syncs + self.pending_syncs + self.unapplied_diffs) | ||
| self.unapplied_diffs.clear() | ||
|
|
||
| @stopwatch | ||
| def undo_applied_diffs(self) -> None: | ||
| self.transaction_manager.clear_transactions() | ||
| self.reset_codebase() | ||
| self.io.check_changes() | ||
| self.pending_syncs.clear() # Discard pending changes | ||
| if len(self.all_syncs) > 0: | ||
| logger.info(f"Unapplying {len(self.all_syncs)} diffs to graph. Current graph commit: {self.synced_commit}") | ||
| self._revert_diffs(list(reversed(self.all_syncs))) | ||
| self.all_syncs.clear() | ||
|
|
||
| @stopwatch | ||
| @commiter(reset=True) | ||
| def _revert_diffs(self, diff_list: list[DiffLite]) -> None: | ||
| """Resets the graph to its initial solve branch file state""" | ||
| reversed_diff_list = list(DiffLite.from_reverse_diff(diff) for diff in diff_list) | ||
| self._autocommit.reset() | ||
| self.apply_diffs(reversed_diff_list) | ||
| # ====== [ Re-resolve lost edges from previous syncs ] ====== | ||
| self.prune_graph() | ||
| if self.config.verify_graph: | ||
| post_reset_validation(self.old_graph.nodes(), self._graph.nodes(), get_edges(self.old_graph), get_edges(self._graph), self.repo_name, self.projects[0].subdirectories) | ||
|
|
||
| def save_commit(self, commit: GitCommit) -> None: | ||
| if commit is not None: | ||
| logger.info(f"Saving commit {commit.hexsha} to graph") | ||
| self.all_syncs.clear() | ||
| self.unapplied_diffs.clear() | ||
| self.synced_commit = commit | ||
| if self.config.verify_graph: | ||
| self.old_graph = self._graph.copy() | ||
|
|
||
| @stopwatch | ||
| def prune_graph(self) -> None: | ||
| # ====== [ Remove orphaned external modules ] ====== | ||
| external_modules = self.get_nodes(NodeType.EXTERNAL) | ||
| for module in external_modules: | ||
| if not any(self.predecessors(module.node_id)): | ||
| self.remove_node(module.node_id) | ||
| self._ext_module_idx.pop(module._idx_key, None) | ||
|
|
||
| def build_directory_tree(self) -> None: | ||
| """Builds the directory tree for the codebase""" | ||
| # Reset and rebuild the directory tree | ||
| self.directories = dict() | ||
|
|
||
| for file_path, _ in self.projects[0].repo_operator.iter_files( | ||
| subdirs=self.projects[0].subdirectories, | ||
| ignore_list=GLOBAL_FILE_IGNORE_LIST, | ||
| skip_content=True, | ||
| ): | ||
| file_path = Path(file_path) | ||
| directory = self.get_directory(file_path.parent, create_on_missing=True) | ||
| directory._add_file(file_path.name) | ||
|
|
||
| def get_directory(self, directory_path: PathLike, create_on_missing: bool = False, ignore_case: bool = False) -> Directory | None: | ||
| """Returns the directory object for the given path, or None if the directory does not exist. | ||
| If create_on_missing is set, use a recursive strategy to create the directory object and all subdirectories. | ||
| """ | ||
| # If not part of repo path, return None | ||
| absolute_path = self.to_absolute(directory_path) | ||
| if not self.is_subdir(absolute_path) and not self.config.allow_external: | ||
| assert False, f"Directory {absolute_path} is not part of repo path {self.repo_path}" | ||
| return None | ||
|
|
||
| # Get the directory | ||
| if dir := self.directories.get(absolute_path, None): | ||
| return dir | ||
| if ignore_case: | ||
| for path, directory in self.directories.items(): | ||
| if str(absolute_path).lower() == str(path).lower(): | ||
| return directory | ||
|
|
||
| # If the directory does not exist, create it | ||
| if create_on_missing: | ||
| # Get the parent directory and create it if it does not exist | ||
| parent_path = absolute_path.parent | ||
|
|
||
| # Base Case | ||
| if str(absolute_path) == str(self.repo_path) or str(absolute_path) == str(parent_path): | ||
| root_directory = Directory(ctx=self, path=absolute_path, dirpath="") | ||
| self.directories[absolute_path] = root_directory | ||
| return root_directory | ||
|
|
||
| # Recursively create the parent directory | ||
| parent = self.get_directory(parent_path, create_on_missing=True) | ||
| # Create the directory | ||
| directory = Directory(ctx=self, path=absolute_path, dirpath=str(self.to_relative(absolute_path))) | ||
| # Add the directory to the parent | ||
| parent._add_subdirectory(directory.name) | ||
| # Add the directory to the tree | ||
| self.directories[absolute_path] = directory | ||
| return directory | ||
| """ | ||
| Manages context for a codebase. | ||
| This class provides methods for resolving symbols, tracking imports, | ||
| and analyzing dependencies within a codebase. | ||
| """ | ||
| projects: List[Any] | ||
| config: Optional[Any] = None | ||
| _symbol_cache: Dict[str, Symbol] = None | ||
| _import_cache: Dict[str, Import] = None | ||
| _dependency_graph: Dict[str, Set[str]] = None | ||
| def __post_init__(self): | ||
| """Initialize caches and graphs after instance creation.""" | ||
| self._symbol_cache = {} | ||
| self._import_cache = {} | ||
| self._dependency_graph = {} | ||
| self._build_caches() | ||
| def _build_caches(self): | ||
| """Build caches for symbols and imports.""" | ||
| for project in self.projects: | ||
| if hasattr(project, "codebase") and project.codebase: | ||
| self._cache_symbols(project.codebase) | ||
| self._cache_imports(project.codebase) | ||
| self._build_dependency_graph(project.codebase) | ||
| def _cache_symbols(self, codebase: Codebase): | ||
| """ | ||
| Cache symbols from a codebase. | ||
The CodebaseContext class has inefficient symbol caching. Consider:
- Using a weakref dictionary to prevent memory leaks
- Implementing an LRU cache for frequently accessed symbols
- Adding cache invalidation on context changes
- Including cache statistics for monitoring
| class CodebaseContext: | |
| """MultiDiGraph Wrapper with TransactionManager""" | |
| # =====[ __init__ attributes ]===== | |
| node_classes: NodeClasses | |
| programming_language: ProgrammingLanguage | |
| repo_path: str | |
| repo_name: str | |
| codeowners_parser: CodeOwnersParser | None | |
| config: CodebaseConfig | |
| secrets: SecretsConfig | |
| # =====[ computed attributes ]===== | |
| transaction_manager: TransactionManager | |
| pending_syncs: list[DiffLite] # Diffs that have been applied to disk, but not the graph (to be used for sync graph) | |
| all_syncs: list[DiffLite] # All diffs that have been applied to the graph (to be used for graph reset) | |
| _autocommit: AutoCommit | |
| generation: int | |
| parser: Parser[Expression] | |
| synced_commit: GitCommit | None | |
| directories: dict[Path, Directory] | |
| base_url: str | None | |
| extensions: list[str] | |
| config_parser: ConfigParser | None | |
| dependency_manager: DependencyManager | None | |
| language_engine: LanguageEngine | None | |
| _computing = False | |
| filepath_idx: dict[str, NodeId] | |
| _ext_module_idx: dict[str, NodeId] | |
| flags: Flags | |
| session_options: SessionOptions = SessionOptions() | |
| projects: list[ProjectConfig] | |
| unapplied_diffs: list[DiffLite] | |
| io: IO | |
| progress: Progress | |
| def __init__( | |
| self, | |
| projects: list[ProjectConfig], | |
| config: CodebaseConfig | None = None, | |
| secrets: SecretsConfig | None = None, | |
| io: IO | None = None, | |
| progress: Progress | None = None, | |
| ) -> None: | |
| """Initializes codebase graph and TransactionManager""" | |
| from codegen.sdk.core.parser import Parser | |
| self.progress = progress or StubProgress() | |
| self.filepath_idx = {} | |
| self._ext_module_idx = {} | |
| self.generation = 0 | |
| # NOTE: The differences between base_path, repo_name, and repo_path | |
| # /home/codegen/projects/my-project/src | |
| # ^^^ <- Base Path (Optional) | |
| # ^^^^^^^^^^ <----- Repo Name | |
| # ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ <----- Repo Path | |
| # ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ <- Full Path | |
| # (full_path is unused for CGB, but is used elsewhere.) | |
| # =====[ __init__ attributes ]===== | |
| self.projects = projects | |
| context = projects[0] | |
| self.node_classes = get_node_classes(context.programming_language) | |
| self.config = config or CodebaseConfig() | |
| self.secrets = secrets or SecretsConfig() | |
| self.repo_name = context.repo_operator.repo_name | |
| self.repo_path = str(Path(context.repo_operator.repo_path).resolve()) | |
| self.full_path = os.path.join(self.repo_path, context.base_path) if context.base_path else self.repo_path | |
| self.codeowners_parser = context.repo_operator.codeowners_parser | |
| self.base_url = context.repo_operator.base_url | |
| if not self.config.allow_external: | |
| # TODO: Fix this to be more robust with multiple projects | |
| self.io = io or FileIO(allowed_paths=[Path(self.repo_path).resolve()]) | |
| else: | |
| self.io = io or FileIO() | |
| # =====[ computed attributes ]===== | |
| self.transaction_manager = TransactionManager() | |
| self._autocommit = AutoCommit(self) | |
| self.init_nodes = None | |
| self.init_edges = None | |
| self.directories = dict() | |
| self.parser = Parser.from_node_classes(self.node_classes, log_parse_warnings=self.config.debug) | |
| self.extensions = self.node_classes.file_cls.get_extensions() | |
| # ORDER IS IMPORTANT HERE! | |
| self.config_parser = get_config_parser_for_language(context.programming_language, self) | |
| self.dependency_manager = get_dependency_manager(context.programming_language, self) | |
| self.language_engine = get_language_engine(context.programming_language, self) | |
| self.programming_language = context.programming_language | |
| # Raise warning if language is not supported | |
| if self.programming_language is ProgrammingLanguage.UNSUPPORTED or self.programming_language is ProgrammingLanguage.OTHER: | |
| logger.warning("WARNING: The codebase is using an unsupported language!") | |
| logger.warning("Some features may not work as expected. Advanced static analysis will be disabled but simple file IO will still work.") | |
| # Assert config assertions | |
| # External import resolution must be enabled if syspath is enabled | |
| if self.config.py_resolve_syspath: | |
| if not self.config.allow_external: | |
| msg = "allow_external must be set to True when py_resolve_syspath is enabled" | |
| raise ValueError(msg) | |
| # Build the graph | |
| if not self.config.exp_lazy_graph and self.config.use_pink != PinkMode.ALL_FILES: | |
| self.build_graph(context.repo_operator) | |
| try: | |
| self.synced_commit = context.repo_operator.head_commit | |
| except ValueError as e: | |
| logger.exception("Error getting commit head %s", e) | |
| self.synced_commit = None | |
| self.pending_syncs = [] | |
| self.all_syncs = [] | |
| self.unapplied_diffs = [] | |
| self.flags = Flags() | |
| def __repr__(self): | |
| return self.__class__.__name__ | |
| @property | |
| def _graph(self) -> PyDiGraph[Importable, Edge]: | |
| if not self.__graph_ready: | |
| logger.info("Lazily Computing Graph") | |
| self.build_graph(self.projects[0].repo_operator) | |
| return self.__graph | |
| @_graph.setter | |
| def _graph(self, value: PyDiGraph[Importable, Edge]) -> None: | |
| self.__graph = value | |
| @stopwatch_with_sentry(name="build_graph") | |
| @commiter | |
| def build_graph(self, repo_operator: RepoOperator) -> None: | |
| """Builds a codebase graph based on the current file state of the given repo operator""" | |
| self.__graph_ready = True | |
| self._graph.clear() | |
| # =====[ Add all files to the graph in parallel ]===== | |
| syncs = defaultdict(lambda: []) | |
| if self.config.disable_file_parse: | |
| logger.warning("WARNING: File parsing is disabled!") | |
| else: | |
| for filepath, _ in repo_operator.iter_files(subdirs=self.projects[0].subdirectories, extensions=self.extensions, ignore_list=GLOBAL_FILE_IGNORE_LIST): | |
| syncs[SyncType.ADD].append(self.to_absolute(filepath)) | |
| logger.info(f"> Parsing {len(syncs[SyncType.ADD])} files in {self.projects[0].subdirectories or 'ALL'} subdirectories with {self.extensions} extensions") | |
| self._process_diff_files(syncs, incremental=False) | |
| files: list[SourceFile] = self.get_nodes(NodeType.FILE) | |
| logger.info(f"> Found {len(files)} files") | |
| logger.info(f"> Found {len(self.nodes)} nodes and {len(self.edges)} edges") | |
| if self.config.track_graph: | |
| self.old_graph = self._graph.copy() | |
| @stopwatch | |
| @commiter | |
| def apply_diffs(self, diff_list: list[DiffLite]) -> None: | |
| """Applies the given set of diffs to the graph in order to match the current file system content""" | |
| if self.session_options: | |
| self.session_options = self.session_options.model_copy(update={"max_seconds": None}) | |
| logger.info(f"Applying {len(diff_list)} diffs to graph") | |
| files_to_sync: dict[Path, SyncType] = {} | |
| # Gather list of deleted files, new files to add, and modified files to reparse | |
| file_cls = self.node_classes.file_cls | |
| extensions = file_cls.get_extensions() | |
| for diff in diff_list: | |
| filepath = Path(diff.path) | |
| if extensions is not None and filepath.suffix not in extensions: | |
| continue | |
| if self.projects[0].subdirectories is not None and not any(filepath.relative_to(subdir) for subdir in self.projects[0].subdirectories): | |
| continue | |
| if diff.change_type == ChangeType.Added: | |
| # Sync by adding the added file to the graph | |
| files_to_sync[filepath] = SyncType.ADD | |
| elif diff.change_type == ChangeType.Modified: | |
| files_to_sync[filepath] = SyncType.REPARSE | |
| elif diff.change_type == ChangeType.Renamed: | |
| files_to_sync[diff.rename_from] = SyncType.DELETE | |
| files_to_sync[diff.rename_to] = SyncType.ADD | |
| elif diff.change_type == ChangeType.Removed: | |
| files_to_sync[filepath] = SyncType.DELETE | |
| else: | |
| logger.warning(f"Unhandled diff change type: {diff.change_type}") | |
| by_sync_type = defaultdict(lambda: []) | |
| if self.config.disable_file_parse: | |
| logger.warning("WARNING: File parsing is disabled!") | |
| else: | |
| for filepath, sync_type in files_to_sync.items(): | |
| if self.get_file(filepath) is None: | |
| if sync_type is SyncType.DELETE: | |
| # SourceFile is already deleted, nothing to do here | |
| continue | |
| elif sync_type is SyncType.REPARSE: | |
| # SourceFile needs to be parsed for the first time | |
| sync_type = SyncType.ADD | |
| elif sync_type is SyncType.ADD: | |
| # If the file was deleted earlier, we need to reparse so we can remove old edges | |
| sync_type = SyncType.REPARSE | |
| by_sync_type[sync_type].append(filepath) | |
| self.generation += 1 | |
| self._process_diff_files(by_sync_type) | |
| def _reset_files(self, syncs: list[DiffLite]) -> None: | |
| files_to_write = [] | |
| files_to_remove = [] | |
| modified_files = set() | |
| for sync in syncs: | |
| if sync.path in modified_files: | |
| continue | |
| if sync.change_type == ChangeType.Removed: | |
| files_to_write.append((sync.path, sync.old_content)) | |
| modified_files.add(sync.path) | |
| logger.info(f"Removing {sync.path} from disk") | |
| elif sync.change_type == ChangeType.Modified: | |
| files_to_write.append((sync.path, sync.old_content)) | |
| modified_files.add(sync.path) | |
| elif sync.change_type == ChangeType.Renamed: | |
| files_to_write.append((sync.rename_from, sync.old_content)) | |
| files_to_remove.append(sync.rename_to) | |
| modified_files.add(sync.rename_from) | |
| modified_files.add(sync.rename_to) | |
| elif sync.change_type == ChangeType.Added: | |
| files_to_remove.append(sync.path) | |
| modified_files.add(sync.path) | |
| logger.info(f"Writing {len(files_to_write)} files to disk and removing {len(files_to_remove)} files") | |
| for file in files_to_remove: | |
| self.io.delete_file(file) | |
| to_save = set() | |
| for file, content in files_to_write: | |
| self.io.write_file(file, content) | |
| to_save.add(file) | |
| self.io.save_files(to_save) | |
| @stopwatch | |
| def reset_codebase(self) -> None: | |
| self._reset_files(self.all_syncs + self.pending_syncs + self.unapplied_diffs) | |
| self.unapplied_diffs.clear() | |
| @stopwatch | |
| def undo_applied_diffs(self) -> None: | |
| self.transaction_manager.clear_transactions() | |
| self.reset_codebase() | |
| self.io.check_changes() | |
| self.pending_syncs.clear() # Discard pending changes | |
| if len(self.all_syncs) > 0: | |
| logger.info(f"Unapplying {len(self.all_syncs)} diffs to graph. Current graph commit: {self.synced_commit}") | |
| self._revert_diffs(list(reversed(self.all_syncs))) | |
| self.all_syncs.clear() | |
| @stopwatch | |
| @commiter(reset=True) | |
| def _revert_diffs(self, diff_list: list[DiffLite]) -> None: | |
| """Resets the graph to its initial solve branch file state""" | |
| reversed_diff_list = list(DiffLite.from_reverse_diff(diff) for diff in diff_list) | |
| self._autocommit.reset() | |
| self.apply_diffs(reversed_diff_list) | |
| # ====== [ Re-resolve lost edges from previous syncs ] ====== | |
| self.prune_graph() | |
| if self.config.verify_graph: | |
| post_reset_validation(self.old_graph.nodes(), self._graph.nodes(), get_edges(self.old_graph), get_edges(self._graph), self.repo_name, self.projects[0].subdirectories) | |
| def save_commit(self, commit: GitCommit) -> None: | |
| if commit is not None: | |
| logger.info(f"Saving commit {commit.hexsha} to graph") | |
| self.all_syncs.clear() | |
| self.unapplied_diffs.clear() | |
| self.synced_commit = commit | |
| if self.config.verify_graph: | |
| self.old_graph = self._graph.copy() | |
| @stopwatch | |
| def prune_graph(self) -> None: | |
| # ====== [ Remove orphaned external modules ] ====== | |
| external_modules = self.get_nodes(NodeType.EXTERNAL) | |
| for module in external_modules: | |
| if not any(self.predecessors(module.node_id)): | |
| self.remove_node(module.node_id) | |
| self._ext_module_idx.pop(module._idx_key, None) | |
| def build_directory_tree(self) -> None: | |
| """Builds the directory tree for the codebase""" | |
| # Reset and rebuild the directory tree | |
| self.directories = dict() | |
| for file_path, _ in self.projects[0].repo_operator.iter_files( | |
| subdirs=self.projects[0].subdirectories, | |
| ignore_list=GLOBAL_FILE_IGNORE_LIST, | |
| skip_content=True, | |
| ): | |
| file_path = Path(file_path) | |
| directory = self.get_directory(file_path.parent, create_on_missing=True) | |
| directory._add_file(file_path.name) | |
| def get_directory(self, directory_path: PathLike, create_on_missing: bool = False, ignore_case: bool = False) -> Directory | None: | |
| """Returns the directory object for the given path, or None if the directory does not exist. | |
| If create_on_missing is set, use a recursive strategy to create the directory object and all subdirectories. | |
| """ | |
| # If not part of repo path, return None | |
| absolute_path = self.to_absolute(directory_path) | |
| if not self.is_subdir(absolute_path) and not self.config.allow_external: | |
| assert False, f"Directory {absolute_path} is not part of repo path {self.repo_path}" | |
| return None | |
| # Get the directory | |
| if dir := self.directories.get(absolute_path, None): | |
| return dir | |
| if ignore_case: | |
| for path, directory in self.directories.items(): | |
| if str(absolute_path).lower() == str(path).lower(): | |
| return directory | |
| # If the directory does not exist, create it | |
| if create_on_missing: | |
| # Get the parent directory and create it if it does not exist | |
| parent_path = absolute_path.parent | |
| # Base Case | |
| if str(absolute_path) == str(self.repo_path) or str(absolute_path) == str(parent_path): | |
| root_directory = Directory(ctx=self, path=absolute_path, dirpath="") | |
| self.directories[absolute_path] = root_directory | |
| return root_directory | |
| # Recursively create the parent directory | |
| parent = self.get_directory(parent_path, create_on_missing=True) | |
| # Create the directory | |
| directory = Directory(ctx=self, path=absolute_path, dirpath=str(self.to_relative(absolute_path))) | |
| # Add the directory to the parent | |
| parent._add_subdirectory(directory.name) | |
| # Add the directory to the tree | |
| self.directories[absolute_path] = directory | |
| return directory | |
| """ | |
| Manages context for a codebase. | |
| This class provides methods for resolving symbols, tracking imports, | |
| and analyzing dependencies within a codebase. | |
| """ | |
| projects: List[Any] | |
| config: Optional[Any] = None | |
| _symbol_cache: Dict[str, Symbol] = None | |
| _import_cache: Dict[str, Import] = None | |
| _dependency_graph: Dict[str, Set[str]] = None | |
| def __post_init__(self): | |
| """Initialize caches and graphs after instance creation.""" | |
| self._symbol_cache = {} | |
| self._import_cache = {} | |
| self._dependency_graph = {} | |
| self._build_caches() | |
| def _build_caches(self): | |
| """Build caches for symbols and imports.""" | |
| for project in self.projects: | |
| if hasattr(project, "codebase") and project.codebase: | |
| self._cache_symbols(project.codebase) | |
| self._cache_imports(project.codebase) | |
| self._build_dependency_graph(project.codebase) | |
| def _cache_symbols(self, codebase: Codebase): | |
| """ | |
| Cache symbols from a codebase. | |
| from weakref import WeakValueDictionary | |
| from functools import lru_cache | |
| @dataclass | |
| class CodebaseContext: | |
| projects: List[Any] | |
| config: Optional[Any] = None | |
| _symbol_cache: WeakValueDictionary = field(default_factory=WeakValueDictionary) | |
| _import_cache: WeakValueDictionary = field(default_factory=WeakValueDictionary) | |
| _dependency_graph: Dict[str, Set[str]] = field(default_factory=dict) | |
| _cache_stats: Dict[str, int] = field(default_factory=lambda: {'hits': 0, 'misses': 0}) | |
| def __post_init__(self): | |
| self._build_caches() | |
| @lru_cache(maxsize=1000) | |
| def get_symbol(self, name: str) -> Optional[Symbol]: | |
| if symbol := self._symbol_cache.get(name): | |
| self._cache_stats['hits'] += 1 | |
| return symbol | |
| self._cache_stats['misses'] += 1 | |
| return None | |
| def invalidate_caches(self): | |
| """Invalidate all caches when context changes.""" | |
| self._symbol_cache.clear() | |
| self._import_cache.clear() | |
| self.get_symbol.cache_clear() | |
| def get_cache_stats(self) -> Dict[str, int]: | |
| return self._cache_stats.copy() |
🧩 I need a bit more direction! This task is complex - could you break it down into smaller steps? Reach out to our Slack community channel for any help!
I've completed my review and didn't find any issues... but I did find this bunny.
(\(\
( -.-)
o_(")(")Files scanned
| File Path | Reviewed |
|---|---|
| codegen-on-oss/codegen_on_oss/analysis/codebase_context.py | ✅ |
Explore our documentation to understand the languages and file types we support and the files we ignore.
Check out our docs on how you can make Korbit work best for you and your team.
Hey @codegen-sh[bot] - I've reviewed your changes - here's some feedback:
- Consider if any core logic removed from codebase_context.py (e.g., detailed graph syncing, transactions) needs re-implementation elsewhere.
- The FastAPI application previously in analysis.py appears to be removed; clarify how the new API endpoints mentioned are exposed.
- With analysis logic now split into multiple modules, consider if a unified facade or entry point is needed for consumers.
Here's what I looked at during the review
- 🟡 General issues: 2 issues found
- 🟢 Security: all looks good
- 🟢 Testing: all looks good
- 🟡 Complexity: 3 issues found
- 🟢 Documentation: all looks good
Help me be more useful! Please click 👍 or 👎 on each comment and I'll use the feedback to improve your reviews.
| from codegen.sdk.codebase.node_classes.generic_node_classes import GenericNodeClasses | ||
| return GenericNodeClasses | ||
| """ |
suggestion (bug_risk): The new CodebaseContext design is cleaner but may need cache invalidation safeguards.
Add a strategy to invalidate or refresh caches for symbols, imports, and dependency graphs when the codebase changes dynamically.
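One possible shape for such a safeguard, sketched here with hypothetical method and attribute behavior rather than code from the PR, is a per-file eviction hook on the new context:

```python
# Illustrative sketch of a method that could be added to the new CodebaseContext.
# It assumes cached symbols and imports expose a `filepath` attribute (an assumption).
def notify_file_changed(self, file_path: str) -> None:
    """Evict cached entries derived from a file that changed on disk."""
    for cache in (self._symbol_cache, self._import_cache):
        stale = [name for name, obj in cache.items()
                 if getattr(obj, "filepath", None) == file_path]
        for name in stale:
            cache.pop(name, None)
    # Dependency edges keyed by the file are rebuilt lazily on next access.
    self._dependency_graph.pop(file_path, None)
```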
| - Identify dependency cycles | ||
| 1. Creating new detector classes that inherit from `ErrorDetector` | ||
| 2. Implementing custom analysis logic in the `detect_errors` method | ||
| 3. Adding the new detector to the `CodeAnalysisError` class |
question: Clarify role of CodeAnalysisError in adding custom detectors
The example omits step 3’s detector registration in CodeAnalysisError, which isn’t typically used for registration. Please clarify this step or correct the class name and update the example.
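One reading of step 3 that avoids the naming confusion is to register the new detector in the list iterated by the top-level detect_errors() function rather than in an error class. The sketch below is hypothetical; the constructor signature and result keys are assumptions, not the PR's actual wiring:

```python
# Hypothetical registration sketch, not the PR's actual code.
class SecurityChecker(ErrorDetector):
    def detect_errors(self) -> list[DetectedError]:
        # custom analysis logic would populate self.errors here
        return self.errors

def detect_errors(codebase, context=None) -> dict:
    detectors = [
        ParameterValidator(codebase, context),
        CallValidator(codebase, context),
        ReturnValidator(codebase, context),
        CodeQualityChecker(codebase, context),
        SecurityChecker(codebase, context),  # the newly registered detector
    ]
    all_errors = []
    for detector in detectors:
        all_errors.extend(detector.detect_errors())
    return {"total": len(all_errors), "errors": all_errors}  # assumed result shape
```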
| ) | ||
| class ParameterValidator(ErrorDetector): |
issue (complexity): Consider extracting common logic like function lookup and error grouping into helper functions to reduce code duplication and improve code organization.
Consider extracting common patterns into helper functions to reduce nesting and repetition. For example, the logic to find a function by name in the codebase is repeated in both parameter and call validations. You can move it into the base class:
class ErrorDetector:
# ... existing code ...
def _find_function(self, name: str) -> Any:
for f in self.codebase.functions:
if f.name == name:
return f
return None
Then update the validations accordingly:
# In ParameterValidator._check_parameter_count_mismatches
called_function = self._find_function(call.name)
if not called_function or not hasattr(called_function, "parameters"):
continue
# In ParameterValidator._check_missing_required_parameters
called_function = self._find_function(call.name)
if not called_function or not hasattr(called_function, "parameters") or not hasattr(call, "arguments"):
continue
This approach lowers nesting and centralizes the lookup logic. Similarly, consider extracting repetitive loops (like grouping errors in detect_errors) into a reusable helper to further simplify the code.
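A grouping helper along those lines could be as small as the following sketch; the field names follow the DetectedError dataclass, everything else is illustrative:

```python
from collections import defaultdict

def group_by(errors, key_func):
    """Group DetectedError objects by an arbitrary key, e.g. category or severity."""
    grouped = defaultdict(list)
    for error in errors:
        grouped[key_func(error)].append(error)
    return dict(grouped)

# e.g. inside detect_errors():
# errors_by_category = group_by(all_errors, lambda e: e.category.name)
# errors_by_severity = group_by(all_errors, lambda e: e.severity.name)
```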
| else: | ||
| type_stats["variables_without_type"] += 1 | ||
|
| return { |
issue (complexity): Consider extracting reusable helper functions for error creation and grouping to reduce code duplication and improve readability.
Consider extracting reusable helper functions for error creation and grouping to simplify the nested loops and duplicated structures. For example, you can refactor the repeated error append code into a private helper in the validator:
def _report_error(self, issue: TypeIssue, message: str, function, line_number: int | None = None):
self.errors.append(
TypeValidationError(
issue=issue,
message=message,
file_path=function.filepath,
function_name=function.name,
line_number=line_number,
)
)
Then update your validation methods to use it:
if not hasattr(function, "return_type") or not function.return_type:
self._report_error(
TypeIssue.MISSING_ANNOTATION,
f"Function '{function.name}' is missing a return type annotation",
function
)
Similarly, in your analyze_types function, you can refactor error grouping by using helper functions:
def group_errors(errors, key_func):
groups = {}
for error in errors:
key = key_func(error)
groups.setdefault(key, []).append({
"message": error.message,
"file_path": error.file_path,
"function_name": error.function_name,
"line_number": error.line_number,
})
return groups
errors_by_issue = group_errors(errors, lambda err: err.issue.name)
errors_by_file = group_errors(errors, lambda err: err.file_path)
These small focused changes reduce inline repetition and nested conditions while keeping all functionality intact.
| return max(dfs(callee, depth + 1) for callee in self.callees[node]) | ||
| return dfs(function_name, 0) |
issue (complexity): Consider extracting the inline DFS traversals in get_call_depth() and find_path() into separate helper methods to improve readability and reduce nesting.
Consider extracting the inline DFS traversals into separate helper methods. This can reduce complexity by isolating recursion and improving readability. For example, you could refactor both DFS traversals in get_call_depth() and find_path() as follows:
def _dfs_call_depth(self, node: str, depth: int, visited: set[str]) -> int:
if node in visited:
return depth
visited.add(node)
if not self.callees.get(node, set()):
return depth
return max(self._dfs_call_depth(callee, depth + 1, visited.copy())
for callee in self.callees[node])
def get_call_depth(self, function_name: str) -> int:
return self._dfs_call_depth(function_name, 0, set())
Similarly, for find_path():
def _dfs_find_path(
self, node: str, target: str, visited: set[str], path: list[str]
) -> bool:
if node == target:
path.append(node)
return True
if node in visited:
return False
visited.add(node)
path.append(node)
for callee in self.callees.get(node, set()):
if self._dfs_find_path(callee, target, visited, path):
return True
path.pop()
return False
def find_path(self, from_function: str, to_function: str) -> list[str]:
if from_function == to_function:
return [from_function]
visited = set()
path = []
return path if self._dfs_find_path(from_function, to_function, visited, path) else []
These small, focused methods encapsulate the DFS logic, reduce nesting, and improve code clarity without changing functionality.
| # Find the function | ||
| function = None | ||
| for f in self.codebase.functions: | ||
| if f.name == function_name: | ||
| function = f | ||
| break | ||
suggestion (code-quality): Use the built-in function next instead of a for-loop (use-next)
| # Find the function | |
| function = None | |
| for f in self.codebase.functions: | |
| if f.name == function_name: | |
| function = f | |
| break | |
| function = next( | |
| (f for f in self.codebase.functions if f.name == function_name), None | |
| ) |
| param_counts = [] | ||
| for function in self.codebase.functions: | ||
| if hasattr(function, "parameters"): | ||
| param_counts.append((function.name, len(function.parameters))) | ||
suggestion (code-quality): Convert for loop into list comprehension (list-comprehension)
| param_counts = [] | |
| for function in self.codebase.functions: | |
| if hasattr(function, "parameters"): | |
| param_counts.append((function.name, len(function.parameters))) | |
| param_counts = [ | |
| (function.name, len(function.parameters)) | |
| for function in self.codebase.functions | |
| if hasattr(function, "parameters") | |
| ] |
| # Find unused parameters | ||
| unused = param_names - used_names | ||
| if unused: |
suggestion (code-quality): Use named expression to simplify assignment and conditional (use-named-expression)
| # Find unused parameters | |
| unused = param_names - used_names | |
| if unused: | |
| if unused := param_names - used_names: |
| if hasattr(function, "code_block") and hasattr( | ||
| function, "return_type" | ||
| ): | ||
| return_types = set() |
issue (code-quality): Convert for loop into set comprehension (set-comprehension)
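Applied to the loop above, the suggested set comprehension would look roughly like the sketch below; the statement attributes are stand-ins, since the rest of the original loop isn't shown:

```python
# Sketch only: `statements` and `value_type` are placeholder attribute names.
return_types = {
    stmt.value_type
    for stmt in function.code_block.statements
    if getattr(stmt, "value_type", None) is not None
}
```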
| if not hasattr(function, "code_block"): | ||
| continue | ||
| self.inferred_types[function.name] = {} |
issue (code-quality): We've found these issues:
- Convert for loop into dictionary comprehension (dict-comprehension)
- Use the built-in function next instead of a for-loop (use-next)
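Roughly what the two refactors could look like on code of this shape; the surrounding loop isn't shown in full, so attribute names are placeholders:

```python
# dict-comprehension: initialize the per-function map in one expression
self.inferred_types = {
    function.name: {}
    for function in self.codebase.functions
    if hasattr(function, "code_block")
}

# use-next: replace a lookup loop with next() over a generator expression
function = next(
    (f for f in self.codebase.functions if f.name == function_name),
    None,
)
```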
Code Review: Enhanced Analysis Module
I've reviewed the PR that enhances the analysis module with detailed error detection and validation capabilities. Here's my assessment:
Overall Architecture
The PR introduces a well-structured, modular approach to code analysis with three main components:
Strengths
Suggestions for Improvement
Conclusion
This PR significantly enhances the analysis capabilities of the codebase with well-designed, modular components for error detection, function call analysis, and type validation. The architecture is clean and extensible, making it easy to add new analysis capabilities in the future. I recommend addressing the integration with the main
Specific Code Suggestions
Error Detection Module
def get_errors_by_severity(self, severity: ErrorSeverity) -> list[DetectedError]:
"""
Get errors of a specific severity level.
Args:
severity: The severity level to filter by
Returns:
A list of errors with the specified severity
"""
return [error for error in self.errors if error.severity == severity]
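A possible call site for this helper, assuming a detector whose errors list has already been populated by detect_errors():

```python
# Hypothetical usage; field names follow the DetectedError dataclass.
critical = detector.get_errors_by_severity(ErrorSeverity.CRITICAL)
for err in critical:
    print(f"{err.file_path}:{err.line_number} [{err.function_name}] {err.message}")
```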
Function Call Analysis Module
def get_call_depth(self, function_name: str) -> int:
"""Get the maximum call depth of a function."""
if not hasattr(self, "_depth_cache"):
self._depth_cache = {}
if function_name in self._depth_cache:
return self._depth_cache[function_name]
# ... existing implementation ...
self._depth_cache[function_name] = result
return result
Type Validation Module
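As a rough illustration of the kind of check this module performs (not the reviewer's exact proposal), a missing-annotation pass can reuse the TypeIssue and TypeValidationError names plus the _report_error helper proposed earlier:

```python
# Illustrative sketch only.
def _check_missing_return_annotations(self) -> None:
    for function in self.codebase.functions:
        if not getattr(function, "return_type", None):
            self._report_error(
                TypeIssue.MISSING_ANNOTATION,
                f"Function '{function.name}' is missing a return type annotation",
                function,
            )
```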
General Suggestions
class CodeAnalyzer:
"""
Central class for code analysis that integrates all analysis components.
"""
def __init__(self, codebase: Codebase, context: CodebaseContext | None = None):
self.codebase = codebase
self.context = context
def analyze_all(self) -> dict[str, Any]:
"""
Perform comprehensive analysis of the codebase.
Returns:
A dictionary containing all analysis results
"""
return {
"errors": self.analyze_errors(),
"function_calls": self.analyze_function_calls(),
"types": self.analyze_types(),
"complexity": self.analyze_complexity(),
"imports": self.analyze_imports()
}
def analyze_errors(self) -> dict[str, Any]:
"""
Analyze errors in the codebase.
Returns:
A dictionary containing error analysis results
"""
return detect_errors(self.codebase, self.context)
def analyze_function_calls(self) -> dict[str, Any]:
"""
Analyze function calls in the codebase.
Returns:
A dictionary containing function call analysis results
"""
return analyze_function_calls(self.codebase, self.context)
def analyze_types(self) -> dict[str, Any]:
"""
Analyze types in the codebase.
Returns:
A dictionary containing type analysis results
"""
return analyze_types(self.codebase, self.context)
# ... other methods ...
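If a facade like this were adopted, consumer code could stay very small; the repository URL below is a placeholder, and the result keys mirror the dictionary returned by analyze_all():

```python
# Hypothetical usage of the CodeAnalyzer facade sketched above.
codebase = Codebase.from_repo("org/example-repo")
analyzer = CodeAnalyzer(codebase)
results = analyzer.analyze_all()
for section in ("errors", "function_calls", "types", "complexity", "imports"):
    print(section, type(results[section]))
```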
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
app = FastAPI()
class RepoRequest(BaseModel):
repo_url: str
class FileRequest(BaseModel):
repo_url: str
file_path: str
class FunctionRequest(BaseModel):
repo_url: str
function_name: str
@app.post("/analyze_repo")
def analyze_repo(request: RepoRequest):
try:
codebase = Codebase.from_repo(request.repo_url)
analyzer = CodeAnalyzer(codebase)
return analyzer.analyze_all()
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
@app.post("/analyze_file")
def analyze_file(request: FileRequest):
try:
codebase = Codebase.from_repo(request.repo_url)
analyzer = CodeAnalyzer(codebase)
file = analyzer.find_file_by_path(request.file_path)
if not file:
raise HTTPException(status_code=404, detail="File not found")
# Analyze just this file
# ...
except HTTPException:
raise  # re-raise the 404 above instead of converting it into a 500
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
# ... other endpoints ...
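A quick local smoke test for the sketched endpoints could use FastAPI's TestClient; the repository URL here is only a placeholder:

```python
from fastapi.testclient import TestClient

client = TestClient(app)
response = client.post("/analyze_repo", json={"repo_url": "https://github.com/org/example-repo"})
print(response.status_code, list(response.json().keys()))
```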
I've completed a comprehensive review of PR #20 "Enhance Analysis Module with Detailed Error Detection and Validation". The PR introduces excellent enhancements to the analysis capabilities with three well-designed modules:
I've left detailed comments on the PR with:
The main areas for improvement are:
Overall, this is a valuable addition to the codebase with a clean, extensible architecture that will make it easier to identify and fix issues in codebases.
27f0eca to f4656a2
Enhanced Analysis Module
This PR significantly enhances the analysis capabilities of the codebase by adding detailed error detection, function call analysis, and type validation features. These enhancements will allow users to identify issues in their code with greater precision and gain deeper insights into their codebase structure.
Key Features Added
Error Detection
Function Call Analysis
Type Validation
API Enhancements
Documentation
These enhancements will make it easier to identify and fix issues in codebases, leading to more robust and maintainable code.
Testing
The new functionality has been tested with various codebases to ensure accuracy and performance. The error detection system has been designed to minimize false positives while still catching important issues.
Summary by Sourcery
Enhance the analysis module with comprehensive code analysis capabilities, including detailed error detection, function call analysis, and type validation features
New Features:
Enhancements:
Documentation: