diff --git a/.github/workflows/asynctest.yml.inactive b/.github/workflows/asynctest.yml.inactive new file mode 100644 index 00000000..e7406639 --- /dev/null +++ b/.github/workflows/asynctest.yml.inactive @@ -0,0 +1,89 @@ +# =============================================================== +# ๐Ÿ”„ Async Safety & Performance Testing +# =============================================================== +# +# - runs the async safety and performance tests across multiple Python versions +# - includes linting, debugging, profiling, benchmarking, and validation +# - uploads the test artifacts as build artifacts +# - performs a performance regression check +# --------------------------------------------------------------- + +name: Tests & Coverage + +on: + push: + branches: ["main"] + pull_request: + branches: ["main"] + # schedule: + # - cron: '42 3 * * 1' # Monday 03:42 UTC + +permissions: + contents: write # needed *only* if the badge-commit step is enabled + checks: write + actions: read + +jobs: + async-testing: + name: ๐Ÿ”„ Async Safety & Performance Testing + runs-on: ubuntu-latest + needs: [test] + + strategy: + fail-fast: false + matrix: + python: ["3.11", "3.12"] + + steps: + - name: โฌ‡๏ธ Checkout source + uses: actions/checkout@v4 + with: + fetch-depth: 1 + + - name: ๐Ÿ Setup Python ${{ matrix.python }} + uses: actions/setup-python@v5 + with: + python-version: ${{ matrix.python }} + cache: pip + + - name: ๐Ÿ“ฆ Install dependencies + run: | + python -m pip install --upgrade pip + pip install -e .[dev] + pip install flake8-async flake8-bugbear pytest-asyncio snakeviz aiomonitor + + - name: ๐Ÿ” Run async linting + run: | + make async-lint + + - name: ๐Ÿ› Run async debug tests + run: | + make async-debug + + - name: ๐Ÿ“Š Generate performance profiles + run: | + make profile + + - name: โšก Run async benchmarks + run: | + make async-benchmark + + - name: โœ… Validate async patterns + run: | + make async-validate + + - name: ๐Ÿ“Ž Upload async test artifacts + uses: actions/upload-artifact@v4 + with: + name: async-test-results + path: | + async_testing/reports/ + async_testing/profiles/ + retention-days: 30 + + - name: ๐Ÿ“ˆ Performance regression check + run: | + python async_testing/check_regression.py \ + --current async_testing/profiles/latest.prof \ + --baseline async_testing/profiles/baseline.prof \ + --threshold 20 # 20% regression threshold diff --git a/.github/workflows/pytest.yml b/.github/workflows/pytest.yml index a8a4656b..988053fa 100644 --- a/.github/workflows/pytest.yml +++ b/.github/workflows/pytest.yml @@ -65,7 +65,7 @@ jobs: python3 -m pip install --upgrade pip # install the project itself in *editable* mode so tests import the same codebase # and pull in every dev / test extra declared in pyproject.toml - pip install -e .[dev] + pip install -e .[dev,asyncpg] # belt-and-braces - keep the core test tool-chain pinned here too pip install pytest pytest-cov pytest-asyncio coverage[toml] diff --git a/.gitignore b/.gitignore index dbb63416..c29951a7 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,6 @@ +CLAUDE.local.md +.aider* +.scannerwork llms-full.txt aider* .aider* diff --git a/CLAUDE.md b/CLAUDE.md index 6da09cf3..996301c0 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -228,7 +228,7 @@ pytest -m "not slow" # To test everything: make autoflake isort black pre-commit -make interrogate doctest test smoketest lint-web flake8 bandit pylint +make doctest test htmlcov smoketest lint-web flake8 bandit interrogate pylint verify # Rules - When using git commit always add a -s to sign 
commits diff --git a/MANIFEST.in b/MANIFEST.in index 8bf2ca97..fc9fe92a 100644 --- a/MANIFEST.in +++ b/MANIFEST.in @@ -27,6 +27,8 @@ include *.yml include *.json include *.sh include *.txt +recursive-include async_testing *.py +recursive-include async_testing *.yaml # 3๏ธโƒฃ Tooling/lint configuration dot-files (explicit so they're not lost) include .env.make @@ -73,7 +75,8 @@ prune *.egg-info prune charts prune k8s prune .devcontainer - +exclude CLAUDE.* +exclude llms-full.txt # Exclude deployment, mcp-servers and agent_runtimes prune deployment diff --git a/Makefile b/Makefile index e018df7f..1bbf6d78 100644 --- a/Makefile +++ b/Makefile @@ -3705,6 +3705,97 @@ pip-audit: ## ๐Ÿ”’ Audit Python dependencies for CVEs python3 -m pip install --quiet --upgrade pip-audit && \ pip-audit --strict || true" + + +## --------------------------------------------------------------------------- ## +## Async Code Testing and Performance Profiling +## --------------------------------------------------------------------------- ## +.PHONY: async-test async-lint profile async-monitor async-debug profile-serve + +ASYNC_TEST_DIR := async_testing +PROFILE_DIR := $(ASYNC_TEST_DIR)/profiles +REPORTS_DIR := $(ASYNC_TEST_DIR)/reports +VENV_PYTHON := $(VENV_DIR)/bin/python + +async-test: async-lint async-debug + @echo "๐Ÿ”„ Running comprehensive async safety tests..." + @mkdir -p $(REPORTS_DIR) + @PYTHONASYNCIODEBUG=1 $(VENV_PYTHON) -m pytest \ + tests/ \ + --asyncio-mode=auto \ + --tb=short \ + --junitxml=$(REPORTS_DIR)/async-test-results.xml \ + -v + +async-lint: + @echo "๐Ÿ” Running async-aware linting..." + @$(VENV_DIR)/bin/ruff check mcpgateway/ tests/ \ + --select=F,E,B,ASYNC \ + --output-format=github + @$(VENV_DIR)/bin/flake8 mcpgateway/ tests/ \ + --extend-select=B,ASYNC \ + --max-line-length=100 + @$(VENV_DIR)/bin/mypy mcpgateway/ \ + --warn-unused-coroutine \ + --strict + +profile: + @echo "๐Ÿ“Š Generating async performance profiles..." + @mkdir -p $(PROFILE_DIR) + @$(VENV_PYTHON) $(ASYNC_TEST_DIR)/profiler.py \ + --scenarios websocket,database,mcp_calls \ + --output $(PROFILE_DIR) \ + --duration 60 + @echo "๐ŸŒ Starting SnakeViz server..." + @$(VENV_DIR)/bin/snakeviz $(PROFILE_DIR)/combined_profile.prof \ + --server --port 8080 + +profile-serve: + @echo "๐ŸŒ Starting SnakeViz profile server..." + @$(VENV_DIR)/bin/snakeviz $(PROFILE_DIR) \ + --server --port 8080 --hostname 0.0.0.0 + +async-monitor: + @echo "๐Ÿ‘๏ธ Starting aiomonitor for live async debugging..." + @$(VENV_PYTHON) $(ASYNC_TEST_DIR)/monitor_runner.py \ + --webui_port 50101 \ + --console_port 50102 \ + --host localhost \ + --console-enabled + +async-debug: + @echo "๐Ÿ› Running async tests with debug mode..." + @PYTHONASYNCIODEBUG=1 $(VENV_PYTHON) -X dev \ + -m pytest tests/ \ + --asyncio-mode=auto \ + --capture=no \ + -v + +async-benchmark: + @echo "โšก Running async performance benchmarks..." + @$(VENV_PYTHON) $(ASYNC_TEST_DIR)/benchmarks.py \ + --output $(REPORTS_DIR)/benchmark-results.json \ + --iterations 1000 + +profile-compare: + @echo "๐Ÿ“ˆ Comparing performance profiles..." + @$(VENV_PYTHON) $(ASYNC_TEST_DIR)/profile_compare.py \ + --baseline $(PROFILE_DIR)/combined_profile.prof \ + --current $(PROFILE_DIR)/mcp_calls_profile.prof \ + --output $(REPORTS_DIR)/profile-comparison.json + +async-validate: + @echo "โœ… Validating async code patterns..." 
+ @$(VENV_PYTHON) $(ASYNC_TEST_DIR)/async_validator.py \ + --source mcpgateway/ \ + --report $(REPORTS_DIR)/async-validation.json + +async-clean: + @echo "๐Ÿงน Cleaning async testing artifacts..." + @rm -rf $(PROFILE_DIR)/* $(REPORTS_DIR)/* + @pkill -f "aiomonitor" || true + @pkill -f "snakeviz" || true + ## --------------------------------------------------------------------------- ## ## Gitleaks (Go binary - separate installation) ## --------------------------------------------------------------------------- ## diff --git a/async_testing/async_validator.py b/async_testing/async_validator.py new file mode 100644 index 00000000..0e8c3d82 --- /dev/null +++ b/async_testing/async_validator.py @@ -0,0 +1,184 @@ +# -*- coding: utf-8 -*- +""" +Validate async code patterns and detect common pitfalls. +""" + +import ast +import argparse +import json +from pathlib import Path +from typing import List, Dict, Any + +class AsyncCodeValidator: + """Validate async code for common patterns and pitfalls.""" + + def __init__(self): + self.issues = [] + self.suggestions = [] + + def validate_directory(self, source_dir: Path) -> Dict[str, Any]: + """Validate all Python files in directory.""" + + validation_results = { + 'files_checked': 0, + 'issues_found': 0, + 'suggestions': 0, + 'details': [] + } + + python_files = list(source_dir.rglob("*.py")) + + for file_path in python_files: + if self._should_skip_file(file_path): + continue + + file_results = self._validate_file(file_path) + validation_results['details'].append(file_results) + validation_results['files_checked'] += 1 + validation_results['issues_found'] += len(file_results['issues']) + validation_results['suggestions'] += len(file_results['suggestions']) + + return validation_results + + def _validate_file(self, file_path: Path) -> Dict[str, Any]: + """Validate a single Python file.""" + + file_results = { + 'file': str(file_path), + 'issues': [], + 'suggestions': [] + } + + try: + with open(file_path, 'r', encoding='utf-8') as f: + source_code = f.read() + + tree = ast.parse(source_code, filename=str(file_path)) + + # Analyze AST for async patterns + validator = AsyncPatternVisitor(file_path) + validator.visit(tree) + + file_results['issues'] = validator.issues + file_results['suggestions'] = validator.suggestions + + except Exception as e: + file_results['issues'].append({ + 'type': 'parse_error', + 'message': f"Failed to parse file: {str(e)}", + 'line': 0 + }) + + return file_results + + + def _should_skip_file(self, file_path: Path) -> bool: + """Determine if a file should be skipped (e.g., __init__.py files).""" + return file_path.name == "__init__.py" + +class AsyncPatternVisitor(ast.NodeVisitor): + """AST visitor to detect async patterns and issues.""" + + def __init__(self, file_path: Path): + self.file_path = file_path + self.issues = [] + self.suggestions = [] + self.in_async_function = False + + def visit_AsyncFunctionDef(self, node): + """Visit async function definitions.""" + + self.in_async_function = True + + # Check for blocking operations in async functions + self._check_blocking_operations(node) + + # Check for proper error handling + self._check_error_handling(node) + + self.generic_visit(node) + self.in_async_function = False + + def visit_Call(self, node): + """Visit function calls.""" + + if self.in_async_function: + # Check for potentially unawaited async calls + self._check_unawaited_calls(node) + + # Check for blocking I/O operations + self._check_blocking_io(node) + + self.generic_visit(node) + + def 
_check_blocking_operations(self, node): + """Check for blocking operations in async functions.""" + + blocking_patterns = [ + 'time.sleep', + 'requests.get', 'requests.post', + 'subprocess.run', 'subprocess.call', + 'open' # File I/O without async + ] + + for child in ast.walk(node): + if isinstance(child, ast.Call): + call_name = self._get_call_name(child) + if call_name in blocking_patterns: + self.issues.append({ + 'type': 'blocking_operation', + 'message': f"Blocking operation '{call_name}' in async function", + 'line': child.lineno, + 'suggestion': f"Use async equivalent of {call_name}" + }) + + def _check_unawaited_calls(self, node): + """Check for potentially unawaited async calls.""" + + # Look for calls that might return coroutines + async_patterns = [ + 'aiohttp', 'asyncio', 'asyncpg', + 'websockets', 'motor' # Common async libraries + ] + + call_name = self._get_call_name(node) + + for pattern in async_patterns: + if pattern in call_name: + # Check if this call is awaited + parent = getattr(node, 'parent', None) + if not isinstance(parent, ast.Await): + self.suggestions.append({ + 'type': 'potentially_unawaited', + 'message': f"Call to '{call_name}' might need await", + 'line': node.lineno + }) + break + + def _get_call_name(self, node): + """Extract the name of a function call.""" + + if isinstance(node.func, ast.Name): + return node.func.id + elif isinstance(node.func, ast.Attribute): + if isinstance(node.func.value, ast.Name): + return f"{node.func.value.id}.{node.func.attr}" + else: + return node.func.attr + return "unknown" + + +if __name__ == "__main__": + parser = argparse.ArgumentParser(description="Validate async code patterns and detect common pitfalls.") + parser.add_argument("--source", type=Path, required=True, help="Source directory to validate.") + parser.add_argument("--report", type=Path, required=True, help="Path to the output validation report.") + + args = parser.parse_args() + + validator = AsyncCodeValidator() + results = validator.validate_directory(args.source) + + with open(args.report, 'w') as f: + json.dump(results, f, indent=4) + + print(f"Validation report saved to {args.report}") diff --git a/async_testing/benchmarks.py b/async_testing/benchmarks.py new file mode 100644 index 00000000..83f4be8b --- /dev/null +++ b/async_testing/benchmarks.py @@ -0,0 +1,73 @@ +# -*- coding: utf-8 -*- +""" +Run async performance benchmarks and output results. 
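The `_check_unawaited_calls` method above inspects a `parent` attribute, but the stdlib `ast` module never sets one, so the await check currently has nothing to read. A minimal sketch of attaching parent links before visiting — `attach_parents` is a hypothetical helper, not part of this change:

import ast


def attach_parents(tree: ast.AST) -> ast.AST:
    """Annotate every node with a .parent reference (not a stdlib feature)."""
    for parent in ast.walk(tree):
        for child in ast.iter_child_nodes(parent):
            child.parent = parent  # type: ignore[attr-defined]
    return tree


# Quick self-check: the bare call is flagged, the awaited one is not.
source = "async def handler():\n    asyncio.sleep(1)\n    await asyncio.sleep(1)\n"
tree = attach_parents(ast.parse(source))
for node in ast.walk(tree):
    if isinstance(node, ast.Call) and not isinstance(getattr(node, "parent", None), ast.Await):
        print(f"line {node.lineno}: call is not directly awaited")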
+""" +import asyncio +import time +import json +import argparse +from pathlib import Path +from typing import Any, Dict + +class AsyncBenchmark: + """Run async performance benchmarks.""" + + def __init__(self, iterations: int): + self.iterations = iterations + self.results: Dict[str, Any] = { + 'iterations': self.iterations, + 'benchmarks': [] + } + + async def run_benchmarks(self) -> None: + """Run all benchmarks.""" + + # Example benchmarks + await self._benchmark_example("Example Benchmark 1", self.example_benchmark_1) + await self._benchmark_example("Example Benchmark 2", self.example_benchmark_2) + + async def _benchmark_example(self, name: str, benchmark_func) -> None: + """Run a single benchmark and record its performance.""" + + start_time = time.perf_counter() + + for _ in range(self.iterations): + await benchmark_func() + + end_time = time.perf_counter() + total_time = end_time - start_time + avg_time = total_time / self.iterations + + self.results['benchmarks'].append({ + 'name': name, + 'total_time': total_time, + 'average_time': avg_time + }) + + async def example_benchmark_1(self) -> None: + """An example async benchmark function.""" + await asyncio.sleep(0.001) + + async def example_benchmark_2(self) -> None: + """Another example async benchmark function.""" + await asyncio.sleep(0.002) + + def save_results(self, output_path: Path) -> None: + """Save benchmark results to a file.""" + + with open(output_path, 'w') as f: + json.dump(self.results, f, indent=4) + + print(f"Benchmark results saved to {output_path}") + + +if __name__ == "__main__": + parser = argparse.ArgumentParser(description="Run async performance benchmarks.") + parser.add_argument("--output", type=Path, required=True, help="Path to the output benchmark results file.") + parser.add_argument("--iterations", type=int, default=1000, help="Number of iterations to run each benchmark.") + + args = parser.parse_args() + + benchmark = AsyncBenchmark(args.iterations) + asyncio.run(benchmark.run_benchmarks()) + benchmark.save_results(args.output) diff --git a/async_testing/config.yaml b/async_testing/config.yaml new file mode 100644 index 00000000..6bf4a429 --- /dev/null +++ b/async_testing/config.yaml @@ -0,0 +1,25 @@ +async_linting: + ruff_rules: ["F", "E", "B", "ASYNC"] + flake8_plugins: ["flake8-bugbear", "flake8-async"] + mypy_config: + warn_unused_coroutine: true + strict: true + +profiling: + output_dir: "async_testing/profiles" + snakeviz_port: 8080 + profile_scenarios: + - "websocket_stress_test" + - "database_query_performance" + - "concurrent_mcp_calls" + +monitoring: + aiomonitor_port: 50101 + debug_mode: true + task_tracking: true + +performance_thresholds: + websocket_connection: 100 # ms + database_query: 50 # ms + mcp_rpc_call: 100 # ms + concurrent_users: 100 # simultaneous connections diff --git a/async_testing/monitor_runner.py b/async_testing/monitor_runner.py new file mode 100644 index 00000000..f9871c3e --- /dev/null +++ b/async_testing/monitor_runner.py @@ -0,0 +1,96 @@ +# -*- coding: utf-8 -*- +""" +Runtime async monitoring with aiomonitor integration. 
+""" +import asyncio +from typing import Any, Dict +import aiomonitor +import argparse + +class AsyncMonitor: + """Monitor live async operations in mcpgateway.""" + + def __init__(self, webui_port: int = 50101, console_port: int = 50102, host: str = "localhost"): + self.webui_port = webui_port + self.console_port = console_port + self.host = host + self.monitor = None + self.running = False + + async def start_monitoring(self, console_enabled: bool = True): + """Start aiomonitor for live async debugging.""" + + print(f"๐Ÿ‘๏ธ Starting aiomonitor on http://{self.host}:{self.webui_port}") + + # Configure aiomonitor + self.monitor = aiomonitor.Monitor( + asyncio.get_event_loop(), + host=self.host, + webui_port=self.webui_port, + console_port=self.console_port, # TODO: FIX CONSOLE NOT CONNECTING TO PORT + console_enabled=console_enabled, + locals={'monitor': self} + ) + + self.monitor.start() + self.running = True + + if console_enabled: + print(f"๐ŸŒ aiomonitor console available at: http://{self.host}:{self.console_port}") + print("๐Ÿ“Š Available commands: ps, where, cancel, signal, console") + print("๐Ÿ” Use 'ps' to list running tasks") + print("๐Ÿ“ Use 'where ' to see task stack trace") + + # Keep monitoring running + try: + while self.running: + await asyncio.sleep(1) + + # Periodic task summary + tasks = [t for t in asyncio.all_tasks() if not t.done()] + if len(tasks) % 100 == 0 and len(tasks) > 0: + print(f"๐Ÿ“ˆ Current active tasks: {len(tasks)}") + + except KeyboardInterrupt: # TODO: FIX STACK TRACE STILL APPEARING ON CTRL-C + print("\n๐Ÿ›‘ Stopping aiomonitor...") + finally: + self.monitor.close() + + def stop_monitoring(self): + """Stop the monitoring.""" + self.running = False + + async def get_task_summary(self) -> Dict[str, Any]: + """Get summary of current async tasks.""" + + tasks = asyncio.all_tasks() + + summary: Dict[str, Any] = { + 'total_tasks': len(tasks), + 'running_tasks': len([t for t in tasks if not t.done()]), + 'completed_tasks': len([t for t in tasks if t.done()]), + 'cancelled_tasks': len([t for t in tasks if t.cancelled()]), + 'task_details': [] + } + + for task in tasks: + if not task.done(): + summary['task_details'].append({ + 'name': getattr(task, '_name', 'unnamed'), + 'state': task._state.name if hasattr(task, '_state') else 'unknown', + 'coro': str(task._coro) if hasattr(task, '_coro') else 'unknown' + }) + + return summary + +if __name__ == "__main__": + parser = argparse.ArgumentParser(description="Run aiomonitor for live async debugging.") + parser.add_argument("--host", type=str, default="localhost", help="Host to run aiomonitor on.") + parser.add_argument("--webui_port", type=int, default=50101, help="Port to run aiomonitor on.") + parser.add_argument("--console_port", type=int, default=50102, help="Port to run aiomonitor on.") + parser.add_argument("--console-enabled", action="store_true", help="Enable console for aiomonitor.") + + args = parser.parse_args() + + monitor = AsyncMonitor(webui_port=args.webui_port, console_port=args.console_port, host=args.host) + asyncio.run(monitor.start_monitoring(console_enabled=args.console_enabled)) diff --git a/async_testing/profile_compare.py b/async_testing/profile_compare.py new file mode 100644 index 00000000..c900e2bb --- /dev/null +++ b/async_testing/profile_compare.py @@ -0,0 +1,93 @@ +# -*- coding: utf-8 -*- +""" +Compare async performance profiles between builds. 
+""" + +import pstats +import json +import argparse +from pathlib import Path +from typing import Dict, Any + +class ProfileComparator: + """Compare performance profiles and detect regressions.""" + + def compare_profiles(self, baseline_path: Path, current_path: Path) -> Dict[str, Any]: + """Compare two performance profiles.""" + + baseline_stats = pstats.Stats(str(baseline_path)) + current_stats = pstats.Stats(str(current_path)) + + comparison: Dict[str, Any] = { + 'baseline_file': str(baseline_path), + 'current_file': str(current_path), + 'regressions': [], + 'improvements': [], + 'summary': {} + } + + # Compare overall performance + baseline_total_time = baseline_stats.total_tt + current_total_time = current_stats.total_tt + + total_time_change = ( + (current_total_time - baseline_total_time) / baseline_total_time * 100 + ) + + comparison['summary']['total_time_change'] = total_time_change + + # Compare function-level performance + baseline_functions = self._extract_function_stats(baseline_stats) + current_functions = self._extract_function_stats(current_stats) + + for func_name, baseline_time in baseline_functions.items(): + if func_name in current_functions: + current_time: float = current_functions[func_name] + change_percent = (current_time - baseline_time) / baseline_time * 100 + + if change_percent > 20: # 20% regression threshold + comparison['regressions'].append({ + 'function': func_name, + 'baseline_time': baseline_time, + 'current_time': current_time, + 'change_percent': change_percent + }) + elif change_percent < -10: # 10% improvement + comparison['improvements'].append({ + 'function': func_name, + 'baseline_time': baseline_time, + 'current_time': current_time, + 'change_percent': change_percent + }) + + return comparison + + + def _extract_function_stats(self, stats: pstats.Stats) -> Dict[str, float]: + """Extract function-level statistics from pstats.Stats.""" + + functions = {} + + for func, stat in stats.stats.items(): + func_name = f"{func[0]}:{func[1]}:{func[2]}" + tottime = stat[2] # Extract the 'tottime' (total time spent in the given function) + functions[func_name] = tottime + + return functions + + +if __name__ == "__main__": + parser = argparse.ArgumentParser(description="Compare performance profiles.") + parser.add_argument("--baseline", type=Path, required=True, help="Path to the baseline profile.") + parser.add_argument("--current", type=Path, required=True, help="Path to the current profile.") + parser.add_argument("--output", type=Path, required=True, help="Path to the output comparison report.") + + args = parser.parse_args() + + comparator = ProfileComparator() + comparison = comparator.compare_profiles(args.baseline, args.current) + + with open(args.output, 'w') as f: + json.dump(comparison, f, indent=4) + + print(f"Comparison report saved to {args.output}") diff --git a/async_testing/profiler.py b/async_testing/profiler.py new file mode 100644 index 00000000..47b14c32 --- /dev/null +++ b/async_testing/profiler.py @@ -0,0 +1,344 @@ +# -*- coding: utf-8 -*- +""" +Comprehensive async performance profiler for mcpgateway. 
+""" +import asyncio +import cProfile +import pstats +import time +import aiohttp +import websockets +import argparse +import json +from pathlib import Path +from typing import Dict, List, Any, Union + +class AsyncProfiler: + """Profile async operations in mcpgateway.""" + + def __init__(self, output_dir: str): + self.output_dir = Path(output_dir) + self.output_dir.mkdir(parents=True, exist_ok=True) + self.profiles = {} + + def _generate_combined_profile(self, scenarios: List[str]) -> None: + """ + Generate a combined profile for the given scenarios. + Args: + scenarios (List[str]): A list of scenario names. + """ + combined_profile_path = self.output_dir / "combined_profile.prof" + print(f"Generating combined profile at {combined_profile_path}") + + stats = pstats.Stats() + + for scenario in scenarios: + profile_path = self.output_dir / f"{scenario}_profile.prof" + stats.add(str(profile_path)) + + stats.dump_stats(str(combined_profile_path)) + + + def _generate_summary_report(self, results: Dict[str, Any]) -> Dict[str, Any]: + """ + Generate a summary report from the profiling results. + Args: + results (Dict[str, Any]): The profiling results. + """ + # Implementation of the summary report generation + print("Generating summary report with results:", results) + return {"results": results} + + + async def profile_all_scenarios(self, scenarios: List[str], duration: int) -> Dict[str, Any]: + """Profile all specified async scenarios.""" + + results: Dict[str, Union[Dict[str, Any], float]] = { + 'scenarios': {}, + 'summary': {}, + 'timestamp': time.time() + } + + # Ensure 'scenarios' and 'summary' keys are dictionaries + results['scenarios'] = {} + results['summary'] = {} + + for scenario in scenarios: + print(f"๐Ÿ“Š Profiling scenario: {scenario}") + + profile_path = self.output_dir / f"{scenario}_profile.prof" + profile_result = await self._profile_scenario(scenario, duration, profile_path) + + results['scenarios'][scenario] = profile_result + + # Generate combined profile + self._generate_combined_profile(scenarios) + + # Generate summary report + results['summary'] = self._generate_summary_report(results['scenarios']) + + return results + + async def _profile_scenario(self, scenario: str, duration: int, + output_path: Path) -> Dict[str, Any]: + """Profile a specific async scenario.""" + + scenario_methods = { + 'websocket': self._profile_websocket_operations, + 'database': self._profile_database_operations, + 'mcp_calls': self._profile_mcp_operations, + 'concurrent_requests': self._profile_concurrent_requests + } + + if scenario not in scenario_methods: + raise ValueError(f"Unknown scenario: {scenario}") + + # Run profiling + profiler = cProfile.Profile() + profiler.enable() + + start_time = time.time() + scenario_result = await scenario_methods[scenario](duration) + end_time = time.time() + + profiler.disable() + profiler.dump_stats(str(output_path)) + + # Analyze profile + stats = pstats.Stats(str(output_path)) + stats.sort_stats('cumulative') + + return { + 'scenario': scenario, + 'duration': end_time - start_time, + 'profile_file': str(output_path), + 'total_calls': stats.total_calls, + 'total_time': stats.total_tt, + 'top_functions': self._extract_top_functions(stats), + 'async_metrics': scenario_result + } + + async def _profile_concurrent_requests(self, duration: int) -> Dict[str, Any]: + """Profile concurrent HTTP requests.""" + + metrics: Dict[str, float] = { + 'requests_made': 0, + 'avg_response_time': 0, + 'successful_requests': 0, + 'failed_requests': 0 + } + + async def 
make_request(): + try: + async with aiohttp.ClientSession() as session: + start_time = time.time() + + async with session.get("http://localhost:4444/ws") as response: + await response.text() + + response_time = time.time() - start_time + metrics['requests_made'] += 1 + metrics['successful_requests'] += 1 + metrics['avg_response_time'] = ( + (metrics['avg_response_time'] * (metrics['requests_made'] - 1) + response_time) + / metrics['requests_made'] + ) + + except Exception: + metrics['failed_requests'] += 1 + + # Run concurrent requests + tasks: List[Any] = [] + end_time = time.time() + duration + + while time.time() < end_time: + if len(tasks) < 10: # Max 10 concurrent requests + task = asyncio.create_task(make_request()) + tasks.append(task) + + # Clean up completed tasks + tasks = [t for t in tasks if not t.done()] + await asyncio.sleep(0.1) + + # Wait for remaining tasks + if tasks: + await asyncio.gather(*tasks, return_exceptions=True) + + return metrics + + async def _profile_websocket_operations(self, duration: int) -> Dict[str, Any]: + """Profile WebSocket connection and message handling.""" + + metrics: Dict[str, float] = { + 'connections_established': 0, + 'messages_sent': 0, + 'messages_received': 0, + 'connection_errors': 0, + 'avg_latency': 0 + } + + async def websocket_client(): + try: + async with websockets.connect("ws://localhost:4444/ws") as websocket: + metrics['connections_established'] += 1 + + # Send test messages + for i in range(10): + message = json.dumps({"type": "ping", "data": f"test_{i}"}) + start_time = time.time() + + await websocket.send(message) + metrics['messages_sent'] += 1 + + response = await websocket.recv() + metrics['messages_received'] += 1 + + latency = time.time() - start_time + metrics['avg_latency'] = ( + (metrics['avg_latency'] * i + latency) / (i + 1) + ) + + await asyncio.sleep(0.1) + + except Exception as e: + metrics['connection_errors'] += 1 + + # Run concurrent WebSocket clients + tasks: List[Any] = [] + end_time = time.time() + duration + + while time.time() < end_time: + if len(tasks) < 10: # Max 10 concurrent connections + task = asyncio.create_task(websocket_client()) + tasks.append(task) + + # Clean up completed tasks + tasks = [t for t in tasks if not t.done()] + await asyncio.sleep(0.1) + + # Wait for remaining tasks + if tasks: + await asyncio.gather(*tasks, return_exceptions=True) + + return metrics + + async def _profile_database_operations(self, duration: int) -> Dict[str, Any]: + """Profile database query performance.""" + + metrics: Dict[str, float] = { + 'queries_executed': 0, + 'avg_query_time': 0, + 'connection_time': 0, + 'errors': 0 + } + + # Simulate database operations + async def database_operations(): + try: + # Simulate async database queries + query_start = time.time() + + # Mock database query (replace with actual database calls) + await asyncio.sleep(0.01) # Simulate 10ms query + + query_time = time.time() - query_start + metrics['queries_executed'] += 1 + metrics['avg_query_time'] = ( + (metrics['avg_query_time'] * (metrics['queries_executed'] - 1) + query_time) + / metrics['queries_executed'] + ) + + except Exception: + metrics['errors'] += 1 + + # Run database operations for specified duration + end_time = time.time() + duration + + while time.time() < end_time: + await database_operations() + await asyncio.sleep(0.001) # Small delay between operations + + return metrics + + async def _profile_mcp_operations(self, duration: int) -> Dict[str, Any]: + """Profile MCP server communication.""" + + metrics: 
Dict[str, float] = { + 'rpc_calls': 0, + 'avg_rpc_time': 0, + 'successful_calls': 0, + 'failed_calls': 0 + } + + async def mcp_rpc_call(): + try: + async with aiohttp.ClientSession() as session: + payload = { + "jsonrpc": "2.0", + "method": "tools/list", + "id": 1 + } + + start_time = time.time() + + async with session.post( + "http://localhost:4444/rpc", + json=payload, + timeout=aiohttp.ClientTimeout(total=5) + ) as response: + await response.json() + + rpc_time = time.time() - start_time + metrics['rpc_calls'] += 1 + metrics['successful_calls'] += 1 + metrics['avg_rpc_time'] = ( + (metrics['avg_rpc_time'] * (metrics['rpc_calls'] - 1) + rpc_time) + / metrics['rpc_calls'] + ) + + except Exception: + metrics['failed_calls'] += 1 + + # Run MCP operations + end_time = time.time() + duration + + while time.time() < end_time: + await mcp_rpc_call() + await asyncio.sleep(0.1) + + return metrics + + def _extract_top_functions(self, stats: pstats.Stats) -> List[Dict[str, Union[str, float, int]]]: + """ + Extract the top functions from the profiling stats. + Args: + stats (pstats.Stats): The profiling statistics. + Returns: + List[Dict[str, Union[str, float, int]]]: A list of dictionaries containing the top functions. + """ + top_functions: List[Dict[str, Any]] = [] + for func_stat in stats.fcn_list[:10]: # Get top 10 functions + top_functions.append({ + 'function_name': func_stat[2], + 'call_count': stats.stats[func_stat][0], + 'total_time': stats.stats[func_stat][2], + 'cumulative_time': stats.stats[func_stat][3] + }) + return top_functions + +# Main entry point for the script +if __name__ == "__main__": + parser = argparse.ArgumentParser(description="Async performance profiler for mcpgateway.") + parser.add_argument("--scenarios", type=str, required=True, help="Comma-separated list of scenarios to profile.") + parser.add_argument("--output", type=str, required=True, help="Output directory for profile files.") + parser.add_argument("--duration", type=int, default=60, help="Duration to run each scenario (in seconds).") + + args = parser.parse_args() + + scenarios = args.scenarios.split(",") + output_dir = args.output + duration = args.duration + + profiler = AsyncProfiler(output_dir) + + asyncio.run(profiler.profile_all_scenarios(scenarios, duration)) diff --git a/mcpgateway/alembic/env.py b/mcpgateway/alembic/env.py index b05656d3..052798aa 100644 --- a/mcpgateway/alembic/env.py +++ b/mcpgateway/alembic/env.py @@ -118,7 +118,6 @@ def _inside_alembic() -> bool: disable_existing_loggers=False, ) -# First-Party # add your model's MetaData object here # for 'autogenerate' support # from myapp import mymodel diff --git a/mcpgateway/db.py b/mcpgateway/db.py index 9951ca6d..187a5dd7 100644 --- a/mcpgateway/db.py +++ b/mcpgateway/db.py @@ -32,13 +32,7 @@ from sqlalchemy.event import listen from sqlalchemy.exc import SQLAlchemyError from sqlalchemy.ext.hybrid import hybrid_property -from sqlalchemy.orm import ( - DeclarativeBase, - Mapped, - mapped_column, - relationship, - sessionmaker, -) +from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column, relationship, sessionmaker from sqlalchemy.orm.attributes import get_history # First-Party diff --git a/mcpgateway/handlers/sampling.py b/mcpgateway/handlers/sampling.py index a8e805ff..2ae4698c 100644 --- a/mcpgateway/handlers/sampling.py +++ b/mcpgateway/handlers/sampling.py @@ -218,7 +218,7 @@ async def create_message(self, db: Session, request: Dict[str, Any]) -> CreateMe if not self._validate_message(msg): raise SamplingError(f"Invalid message 
format: {msg}") - # FIXME: Implement actual model sampling - currently returns mock response + # TODO: Implement actual model sampling - currently returns mock response # pylint: disable=fixme # For now return mock response response = self._mock_sample(messages=messages) @@ -357,7 +357,7 @@ async def _add_context(self, _db: Session, messages: List[Dict[str, Any]], _cont >>> len(result) 2 """ - # FIXME: Implement context gathering based on type - currently no-op + # TODO: Implement context gathering based on type - currently no-op # pylint: disable=fixme # For now return original messages return messages diff --git a/mcpgateway/main.py b/mcpgateway/main.py index 7e5ca752..2ba43f71 100644 --- a/mcpgateway/main.py +++ b/mcpgateway/main.py @@ -34,17 +34,7 @@ from urllib.parse import urlparse, urlunparse # Third-Party -from fastapi import ( - APIRouter, - Body, - Depends, - FastAPI, - HTTPException, - Request, - status, - WebSocket, - WebSocketDisconnect, -) +from fastapi import APIRouter, Body, Depends, FastAPI, HTTPException, Request, status, WebSocket, WebSocketDisconnect from fastapi.background import BackgroundTasks from fastapi.exception_handlers import request_validation_exception_handler as fastapi_default_validation_handler from fastapi.exceptions import RequestValidationError @@ -68,14 +58,7 @@ from mcpgateway.db import Prompt as DbPrompt from mcpgateway.db import PromptMetric, refresh_slugs_on_startup, SessionLocal from mcpgateway.handlers.sampling import SamplingHandler -from mcpgateway.models import ( - InitializeRequest, - InitializeResult, - ListResourceTemplatesResult, - LogLevel, - ResourceContent, - Root, -) +from mcpgateway.models import InitializeRequest, InitializeResult, ListResourceTemplatesResult, LogLevel, ResourceContent, Root from mcpgateway.plugins import PluginManager, PluginViolationError from mcpgateway.schemas import ( GatewayCreate, @@ -102,45 +85,20 @@ from mcpgateway.services.completion_service import CompletionService from mcpgateway.services.gateway_service import GatewayConnectionError, GatewayNameConflictError, GatewayNotFoundError, GatewayService from mcpgateway.services.logging_service import LoggingService -from mcpgateway.services.prompt_service import ( - PromptError, - PromptNameConflictError, - PromptNotFoundError, - PromptService, -) -from mcpgateway.services.resource_service import ( - ResourceError, - ResourceNotFoundError, - ResourceService, - ResourceURIConflictError, -) +from mcpgateway.services.prompt_service import PromptError, PromptNameConflictError, PromptNotFoundError, PromptService +from mcpgateway.services.resource_service import ResourceError, ResourceNotFoundError, ResourceService, ResourceURIConflictError from mcpgateway.services.root_service import RootService -from mcpgateway.services.server_service import ( - ServerError, - ServerNameConflictError, - ServerNotFoundError, - ServerService, -) +from mcpgateway.services.server_service import ServerError, ServerNameConflictError, ServerNotFoundError, ServerService from mcpgateway.services.tag_service import TagService -from mcpgateway.services.tool_service import ( - ToolError, - ToolNameConflictError, - ToolNotFoundError, - ToolService, -) +from mcpgateway.services.tool_service import ToolError, ToolNameConflictError, ToolNotFoundError, ToolService from mcpgateway.transports.sse_transport import SSETransport -from mcpgateway.transports.streamablehttp_transport import ( - SessionManagerWrapper, - streamable_http_auth, -) +from mcpgateway.transports.streamablehttp_transport import 
SessionManagerWrapper, streamable_http_auth from mcpgateway.utils.db_isready import wait_for_db_ready from mcpgateway.utils.error_formatter import ErrorFormatter from mcpgateway.utils.redis_isready import wait_for_redis_ready from mcpgateway.utils.retry_manager import ResilientHttpClient from mcpgateway.utils.verify_credentials import require_auth, require_auth_override -from mcpgateway.validation.jsonrpc import ( - JSONRPCError, -) +from mcpgateway.validation.jsonrpc import JSONRPCError # Import the admin routes from the new module from mcpgateway.version import router as version_router diff --git a/mcpgateway/services/__init__.py b/mcpgateway/services/__init__.py index 68a13bff..88c4fd67 100644 --- a/mcpgateway/services/__init__.py +++ b/mcpgateway/services/__init__.py @@ -12,7 +12,6 @@ - Gateway coordination """ -# First-Party from mcpgateway.services.gateway_service import GatewayError, GatewayService from mcpgateway.services.prompt_service import PromptError, PromptService from mcpgateway.services.resource_service import ResourceError, ResourceService diff --git a/mcpgateway/transports/streamablehttp_transport.py b/mcpgateway/transports/streamablehttp_transport.py index f4267611..d61e6b8b 100644 --- a/mcpgateway/transports/streamablehttp_transport.py +++ b/mcpgateway/transports/streamablehttp_transport.py @@ -42,13 +42,7 @@ from fastapi.security.utils import get_authorization_scheme_param from mcp import types from mcp.server.lowlevel import Server -from mcp.server.streamable_http import ( - EventCallback, - EventId, - EventMessage, - EventStore, - StreamId, -) +from mcp.server.streamable_http import EventCallback, EventId, EventMessage, EventStore, StreamId from mcp.server.streamable_http_manager import StreamableHTTPSessionManager from mcp.types import JSONRPCMessage from starlette.datastructures import Headers diff --git a/mcpgateway/utils/verify_credentials.py b/mcpgateway/utils/verify_credentials.py index f843797f..a980f3f0 100644 --- a/mcpgateway/utils/verify_credentials.py +++ b/mcpgateway/utils/verify_credentials.py @@ -47,12 +47,7 @@ # Third-Party from fastapi import Cookie, Depends, HTTPException, status -from fastapi.security import ( - HTTPAuthorizationCredentials, - HTTPBasic, - HTTPBasicCredentials, - HTTPBearer, -) +from fastapi.security import HTTPAuthorizationCredentials, HTTPBasic, HTTPBasicCredentials, HTTPBearer from fastapi.security.utils import get_authorization_scheme_param import jwt diff --git a/mcpgateway/validation/__init__.py b/mcpgateway/validation/__init__.py index a16ca886..1ed23c37 100644 --- a/mcpgateway/validation/__init__.py +++ b/mcpgateway/validation/__init__.py @@ -10,11 +10,7 @@ - Tag validation and normalization """ -from mcpgateway.validation.jsonrpc import ( - JSONRPCError, - validate_request, - validate_response, -) +from mcpgateway.validation.jsonrpc import JSONRPCError, validate_request, validate_response from mcpgateway.validation.tags import TagValidator, validate_tags_field __all__ = ["validate_request", "validate_response", "JSONRPCError", "TagValidator", "validate_tags_field"] diff --git a/pyproject.toml b/pyproject.toml index c0d783b4..7c86a5b1 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -68,7 +68,7 @@ dependencies = [ "sse-starlette>=3.0.2", "starlette>=0.47.2", "uvicorn>=0.35.0", - "zeroconf>=0.147.0", + "zeroconf>=0.147.0" ] # ---------------------------------------------------------------- @@ -101,6 +101,7 @@ asyncpg = [ # Optional dependency groups (development) dev = [ + "aiohttp>=3.12.14", "argparse-manpage>=4.6", 
"autoflake>=2.3.1", "bandit>=1.8.6", @@ -161,6 +162,7 @@ dev = [ "uv>=0.8.4", "vulture>=2.14", "yamllint>=1.37.1", + "websockets>=15.0.1" ] # UI Testing @@ -249,6 +251,20 @@ target-version = ["py310", "py311", "py312"] include = "\\.pyi?$" # isort configuration +# -------------------------------------------------------------------- +# ๐Ÿ›  Async tool configurations (async-test, async-lint, etc.) +# -------------------------------------------------------------------- +[tool.ruff] +select = ["F", "E", "W", "B", "ASYNC"] +unfixable = ["B"] # Never auto-fix critical bugbear warnings + +[tool.ruff.flake8-bugbear] +extend-immutable-calls = ["fastapi.Depends", "fastapi.Query"] + +[[tool.mypy.overrides]] +module = "tests.*" +disallow_untyped_defs = false + [tool.isort] ############################################################################### # Core behaviour @@ -320,6 +336,7 @@ warn_unreachable = true # Warn about unreachable code warn_unused_ignores = true # Warn if a "# type: ignore" is unnecessary warn_unused_configs = true # Warn about unused config options warn_redundant_casts = true # Warn if a cast does nothing +warn_unused_coroutine = true # Warn if an unused async coroutine is defined strict_equality = true # Disallow ==/!= between incompatible types # Output formatting @@ -333,6 +350,9 @@ exclude = [ '^\\.mypy_cache/', ] +# Plugins to use with mypy +plugins = ["pydantic.mypy"] # Enable mypy plugin for Pydantic models + [tool.pytest.ini_options] minversion = "6.0" addopts = "-ra -q --cov=mcpgateway --ignore=tests/playwright" diff --git a/pyrightconfig.json b/pyrightconfig.json new file mode 100644 index 00000000..470e08ab --- /dev/null +++ b/pyrightconfig.json @@ -0,0 +1,6 @@ +{ + "typeCheckingMode": "strict", + "reportUnusedCoroutine": "error", + "reportMissingTypeStubs": "warning", + "exclude": ["build", ".venv", "async_testing/profiles"] +} diff --git a/tests/async/test_async_safety.py b/tests/async/test_async_safety.py new file mode 100644 index 00000000..8849c74f --- /dev/null +++ b/tests/async/test_async_safety.py @@ -0,0 +1,73 @@ +# -*- coding: utf-8 -*- +""" +Comprehensive async safety tests for mcpgateway. 
+""" + +from typing import Any, List +import pytest +import asyncio +import time + + +class TestAsyncSafety: + """Test async safety and proper coroutine handling.""" + + @pytest.mark.asyncio + async def test_concurrent_operations_performance(self): + """Test performance of concurrent async operations.""" + + async def mock_operation(): + await asyncio.sleep(0.01) # 10ms operation + return "result" + + # Test concurrent execution + start_time = time.time() + + tasks = [mock_operation() for _ in range(100)] + results = await asyncio.gather(*tasks) + + end_time = time.time() + + # Should complete in roughly 10ms, not 1000ms (100 * 10ms) + assert end_time - start_time < 0.1, "Concurrent operations not properly parallelized" + assert len(results) == 100, "Not all operations completed" + + @pytest.mark.asyncio + async def test_task_cleanup(self): + """Test proper task cleanup and no task leaks.""" + + initial_tasks = len(asyncio.all_tasks()) + + async def background_task(): + await asyncio.sleep(0.1) + + # Create and properly manage tasks + tasks: List[Any] = [] + for _ in range(10): + task = asyncio.create_task(background_task()) + tasks.append(task) + + # Wait for completion + await asyncio.gather(*tasks) + + # Check no leaked tasks + final_tasks = len(asyncio.all_tasks()) + + # Allow for some variation but no significant leaks + assert final_tasks <= initial_tasks + 2, "Task leak detected" + + @pytest.mark.asyncio + async def test_exception_handling_in_async(self): + """Test proper exception handling in async operations.""" + + async def failing_operation(): + await asyncio.sleep(0.01) + raise ValueError("Test error") + + # Test exception handling doesn't break event loop + with pytest.raises(ValueError): + await failing_operation() + + # Event loop should still be functional + await asyncio.sleep(0.01) + assert True, "Event loop functional after exception"