diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml
index 4e500b424..bcba375d2 100644
--- a/.github/workflows/test.yml
+++ b/.github/workflows/test.yml
@@ -15,10 +15,15 @@ jobs:
runs-on: ubuntu-latest
steps:
- uses: actions-cool/check-user-permission@v2
+ if: github.triggering_actor != 'codegen-sh[bot]'
with:
require: write
username: ${{ github.triggering_actor }}
error-if-missing: true
+ # Skip permission check for codegen-sh[bot]
+ - name: Skip permission check for bot
+ if: github.triggering_actor == 'codegen-sh[bot]'
+ run: echo "Skipping permission check for codegen-sh[bot]"
unit-tests:
needs: access-check
diff --git a/codegen-on-oss/codegen_on_oss/analysis/README.md b/codegen-on-oss/codegen_on_oss/analysis/README.md
new file mode 100644
index 000000000..423376452
--- /dev/null
+++ b/codegen-on-oss/codegen_on_oss/analysis/README.md
@@ -0,0 +1,122 @@
+# Codegen Analysis Module
+
+A comprehensive code analysis module for the Codegen-on-OSS project that provides a unified interface for analyzing codebases.
+
+## Overview
+
+The Analysis Module integrates various specialized analysis components into a cohesive system, allowing for:
+
+- Code complexity analysis
+- Import dependency analysis
+- Documentation generation
+- Symbol attribution
+- Visualization of module dependencies
+- Comprehensive code quality metrics
+
+## Components
+
+The module consists of the following key components:
+
+- **CodeAnalyzer**: Central class that orchestrates all analysis functionality
+- **Metrics Integration**: Connection with the CodeMetrics class for comprehensive metrics
+- **Import Analysis**: Tools for analyzing import relationships and cycles
+- **Documentation Tools**: Functions for generating documentation for code
+- **Visualization**: Tools for visualizing dependencies and relationships
+
+## Usage
+
+### Basic Usage
+
+```python
+from codegen import Codebase
+from codegen_on_oss.analysis.analysis import CodeAnalyzer
+from codegen_on_oss.metrics import CodeMetrics
+
+# Load a codebase
+codebase = Codebase.from_repo("owner/repo")
+
+# Create analyzer instance
+analyzer = CodeAnalyzer(codebase)
+
+# Get codebase summary
+summary = analyzer.get_codebase_summary()
+print(summary)
+
+# Analyze complexity (per-function cyclomatic complexity, per-file line metrics)
+complexity_results = analyzer.analyze_complexity()
+per_function = complexity_results["cyclomatic_complexity"]
+average_complexity = (
+    sum(entry["complexity"] for entry in per_function.values()) / len(per_function)
+    if per_function
+    else 0
+)
+print(f"Average cyclomatic complexity: {average_complexity:.2f}")
+
+# Analyze imports
+import_analysis = analyzer.analyze_imports()
+print(f"Found {len(import_analysis['cycles'])} import cycles")
+
+# Create metrics instance
+metrics = CodeMetrics(codebase)
+
+# Get code quality summary
+quality_summary = metrics.get_code_quality_summary()
+print(quality_summary)
+```
+
+### Web API
+
+The module also provides a FastAPI web interface for analyzing repositories:
+
+```bash
+# Run the API server
+python -m codegen_on_oss.analysis.analysis
+```
+
+Then you can make POST requests to `/analyze_repo` with a JSON body:
+
+```json
+{
+ "repo_url": "owner/repo"
+}
+```
+
+## Key Features
+
+### Code Complexity Analysis
+
+- Cyclomatic complexity calculation
+- Halstead complexity metrics
+- Maintainability index
+- Line metrics (LOC, LLOC, SLOC, comments)
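+
+A minimal sketch of how these metrics surface through `CodeMetrics` (assuming a `codebase` loaded as in Basic Usage above):
+
+```python
+from codegen_on_oss.metrics import CodeMetrics
+
+metrics = CodeMetrics(codebase)
+
+# Average cyclomatic complexity plus a letter rank (A-F)
+print(metrics.complexity_metrics["average"], metrics.complexity_metrics["rank"])
+
+# Normalized maintainability index (0-100) with per-function scores
+print(metrics.maintainability_metrics["average"])
+
+# Total LOC/LLOC/SLOC/comment counts across the codebase
+print(metrics.line_metrics["total"])
+```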
+
+### Import Analysis
+
+- Detect import cycles
+- Identify problematic import loops
+- Visualize module dependencies
+
+### Documentation Generation
+
+- Generate documentation for functions
+- Create MDX documentation for classes
+- Extract context for symbols
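+
+A minimal sketch (this delegates to the `document_functions` module's `run`):
+
+```python
+# Generates documentation for the functions in the loaded codebase
+analyzer.document_functions()
+```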
+
+### Symbol Attribution
+
+- Track symbol authorship
+- Analyze AI contribution
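+
+For example, to fetch attribution for a single symbol (the symbol name here is hypothetical):
+
+```python
+print(analyzer.get_symbol_attribution("CodeAnalyzer"))
+```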
+
+### Dependency Analysis
+
+- Create dependency graphs
+- Find central files
+- Identify dependency cycles
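+
+A short sketch using `CodeMetrics.analyze_dependencies`, which builds a NetworkX graph of file-to-file imports (reusing the `metrics` instance from above):
+
+```python
+deps = metrics.analyze_dependencies()
+
+print(deps["dependency_graph"])  # node/edge counts and graph density
+print(deps["cycles"])            # number of dependency cycles
+for path, centrality in deps["most_central_files"]:
+    print(path, centrality)
+```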
+
+## Integration with Metrics
+
+The Analysis Module is fully integrated with the CodeMetrics class, which provides:
+
+- Comprehensive code quality metrics
+- Functions to find problematic code areas
+- Dependency analysis
+- Documentation generation
+
+## Example
+
+See `example.py` for a complete demonstration of the analysis module's capabilities.
+
diff --git a/codegen-on-oss/codegen_on_oss/analysis/analysis.py b/codegen-on-oss/codegen_on_oss/analysis/analysis.py
index 9e956ec06..d833717f8 100644
--- a/codegen-on-oss/codegen_on_oss/analysis/analysis.py
+++ b/codegen-on-oss/codegen_on_oss/analysis/analysis.py
@@ -1,37 +1,106 @@
-from fastapi import FastAPI
-from pydantic import BaseModel
-from typing import Dict, List, Tuple, Any
-from codegen import Codebase
-from codegen.sdk.core.statements.for_loop_statement import ForLoopStatement
-from codegen.sdk.core.statements.if_block_statement import IfBlockStatement
-from codegen.sdk.core.statements.try_catch_statement import TryCatchStatement
-from codegen.sdk.core.statements.while_statement import WhileStatement
-from codegen.sdk.core.expressions.binary_expression import BinaryExpression
-from codegen.sdk.core.expressions.unary_expression import UnaryExpression
-from codegen.sdk.core.expressions.comparison_expression import ComparisonExpression
+"""
+Unified Analysis Module for Codegen-on-OSS
+
+This module serves as a central hub for all code analysis functionality, integrating
+various specialized analysis components into a cohesive system.
+"""
+
+import contextlib
import math
+import os
import re
-import requests
-from datetime import datetime, timedelta
import subprocess
-import os
import tempfile
+from datetime import UTC, datetime, timedelta
+from typing import Any, Dict, List, Optional, Tuple, Union, cast
+from urllib.parse import urlparse
+
+import networkx as nx
+import requests
+import uvicorn
+from codegen import Codebase
+from codegen.sdk.core.binary_expression import BinaryExpression
+from codegen.sdk.core.class_definition import Class
+from codegen.sdk.core.conditional_expression import ConditionalExpression
+from codegen.sdk.core.expressions.unary_expression import UnaryExpression
+from codegen.sdk.core.external_module import ExternalModule
+from codegen.sdk.core.file import SourceFile
+from codegen.sdk.core.directory import Directory
+from codegen.sdk.core.function import Function
+from codegen.sdk.core.import_resolution import Import
+from codegen.sdk.core.statements.for_loop_statement import ForLoopStatement
+from codegen.sdk.core.statements.if_statement import IfStatement
+from codegen.sdk.core.statements.switch_statement import SwitchStatement
+from codegen.sdk.core.statements.while_statement import WhileStatement
+from codegen.sdk.core.symbol import Symbol
+from codegen.sdk.enums import EdgeType, SymbolType
+from fastapi import FastAPI, HTTPException
from fastapi.middleware.cors import CORSMiddleware
-import modal
-
-image = (
- modal.Image.debian_slim()
- .apt_install("git")
- .pip_install(
- "codegen", "fastapi", "uvicorn", "gitpython", "requests", "pydantic", "datetime"
- )
+from pydantic import BaseModel
+from zoneinfo import ZoneInfo
+
+# Import from other analysis modules
+from codegen_on_oss.analysis.codebase_context import CodebaseContext
+from codegen_on_oss.analysis.codebase_analysis import (
+    calculate_cyclomatic_complexity,
+    calculate_doi,
+    calculate_halstead_volume,
+    calculate_maintainability_index,
+    cc_rank,
+    count_lines,
+    # Summary helpers used by CodeAnalyzer below; assumed to live alongside
+    # the metric helpers in codebase_analysis.
+    get_class_summary,
+    get_codebase_summary,
+    get_file_summary,
+    get_function_summary,
+    get_maintainability_rank,
+    get_operators_and_operands,
+    get_symbol_summary,
+)
+from codegen_on_oss.analysis.codegen_sdk_codebase import (
+ get_codegen_sdk_subdirectories,
+ get_codegen_sdk_codebase
+)
+from codegen_on_oss.analysis.current_code_codebase import (
+ get_graphsitter_repo_path,
+ get_codegen_codebase_base_path,
+ get_current_code_codebase,
+ import_all_codegen_sdk_modules,
+ DocumentedObjects,
+ get_documented_objects
+)
+from codegen_on_oss.analysis.document_functions import (
+ hop_through_imports,
+ get_extended_context,
+ run as document_functions_run
+)
+from codegen_on_oss.analysis.mdx_docs_generation import (
+ render_mdx_page_for_class,
+ render_mdx_page_title,
+ render_mdx_inheritence_section,
+ render_mdx_attributes_section,
+ render_mdx_methods_section,
+ render_mdx_for_attribute,
+ format_parameter_for_mdx,
+ format_parameters_for_mdx,
+ format_return_for_mdx,
+ render_mdx_for_method,
+ get_mdx_route_for_class,
+ format_type_string,
+ resolve_type_string,
+ format_builtin_type_string,
+ span_type_string_by_pipe,
+ parse_link
+)
+from codegen_on_oss.analysis.module_dependencies import run as module_dependencies_run
+from codegen_on_oss.analysis.symbolattr import print_symbol_attribution
+from codegen_on_oss.analysis.analysis_import import (
+ create_graph_from_codebase,
+ convert_all_calls_to_kwargs,
+ find_import_cycles,
+ find_problematic_import_loops
)
-app = modal.App(name="analytics-app", image=image)
-
-fastapi_app = FastAPI()
+# Create FastAPI app
+app = FastAPI()
-fastapi_app.add_middleware(
+app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
@@ -40,40 +109,535 @@
)
-def get_monthly_commits(repo_path: str) -> Dict[str, int]:
+class CodeAnalyzer:
+ """
+ Central class for code analysis that integrates all analysis components.
+
+ This class serves as the main entry point for all code analysis functionality,
+ providing a unified interface to access various analysis capabilities.
"""
- Get the number of commits per month for the last 12 months.
+
+ def __init__(self, codebase: Codebase):
+ """
+ Initialize the CodeAnalyzer with a codebase.
+
+ Args:
+ codebase: The Codebase object to analyze
+ """
+ self.codebase = codebase
+ self._context = None
+ self._initialized = False
+
+ def initialize(self) -> None:
+ """
+ Initialize the analyzer by setting up the context and other necessary components.
+ This is called automatically when needed but can be called explicitly for eager initialization.
+ """
+ if self._initialized:
+ return
+
+ # Initialize context if not already done
+ if self._context is None:
+ self._context = self._create_context()
+
+ self._initialized = True
+
+ def _create_context(self) -> CodebaseContext:
+ """
+ Create a CodebaseContext instance for the current codebase.
+
+ Returns:
+ A new CodebaseContext instance
+ """
+ # If the codebase already has a context, use it
+ if hasattr(self.codebase, "ctx") and self.codebase.ctx is not None:
+ return self.codebase.ctx
+
+ # Otherwise, create a new context from the codebase's configuration
+ from codegen.sdk.codebase.config import ProjectConfig
+ from codegen.configs.models.codebase import CodebaseConfig
+
+ # Create a project config from the codebase
+ project_config = ProjectConfig(
+ repo_operator=self.codebase.repo_operator,
+ programming_language=self.codebase.programming_language,
+ base_path=self.codebase.base_path
+ )
+
+ # Create and return a new context
+ return CodebaseContext([project_config], config=CodebaseConfig())
+
+ @property
+ def context(self) -> CodebaseContext:
+ """
+ Get the CodebaseContext for the current codebase.
+
+ Returns:
+ A CodebaseContext object for the codebase
+ """
+ if not self._initialized:
+ self.initialize()
+
+ return self._context
+
+ def get_codebase_summary(self) -> str:
+ """
+ Get a comprehensive summary of the codebase.
+
+ Returns:
+ A string containing summary information about the codebase
+ """
+ return get_codebase_summary(self.codebase)
+
+ def get_file_summary(self, file_path: str) -> str:
+ """
+ Get a summary of a specific file.
+
+ Args:
+ file_path: Path to the file to analyze
+
+ Returns:
+ A string containing summary information about the file
+ """
+ file = self.codebase.get_file(file_path)
+ if file is None:
+ return f"File not found: {file_path}"
+ return get_file_summary(file)
+
+ def get_class_summary(self, class_name: str) -> str:
+ """
+ Get a summary of a specific class.
+
+ Args:
+ class_name: Name of the class to analyze
+
+ Returns:
+ A string containing summary information about the class
+ """
+ for cls in self.codebase.classes:
+ if cls.name == class_name:
+ return get_class_summary(cls)
+ return f"Class not found: {class_name}"
+
+ def get_function_summary(self, function_name: str) -> str:
+ """
+ Get a summary of a specific function.
+
+ Args:
+ function_name: Name of the function to analyze
+
+ Returns:
+ A string containing summary information about the function
+ """
+ for func in self.codebase.functions:
+ if func.name == function_name:
+ return get_function_summary(func)
+ return f"Function not found: {function_name}"
+
+ def get_symbol_summary(self, symbol_name: str) -> str:
+ """
+ Get a summary of a specific symbol.
+
+ Args:
+ symbol_name: Name of the symbol to analyze
+
+ Returns:
+ A string containing summary information about the symbol
+ """
+ for symbol in self.codebase.symbols:
+ if symbol.name == symbol_name:
+ return get_symbol_summary(symbol)
+ return f"Symbol not found: {symbol_name}"
+
+ def find_symbol_by_name(self, symbol_name: str) -> Optional[Symbol]:
+ """
+ Find a symbol by its name.
+
+ Args:
+ symbol_name: Name of the symbol to find
+
+ Returns:
+ The Symbol object if found, None otherwise
+ """
+ for symbol in self.codebase.symbols:
+ if symbol.name == symbol_name:
+ return symbol
+ return None
+
+ def find_file_by_path(self, file_path: str) -> Optional[SourceFile]:
+ """
+ Find a file by its path.
+
+ Args:
+ file_path: Path to the file to find
+
+ Returns:
+ The SourceFile object if found, None otherwise
+ """
+ return self.codebase.get_file(file_path)
+
+ def find_class_by_name(self, class_name: str) -> Optional[Class]:
+ """
+ Find a class by its name.
+
+ Args:
+ class_name: Name of the class to find
+
+ Returns:
+ The Class object if found, None otherwise
+ """
+ for cls in self.codebase.classes:
+ if cls.name == class_name:
+ return cls
+ return None
+
+ def find_function_by_name(self, function_name: str) -> Optional[Function]:
+ """
+ Find a function by its name.
+
+ Args:
+ function_name: Name of the function to find
+
+ Returns:
+ The Function object if found, None otherwise
+ """
+ for func in self.codebase.functions:
+ if func.name == function_name:
+ return func
+ return None
+
+ def document_functions(self) -> None:
+ """
+ Generate documentation for functions in the codebase.
+ """
+ document_functions_run(self.codebase)
+
+ def analyze_imports(self) -> Dict[str, Any]:
+ """
+ Analyze imports in the codebase.
+
+ Returns:
+ A dictionary containing import analysis results
+ """
+ graph = create_graph_from_codebase(self.codebase)
+ cycles = find_import_cycles(graph)
+ problematic_loops = find_problematic_import_loops(graph, cycles)
+
+ return {
+ "import_graph": graph,
+ "cycles": cycles,
+ "problematic_loops": problematic_loops
+ }
+
+    def analyze_complexity(self) -> Dict[str, Any]:
+        """
+        Analyze code complexity metrics for the codebase.
+
+        Returns:
+            A dictionary with two keys: "cyclomatic_complexity", mapping each
+            function name to its complexity score and rank, and "line_metrics",
+            mapping each file name to its LOC, LLOC, SLOC, and comment counts
+        """
+ # Calculate cyclomatic complexity for all functions
+ complexity_results = {}
+ for func in self.codebase.functions:
+ if hasattr(func, "code_block"):
+ complexity = calculate_cyclomatic_complexity(func)
+ complexity_results[func.name] = {
+ "complexity": complexity,
+ "rank": cc_rank(complexity)
+ }
+
+ # Calculate line metrics for all files
+ line_metrics = {}
+ for file in self.codebase.files:
+ if hasattr(file, "source"):
+ loc, lloc, sloc, comments = count_lines(file.source)
+ line_metrics[file.name] = {
+ "loc": loc,
+ "lloc": lloc,
+ "sloc": sloc,
+ "comments": comments
+ }
+
+ return {
+ "cyclomatic_complexity": complexity_results,
+ "line_metrics": line_metrics
+ }
+
+ def get_dependency_graph(self) -> nx.DiGraph:
+ """
+ Generate a dependency graph for the codebase.
+
+ Returns:
+ A NetworkX DiGraph representing dependencies
+ """
+ G = nx.DiGraph()
+
+ # Add nodes for all files
+ for file in self.codebase.files:
+ G.add_node(file.name, type="file")
+
+ # Add edges for imports
+ for file in self.codebase.files:
+ for imp in file.imports:
+ if imp.imported_symbol and hasattr(imp.imported_symbol, "file"):
+ imported_file = imp.imported_symbol.file
+ if imported_file and imported_file.name != file.name:
+ G.add_edge(file.name, imported_file.name)
+
+ return G
+
+ def get_symbol_attribution(self, symbol_name: str) -> str:
+ """
+ Get attribution information for a symbol.
+
+ Args:
+ symbol_name: Name of the symbol to analyze
+
+ Returns:
+ A string containing attribution information
+ """
+ symbol = self.find_symbol_by_name(symbol_name)
+ if symbol is None:
+ return f"Symbol not found: {symbol_name}"
+
+ return print_symbol_attribution(symbol)
+
+ def get_context_for_symbol(self, symbol_name: str) -> Dict[str, Any]:
+ """
+ Get extended context information for a symbol using CodebaseContext.
+
+ Args:
+ symbol_name: Name of the symbol to analyze
+
+ Returns:
+ A dictionary containing context information
+ """
+ symbol = self.find_symbol_by_name(symbol_name)
+ if symbol is None:
+ return {"error": f"Symbol not found: {symbol_name}"}
+
+ # Use the context to get more information about the symbol
+ ctx = self.context
+
+ # Get symbol node ID in the context graph
+ node_id = None
+ for n_id, node in enumerate(ctx.nodes):
+ if isinstance(node, Symbol) and node.name == symbol_name:
+ node_id = n_id
+ break
+
+ if node_id is None:
+ return {"error": f"Symbol not found in context: {symbol_name}"}
+
+ # Get predecessors (symbols that use this symbol)
+ predecessors = []
+ for pred in ctx.predecessors(node_id):
+ if isinstance(pred, Symbol):
+ predecessors.append({
+ "name": pred.name,
+ "type": pred.symbol_type.name if hasattr(pred, "symbol_type") else "Unknown"
+ })
+
+ # Get successors (symbols used by this symbol)
+ successors = []
+ for succ in ctx.successors(node_id):
+ if isinstance(succ, Symbol):
+ successors.append({
+ "name": succ.name,
+ "type": succ.symbol_type.name if hasattr(succ, "symbol_type") else "Unknown"
+ })
+
+ return {
+ "symbol": {
+ "name": symbol.name,
+ "type": symbol.symbol_type.name if hasattr(symbol, "symbol_type") else "Unknown",
+ "file": symbol.file.name if hasattr(symbol, "file") else "Unknown"
+ },
+ "predecessors": predecessors,
+ "successors": successors
+ }
+
+ def get_file_dependencies(self, file_path: str) -> Dict[str, Any]:
+ """
+ Get dependency information for a file using CodebaseContext.
+
+ Args:
+ file_path: Path to the file to analyze
+
+ Returns:
+ A dictionary containing dependency information
+ """
+ file = self.find_file_by_path(file_path)
+ if file is None:
+ return {"error": f"File not found: {file_path}"}
+
+ # Use the context to get more information about the file
+ ctx = self.context
+
+ # Get file node ID in the context graph
+ node_id = None
+ for n_id, node in enumerate(ctx.nodes):
+ if isinstance(node, SourceFile) and node.name == file.name:
+ node_id = n_id
+ break
+
+ if node_id is None:
+ return {"error": f"File not found in context: {file_path}"}
+
+ # Get files that import this file
+ importers = []
+ for pred in ctx.predecessors(node_id, edge_type=EdgeType.IMPORT):
+ if isinstance(pred, SourceFile):
+ importers.append(pred.name)
+
+ # Get files imported by this file
+ imported = []
+ for succ in ctx.successors(node_id, edge_type=EdgeType.IMPORT):
+ if isinstance(succ, SourceFile):
+ imported.append(succ.name)
+
+ return {
+ "file": file.name,
+ "importers": importers,
+ "imported": imported
+ }
+
+ def analyze_codebase_structure(self) -> Dict[str, Any]:
+ """
+ Analyze the overall structure of the codebase using CodebaseContext.
+
+ Returns:
+ A dictionary containing structural analysis results
+ """
+ ctx = self.context
+
+ # Count nodes by type
+ node_types: Dict[str, int] = {}
+ for node in ctx.nodes:
+ node_type = type(node).__name__
+ node_types[node_type] = node_types.get(node_type, 0) + 1
+
+ # Count edges by type
+ edge_types: Dict[str, int] = {}
+ for _, _, edge in ctx.edges:
+ edge_type = edge.type.name
+ edge_types[edge_type] = edge_types.get(edge_type, 0) + 1
+
+ # Get directories structure
+ directories = {}
+ for path, directory in ctx.directories.items():
+ directories[str(path)] = {
+ "files": len([item for item in directory.items if isinstance(item, SourceFile)]),
+ "subdirectories": len([item for item in directory.items if isinstance(item, Directory)])
+ }
+
+ return {
+ "node_types": node_types,
+ "edge_types": edge_types,
+ "directories": directories
+ }
+
+ def get_symbol_dependencies(self, symbol_name: str) -> Dict[str, List[str]]:
+ """
+ Get direct dependencies of a symbol.
+
+ Args:
+ symbol_name: Name of the symbol to analyze
+
+ Returns:
+ A dictionary mapping dependency types to lists of symbol names
+ """
+ symbol = self.find_symbol_by_name(symbol_name)
+ if symbol is None:
+ return {"error": [f"Symbol not found: {symbol_name}"]}
+
+ # Initialize result dictionary
+ dependencies: Dict[str, List[str]] = {
+ "imports": [],
+ "functions": [],
+ "classes": [],
+ "variables": []
+ }
+
+ # Process dependencies based on symbol type
+ if hasattr(symbol, "dependencies"):
+ for dep in symbol.dependencies:
+ if isinstance(dep, Import):
+ if dep.imported_symbol:
+ dependencies["imports"].append(dep.imported_symbol.name)
+ elif isinstance(dep, Symbol):
+ if dep.symbol_type == SymbolType.Function:
+ dependencies["functions"].append(dep.name)
+ elif dep.symbol_type == SymbolType.Class:
+ dependencies["classes"].append(dep.name)
+ elif dep.symbol_type == SymbolType.GlobalVar:
+ dependencies["variables"].append(dep.name)
+
+ return dependencies
- Args:
- repo_path: Path to the git repository
+def get_monthly_commits(repo_path: str) -> Dict[str, int]:
+ """
+ Get monthly commit counts for a repository.
+
+ Args:
+ repo_path: Path to the repository
+
Returns:
- Dictionary with month-year as key and number of commits as value
+ A dictionary mapping month strings to commit counts
"""
- end_date = datetime.now()
+ end_date = datetime.now(UTC)
start_date = end_date - timedelta(days=365)
date_format = "%Y-%m-%d"
since_date = start_date.strftime(date_format)
until_date = end_date.strftime(date_format)
- repo_path = "https://github.com/" + repo_path
+
+ # Validate repo_path format (should be owner/repo)
+ if not re.match(r"^[a-zA-Z0-9_.-]+/[a-zA-Z0-9_.-]+$", repo_path):
+ print(f"Invalid repository path format: {repo_path}")
+ return {}
+
+ repo_url = f"https://github.com/{repo_path}"
+
+ # Validate URL
+ try:
+ parsed_url = urlparse(repo_url)
+ if not all([parsed_url.scheme, parsed_url.netloc]):
+ print(f"Invalid URL: {repo_url}")
+ return {}
+ except Exception:
+ print(f"Invalid URL: {repo_url}")
+ return {}
try:
original_dir = os.getcwd()
with tempfile.TemporaryDirectory() as temp_dir:
- subprocess.run(["git", "clone", repo_path, temp_dir], check=True)
+ # Using a safer approach with a list of arguments and shell=False
+ subprocess.run(
+ ["git", "clone", repo_url, temp_dir],
+ check=True,
+ capture_output=True,
+ shell=False,
+ text=True,
+ )
os.chdir(temp_dir)
- cmd = [
- "git",
- "log",
- f"--since={since_date}",
- f"--until={until_date}",
- "--format=%aI",
- ]
-
- result = subprocess.run(cmd, capture_output=True, text=True, check=True)
+ # Using a safer approach with a list of arguments and shell=False
+ result = subprocess.run(
+ [
+ "git",
+ "log",
+ f"--since={since_date}",
+ f"--until={until_date}",
+ "--format=%aI",
+ ],
+ capture_output=True,
+ text=True,
+ check=True,
+ shell=False,
+ )
commit_dates = result.stdout.strip().split("\n")
monthly_counts = {}
@@ -92,7 +656,6 @@ def get_monthly_commits(repo_path: str) -> Dict[str, int]:
if month_key in monthly_counts:
monthly_counts[month_key] += 1
- os.chdir(original_dir)
return dict(sorted(monthly_counts.items()))
except subprocess.CalledProcessError as e:
@@ -102,222 +665,20 @@ def get_monthly_commits(repo_path: str) -> Dict[str, int]:
print(f"Error processing git commits: {e}")
return {}
finally:
- try:
+ with contextlib.suppress(Exception):
os.chdir(original_dir)
- except:
- pass
-
-
-def calculate_cyclomatic_complexity(function):
- def analyze_statement(statement):
- complexity = 0
-
- if isinstance(statement, IfBlockStatement):
- complexity += 1
- if hasattr(statement, "elif_statements"):
- complexity += len(statement.elif_statements)
-
- elif isinstance(statement, (ForLoopStatement, WhileStatement)):
- complexity += 1
-
- elif isinstance(statement, TryCatchStatement):
- complexity += len(getattr(statement, "except_blocks", []))
-
- if hasattr(statement, "condition") and isinstance(statement.condition, str):
- complexity += statement.condition.count(
- " and "
- ) + statement.condition.count(" or ")
-
- if hasattr(statement, "nested_code_blocks"):
- for block in statement.nested_code_blocks:
- complexity += analyze_block(block)
-
- return complexity
-
- def analyze_block(block):
- if not block or not hasattr(block, "statements"):
- return 0
- return sum(analyze_statement(stmt) for stmt in block.statements)
-
- return (
- 1 + analyze_block(function.code_block) if hasattr(function, "code_block") else 1
- )
-
-
-def cc_rank(complexity):
- if complexity < 0:
- raise ValueError("Complexity must be a non-negative value")
-
- ranks = [
- (1, 5, "A"),
- (6, 10, "B"),
- (11, 20, "C"),
- (21, 30, "D"),
- (31, 40, "E"),
- (41, float("inf"), "F"),
- ]
- for low, high, rank in ranks:
- if low <= complexity <= high:
- return rank
- return "F"
-
-
-def calculate_doi(cls):
- """Calculate the depth of inheritance for a given class."""
- return len(cls.superclasses)
-
-
-def get_operators_and_operands(function):
- operators = []
- operands = []
-
- for statement in function.code_block.statements:
- for call in statement.function_calls:
- operators.append(call.name)
- for arg in call.args:
- operands.append(arg.source)
-
- if hasattr(statement, "expressions"):
- for expr in statement.expressions:
- if isinstance(expr, BinaryExpression):
- operators.extend([op.source for op in expr.operators])
- operands.extend([elem.source for elem in expr.elements])
- elif isinstance(expr, UnaryExpression):
- operators.append(expr.ts_node.type)
- operands.append(expr.argument.source)
- elif isinstance(expr, ComparisonExpression):
- operators.extend([op.source for op in expr.operators])
- operands.extend([elem.source for elem in expr.elements])
-
- if hasattr(statement, "expression"):
- expr = statement.expression
- if isinstance(expr, BinaryExpression):
- operators.extend([op.source for op in expr.operators])
- operands.extend([elem.source for elem in expr.elements])
- elif isinstance(expr, UnaryExpression):
- operators.append(expr.ts_node.type)
- operands.append(expr.argument.source)
- elif isinstance(expr, ComparisonExpression):
- operators.extend([op.source for op in expr.operators])
- operands.extend([elem.source for elem in expr.elements])
-
- return operators, operands
-
-
-def calculate_halstead_volume(operators, operands):
- n1 = len(set(operators))
- n2 = len(set(operands))
-
- N1 = len(operators)
- N2 = len(operands)
-
- N = N1 + N2
- n = n1 + n2
-
- if n > 0:
- volume = N * math.log2(n)
- return volume, N1, N2, n1, n2
- return 0, N1, N2, n1, n2
-
-
-def count_lines(source: str):
- """Count different types of lines in source code."""
- if not source.strip():
- return 0, 0, 0, 0
-
- lines = [line.strip() for line in source.splitlines()]
- loc = len(lines)
- sloc = len([line for line in lines if line])
-
- in_multiline = False
- comments = 0
- code_lines = []
-
- i = 0
- while i < len(lines):
- line = lines[i]
- code_part = line
- if not in_multiline and "#" in line:
- comment_start = line.find("#")
- if not re.search(r'["\'].*#.*["\']', line[:comment_start]):
- code_part = line[:comment_start].strip()
- if line[comment_start:].strip():
- comments += 1
-
- if ('"""' in line or "'''" in line) and not (
- line.count('"""') % 2 == 0 or line.count("'''") % 2 == 0
- ):
- if in_multiline:
- in_multiline = False
- comments += 1
- else:
- in_multiline = True
- comments += 1
- if line.strip().startswith('"""') or line.strip().startswith("'''"):
- code_part = ""
- elif in_multiline:
- comments += 1
- code_part = ""
- elif line.strip().startswith("#"):
- comments += 1
- code_part = ""
-
- if code_part.strip():
- code_lines.append(code_part)
-
- i += 1
-
- lloc = 0
- continued_line = False
- for line in code_lines:
- if continued_line:
- if not any(line.rstrip().endswith(c) for c in ("\\", ",", "{", "[", "(")):
- continued_line = False
- continue
-
- lloc += len([stmt for stmt in line.split(";") if stmt.strip()])
-
- if any(line.rstrip().endswith(c) for c in ("\\", ",", "{", "[", "(")):
- continued_line = True
-
- return loc, lloc, sloc, comments
-
-
-def calculate_maintainability_index(
- halstead_volume: float, cyclomatic_complexity: float, loc: int
-) -> int:
- """Calculate the normalized maintainability index for a given function."""
- if loc <= 0:
- return 100
- try:
- raw_mi = (
- 171
- - 5.2 * math.log(max(1, halstead_volume))
- - 0.23 * cyclomatic_complexity
- - 16.2 * math.log(max(1, loc))
- )
- normalized_mi = max(0, min(100, raw_mi * 100 / 171))
- return int(normalized_mi)
- except (ValueError, TypeError):
- return 0
-
-
-def get_maintainability_rank(mi_score: float) -> str:
- """Convert maintainability index score to a letter grade."""
- if mi_score >= 85:
- return "A"
- elif mi_score >= 65:
- return "B"
- elif mi_score >= 45:
- return "C"
- elif mi_score >= 25:
- return "D"
- else:
- return "F"
-
-def get_github_repo_description(repo_url):
+def get_github_repo_description(repo_url: str) -> str:
+ """
+ Get the description of a GitHub repository.
+
+ Args:
+ repo_url: The repository URL in the format 'owner/repo'
+
+ Returns:
+ The repository description
+ """
api_url = f"https://api.github.com/repos/{repo_url}"
response = requests.get(api_url)
@@ -330,102 +691,136 @@ def get_github_repo_description(repo_url):
class RepoRequest(BaseModel):
+ """Request model for repository analysis."""
repo_url: str
-@fastapi_app.post("/analyze_repo")
+@app.post("/analyze_repo")
async def analyze_repo(request: RepoRequest) -> Dict[str, Any]:
- """Analyze a repository and return comprehensive metrics."""
+ """
+ Analyze a repository and return various metrics.
+
+ Args:
+ request: The repository request containing the repo URL
+
+ Returns:
+ A dictionary of analysis results
+ """
repo_url = request.repo_url
codebase = Codebase.from_repo(repo_url)
-
- num_files = len(codebase.files(extensions="*"))
- num_functions = len(codebase.functions)
- num_classes = len(codebase.classes)
-
- total_loc = total_lloc = total_sloc = total_comments = 0
- total_complexity = 0
- total_volume = 0
- total_mi = 0
- total_doi = 0
-
+
+ # Create analyzer instance
+ analyzer = CodeAnalyzer(codebase)
+
+ # Get complexity metrics
+ complexity_results = analyzer.analyze_complexity()
+
+ # Get monthly commits
monthly_commits = get_monthly_commits(repo_url)
- print(monthly_commits)
-
- for file in codebase.files:
- loc, lloc, sloc, comments = count_lines(file.source)
- total_loc += loc
- total_lloc += lloc
- total_sloc += sloc
- total_comments += comments
-
- callables = codebase.functions + [m for c in codebase.classes for m in c.methods]
-
- num_callables = 0
- for func in callables:
- if not hasattr(func, "code_block"):
- continue
-
- complexity = calculate_cyclomatic_complexity(func)
- operators, operands = get_operators_and_operands(func)
- volume, _, _, _, _ = calculate_halstead_volume(operators, operands)
- loc = len(func.code_block.source.splitlines())
- mi_score = calculate_maintainability_index(volume, complexity, loc)
-
- total_complexity += complexity
- total_volume += volume
- total_mi += mi_score
- num_callables += 1
-
- for cls in codebase.classes:
- doi = calculate_doi(cls)
- total_doi += doi
-
+
+ # Get repository description
desc = get_github_repo_description(repo_url)
-
+
+    # Analyze imports (keep only JSON-serializable pieces; the raw graph
+    # object returned by analyze_imports cannot be serialized in a response)
+    import_analysis = analyzer.analyze_imports()
+    import_summary = {
+        "cycles": import_analysis["cycles"],
+        "problematic_loops": import_analysis["problematic_loops"],
+    }
+
+ # Analyze codebase structure using CodebaseContext
+ structure_analysis = analyzer.analyze_codebase_structure()
+
+ # Combine all results
results = {
"repo_url": repo_url,
- "line_metrics": {
- "total": {
- "loc": total_loc,
- "lloc": total_lloc,
- "sloc": total_sloc,
- "comments": total_comments,
- "comment_density": (total_comments / total_loc * 100)
- if total_loc > 0
- else 0,
- },
- },
- "cyclomatic_complexity": {
- "average": total_complexity if num_callables > 0 else 0,
- },
- "depth_of_inheritance": {
- "average": total_doi / len(codebase.classes) if codebase.classes else 0,
- },
- "halstead_metrics": {
- "total_volume": int(total_volume),
- "average_volume": int(total_volume / num_callables)
- if num_callables > 0
- else 0,
- },
- "maintainability_index": {
- "average": int(total_mi / num_callables) if num_callables > 0 else 0,
- },
+ "line_metrics": complexity_results["line_metrics"],
+ "cyclomatic_complexity": complexity_results["cyclomatic_complexity"],
"description": desc,
- "num_files": num_files,
- "num_functions": num_functions,
- "num_classes": num_classes,
+ "num_files": len(codebase.files),
+ "num_functions": len(codebase.functions),
+ "num_classes": len(codebase.classes),
"monthly_commits": monthly_commits,
+ "import_analysis": import_analysis,
+ "structure_analysis": structure_analysis
}
-
+
return results
-@app.function(image=image)
-@modal.asgi_app()
-def fastapi_modal_app():
- return fastapi_app
+class SymbolRequest(BaseModel):
+ """Request model for symbol analysis."""
+ repo_url: str
+ symbol_name: str
+
+
+@app.post("/analyze_symbol")
+async def analyze_symbol(request: SymbolRequest) -> Dict[str, Any]:
+ """
+ Analyze a symbol and its relationships in a repository.
+
+ Args:
+ request: The symbol request containing the repo URL and symbol name
+
+ Returns:
+ A dictionary of analysis results
+ """
+ repo_url = request.repo_url
+ symbol_name = request.symbol_name
+
+ codebase = Codebase.from_repo(repo_url)
+ analyzer = CodeAnalyzer(codebase)
+
+ # Get symbol context using CodebaseContext
+ symbol_context = analyzer.get_context_for_symbol(symbol_name)
+
+ # Get symbol dependencies
+ dependencies = analyzer.get_symbol_dependencies(symbol_name)
+
+ # Get symbol attribution
+ attribution = analyzer.get_symbol_attribution(symbol_name)
+
+ return {
+ "symbol_name": symbol_name,
+ "context": symbol_context,
+ "dependencies": dependencies,
+ "attribution": attribution
+ }
+
+
+class FileRequest(BaseModel):
+ """Request model for file analysis."""
+ repo_url: str
+ file_path: str
+
+
+@app.post("/analyze_file")
+async def analyze_file(request: FileRequest) -> Dict[str, Any]:
+ """
+ Analyze a file and its relationships in a repository.
+
+ Args:
+ request: The file request containing the repo URL and file path
+
+ Returns:
+ A dictionary of analysis results
+ """
+ repo_url = request.repo_url
+ file_path = request.file_path
+
+ codebase = Codebase.from_repo(repo_url)
+ analyzer = CodeAnalyzer(codebase)
+
+ # Get file summary
+ file_summary = analyzer.get_file_summary(file_path)
+
+ # Get file dependencies using CodebaseContext
+ file_dependencies = analyzer.get_file_dependencies(file_path)
+
+ return {
+ "file_path": file_path,
+ "summary": file_summary,
+ "dependencies": file_dependencies
+ }
if __name__ == "__main__":
- app.deploy("analytics-app")
+ # Run the FastAPI app locally with uvicorn
+ uvicorn.run(app, host="0.0.0.0", port=8000)
+
diff --git a/codegen-on-oss/codegen_on_oss/analysis/codebase_context.py b/codegen-on-oss/codegen_on_oss/analysis/codebase_context.py
index 5c0fd47dd..c092356b7 100644
--- a/codegen-on-oss/codegen_on_oss/analysis/codebase_context.py
+++ b/codegen-on-oss/codegen_on_oss/analysis/codebase_context.py
@@ -121,7 +121,6 @@ class CodebaseContext:
dependency_manager: DependencyManager | None
language_engine: LanguageEngine | None
_computing = False
- _graph: PyDiGraph[Importable, Edge]
filepath_idx: dict[str, NodeId]
_ext_module_idx: dict[str, NodeId]
flags: Flags
@@ -143,8 +142,6 @@ def __init__(
from codegen.sdk.core.parser import Parser
self.progress = progress or StubProgress()
- self.__graph = PyDiGraph()
- self.__graph_ready = False
self.filepath_idx = {}
self._ext_module_idx = {}
self.generation = 0
diff --git a/codegen-on-oss/codegen_on_oss/analysis/example.py b/codegen-on-oss/codegen_on_oss/analysis/example.py
new file mode 100644
index 000000000..34dd1710a
--- /dev/null
+++ b/codegen-on-oss/codegen_on_oss/analysis/example.py
@@ -0,0 +1,103 @@
+"""
+Example script demonstrating the use of the unified analysis module.
+
+This script shows how to use the CodeAnalyzer and CodeMetrics classes
+to perform comprehensive code analysis on a repository.
+"""
+
+from codegen import Codebase
+from codegen_on_oss.analysis.analysis import CodeAnalyzer, cc_rank
+from codegen_on_oss.metrics import CodeMetrics
+
+
+def main():
+ """
+ Main function demonstrating the use of the analysis module.
+ """
+ print("Analyzing a sample repository...")
+
+ # Load a codebase
+ repo_name = "fastapi/fastapi"
+ codebase = Codebase.from_repo(repo_name)
+
+ print(f"Loaded codebase: {repo_name}")
+ print(f"Files: {len(codebase.files)}")
+ print(f"Functions: {len(codebase.functions)}")
+ print(f"Classes: {len(codebase.classes)}")
+
+ # Create analyzer instance
+ analyzer = CodeAnalyzer(codebase)
+
+ # Get codebase summary
+ print("\n=== Codebase Summary ===")
+ print(analyzer.get_codebase_summary())
+
+    # Analyze complexity
+    print("\n=== Complexity Analysis ===")
+    complexity_results = analyzer.analyze_complexity()
+    per_function = complexity_results["cyclomatic_complexity"]
+    average_complexity = (
+        sum(entry["complexity"] for entry in per_function.values()) / len(per_function)
+        if per_function
+        else 0
+    )
+    print(f"Average cyclomatic complexity: {average_complexity:.2f}")
+    print(f"Complexity rank: {cc_rank(average_complexity)}")
+
+    # Find complex functions
+    complex_functions = sorted(
+        (
+            {"name": name, **entry}
+            for name, entry in per_function.items()
+            if entry["complexity"] > 10
+        ),
+        key=lambda item: item["complexity"],
+        reverse=True,
+    )[:5]  # Show top 5
+
+ if complex_functions:
+ print("\nTop complex functions:")
+ for func in complex_functions:
+ print(f"- {func['name']}: Complexity {func['complexity']} (Rank {func['rank']})")
+
+ # Analyze imports
+ print("\n=== Import Analysis ===")
+ import_analysis = analyzer.analyze_imports()
+ print(f"Found {len(import_analysis['import_cycles'])} import cycles")
+
+ # Create metrics instance
+ metrics = CodeMetrics(codebase)
+
+ # Get code quality summary
+ print("\n=== Code Quality Summary ===")
+ quality_summary = metrics.get_code_quality_summary()
+
+ print("Overall metrics:")
+ for metric, value in quality_summary["overall_metrics"].items():
+ if isinstance(value, float):
+ print(f"- {metric}: {value:.2f}")
+ else:
+ print(f"- {metric}: {value}")
+
+ print("\nProblem areas:")
+ for area, count in quality_summary["problem_areas"].items():
+ print(f"- {area}: {count}")
+
+ # Find bug-prone functions
+ print("\n=== Bug-Prone Functions ===")
+ bug_prone = metrics.find_bug_prone_functions()[:5] # Show top 5
+
+ if bug_prone:
+ print("Top bug-prone functions:")
+ for func in bug_prone:
+ print(f"- {func['name']}: Estimated bugs {func['bugs_delivered']:.2f}")
+
+ # Analyze dependencies
+ print("\n=== Dependency Analysis ===")
+ dependencies = metrics.analyze_dependencies()
+
+ print(f"Dependency graph: {dependencies['dependency_graph']['nodes']} nodes, "
+ f"{dependencies['dependency_graph']['edges']} edges")
+ print(f"Dependency density: {dependencies['dependency_graph']['density']:.4f}")
+ print(f"Number of cycles: {dependencies['cycles']}")
+
+ if dependencies['most_central_files']:
+ print("\nMost central files:")
+ for file, score in dependencies['most_central_files'][:5]: # Show top 5
+ print(f"- {file}: Centrality {score:.4f}")
+
+ print("\nAnalysis complete!")
+
+
+if __name__ == "__main__":
+ main()
+
diff --git a/codegen-on-oss/codegen_on_oss/analysis/mdx_docs_generation.py b/codegen-on-oss/codegen_on_oss/analysis/mdx_docs_generation.py
index 648a3b68e..9e4543bea 100644
--- a/codegen-on-oss/codegen_on_oss/analysis/mdx_docs_generation.py
+++ b/codegen-on-oss/codegen_on_oss/analysis/mdx_docs_generation.py
@@ -110,10 +110,10 @@ def format_parameters_for_mdx(parameters: list[ParameterDoc]) -> str:
def format_return_for_mdx(return_type: list[str], return_description: str) -> str:
description = sanitize_html_for_mdx(return_description) if return_description else ""
- return_type = resolve_type_string(return_type[0])
+ return_type_str = resolve_type_string(return_type[0])
return f"""
-
+
"""
@@ -154,8 +154,8 @@ def get_mdx_route_for_class(cls_doc: ClassDoc) -> str:
def format_type_string(type_string: str) -> str:
- type_string = type_string.split("|")
- return " | ".join([type_str.strip() for type_str in type_string])
+ type_strings = type_string.split("|")
+ return " | ".join([type_str.strip() for type_str in type_strings])
def resolve_type_string(type_string: str) -> str:
diff --git a/codegen-on-oss/codegen_on_oss/metrics.py b/codegen-on-oss/codegen_on_oss/metrics.py
index d77b4e686..d81d5b20b 100644
--- a/codegen-on-oss/codegen_on_oss/metrics.py
+++ b/codegen-on-oss/codegen_on_oss/metrics.py
@@ -1,15 +1,36 @@
+"""
+Metrics module for Codegen-on-OSS
+
+This module provides tools for measuring and recording performance metrics
+and code quality metrics for codebases.
+"""
+
import json
import os
import time
from collections.abc import Generator
from contextlib import contextmanager
from importlib.metadata import version
-from typing import TYPE_CHECKING, Any
+from typing import TYPE_CHECKING, Any, Dict, List
import psutil
+import networkx as nx
+from codegen import Codebase
from codegen_on_oss.errors import ParseRunError
from codegen_on_oss.outputs.base import BaseOutput
+from codegen_on_oss.analysis.analysis import (
+ CodeAnalyzer,
+ calculate_cyclomatic_complexity,
+ calculate_halstead_volume,
+ calculate_maintainability_index,
+ count_lines,
+ get_operators_and_operands,
+ cc_rank,
+ get_maintainability_rank,
+ calculate_doi
+)
if TYPE_CHECKING:
# Logger only available in type checking context.
@@ -19,6 +40,478 @@
codegen_version = str(version("codegen"))
+class CodeMetrics:
+ """
+ A class to calculate and provide code quality metrics for a codebase.
+ Integrates with the analysis module for comprehensive code analysis.
+ """
+
+ # Constants for threshold values
+ COMPLEXITY_THRESHOLD = 10
+ MAINTAINABILITY_THRESHOLD = 65
+ INHERITANCE_DEPTH_THRESHOLD = 3
+ VOLUME_THRESHOLD = 1000
+ EFFORT_THRESHOLD = 50000
+ BUG_THRESHOLD = 0.5
+
+ def __init__(self, codebase: Codebase):
+ """
+ Initialize the CodeMetrics class with a codebase.
+
+ Args:
+ codebase: The Codebase object to analyze
+ """
+ self.codebase = codebase
+ self.analyzer = CodeAnalyzer(codebase)
+ self._complexity_metrics = None
+ self._line_metrics = None
+ self._maintainability_metrics = None
+ self._inheritance_metrics = None
+ self._halstead_metrics = None
+
+ def calculate_all_metrics(self) -> Dict[str, Any]:
+ """
+ Calculate all available metrics for the codebase.
+
+ Returns:
+ A dictionary containing all metrics categories
+ """
+ return {
+ "complexity": self.complexity_metrics,
+ "lines": self.line_metrics,
+ "maintainability": self.maintainability_metrics,
+ "inheritance": self.inheritance_metrics,
+ "halstead": self.halstead_metrics,
+ }
+
+ @property
+ def complexity_metrics(self) -> Dict[str, Any]:
+ """
+ Calculate cyclomatic complexity metrics for the codebase.
+
+ Returns:
+ A dictionary containing complexity metrics including average,
+ rank, and per-function complexity scores
+ """
+ if self._complexity_metrics is not None:
+ return self._complexity_metrics
+
+ callables = self.codebase.functions + [
+ m for c in self.codebase.classes for m in c.methods
+ ]
+
+ complexities = []
+ for func in callables:
+ if not hasattr(func, "code_block"):
+ continue
+
+ complexity = calculate_cyclomatic_complexity(func)
+ complexities.append({
+ "name": func.name,
+ "complexity": complexity,
+ "rank": cc_rank(complexity)
+ })
+
+ avg_complexity = (
+ sum(item["complexity"] for item in complexities) / len(complexities)
+ if complexities else 0
+ )
+
+ self._complexity_metrics = {
+ "average": avg_complexity,
+ "rank": cc_rank(avg_complexity),
+ "functions": complexities
+ }
+
+ return self._complexity_metrics
+
+ @property
+ def line_metrics(self) -> Dict[str, Any]:
+ """
+ Calculate line-based metrics for the codebase.
+
+ Returns:
+ A dictionary containing line metrics including total counts
+ and per-file metrics for LOC, LLOC, SLOC, and comments
+ """
+ if self._line_metrics is not None:
+ return self._line_metrics
+
+ total_loc = total_lloc = total_sloc = total_comments = 0
+ file_metrics = []
+
+ for file in self.codebase.files:
+ loc, lloc, sloc, comments = count_lines(file.source)
+ comment_density = (comments / loc * 100) if loc > 0 else 0
+
+ file_metrics.append({
+ "file": file.path,
+ "loc": loc,
+ "lloc": lloc,
+ "sloc": sloc,
+ "comments": comments,
+ "comment_density": comment_density
+ })
+
+ total_loc += loc
+ total_lloc += lloc
+ total_sloc += sloc
+ total_comments += comments
+
+ total_comment_density = (
+ total_comments / total_loc * 100 if total_loc > 0 else 0
+ )
+
+ self._line_metrics = {
+ "total": {
+ "loc": total_loc,
+ "lloc": total_lloc,
+ "sloc": total_sloc,
+ "comments": total_comments,
+ "comment_density": total_comment_density
+ },
+ "files": file_metrics
+ }
+
+ return self._line_metrics
+
+ @property
+ def maintainability_metrics(self) -> Dict[str, Any]:
+ """
+ Calculate maintainability index metrics for the codebase.
+
+ Returns:
+ A dictionary containing maintainability metrics including average,
+ rank, and per-function maintainability scores
+ """
+ if self._maintainability_metrics is not None:
+ return self._maintainability_metrics
+
+ callables = self.codebase.functions + [
+ m for c in self.codebase.classes for m in c.methods
+ ]
+
+ mi_scores = []
+ for func in callables:
+ if not hasattr(func, "code_block"):
+ continue
+
+ complexity = calculate_cyclomatic_complexity(func)
+ operators, operands = get_operators_and_operands(func)
+ volume, _, _, _, _ = calculate_halstead_volume(operators, operands)
+ loc = len(func.code_block.source.splitlines())
+ mi_score = calculate_maintainability_index(volume, complexity, loc)
+
+ mi_scores.append({
+ "name": func.name,
+ "mi_score": mi_score,
+ "rank": get_maintainability_rank(mi_score)
+ })
+
+ avg_mi = (
+ sum(item["mi_score"] for item in mi_scores) / len(mi_scores)
+ if mi_scores else 0
+ )
+
+ self._maintainability_metrics = {
+ "average": avg_mi,
+ "rank": get_maintainability_rank(avg_mi),
+ "functions": mi_scores
+ }
+
+ return self._maintainability_metrics
+
+ @property
+ def inheritance_metrics(self) -> Dict[str, Any]:
+ """
+ Calculate inheritance metrics for the codebase.
+
+ Returns:
+ A dictionary containing inheritance metrics including average
+ depth of inheritance and per-class inheritance depth
+ """
+ if self._inheritance_metrics is not None:
+ return self._inheritance_metrics
+
+ class_metrics = []
+ for cls in self.codebase.classes:
+ doi = calculate_doi(cls)
+ class_metrics.append({
+ "name": cls.name,
+ "doi": doi
+ })
+
+ avg_doi = (
+ sum(item["doi"] for item in class_metrics) / len(class_metrics)
+ if class_metrics else 0
+ )
+
+ self._inheritance_metrics = {
+ "average": avg_doi,
+ "classes": class_metrics
+ }
+
+ return self._inheritance_metrics
+
+ @property
+ def halstead_metrics(self) -> Dict[str, Any]:
+ """
+ Calculate Halstead complexity metrics for the codebase.
+
+ Returns:
+ A dictionary containing Halstead metrics including volume,
+ difficulty, effort, and other Halstead measures
+ """
+ if self._halstead_metrics is not None:
+ return self._halstead_metrics
+
+ callables = self.codebase.functions + [
+ m for c in self.codebase.classes for m in c.methods
+ ]
+
+ halstead_metrics = []
+ for func in callables:
+ if not hasattr(func, "code_block"):
+ continue
+
+            operators, operands = get_operators_and_operands(func)
+            # calculate_halstead_volume returns (volume, N1, N2, n1, n2):
+            # total operator/operand counts first, then distinct counts.
+            volume, total_operators, total_operands, unique_operators, unique_operands = (
+                calculate_halstead_volume(operators, operands)
+            )
+
+            # Standard Halstead measures:
+            #   difficulty D = (n1 / 2) * (N2 / n2)
+            #   effort     E = D * V
+            #   time       T = E / 18 seconds
+            #   bugs       B = V / 3000
+            difficulty = (
+                (unique_operators / 2) * (total_operands / unique_operands)
+                if unique_operands > 0
+                else 0
+            )
+            effort = difficulty * volume if volume > 0 else 0
+            time_required = effort / 18 if effort > 0 else 0  # Seconds
+            bugs_delivered = volume / 3000 if volume > 0 else 0
+
+ halstead_metrics.append({
+ "name": func.name,
+ "volume": volume,
+ "difficulty": difficulty,
+ "effort": effort,
+ "time_required": time_required, # in seconds
+ "bugs_delivered": bugs_delivered
+ })
+
+ avg_volume = (
+ sum(item["volume"] for item in halstead_metrics) / len(halstead_metrics)
+ if halstead_metrics else 0
+ )
+ avg_difficulty = (
+ sum(item["difficulty"] for item in halstead_metrics) / len(halstead_metrics)
+ if halstead_metrics else 0
+ )
+ avg_effort = (
+ sum(item["effort"] for item in halstead_metrics) / len(halstead_metrics)
+ if halstead_metrics else 0
+ )
+
+ self._halstead_metrics = {
+ "average": {
+ "volume": avg_volume,
+ "difficulty": avg_difficulty,
+ "effort": avg_effort
+ },
+ "functions": halstead_metrics
+ }
+
+ return self._halstead_metrics
+
+ def find_complex_functions(self, threshold: int = COMPLEXITY_THRESHOLD) -> List[Dict[str, Any]]:
+ """
+ Find functions with cyclomatic complexity above the threshold.
+
+ Args:
+ threshold: The complexity threshold (default: 10)
+
+ Returns:
+ A list of functions with complexity above the threshold
+ """
+ metrics = self.complexity_metrics
+ return [
+ func for func in metrics["functions"]
+ if func["complexity"] > threshold
+ ]
+
+ def find_low_maintainability_functions(
+ self, threshold: int = MAINTAINABILITY_THRESHOLD
+ ) -> List[Dict[str, Any]]:
+ """
+ Find functions with maintainability index below the threshold.
+
+ Args:
+ threshold: The maintainability threshold (default: 65)
+
+ Returns:
+ A list of functions with maintainability below the threshold
+ """
+ metrics = self.maintainability_metrics
+ return [
+ func for func in metrics["functions"]
+ if func["mi_score"] < threshold
+ ]
+
+ def find_deep_inheritance_classes(
+ self, threshold: int = INHERITANCE_DEPTH_THRESHOLD
+ ) -> List[Dict[str, Any]]:
+ """
+ Find classes with depth of inheritance above the threshold.
+
+ Args:
+ threshold: The inheritance depth threshold (default: 3)
+
+ Returns:
+ A list of classes with inheritance depth above the threshold
+ """
+ metrics = self.inheritance_metrics
+ return [cls for cls in metrics["classes"] if cls["doi"] > threshold]
+
+ def find_high_volume_functions(self, threshold: int = VOLUME_THRESHOLD) -> List[Dict[str, Any]]:
+ """
+ Find functions with Halstead volume above the threshold.
+
+ Args:
+ threshold: The volume threshold (default: 1000)
+
+ Returns:
+ A list of functions with volume above the threshold
+ """
+ metrics = self.halstead_metrics
+ return [
+ func for func in metrics["functions"]
+ if func["volume"] > threshold
+ ]
+
+ def find_high_effort_functions(self, threshold: int = EFFORT_THRESHOLD) -> List[Dict[str, Any]]:
+ """
+ Find functions with high Halstead effort (difficult to maintain).
+
+ Args:
+ threshold: The effort threshold (default: 50000)
+
+ Returns:
+ A list of functions with effort above the threshold
+ """
+ metrics = self.halstead_metrics
+ return [
+ func for func in metrics["functions"]
+ if func["effort"] > threshold
+ ]
+
+ def find_bug_prone_functions(self, threshold: float = BUG_THRESHOLD) -> List[Dict[str, Any]]:
+ """
+ Find functions with high estimated bug delivery.
+
+ Args:
+ threshold: The bugs delivered threshold (default: 0.5)
+
+ Returns:
+ A list of functions likely to contain bugs
+ """
+ metrics = self.halstead_metrics
+ return [
+ func for func in metrics["functions"]
+ if func["bugs_delivered"] > threshold
+ ]
+
+ def get_code_quality_summary(self) -> Dict[str, Any]:
+ """
+ Generate a comprehensive code quality summary.
+
+ Returns:
+ A dictionary with overall code quality metrics and problem areas
+ """
+ return {
+ "overall_metrics": {
+ "complexity": self.complexity_metrics["average"],
+ "complexity_rank": self.complexity_metrics["rank"],
+ "maintainability": self.maintainability_metrics["average"],
+ "maintainability_rank": self.maintainability_metrics["rank"],
+ "lines_of_code": self.line_metrics["total"]["loc"],
+ "comment_density": self.line_metrics["total"]["comment_density"],
+ "inheritance_depth": self.inheritance_metrics["average"],
+ "halstead_volume": self.halstead_metrics["average"]["volume"],
+ "halstead_difficulty": self.halstead_metrics["average"]["difficulty"],
+ },
+ "problem_areas": {
+ "complex_functions": len(self.find_complex_functions()),
+ "low_maintainability": len(self.find_low_maintainability_functions()),
+ "deep_inheritance": len(self.find_deep_inheritance_classes()),
+ "high_volume": len(self.find_high_volume_functions()),
+ "high_effort": len(self.find_high_effort_functions()),
+ "bug_prone": len(self.find_bug_prone_functions()),
+ },
+ "import_analysis": self.analyzer.analyze_imports()
+ }
+
+ def analyze_codebase_structure(self) -> Dict[str, Any]:
+ """
+ Analyze the structure of the codebase.
+
+ Returns:
+ A dictionary with codebase structure information
+ """
+ return {
+ "summary": self.analyzer.get_codebase_summary(),
+ "files": len(self.codebase.files),
+ "functions": len(self.codebase.functions),
+ "classes": len(self.codebase.classes),
+ "imports": len(self.codebase.imports),
+ "symbols": len(self.codebase.symbols)
+ }
+
+ def generate_documentation(self) -> None:
+ """
+ Generate documentation for the codebase.
+ """
+ self.analyzer.document_functions()
+
+ def analyze_dependencies(self) -> Dict[str, Any]:
+ """
+ Analyze dependencies in the codebase.
+
+ Returns:
+ A dictionary with dependency analysis results
+ """
+ # Create a dependency graph
+ G = nx.DiGraph()
+
+        # Add nodes for all files, keyed by filepath to match the edges below
+        for file in self.codebase.files:
+            G.add_node(file.filepath)
+
+        # Add edges for imports
+        for imp in self.codebase.imports:
+            if imp.from_file and imp.to_file:
+                G.add_edge(imp.from_file.filepath, imp.to_file.filepath)
+
+ # Find cycles
+ cycles = list(nx.simple_cycles(G))
+
+ # Calculate centrality metrics
+ centrality = nx.degree_centrality(G)
+
+ return {
+ "dependency_graph": {
+ "nodes": len(G.nodes),
+ "edges": len(G.edges),
+ "density": nx.density(G)
+ },
+ "cycles": len(cycles),
+ "most_central_files": sorted(
+ [(file, score) for file, score in centrality.items()],
+ key=lambda x: x[1],
+ reverse=True
+ )[:10]
+ }
+
+
class MetricsProfiler:
"""
A helper to record performance metrics across multiple profiles and write them to a CSV.
@@ -42,7 +535,7 @@ def __init__(self, output: BaseOutput):
@contextmanager
def start_profiler(
self, name: str, revision: str, language: str | None, logger: "Logger"
- ) -> Generator["MetricsProfile", None, None]:
+ ) -> Generator[Any, None, None]:
"""
Starts a new profiling session for a given profile name.
Returns a MetricsProfile instance that you can use to mark measurements.
@@ -81,9 +574,9 @@ def fields(cls) -> list[str]:
class MetricsProfile:
"""
Context-managed profile that records measurements at each call to `measure()`.
- It tracks the wall-clock duration, CPU time, and memory usage (with delta) at the time of the call.
- Upon exiting the context, it also writes all collected metrics, including the total time,
- to a CSV file.
+ It tracks the wall-clock duration, CPU time, and memory usage (with delta)
+ at the time of the call. Upon exiting the context, it also writes all collected
+ metrics, including the total time, to a CSV file.
"""
if TYPE_CHECKING:
@@ -131,7 +624,9 @@ def measure(self, action_name: str):
"""
current_time = time.perf_counter()
current_cpu = float(time.process_time())
- current_mem = int(psutil.Process(os.getpid()).memory_info().rss / (1024 * 1024))
+ current_mem = int(
+ psutil.Process(os.getpid()).memory_info().rss / (1024 * 1024)
+ )
# Calculate time deltas.
delta_time = current_time - self.last_measure_time
@@ -168,7 +663,9 @@ def finish(self, error: str | None = None):
"""
finish_time = time.perf_counter()
finish_cpu = float(time.process_time())
- finish_mem = int(psutil.Process(os.getpid()).memory_info().rss / (1024 * 1024))
+ finish_mem = int(
+ psutil.Process(os.getpid()).memory_info().rss / (1024 * 1024)
+ )
total_duration = finish_time - self.start_time
@@ -196,3 +693,4 @@ def write_output(self, measurement: dict[str, Any]):
"""
self.logger.info(json.dumps(measurement, indent=4))
self.output.write_output(measurement)
+