🧭 Chore Summary
Implement a comprehensive async code testing and performance profiling pipeline (`make async-test` and `make profile`) to validate async safety, detect unawaited coroutines, monitor runtime performance, and generate visual flame graphs for mcpgateway's async operations.
🧱 Areas Affected
- Async code linting and validation / Make targets (`make async-lint`, `make async-test`, `make profile`)
- Static analysis configuration (Ruff, mypy, Pyright async rules)
- Performance profiling infrastructure (cProfile, SnakeViz, flame graphs)
- Runtime monitoring and debugging (aiomonitor, asyncio debug mode)
- Async test framework integration (pytest-asyncio, async test validation)
- CI/CD pipeline async safety checks
- WebSocket, HTTP, and database async operation validation
⚙️ Context / Rationale
Async code introduces unique challenges: unawaited coroutines, blocking I/O in event loops, and performance bottlenecks that traditional testing misses. A comprehensive async testing and profiling pipeline catches these issues early through static analysis, runtime validation, and performance monitoring. This ensures mcpgateway's async operations (WebSocket connections, database queries, HTTP requests) are safe, efficient, and properly awaited.
What is Async Safety Testing?
Systematic validation of asynchronous code to detect common async pitfalls: forgotten awaits, blocking I/O in async functions, improperly handled coroutines, and event loop performance issues.
Key Async Safety Issues:
- Unawaited Coroutines: Forgetting `await` on async function calls
- Blocking I/O: Using synchronous operations in async functions
- Task Leaks: Creating tasks without proper cleanup or monitoring
- Event Loop Blocking: Long-running operations blocking the event loop
- Deadlocks: Circular dependencies in async operations
Simple Async Validation Example:
```python
# Bad - unawaited coroutine (detected by linters)
async def bad_example():
    get_data()  # Missing await!

# Good - properly awaited
async def good_example():
    data = await get_data()
    return data
```
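The same bad/good contrast applies to the other pitfalls above. A minimal sketch, with illustrative names (`non_blocking_fetch`, `run_batch`) and `requests` used only to show the anti-pattern, of moving blocking I/O off the event loop and keeping created tasks accounted for:

```python
import asyncio

import requests  # synchronous HTTP client, shown here only as the anti-pattern


# Bad - blocks the event loop for every other coroutine
async def blocking_fetch(url: str) -> int:
    response = requests.get(url, timeout=5)  # blocking I/O inside async def
    return response.status_code


# Good - push the blocking call onto a worker thread
async def non_blocking_fetch(url: str) -> int:
    response = await asyncio.to_thread(requests.get, url, timeout=5)
    return response.status_code


# Good - keep references to created tasks so they can be awaited or cancelled
async def run_batch(urls: list[str]) -> list[int]:
    tasks = [asyncio.create_task(non_blocking_fetch(url)) for url in urls]
    try:
        return await asyncio.gather(*tasks)
    finally:
        for task in tasks:
            task.cancel()  # no-op for finished tasks, cleans up stragglers
```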
Performance Profiling with SnakeViz:
```python
# Profile async operations
import cProfile
import asyncio

async def main():
    await process_mcp_requests()

# Generate profile
cProfile.run('asyncio.run(main())', 'async_profile.prof')

# Visualize with SnakeViz
# snakeviz async_profile.prof
```
MCPGateway Specific Async Scenarios:
```python
# Async validation scenarios for mcpgateway
async_test_scenarios = [
    {
        "name": "websocket_connection_handling",
        "description": "Validate WebSocket connection lifecycle",
        "test_areas": [
            "connection_establishment",
            "message_handling",
            "connection_cleanup",
            "concurrent_connections"
        ],
        "performance_targets": {
            "connection_time": "<100ms",
            "message_latency": "<10ms",
            "concurrent_connections": ">100"
        }
    },
    {
        "name": "database_async_operations",
        "description": "Test async database query handling",
        "test_areas": [
            "connection_pool_management",
            "query_execution",
            "transaction_handling",
            "connection_recovery"
        ],
        "performance_targets": {
            "query_time": "<50ms",
            "connection_acquisition": "<5ms",
            "pool_efficiency": ">95%"
        }
    },
    {
        "name": "mcp_server_communication",
        "description": "Validate async MCP server interactions",
        "test_areas": [
            "json_rpc_calls",
            "streaming_responses",
            "error_handling",
            "timeout_management"
        ],
        "performance_targets": {
            "rpc_call_time": "<100ms",
            "stream_latency": "<20ms",
            "error_recovery": "<1s"
        }
    },
    {
        "name": "concurrent_request_processing",
        "description": "Test high-concurrency request handling",
        "test_areas": [
            "request_queuing",
            "resource_sharing",
            "load_balancing",
            "graceful_degradation"
        ],
        "performance_targets": {
            "throughput": ">1000 req/s",
            "cpu_usage": "<80%",
            "memory_growth": "<10MB/hour"
        }
    }
]
```
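These scenario definitions can double as pytest parameters so the performance targets live in one place. A hedged sketch, assuming the list above is importable from a hypothetical `async_testing.scenarios` module and that a `measure_scenario()` helper exists to return metrics in milliseconds:

```python
# tests/test_async_scenarios.py – sketch; module paths and measure_scenario() are assumptions.
import re

import pytest

from async_testing.scenarios import async_test_scenarios  # hypothetical module path
from async_testing.runner import measure_scenario          # hypothetical helper


def _threshold_ms(target: str) -> float:
    """Parse a target like '<100ms' into a float upper bound in milliseconds."""
    return float(re.sub(r"[^\d.]", "", target))


@pytest.mark.asyncio
@pytest.mark.parametrize("scenario", async_test_scenarios, ids=lambda s: s["name"])
async def test_latency_targets(scenario):
    metrics = await measure_scenario(scenario["name"])  # e.g. {"connection_time": 42.0}
    for name, target in scenario["performance_targets"].items():
        # Only latency-style targets ("<Nms") are checked in this sketch.
        if target.startswith("<") and target.endswith("ms"):
            assert metrics[name] <= _threshold_ms(target), f"{name} exceeded {target}"
```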
📦 Related Make Targets
Target | Purpose |
---|---|
**`make async-test`** | Run comprehensive async safety and validation tests |
**`make async-lint`** | Run async-aware linting with Ruff, flake8-async, and type checkers |
**`make profile`** | Generate performance profiles with cProfile and SnakeViz visualization |
`make async-monitor` | Start aiomonitor for live async debugging and task monitoring |
`make async-debug` | Run tests with asyncio debug mode enabled |
`make profile-compare` | Compare performance profiles between builds |
`make async-validate` | Validate async code patterns and detect common pitfalls |
`make profile-serve` | Start SnakeViz server for interactive profile analysis |
`make async-benchmark` | Run async performance benchmarks |
`make async-clean` | Clean async testing artifacts and profile data |
Bold targets are mandatory; CI must fail if async safety violations or performance regressions are detected.
📋 Acceptance Criteria
- `make async-test` validates all async operations with zero unawaited coroutines.
- `make async-lint` catches async safety issues using Ruff ASYNC rules and flake8-async.
- `make profile` generates visual performance profiles with SnakeViz for async operations.
- `make async-monitor` provides live monitoring of running async tasks and the event loop.
- Static type checkers (mypy/Pyright) detect unused coroutine warnings.
- Async tests run with pytest-asyncio and proper event loop management.
- Performance profiling identifies bottlenecks in WebSocket, database, and HTTP operations.
- Runtime async debugging catches deadlocks and blocking operations.
- CI pipeline enforces async safety requirements on every build.
- Performance regression detection alerts on async operation slowdowns.
- Changelog entry under "Testing" or "Performance".
🛠️ Task List (suggested flow)
- Async testing infrastructure setup

```bash
mkdir -p async_testing/{profiles,reports,benchmarks,fixtures}

# Create async testing configuration
cat > async_testing/config.yaml << 'EOF'
async_linting:
  ruff_rules: ["F", "E", "B", "ASYNC"]
  flake8_plugins: ["flake8-bugbear", "flake8-async"]
  mypy_config:
    warn_unused_coroutine: true
    strict: true

profiling:
  output_dir: "async_testing/profiles"
  snakeviz_port: 8080
  profile_scenarios:
    - "websocket_stress_test"
    - "database_query_performance"
    - "concurrent_mcp_calls"

monitoring:
  aiomonitor_port: 50101
  debug_mode: true
  task_tracking: true

performance_thresholds:
  websocket_connection: 100  # ms
  database_query: 50  # ms
  mcp_rpc_call: 100  # ms
  concurrent_users: 100  # simultaneous connections
EOF
```
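A small companion sketch (path and fixture name are assumptions; requires PyYAML) showing how tests could read the `performance_thresholds` section so limits are defined once in `config.yaml`:

```python
# async_testing/fixtures/thresholds.py – illustrative only
from pathlib import Path

import pytest
import yaml  # PyYAML, assumed to be available in the dev environment

CONFIG_PATH = Path("async_testing/config.yaml")


@pytest.fixture(scope="session")
def performance_thresholds() -> dict:
    """Expose performance_thresholds from config.yaml to async tests."""
    with CONFIG_PATH.open(encoding="utf-8") as handle:
        config = yaml.safe_load(handle)
    return config["performance_thresholds"]
```

A test can then compare a measured WebSocket connect time against `performance_thresholds["websocket_connection"]` milliseconds instead of hard-coding the limit.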
- Async linting configuration

```toml
# Update pyproject.toml for async-aware linting
[tool.ruff]
select = ["F", "E", "W", "B", "ASYNC"]
unfixable = ["B"]  # Never auto-fix critical bugbear warnings

[tool.ruff.flake8-bugbear]
extend-immutable-calls = ["fastapi.Depends", "fastapi.Query"]

[tool.mypy]
strict = true
# unused-coroutine is on by default; unused-awaitable is the stricter opt-in check
enable_error_code = ["unused-coroutine", "unused-awaitable"]
plugins = ["pydantic.mypy"]

[[tool.mypy.overrides]]
module = "tests.*"
disallow_untyped_defs = false
```

```jsonc
// pyrightconfig.json
{
  "typeCheckingMode": "strict",
  "reportUnusedCoroutine": "error",
  "reportMissingTypeStubs": "warning",
  "exclude": ["build", ".venv", "async_testing/profiles"]
}
```
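A quick way to confirm the configuration bites is a throwaway file that deliberately violates the rules; the file below is hypothetical and exists only to make the linters and mypy report findings:

```python
# scratch/async_lint_smoke.py – hypothetical throwaway file, only to confirm the tools fire.
import asyncio
import time

import requests  # sync HTTP client deliberately used inside async code


async def fetch_tools() -> dict:
    return {"tools": []}


async def smoke() -> None:
    fetch_tools()                      # unawaited coroutine – should trip mypy's unused-coroutine check
    time.sleep(1)                      # blocking sleep inside async def – flake8-async / Ruff ASYNC rules
    requests.get("http://localhost")   # blocking HTTP call inside async def – flake8-async / Ruff ASYNC rules


if __name__ == "__main__":
    asyncio.run(smoke())
```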
- Makefile integration

```makefile
# Async Testing and Profiling Targets
.PHONY: async-test async-lint profile async-monitor async-debug profile-serve

ASYNC_TEST_DIR := async_testing
PROFILE_DIR := $(ASYNC_TEST_DIR)/profiles
REPORTS_DIR := $(ASYNC_TEST_DIR)/reports
VENV_PYTHON := $(VENV_DIR)/bin/python

async-test: async-lint async-debug
	@echo "🔄 Running comprehensive async safety tests..."
	@mkdir -p $(REPORTS_DIR)
	@PYTHONASYNCIODEBUG=1 $(VENV_PYTHON) -m pytest \
		tests/ \
		--asyncio-mode=auto \
		--tb=short \
		--junitxml=$(REPORTS_DIR)/async-test-results.xml \
		-v

async-lint:
	@echo "🔍 Running async-aware linting..."
	@$(VENV_DIR)/bin/ruff check mcpgateway/ tests/ \
		--select=F,E,B,ASYNC \
		--output-format=github
	@$(VENV_DIR)/bin/flake8 mcpgateway/ tests/ \
		--extend-select=B,ASYNC \
		--max-line-length=100
	@$(VENV_DIR)/bin/mypy mcpgateway/ \
		--enable-error-code unused-awaitable \
		--strict

profile:
	@echo "📊 Generating async performance profiles..."
	@mkdir -p $(PROFILE_DIR)
	@$(VENV_PYTHON) $(ASYNC_TEST_DIR)/profiler.py \
		--scenarios websocket,database,mcp_calls \
		--output $(PROFILE_DIR) \
		--duration 60
	@echo "🌐 Starting SnakeViz server..."
	@$(VENV_DIR)/bin/snakeviz $(PROFILE_DIR)/combined_profile.prof \
		--server --port 8080

async-monitor:
	@echo "👁️ Starting aiomonitor for live async debugging..."
	@$(VENV_PYTHON) $(ASYNC_TEST_DIR)/monitor_runner.py \
		--port 50101 \
		--host localhost \
		--console-enabled

async-debug:
	@echo "🐛 Running async tests with debug mode..."
	@PYTHONASYNCIODEBUG=1 $(VENV_PYTHON) -X dev \
		-m pytest tests/ \
		--asyncio-mode=auto \
		--capture=no \
		-v

profile-serve:
	@echo "🌐 Starting SnakeViz profile server..."
	@$(VENV_DIR)/bin/snakeviz $(PROFILE_DIR) \
		--server --port 8080 --hostname 0.0.0.0

async-benchmark:
	@echo "⚡ Running async performance benchmarks..."
	@$(VENV_PYTHON) $(ASYNC_TEST_DIR)/benchmarks.py \
		--output $(REPORTS_DIR)/benchmark-results.json \
		--iterations 1000

profile-compare:
	@echo "📈 Comparing performance profiles..."
	@$(VENV_PYTHON) $(ASYNC_TEST_DIR)/profile_compare.py \
		--baseline $(PROFILE_DIR)/baseline.prof \
		--current $(PROFILE_DIR)/latest.prof \
		--output $(REPORTS_DIR)/profile-comparison.html

async-validate:
	@echo "✅ Validating async code patterns..."
	@$(VENV_PYTHON) $(ASYNC_TEST_DIR)/async_validator.py \
		--source mcpgateway/ \
		--report $(REPORTS_DIR)/async-validation.json

async-clean:
	@echo "🧹 Cleaning async testing artifacts..."
	@rm -rf $(PROFILE_DIR)/* $(REPORTS_DIR)/*
	@pkill -f "aiomonitor" || true
	@pkill -f "snakeviz" || true
```
- Async profiler implementation

```python
# async_testing/profiler.py
#!/usr/bin/env python3
"""
Comprehensive async performance profiler for mcpgateway.
"""
import asyncio
import cProfile
import pstats
import argparse
import time
import aiohttp
import websockets
import json
from pathlib import Path
from typing import Dict, List, Any


class AsyncProfiler:
    """Profile async operations in mcpgateway."""

    def __init__(self, output_dir: str):
        self.output_dir = Path(output_dir)
        self.output_dir.mkdir(parents=True, exist_ok=True)
        self.profiles = {}

    async def profile_all_scenarios(self, scenarios: List[str], duration: int) -> Dict[str, Any]:
        """Profile all specified async scenarios."""
        results = {
            'scenarios': {},
            'summary': {},
            'timestamp': time.time()
        }

        for scenario in scenarios:
            print(f"📊 Profiling scenario: {scenario}")
            profile_path = self.output_dir / f"{scenario}_profile.prof"
            profile_result = await self._profile_scenario(scenario, duration, profile_path)
            results['scenarios'][scenario] = profile_result

        # Generate combined profile
        self._generate_combined_profile(scenarios)

        # Generate summary report
        results['summary'] = self._generate_summary_report(results['scenarios'])

        return results

    async def _profile_scenario(self, scenario: str, duration: int, output_path: Path) -> Dict[str, Any]:
        """Profile a specific async scenario."""
        scenario_methods = {
            'websocket': self._profile_websocket_operations,
            'database': self._profile_database_operations,
            'mcp_calls': self._profile_mcp_operations,
            'concurrent_requests': self._profile_concurrent_requests
        }

        if scenario not in scenario_methods:
            raise ValueError(f"Unknown scenario: {scenario}")

        # Run profiling
        profiler = cProfile.Profile()
        profiler.enable()

        start_time = time.time()
        scenario_result = await scenario_methods[scenario](duration)
        end_time = time.time()

        profiler.disable()
        profiler.dump_stats(str(output_path))

        # Analyze profile
        stats = pstats.Stats(str(output_path))
        stats.sort_stats('cumulative')

        return {
            'scenario': scenario,
            'duration': end_time - start_time,
            'profile_file': str(output_path),
            'total_calls': stats.total_calls,
            'total_time': stats.total_tt,
            'top_functions': self._extract_top_functions(stats),
            'async_metrics': scenario_result
        }

    async def _profile_websocket_operations(self, duration: int) -> Dict[str, Any]:
        """Profile WebSocket connection and message handling."""
        metrics = {
            'connections_established': 0,
            'messages_sent': 0,
            'messages_received': 0,
            'connection_errors': 0,
            'avg_latency': 0
        }

        async def websocket_client():
            try:
                async with websockets.connect("ws://localhost:4444/ws") as websocket:
                    metrics['connections_established'] += 1

                    # Send test messages
                    for i in range(10):
                        message = json.dumps({"type": "ping", "data": f"test_{i}"})
                        start_time = time.time()
                        await websocket.send(message)
                        metrics['messages_sent'] += 1

                        response = await websocket.recv()
                        metrics['messages_received'] += 1

                        latency = time.time() - start_time
                        metrics['avg_latency'] = (
                            (metrics['avg_latency'] * i + latency) / (i + 1)
                        )

                        await asyncio.sleep(0.1)
            except Exception as e:
                metrics['connection_errors'] += 1

        # Run concurrent WebSocket clients
        tasks = []
        end_time = time.time() + duration

        while time.time() < end_time:
            if len(tasks) < 10:  # Max 10 concurrent connections
                task = asyncio.create_task(websocket_client())
                tasks.append(task)

            # Clean up completed tasks
            tasks = [t for t in tasks if not t.done()]
            await asyncio.sleep(0.1)

        # Wait for remaining tasks
        if tasks:
            await asyncio.gather(*tasks, return_exceptions=True)

        return metrics

    async def _profile_database_operations(self, duration: int) -> Dict[str, Any]:
        """Profile database query performance."""
        metrics = {
            'queries_executed': 0,
            'avg_query_time': 0,
            'connection_time': 0,
            'errors': 0
        }

        # Simulate database operations
        async def database_operations():
            try:
                # Simulate async database queries
                query_start = time.time()

                # Mock database query (replace with actual database calls)
                await asyncio.sleep(0.01)  # Simulate 10ms query

                query_time = time.time() - query_start
                metrics['queries_executed'] += 1
                metrics['avg_query_time'] = (
                    (metrics['avg_query_time'] * (metrics['queries_executed'] - 1) + query_time)
                    / metrics['queries_executed']
                )
            except Exception as e:
                metrics['errors'] += 1

        # Run database operations for specified duration
        end_time = time.time() + duration
        while time.time() < end_time:
            await database_operations()
            await asyncio.sleep(0.001)  # Small delay between operations

        return metrics

    async def _profile_mcp_operations(self, duration: int) -> Dict[str, Any]:
        """Profile MCP server communication."""
        metrics = {
            'rpc_calls': 0,
            'avg_rpc_time': 0,
            'successful_calls': 0,
            'failed_calls': 0
        }

        async def mcp_rpc_call():
            try:
                async with aiohttp.ClientSession() as session:
                    payload = {
                        "jsonrpc": "2.0",
                        "method": "tools/list",
                        "id": 1
                    }

                    start_time = time.time()
                    async with session.post(
                        "http://localhost:4444/rpc",
                        json=payload,
                        timeout=aiohttp.ClientTimeout(total=5)
                    ) as response:
                        await response.json()

                    rpc_time = time.time() - start_time
                    metrics['rpc_calls'] += 1
                    metrics['successful_calls'] += 1
                    metrics['avg_rpc_time'] = (
                        (metrics['avg_rpc_time'] * (metrics['rpc_calls'] - 1) + rpc_time)
                        / metrics['rpc_calls']
                    )
            except Exception as e:
                metrics['failed_calls'] += 1

        # Run MCP operations
        end_time = time.time() + duration
        while time.time() < end_time:
            await mcp_rpc_call()
            await asyncio.sleep(0.1)

        return metrics
```
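The profiler references `_extract_top_functions`, `_generate_combined_profile`, `_generate_summary_report`, and `_profile_concurrent_requests` without defining them, and `make profile` passes `--scenarios/--output/--duration` flags that imply a CLI. A sketch of the entry-point wiring only (argument names mirror the Makefile; everything else here is an assumption and the missing helpers still need real implementations):

```python
# Sketch of the CLI wiring `make profile` expects; would live at the bottom of profiler.py.
if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="Profile mcpgateway async scenarios")
    parser.add_argument("--scenarios", default="websocket,database,mcp_calls",
                        help="comma-separated scenario names")
    parser.add_argument("--output", default="async_testing/profiles")
    parser.add_argument("--duration", type=int, default=60, help="seconds per scenario")
    args = parser.parse_args()

    profiler = AsyncProfiler(args.output)
    asyncio.run(profiler.profile_all_scenarios(args.scenarios.split(","), args.duration))
```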
- Runtime monitoring with aiomonitor

```python
# async_testing/monitor_runner.py
#!/usr/bin/env python3
"""
Runtime async monitoring with aiomonitor integration.
"""
import asyncio
import aiomonitor
import argparse
import signal
from pathlib import Path


class AsyncMonitor:
    """Monitor live async operations in mcpgateway."""

    def __init__(self, port: int = 50101, host: str = "localhost"):
        self.port = port
        self.host = host
        self.monitor = None
        self.running = False

    async def start_monitoring(self, console_enabled: bool = True):
        """Start aiomonitor for live async debugging."""
        print(f"👁️ Starting aiomonitor on {self.host}:{self.port}")

        # Configure aiomonitor on the currently running loop
        self.monitor = aiomonitor.Monitor(
            asyncio.get_running_loop(),
            host=self.host,
            port=self.port,
            console_enabled=console_enabled,
            locals={'monitor': self}
        )
        self.monitor.start()
        self.running = True

        print(f"🌐 aiomonitor console available at: telnet {self.host} {self.port}")
        print("📊 Available commands: ps, where, cancel, signal, console")
        print("🔍 Use 'ps' to list running tasks")
        print("📍 Use 'where <task_id>' to see task stack trace")

        # Keep monitoring running
        try:
            while self.running:
                await asyncio.sleep(1)

                # Periodic task summary
                tasks = [t for t in asyncio.all_tasks() if not t.done()]
                if len(tasks) % 100 == 0 and len(tasks) > 0:
                    print(f"📈 Current active tasks: {len(tasks)}")
        except KeyboardInterrupt:
            print("\n🛑 Stopping aiomonitor...")
        finally:
            self.monitor.close()
            await self.monitor.wait_closed()

    def stop_monitoring(self):
        """Stop the monitoring."""
        self.running = False

    async def get_task_summary(self) -> dict:
        """Get summary of current async tasks."""
        tasks = asyncio.all_tasks()
        summary = {
            'total_tasks': len(tasks),
            'running_tasks': len([t for t in tasks if not t.done()]),
            'completed_tasks': len([t for t in tasks if t.done()]),
            'cancelled_tasks': len([t for t in tasks if t.cancelled()]),
            'task_details': []
        }

        for task in tasks:
            if not task.done():
                # Use the public Task accessors; _state is a plain string internally
                summary['task_details'].append({
                    'name': task.get_name(),
                    'state': getattr(task, '_state', 'unknown'),
                    'coro': str(task.get_coro())
                })

        return summary
```
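The Makefile invokes `monitor_runner.py` with `--port`, `--host`, and `--console-enabled`, but the snippet above has no entry point; a hedged sketch of the missing CLI wiring (argument handling is an assumption):

```python
# Sketch of a __main__ block matching the flags used by `make async-monitor`.
if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="Live async monitoring for mcpgateway")
    parser.add_argument("--port", type=int, default=50101)
    parser.add_argument("--host", default="localhost")
    parser.add_argument("--console-enabled", action="store_true")
    args = parser.parse_args()

    monitor = AsyncMonitor(port=args.port, host=args.host)
    try:
        asyncio.run(monitor.start_monitoring(console_enabled=args.console_enabled))
    except KeyboardInterrupt:
        monitor.stop_monitoring()
```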
- Async test framework integration

```python
# tests/test_async_safety.py
"""
Comprehensive async safety tests for mcpgateway.
"""
import pytest
import asyncio
import warnings
import time
from unittest.mock import AsyncMock, patch


class TestAsyncSafety:
    """Test async safety and proper coroutine handling."""

    def test_no_unawaited_coroutines(self):
        """Test that no coroutines are left unawaited."""
        # Capture async warnings
        with warnings.catch_warnings(record=True) as caught_warnings:
            warnings.simplefilter("always")

            # Run async code that might have unawaited coroutines
            asyncio.run(self._test_async_operations())

            # Check for unawaited coroutine warnings
            unawaited_warnings = [
                w for w in caught_warnings
                if "coroutine" in str(w.message) and "never awaited" in str(w.message)
            ]

            assert len(unawaited_warnings) == 0, \
                f"Found {len(unawaited_warnings)} unawaited coroutines"

    async def _test_async_operations(self):
        """Test various async operations for safety."""
        # Test WebSocket operations
        await self._test_websocket_safety()

        # Test database operations
        await self._test_database_safety()

        # Test MCP operations
        await self._test_mcp_safety()

    async def _test_websocket_safety(self):
        """Test WebSocket async safety."""
        # Mock WebSocket operations
        with patch('websockets.connect') as mock_connect:
            mock_websocket = AsyncMock()
            mock_connect.return_value.__aenter__.return_value = mock_websocket

            # Test proper awaiting
            async with mock_connect("ws://test") as websocket:
                await websocket.send("test")
                await websocket.recv()

    async def _test_database_safety(self):
        """Test database async safety."""
        # Mock database operations
        with patch('asyncpg.connect') as mock_connect:
            mock_connection = AsyncMock()
            mock_connect.return_value = mock_connection

            # Test proper connection handling
            connection = await mock_connect("postgresql://test")
            await connection.execute("SELECT 1")
            await connection.close()

    async def _test_mcp_safety(self):
        """Test MCP async safety."""
        # Mock MCP operations
        with patch('aiohttp.ClientSession') as mock_session:
            mock_response = AsyncMock()
            mock_session.return_value.post.return_value.__aenter__.return_value = mock_response

            # Test proper session handling
            async with mock_session() as session:
                async with session.post("http://test") as response:
                    await response.json()

    @pytest.mark.asyncio
    async def test_concurrent_operations_performance(self):
        """Test performance of concurrent async operations."""
        async def mock_operation():
            await asyncio.sleep(0.01)  # 10ms operation
            return "result"

        # Test concurrent execution
        start_time = time.time()
        tasks = [mock_operation() for _ in range(100)]
        results = await asyncio.gather(*tasks)
        end_time = time.time()

        # Should complete in roughly 10ms, not 1000ms (100 * 10ms)
        assert end_time - start_time < 0.1, "Concurrent operations not properly parallelized"
        assert len(results) == 100, "Not all operations completed"

    @pytest.mark.asyncio
    async def test_task_cleanup(self):
        """Test proper task cleanup and no task leaks."""
        initial_tasks = len(asyncio.all_tasks())

        async def background_task():
            await asyncio.sleep(0.1)

        # Create and properly manage tasks
        tasks = []
        for _ in range(10):
            task = asyncio.create_task(background_task())
            tasks.append(task)

        # Wait for completion
        await asyncio.gather(*tasks)

        # Check no leaked tasks
        final_tasks = len(asyncio.all_tasks())

        # Allow for some variation but no significant leaks
        assert final_tasks <= initial_tasks + 2, "Task leak detected"

    @pytest.mark.asyncio
    async def test_exception_handling_in_async(self):
        """Test proper exception handling in async operations."""
        async def failing_operation():
            await asyncio.sleep(0.01)
            raise ValueError("Test error")

        # Test exception handling doesn't break event loop
        with pytest.raises(ValueError):
            await failing_operation()

        # Event loop should still be functional
        await asyncio.sleep(0.01)
        assert True, "Event loop functional after exception"
```
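The task-leak test above checks counts globally; a reusable guard can make the same check per-test. A sketch of a conftest fixture (the name `no_task_leaks` is an assumption; requires pytest-asyncio, as already used by the targets above):

```python
# tests/conftest.py – sketch of an opt-in guard against leaked tasks.
import asyncio

import pytest


@pytest.fixture
async def no_task_leaks():
    """Fail the requesting test if it leaves pending tasks behind."""
    before = set(asyncio.all_tasks())
    yield
    leaked = [task for task in asyncio.all_tasks() - before if not task.done()]
    for task in leaked:
        task.cancel()  # cancel stragglers so they don't outlive the test
    assert not leaked, f"Test leaked {len(leaked)} pending task(s)"
```

A test opts in simply by listing `no_task_leaks` in its signature.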
- CI integration

```yaml
# Add to existing GitHub Actions workflow
  async-testing:
    name: 🔄 Async Safety & Performance Testing
    runs-on: ubuntu-latest
    needs: [test]
    steps:
      - name: ⬇️ Checkout source
        uses: actions/checkout@v4
        with:
          fetch-depth: 1

      - name: 🐍 Set up Python
        uses: actions/setup-python@v5
        with:
          python-version: "3.12"
          cache: pip

      - name: 📦 Install dependencies
        run: |
          python -m pip install --upgrade pip
          pip install -e .[dev]
          pip install flake8-async flake8-bugbear pytest-asyncio snakeviz aiomonitor

      - name: 🔍 Run async linting
        run: |
          make async-lint

      - name: 🐛 Run async debug tests
        run: |
          make async-debug

      - name: 📊 Generate performance profiles
        run: |
          make profile

      - name: ⚡ Run async benchmarks
        run: |
          make async-benchmark

      - name: ✅ Validate async patterns
        run: |
          make async-validate

      - name: 📎 Upload async test artifacts
        uses: actions/upload-artifact@v4
        with:
          name: async-test-results
          path: |
            async_testing/reports/
            async_testing/profiles/
          retention-days: 30

      - name: 📈 Performance regression check
        run: |
          python async_testing/check_regression.py \
            --current async_testing/profiles/latest.prof \
            --baseline async_testing/profiles/baseline.prof \
            --threshold 20  # 20% regression threshold
```
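`async_testing/check_regression.py` is used by the workflow but not specified elsewhere in this issue. One minimal interpretation (a sketch, not a committed design) compares total profile time and exits non-zero past the threshold:

```python
# async_testing/check_regression.py – illustrative sketch only.
import argparse
import pstats
import sys


def main() -> int:
    parser = argparse.ArgumentParser(description="Fail CI on async performance regressions")
    parser.add_argument("--baseline", required=True)
    parser.add_argument("--current", required=True)
    parser.add_argument("--threshold", type=float, default=20.0, help="allowed regression in percent")
    args = parser.parse_args()

    # total_tt is the total time recorded in the profile, in seconds
    baseline_time = pstats.Stats(args.baseline).total_tt
    current_time = pstats.Stats(args.current).total_tt
    change = (current_time - baseline_time) / baseline_time * 100

    print(f"Total profile time: baseline={baseline_time:.3f}s current={current_time:.3f}s ({change:+.1f}%)")
    if change > args.threshold:
        print(f"❌ Regression exceeds {args.threshold}% threshold")
        return 1
    return 0


if __name__ == "__main__":
    sys.exit(main())
```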
- Async validation tool

```python
# async_testing/async_validator.py
#!/usr/bin/env python3
"""
Validate async code patterns and detect common pitfalls.
"""
import ast
import argparse
from pathlib import Path
from typing import List, Dict, Any


class AsyncCodeValidator:
    """Validate async code for common patterns and pitfalls."""

    def __init__(self):
        self.issues = []
        self.suggestions = []

    def validate_directory(self, source_dir: Path) -> Dict[str, Any]:
        """Validate all Python files in directory."""
        validation_results = {
            'files_checked': 0,
            'issues_found': 0,
            'suggestions': 0,
            'details': []
        }

        python_files = list(source_dir.rglob("*.py"))

        for file_path in python_files:
            if self._should_skip_file(file_path):
                continue

            file_results = self._validate_file(file_path)
            validation_results['details'].append(file_results)
            validation_results['files_checked'] += 1
            validation_results['issues_found'] += len(file_results['issues'])
            validation_results['suggestions'] += len(file_results['suggestions'])

        return validation_results

    def _validate_file(self, file_path: Path) -> Dict[str, Any]:
        """Validate a single Python file."""
        file_results = {
            'file': str(file_path),
            'issues': [],
            'suggestions': []
        }

        try:
            with open(file_path, 'r', encoding='utf-8') as f:
                source_code = f.read()

            tree = ast.parse(source_code, filename=str(file_path))

            # Analyze AST for async patterns
            validator = AsyncPatternVisitor(file_path)
            validator.visit(tree)

            file_results['issues'] = validator.issues
            file_results['suggestions'] = validator.suggestions
        except Exception as e:
            file_results['issues'].append({
                'type': 'parse_error',
                'message': f"Failed to parse file: {str(e)}",
                'line': 0
            })

        return file_results


class AsyncPatternVisitor(ast.NodeVisitor):
    """AST visitor to detect async patterns and issues."""

    def __init__(self, file_path: Path):
        self.file_path = file_path
        self.issues = []
        self.suggestions = []
        self.in_async_function = False

    def visit_AsyncFunctionDef(self, node):
        """Visit async function definitions."""
        self.in_async_function = True

        # Check for blocking operations in async functions
        self._check_blocking_operations(node)

        # Check for proper error handling
        self._check_error_handling(node)

        self.generic_visit(node)
        self.in_async_function = False

    def visit_Call(self, node):
        """Visit function calls."""
        if self.in_async_function:
            # Check for potentially unawaited async calls
            self._check_unawaited_calls(node)

            # Check for blocking I/O operations
            self._check_blocking_io(node)

        self.generic_visit(node)

    def _check_blocking_operations(self, node):
        """Check for blocking operations in async functions."""
        blocking_patterns = [
            'time.sleep',
            'requests.get', 'requests.post',
            'subprocess.run', 'subprocess.call',
            'open'  # File I/O without async
        ]

        for child in ast.walk(node):
            if isinstance(child, ast.Call):
                call_name = self._get_call_name(child)
                if call_name in blocking_patterns:
                    self.issues.append({
                        'type': 'blocking_operation',
                        'message': f"Blocking operation '{call_name}' in async function",
                        'line': child.lineno,
                        'suggestion': f"Use async equivalent of {call_name}"
                    })

    def _check_unawaited_calls(self, node):
        """Check for potentially unawaited async calls."""
        # Look for calls that might return coroutines
        async_patterns = [
            'aiohttp', 'asyncio', 'asyncpg', 'websockets',
            'motor'  # Common async libraries
        ]

        call_name = self._get_call_name(node)
        for pattern in async_patterns:
            if pattern in call_name:
                # Check if this call is awaited
                parent = getattr(node, 'parent', None)
                if not isinstance(parent, ast.Await):
                    self.suggestions.append({
                        'type': 'potentially_unawaited',
                        'message': f"Call to '{call_name}' might need await",
                        'line': node.lineno
                    })
                break

    def _get_call_name(self, node):
        """Extract the name of a function call."""
        if isinstance(node.func, ast.Name):
            return node.func.id
        elif isinstance(node.func, ast.Attribute):
            if isinstance(node.func.value, ast.Name):
                return f"{node.func.value.id}.{node.func.attr}"
            else:
                return node.func.attr
        return "unknown"
```
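Two gaps in the validator are worth calling out: Python's `ast` module does not populate a `parent` attribute, so the `isinstance(parent, ast.Await)` check only works if parents are attached before `visit()` runs, and the CLI wiring for `--source`/`--report` (plus `_should_skip_file`, `_check_error_handling`, `_check_blocking_io`) is not shown. A hedged sketch of those pieces as they might sit in the same file:

```python
# Sketch only – helper and argument names beyond the snippet above are assumptions.
import ast
import argparse
import json
from pathlib import Path


def attach_parents(tree: ast.AST) -> None:
    """Give every node a .parent attribute so 'is this call awaited?' checks can work.

    Would be called in _validate_file right after ast.parse and before validator.visit(tree).
    """
    for parent in ast.walk(tree):
        for child in ast.iter_child_nodes(parent):
            child.parent = parent


if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="Validate async patterns in a source tree")
    parser.add_argument("--source", type=Path, required=True)
    parser.add_argument("--report", type=Path, required=True)
    args = parser.parse_args()

    results = AsyncCodeValidator().validate_directory(args.source)
    args.report.parent.mkdir(parents=True, exist_ok=True)
    args.report.write_text(json.dumps(results, indent=2), encoding="utf-8")
    print(f"Checked {results['files_checked']} files: "
          f"{results['issues_found']} issues, {results['suggestions']} suggestions")
```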
- Performance comparison tool

```python
# async_testing/profile_compare.py
#!/usr/bin/env python3
"""
Compare async performance profiles between builds.
"""
import pstats
import json
import argparse
from pathlib import Path
from typing import Dict, Any, Tuple


class ProfileComparator:
    """Compare performance profiles and detect regressions."""

    def compare_profiles(self, baseline_path: Path, current_path: Path) -> Dict[str, Any]:
        """Compare two performance profiles."""
        baseline_stats = pstats.Stats(str(baseline_path))
        current_stats = pstats.Stats(str(current_path))

        comparison = {
            'baseline_file': str(baseline_path),
            'current_file': str(current_path),
            'regressions': [],
            'improvements': [],
            'summary': {}
        }

        # Compare overall performance
        baseline_total_time = baseline_stats.total_tt
        current_total_time = current_stats.total_tt
        total_time_change = (
            (current_total_time - baseline_total_time) / baseline_total_time * 100
        )

        comparison['summary']['total_time_change'] = total_time_change

        # Compare function-level performance
        baseline_functions = self._extract_function_stats(baseline_stats)
        current_functions = self._extract_function_stats(current_stats)

        for func_name, baseline_time in baseline_functions.items():
            if func_name in current_functions:
                current_time = current_functions[func_name]
                change_percent = (current_time - baseline_time) / baseline_time * 100

                if change_percent > 20:  # 20% regression threshold
                    comparison['regressions'].append({
                        'function': func_name,
                        'baseline_time': baseline_time,
                        'current_time': current_time,
                        'change_percent': change_percent
                    })
                elif change_percent < -10:  # 10% improvement
                    comparison['improvements'].append({
                        'function': func_name,
                        'baseline_time': baseline_time,
                        'current_time': current_time,
                        'change_percent': change_percent
                    })

        return comparison
```
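`_extract_function_stats` is referenced above but not shown; one plausible shape (a sketch that relies on the `pstats.Stats.stats` mapping, and would in practice be a `ProfileComparator` method) maps each `file:line(function)` key to its cumulative time in seconds:

```python
# Sketch of the missing helper referenced by ProfileComparator.compare_profiles.
import pstats
from typing import Dict


def extract_function_stats(stats: pstats.Stats) -> Dict[str, float]:
    """Map 'file:line(function)' keys to cumulative time in seconds."""
    functions: Dict[str, float] = {}
    # pstats.Stats.stats maps (filename, lineno, funcname) ->
    # (call count, primitive calls, total time, cumulative time, callers)
    for (filename, lineno, funcname), (cc, nc, tt, ct, callers) in stats.stats.items():
        functions[f"{filename}:{lineno}({funcname})"] = ct
    return functions
```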
- Documentation

Add async testing documentation:

````markdown
# Async Code Testing & Performance Profiling

## Overview
Comprehensive testing and profiling pipeline for async operations in mcpgateway.

## Running Async Tests

```bash
# Run all async safety tests
make async-test

# Run async-aware linting
make async-lint

# Generate performance profiles
make profile

# Start live monitoring
make async-monitor

# Run with debug mode
make async-debug
```

## Async Safety Checks

Check | Tool | Purpose |
---|---|---|
Unawaited coroutines | Ruff ASYNC rules | Detect forgotten awaits |
Blocking I/O | flake8-async | Find sync operations in async functions |
Type safety | mypy unused-coroutine | Static analysis of async types |
Runtime monitoring | aiomonitor | Live task monitoring |

## Performance Profiling

- cProfile: Generate detailed function call profiles
- SnakeViz: Interactive flame graph visualization
- Benchmarking: Automated performance regression detection
- Monitoring: Live async task and event loop monitoring
````
📖 References
- Ruff Async Rules – Fast async-aware linting · https://docs.astral.sh/ruff/rules/#async-async
- flake8-async – Async code linting plugin · https://flake8-async.readthedocs.io/
- SnakeViz – cProfile visualization tool · https://jiffyclub.github.io/snakeviz/
- aiomonitor – Live async debugging · https://aiomonitor.aio-libs.org/
- pytest-asyncio – Async test framework · https://pytest-asyncio.readthedocs.io/
🧩 Additional Notes
- Start with linting: Use Ruff ASYNC rules for fast feedback on async safety issues.
- Enable debug mode: Set `PYTHONASYNCIODEBUG=1` to catch runtime async issues early.
- Profile regularly: Generate performance profiles for each build to track trends.
- Monitor live: Use aiomonitor to debug deadlocks and stuck async operations.
- Test concurrency: Validate that async operations actually run concurrently.
- Handle exceptions: Ensure exceptions in async code don't break the event loop.
- Clean up tasks: Properly manage async task lifecycle to prevent leaks.
Async Testing Best Practices:
- Always await coroutines - never ignore the result
- Use async equivalents for I/O operations (aiohttp vs requests)
- Properly handle exceptions in async contexts
- Monitor task creation and cleanup to prevent leaks
- Test concurrent behavior, not just sequential async calls
- Profile async operations to identify performance bottlenecks
- Use runtime monitoring to debug complex async interactions
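On the task-cleanup point above, structured concurrency with `asyncio.TaskGroup` (Python 3.11+, so within the CI's Python 3.12 toolchain) makes leaks hard to write in the first place; a small illustrative sketch:

```python
import asyncio


async def fanout(handlers) -> None:
    """Run handlers concurrently; every task is awaited before the block exits."""
    async with asyncio.TaskGroup() as group:  # cancels siblings if one handler raises
        for handler in handlers:
            group.create_task(handler())


async def main() -> None:
    async def ping() -> None:
        await asyncio.sleep(0.01)

    await fanout([ping, ping, ping])


if __name__ == "__main__":
    asyncio.run(main())
```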
See Also:
Tool | What it spots | Key link |
---|---|---|
flake8-bugbear | “B-series” warnings for likely bugs or bad design (mutable defaults, arg order mistakes, broad `except:`, …) ([github.com][1]) | https://github.com/PyCQA/flake8-bugbear |
flake8-async | Un-awaited coroutines, blocking I/O in the loop, pointless `async def` … for asyncio/AnyIO/Trio ([flake8-async.readthedocs.io][2]) | https://flake8-async.readthedocs.io/ |
flake8-trio | Trio-specific rules (e.g. forbids `asyncio.sleep`, insists on `nursery`) ([pypi.org][3]) | https://pypi.org/project/flake8-trio/ |
Ruff | Fast Rust linter; enable ASYNC + B rule groups to mirror flake8-async & bugbear at 10-100× the speed ([docs.astral.sh][4], [docs.astral.sh][5]) | https://astral.sh/ruff |