diff --git a/benchmark.py b/benchmark.py
new file mode 100644
index 0000000..47460af
--- /dev/null
+++ b/benchmark.py
@@ -0,0 +1,126 @@
+#!/usr/bin/env python3
+"""
+Benchmarking script for waka-readme-stats
+
+This script runs performance benchmarks on various parts of the codebase
+to identify bottlenecks and measure improvements.
+
+Usage:
+    python benchmark.py --username USERNAME [--full] [--no-cache]
+
+Options:
+    --username    GitHub username to use for benchmarking
+    --full        Run full benchmark suite (including API calls)
+    --no-cache    Disable caching for benchmarking
+"""
+
+import argparse
+import os
+import sys
+import time
+from pathlib import Path
+
+# Add the repository root (this script's directory) to the path so we can import from sources
+parent_dir = Path(__file__).resolve().parent
+sys.path.append(str(parent_dir))
+
+from sources.benchmarking import BenchmarkTracker, benchmark
+from sources.manager_cache import CacheManager
+
+# Import conditionally to avoid errors if running without full dependencies
+try:
+    from sources.main import main as waka_main
+except ImportError:
+    print("Failed to import main module. Make sure all dependencies are installed.")
+    sys.exit(1)
+
+
+@benchmark(name="Full Execution", metadata={"type": "full_run"})
+def run_full_benchmark(username, use_cache=True):
+    """Run a full benchmark of the waka-readme-stats process.
+
+    Args:
+        username: GitHub username to use for benchmarking
+        use_cache: Whether to use caching during benchmarking
+    """
+    # Set up environment variables for the test
+    os.environ["INPUT_GH_TOKEN"] = os.environ.get("GH_TOKEN", "")
+    os.environ["INPUT_WAKATIME_API_KEY"] = os.environ.get("WAKATIME_API_KEY", "")
+    os.environ["INPUT_SHOW_TIMEZONE"] = "True"
+    os.environ["INPUT_SHOW_LANGUAGE"] = "True"
+    os.environ["INPUT_SHOW_EDITORS"] = "True"
+    os.environ["INPUT_SHOW_PROJECTS"] = "True"
+    os.environ["INPUT_SHOW_OS"] = "True"
+    os.environ["INPUT_SHOW_COMMIT"] = "True"
+    os.environ["INPUT_SHOW_LANGUAGE_PER_REPO"] = "True"
+    os.environ["GITHUB_REPOSITORY"] = f"{username}/{username}"
+
+    # Control caching behavior
+    if not use_cache:
+        # Clear cache before running
+        cache_manager = CacheManager(username)
+        cache_manager.clear_cache()
+
+    # Run the main function
+    try:
+        waka_main()
+    except Exception as e:
+        print(f"Error running benchmark: {e}")
+
+
+def print_system_info():
+    """Print system information for context."""
+    import platform
+    import multiprocessing
+
+    print("System Information:")
+    print(f" - Python version: {platform.python_version()}")
+    print(f" - OS: {platform.system()} {platform.release()}")
+    print(f" - CPU cores: {multiprocessing.cpu_count()}")
+    print()
+
+
+def main():
+    """Main benchmark function."""
+    parser = argparse.ArgumentParser(description="Benchmark waka-readme-stats")
+    parser.add_argument(
+        "--username",
+        required=True,
+        help="GitHub username to use for benchmarking"
+    )
+    parser.add_argument(
+        "--full",
+        action="store_true",
+        help="Run full benchmark suite (including API calls)"
+    )
+    parser.add_argument(
+        "--no-cache",
+        action="store_true",
+        help="Disable caching for benchmarking"
+    )
+
+    args = parser.parse_args()
+
+    print("Starting benchmarks for waka-readme-stats...\n")
+    print_system_info()
+
+    # Run with cache
+    if not args.no_cache:
+        print("Running benchmark with caching enabled...")
+        start_time = time.time()
+        run_full_benchmark(args.username, use_cache=True)
+        print(f"Completed in {time.time() - start_time:.2f}s with caching enabled\n")
+
+    # Run without cache if requested
+    if args.no_cache:
+        print("Running benchmark with caching disabled...")
+        start_time = time.time()
+        run_full_benchmark(args.username, use_cache=False)
+        print(f"Completed in {time.time() - start_time:.2f}s with caching disabled\n")
+
+    # Print detailed benchmark results
+    print(BenchmarkTracker.get_summary())
+
+
+if __name__ == "__main__":
+    main()
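Illustrative note (not part of the diff): `run_full_benchmark` can also be driven without the CLI wrapper, since the `@benchmark` decorator records it on the shared tracker. A minimal sketch, assuming `GH_TOKEN` and `WAKATIME_API_KEY` are exported and "octocat" stands in for a real username:

    # Hypothetical programmatic use; placeholders only, not part of this PR.
    import os

    from benchmark import run_full_benchmark
    from sources.benchmarking import BenchmarkTracker

    os.environ.setdefault("GH_TOKEN", "<personal access token>")      # placeholder
    os.environ.setdefault("WAKATIME_API_KEY", "<wakatime api key>")   # placeholder

    run_full_benchmark("octocat", use_cache=True)
    print(BenchmarkTracker.get_summary())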
diff --git a/sources/benchmarking.py b/sources/benchmarking.py
new file mode 100644
index 0000000..8760099
--- /dev/null
+++ b/sources/benchmarking.py
@@ -0,0 +1,151 @@
+import time
+from functools import wraps
+from typing import Dict, Any, Callable, List, Optional, Tuple
+
+
+class BenchmarkResult:
+    """Contains the result of a performance benchmark."""
+
+    def __init__(self, name: str, execution_time: float, metadata: Optional[Dict[str, Any]] = None):
+        """Initialize the benchmark result.
+
+        Args:
+            name: Name of the benchmarked function or operation
+            execution_time: Time taken to execute in seconds
+            metadata: Additional metadata about the benchmark
+        """
+        self.name = name
+        self.execution_time = execution_time
+        self.metadata = metadata or {}
+
+    def __str__(self) -> str:
+        """String representation of the benchmark result."""
+        return f"{self.name}: {self.execution_time:.4f}s"
+
+
+class BenchmarkTracker:
+    """Tracks and manages benchmarks for performance analysis."""
+
+    _results: List[BenchmarkResult] = []
+
+    @classmethod
+    def add_result(cls, result: BenchmarkResult) -> None:
+        """Add a benchmark result to the tracker.
+
+        Args:
+            result: The benchmark result to add
+        """
+        cls._results.append(result)
+
+    @classmethod
+    def get_results(cls) -> List[BenchmarkResult]:
+        """Get all benchmark results.
+
+        Returns:
+            List of benchmark results
+        """
+        return cls._results
+
+    @classmethod
+    def clear_results(cls) -> None:
+        """Clear all benchmark results."""
+        cls._results.clear()
+
+    @classmethod
+    def get_total_execution_time(cls) -> float:
+        """Get the total execution time of all benchmarks.
+
+        Returns:
+            Total execution time in seconds
+        """
+        return sum(result.execution_time for result in cls._results)
+
+    @classmethod
+    def get_summary(cls) -> str:
+        """Get a formatted summary of all benchmark results.
+
+        Returns:
+            Formatted summary string
+        """
+        if not cls._results:
+            return "No benchmarks recorded."
+
+        summary = "Performance Benchmark Summary:\n"
+        summary += "=================================\n"
+
+        for result in cls._results:
+            summary += f"{result}\n"
+
+            # Add metadata if present
+            if result.metadata:
+                for key, value in result.metadata.items():
+                    summary += f"  - {key}: {value}\n"
+
+        summary += "=================================\n"
+        summary += f"Total execution time: {cls.get_total_execution_time():.4f}s\n"
+
+        return summary
+
+
+def benchmark(name: Optional[str] = None, metadata: Optional[Dict[str, Any]] = None) -> Callable:
+    """Decorator to benchmark a function's execution time.
+
+    Args:
+        name: Optional name for the benchmark
+        metadata: Optional metadata about the benchmark
+
+    Returns:
+        Decorated function
+    """
+    def decorator(func: Callable) -> Callable:
+        @wraps(func)
+        def wrapper(*args: Any, **kwargs: Any) -> Any:
+            benchmark_name = name if name else func.__name__
+            start_time = time.time()
+            result = func(*args, **kwargs)
+            end_time = time.time()
+
+            execution_time = end_time - start_time
+
+            # Add dynamic metadata if provided
+            final_metadata = metadata.copy() if metadata else {}
+            if 'args_count' not in final_metadata:
+                final_metadata['args_count'] = len(args)
+
+            benchmark_result = BenchmarkResult(
+                name=benchmark_name,
+                execution_time=execution_time,
+                metadata=final_metadata
+            )
+
+            BenchmarkTracker.add_result(benchmark_result)
+            return result
+        return wrapper
+    return decorator
+
+
+def benchmark_block(name: str, metadata: Optional[Dict[str, Any]] = None) -> Tuple[Callable, Callable]:
+    """Create start and end callbacks for benchmarking a block of code.
+
+    Args:
+        name: Name for the benchmark
+        metadata: Optional metadata about the benchmark
+
+    Returns:
+        A (start, end) pair of functions; call start() before the block and end() after it
+    """
+    start_time = [0.0]  # Use a list to allow modification in nested scope
+
+    def start() -> None:
+        start_time[0] = time.time()
+
+    def end() -> None:
+        execution_time = time.time() - start_time[0]
+        benchmark_result = BenchmarkResult(
+            name=name,
+            execution_time=execution_time,
+            metadata=metadata
+        )
+        BenchmarkTracker.add_result(benchmark_result)
+
+    return start, end
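Illustrative note (not part of the diff): a minimal sketch of how the two entry points above compose, assuming the repository root is on the import path; `fetch_data` and `process` are stand-ins rather than functions from this PR:

    import time

    from sources.benchmarking import BenchmarkTracker, benchmark, benchmark_block


    @benchmark(name="Fetch step", metadata={"stage": "fetch"})
    def fetch_data(delay: float) -> str:
        """Stand-in for an expensive operation; the return value passes through unchanged."""
        time.sleep(delay)
        return "payload"


    def process() -> None:
        # Bracket an arbitrary block of code with the start/end pair.
        start, end = benchmark_block("Render step", {"stage": "render"})
        start()
        time.sleep(0.05)  # stand-in work
        end()


    if __name__ == "__main__":
        fetch_data(0.1)
        process()
        # Both measurements are collected on the class-level tracker.
        print(BenchmarkTracker.get_summary())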
diff --git a/sources/benchmarking_test.py b/sources/benchmarking_test.py
new file mode 100644
index 0000000..1790191
--- /dev/null
+++ b/sources/benchmarking_test.py
@@ -0,0 +1,123 @@
+import time
+from unittest.mock import patch
+
+import pytest
+
+from sources.benchmarking import benchmark, benchmark_block, BenchmarkTracker, BenchmarkResult
+
+
+@pytest.fixture
+def clear_benchmark_results():
+    """Fixture to clear benchmark results before and after each test."""
+    BenchmarkTracker.clear_results()
+    yield
+    BenchmarkTracker.clear_results()
+
+
+def test_benchmark_decorator(clear_benchmark_results):
+    """Test the benchmark decorator functionality."""
+    # Define a function to benchmark
+    @benchmark()
+    def example_function(sleep_time):
+        time.sleep(sleep_time)
+        return "result"
+
+    # Run the function
+    result = example_function(0.01)
+
+    # Check the function still returns correctly
+    assert result == "result"
+
+    # Check that the benchmark was recorded
+    benchmark_results = BenchmarkTracker.get_results()
+    assert len(benchmark_results) == 1
+    assert benchmark_results[0].name == "example_function"
+    assert benchmark_results[0].execution_time >= 0.01
+    assert benchmark_results[0].metadata.get("args_count") == 1
+
+
+def test_benchmark_with_custom_name(clear_benchmark_results):
+    """Test benchmark decorator with custom name."""
+    @benchmark(name="CustomTest")
+    def example_function():
+        return "result"
+
+    example_function()
+
+    benchmark_results = BenchmarkTracker.get_results()
+    assert len(benchmark_results) == 1
+    assert benchmark_results[0].name == "CustomTest"
+
+
+def test_benchmark_with_metadata(clear_benchmark_results):
+    """Test benchmark decorator with custom metadata."""
+    @benchmark(metadata={"category": "io_operations"})
+    def example_function():
+        return "result"
+
+    example_function()
+
+    benchmark_results = BenchmarkTracker.get_results()
+    assert len(benchmark_results) == 1
+    assert benchmark_results[0].metadata.get("category") == "io_operations"
+    assert benchmark_results[0].metadata.get("args_count") == 0
+
+
+def test_benchmark_block(clear_benchmark_results):
+    """Test the benchmark_block start/end helper."""
+    start, end = benchmark_block("test_block", {"type": "code_block"})
+
+    start()
+    time.sleep(0.01)
+    end()
+
+    benchmark_results = BenchmarkTracker.get_results()
+    assert len(benchmark_results) == 1
+    assert benchmark_results[0].name == "test_block"
+    assert benchmark_results[0].execution_time >= 0.01
+    assert benchmark_results[0].metadata.get("type") == "code_block"
+
+
+def test_benchmark_tracker_get_total_execution_time(clear_benchmark_results):
+    """Test getting total execution time from the tracker."""
+    BenchmarkTracker.add_result(BenchmarkResult("test1", 1.5))
+    BenchmarkTracker.add_result(BenchmarkResult("test2", 2.5))
+
+    assert BenchmarkTracker.get_total_execution_time() == 4.0
+
+
+def test_benchmark_tracker_get_summary(clear_benchmark_results):
+    """Test getting a summary from the tracker."""
+    BenchmarkTracker.add_result(BenchmarkResult(
+        "test1", 1.5, {"category": "api_calls"}))
+    BenchmarkTracker.add_result(BenchmarkResult(
+        "test2", 2.5, {"category": "data_processing"}))
+
+    summary = BenchmarkTracker.get_summary()
+
+    assert "Performance Benchmark Summary:" in summary
+    assert "test1: 1.5000s" in summary
+    assert "test2: 2.5000s" in summary
+    assert "category: api_calls" in summary
+    assert "category: data_processing" in summary
+    assert "Total execution time: 4.0000s" in summary
+
+
+def test_benchmark_tracker_get_summary_empty(clear_benchmark_results):
+    """Test getting a summary when no benchmarks are recorded."""
+    assert BenchmarkTracker.get_summary() == "No benchmarks recorded."
+
+
+def test_benchmark_tracker_clear_results(clear_benchmark_results):
+    """Test clearing benchmark results."""
+    BenchmarkTracker.add_result(BenchmarkResult("test1", 1.5))
+    assert len(BenchmarkTracker.get_results()) == 1
+
+    BenchmarkTracker.clear_results()
+    assert len(BenchmarkTracker.get_results()) == 0
+
+
+def test_benchmark_result_str():
+    """Test string representation of benchmark result."""
+    result = BenchmarkResult("test_func", 1.2345)
+    assert str(result) == "test_func: 1.2345s"
diff --git a/sources/manager_cache.py b/sources/manager_cache.py
new file mode 100644
index 0000000..d7db222
--- /dev/null
+++ b/sources/manager_cache.py
@@ -0,0 +1,112 @@
+import json
+import os
+import time
+from pathlib import Path
+from typing import Dict, Any, Optional
+
+
+class CacheManager:
+    """Manages caching for GitHub repository data to improve performance.
+
+    This class provides functionality to cache and retrieve repository data,
+    significantly reducing API calls and processing time for users with many repos.
+    """
+
+    CACHE_DIR = '.cache'
+    CACHE_EXPIRY = 86400  # Cache expiry in seconds (24 hours)
+
+    def __init__(self, user_id: str):
+        """Initialize the cache manager.
+
+        Args:
+            user_id: GitHub username or organization name to create user-specific cache
+        """
+        self.user_id = user_id
+        self.cache_path = Path(self.CACHE_DIR) / f"{user_id}_repo_cache.json"
+        self._ensure_cache_dir()
+
+    def _ensure_cache_dir(self) -> None:
+        """Ensure cache directory exists."""
+        os.makedirs(self.CACHE_DIR, exist_ok=True)
+
+    def get_cached_data(self, repo_name: str) -> Optional[Dict[str, Any]]:
+        """Get cached data for a specific repository if it exists and is valid.
+
+        Args:
+            repo_name: The name of the repository
+
+        Returns:
+            The cached repository data or None if not cached or expired
+        """
+        if not self.cache_path.exists():
+            return None
+
+        try:
+            with open(self.cache_path, 'r') as f:
+                cache_data = json.load(f)
+
+            if repo_name not in cache_data:
+                return None
+
+            repo_cache = cache_data[repo_name]
+            # Check if cache is expired
+            if time.time() - repo_cache.get('timestamp', 0) > self.CACHE_EXPIRY:
+                return None
+
+            return repo_cache.get('data')
+        except (json.JSONDecodeError, IOError):
+            # If cache file is corrupted or cannot be read, return None
+            return None
+
+    def update_cache(self, repo_name: str, data: Dict[str, Any]) -> None:
+        """Update the cache with new repository data.
+
+        Args:
+            repo_name: The name of the repository
+            data: The repository data to cache
+        """
+        cache_data = {}
+        if self.cache_path.exists():
+            try:
+                with open(self.cache_path, 'r') as f:
+                    cache_data = json.load(f)
+            except (json.JSONDecodeError, IOError):
+                # If cache file is corrupted, start with an empty cache
+                cache_data = {}
+
+        # Update cache with new data
+        cache_data[repo_name] = {
+            'timestamp': time.time(),
+            'data': data
+        }
+
+        with open(self.cache_path, 'w') as f:
+            json.dump(cache_data, f)
+
+    def clear_cache(self) -> None:
+        """Clear the entire cache for the user."""
+        if self.cache_path.exists():
+            os.remove(self.cache_path)
+
+    def get_repo_last_modified(self, repo_name: str) -> Optional[float]:
+        """Get the last modified timestamp of a cached repository.
+
+        Args:
+            repo_name: The name of the repository
+
+        Returns:
+            Timestamp of last modification or None if not cached
+        """
+        if not self.cache_path.exists():
+            return None
+
+        try:
+            with open(self.cache_path, 'r') as f:
+                cache_data = json.load(f)
+
+            if repo_name not in cache_data:
+                return None
+
+            return cache_data[repo_name].get('timestamp')
+        except (json.JSONDecodeError, IOError):
+            return None
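Illustrative note (not part of the diff): a minimal read-through sketch of the `CacheManager` API above, where `fetch_repo_from_api` is a hypothetical stand-in for the real GitHub call:

    from typing import Any, Dict

    from sources.manager_cache import CacheManager


    def fetch_repo_from_api(repo_name: str) -> Dict[str, Any]:
        """Hypothetical expensive call, e.g. a GitHub API query (placeholder)."""
        return {"name": repo_name, "language": "Python"}


    def get_repo_data(user: str, repo_name: str) -> Dict[str, Any]:
        cache = CacheManager(user)
        data = cache.get_cached_data(repo_name)  # None if missing or older than CACHE_EXPIRY
        if data is None:
            data = fetch_repo_from_api(repo_name)
            cache.update_cache(repo_name, data)  # stores {'timestamp': ..., 'data': ...}
        return data


    if __name__ == "__main__":
        print(get_repo_data("octocat", "hello-world"))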
diff --git a/sources/manager_cache_test.py b/sources/manager_cache_test.py
new file mode 100644
index 0000000..791b64c
--- /dev/null
+++ b/sources/manager_cache_test.py
@@ -0,0 +1,132 @@
+import json
+import os
+import time
+from pathlib import Path
+
+import pytest
+
+from sources.manager_cache import CacheManager
+
+
+@pytest.fixture
+def cache_manager():
+    manager = CacheManager('test_user')
+    # Ensure clean state for tests
+    if Path(CacheManager.CACHE_DIR, 'test_user_repo_cache.json').exists():
+        os.remove(Path(CacheManager.CACHE_DIR, 'test_user_repo_cache.json'))
+    yield manager
+    # Clean up after tests
+    if Path(CacheManager.CACHE_DIR, 'test_user_repo_cache.json').exists():
+        os.remove(Path(CacheManager.CACHE_DIR, 'test_user_repo_cache.json'))
+
+
+def test_ensure_cache_dir_creation(cache_manager):
+    """Test that the cache directory is created."""
+    assert Path(CacheManager.CACHE_DIR).exists()
+
+
+def test_get_cached_data_no_cache_file(cache_manager):
+    """Test getting data when no cache file exists."""
+    assert cache_manager.get_cached_data('repo1') is None
+
+
+def test_update_and_get_cache(cache_manager):
+    """Test updating and retrieving cache."""
+    test_data = {'name': 'repo1', 'language': 'Python'}
+    cache_manager.update_cache('repo1', test_data)
+
+    assert Path(CacheManager.CACHE_DIR, 'test_user_repo_cache.json').exists()
+    assert cache_manager.get_cached_data('repo1') == test_data
+
+
+def test_update_existing_cache(cache_manager):
+    """Test updating existing cache entry."""
+    # Set initial data
+    initial_data = {'name': 'repo1', 'language': 'Python'}
+    cache_manager.update_cache('repo1', initial_data)
+
+    # Update with new data
+    updated_data = {'name': 'repo1', 'language': 'JavaScript'}
+    cache_manager.update_cache('repo1', updated_data)
+
+    # Verify update worked
+    assert cache_manager.get_cached_data('repo1') == updated_data
+
+
+def test_multiple_repos_cache(cache_manager):
+    """Test caching multiple repositories."""
+    repo1_data = {'name': 'repo1', 'language': 'Python'}
+    repo2_data = {'name': 'repo2', 'language': 'JavaScript'}
+
+    cache_manager.update_cache('repo1', repo1_data)
+    cache_manager.update_cache('repo2', repo2_data)
+
+    assert cache_manager.get_cached_data('repo1') == repo1_data
+    assert cache_manager.get_cached_data('repo2') == repo2_data
+
+
+def test_clear_cache(cache_manager):
+    """Test clearing the cache."""
+    # Add some data
+    cache_manager.update_cache('repo1', {'data': 'test'})
+
+    # Verify it exists
+    assert cache_manager.get_cached_data('repo1') is not None
+
+    # Clear and verify it's gone
+    cache_manager.clear_cache()
+    assert cache_manager.get_cached_data('repo1') is None
+    assert not Path(CacheManager.CACHE_DIR, 'test_user_repo_cache.json').exists()
+
+
+def test_cache_expiry(cache_manager, monkeypatch):
+    """Test that expired cache entries are not returned."""
+    # Add data
+    cache_manager.update_cache('repo1', {'data': 'test'})
+
+    # Verify it exists
+    assert cache_manager.get_cached_data('repo1') is not None
+
+    # Mock time to simulate passage of time beyond expiry
+    current_time = time.time()
+    future_time = current_time + CacheManager.CACHE_EXPIRY + 100
+    monkeypatch.setattr(time, 'time', lambda: future_time)
+
+    # Verify expired cache is not returned
+    assert cache_manager.get_cached_data('repo1') is None
+
+
+def test_corrupted_cache_file(cache_manager):
+    """Test handling of corrupted cache files."""
+    # Create a corrupted JSON file
+    os.makedirs(CacheManager.CACHE_DIR, exist_ok=True)
+    with open(Path(CacheManager.CACHE_DIR, 'test_user_repo_cache.json'), 'w') as f:
+        f.write('{"not valid JSON"')
+
+    # Should handle gracefully and return None
+    assert cache_manager.get_cached_data('repo1') is None
+
+    # Should be able to update cache even after corruption
+    cache_manager.update_cache('repo1', {'data': 'new'})
+    assert cache_manager.get_cached_data('repo1') == {'data': 'new'}
+
+
+def test_get_repo_last_modified(cache_manager, monkeypatch):
+    """Test getting the last modified timestamp."""
+    # Mock time for consistent testing
+    test_time = 1617000000.0
+    monkeypatch.setattr(time, 'time', lambda: test_time)
+
+    # Add data
+    cache_manager.update_cache('repo1', {'data': 'test'})
+
+    # Check timestamp
+    assert cache_manager.get_repo_last_modified('repo1') == test_time
+
+    # Non-existent repo
+    assert cache_manager.get_repo_last_modified('non_existent') is None
+
+
+def test_get_repo_last_modified_no_cache(cache_manager):
+    """Test getting timestamp when no cache exists."""
+    assert cache_manager.get_repo_last_modified('repo1') is None