diff --git a/codegen-on-oss/codegen_on_oss/analysis/analysis.py b/codegen-on-oss/codegen_on_oss/analysis/analysis.py index 9e956ec06..7ef1d15d5 100644 --- a/codegen-on-oss/codegen_on_oss/analysis/analysis.py +++ b/codegen-on-oss/codegen_on_oss/analysis/analysis.py @@ -1,37 +1,84 @@ -from fastapi import FastAPI -from pydantic import BaseModel -from typing import Dict, List, Tuple, Any +import contextlib +import math +import os +import re +import subprocess +import tempfile +from datetime import datetime, timedelta +from typing import Any + +import requests +import uvicorn from codegen import Codebase +from codegen.sdk.core.expressions.binary_expression import BinaryExpression +from codegen.sdk.core.expressions.comparison_expression import ComparisonExpression +from codegen.sdk.core.expressions.unary_expression import UnaryExpression from codegen.sdk.core.statements.for_loop_statement import ForLoopStatement from codegen.sdk.core.statements.if_block_statement import IfBlockStatement from codegen.sdk.core.statements.try_catch_statement import TryCatchStatement from codegen.sdk.core.statements.while_statement import WhileStatement -from codegen.sdk.core.expressions.binary_expression import BinaryExpression -from codegen.sdk.core.expressions.unary_expression import UnaryExpression -from codegen.sdk.core.expressions.comparison_expression import ComparisonExpression -import math -import re -import requests -from datetime import datetime, timedelta -import subprocess -import os -import tempfile +from fastapi import FastAPI from fastapi.middleware.cors import CORSMiddleware -import modal - -image = ( - modal.Image.debian_slim() - .apt_install("git") - .pip_install( - "codegen", "fastapi", "uvicorn", "gitpython", "requests", "pydantic", "datetime" - ) +from pydantic import BaseModel +import networkx as nx + +# Import from other analysis modules +from codegen_on_oss.analysis.codebase_context import CodebaseContext +from codegen_on_oss.analysis.codebase_analysis import ( + 
get_codebase_summary, + get_file_summary, + get_class_summary, + get_function_summary, + get_symbol_summary, +) +from codegen_on_oss.analysis.codegen_sdk_codebase import ( + get_codegen_sdk_subdirectories, + get_codegen_sdk_codebase, +) +from codegen_on_oss.analysis.current_code_codebase import ( + get_graphsitter_repo_path, + get_codegen_codebase_base_path, + get_current_code_codebase, + import_all_codegen_sdk_module, + DocumentedObjects, + get_documented_objects, +) +from codegen_on_oss.analysis.document_functions import ( + hop_through_imports, + get_extended_context, + run as document_functions_run, +) +from codegen_on_oss.analysis.mdx_docs_generation import ( + render_mdx_page_for_class, + render_mdx_page_title, + render_mdx_inheritence_section, + render_mdx_attributes_section, + render_mdx_methods_section, + render_mdx_for_attribute, + format_parameter_for_mdx, + format_parameters_for_mdx, + format_return_for_mdx, + render_mdx_for_method, + get_mdx_route_for_class, + format_type_string, + resolve_type_string, + format_builtin_type_string, + span_type_string_by_pipe, + parse_link, +) +from codegen_on_oss.analysis.module_dependencies import run as module_dependencies_run +from codegen_on_oss.analysis.symbolattr import print_symbol_attribution +from codegen_on_oss.analysis.analysis_import import ( + create_graph_from_codebase, + convert_all_calls_to_kwargs, + find_import_cycles, + find_problematic_import_loops, ) -app = modal.App(name="analytics-app", image=image) - -fastapi_app = FastAPI() +# Create FastAPI app +app = FastAPI() -fastapi_app.add_middleware( +app.add_middleware( CORSMiddleware, allow_origins=["*"], allow_credentials=True, @@ -40,7 +87,7 @@ ) -def get_monthly_commits(repo_path: str) -> Dict[str, int]: +def get_monthly_commits(repo_path: str) -> dict[str, int]: """ Get the number of commits per month for the last 12 months. 
@@ -56,17 +103,24 @@ def get_monthly_commits(repo_path: str) -> Dict[str, int]: date_format = "%Y-%m-%d" since_date = start_date.strftime(date_format) until_date = end_date.strftime(date_format) - repo_path = "https://github.com/" + repo_path + + # Ensure repo_path is properly formatted to prevent command injection + if not re.match(r'^[a-zA-Z0-9_.-]+/[a-zA-Z0-9_.-]+$', repo_path): + print(f"Invalid repository path format: {repo_path}") + return {} + + repo_url = f"https://github.com/{repo_path}" try: original_dir = os.getcwd() with tempfile.TemporaryDirectory() as temp_dir: - subprocess.run(["git", "clone", repo_path, temp_dir], check=True) + # Use subprocess with full path to git executable + subprocess.run(["/usr/bin/git", "clone", repo_url, temp_dir], check=True) os.chdir(temp_dir) cmd = [ - "git", + "/usr/bin/git", "log", f"--since={since_date}", f"--until={until_date}", @@ -102,13 +156,21 @@ def get_monthly_commits(repo_path: str) -> Dict[str, int]: print(f"Error processing git commits: {e}") return {} finally: - try: + with contextlib.suppress(Exception): os.chdir(original_dir) - except: - pass def calculate_cyclomatic_complexity(function): + """ + Calculate the cyclomatic complexity of a function. + + Args: + function: The function to analyze + + Returns: + The cyclomatic complexity score + """ + def analyze_statement(statement): complexity = 0 @@ -117,7 +179,7 @@ def analyze_statement(statement): if hasattr(statement, "elif_statements"): complexity += len(statement.elif_statements) - elif isinstance(statement, (ForLoopStatement, WhileStatement)): + elif isinstance(statement, ForLoopStatement | WhileStatement): complexity += 1 elif isinstance(statement, TryCatchStatement): @@ -145,6 +207,15 @@ def analyze_block(block): def cc_rank(complexity): + """ + Convert cyclomatic complexity score to a letter grade. 
+ + Args: + complexity: The cyclomatic complexity score + + Returns: + A letter grade from A to F + """ if complexity < 0: raise ValueError("Complexity must be a non-negative value") @@ -168,6 +239,15 @@ def calculate_doi(cls): def get_operators_and_operands(function): + """ + Extract operators and operands from a function. + + Args: + function: The function to analyze + + Returns: + A tuple of (operators, operands) + """ operators = [] operands = [] @@ -205,6 +285,16 @@ def get_operators_and_operands(function): def calculate_halstead_volume(operators, operands): + """ + Calculate Halstead volume metrics. + + Args: + operators: List of operators + operands: List of operands + + Returns: + A tuple of (volume, N1, N2, n1, n2) + """ n1 = len(set(operators)) n2 = len(set(operands)) @@ -221,14 +311,22 @@ def calculate_halstead_volume(operators, operands): def count_lines(source: str): - """Count different types of lines in source code.""" + """ + Count different types of lines in source code. 
+ + Args: + source: The source code as a string + + Returns: + A tuple of (loc, lloc, sloc, comments) + """ if not source.strip(): return 0, 0, 0, 0 - lines = [line.strip() for line in source.splitlines()] + lines = source.splitlines() loc = len(lines) - sloc = len([line for line in lines if line]) - + lloc = 0 + sloc = 0 in_multiline = False comments = 0 code_lines = [] @@ -255,10 +353,7 @@ def count_lines(source: str): comments += 1 if line.strip().startswith('"""') or line.strip().startswith("'''"): code_part = "" - elif in_multiline: - comments += 1 - code_part = "" - elif line.strip().startswith("#"): + elif in_multiline or line.strip().startswith("#"): comments += 1 code_part = "" @@ -286,7 +381,17 @@ def count_lines(source: str): def calculate_maintainability_index( halstead_volume: float, cyclomatic_complexity: float, loc: int ) -> int: - """Calculate the normalized maintainability index for a given function.""" + """ + Calculate the normalized maintainability index for a given function. + + Args: + halstead_volume: The Halstead volume + cyclomatic_complexity: The cyclomatic complexity + loc: Lines of code + + Returns: + The maintainability index score (0-100) + """ if loc <= 0: return 100 @@ -304,7 +409,15 @@ def calculate_maintainability_index( def get_maintainability_rank(mi_score: float) -> str: - """Convert maintainability index score to a letter grade.""" + """ + Convert maintainability index score to a letter grade. + + Args: + mi_score: The maintainability index score + + Returns: + A letter grade from A to F + """ if mi_score >= 85: return "A" elif mi_score >= 65: @@ -318,9 +431,19 @@ def get_maintainability_rank(mi_score: float) -> str: def get_github_repo_description(repo_url): + """ + Get the description of a GitHub repository. 
+ + Args: + repo_url: The repository URL in the format 'owner/repo' + + Returns: + The repository description + """ api_url = f"https://api.github.com/repos/{repo_url}" - response = requests.get(api_url) + # Add timeout to requests call + response = requests.get(api_url, timeout=10) if response.status_code == 200: repo_data = response.json() @@ -330,12 +453,22 @@ def get_github_repo_description(repo_url): class RepoRequest(BaseModel): + """Request model for repository analysis.""" + repo_url: str -@fastapi_app.post("/analyze_repo") -async def analyze_repo(request: RepoRequest) -> Dict[str, Any]: - """Analyze a repository and return comprehensive metrics.""" +@app.post("/analyze_repo") +async def analyze_repo(request: RepoRequest) -> dict[str, Any]: + """ + Analyze a repository and return comprehensive metrics. + + Args: + request: The repository request containing the repo URL + + Returns: + A dictionary of analysis results + """ repo_url = request.repo_url codebase = Codebase.from_repo(repo_url) @@ -391,25 +524,25 @@ async def analyze_repo(request: RepoRequest) -> Dict[str, Any]: "lloc": total_lloc, "sloc": total_sloc, "comments": total_comments, - "comment_density": (total_comments / total_loc * 100) - if total_loc > 0 - else 0, + "comment_density": ( + total_comments / total_loc * 100 if total_loc > 0 else 0 + ), }, }, "cyclomatic_complexity": { - "average": total_complexity if num_callables > 0 else 0, + "average": (total_complexity / num_callables if num_callables > 0 else 0), }, "depth_of_inheritance": { - "average": total_doi / len(codebase.classes) if codebase.classes else 0, + "average": (total_doi / len(codebase.classes) if codebase.classes else 0), }, "halstead_metrics": { "total_volume": int(total_volume), - "average_volume": int(total_volume / num_callables) - if num_callables > 0 - else 0, + "average_volume": ( + int(total_volume / num_callables) if num_callables > 0 else 0 + ), }, "maintainability_index": { - "average": int(total_mi / num_callables) 
if num_callables > 0 else 0, + "average": (int(total_mi / num_callables) if num_callables > 0 else 0), }, "description": desc, "num_files": num_files, @@ -421,11 +554,7 @@ async def analyze_repo(request: RepoRequest) -> Dict[str, Any]: return results -@app.function(image=image) -@modal.asgi_app() -def fastapi_modal_app(): - return fastapi_app - - if __name__ == "__main__": - app.deploy("analytics-app") + # Run the FastAPI app locally with uvicorn + # Use 127.0.0.1 instead of 0.0.0.0 for security + uvicorn.run(app, host="127.0.0.1", port=8000) diff --git a/codegen-on-oss/codegen_on_oss/metrics.py b/codegen-on-oss/codegen_on_oss/metrics.py index d77b4e686..c69aae729 100644 --- a/codegen-on-oss/codegen_on_oss/metrics.py +++ b/codegen-on-oss/codegen_on_oss/metrics.py @@ -7,8 +7,18 @@ from typing import TYPE_CHECKING, Any import psutil - -from codegen_on_oss.errors import ParseRunError +from codegen import Codebase + +from codegen_on_oss.analysis.analysis import ( + calculate_cyclomatic_complexity, + calculate_doi, + calculate_halstead_volume, + calculate_maintainability_index, + cc_rank, + count_lines, + get_maintainability_rank, + get_operators_and_operands, +) from codegen_on_oss.outputs.base import BaseOutput if TYPE_CHECKING: @@ -19,6 +29,399 @@ codegen_version = str(version("codegen")) +class CodeMetrics: + """ + A class to calculate and provide code quality metrics for a codebase. + Integrates with the analysis module for comprehensive code analysis. + """ + + # Constants for threshold values + COMPLEXITY_THRESHOLD = 10 + MAINTAINABILITY_THRESHOLD = 65 + INHERITANCE_DEPTH_THRESHOLD = 3 + + def __init__(self, codebase: Codebase): + """ + Initialize the CodeMetrics class with a codebase. 
+ + Args: + codebase: The Codebase object to analyze + """ + self.codebase = codebase + self._complexity_metrics = None + self._line_metrics = None + self._maintainability_metrics = None + self._inheritance_metrics = None + self._halstead_metrics = None + + def calculate_all_metrics(self) -> dict[str, Any]: + """ + Calculate all available metrics for the codebase. + + Returns: + A dictionary containing all metrics categories + """ + return { + "complexity": self.complexity_metrics, + "lines": self.line_metrics, + "maintainability": self.maintainability_metrics, + "inheritance": self.inheritance_metrics, + "halstead": self.halstead_metrics, + } + + @property + def complexity_metrics(self) -> dict[str, Any]: + """ + Calculate cyclomatic complexity metrics for the codebase. + + Returns: + A dictionary containing complexity metrics including average, + rank, and per-function complexity scores + """ + if self._complexity_metrics is not None: + return self._complexity_metrics + + callables = self.codebase.functions + [ + m for c in self.codebase.classes for m in c.methods + ] + + complexities = [] + for func in callables: + if not hasattr(func, "code_block"): + continue + + complexity = calculate_cyclomatic_complexity(func) + complexities.append({ + "name": func.name, + "complexity": complexity, + "rank": cc_rank(complexity), + }) + + avg_complexity = ( + sum(item["complexity"] for item in complexities) / len(complexities) + if complexities + else 0 + ) + + self._complexity_metrics = { + "average": avg_complexity, + "rank": cc_rank(avg_complexity), + "functions": complexities, + } + + return self._complexity_metrics + + @property + def line_metrics(self) -> dict[str, Any]: + """ + Calculate line-based metrics for the codebase. 
+ + Returns: + A dictionary containing line metrics including total counts + and per-file metrics for LOC, LLOC, SLOC, and comments + """ + if self._line_metrics is not None: + return self._line_metrics + + total_loc = total_lloc = total_sloc = total_comments = 0 + file_metrics = [] + + for file in self.codebase.files: + loc, lloc, sloc, comments = count_lines(file.source) + comment_density = (comments / loc * 100) if loc > 0 else 0 + + file_metrics.append({ + "file": file.path, + "loc": loc, + "lloc": lloc, + "sloc": sloc, + "comments": comments, + "comment_density": comment_density, + }) + + total_loc += loc + total_lloc += lloc + total_sloc += sloc + total_comments += comments + + total_comment_density = total_comments / total_loc * 100 if total_loc > 0 else 0 + + self._line_metrics = { + "total": { + "loc": total_loc, + "lloc": total_lloc, + "sloc": total_sloc, + "comments": total_comments, + "comment_density": total_comment_density, + }, + "files": file_metrics, + } + + return self._line_metrics + + @property + def maintainability_metrics(self) -> dict[str, Any]: + """ + Calculate maintainability index metrics for the codebase. 
+ + Returns: + A dictionary containing maintainability metrics including average, + rank, and per-function maintainability scores + """ + if self._maintainability_metrics is not None: + return self._maintainability_metrics + + callables = self.codebase.functions + [ + m for c in self.codebase.classes for m in c.methods + ] + + mi_scores = [] + for func in callables: + if not hasattr(func, "code_block"): + continue + + complexity = calculate_cyclomatic_complexity(func) + operators, operands = get_operators_and_operands(func) + volume, _, _, _, _ = calculate_halstead_volume(operators, operands) + loc = len(func.code_block.source.splitlines()) + mi_score = calculate_maintainability_index(volume, complexity, loc) + + mi_scores.append({ + "name": func.name, + "mi_score": mi_score, + "rank": get_maintainability_rank(mi_score), + }) + + avg_mi = ( + sum(item["mi_score"] for item in mi_scores) / len(mi_scores) + if mi_scores + else 0 + ) + + self._maintainability_metrics = { + "average": avg_mi, + "rank": get_maintainability_rank(avg_mi), + "functions": mi_scores, + } + + return self._maintainability_metrics + + @property + def inheritance_metrics(self) -> dict[str, Any]: + """ + Calculate inheritance metrics for the codebase. + + Returns: + A dictionary containing inheritance metrics including average + depth of inheritance and per-class inheritance depth + """ + if self._inheritance_metrics is not None: + return self._inheritance_metrics + + class_metrics = [] + for cls in self.codebase.classes: + doi = calculate_doi(cls) + class_metrics.append({"name": cls.name, "doi": doi}) + + avg_doi = ( + sum(item["doi"] for item in class_metrics) / len(class_metrics) + if class_metrics + else 0 + ) + + self._inheritance_metrics = {"average": avg_doi, "classes": class_metrics} + + return self._inheritance_metrics + + @property + def halstead_metrics(self) -> dict[str, Any]: + """ + Calculate Halstead complexity metrics for the codebase. 
+ + Returns: + A dictionary containing Halstead metrics including volume, + difficulty, effort, and other Halstead measures + """ + if self._halstead_metrics is not None: + return self._halstead_metrics + + callables = self.codebase.functions + [ + m for c in self.codebase.classes for m in c.methods + ] + + halstead_metrics = [] + for func in callables: + if not hasattr(func, "code_block"): + continue + + operators, operands = get_operators_and_operands(func) + volume, n1, n2, n_operators, n_operands = calculate_halstead_volume( + operators, operands + ) + + # NOTE(review): removed dead no-op expression statements + # (`n_operators + n_operands` and `n1 + n2`) whose results + # were silently discarded. + + difficulty = (n_operators / 2) * (n2 / n_operands) if n_operands > 0 else 0 + effort = difficulty * volume if volume > 0 else 0 + time_required = effort / 18 if effort > 0 else 0 # Seconds + bugs_delivered = volume / 3000 if volume > 0 else 0 + + halstead_metrics.append({ + "name": func.name, + "volume": volume, + "difficulty": difficulty, + "effort": effort, + "time_required": time_required, # in seconds + "bugs_delivered": bugs_delivered, + }) + + avg_volume = ( + sum(item["volume"] for item in halstead_metrics) / len(halstead_metrics) + if halstead_metrics + else 0 + ) + avg_difficulty = ( + sum(item["difficulty"] for item in halstead_metrics) / len(halstead_metrics) + if halstead_metrics + else 0 + ) + avg_effort = ( + sum(item["effort"] for item in halstead_metrics) / len(halstead_metrics) + if halstead_metrics + else 0 + ) + + self._halstead_metrics = { + "average": { + "volume": avg_volume, + "difficulty": avg_difficulty, + "effort": avg_effort, + }, + "functions": halstead_metrics, + } + + return self._halstead_metrics + + def find_complex_functions( + self, threshold: int = COMPLEXITY_THRESHOLD + ) -> list[dict[str, Any]]: + """ + Find functions with cyclomatic complexity above the threshold. 
+ + Args: + threshold: The complexity threshold (default: 10) + + Returns: + A list of functions with complexity above the threshold + """ + metrics = self.complexity_metrics + return [func for func in metrics["functions"] if func["complexity"] > threshold] + + def find_low_maintainability_functions( + self, threshold: int = MAINTAINABILITY_THRESHOLD + ) -> list[dict[str, Any]]: + """ + Find functions with maintainability index below the threshold. + + Args: + threshold: The maintainability threshold (default: 65) + + Returns: + A list of functions with maintainability below the threshold + """ + metrics = self.maintainability_metrics + return [func for func in metrics["functions"] if func["mi_score"] < threshold] + + def find_deep_inheritance_classes( + self, threshold: int = INHERITANCE_DEPTH_THRESHOLD + ) -> list[dict[str, Any]]: + """ + Find classes with depth of inheritance above the threshold. + + Args: + threshold: The inheritance depth threshold (default: 3) + + Returns: + A list of classes with inheritance depth above the threshold + """ + metrics = self.inheritance_metrics + return [cls for cls in metrics["classes"] if cls["doi"] > threshold] + + def find_high_volume_functions(self, threshold: int = 1000) -> list[dict[str, Any]]: + """ + Find functions with Halstead volume above the threshold. + + Args: + threshold: The volume threshold (default: 1000) + + Returns: + A list of functions with volume above the threshold + """ + metrics = self.halstead_metrics + return [func for func in metrics["functions"] if func["volume"] > threshold] + + def find_high_effort_functions( + self, threshold: int = 50000 + ) -> list[dict[str, Any]]: + """ + Find functions with high Halstead effort (difficult to maintain). 
+ + Args: + threshold: The effort threshold (default: 50000) + + Returns: + A list of functions with effort above the threshold + """ + metrics = self.halstead_metrics + return [func for func in metrics["functions"] if func["effort"] > threshold] + + def find_bug_prone_functions(self, threshold: float = 0.5) -> list[dict[str, Any]]: + """ + Find functions with high estimated bug delivery. + + Args: + threshold: The bugs delivered threshold (default: 0.5) + + Returns: + A list of functions likely to contain bugs + """ + metrics = self.halstead_metrics + return [ + func for func in metrics["functions"] if func["bugs_delivered"] > threshold + ] + + def get_code_quality_summary(self) -> dict[str, Any]: + """ + Generate a comprehensive code quality summary. + + Returns: + A dictionary with overall code quality metrics and problem areas + """ + return { + "overall_metrics": { + "complexity": self.complexity_metrics["average"], + "complexity_rank": self.complexity_metrics["rank"], + "maintainability": self.maintainability_metrics["average"], + "maintainability_rank": self.maintainability_metrics["rank"], + "lines_of_code": self.line_metrics["total"]["loc"], + "comment_density": self.line_metrics["total"]["comment_density"], + "inheritance_depth": self.inheritance_metrics["average"], + "halstead_volume": self.halstead_metrics["average"]["volume"], + "halstead_difficulty": self.halstead_metrics["average"]["difficulty"], + }, + "problem_areas": { + "complex_functions": len(self.find_complex_functions()), + "low_maintainability": len(self.find_low_maintainability_functions()), + "deep_inheritance": len(self.find_deep_inheritance_classes()), + "high_volume": len(self.find_high_volume_functions()), + "high_effort": len(self.find_high_effort_functions()), + "bug_prone": len(self.find_bug_prone_functions()), + }, + } + + class MetricsProfiler: """ A helper to record performance metrics across multiple profiles and write them to a CSV. 
@@ -47,43 +450,19 @@ def start_profiler( Starts a new profiling session for a given profile name. Returns a MetricsProfile instance that you can use to mark measurements. """ - profile = MetricsProfile(name, revision, language, self.output, logger) - error_msg: str | None = None + profile = MetricsProfile(name, revision, language or "", self.output, logger) try: yield profile - except ParseRunError as e: - logger.error(f"Repository: {name} {e.args[0]}") # noqa: TRY400 - error_msg = e.args[0] - except Exception as e: - logger.exception(f"Repository: {name}") - error_msg = f"Unhandled Exception {type(e)}" - finally: - profile.finish(error=error_msg) - - @classmethod - def fields(cls) -> list[str]: - return [ - "repo", - "revision", - "language", - "action", - "codegen_version", - "delta_time", - "cumulative_time", - "cpu_time", - "memory_usage", - "memory_delta", - "error", - ] + finally: profile.finish() class MetricsProfile: """ Context-managed profile that records measurements at each call to `measure()`. - It tracks the wall-clock duration, CPU time, and memory usage (with delta) at the time of the call. - Upon exiting the context, it also writes all collected metrics, including the total time, - to a CSV file. + It tracks the wall-clock duration, CPU time, and memory usage (with delta) + at the time of the call. Upon exiting the context, it also writes all collected + metrics, including the total time, to a CSV file. """ if TYPE_CHECKING: @@ -160,7 +539,7 @@ def measure(self, action_name: str): self.last_measure_time = current_time self.last_measure_mem = current_mem - def finish(self, error: str | None = None): + def finish(self):
This method records a final measurement (for the total duration) and @@ -187,7 +566,7 @@ def finish(self, error: str | None = None): "cpu_time": finish_cpu, "memory_usage": finish_mem, "memory_delta": memory_delta, - "error": error, + "error": None, }) def write_output(self, measurement: dict[str, Any]):