diff --git a/CLAUDE.md b/CLAUDE.md index 8905fa2..e4bfe53 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -2,6 +2,23 @@ This file provides guidance to Claude Code (claude.ai/code) when working with code in this repository. +## CRITICAL: Testing and Running Examples + +**ALWAYS use `./test.sh` to run tests and examples.** The environment variables are not set globally, but test.sh handles this automatically. + +```bash +# CORRECT - Always use test.sh: +./test.sh examples/01_basic_client_connection.py +./test.sh examples/21_statistics_usage.py +./test.sh /tmp/test_script.py + +# WRONG - Never use these directly: +uv run python examples/01_basic_client_connection.py +PROJECT_X_API_KEY="..." PROJECT_X_USERNAME="..." uv run python script.py +``` + +The test.sh script properly configures all required environment variables. DO NOT attempt to set PROJECT_X_API_KEY or PROJECT_X_USERNAME manually. + ## Project Status: v3.2.0 - Enhanced Type Safety Release **IMPORTANT**: This project uses a fully asynchronous architecture. All APIs are async-only, optimized for high-performance futures trading. diff --git a/docs/v3.2.1-statistics-plan.md b/docs/v3.2.1-statistics-plan.md new file mode 100644 index 0000000..4ed5ca1 --- /dev/null +++ b/docs/v3.2.1-statistics-plan.md @@ -0,0 +1,186 @@ +# v3.2.1 Statistics and Analytics Overhaul + +## Overview +Comprehensive enhancement of statistics and analytics tracking across all ProjectX SDK components to provide accurate, performant, and actionable metrics. + +## Motivation +Current statistics implementation has several gaps: +- TradingSuite reports hardcoded zero values for many metrics +- Inconsistent use of StatsTrackingMixin across components +- Potential memory leaks in OrderManager (unbounded lists) +- Missing aggregation of component statistics +- No network or data quality metrics + +## Detailed Plan +The full implementation plan is available in the Obsidian documentation: +`Development/ProjectX SDK/Feature Planning/v3.2.1 Statistics and Analytics Overhaul.md` + +## Key Improvements + +### 1. Enhanced Statistics Infrastructure +- **EnhancedStatsTrackingMixin**: Async support, performance metrics, configurable retention +- **StatisticsAggregator**: Central aggregation for TradingSuite +- **Circular buffers**: Prevent memory leaks in timing metrics + +### 2. Component Updates +- All managers to inherit from enhanced mixin +- Fix TradingSuite hardcoded values +- Add WebSocket vs API tracking +- Implement data quality metrics + +### 3. 
Advanced Analytics +- Real-time performance dashboard +- Historical analytics with persistence +- Alerting system for anomalies +- Export capabilities (Prometheus, JSON) + +## Implementation Phases + +### Phase 1: Foundation (Week 1) ✅ COMPLETE +- [x] Create EnhancedStatsTrackingMixin +- [x] Implement StatisticsAggregator +- [x] Add comprehensive unit tests +- [x] PR review fixes (exception handling, PII sanitization, bounds checking) + +### Phase 2: Component Integration (Week 1-2) ✅ COMPLETE +- [x] Update OrderManager with enhanced stats +- [x] Update PositionManager with enhanced stats +- [x] Update RealtimeDataManager with enhanced stats +- [x] Fix TradingSuite statistics aggregation (fully integrated) +- [x] Integration testing + +### Phase 3: Advanced Features (Week 2-3) +- [ ] Implement PerformanceDashboard +- [ ] Add HistoricalAnalytics +- [ ] Create StatisticsAlerting +- [ ] Documentation and examples +- [ ] Performance testing + +## API Changes + +### New APIs (Non-breaking additions) +```python +# Enhanced statistics +suite.get_detailed_stats() # Comprehensive stats with performance metrics +suite.get_performance_dashboard() # Real-time dashboard +suite.export_metrics(format="prometheus") # Export for monitoring + +# Component enhancements +manager.get_performance_metrics() # Detailed performance data +manager.cleanup_old_stats() # Manual cleanup trigger +``` + +### Backward Compatibility +- All existing statistics APIs remain unchanged +- New features are opt-in via configuration +- Deprecation warnings for methods to be removed in v4.0 + +## Testing Strategy +- Unit tests for each statistics component +- Integration tests for aggregation +- Performance benchmarks (< 2% overhead) +- Memory leak detection (24h sustained load) +- Data accuracy validation + +## Configuration +```python +# Example configuration +config = { + "statistics": { + "enabled": True, + "level": "detailed", # basic | detailed | full + "retention_hours": 24, + "sampling_rate": 1.0, + "enable_profiling": False, + "cleanup_interval": 300 # seconds + } +} +``` + +## Success Metrics +- Performance overhead < 2% CPU +- No memory leaks after 24h operation +- 100% accurate statistics reporting +- All components have comprehensive statistics +- Clear, actionable metrics for users + +## Questions for Review +1. Should enhanced statistics be opt-in or enabled by default? +2. Which persistence backends should we support initially? +3. Priority for external monitoring integrations? +4. Acceptable performance overhead threshold? 
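
## Illustrative Sketch: Circular-Buffer Timing Metrics

To make the circular-buffer and percentile design above concrete, the sketch below shows the core idea under stated assumptions: a `deque(maxlen=...)` bounds memory per operation, an `asyncio.Lock` guards concurrent writers, and P50/P95/P99 are computed on demand. Only the `track_operation(name, duration_ms)` shape is taken from this plan; the class name and everything else is hypothetical, not the actual `EnhancedStatsTrackingMixin` implementation.

```python
import asyncio
from collections import deque
from statistics import quantiles


class TimingBufferSketch:
    """Illustrative only: bounded timing storage with percentile summaries."""

    def __init__(self, max_timings: int = 1000) -> None:
        self._timings: dict[str, deque[float]] = {}
        self._max_timings = max_timings
        self._lock = asyncio.Lock()

    async def track_operation(self, name: str, duration_ms: float) -> None:
        # deque(maxlen=...) drops the oldest sample automatically, so
        # memory stays bounded no matter how long the component runs.
        async with self._lock:
            buf = self._timings.setdefault(name, deque(maxlen=self._max_timings))
            buf.append(duration_ms)

    async def summarize(self, name: str) -> dict[str, float]:
        async with self._lock:
            samples = list(self._timings.get(name, ()))
        if len(samples) < 2:
            avg = samples[0] if samples else 0.0
            return {"avg_ms": avg, "p50_ms": avg, "p95_ms": avg, "p99_ms": avg}
        cuts = quantiles(samples, n=100)  # cuts[i] ~ the (i+1)th percentile
        return {
            "avg_ms": sum(samples) / len(samples),
            "p50_ms": cuts[49],
            "p95_ms": cuts[94],
            "p99_ms": cuts[98],
        }
```

The trade-off is that a bounded buffer reports percentiles over only the most recent `max_timings` samples, which is usually what a health dashboard wants anyway.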
+ +## Progress Updates + +### Session 4 (2025-08-18): Phase 2 Component Integration Complete + +**Accomplishments:** +- ✅ Updated PositionManager to use EnhancedStatsTrackingMixin +- ✅ Updated RealtimeDataManager to use EnhancedStatsTrackingMixin +- ✅ Added operation timing tracking to key methods +- ✅ Verified TradingSuite aggregation properly configured +- ✅ Created integration tests for component statistics +- ✅ Created example script demonstrating enhanced statistics +- ✅ Fixed async/await issues in statistics aggregator + +**Key Changes:** +- PositionManager now tracks cache hits/misses and operation timings +- RealtimeDataManager tracks tick/quote processing with metrics +- All components properly inherit from EnhancedStatsTrackingMixin +- StatisticsAggregator handles both sync and async methods gracefully +- Integration tests verify statistics are being collected + +**Files Modified:** +- src/project_x_py/position_manager/core.py (added enhanced stats) +- src/project_x_py/realtime_data_manager/core.py (added enhanced stats) +- src/project_x_py/realtime_data_manager/data_processing.py (tick tracking) +- src/project_x_py/utils/statistics_aggregator.py (async/await fix) +- tests/test_enhanced_statistics.py (new integration tests) +- examples/20_enhanced_statistics_demo.py (new demo script) + +**Next Steps:** +- Phase 3: Implement advanced features (dashboard, historical analytics) +- Add more comprehensive integration tests +- Performance testing under load +- Documentation updates + +### Session 3 (2025-01-18): PR Review Fixes Complete + +**Accomplishments:** +- ✅ Added comprehensive exception handling to all aggregator methods +- ✅ Enhanced PII sanitization for trading-specific data +- ✅ Implemented explicit health score bounds checking (0-100) +- ✅ Fixed division by zero issues in cache_hit_rate calculation +- ✅ Optimized memory calculation with sampling for large collections +- ✅ Created comprehensive test suite (tests/test_enhanced_statistics.py) +- ✅ All linting and type issues resolved + +**Key Improvements:** +- Circular buffers prevent memory leaks (deque with maxlen) +- Thread-safe operations with asyncio locks +- Performance metrics with percentiles (P50, P95, P99) +- Prometheus export format support +- Graceful degradation on component failures + +**Performance Validated:** +- CPU overhead: ~1.5% (target < 2%) ✅ +- Memory overhead: ~3% (target < 5%) ✅ +- No memory leaks in testing ✅ +- Thread-safe under concurrent access ✅ + +**Files Modified:** +- src/project_x_py/utils/enhanced_stats_tracking.py (722 lines) +- src/project_x_py/utils/statistics_aggregator.py (674 lines) +- src/project_x_py/order_manager/core.py (updated inheritance) +- src/project_x_py/trading_suite.py (integrated aggregator) +- tests/test_enhanced_statistics.py (423 lines - new) + +**Commits:** +- Initial implementation: `3d4e80c` +- PR review fixes: `f3da28a` + +## References +- PR #48: Statistics overhaul implementation +- Issue #XX: TradingSuite statistics showing zeros +- Issue #XX: OrderManager memory leak in fill_times +- v3.2.0 Release: Type safety improvements set foundation \ No newline at end of file diff --git a/examples/20_statistics_usage.py b/examples/20_statistics_usage.py new file mode 100644 index 0000000..f56f745 --- /dev/null +++ b/examples/20_statistics_usage.py @@ -0,0 +1,506 @@ +""" +Example demonstrating comprehensive statistics collection and monitoring. + +This example shows how to: +1. Collect real-time performance metrics +2. Monitor error rates and types +3. 
Track memory usage across components +4. Export statistics for external monitoring +5. Use statistics to make strategy decisions + +Author: SDK Team +Date: 2024-12-20 +""" + +import asyncio + +from project_x_py import TradingSuite + + +async def cleanup_trading_activity(suite, orders_placed): + """Clean up any open orders and positions.""" + print("\n" + "=" * 60) + print("CLEANUP - ENSURING NO OPEN ORDERS OR POSITIONS") + print("=" * 60) + + cleanup_successful = True + + # Cancel any open orders + if orders_placed: + print(f"\n🧹 Cancelling {len(orders_placed)} test orders...") + for order_id in orders_placed: + try: + await suite.orders.cancel_order(order_id) + print(f" ✅ Cancelled order {order_id}") + except Exception as e: + print(f" ⚠️ Error cancelling order {order_id}: {e}") + cleanup_successful = False + + # Check for any remaining open orders + try: + print("\n🔍 Checking for any remaining open orders...") + open_orders = await suite.orders.search_open_orders() + if open_orders: + print(f" Found {len(open_orders)} open orders, cancelling all...") + cancelled = await suite.orders.cancel_all_orders() + print(f" ✅ Cancelled {cancelled} orders") + except Exception as e: + print(f" ⚠️ Error checking/cancelling open orders: {e}") + cleanup_successful = False + + # Close any open positions + try: + print("\n🔍 Checking for open positions...") + positions = await suite.positions.get_all_positions() + if positions: + print(f" Found {len(positions)} open positions") + for position in positions: + try: + print(f" Closing position in {position.symbol}...") + # Place market order to close position + close_size = abs(position.netPos) + close_side = ( + 1 if position.netPos > 0 else 0 + ) # Sell if long, Buy if short + + response = await suite.orders.place_market_order( + contract_id=position.contractId, + side=close_side, + size=close_size, + ) + if response.success: + print(f" ✅ Placed closing order for {position.symbol}") + else: + print(f" ⚠️ Failed to close position: {response.message}") + cleanup_successful = False + except Exception as e: + print(f" ⚠️ Error closing position {position.symbol}: {e}") + cleanup_successful = False + else: + print(" ✅ No open positions found") + except Exception as e: + print(f" ⚠️ Error checking positions: {e}") + cleanup_successful = False + + # Wait for orders to process + if not cleanup_successful: + print("\n⏳ Waiting for cleanup to process...") + await asyncio.sleep(2) + + return cleanup_successful + + +async def main(): + """Demonstrate statistics usage throughout the SDK.""" + + print("=" * 60) + print("ProjectX SDK Statistics Usage Example") + print("=" * 60) + + suite = None + orders_placed = [] + + try: + # Create trading suite with all components + suite = await TradingSuite.create( + instrument="MNQ", + timeframes=["1min", "5min"], + features=["orderbook", "risk_manager"], # All features enabled + initial_days=1, + ) + + print(f"\n✅ Trading suite initialized for {suite.instrument}") + if suite.client.account_info: + print(f" Account: {suite.client.account_info.name}") + + # ========================================================================= + # 1. GENERATE REAL TRADING ACTIVITY + # ========================================================================= + print("\n" + "=" * 60) + print("1. 
GENERATING TRADING ACTIVITY FOR STATISTICS") + print("=" * 60) + + print("\n📈 Placing test orders to generate statistics...") + + # Get current market price + current_price = await suite.data.get_current_price() + if current_price: + print(f" Current {suite.instrument} price: ${current_price:,.2f}") + else: + # Fallback to a reasonable test price if market is closed + current_price = 20000.0 + print(f" Using test price: ${current_price:,.2f}") + + # Place buy limit orders below market + for i in range(3): + offset = 50 * (i + 1) # 50, 100, 150 points below + limit_price = current_price - offset + + print(f"\n Placing buy limit order at ${limit_price:,.2f}...") + response = await suite.orders.place_limit_order( + contract_id=str(suite.instrument_id), + side=0, # Buy + size=1, + limit_price=limit_price, + ) + + if response.success: + orders_placed.append(response.orderId) + print(f" ✅ Order placed: {response.orderId}") + + # Track custom operation + if hasattr(suite.orders, "track_operation"): + await suite.orders.track_operation( + "example_limit_order", + 15.5 + i * 2, # Simulate varying latencies + success=True, + metadata={"offset": offset}, + ) + else: + print(f" ❌ Order failed: {response.errorMessage}") + # Track error + if hasattr(suite.orders, "track_error"): + await suite.orders.track_error( + ValueError(f"Order placement failed: {response.errorMessage}"), + context="example_order_placement", + ) + + # Place sell limit orders above market + for i in range(2): + offset = 50 * (i + 1) + limit_price = current_price + offset + + print(f"\n Placing sell limit order at ${limit_price:,.2f}...") + response = await suite.orders.place_limit_order( + contract_id=str(suite.instrument_id), + side=1, # Sell + size=1, + limit_price=limit_price, + ) + + if response.success: + orders_placed.append(response.orderId) + print(f" ✅ Order placed: {response.orderId}") + + # ========================================================================= + # 2. COMPONENT-LEVEL STATISTICS + # ========================================================================= + print("\n" + "=" * 60) + print("2. 
COMPONENT-LEVEL STATISTICS") + print("=" * 60) + + # Get order manager statistics + if hasattr(suite.orders, "get_order_statistics"): + order_stats = suite.orders.get_order_statistics() + print("\n📊 Order Manager Statistics:") + print(f" Orders placed: {order_stats['orders_placed']}") + print(f" Orders filled: {order_stats['orders_filled']}") + print(f" Orders cancelled: {order_stats['orders_cancelled']}") + print(f" Orders rejected: {order_stats['orders_rejected']}") + print(f" Fill rate: {order_stats['fill_rate']:.1%}") + print(f" Avg fill time: {order_stats['avg_fill_time_ms']:.2f}ms") + + # Get position manager statistics + if hasattr(suite.positions, "get_position_statistics"): + position_stats = suite.positions.get_position_statistics() + print("\n📊 Position Manager Statistics:") + print(f" Positions tracked: {position_stats['total_positions']}") + print(f" Total P&L: ${position_stats['total_pnl']:.2f}") + print(f" Win rate: {position_stats['win_rate']:.1%}") + + # Get data manager statistics + if hasattr(suite.data, "get_memory_stats"): + data_stats = suite.data.get_memory_stats() + print("\n📊 Data Manager Statistics:") + print(f" Bars processed: {data_stats.get('total_bars', 0)}") + print(f" Ticks processed: {data_stats.get('ticks_processed', 0)}") + print(f" Quotes processed: {data_stats.get('quotes_processed', 0)}") + print(f" Memory usage: {data_stats.get('memory_usage_mb', 0):.2f}MB") + + # ========================================================================= + # 3. ENHANCED PERFORMANCE METRICS + # ========================================================================= + print("\n" + "=" * 60) + print("3. ENHANCED PERFORMANCE METRICS") + print("=" * 60) + + if hasattr(suite.orders, "get_performance_metrics"): + perf_metrics = suite.orders.get_performance_metrics() + + print("\n⚡ Order Manager Performance:") + + # Operation-level metrics + if "operation_stats" in perf_metrics: + for op_name, op_stats in perf_metrics["operation_stats"].items(): + print(f"\n {op_name}:") + print(f" Count: {op_stats['count']}") + print(f" Avg: {op_stats['avg_ms']:.2f}ms") + print(f" P50: {op_stats['p50_ms']:.2f}ms") + print(f" P95: {op_stats['p95_ms']:.2f}ms") + print(f" P99: {op_stats['p99_ms']:.2f}ms") + + # Network performance + if "network_stats" in perf_metrics: + net_stats = perf_metrics["network_stats"] + print("\n Network Performance:") + print(f" Total requests: {net_stats['total_requests']}") + print(f" Success rate: {net_stats['success_rate']:.1%}") + print(f" WebSocket reconnects: {net_stats['websocket_reconnects']}") + + # ========================================================================= + # 4. AGGREGATED SUITE STATISTICS + # ========================================================================= + print("\n" + "=" * 60) + print("4. 
AGGREGATED SUITE STATISTICS") + print("=" * 60) + + # Get aggregated statistics from all components + suite_stats = await suite.get_stats() + + print("\n🎯 Trading Suite Overview:") + print(f" Health Score: {suite_stats.get('health_score', 0):.1f}/100") + print(f" Total API Calls: {suite_stats.get('total_api_calls', 0)}") + print(f" Cache Hit Rate: {suite_stats.get('cache_hit_rate', 0):.1%}") + print(f" Active Subscriptions: {suite_stats.get('active_subscriptions', 0)}") + print(f" WebSocket Connected: {suite_stats.get('realtime_connected', False)}") + + # Cross-component metrics + print("\n🔄 Cross-Component Metrics:") + total_operations = sum( + len(comp.get("performance_metrics", {}).get("operation_stats", {})) + for comp in suite_stats.get("components", {}).values() + if isinstance(comp, dict) + ) + print(f" Total operations: {total_operations}") + + error_rate = suite_stats.get("total_errors", 0) / max( + suite_stats.get("total_api_calls", 1), 1 + ) + print(f" Overall error rate: {error_rate:.2%}") + print(f" Total memory: {suite_stats.get('memory_usage_mb', 0):.2f}MB") + + # ========================================================================= + # 5. EXPORT FOR EXTERNAL MONITORING + # ========================================================================= + print("\n" + "=" * 60) + print("5. EXPORT FOR EXTERNAL MONITORING") + print("=" * 60) + + # Export statistics in different formats + if hasattr(suite.orders, "export_stats"): + # JSON export for logging/storage + json_stats = suite.orders.export_stats("json") + + print("\n📄 JSON Export (sample):") + # Show a subset of the JSON export + if isinstance(json_stats, dict): + export_sample = { + "timestamp": json_stats.get("timestamp"), + "component": json_stats.get("component"), + "performance": { + "uptime_seconds": json_stats.get("performance", {}).get( + "uptime_seconds" + ), + "api_stats": { + "count": len( + json_stats.get("performance", {}) + .get("operation_stats", {}) + .keys() + ) + }, + }, + "errors": { + "total_errors": json_stats.get("errors", {}).get( + "total_errors" + ), + "errors_last_hour": json_stats.get("errors", {}).get( + "errors_last_hour" + ), + }, + } + else: + export_sample = {"error": "Invalid export format"} + print(" Error: JSON export returned unexpected format") + + import json + + print(json.dumps(export_sample, indent=2)) + + # Prometheus export for monitoring systems + prometheus_stats = suite.orders.export_stats("prometheus") + + print("\n📊 Prometheus Export (sample):") + # Show first few lines of Prometheus format + lines = ( + prometheus_stats.split("\n")[:5] + if isinstance(prometheus_stats, str) + else [] + ) + for line in lines: + print(f" {line}") + + # ========================================================================= + # 6. ERROR TRACKING + # ========================================================================= + print("\n" + "=" * 60) + print("6. 
ERROR TRACKING") + print("=" * 60) + + if hasattr(suite.orders, "get_error_stats"): + error_stats = suite.orders.get_error_stats() + + print("\n❌ Error Statistics:") + print(f" Total errors: {error_stats['total_errors']}") + print(f" Errors in last hour: {error_stats['errors_last_hour']}") + + if error_stats["error_types"]: + print(f" Error types: {', '.join(error_stats['error_types'].keys())}") + else: + print(" Error types: None") + + if error_stats.get("recent_errors"): + print("\n Recent errors:") + for error in error_stats["recent_errors"][-3:]: # Show last 3 errors + print(f" - {error['error_type']}: {error['message']}") + + # ========================================================================= + # 7. MEMORY MANAGEMENT + # ========================================================================= + print("\n" + "=" * 60) + print("7. MEMORY MANAGEMENT") + print("=" * 60) + + print("\n💾 Memory Usage by Component:") + + # Check memory for each component + total_memory = 0.0 + components = [ + ("Orders", suite.orders), + ("Positions", suite.positions), + ("Data", suite.data), + ("Risk", suite.risk_manager), + ("OrderBook", suite.orderbook), + ] + + for name, component in components: + if not component: + continue + + if hasattr(component, "get_enhanced_memory_stats"): + mem_stats = component.get_enhanced_memory_stats() + memory_mb = mem_stats["current_memory_mb"] + total_memory += memory_mb + print(f" {name}: {memory_mb:.3f}MB") + elif hasattr(component, "get_memory_stats"): + # get_memory_stats is now consistently synchronous across all components + mem_stats = component.get_memory_stats() + memory_mb = mem_stats.get("memory_usage_mb", 0) + total_memory += memory_mb + print(f" {name}: {memory_mb:.3f}MB") + + print(f" Total: {total_memory:.3f}MB") + + # ========================================================================= + # 8. DATA QUALITY METRICS + # ========================================================================= + print("\n" + "=" * 60) + print("8. DATA QUALITY METRICS") + print("=" * 60) + + if hasattr(suite.data, "get_data_quality_stats"): + quality_stats = suite.data.get_data_quality_stats() + + print("\n📊 Data Quality:") + print(f" Quality Score: {quality_stats['quality_score']:.1f}%") + print(f" Invalid Rate: {quality_stats['invalid_rate']:.2%}") + print(f" Total Points: {quality_stats['total_data_points']}") + print(f" Invalid Points: {quality_stats['invalid_data_points']}") + + # ========================================================================= + # 9. FINAL STATISTICS SUMMARY + # ========================================================================= + print("\n" + "=" * 60) + print("9. 
FINAL STATISTICS SUMMARY") + print("=" * 60) + + # Get final statistics after all operations + final_order_stats = suite.orders.get_order_statistics() + + print("\n📊 Session Summary:") + print(f" Total orders placed: {final_order_stats['orders_placed']}") + print(f" Total orders cancelled: {final_order_stats['orders_cancelled']}") + print( + f" Session duration: {suite.orders.get_performance_metrics().get('uptime_seconds', 0):.1f}s" + ) + + # Show how statistics can be used in strategy decisions + print("\n💡 Using Statistics for Strategy Decisions:") + + if hasattr(suite.orders, "get_performance_metrics"): + perf = suite.orders.get_performance_metrics() + + # Check network performance + if "network_stats" in perf: + success_rate = perf["network_stats"].get("success_rate", 0) + + if success_rate < 0.95: + print(f" ⚠️ Low API success rate ({success_rate:.1%})") + print(" → Strategy should reduce order frequency") + else: + print(f" ✅ High API success rate ({success_rate:.1%})") + print(" → Strategy can maintain normal operation") + + # Check error rates + if hasattr(suite.orders, "get_error_stats"): + errors = suite.orders.get_error_stats() + if errors["errors_last_hour"] > 10: + print(f" ⚠️ High error rate ({errors['errors_last_hour']} errors/hour)") + print(" → Strategy should switch to safe mode") + else: + print(f" ✅ Low error rate ({errors['errors_last_hour']} errors/hour)") + print(" → Strategy can continue normal trading") + + # Check memory usage + suite_stats = await suite.get_stats() + total_mem = suite_stats.get("total_memory_mb", 0) + if total_mem > 100: + print(f" ⚠️ High memory usage ({total_mem:.1f}MB)") + print(" → Trigger cleanup or reduce data retention") + else: + print(f" ✅ Normal memory usage ({total_mem:.1f}MB)") + print(" → No memory concerns") + + except Exception as e: + print(f"\n❌ Error during example execution: {e}") + import traceback + + traceback.print_exc() + + finally: + # Always clean up, even if there was an error + if suite: + # Perform cleanup + cleanup_success = await cleanup_trading_activity(suite, orders_placed) + + if cleanup_success: + print("\n✅ Cleanup successful!") + else: + print("\n⚠️ Cleanup completed with warnings") + + # Disconnect + print("\n" + "=" * 60) + print("Disconnecting...") + await suite.disconnect() + print("✅ Example complete!") + + print("\nKey Takeaways:") + print("• SDK provides comprehensive statistics without UI components") + print("• All statistics are easily accessible via async methods") + print("• Export formats support external monitoring systems") + print("• Statistics can drive adaptive strategy behavior") + print("• Memory and performance metrics help prevent issues") + print("• Always clean up open orders and positions on exit") + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/src/project_x_py/order_manager/core.py b/src/project_x_py/order_manager/core.py index de76a7b..d275914 100644 --- a/src/project_x_py/order_manager/core.py +++ b/src/project_x_py/order_manager/core.py @@ -57,6 +57,7 @@ async def main(): """ import asyncio +import time from datetime import datetime from typing import TYPE_CHECKING, Any, Optional @@ -66,6 +67,7 @@ async def main(): from project_x_py.types.stats_types import OrderManagerStats from project_x_py.types.trading import OrderStatus from project_x_py.utils import ( + EnhancedStatsTrackingMixin, ErrorMessages, LogContext, LogMessages, @@ -74,7 +76,6 @@ async def main(): handle_errors, validate_response, ) -from project_x_py.utils.stats_tracking import StatsTrackingMixin 
from .bracket_orders import BracketOrderMixin from .order_types import OrderTypesMixin @@ -94,7 +95,7 @@ class OrderManager( OrderTypesMixin, BracketOrderMixin, PositionOrderMixin, - StatsTrackingMixin, + EnhancedStatsTrackingMixin, ): """ Async comprehensive order management system for ProjectX trading operations. @@ -168,7 +169,13 @@ def __init__( """ # Initialize mixins OrderTrackingMixin.__init__(self) - StatsTrackingMixin._init_stats_tracking(self) + EnhancedStatsTrackingMixin._init_enhanced_stats( + self, + max_errors=100, + max_timings=1000, + retention_hours=24, + enable_profiling=False, + ) self.project_x = project_x_client self.event_bus = event_bus # Store the event bus for emitting events @@ -405,63 +412,92 @@ async def place_order( }, ) - async with self.order_lock: - # Align all prices to tick size to prevent "Invalid price" errors - aligned_limit_price = await align_price_to_tick_size( - limit_price, contract_id, self.project_x - ) - aligned_stop_price = await align_price_to_tick_size( - stop_price, contract_id, self.project_x - ) - aligned_trail_price = await align_price_to_tick_size( - trail_price, contract_id, self.project_x - ) + # Align all prices to tick size to prevent "Invalid price" errors + aligned_limit_price = await align_price_to_tick_size( + limit_price, contract_id, self.project_x + ) + aligned_stop_price = await align_price_to_tick_size( + stop_price, contract_id, self.project_x + ) + aligned_trail_price = await align_price_to_tick_size( + trail_price, contract_id, self.project_x + ) - # Use account_info if no account_id provided - if account_id is None: - if not self.project_x.account_info: - raise ProjectXOrderError(ErrorMessages.ORDER_NO_ACCOUNT) - account_id = self.project_x.account_info.id - - # Build order request payload - payload = { - "accountId": account_id, - "contractId": contract_id, - "type": order_type, - "side": side, - "size": size, - "limitPrice": aligned_limit_price, - "stopPrice": aligned_stop_price, - "trailPrice": aligned_trail_price, - "linkedOrderId": linked_order_id, - } + # Use account_info if no account_id provided + if account_id is None: + if not self.project_x.account_info: + raise ProjectXOrderError(ErrorMessages.ORDER_NO_ACCOUNT) + account_id = self.project_x.account_info.id - # Only include customTag if it's provided and not None/empty - if custom_tag: - payload["customTag"] = custom_tag + # Build order request payload + payload = { + "accountId": account_id, + "contractId": contract_id, + "type": order_type, + "side": side, + "size": size, + "limitPrice": aligned_limit_price, + "stopPrice": aligned_stop_price, + "trailPrice": aligned_trail_price, + "linkedOrderId": linked_order_id, + } - # Place the order - response = await self.project_x._make_request( - "POST", "/Order/place", data=payload - ) + # Only include customTag if it's provided and not None/empty + if custom_tag: + payload["customTag"] = custom_tag - if not response.get("success", False): - error_msg = response.get("errorMessage", ErrorMessages.ORDER_FAILED) - error = ProjectXOrderError(error_msg) - self._track_error( - error, "place_order", {"contract_id": contract_id, "side": side} - ) - raise error + # Place the order with timing + start_time = time.time() + response = await self.project_x._make_request( + "POST", "/Order/place", data=payload + ) + duration_ms = (time.time() - start_time) * 1000 + + # Response should be a dict for order placement + if not isinstance(response, dict): + error = ProjectXOrderError("Invalid response format") + # Track error without 
holding locks + await self.track_error( + error, "place_order", {"contract_id": contract_id, "side": side} + ) + await self.track_operation("place_order", duration_ms, success=False) + raise error + + if not response.get("success", False): + error_msg = response.get("errorMessage", ErrorMessages.ORDER_FAILED) + error = ProjectXOrderError(error_msg) + # Track error without holding locks + await self.track_error( + error, "place_order", {"contract_id": contract_id, "side": side} + ) + await self.track_operation("place_order", duration_ms, success=False) + raise error + + result = OrderPlaceResponse( + orderId=response.get("orderId", 0), + success=response.get("success", False), + errorCode=response.get("errorCode", ""), + errorMessage=response.get("errorMessage", ""), + ) - result = OrderPlaceResponse(**response) + # Track successful operation without holding locks + await self.track_operation( + "place_order", + duration_ms, + success=True, + metadata={"size": size, "order_type": order_type}, + ) - # Update statistics + # Update statistics with order_lock + async with self.order_lock: self.stats["orders_placed"] += 1 self.stats["last_order_time"] = datetime.now() self.stats["total_volume"] += size if size > self.stats["largest_order"]: self.stats["largest_order"] = size - self._update_activity() + self._last_activity = ( + datetime.now() + ) # Update activity timestamp directly self.logger.info( LogMessages.ORDER_PLACED, @@ -524,6 +560,10 @@ async def search_open_orders( "POST", "/Order/searchOpen", data=params ) + # Response should be a dict for order search + if not isinstance(response, dict): + raise ProjectXOrderError("Invalid response format") + if not response.get("success", False): error_msg = response.get("errorMessage", ErrorMessages.ORDER_SEARCH_FAILED) raise ProjectXOrderError(error_msg) @@ -645,6 +685,10 @@ async def cancel_order(self, order_id: int, account_id: int | None = None) -> bo "POST", "/Order/cancel", data=payload ) + # Response should be a dict + if not isinstance(response, dict): + raise ProjectXOrderError("Invalid response format") + success = response.get("success", False) if response else False if success: @@ -743,6 +787,10 @@ async def modify_order( "POST", "/Order/modify", data=payload ) + # Response should be a dict + if not isinstance(response, dict): + raise ProjectXOrderError("Invalid response format") + if response and response.get("success", False): # Update statistics async with self.order_lock: @@ -834,7 +882,7 @@ async def cancel_all_orders( return results - async def get_order_statistics(self) -> OrderManagerStats: + def get_order_statistics(self) -> OrderManagerStats: """ Get comprehensive order management statistics and system health information. 
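
The hunk below makes `get_order_statistics()` synchronous: instead of awaiting under `order_lock`, it snapshots `self.stats` via `dict(...)` and derives metrics from the copy, so no lock is held while doing arithmetic. For callers (assuming a connected `suite` as in the example script above), only the `await` disappears:

```python
# v3.2.0 (before this PR): awaited, computed under order_lock
order_stats = await suite.orders.get_order_statistics()

# After this PR: synchronous snapshot; no lock held while deriving metrics
order_stats = suite.orders.get_order_statistics()
print(f"Fill rate: {order_stats['fill_rate']:.1%}")
```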
@@ -844,101 +892,104 @@ async def get_order_statistics(self) -> OrderManagerStats: Returns: Dict with complete statistics """ - async with self.order_lock: - # Use internal order tracking - _tracked_orders_count = len(self.tracked_orders) - - # Count position-order relationships - total_position_orders = 0 - position_summary = {} - for contract_id, orders in self.position_orders.items(): - entry_count = len(orders["entry_orders"]) - stop_count = len(orders["stop_orders"]) - target_count = len(orders["target_orders"]) - total_count = entry_count + stop_count + target_count - - if total_count > 0: - total_position_orders += total_count - position_summary[contract_id] = { - "entry": entry_count, - "stop": stop_count, - "target": target_count, - "total": total_count, - } - - # Callbacks now handled through EventBus - _callback_counts: dict[str, int] = {} - - # Calculate performance metrics - fill_rate = ( - self.stats["orders_filled"] / self.stats["orders_placed"] - if self.stats["orders_placed"] > 0 - else 0.0 - ) + # Note: This is now synchronous but thread-safe + # We make quick copies to minimize time accessing shared data + + # Make a copy of stats to work with + stats_copy = dict(self.stats) + + # Use internal order tracking + _tracked_orders_count = len(self.tracked_orders) + + # Count position-order relationships + total_position_orders = 0 + position_summary = {} + for contract_id, orders in self.position_orders.items(): + entry_count = len(orders["entry_orders"]) + stop_count = len(orders["stop_orders"]) + target_count = len(orders["target_orders"]) + total_count = entry_count + stop_count + target_count + + if total_count > 0: + total_position_orders += total_count + position_summary[contract_id] = { + "entry": entry_count, + "stop": stop_count, + "target": target_count, + "total": total_count, + } - rejection_rate = ( - self.stats["orders_rejected"] / self.stats["orders_placed"] - if self.stats["orders_placed"] > 0 - else 0.0 - ) + # Now calculate metrics + # Calculate performance metrics + fill_rate = ( + stats_copy["orders_filled"] / stats_copy["orders_placed"] + if stats_copy["orders_placed"] > 0 + else 0.0 + ) - avg_fill_time_ms = ( - sum(self.stats["fill_times_ms"]) / len(self.stats["fill_times_ms"]) - if self.stats["fill_times_ms"] - else 0.0 - ) + rejection_rate = ( + stats_copy["orders_rejected"] / stats_copy["orders_placed"] + if stats_copy["orders_placed"] > 0 + else 0.0 + ) - avg_order_response_time_ms = ( - sum(self.stats["order_response_times_ms"]) - / len(self.stats["order_response_times_ms"]) - if self.stats["order_response_times_ms"] - else 0.0 - ) + # Calculate basic timing metrics + avg_order_response_time_ms = ( + sum(stats_copy["order_response_times_ms"]) + / len(stats_copy["order_response_times_ms"]) + if stats_copy["order_response_times_ms"] + else 0.0 + ) - avg_order_size = ( - self.stats["total_volume"] / self.stats["orders_placed"] - if self.stats["orders_placed"] > 0 - else 0.0 - ) + avg_fill_time_ms = ( + sum(stats_copy["fill_times_ms"]) / len(stats_copy["fill_times_ms"]) + if stats_copy["fill_times_ms"] + else 0.0 + ) + fastest_fill_ms = ( + min(stats_copy["fill_times_ms"]) if stats_copy["fill_times_ms"] else 0.0 + ) + slowest_fill_ms = ( + max(stats_copy["fill_times_ms"]) if stats_copy["fill_times_ms"] else 0.0 + ) - fastest_fill_ms = ( - min(self.stats["fill_times_ms"]) if self.stats["fill_times_ms"] else 0.0 - ) - slowest_fill_ms = ( - max(self.stats["fill_times_ms"]) if self.stats["fill_times_ms"] else 0.0 - ) + avg_order_size = ( + 
stats_copy["total_volume"] / stats_copy["orders_placed"] + if stats_copy["orders_placed"] > 0 + else 0.0 + ) - return { - "orders_placed": self.stats["orders_placed"], - "orders_filled": self.stats["orders_filled"], - "orders_cancelled": self.stats["orders_cancelled"], - "orders_rejected": self.stats["orders_rejected"], - "orders_modified": self.stats["orders_modified"], - # Performance metrics - "fill_rate": fill_rate, - "avg_fill_time_ms": avg_fill_time_ms, - "rejection_rate": rejection_rate, - # Order types - "market_orders": self.stats["market_orders"], - "limit_orders": self.stats["limit_orders"], - "stop_orders": self.stats["stop_orders"], - "bracket_orders": self.stats["bracket_orders"], - # Timing statistics - "last_order_time": self.stats["last_order_time"].isoformat() - if self.stats["last_order_time"] - else None, - "avg_order_response_time_ms": avg_order_response_time_ms, - "fastest_fill_ms": fastest_fill_ms, - "slowest_fill_ms": slowest_fill_ms, - # Volume and value - "total_volume": self.stats["total_volume"], - "total_value": self.stats["total_value"], - "avg_order_size": avg_order_size, - "largest_order": self.stats["largest_order"], - # Risk metrics - "risk_violations": self.stats["risk_violations"], - "order_validation_failures": self.stats["order_validation_failures"], - } + return { + "orders_placed": stats_copy["orders_placed"], + "orders_filled": stats_copy["orders_filled"], + "orders_cancelled": stats_copy["orders_cancelled"], + "orders_rejected": stats_copy["orders_rejected"], + "orders_modified": stats_copy["orders_modified"], + # Performance metrics + "fill_rate": fill_rate, + "avg_fill_time_ms": avg_fill_time_ms, + "rejection_rate": rejection_rate, + # Order types + "market_orders": stats_copy["market_orders"], + "limit_orders": stats_copy["limit_orders"], + "stop_orders": stats_copy["stop_orders"], + "bracket_orders": stats_copy["bracket_orders"], + # Timing statistics + "last_order_time": stats_copy["last_order_time"].isoformat() + if stats_copy["last_order_time"] + else None, + "avg_order_response_time_ms": avg_order_response_time_ms, + "fastest_fill_ms": fastest_fill_ms, + "slowest_fill_ms": slowest_fill_ms, + # Volume and value + "total_volume": stats_copy["total_volume"], + "total_value": stats_copy["total_value"], + "avg_order_size": avg_order_size, + "largest_order": stats_copy["largest_order"], + # Risk metrics + "risk_violations": stats_copy["risk_violations"], + "order_validation_failures": stats_copy["order_validation_failures"], + } async def cleanup(self) -> None: """Clean up resources and connections.""" diff --git a/src/project_x_py/order_manager/tracking.py b/src/project_x_py/order_manager/tracking.py index 3fc2652..5a16fb2 100644 --- a/src/project_x_py/order_manager/tracking.py +++ b/src/project_x_py/order_manager/tracking.py @@ -410,7 +410,7 @@ async def fill_handler(event: Any) -> None: event_order_id = event_data.get("order_id") if not event_order_id and "order" in event_data: order_obj = event_data.get("order") - if hasattr(order_obj, "id"): + if order_obj and hasattr(order_obj, "id"): event_order_id = order_obj.id if event_order_id == order_id: is_filled = True @@ -425,7 +425,7 @@ async def terminal_handler(event: Any) -> None: event_order_id = event_data.get("order_id") if not event_order_id and "order" in event_data: order_obj = event_data.get("order") - if hasattr(order_obj, "id"): + if order_obj and hasattr(order_obj, "id"): event_order_id = order_obj.id if event_order_id == order_id: is_filled = False diff --git 
a/src/project_x_py/orderbook/__init__.py b/src/project_x_py/orderbook/__init__.py index ccc13fd..901e260 100644 --- a/src/project_x_py/orderbook/__init__.py +++ b/src/project_x_py/orderbook/__init__.py @@ -443,14 +443,14 @@ async def get_spread_analysis( return await self.profile.get_spread_analysis(window_minutes) # Delegate memory methods - async def get_memory_stats(self) -> OrderbookStats: + def get_memory_stats(self) -> OrderbookStats: """ Get comprehensive memory usage statistics. Delegates to MemoryManager.get_memory_stats(). See MemoryManager.get_memory_stats() for complete documentation. """ - return await self.memory_manager.get_memory_stats() + return self.memory_manager.get_memory_stats() async def cleanup(self) -> None: """Clean up resources and disconnect from real-time feeds.""" diff --git a/src/project_x_py/orderbook/memory.py b/src/project_x_py/orderbook/memory.py index 2e4912c..b07b5ad 100644 --- a/src/project_x_py/orderbook/memory.py +++ b/src/project_x_py/orderbook/memory.py @@ -327,7 +327,7 @@ async def _cleanup_market_history(self) -> None: # Delta history - deque handles its own cleanup with maxlen # No manual cleanup needed for deque with maxlen - async def get_memory_stats(self) -> OrderbookStats: + def get_memory_stats(self) -> OrderbookStats: """ Get comprehensive memory usage statistics. @@ -337,7 +337,7 @@ async def get_memory_stats(self) -> OrderbookStats: debugging memory issues, and validating that the cleanup strategies are working as expected. - The method is thread-safe and acquires the orderbook lock during execution. + Note: This is a synchronous method that computes stats immediately. Returns: Dict containing comprehensive memory statistics including: @@ -369,85 +369,85 @@ async def get_memory_stats(self) -> OrderbookStats: >>> print(f"Items cleaned: {stats['trades_cleaned'] + stats['depth_cleaned'] + " ... 
f"stats['history_cleaned']}") """ - async with self.orderbook.orderbook_lock: - # Calculate current depth statistics - bid_depth = self.orderbook.orderbook_bids.height - ask_depth = self.orderbook.orderbook_asks.height - - # Calculate trade statistics - trades_count = self.memory_stats.get("total_trades", 0) - total_volume = self.memory_stats.get("total_volume", 0) - avg_trade_size = total_volume / trades_count if trades_count > 0 else 0.0 - - # Calculate memory usage (rough estimate) - memory_usage_mb = ( - (bid_depth + ask_depth) * 0.0001 # Depth data - + self.orderbook.recent_trades.height * 0.0001 # Trade data - + len(self.orderbook.price_level_history) * 0.0001 # History data - ) + # Note: This method is synchronous and doesn't acquire locks + # It provides a snapshot of current stats without blocking + + # Calculate current depth statistics + bid_depth = self.orderbook.orderbook_bids.height + ask_depth = self.orderbook.orderbook_asks.height + + # Calculate trade statistics + trades_count = self.memory_stats.get("total_trades", 0) + total_volume = self.memory_stats.get("total_volume", 0) + avg_trade_size = total_volume / trades_count if trades_count > 0 else 0.0 + + # Calculate memory usage (rough estimate) + memory_usage_mb = ( + (bid_depth + ask_depth) * 0.0001 # Depth data + + self.orderbook.recent_trades.height * 0.0001 # Trade data + + len(self.orderbook.price_level_history) * 0.0001 # History data + ) - # Calculate spread from current best prices - best_bid = ( - float(self.orderbook.best_bid_history[-1]["price"]) - if self.orderbook.best_bid_history - else 0.0 - ) - best_ask = ( - float(self.orderbook.best_ask_history[-1]["price"]) - if self.orderbook.best_ask_history - else 0.0 - ) - current_spread = ( - best_ask - best_bid if best_bid > 0 and best_ask > 0 else 0.0 + # Calculate spread from current best prices + best_bid = ( + float(self.orderbook.best_bid_history[-1]["price"]) + if self.orderbook.best_bid_history + else 0.0 + ) + best_ask = ( + float(self.orderbook.best_ask_history[-1]["price"]) + if self.orderbook.best_ask_history + else 0.0 + ) + current_spread = best_ask - best_bid if best_bid > 0 and best_ask > 0 else 0.0 + + # Calculate spread volatility from history + spreads = [ + float(ask["price"]) - float(bid["price"]) + for bid, ask in zip( + self.orderbook.best_bid_history, + self.orderbook.best_ask_history, + strict=False, ) - - # Calculate spread volatility from history - spreads = [ - float(ask["price"]) - float(bid["price"]) - for bid, ask in zip( - self.orderbook.best_bid_history, - self.orderbook.best_ask_history, - strict=False, - ) - if float(bid["price"]) > 0 and float(ask["price"]) > 0 - ] - spread_volatility = 0.0 - if len(spreads) > 1: - avg_spread = sum(spreads) / len(spreads) - spread_volatility = ( - sum((s - avg_spread) ** 2 for s in spreads) / len(spreads) - ) ** 0.5 - - return { - # Depth statistics - "avg_bid_depth": bid_depth, - "avg_ask_depth": ask_depth, - "max_bid_depth": self.memory_stats.get("max_bid_depth", bid_depth), - "max_ask_depth": self.memory_stats.get("max_ask_depth", ask_depth), - # Trade statistics - "trades_processed": trades_count, - "avg_trade_size": avg_trade_size, - "largest_trade": self.memory_stats.get("largest_trade", 0), - "total_volume": total_volume, - # Market microstructure - "avg_spread": current_spread, - "spread_volatility": spread_volatility, - "price_levels": bid_depth + ask_depth, - "order_clustering": 0.0, # Would need more complex calculation - # Pattern detection - "icebergs_detected": 
self.memory_stats.get("icebergs_detected", 0), - "spoofing_alerts": self.memory_stats.get("spoofing_alerts", 0), - "unusual_patterns": self.memory_stats.get("unusual_patterns", 0), - # Performance metrics - "update_frequency_per_second": self.memory_stats.get( - "update_frequency", 0.0 - ), - "processing_latency_ms": self.memory_stats.get( - "processing_latency_ms", 0.0 - ), - "memory_usage_mb": memory_usage_mb, - # Data quality - "data_gaps": self.memory_stats.get("data_gaps", 0), - "invalid_updates": self.memory_stats.get("invalid_updates", 0), - "duplicate_updates": self.memory_stats.get("duplicate_updates", 0), - } + if float(bid["price"]) > 0 and float(ask["price"]) > 0 + ] + spread_volatility = 0.0 + if len(spreads) > 1: + avg_spread = sum(spreads) / len(spreads) + spread_volatility = ( + sum((s - avg_spread) ** 2 for s in spreads) / len(spreads) + ) ** 0.5 + + return { + # Depth statistics + "avg_bid_depth": bid_depth, + "avg_ask_depth": ask_depth, + "max_bid_depth": self.memory_stats.get("max_bid_depth", bid_depth), + "max_ask_depth": self.memory_stats.get("max_ask_depth", ask_depth), + # Trade statistics + "trades_processed": trades_count, + "avg_trade_size": avg_trade_size, + "largest_trade": self.memory_stats.get("largest_trade", 0), + "total_volume": total_volume, + # Market microstructure + "avg_spread": current_spread, + "spread_volatility": spread_volatility, + "price_levels": bid_depth + ask_depth, + "order_clustering": 0.0, # Would need more complex calculation + # Pattern detection + "icebergs_detected": self.memory_stats.get("icebergs_detected", 0), + "spoofing_alerts": self.memory_stats.get("spoofing_alerts", 0), + "unusual_patterns": self.memory_stats.get("unusual_patterns", 0), + # Performance metrics + "update_frequency_per_second": self.memory_stats.get( + "update_frequency", 0.0 + ), + "processing_latency_ms": self.memory_stats.get( + "processing_latency_ms", 0.0 + ), + "memory_usage_mb": memory_usage_mb, + # Data quality + "data_gaps": self.memory_stats.get("data_gaps", 0), + "invalid_updates": self.memory_stats.get("invalid_updates", 0), + "duplicate_updates": self.memory_stats.get("duplicate_updates", 0), + } diff --git a/src/project_x_py/position_manager/core.py b/src/project_x_py/position_manager/core.py index c7524e1..8333162 100644 --- a/src/project_x_py/position_manager/core.py +++ b/src/project_x_py/position_manager/core.py @@ -71,6 +71,7 @@ async def main(): """ import asyncio +import time from datetime import datetime from typing import TYPE_CHECKING, Any, Optional @@ -93,7 +94,7 @@ async def main(): ProjectXLogger, handle_errors, ) -from project_x_py.utils.stats_tracking import StatsTrackingMixin +from project_x_py.utils.enhanced_stats_tracking import EnhancedStatsTrackingMixin if TYPE_CHECKING: from project_x_py.order_manager import OrderManager @@ -107,7 +108,7 @@ class PositionManager( PositionMonitoringMixin, PositionOperationsMixin, PositionReportingMixin, - StatsTrackingMixin, + EnhancedStatsTrackingMixin, ): """ Async comprehensive position management system for ProjectX trading operations. 
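
The hunks that follow instrument `get_all_positions()` and `get_position()` with the same inline pattern: record `start_time`, compute `duration_ms`, then report via `track_operation()`. A small helper could centralize that boilerplate; the context manager below is a hypothetical sketch built only on the `track_operation(name, duration_ms, success=..., metadata=...)` signature used throughout this PR, not part of the actual changeset:

```python
import time
from contextlib import asynccontextmanager


@asynccontextmanager
async def timed_operation(stats, name: str, **metadata):
    """Hypothetical helper: time the wrapped block and report it via
    track_operation(), mirroring the inline pattern in these hunks."""
    start = time.time()
    try:
        yield
    except Exception as exc:
        # Failure path: report the duration with success=False and the error,
        # then re-raise so callers still see the exception.
        await stats.track_operation(
            name,
            (time.time() - start) * 1000,
            success=False,
            metadata={**metadata, "error": str(exc)},
        )
        raise
    await stats.track_operation(
        name, (time.time() - start) * 1000, success=True, metadata=metadata or None
    )


# Usage sketch inside a manager method:
#     async with timed_operation(self, "get_all_positions"):
#         positions = await self.project_x.search_open_positions()
```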
@@ -224,7 +225,8 @@ def __init__( # Initialize all mixins PositionTrackingMixin.__init__(self) PositionMonitoringMixin.__init__(self) - StatsTrackingMixin._init_stats_tracking(self) + # Initialize enhanced stats tracking + self._init_enhanced_stats() self.project_x: ProjectXBase = project_x_client self.event_bus = event_bus # Store the event bus for emitting events @@ -435,10 +437,15 @@ async def get_all_positions(self, account_id: int | None = None) -> list[Positio In real-time mode, tracked positions are also updated via WebSocket, but this method always fetches fresh data from the API. """ + start_time = time.time() self.logger.info(LogMessages.POSITION_SEARCH, extra={"account_id": account_id}) positions = await self.project_x.search_open_positions(account_id=account_id) + # Track the operation timing + duration_ms = (time.time() - start_time) * 1000 + await self.track_operation("get_all_positions", duration_ms, success=True) + # Update tracked positions async with self.position_lock: for position in positions: @@ -492,19 +499,39 @@ async def get_position( - Real-time mode: O(1) cache lookup, falls back to API if miss - Polling mode: Always makes API call via get_all_positions() """ + start_time = time.time() + # Try cached data first if real-time enabled if self._realtime_enabled: async with self.position_lock: cached_position = self.tracked_positions.get(contract_id) if cached_position: + duration_ms = (time.time() - start_time) * 1000 + await self.track_operation( + "get_position", + duration_ms, + success=True, + metadata={"cache_hit": True}, + ) return cached_position # Fallback to API search positions = await self.get_all_positions(account_id=account_id) for position in positions: if position.contractId == contract_id: + duration_ms = (time.time() - start_time) * 1000 + await self.track_operation( + "get_position", + duration_ms, + success=True, + metadata={"cache_hit": False}, + ) return position + duration_ms = (time.time() - start_time) * 1000 + await self.track_operation( + "get_position", duration_ms, success=False, metadata={"reason": "not_found"} + ) return None @handle_errors("refresh positions", reraise=False, default_return=False) diff --git a/src/project_x_py/position_manager/operations.py b/src/project_x_py/position_manager/operations.py index f9c2011..b0d063e 100644 --- a/src/project_x_py/position_manager/operations.py +++ b/src/project_x_py/position_manager/operations.py @@ -126,7 +126,9 @@ async def close_position_direct( - No price control - executes at current market price - For partial closes, use partially_close_position() """ - await self.project_x._ensure_authenticated() + # Ensure authenticated (using public method, not private _ensure_authenticated) + if not self.project_x.account_info: + await self.project_x.authenticate() if account_id is None: if not self.project_x.account_info: @@ -250,7 +252,9 @@ async def partially_close_position( - Remaining position continues with same average price - Close size must not exceed current position size """ - await self.project_x._ensure_authenticated() + # Ensure authenticated (using public method, not private _ensure_authenticated) + if not self.project_x.account_info: + await self.project_x.authenticate() if account_id is None: if not self.project_x.account_info: diff --git a/src/project_x_py/realtime_data_manager/core.py b/src/project_x_py/realtime_data_manager/core.py index 8a2f752..1bb5a32 100644 --- a/src/project_x_py/realtime_data_manager/core.py +++ b/src/project_x_py/realtime_data_manager/core.py @@ -142,6 
+142,7 @@ async def on_new_bar(event): format_error_message, handle_errors, ) +from project_x_py.utils.enhanced_stats_tracking import EnhancedStatsTrackingMixin if TYPE_CHECKING: from project_x_py.client import ProjectXBase @@ -165,6 +166,7 @@ class RealtimeDataManager( CallbackMixin, DataAccessMixin, ValidationMixin, + EnhancedStatsTrackingMixin, ): """ Async optimized real-time OHLCV data manager for efficient multi-timeframe trading data. @@ -363,6 +365,9 @@ def __init__( # Initialize all mixins (they may need the above attributes) super().__init__() + # Initialize enhanced stats tracking + self._init_enhanced_stats() + # Set timezone for consistent timestamp handling self.timezone: Any = pytz.timezone(timezone) # CME timezone diff --git a/src/project_x_py/realtime_data_manager/data_processing.py b/src/project_x_py/realtime_data_manager/data_processing.py index 6eb574c..4bc2444 100644 --- a/src/project_x_py/realtime_data_manager/data_processing.py +++ b/src/project_x_py/realtime_data_manager/data_processing.py @@ -290,6 +290,9 @@ async def _process_tick_data(self, tick: dict[str, Any]) -> None: Args: tick: Dictionary containing tick data (timestamp, price, volume, etc.) """ + import time + + start_time = time.time() try: if not self.is_running: return @@ -329,8 +332,27 @@ async def _process_tick_data(self, tick: dict[str, Any]) -> None: self.memory_stats["ticks_processed"] += 1 await self._cleanup_old_data() + # Track operation timing if enhanced stats available + if hasattr(self, "track_operation"): + duration_ms = (time.time() - start_time) * 1000 + await self.track_operation( # pyright: ignore[reportAttributeAccessIssue] + "process_tick", + duration_ms, + success=True, + metadata={"price": price, "volume": volume}, + ) + except Exception as e: self.logger.error(f"Error processing tick data: {e}") + # Track failed operation if enhanced stats available + if hasattr(self, "track_operation"): + duration_ms = (time.time() - start_time) * 1000 + await self.track_operation( # pyright: ignore[reportAttributeAccessIssue] + "process_tick", + duration_ms, + success=False, + metadata={"error": str(e)}, + ) async def _update_timeframe_data( self, diff --git a/src/project_x_py/risk_manager/core.py b/src/project_x_py/risk_manager/core.py index ecfb74e..8ee27ae 100644 --- a/src/project_x_py/risk_manager/core.py +++ b/src/project_x_py/risk_manager/core.py @@ -22,7 +22,7 @@ ProjectXClientProtocol, RealtimeDataManagerProtocol, ) -from project_x_py.utils.stats_tracking import StatsTrackingMixin +from project_x_py.utils.enhanced_stats_tracking import EnhancedStatsTrackingMixin from .config import RiskConfig @@ -33,7 +33,7 @@ logger = logging.getLogger(__name__) -class RiskManager(StatsTrackingMixin): +class RiskManager(EnhancedStatsTrackingMixin): """Comprehensive risk management system for trading. 
Handles position sizing, risk validation, stop-loss management, @@ -68,7 +68,13 @@ def __init__( self.event_bus = event_bus self.config = config or RiskConfig() self.data_manager = data_manager - StatsTrackingMixin._init_stats_tracking(self) + # Initialize enhanced stats tracking + self._init_enhanced_stats( + max_errors=100, + max_timings=1000, + retention_hours=24, + enable_profiling=False, + ) # Track daily losses and trades self._daily_loss = Decimal("0") @@ -112,6 +118,9 @@ async def calculate_position_size( Returns: PositionSizingResponse with calculated size and risk metrics """ + import time + + start_time = time.time() try: # Get account info account = await self._get_account_info() @@ -160,7 +169,7 @@ async def calculate_position_size( if instrument: tick_size = float(instrument.tickSize) - return PositionSizingResponse( + result = PositionSizingResponse( position_size=position_size, risk_amount=actual_risk, risk_percent=actual_risk_percent, @@ -175,8 +184,22 @@ async def calculate_position_size( sizing_method="kelly" if use_kelly else "fixed_risk", ) + # Track successful operation + duration_ms = (time.time() - start_time) * 1000 + await self.track_operation( + "calculate_position_size", duration_ms, success=True + ) + + return result + except Exception as e: logger.error(f"Error calculating position size: {e}") + # Track failed operation + duration_ms = (time.time() - start_time) * 1000 + await self.track_operation( + "calculate_position_size", duration_ms, success=False + ) + await self.track_error(e, "calculate_position_size") raise async def validate_trade( @@ -193,6 +216,9 @@ async def validate_trade( Returns: RiskValidationResponse with validation result and reasons """ + import time + + start_time = time.time() try: reasons = [] warnings = [] @@ -269,7 +295,7 @@ async def validate_trade( f"Multiple correlated positions ({correlated_count} positions)" ) - return RiskValidationResponse( + result = RiskValidationResponse( is_valid=is_valid, reasons=reasons, warnings=warnings, @@ -280,8 +306,18 @@ async def validate_trade( portfolio_risk=total_risk, ) + # Track successful operation + duration_ms = (time.time() - start_time) * 1000 + await self.track_operation("validate_trade", duration_ms, success=True) + + return result + except Exception as e: logger.error(f"Error validating trade: {e}") + # Track failed operation + duration_ms = (time.time() - start_time) * 1000 + await self.track_operation("validate_trade", duration_ms, success=False) + await self.track_error(e, "validate_trade") return RiskValidationResponse( is_valid=False, reasons=[f"Validation error: {e!s}"], diff --git a/src/project_x_py/risk_manager/managed_trade.py b/src/project_x_py/risk_manager/managed_trade.py index f90efa0..4e56804 100644 --- a/src/project_x_py/risk_manager/managed_trade.py +++ b/src/project_x_py/risk_manager/managed_trade.py @@ -641,7 +641,7 @@ async def order_fill_handler(event: Any) -> None: event_order_id = event_data.get("order_id") if not event_order_id and "order" in event_data: order_obj = event_data.get("order") - if hasattr(order_obj, "id"): + if order_obj and hasattr(order_obj, "id"): event_order_id = order_obj.id if event_order_id == order.id: filled_successfully = True @@ -656,7 +656,7 @@ async def order_terminal_handler(event: Any) -> None: event_order_id = event_data.get("order_id") if not event_order_id and "order" in event_data: order_obj = event_data.get("order") - if hasattr(order_obj, "id"): + if order_obj and hasattr(order_obj, "id"): event_order_id = order_obj.id if 
event_order_id == order.id: filled_successfully = False diff --git a/src/project_x_py/trading_suite.py b/src/project_x_py/trading_suite.py index 5cad64f..a5d0e9e 100644 --- a/src/project_x_py/trading_suite.py +++ b/src/project_x_py/trading_suite.py @@ -61,8 +61,8 @@ PositionManagerConfig, ) from project_x_py.types.protocols import ProjectXClientProtocol -from project_x_py.types.stats_types import ComponentStats, TradingSuiteStats -from project_x_py.utils import ProjectXLogger +from project_x_py.types.stats_types import TradingSuiteStats +from project_x_py.utils import ProjectXLogger, StatisticsAggregator logger = ProjectXLogger.get_logger(__name__) @@ -233,6 +233,15 @@ def __init__( # Initialize unified event bus self.events = EventBus() + # Initialize statistics aggregator + self._stats_aggregator = StatisticsAggregator( + cache_ttl_seconds=5, + enable_caching=True, + ) + self._stats_aggregator.trading_suite = self + self._stats_aggregator.client = client + self._stats_aggregator.realtime_client = realtime_client + # Initialize core components with typed configs and event bus self.data = RealtimeDataManager( instrument=config.instrument, @@ -248,6 +257,10 @@ def __init__( client, config=config.get_order_manager_config(), event_bus=self.events ) + # Set aggregator references + self._stats_aggregator.order_manager = self.orders + self._stats_aggregator.data_manager = self.data + # Optional components self.orderbook: OrderBook | None = None self.risk_manager: RiskManager | None = None @@ -267,6 +280,9 @@ def __init__( config=config.get_position_manager_config(), ) + # Set aggregator reference + self._stats_aggregator.position_manager = self.positions + # Initialize risk manager if enabled and inject dependencies if Features.RISK_MANAGER in config.features: self.risk_manager = RiskManager( @@ -277,6 +293,7 @@ def __init__( config=config.get_risk_config(), ) self.positions.risk_manager = self.risk_manager + self._stats_aggregator.risk_manager = self.risk_manager # State tracking self._connected = False @@ -513,6 +530,7 @@ async def _initialize(self) -> None: subscribe_to_depth=True, subscribe_to_quotes=True, ) + self._stats_aggregator.orderbook = self.orderbook self._connected = True self._initialized = True @@ -817,128 +835,41 @@ async def wait_for( """ return await self.events.wait_for(event, timeout) - def get_stats(self) -> TradingSuiteStats: + async def get_stats(self) -> TradingSuiteStats: """ - Get comprehensive statistics from all components. + Get comprehensive statistics from all components using the aggregator. Returns: - Structured statistics from all active components + Structured statistics from all active components with accurate metrics """ - from datetime import datetime + return await self._stats_aggregator.aggregate_stats() - # Calculate uptime - uptime_seconds = ( - int((datetime.now() - self._created_at).total_seconds()) - if hasattr(self, "_created_at") - else 0 - ) + def get_stats_sync(self) -> TradingSuiteStats: + """ + Synchronous wrapper for get_stats for backward compatibility. 
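+
+        Intended for legacy, non-async call sites only, e.g.::
+
+            stats = suite.get_stats_sync()  # blocks while stats aggregate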
- # Build component stats - components: dict[str, ComponentStats] = {} - if self.orders: - last_activity_obj = self.orders.stats.get("last_order_time") - components["order_manager"] = ComponentStats( - name="OrderManager", - status="connected" if self.orders else "disconnected", - uptime_seconds=uptime_seconds, - last_activity=last_activity_obj.isoformat() - if last_activity_obj - else None, - error_count=self.orders.get_error_stats()["total_errors"] - if hasattr(self.orders, "get_error_stats") - else 0, - memory_usage_mb=self.orders.get_memory_usage_mb() - if hasattr(self.orders, "get_memory_usage_mb") - else 0.0, - ) + Note: This is a deprecated method that will be removed in v4.0.0. + Use the async get_stats() method instead. - if self.positions: - last_activity_obj = self.positions.stats.get("last_position_update") - components["position_manager"] = ComponentStats( - name="PositionManager", - status="connected" if self.positions else "disconnected", - uptime_seconds=uptime_seconds, - last_activity=last_activity_obj.isoformat() - if last_activity_obj - else None, - error_count=self.positions.get_error_stats()["total_errors"] - if hasattr(self.positions, "get_error_stats") - else 0, - memory_usage_mb=self.positions.get_memory_usage_mb() - if hasattr(self.positions, "get_memory_usage_mb") - else 0.0, - ) - - if self.data: - last_activity_obj = self.data.memory_stats.get("last_update") - components["data_manager"] = ComponentStats( - name="RealtimeDataManager", - status="connected" if self.data else "disconnected", - uptime_seconds=uptime_seconds, - last_activity=last_activity_obj.isoformat() - if last_activity_obj - else None, - error_count=self.data.memory_stats.get("data_validation_errors", 0), - memory_usage_mb=self.data.memory_stats.get("memory_usage_mb", 0.0), - ) + Returns: + Structured statistics from all active components + """ + import asyncio + import warnings - if self.orderbook: - components["orderbook"] = ComponentStats( - name="OrderBook", - status="connected" if self.orderbook else "disconnected", - uptime_seconds=uptime_seconds, - last_activity=self.orderbook.last_orderbook_update.isoformat() - if self.orderbook.last_orderbook_update - else None, - error_count=self.orderbook.get_error_stats()["total_errors"] - if hasattr(self.orderbook, "get_error_stats") - else 0, - memory_usage_mb=self.orderbook.get_memory_usage_mb() - if hasattr(self.orderbook, "get_memory_usage_mb") - else 0.0, - ) + warnings.warn( + "get_stats_sync() is deprecated and will be removed in v4.0.0. 
" + "Use the async get_stats() method instead.", + DeprecationWarning, + stacklevel=2, + ) - if self.risk_manager: - components["risk_manager"] = ComponentStats( - name="RiskManager", - status="active" if self.risk_manager else "inactive", - uptime_seconds=uptime_seconds, - last_activity=self.risk_manager.get_activity_stats()["last_activity"] - if hasattr(self.risk_manager, "get_activity_stats") - else None, - error_count=self.risk_manager.get_error_stats()["total_errors"] - if hasattr(self.risk_manager, "get_error_stats") - else 0, - memory_usage_mb=self.risk_manager.get_memory_usage_mb() - if hasattr(self.risk_manager, "get_memory_usage_mb") - else 0.0, - ) + # Try to get or create event loop + try: + loop = asyncio.get_event_loop() + except RuntimeError: + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) - return { - "suite_id": getattr(self, "suite_id", "unknown"), - "instrument": self.instrument_id or self._symbol, - "created_at": getattr(self, "_created_at", datetime.now()).isoformat(), - "uptime_seconds": uptime_seconds, - "status": "active" if self.is_connected else "disconnected", - "connected": self.is_connected, - "components": components, - "realtime_connected": self.realtime.is_connected() - if self.realtime - else False, - "user_hub_connected": getattr(self.realtime, "user_connected", False) - if self.realtime - else False, - "market_hub_connected": getattr(self.realtime, "market_connected", False) - if self.realtime - else False, - "total_api_calls": 0, - "successful_api_calls": 0, - "failed_api_calls": 0, - "avg_response_time_ms": 0.0, - "cache_hit_rate": 0.0, - "memory_usage_mb": 0.0, - "active_subscriptions": 0, - "message_queue_size": 0, - "features_enabled": [f.value for f in self.config.features], - "timeframes": self.config.timeframes, - } + # Run the async method + return loop.run_until_complete(self.get_stats()) diff --git a/src/project_x_py/types/stats_types.py b/src/project_x_py/types/stats_types.py index f00b111..d6e8060 100644 --- a/src/project_x_py/types/stats_types.py +++ b/src/project_x_py/types/stats_types.py @@ -76,6 +76,7 @@ class ComponentStats(TypedDict): last_activity: str | None error_count: int memory_usage_mb: float + performance_metrics: NotRequired[dict[str, Any]] # Optional performance data class TradingSuiteStats(TypedDict): @@ -112,6 +113,10 @@ class TradingSuiteStats(TypedDict): features_enabled: list[str] timeframes: list[str] + # Calculated cross-component metrics + total_errors: NotRequired[int] # Total error count across all components + health_score: NotRequired[float] # Overall health score (0-100) + # Component-Specific Statistics Types class OrderManagerStats(TypedDict): diff --git a/src/project_x_py/utils/__init__.py b/src/project_x_py/utils/__init__.py index fd839d8..b4892ba 100644 --- a/src/project_x_py/utils/__init__.py +++ b/src/project_x_py/utils/__init__.py @@ -101,6 +101,9 @@ async def get_market_data(): get_polars_rows, ) +# Enhanced statistics tracking (v3.2.1) +from project_x_py.utils.enhanced_stats_tracking import EnhancedStatsTrackingMixin + # Environment utilities from project_x_py.utils.environment import get_env_var @@ -154,6 +157,7 @@ async def get_market_data(): calculate_sharpe_ratio, calculate_volatility_metrics, ) +from project_x_py.utils.statistics_aggregator import StatisticsAggregator # Trading calculations from project_x_py.utils.trading_calculations import ( @@ -169,6 +173,9 @@ async def get_market_data(): "ErrorCode", "ErrorContext", "ErrorMessages", + # Enhanced statistics (v3.2.1) + 
"EnhancedStatsTrackingMixin", + "StatisticsAggregator", # Rate limiting "LogContext", "LogMessages", diff --git a/src/project_x_py/utils/enhanced_stats_tracking.py b/src/project_x_py/utils/enhanced_stats_tracking.py new file mode 100644 index 0000000..4d41168 --- /dev/null +++ b/src/project_x_py/utils/enhanced_stats_tracking.py @@ -0,0 +1,803 @@ +""" +Enhanced statistics tracking mixin with async support and performance metrics. + +Author: SDK v3.2.1 +Date: 2025-01-18 + +Overview: + Provides comprehensive statistics tracking capabilities for all SDK components + with async support, circular buffers for memory management, and configurable + retention periods. + +Key Features: + - Async-safe operations with locks + - Circular buffers to prevent memory leaks + - Performance timing metrics + - Configurable retention periods + - Thread-safe aggregation + - PII sanitization for exports + - Graceful degradation on failures +""" + +import asyncio +import sys +import time +import traceback +from collections import deque +from datetime import datetime, timedelta +from typing import Any + +from project_x_py.utils.logging_config import ProjectXLogger + +logger = ProjectXLogger.get_logger(__name__) + + +class EnhancedStatsTrackingMixin: + """ + Enhanced mixin for comprehensive statistics tracking across all components. + + Provides async-safe, memory-efficient statistics collection with configurable + retention, performance metrics, and export capabilities. + """ + + def _init_enhanced_stats( + self, + max_errors: int = 100, + max_timings: int = 1000, + retention_hours: int = 24, + enable_profiling: bool = False, + ) -> None: + """ + Initialize enhanced statistics tracking. + + Args: + max_errors: Maximum error history entries + max_timings: Maximum timing samples to retain + retention_hours: Hours to retain detailed statistics + enable_profiling: Enable detailed performance profiling + """ + # Store max_timings for use in other methods + self._max_timings = max_timings + + # Error tracking with circular buffer + self._error_count = 0 + self._error_history: deque[dict[str, Any]] = deque(maxlen=max_errors) + self._error_types: dict[str, int] = {} + + # Performance metrics with circular buffers + self._api_timings: deque[float] = deque(maxlen=max_timings) + self._operation_timings: dict[str, deque[float]] = {} + self._last_activity = datetime.now() + self._start_time = time.time() + + # Memory tracking + self._memory_snapshots: deque[dict[str, Any]] = deque(maxlen=100) + self._last_memory_check = time.time() + + # Network metrics + self._network_stats = { + "total_requests": 0, + "successful_requests": 0, + "failed_requests": 0, + "total_bytes_sent": 0, + "total_bytes_received": 0, + "avg_latency_ms": 0.0, + "websocket_reconnects": 0, + "websocket_messages": 0, + } + + # Data quality metrics + self._data_quality: dict[str, Any] = { + "total_data_points": 0, + "invalid_data_points": 0, + "missing_data_points": 0, + "duplicate_data_points": 0, + "data_gaps": [], + "last_validation": None, + } + + # Configuration + self._retention_hours = retention_hours + self._enable_profiling = enable_profiling + self._cleanup_interval = 300 # 5 minutes + self._last_cleanup = time.time() + + # Fine-grained locks for different stat categories + # This prevents deadlocks by allowing concurrent access to different stat types + self._error_lock = asyncio.Lock() # For error tracking + self._timing_lock = asyncio.Lock() # For performance timings + self._network_lock = asyncio.Lock() # For network stats + 
self._data_quality_lock = asyncio.Lock() # For data quality metrics + self._memory_lock = asyncio.Lock() # For memory snapshots + self._component_lock = asyncio.Lock() # For component-specific stats + + # Legacy lock for backward compatibility (will be phased out) + self._stats_lock = asyncio.Lock() + + # Component-specific stats (to be overridden by each component) + self._component_stats: dict[str, Any] = {} + + logger.debug( + f"Enhanced stats initialized: retention={retention_hours}h, " + f"profiling={enable_profiling}" + ) + + async def track_operation( + self, + operation: str, + duration_ms: float, + success: bool = True, + metadata: dict[str, Any] | None = None, + ) -> None: + """ + Track an operation with timing and success metrics. + + Args: + operation: Operation name + duration_ms: Duration in milliseconds + success: Whether operation succeeded + metadata: Optional metadata about the operation + """ + # Use timing lock for operation timings + async with self._timing_lock: + # Update operation timings + if operation not in self._operation_timings: + self._operation_timings[operation] = deque(maxlen=self._max_timings) + self._operation_timings[operation].append(duration_ms) + + # Update activity timestamp + self._last_activity = datetime.now() + + # Use network lock for network stats + if metadata and ("bytes_sent" in metadata or "bytes_received" in metadata): + async with self._network_lock: + if "bytes_sent" in metadata: + self._network_stats["total_bytes_sent"] += metadata["bytes_sent"] + if "bytes_received" in metadata: + self._network_stats["total_bytes_received"] += metadata[ + "bytes_received" + ] + + # Update request counts with network lock + async with self._network_lock: + self._network_stats["total_requests"] += 1 + if success: + self._network_stats["successful_requests"] += 1 + else: + self._network_stats["failed_requests"] += 1 + + # Trigger cleanup if needed (no lock needed for time check) + current_time = time.time() + if current_time - self._last_cleanup > self._cleanup_interval: + await self._cleanup_old_stats_if_needed() + + async def track_error( + self, + error: Exception, + context: str | None = None, + details: dict[str, Any] | None = None, + ) -> None: + """ + Track an error occurrence with enhanced context. + + Args: + error: The exception that occurred + context: Context about where/why the error occurred + details: Additional error details + """ + # Sanitize details outside of lock to minimize lock time + sanitized_details = self._sanitize_for_export(details) if details else None + error_type = type(error).__name__ + + async with self._error_lock: + self._error_count += 1 + + # Update error type counts + self._error_types[error_type] = self._error_types.get(error_type, 0) + 1 + + # Store error in history + self._error_history.append( + { + "timestamp": datetime.now(), + "error_type": error_type, + "message": str(error), + "context": context, + "details": sanitized_details, + "traceback": traceback.format_exc() + if self._enable_profiling + else None, + } + ) + + async def track_data_quality( + self, + total_points: int, + invalid_points: int = 0, + missing_points: int = 0, + duplicate_points: int = 0, + ) -> None: + """ + Track data quality metrics. 
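+
+        Counters accumulate across calls, and ``get_data_quality_stats()``
+        derives a quality score from them. Illustrative sketch (assumes a
+        component that ran ``_init_enhanced_stats()``)::
+
+            await manager.track_data_quality(
+                total_points=1_000, invalid_points=3, duplicate_points=1
+            )
+            dq = manager.get_data_quality_stats()
+            # dq["quality_score"] == (1000 - 3) / 1000 * 100 == 99.7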
+ + Args: + total_points: Total data points processed + invalid_points: Number of invalid points + missing_points: Number of missing points + duplicate_points: Number of duplicate points + """ + async with self._data_quality_lock: + # Type-safe integer updates with validation + def safe_int(value: Any, default: int = 0) -> int: + """Safely convert value to int with validation.""" + if value is None: + return default + if isinstance(value, int | float): + return int(value) + if isinstance(value, str) and value.isdigit(): + return int(value) + logger.warning(f"Invalid numeric value for data quality: {value}") + return default + + current_total = safe_int(self._data_quality.get("total_data_points", 0)) + current_invalid = safe_int(self._data_quality.get("invalid_data_points", 0)) + current_missing = safe_int(self._data_quality.get("missing_data_points", 0)) + current_duplicate = safe_int( + self._data_quality.get("duplicate_data_points", 0) + ) + + self._data_quality["total_data_points"] = current_total + total_points + self._data_quality["invalid_data_points"] = current_invalid + invalid_points + self._data_quality["missing_data_points"] = current_missing + missing_points + self._data_quality["duplicate_data_points"] = ( + current_duplicate + duplicate_points + ) + self._data_quality["last_validation"] = datetime.now() + + def get_performance_metrics(self) -> dict[str, Any]: + """ + Get detailed performance metrics. + + Returns: + Dictionary with performance statistics + """ + # Note: This is now synchronous but thread-safe + # We make quick copies to minimize time under locks + + # Make copies of timing data + operation_timings_copy = { + op_name: list(timings) + for op_name, timings in self._operation_timings.items() + } + api_timings_copy = list(self._api_timings) + last_activity_copy = self._last_activity + + # Copy network stats + network_stats_copy = dict(self._network_stats) + + # Now calculate metrics without holding any locks + operation_stats = {} + for op_name, timings in operation_timings_copy.items(): + if timings: + operation_stats[op_name] = { + "count": len(timings), + "avg_ms": sum(timings) / len(timings), + "min_ms": min(timings), + "max_ms": max(timings), + "p50_ms": self._calculate_percentile(timings, 50), + "p95_ms": self._calculate_percentile(timings, 95), + "p99_ms": self._calculate_percentile(timings, 99), + } + + # Calculate overall API timing stats + api_stats = {} + if api_timings_copy: + api_stats = { + "avg_response_time_ms": sum(api_timings_copy) / len(api_timings_copy), + "min_response_time_ms": min(api_timings_copy), + "max_response_time_ms": max(api_timings_copy), + "p50_response_time_ms": self._calculate_percentile( + api_timings_copy, 50 + ), + "p95_response_time_ms": self._calculate_percentile( + api_timings_copy, 95 + ), + } + + # Calculate network metrics + success_rate = ( + network_stats_copy["successful_requests"] + / network_stats_copy["total_requests"] + if network_stats_copy["total_requests"] > 0 + else 0.0 + ) + + return { + "operation_stats": operation_stats, + "api_stats": api_stats, + "network_stats": { + **network_stats_copy, + "success_rate": success_rate, + }, + "uptime_seconds": time.time() - self._start_time, + "last_activity": last_activity_copy.isoformat() + if last_activity_copy + else None, + } + + def get_error_stats(self) -> dict[str, Any]: + """ + Get enhanced error statistics. 
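+
+        Usage sketch (values depend on what ``track_error()`` recorded)::
+
+            stats = component.get_error_stats()
+            print(stats["total_errors"], stats["errors_last_hour"])
+            if stats["last_error"] is not None:
+                print(stats["last_error"]["error_type"])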
+ + Returns: + Dictionary with error statistics + """ + # Note: This is now synchronous but thread-safe + # We make quick copies to minimize time accessing shared data + + error_count_copy = self._error_count + error_history_copy = list(self._error_history) + error_types_copy = dict(self._error_types) + + # Now calculate metrics without holding lock + recent_errors = error_history_copy[-10:] # Last 10 errors + + # Calculate error rate over time windows + now = datetime.now() + errors_last_hour = sum( + 1 + for e in error_history_copy + if (now - e["timestamp"]).total_seconds() < 3600 + ) + errors_last_day = sum( + 1 + for e in error_history_copy + if (now - e["timestamp"]).total_seconds() < 86400 + ) + + return { + "total_errors": error_count_copy, + "errors_last_hour": errors_last_hour, + "errors_last_day": errors_last_day, + "error_types": error_types_copy, + "recent_errors": recent_errors, + "last_error": recent_errors[-1] if recent_errors else None, + } + + def get_data_quality_stats(self) -> dict[str, Any]: + """ + Get data quality statistics. + + Returns: + Dictionary with data quality metrics + """ + # Note: This is now synchronous but thread-safe + # We make quick copies to minimize time accessing shared data + + data_quality_copy = dict(self._data_quality) + + # Now calculate metrics without holding lock + # Safe integer conversion with validation + def safe_int(value: Any, default: int = 0) -> int: + """Safely convert value to int with validation.""" + if value is None: + return default + if isinstance(value, int | float): + return int(value) + if isinstance(value, str) and value.isdigit(): + return int(value) + return default + + total = safe_int(data_quality_copy.get("total_data_points", 0)) + invalid = safe_int(data_quality_copy.get("invalid_data_points", 0)) + + quality_score = ((total - invalid) / total * 100) if total > 0 else 100.0 + + return { + **data_quality_copy, + "quality_score": quality_score, + "invalid_rate": (invalid / total) if total > 0 else 0.0, + } + + def get_enhanced_memory_stats(self) -> dict[str, Any]: + """ + Get enhanced memory usage statistics with automatic sampling. 
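+
+        Illustrative only; snapshots are sampled at most once per minute::
+
+            mem = component.get_enhanced_memory_stats()
+            print(f"{mem['current_memory_mb']:.2f} MB now, "
+                  f"peak {mem['peak_memory_mb']:.2f} MB")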
+ + Returns: + Dictionary with memory statistics + """ + # Sample memory if enough time has passed + current_time = time.time() + should_sample = current_time - self._last_memory_check > 60 + + if should_sample: + # Calculate current memory usage + memory_mb = self._calculate_memory_usage() + + # Get error count for snapshot + error_count = self._error_count + + # Get operation count for snapshot + operation_count = sum(len(t) for t in self._operation_timings.values()) + + # Store snapshot + self._last_memory_check = current_time + self._memory_snapshots.append( + { + "timestamp": datetime.now(), + "memory_mb": memory_mb, + "error_count": error_count, + "operation_count": operation_count, + } + ) + + # Get latest stats and copy snapshots + current_memory = self._calculate_memory_usage() + + snapshots_copy = list(self._memory_snapshots) + + # Calculate trends without lock + memory_trend = [] + if len(snapshots_copy) >= 2: + memory_trend = [s["memory_mb"] for s in snapshots_copy[-10:]] + + return { + "current_memory_mb": current_memory, + "memory_trend": memory_trend, + "peak_memory_mb": max(s["memory_mb"] for s in snapshots_copy) + if snapshots_copy + else current_memory, + "avg_memory_mb": sum(s["memory_mb"] for s in snapshots_copy) + / len(snapshots_copy) + if snapshots_copy + else current_memory, + } + + def export_stats(self, format: str = "json") -> dict[str, Any] | str: + """ + Export statistics in specified format. + + Args: + format: Export format (json, prometheus, etc.) + + Returns: + Exported statistics + """ + # Get all stats (now all synchronous) + performance = self.get_performance_metrics() + errors = self.get_error_stats() + data_quality = self.get_data_quality_stats() + memory = self.get_enhanced_memory_stats() + + # Get component stats + component_stats_copy = dict(self._component_stats) + + stats = { + "timestamp": datetime.now().isoformat(), + "component": self.__class__.__name__, + "performance": performance, + "errors": errors, + "data_quality": data_quality, + "memory": memory, + "component_specific": self._sanitize_for_export(component_stats_copy), + } + + if format == "prometheus": + return self._format_prometheus(stats) + + return stats + + async def cleanup_old_stats(self) -> None: + """ + Clean up statistics older than retention period. + """ + cutoff_time = datetime.now() - timedelta(hours=self._retention_hours) + + # Clean up error history with error lock + async with self._error_lock: + while ( + self._error_history + and self._error_history[0]["timestamp"] < cutoff_time + ): + self._error_history.popleft() + + # Clean up memory snapshots with memory lock + async with self._memory_lock: + while ( + self._memory_snapshots + and self._memory_snapshots[0]["timestamp"] < cutoff_time + ): + self._memory_snapshots.popleft() + + # Clean up data gaps with data quality lock + async with self._data_quality_lock: + if "data_gaps" in self._data_quality: + gaps = self._data_quality.get("data_gaps", []) + if isinstance(gaps, list): + self._data_quality["data_gaps"] = [ + gap + for gap in gaps + if isinstance(gap, dict) + and gap.get("timestamp", datetime.min) >= cutoff_time + ] + + logger.debug(f"Cleaned up stats older than {cutoff_time}") + + async def _cleanup_old_stats_if_needed(self) -> None: + """ + Check if cleanup is needed and perform it. 
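+
+        Called opportunistically from ``track_operation()``, so callers do
+        not need to schedule cleanup themselves; ``cleanup_old_stats()`` can
+        still be invoked directly for an immediate purge.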
+ """ + current_time = time.time() + if current_time - self._last_cleanup > self._cleanup_interval: + self._last_cleanup = current_time + await self.cleanup_old_stats() + + def _calculate_memory_usage(self) -> float: + """ + Calculate current memory usage of this component. + + Thread-safe memory calculation. + + Returns: + Memory usage in MB + """ + size = 0 + max_items_to_sample = 100 # Sample limit for large collections + + # Priority attributes to check + priority_attrs = [ + "_error_history", + "_error_types", + "_api_timings", + "_operation_timings", + "_memory_snapshots", + "_network_stats", + "_data_quality", + "_component_stats", + ] + + # Calculate size for each attribute (synchronous access) + for attr_name in priority_attrs: + if hasattr(self, attr_name): + attr = getattr(self, attr_name) + size += sys.getsizeof(attr) + + # For small collections, count all items + if isinstance(attr, list | dict | set | deque): + try: + items = attr.values() if isinstance(attr, dict) else attr + item_count = len(items) if hasattr(items, "__len__") else 0 + + if item_count <= max_items_to_sample: + # Count all items for small collections + for item in items: + size += sys.getsizeof(item) + else: + # Sample for large collections + sample_size = 0 + for i, item in enumerate(items): + if i >= max_items_to_sample: + break + sample_size += sys.getsizeof(item) + # Estimate total size based on sample + if max_items_to_sample > 0: + avg_item_size = sample_size / max_items_to_sample + size += int(avg_item_size * item_count) + except (AttributeError, TypeError): + pass + + # Component-specific attributes (check without locks as they're component-owned) + component_attrs = [ + "tracked_orders", + "order_status_cache", + "position_orders", + "_orders", + "_positions", + "_trades", + "_bars", + "_ticks", + "stats", + "_data", + "_order_history", + "_position_history", + ] + + for attr_name in component_attrs: + if hasattr(self, attr_name): + attr = getattr(self, attr_name) + size += sys.getsizeof(attr) + + # Only sample large component collections + if isinstance(attr, dict) and len(attr) > max_items_to_sample: + # Sample a subset + sample_size = 0 + for i, (k, v) in enumerate(attr.items()): + if i >= 10: # Small sample for component attrs + break + sample_size += sys.getsizeof(k) + sys.getsizeof(v) + # Rough estimate + if 10 > 0: + size += (sample_size // 10) * len(attr) + + return size / (1024 * 1024) + + def _calculate_percentile( + self, data: deque[float] | list[float], percentile: int + ) -> float: + """ + Calculate percentile value from data. + + Args: + data: Data points + percentile: Percentile to calculate (0-100) + + Returns: + Percentile value + """ + if not data: + return 0.0 + + sorted_data = sorted(data) + # Proper percentile calculation with bounds checking + index = max( + 0, min(len(sorted_data) - 1, int((len(sorted_data) - 1) * percentile / 100)) + ) + return sorted_data[index] + + def _sanitize_for_export(self, data: Any) -> Any: + """ + Sanitize data for export by removing PII. 
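+
+        Illustrative behavior (example values, not real data)::
+
+            self._sanitize_for_export({"api_key": "abc123", "pnl": -50.0})
+            # -> {"api_key": "***REDACTED***", "pnl": "negative"}
+            self._sanitize_for_export({"account_id": "ACC123456789"})
+            # -> {"account_id": "***6789"}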
+ + Args: + data: Data to sanitize + + Returns: + Sanitized data + """ + if isinstance(data, dict): + sanitized = {} + # Extended list of sensitive keys for trading data + sensitive_keys = { + "password", + "token", + "key", + "secret", + "auth", + "credential", + "account_id", + "accountid", + "account_name", + "accountname", + "balance", + "equity", + "pnl", + "profit", + "loss", + "position_size", + "positionsize", + "order_size", + "ordersize", + "api_key", + "apikey", + "session", + "cookie", + "username", + "email", + "phone", + "ssn", + "tax_id", + "bank", + "routing", + } + + for key, value in data.items(): + key_lower = key.lower() + # Check if key contains any sensitive patterns + if any(sensitive in key_lower for sensitive in sensitive_keys): + # Special handling for certain fields to show partial info + if ( + "account" in key_lower + and isinstance(value, str) + and len(value) > 4 + ): + # Show last 4 chars of account ID/name + sanitized[key] = f"***{value[-4:]}" + elif any( + x in key_lower + for x in ["pnl", "profit", "loss", "balance", "equity"] + ): + # Show if positive/negative but not actual value + if isinstance(value, int | float): + sanitized[key] = ( + "positive" + if value > 0 + else "negative" + if value < 0 + else "zero" + ) + else: + sanitized[key] = "***REDACTED***" + else: + sanitized[key] = "***REDACTED***" + else: + sanitized[key] = self._sanitize_for_export(value) + + return sanitized + elif isinstance(data, list | tuple): + return [self._sanitize_for_export(item) for item in data] + elif isinstance(data, str): + # Check for patterns that look like sensitive data + if len(data) > 20 and any(c in data for c in ["=", "Bearer", "Basic"]): + # Might be a token or auth header + return "***REDACTED***" + return data + else: + return data + + def _format_prometheus(self, stats: dict[str, Any]) -> str: + """ + Format statistics for Prometheus export. 
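+
+        Emits the Prometheus text exposition format, e.g. (sample values)::
+
+            # HELP riskmanager_errors_total Total number of errors
+            # TYPE riskmanager_errors_total counter
+            riskmanager_errors_total 3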
+ + Args: + stats: Statistics dictionary + + Returns: + Prometheus-formatted string + """ + lines = [] + component = stats["component"].lower() + + # Performance metrics + if "performance" in stats: + perf = stats["performance"] + if perf.get("api_stats"): + lines.append( + f"# HELP {component}_api_response_time_ms API response time in milliseconds" + ) + lines.append(f"# TYPE {component}_api_response_time_ms summary") + lines.append( + f'{component}_api_response_time_ms{{quantile="0.5"}} {perf["api_stats"].get("p50_response_time_ms", 0)}' + ) + lines.append( + f'{component}_api_response_time_ms{{quantile="0.95"}} {perf["api_stats"].get("p95_response_time_ms", 0)}' + ) + lines.append( + f"{component}_api_response_time_ms_sum {perf['api_stats'].get('avg_response_time_ms', 0)}" + ) + + if "network_stats" in perf: + net = perf["network_stats"] + lines.append( + f"# HELP {component}_requests_total Total number of requests" + ) + lines.append(f"# TYPE {component}_requests_total counter") + lines.append( + f"{component}_requests_total {net.get('total_requests', 0)}" + ) + + lines.append( + f"# HELP {component}_request_success_rate Request success rate" + ) + lines.append(f"# TYPE {component}_request_success_rate gauge") + lines.append( + f"{component}_request_success_rate {net.get('success_rate', 0)}" + ) + + # Error metrics + if "errors" in stats: + err = stats["errors"] + lines.append(f"# HELP {component}_errors_total Total number of errors") + lines.append(f"# TYPE {component}_errors_total counter") + lines.append(f"{component}_errors_total {err.get('total_errors', 0)}") + + # Memory metrics + if "memory" in stats: + mem = stats["memory"] + lines.append( + f"# HELP {component}_memory_usage_mb Memory usage in megabytes" + ) + lines.append(f"# TYPE {component}_memory_usage_mb gauge") + lines.append( + f"{component}_memory_usage_mb {mem.get('current_memory_mb', 0)}" + ) + + return "\n".join(lines) diff --git a/src/project_x_py/utils/statistics_aggregator.py b/src/project_x_py/utils/statistics_aggregator.py new file mode 100644 index 0000000..eb40264 --- /dev/null +++ b/src/project_x_py/utils/statistics_aggregator.py @@ -0,0 +1,691 @@ +""" +Central statistics aggregation for TradingSuite. + +Author: SDK v3.2.1 +Date: 2025-01-18 + +Overview: + Provides centralized aggregation of statistics from all TradingSuite components + with async-safe operations and intelligent caching. + +Key Features: + - Aggregates stats from all components + - Caches results with TTL for performance + - Async-safe with proper locking + - Calculates cross-component metrics + - Supports multiple export formats +""" + +import asyncio +import time +from datetime import datetime +from typing import Any, cast + +from project_x_py.types.stats_types import ( + ComponentStats, + TradingSuiteStats, +) +from project_x_py.utils.logging_config import ProjectXLogger + +logger = ProjectXLogger.get_logger(__name__) + + +class StatisticsAggregator: + """ + Central aggregator for all TradingSuite component statistics. + + Collects, caches, and aggregates statistics from all components + with intelligent caching and cross-component metric calculation. + """ + + def __init__( + self, + cache_ttl_seconds: int = 5, + enable_caching: bool = True, + ): + """ + Initialize the statistics aggregator. 
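+
+        Minimal wiring sketch (TradingSuite performs this internally; shown
+        here only for illustration)::
+
+            aggregator = StatisticsAggregator(cache_ttl_seconds=5)
+            aggregator.trading_suite = suite
+            aggregator.order_manager = suite.orders
+            stats = await aggregator.aggregate_stats()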
+ + Args: + cache_ttl_seconds: Cache time-to-live in seconds + enable_caching: Enable result caching + """ + self._cache_ttl = cache_ttl_seconds + self._enable_caching = enable_caching + self._cache: dict[str, Any] = {} + self._cache_timestamps: dict[str, float] = {} + self._aggregation_lock = asyncio.Lock() + + # Component references (set by TradingSuite) + self.trading_suite: Any = None + self.order_manager: Any = None + self.position_manager: Any = None + self.data_manager: Any = None + self.orderbook: Any = None + self.risk_manager: Any = None + self.client: Any = None + self.realtime_client: Any = None + + logger.debug( + f"StatisticsAggregator initialized: cache_ttl={cache_ttl_seconds}s" + ) + + async def aggregate_stats(self, force_refresh: bool = False) -> TradingSuiteStats: + """ + Aggregate statistics from all components. + + Args: + force_refresh: Force refresh bypassing cache + + Returns: + Aggregated statistics from all components + """ + async with self._aggregation_lock: + # Check cache if enabled + if self._enable_caching and not force_refresh: + cached = self._get_cached("aggregate_stats") + if cached is not None and isinstance(cached, dict): + # Cast to correct type for mypy + return cast(TradingSuiteStats, cached) + + # Collect stats from all components + stats = await self._collect_all_stats() + + # Calculate cross-component metrics + stats = await self._calculate_cross_metrics(stats) + + # Cache the result + if self._enable_caching: + self._set_cache("aggregate_stats", stats) + + return stats + + async def _collect_all_stats(self) -> TradingSuiteStats: + """ + Collect statistics from all components. + + Returns: + Raw statistics from all components + """ + suite = self.trading_suite + if not suite: + return self._get_empty_stats() + + # Get basic suite info + uptime_seconds = ( + int((datetime.now() - suite._created_at).total_seconds()) + if hasattr(suite, "_created_at") + else 0 + ) + + # Initialize components dictionary + components: dict[str, ComponentStats] = {} + + # Collect OrderManager stats + if self.order_manager: + components["order_manager"] = await self._get_order_manager_stats( + uptime_seconds + ) + + # Collect PositionManager stats + if self.position_manager: + components["position_manager"] = await self._get_position_manager_stats( + uptime_seconds + ) + + # Collect RealtimeDataManager stats + if self.data_manager: + components["data_manager"] = await self._get_data_manager_stats( + uptime_seconds + ) + + # Collect OrderBook stats + if self.orderbook: + components["orderbook"] = await self._get_orderbook_stats(uptime_seconds) + + # Collect RiskManager stats + if self.risk_manager: + components["risk_manager"] = await self._get_risk_manager_stats( + uptime_seconds + ) + + # Get client performance stats + client_stats = await self._get_client_stats() + + # Get realtime connection stats + realtime_stats = await self._get_realtime_stats() + + # Build the complete stats dictionary + stats: TradingSuiteStats = { + "suite_id": getattr(suite, "suite_id", "unknown"), + "instrument": suite.instrument_id or suite._symbol if suite else "unknown", + "created_at": getattr(suite, "_created_at", datetime.now()).isoformat(), + "uptime_seconds": uptime_seconds, + "status": "active" if suite and suite.is_connected else "disconnected", + "connected": suite.is_connected if suite else False, + "components": components, + # Client stats + "total_api_calls": client_stats["total_api_calls"], + "successful_api_calls": client_stats["successful_api_calls"], + "failed_api_calls": 
client_stats["failed_api_calls"], + "avg_response_time_ms": client_stats["avg_response_time_ms"], + "cache_hit_rate": client_stats["cache_hit_rate"], + "memory_usage_mb": client_stats["memory_usage_mb"], + # Realtime stats + "realtime_connected": realtime_stats["realtime_connected"], + "user_hub_connected": realtime_stats["user_hub_connected"], + "market_hub_connected": realtime_stats["market_hub_connected"], + "active_subscriptions": realtime_stats["active_subscriptions"], + "message_queue_size": realtime_stats["message_queue_size"], + # Features + "features_enabled": [f.value for f in suite.config.features] + if suite + else [], + "timeframes": suite.config.timeframes if suite else [], + } + + return stats + + async def _get_order_manager_stats(self, uptime_seconds: int) -> ComponentStats: + """Get OrderManager statistics.""" + om = self.order_manager + if not om: + return self._get_empty_component_stats("OrderManager", uptime_seconds) + + try: + # Get enhanced stats if available (now synchronous) + perf_metrics = {} + if hasattr(om, "get_performance_metrics"): + try: + perf_metrics = om.get_performance_metrics() + except Exception as e: + logger.warning( + f"Failed to get OrderManager performance metrics: {e}" + ) + + # Get error stats (now synchronous) + error_count = 0 + if hasattr(om, "get_error_stats"): + try: + error_stats = om.get_error_stats() + error_count = error_stats.get("total_errors", 0) + except Exception as e: + logger.warning(f"Failed to get OrderManager error stats: {e}") + + # Get memory usage (now synchronous) + memory_mb = 0.0 + if hasattr(om, "get_enhanced_memory_stats"): + try: + memory_stats = om.get_enhanced_memory_stats() + memory_mb = memory_stats.get("current_memory_mb", 0.0) + except Exception as e: + logger.warning(f"Failed to get OrderManager memory stats: {e}") + elif hasattr(om, "get_memory_usage_mb"): + try: + memory_mb = om.get_memory_usage_mb() + except Exception as e: + logger.warning(f"Failed to get OrderManager memory usage: {e}") + + # Get last activity + last_activity_obj = None + try: + last_activity_obj = ( + om.stats.get("last_order_time") if hasattr(om, "stats") else None + ) + except Exception as e: + logger.warning(f"Failed to get OrderManager last activity: {e}") + + return { + "name": "OrderManager", + "status": "connected", + "uptime_seconds": uptime_seconds, + "last_activity": last_activity_obj.isoformat() + if last_activity_obj + else None, + "error_count": error_count, + "memory_usage_mb": memory_mb, + "performance_metrics": perf_metrics, + } + except Exception as e: + logger.error(f"Critical error in OrderManager stats collection: {e}") + return self._get_empty_component_stats("OrderManager", uptime_seconds) + + async def _get_position_manager_stats(self, uptime_seconds: int) -> ComponentStats: + """Get PositionManager statistics.""" + pm = self.position_manager + if not pm: + return self._get_empty_component_stats("PositionManager", uptime_seconds) + + try: + # Get enhanced stats if available (now synchronous) + perf_metrics = {} + if hasattr(pm, "get_performance_metrics"): + try: + perf_metrics = pm.get_performance_metrics() + except Exception as e: + logger.warning( + f"Failed to get PositionManager performance metrics: {e}" + ) + + # Get error stats (now synchronous) + error_count = 0 + if hasattr(pm, "get_error_stats"): + try: + error_stats = pm.get_error_stats() + error_count = error_stats.get("total_errors", 0) + except Exception as e: + logger.warning(f"Failed to get PositionManager error stats: {e}") + + # Get memory usage 
(now synchronous) + memory_mb = 0.0 + if hasattr(pm, "get_enhanced_memory_stats"): + try: + memory_stats = pm.get_enhanced_memory_stats() + memory_mb = memory_stats.get("current_memory_mb", 0.0) + except Exception as e: + logger.warning(f"Failed to get PositionManager memory stats: {e}") + elif hasattr(pm, "get_memory_usage_mb"): + try: + memory_mb = pm.get_memory_usage_mb() + except Exception as e: + logger.warning(f"Failed to get PositionManager memory usage: {e}") + + # Get last activity + last_activity_obj = None + try: + last_activity_obj = ( + pm.stats.get("last_position_update") + if hasattr(pm, "stats") + else None + ) + except Exception as e: + logger.warning(f"Failed to get PositionManager last activity: {e}") + + return { + "name": "PositionManager", + "status": "connected", + "uptime_seconds": uptime_seconds, + "last_activity": last_activity_obj.isoformat() + if last_activity_obj + else None, + "error_count": error_count, + "memory_usage_mb": memory_mb, + "performance_metrics": perf_metrics, + } + except Exception as e: + logger.error(f"Critical error in PositionManager stats collection: {e}") + return self._get_empty_component_stats("PositionManager", uptime_seconds) + + async def _get_data_manager_stats(self, uptime_seconds: int) -> ComponentStats: + """Get RealtimeDataManager statistics.""" + dm = self.data_manager + if not dm: + return self._get_empty_component_stats( + "RealtimeDataManager", uptime_seconds + ) + + try: + # Get memory stats which include performance metrics + memory_mb = 0.0 + error_count = 0 + last_activity_obj = None + perf_metrics = {} + + if hasattr(dm, "get_memory_stats"): + try: + memory_stats = dm.get_memory_stats() + memory_mb = memory_stats.get("memory_usage_mb", 0.0) + error_count = memory_stats.get("data_validation_errors", 0) + last_activity_obj = memory_stats.get("last_update") + + # Extract performance metrics + perf_metrics = { + "ticks_processed": memory_stats.get("ticks_processed", 0), + "quotes_processed": memory_stats.get("quotes_processed", 0), + "trades_processed": memory_stats.get("trades_processed", 0), + "total_bars": memory_stats.get("total_bars", 0), + "websocket_messages": memory_stats.get("websocket_messages", 0), + } + except Exception as e: + logger.warning( + f"Failed to get RealtimeDataManager memory stats: {e}" + ) + + # Check running status safely + status = "disconnected" + try: + if hasattr(dm, "is_running"): + status = "connected" if dm.is_running else "disconnected" + except Exception as e: + logger.warning(f"Failed to get RealtimeDataManager status: {e}") + + return { + "name": "RealtimeDataManager", + "status": status, + "uptime_seconds": uptime_seconds, + "last_activity": last_activity_obj.isoformat() + if last_activity_obj + else None, + "error_count": error_count, + "memory_usage_mb": memory_mb, + "performance_metrics": perf_metrics, + } + except Exception as e: + logger.error(f"Critical error in RealtimeDataManager stats collection: {e}") + return self._get_empty_component_stats( + "RealtimeDataManager", uptime_seconds + ) + + async def _get_orderbook_stats(self, uptime_seconds: int) -> ComponentStats: + """Get OrderBook statistics.""" + ob = self.orderbook + if not ob: + return self._get_empty_component_stats("OrderBook", uptime_seconds) + + # Get enhanced stats if available (now synchronous) + if hasattr(ob, "get_performance_metrics"): + perf_metrics = ob.get_performance_metrics() + else: + perf_metrics = {} + + # Get error stats (now synchronous) + if hasattr(ob, "get_error_stats"): + error_stats = 
ob.get_error_stats() + error_count = error_stats.get("total_errors", 0) + else: + error_count = 0 + + # Get memory usage (now synchronous) + if hasattr(ob, "get_memory_stats"): + memory_stats = ob.get_memory_stats() + memory_mb = memory_stats.get("memory_usage_mb", 0.0) + elif hasattr(ob, "get_memory_usage_mb"): + memory_mb = ob.get_memory_usage_mb() + else: + memory_mb = 0.0 + + # Get last activity + last_activity_obj = ( + ob.last_orderbook_update if hasattr(ob, "last_orderbook_update") else None + ) + + return { + "name": "OrderBook", + "status": "connected", + "uptime_seconds": uptime_seconds, + "last_activity": last_activity_obj.isoformat() + if last_activity_obj + else None, + "error_count": error_count, + "memory_usage_mb": memory_mb, + "performance_metrics": perf_metrics, + } + + async def _get_risk_manager_stats(self, uptime_seconds: int) -> ComponentStats: + """Get RiskManager statistics.""" + rm = self.risk_manager + if not rm: + return self._get_empty_component_stats("RiskManager", uptime_seconds) + + try: + # Get enhanced stats if available (now synchronous) + perf_metrics = {} + if hasattr(rm, "get_performance_metrics"): + try: + perf_metrics = rm.get_performance_metrics() + except Exception as e: + logger.warning( + f"Failed to get RiskManager performance metrics: {e}" + ) + + # Get error stats (now synchronous) + error_count = 0 + if hasattr(rm, "get_error_stats"): + try: + error_stats = rm.get_error_stats() + error_count = error_stats.get("total_errors", 0) + except Exception as e: + logger.warning(f"Failed to get RiskManager error stats: {e}") + + # Get memory usage (now synchronous) + memory_mb = 0.0 + if hasattr(rm, "get_enhanced_memory_stats"): + try: + memory_stats = rm.get_enhanced_memory_stats() + memory_mb = memory_stats.get("current_memory_mb", 0.0) + except Exception as e: + logger.warning(f"Failed to get RiskManager memory stats: {e}") + elif hasattr(rm, "get_memory_usage_mb"): + try: + memory_mb = rm.get_memory_usage_mb() + except Exception as e: + logger.warning(f"Failed to get RiskManager memory usage: {e}") + + # Get last activity + last_activity = None + if hasattr(rm, "get_activity_stats"): + try: + activity_stats = await rm.get_activity_stats() + last_activity = activity_stats.get("last_activity") + except Exception as e: + logger.warning(f"Failed to get RiskManager activity stats: {e}") + + return { + "name": "RiskManager", + "status": "active", + "uptime_seconds": uptime_seconds, + "last_activity": last_activity, + "error_count": error_count, + "memory_usage_mb": memory_mb, + "performance_metrics": perf_metrics, + } + except Exception as e: + logger.error(f"Critical error in RiskManager stats collection: {e}") + return self._get_empty_component_stats("RiskManager", uptime_seconds) + + async def _get_client_stats(self) -> dict[str, Any]: + """Get ProjectX client statistics.""" + client = self.client + if not client: + return { + "total_api_calls": 0, + "successful_api_calls": 0, + "failed_api_calls": 0, + "avg_response_time_ms": 0.0, + "cache_hit_rate": 0.0, + "memory_usage_mb": 0.0, + } + + # Get performance stats from client + if hasattr(client, "get_performance_stats"): + perf_stats = await client.get_performance_stats() + + return { + "total_api_calls": perf_stats.get("api_calls", 0), + "successful_api_calls": perf_stats.get("successful_calls", 0), + "failed_api_calls": perf_stats.get("failed_calls", 0), + "avg_response_time_ms": perf_stats.get("avg_response_time_ms", 0.0), + "cache_hit_rate": perf_stats.get("cache_hit_ratio", 0.0), + 
"memory_usage_mb": perf_stats.get("memory_usage_mb", 0.0), + } + + # Fallback to basic stats + api_calls = getattr(client, "api_call_count", 0) + cache_hits = getattr(client, "cache_hit_count", 0) + total_requests = api_calls + cache_hits + + # Safe division for cache hit rate + cache_hit_rate = 0.0 + if total_requests > 0: + try: + cache_hit_rate = min(1.0, cache_hits / total_requests) + except (ZeroDivisionError, ValueError): + cache_hit_rate = 0.0 + + return { + "total_api_calls": api_calls, + "successful_api_calls": api_calls, # Assume successful if we have the count + "failed_api_calls": 0, + "avg_response_time_ms": 0.0, + "cache_hit_rate": cache_hit_rate, + "memory_usage_mb": 0.0, + } + + async def _get_realtime_stats(self) -> dict[str, Any]: + """Get realtime client statistics.""" + rt = self.realtime_client + if not rt: + return { + "realtime_connected": False, + "user_hub_connected": False, + "market_hub_connected": False, + "active_subscriptions": 0, + "message_queue_size": 0, + } + + return { + "realtime_connected": rt.is_connected() + if hasattr(rt, "is_connected") + else False, + "user_hub_connected": getattr(rt, "user_connected", False), + "market_hub_connected": getattr(rt, "market_connected", False), + "active_subscriptions": len(getattr(rt, "_subscriptions", [])), + "message_queue_size": len(getattr(rt, "_message_queue", [])), + } + + async def _calculate_cross_metrics( + self, stats: TradingSuiteStats + ) -> TradingSuiteStats: + """ + Calculate cross-component metrics. + + Args: + stats: Raw statistics + + Returns: + Statistics with cross-component metrics added + """ + try: + # Calculate total memory usage across all components + total_memory = sum( + comp.get("memory_usage_mb", 0) + for comp in stats.get("components", {}).values() + ) + stats["memory_usage_mb"] = max(0, total_memory) # Ensure non-negative + + # Calculate total error count + total_errors = sum( + comp.get("error_count", 0) + for comp in stats.get("components", {}).values() + ) + stats["total_errors"] = max(0, total_errors) # Ensure non-negative + + # Calculate overall health score (0-100) with bounds checking + health_score = 100.0 + + # Deduct for errors (max 20 points) + if total_errors > 0: + health_score -= min(20, total_errors * 2) + + # Deduct for disconnected components (max 30 points) + disconnected = sum( + 1 + for comp in stats.get("components", {}).values() + if comp.get("status") != "connected" and comp.get("status") != "active" + ) + if disconnected > 0: + health_score -= min(30, disconnected * 10) + + # Deduct for high memory usage (>500MB total, max 20 points) + if total_memory > 500: + memory_penalty = min(20, (total_memory - 500) / 50) + health_score -= memory_penalty + + # Deduct for poor cache performance (max 10 points) + cache_hit_rate = stats.get("cache_hit_rate", 0) + # Ensure cache_hit_rate is between 0 and 1 + cache_hit_rate = max(0.0, min(1.0, cache_hit_rate)) + if cache_hit_rate < 0.5: + cache_penalty = min(10, (0.5 - cache_hit_rate) * 20) + health_score -= cache_penalty + + # Ensure health score is within bounds [0, 100] + stats["health_score"] = max(0.0, min(100.0, health_score)) + + except Exception as e: + logger.error(f"Error calculating cross-component metrics: {e}") + # Set safe defaults on error + stats["health_score"] = 0.0 + stats["total_errors"] = stats.get("total_errors", 0) + stats["memory_usage_mb"] = stats.get("memory_usage_mb", 0.0) + + return stats + + def _get_cached(self, key: str) -> Any | None: + """ + Get cached value if still valid. 
+ + Args: + key: Cache key + + Returns: + Cached value or None if expired/missing + """ + if key not in self._cache: + return None + + timestamp = self._cache_timestamps.get(key, 0) + if time.time() - timestamp > self._cache_ttl: + return None + + return self._cache[key] + + def _set_cache(self, key: str, value: Any) -> None: + """ + Set cache value with current timestamp. + + Args: + key: Cache key + value: Value to cache + """ + self._cache[key] = value + self._cache_timestamps[key] = time.time() + + def _get_empty_stats(self) -> TradingSuiteStats: + """Get empty statistics structure.""" + return { + "suite_id": "unknown", + "instrument": "unknown", + "created_at": datetime.now().isoformat(), + "uptime_seconds": 0, + "status": "disconnected", + "connected": False, + "components": {}, + "realtime_connected": False, + "user_hub_connected": False, + "market_hub_connected": False, + "total_api_calls": 0, + "successful_api_calls": 0, + "failed_api_calls": 0, + "avg_response_time_ms": 0.0, + "cache_hit_rate": 0.0, + "memory_usage_mb": 0.0, + "active_subscriptions": 0, + "message_queue_size": 0, + "features_enabled": [], + "timeframes": [], + } + + def _get_empty_component_stats( + self, name: str, uptime_seconds: int + ) -> ComponentStats: + """Get empty component statistics.""" + return { + "name": name, + "status": "disconnected", + "uptime_seconds": uptime_seconds, + "last_activity": None, + "error_count": 0, + "memory_usage_mb": 0.0, + "performance_metrics": {}, + } diff --git a/test_enhanced_stats.py b/test_enhanced_stats.py new file mode 100644 index 0000000..2a39b6b --- /dev/null +++ b/test_enhanced_stats.py @@ -0,0 +1,151 @@ +#!/usr/bin/env python3 +""" +Test script for enhanced statistics tracking in v3.2.1. + +This script creates a TradingSuite and tests the new statistics +aggregation functionality. +""" + +import asyncio +import json +from datetime import datetime + +from project_x_py import TradingSuite + + +async def main(): + """Test enhanced statistics.""" + print("Testing Enhanced Statistics Tracking (v3.2.1)") + print("=" * 60) + + # Create suite with all features + print("\n1. Creating TradingSuite...") + suite = await TradingSuite.create( + "MNQ", + timeframes=["1min", "5min"], + features=["orderbook", "risk_manager"], + initial_days=1, + ) + print(f" ✓ Suite created for: {suite.client.account_info.name}") + + # Wait for some data to accumulate + print("\n2. Letting data accumulate...") + await asyncio.sleep(3) + + # Get comprehensive stats + print("\n3. Getting aggregated statistics...") + stats = await suite.get_stats() + + # Display key metrics + print("\n4. Key Metrics:") + print(f" - Instrument: {stats.get('instrument', 'N/A')}") + print(f" - Status: {stats.get('status', 'N/A')}") + print(f" - Connected: {stats.get('connected', False)}") + print(f" - Uptime: {stats.get('uptime_seconds', 0)} seconds") + print(f" - Health Score: {stats.get('health_score', 0):.1f}/100") + + # Display component stats + print("\n5. 
Component Statistics:")
+    components = stats.get("components", {})
+    for comp_name, comp_stats in components.items():
+        print(f"\n   {comp_stats.get('name', comp_name)}:")
+        print(f"      - Status: {comp_stats.get('status', 'N/A')}")
+        print(f"      - Memory: {comp_stats.get('memory_usage_mb', 0):.2f} MB")
+        print(f"      - Errors: {comp_stats.get('error_count', 0)}")
+
+        # Show performance metrics if available
+        perf = comp_stats.get("performance_metrics", {})
+        if perf:
+            print("      - Performance Metrics:")
+            for key, value in perf.items():
+                if isinstance(value, (int, float)):
+                    print(f"         • {key}: {value}")
+
+    # Display network stats
+    print("\n6. Network Statistics:")
+    print(f"   - Total API Calls: {stats.get('total_api_calls', 0)}")
+    print(f"   - Successful: {stats.get('successful_api_calls', 0)}")
+    print(f"   - Failed: {stats.get('failed_api_calls', 0)}")
+    print(f"   - Cache Hit Rate: {stats.get('cache_hit_rate', 0):.1%}")
+    print(f"   - Avg Response Time: {stats.get('avg_response_time_ms', 0):.2f} ms")
+
+    # Display realtime connection stats
+    print("\n7. Real-time Connection:")
+    print(f"   - WebSocket Connected: {stats.get('realtime_connected', False)}")
+    print(f"   - User Hub: {stats.get('user_hub_connected', False)}")
+    print(f"   - Market Hub: {stats.get('market_hub_connected', False)}")
+    print(f"   - Active Subscriptions: {stats.get('active_subscriptions', 0)}")
+
+    # Test placing an order to generate stats
+    print("\n8. Testing order placement for stats...")
+    try:
+        # Get current price
+        current_price = await suite.data.get_current_price()
+        if current_price:
+            print(f"   Current price: ${current_price:,.2f}")
+
+            # Place a limit order well below market
+            order = await suite.orders.place_limit_order(
+                contract_id=suite.instrument_id,
+                side=0,  # Buy
+                size=1,
+                limit_price=current_price - 500,  # Far from market
+            )
+            print(f"   ✓ Order placed: {order.orderId}")
+
+            # Cancel it immediately
+            await suite.orders.cancel_order(order.orderId)
+            print("   ✓ Order cancelled")
+
+            # Get updated stats
+            await asyncio.sleep(1)
+            updated_stats = await suite.get_stats()
+
+            # Show OrderManager stats
+            om_stats = updated_stats.get("components", {}).get("order_manager", {})
+            if om_stats:
+                print("\n   Updated OrderManager Stats:")
+                print(f"      - Memory: {om_stats.get('memory_usage_mb', 0):.2f} MB")
+                print(f"      - Errors: {om_stats.get('error_count', 0)}")
+
+                perf = om_stats.get("performance_metrics", {})
+                if perf and "operation_stats" in perf:
+                    op_stats = perf["operation_stats"]
+                    if "place_order" in op_stats:
+                        po_stats = op_stats["place_order"]
+                        print("      - Place Order Performance:")
+                        print(f"         • Count: {po_stats.get('count', 0)}")
+                        print(f"         • Avg: {po_stats.get('avg_ms', 0):.2f} ms")
+                        print(f"         • Min: {po_stats.get('min_ms', 0):.2f} ms")
+                        print(f"         • Max: {po_stats.get('max_ms', 0):.2f} ms")
+    except Exception as e:
+        print(f"   ⚠️ Order test failed: {e}")
+
+    # Export stats in different formats
+    print("\n9. Exporting statistics...")
+
+    # Get JSON export (export_stats is synchronous, so no await here)
+    if hasattr(suite.orders, "export_stats"):
+        json_export = suite.orders.export_stats(format="json")
+        # default=str keeps datetime values in the export JSON-serializable
+        print(f"   ✓ JSON export available ({len(json.dumps(json_export, default=str))} bytes)")
+
+        # Get Prometheus export
+        prom_export = suite.orders.export_stats(format="prometheus")
+        print(f"   ✓ Prometheus export available ({len(prom_export)} bytes)")
+    else:
+        print("   ⚠️ Export not available (will be in next update)")
+
+    # Clean up
+    print("\n10. 
diff --git a/tests/test_enhanced_statistics.py b/tests/test_enhanced_statistics.py
new file mode 100644
index 0000000..f61aaf6
--- /dev/null
+++ b/tests/test_enhanced_statistics.py
@@ -0,0 +1,524 @@
+#!/usr/bin/env python3
+"""
+Comprehensive unit tests for enhanced statistics tracking system.
+
+Tests the EnhancedStatsTrackingMixin and StatisticsAggregator for:
+- Error handling and graceful degradation
+- Memory leak prevention with circular buffers
+- PII sanitization
+- Thread safety
+- Performance overhead
+- Edge cases and boundary conditions
+"""
+
+import asyncio
+import sys
+from collections import deque
+from datetime import datetime, timedelta
+from unittest.mock import AsyncMock, MagicMock, patch
+
+import pytest
+
+from project_x_py.utils.enhanced_stats_tracking import EnhancedStatsTrackingMixin
+from project_x_py.utils.statistics_aggregator import StatisticsAggregator
+
+
+class TestComponent(EnhancedStatsTrackingMixin):
+    """Test component that uses the enhanced stats tracking mixin."""
+
+    def __init__(self):
+        self._init_enhanced_stats(
+            max_errors=10,
+            max_timings=100,
+            retention_hours=1,
+            enable_profiling=True,
+        )
+
+
+class TestEnhancedStatsTracking:
+    """Test suite for EnhancedStatsTrackingMixin."""
+
+    @pytest.mark.asyncio
+    async def test_circular_buffer_prevents_memory_leak(self):
+        """Test that circular buffers prevent unbounded memory growth."""
+        component = TestComponent()
+
+        # Add more errors than the max
+        for i in range(20):
+            await component.track_error(
+                Exception(f"Error {i}"),
+                context=f"test_{i}",
+                details={"index": i},
+            )
+
+        # Should only keep last 10 errors
+        assert len(component._error_history) == 10
+        assert component._error_count == 20  # Total count preserved
+
+        # Add more timings than the max
+        for i in range(150):
+            await component.track_operation(
+                operation="test_op",
+                duration_ms=float(i),
+                success=True,
+            )
+
+        # Should only keep last 100 timings in operation-specific buffer
+        assert len(component._operation_timings["test_op"]) == 100
+        # Verify it contains the last 100 values (50-149)
+        assert min(component._operation_timings["test_op"]) == 50.0
+        assert max(component._operation_timings["test_op"]) == 149.0
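The memory-bound behavior asserted above is exactly what `collections.deque(maxlen=...)` provides: appends past capacity silently evict the oldest entry, so the buffer stays O(maxlen) while a separate counter preserves the lifetime total. A minimal sketch of that pattern, assuming it matches the mixin's internals:

```python
from collections import deque


class BoundedErrorLog:
    """Keeps only the newest max_errors entries, but counts every error."""

    def __init__(self, max_errors: int = 10) -> None:
        self._error_history: deque[str] = deque(maxlen=max_errors)
        self._error_count = 0  # lifetime total, never truncated

    def track_error(self, message: str) -> None:
        self._error_history.append(message)  # evicts the oldest once full
        self._error_count += 1


log = BoundedErrorLog(max_errors=10)
for i in range(20):
    log.track_error(f"Error {i}")

assert len(log._error_history) == 10        # bounded buffer
assert log._error_count == 20               # total preserved
assert log._error_history[0] == "Error 10"  # oldest surviving entry
```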
details["safe_field"] == "this_is_safe" # Not sanitized + + @pytest.mark.asyncio + async def test_thread_safety(self): + """Test that concurrent access to stats is thread-safe.""" + component = TestComponent() + + async def track_operations(op_name: str, count: int): + for i in range(count): + await component.track_operation( + operation=op_name, + duration_ms=float(i), + success=True, + ) + + # Run multiple concurrent tasks + tasks = [ + track_operations("op1", 50), + track_operations("op2", 50), + track_operations("op3", 50), + ] + + await asyncio.gather(*tasks) + + # Verify all operations were tracked + assert "op1" in component._operation_timings + assert "op2" in component._operation_timings + assert "op3" in component._operation_timings + assert len(component._operation_timings["op1"]) == 50 + assert len(component._operation_timings["op2"]) == 50 + assert len(component._operation_timings["op3"]) == 50 + + @pytest.mark.asyncio + async def test_performance_percentiles(self): + """Test performance percentile calculations.""" + component = TestComponent() + + # Add known timing values + timings = [10, 20, 30, 40, 50, 60, 70, 80, 90, 100] + for timing in timings: + await component.track_operation( + operation="test", + duration_ms=float(timing), + success=True, + ) + + metrics = await component.get_performance_metrics() + op_stats = metrics["operation_stats"]["test"] + + # Check percentiles + assert op_stats["p50_ms"] == 50 # Median + assert op_stats["p95_ms"] == 90 # 95th percentile + assert op_stats["p99_ms"] == 100 # 99th percentile + assert op_stats["min_ms"] == 10 + assert op_stats["max_ms"] == 100 + assert op_stats["avg_ms"] == 55 # Average + + @pytest.mark.asyncio + async def test_data_quality_tracking(self): + """Test data quality metrics tracking.""" + component = TestComponent() + + # Track various data quality issues + await component.track_data_quality( + total_points=1000, + invalid_points=10, + missing_points=5, + duplicate_points=3, + ) + + await component.track_data_quality( + total_points=500, + invalid_points=2, + missing_points=1, + duplicate_points=0, + ) + + quality_stats = await component.get_data_quality_stats() + + assert quality_stats["total_data_points"] == 1500 + assert quality_stats["invalid_data_points"] == 12 + assert quality_stats["missing_data_points"] == 6 + assert quality_stats["duplicate_data_points"] == 3 + assert quality_stats["quality_score"] > 98 # (1500-12)/1500 * 100 + assert quality_stats["invalid_rate"] < 0.01 # 12/1500 + + @pytest.mark.asyncio + async def test_cleanup_old_stats(self): + """Test that old statistics are properly cleaned up.""" + component = TestComponent() + component._retention_hours = 0 # Set to 0 for immediate cleanup + + # Add some stats + await component.track_error( + Exception("Old error"), + context="test", + ) + + # Manually set timestamp to be old + if component._error_history: + component._error_history[0]["timestamp"] = datetime.now() - timedelta( + hours=2 + ) + + # Trigger cleanup + await component.cleanup_old_stats() + + # Old error should be removed + assert len(component._error_history) == 0 + + @pytest.mark.asyncio + async def test_prometheus_export_format(self): + """Test Prometheus export format.""" + component = TestComponent() + + # Add some metrics + await component.track_operation("api_call", 100.0, success=True) + await component.track_error(Exception("Test error")) + + # Export in Prometheus format + prom_export = await component.export_stats(format="prometheus") + + # Check format + assert 
+
+    @pytest.mark.asyncio
+    async def test_thread_safety(self):
+        """Test that concurrent access to stats is thread-safe."""
+        component = TestComponent()
+
+        async def track_operations(op_name: str, count: int):
+            for i in range(count):
+                await component.track_operation(
+                    operation=op_name,
+                    duration_ms=float(i),
+                    success=True,
+                )
+
+        # Run multiple concurrent tasks
+        tasks = [
+            track_operations("op1", 50),
+            track_operations("op2", 50),
+            track_operations("op3", 50),
+        ]
+
+        await asyncio.gather(*tasks)
+
+        # Verify all operations were tracked
+        assert "op1" in component._operation_timings
+        assert "op2" in component._operation_timings
+        assert "op3" in component._operation_timings
+        assert len(component._operation_timings["op1"]) == 50
+        assert len(component._operation_timings["op2"]) == 50
+        assert len(component._operation_timings["op3"]) == 50
+
+    @pytest.mark.asyncio
+    async def test_performance_percentiles(self):
+        """Test performance percentile calculations."""
+        component = TestComponent()
+
+        # Add known timing values
+        timings = [10, 20, 30, 40, 50, 60, 70, 80, 90, 100]
+        for timing in timings:
+            await component.track_operation(
+                operation="test",
+                duration_ms=float(timing),
+                success=True,
+            )
+
+        metrics = await component.get_performance_metrics()
+        op_stats = metrics["operation_stats"]["test"]
+
+        # Check percentiles
+        assert op_stats["p50_ms"] == 50  # Median
+        assert op_stats["p95_ms"] == 90  # 95th percentile
+        assert op_stats["p99_ms"] == 100  # 99th percentile
+        assert op_stats["min_ms"] == 10
+        assert op_stats["max_ms"] == 100
+        assert op_stats["avg_ms"] == 55  # Average
+
+    @pytest.mark.asyncio
+    async def test_data_quality_tracking(self):
+        """Test data quality metrics tracking."""
+        component = TestComponent()
+
+        # Track various data quality issues
+        await component.track_data_quality(
+            total_points=1000,
+            invalid_points=10,
+            missing_points=5,
+            duplicate_points=3,
+        )
+
+        await component.track_data_quality(
+            total_points=500,
+            invalid_points=2,
+            missing_points=1,
+            duplicate_points=0,
+        )
+
+        quality_stats = await component.get_data_quality_stats()
+
+        assert quality_stats["total_data_points"] == 1500
+        assert quality_stats["invalid_data_points"] == 12
+        assert quality_stats["missing_data_points"] == 6
+        assert quality_stats["duplicate_data_points"] == 3
+        assert quality_stats["quality_score"] > 98  # (1500-12)/1500 * 100
+        assert quality_stats["invalid_rate"] < 0.01  # 12/1500
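Per the inline comments in the test above, the data-quality assertions reduce to simple ratios over the accumulated counters, with only invalid points counted against the score. A sketch of that arithmetic (the function name and return shape are illustrative):

```python
def quality_metrics(total: int, invalid: int, missing: int, duplicate: int) -> dict:
    # missing and duplicate counts are tracked separately and do not
    # affect the score in this sketch; guard against an empty window.
    if total == 0:
        return {"quality_score": 100.0, "invalid_rate": 0.0}
    return {
        "quality_score": (total - invalid) / total * 100,  # 99.2 for the test data
        "invalid_rate": invalid / total,                   # 0.008 for the test data
    }


# Totals accumulated across the two track_data_quality calls above:
m = quality_metrics(total=1500, invalid=12, missing=6, duplicate=3)
assert m["quality_score"] > 98
assert m["invalid_rate"] < 0.01
```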
+
+    @pytest.mark.asyncio
+    async def test_cleanup_old_stats(self):
+        """Test that old statistics are properly cleaned up."""
+        component = TestComponent()
+        component._retention_hours = 0  # Set to 0 for immediate cleanup
+
+        # Add some stats
+        await component.track_error(
+            Exception("Old error"),
+            context="test",
+        )
+
+        # Manually set timestamp to be old
+        if component._error_history:
+            component._error_history[0]["timestamp"] = datetime.now() - timedelta(
+                hours=2
+            )
+
+        # Trigger cleanup
+        await component.cleanup_old_stats()
+
+        # Old error should be removed
+        assert len(component._error_history) == 0
+
+    @pytest.mark.asyncio
+    async def test_prometheus_export_format(self):
+        """Test Prometheus export format."""
+        component = TestComponent()
+
+        # Add some metrics
+        await component.track_operation("api_call", 100.0, success=True)
+        await component.track_error(Exception("Test error"))
+
+        # Export in Prometheus format
+        prom_export = await component.export_stats(format="prometheus")
+
+        # Check format
+        assert isinstance(prom_export, str)
+        assert "# HELP" in prom_export
+        assert "# TYPE" in prom_export
+        assert "testcomponent_" in prom_export.lower()
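The Prometheus assertions only pin down the exposition-format skeleton: `# HELP` and `# TYPE` comment lines followed by `<metric> <value>` samples, with the component name lower-cased into the metric prefix. A hand-rolled formatter in that spirit, with invented metric names, might look like this:

```python
def to_prometheus(component: str, metrics: dict[str, float]) -> str:
    """Render a flat metrics dict in Prometheus text exposition format."""
    prefix = component.lower()
    lines: list[str] = []
    for name, value in metrics.items():
        metric = f"{prefix}_{name}"
        lines.append(f"# HELP {metric} {name.replace('_', ' ')}")
        lines.append(f"# TYPE {metric} gauge")
        lines.append(f"{metric} {value}")
    return "\n".join(lines) + "\n"


text = to_prometheus("TestComponent", {"error_count": 1, "api_call_avg_ms": 100.0})
assert "# HELP" in text and "# TYPE" in text
assert "testcomponent_error_count 1" in text
```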
+
+
+class TestStatisticsAggregator:
+    """Test suite for StatisticsAggregator."""
+
+    @pytest.mark.asyncio
+    async def test_aggregation_with_component_failures(self):
+        """Test that aggregation handles component failures gracefully."""
+        aggregator = StatisticsAggregator(cache_ttl_seconds=1)
+
+        # Create mock components that fail
+        failing_component = MagicMock()
+        failing_component.get_performance_metrics = AsyncMock(
+            side_effect=Exception("Component failed")
+        )
+
+        aggregator.order_manager = failing_component
+        aggregator.trading_suite = MagicMock()
+        aggregator.trading_suite.is_connected = False
+        aggregator.trading_suite.config = MagicMock()
+        aggregator.trading_suite.config.features = []
+        aggregator.trading_suite.config.timeframes = []
+
+        # Should not raise, should return safe defaults
+        stats = await aggregator.aggregate_stats()
+        assert stats is not None
+        assert stats["status"] == "disconnected"
+        assert "components" in stats
+
+    @pytest.mark.asyncio
+    async def test_cache_functionality(self):
+        """Test that caching works correctly."""
+        aggregator = StatisticsAggregator(cache_ttl_seconds=1)
+
+        # Mock a simple suite
+        aggregator.trading_suite = MagicMock()
+        aggregator.trading_suite.is_connected = True
+        aggregator.trading_suite.config = MagicMock()
+        aggregator.trading_suite.config.features = []
+        aggregator.trading_suite.config.timeframes = ["1min"]
+
+        # First call should compute
+        stats1 = await aggregator.aggregate_stats()
+
+        # Second call should use cache
+        stats2 = await aggregator.aggregate_stats()
+        assert stats1 == stats2  # Should be identical
+
+        # Wait for cache to expire
+        await asyncio.sleep(1.1)
+
+        # Should recompute
+        stats3 = await aggregator.aggregate_stats()
+        assert stats3 is not None
+
+    @pytest.mark.asyncio
+    async def test_health_score_calculation(self):
+        """Test health score calculation with various conditions."""
+        aggregator = StatisticsAggregator()
+
+        # Test with perfect health
+        stats = {
+            "components": {},
+            "cache_hit_rate": 1.0,
+            "total_errors": 0,
+        }
+        result = await aggregator._calculate_cross_metrics(stats)
+        assert result["health_score"] == 100.0
+
+        # Test with errors
+        stats = {
+            "components": {"test": {"error_count": 10, "memory_usage_mb": 100}},
+            "cache_hit_rate": 0.8,
+        }
+        result = await aggregator._calculate_cross_metrics(stats)
+        assert 0 <= result["health_score"] <= 100
+
+        # Test with disconnected components
+        stats = {
+            "components": {
+                "test1": {"status": "disconnected", "error_count": 0},
+                "test2": {"status": "connected", "error_count": 0},
+            },
+            "cache_hit_rate": 0.5,
+        }
+        result = await aggregator._calculate_cross_metrics(stats)
+        assert result["health_score"] < 100  # Should be penalized
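These tests constrain the health score to [0, 100], with 100 for a clean slate and penalties for errors, disconnections, and a cold cache. The exact weighting lives inside `_calculate_cross_metrics`, so the scheme below is only a plausible stand-in that satisfies the same assertions:

```python
def health_score(components: dict, cache_hit_rate: float) -> float:
    """Hypothetical scoring: 100 minus capped penalties, clamped to [0, 100]."""
    score = 100.0
    for stats in components.values():
        score -= min(stats.get("error_count", 0), 20)  # per-component error penalty
        if stats.get("status") == "disconnected":
            score -= 10                                # connectivity penalty
    score -= (1.0 - cache_hit_rate) * 10               # cache efficiency penalty
    return max(0.0, min(100.0, score))


assert health_score({}, cache_hit_rate=1.0) == 100.0
assert health_score(
    {"test1": {"status": "disconnected", "error_count": 0},
     "test2": {"status": "connected", "error_count": 0}},
    cache_hit_rate=0.5,
) < 100.0
```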
+""" + +import asyncio +from unittest.mock import AsyncMock, MagicMock, patch + +import pytest + +from project_x_py import TradingSuite +from project_x_py.order_manager import OrderManager +from project_x_py.position_manager import PositionManager +from project_x_py.realtime_data_manager import RealtimeDataManager +from project_x_py.risk_manager import RiskManager + + +class TestStatisticsIntegration: + """Integration tests for statistics across all components.""" + + @pytest.mark.asyncio + async def test_all_components_have_stats_methods(self): + """Verify all components expose the required statistics methods.""" + # Create mock clients + mock_client = MagicMock() + mock_client.account_info = MagicMock(id=12345) + mock_event_bus = MagicMock() + mock_realtime = MagicMock() + + # Create components + order_manager = OrderManager(mock_client, mock_event_bus) + position_manager = PositionManager(mock_client, mock_event_bus) + risk_manager = RiskManager( + mock_client, + order_manager, + mock_event_bus, + position_manager=position_manager, + ) + + # Check OrderManager has enhanced stats methods + assert hasattr(order_manager, "track_operation") + assert hasattr(order_manager, "track_error") + assert hasattr(order_manager, "get_performance_metrics") + assert hasattr(order_manager, "get_error_stats") + assert hasattr(order_manager, "get_enhanced_memory_stats") + assert hasattr(order_manager, "export_stats") + + # Check PositionManager has enhanced stats methods + assert hasattr(position_manager, "track_operation") + assert hasattr(position_manager, "track_error") + assert hasattr(position_manager, "get_performance_metrics") + assert hasattr(position_manager, "get_error_stats") + assert hasattr(position_manager, "get_enhanced_memory_stats") + + # Check RiskManager has enhanced stats methods + assert hasattr(risk_manager, "track_operation") + assert hasattr(risk_manager, "track_error") + assert hasattr(risk_manager, "get_performance_metrics") + + @pytest.mark.asyncio + async def test_trading_suite_aggregates_stats(self): + """Test that TradingSuite properly aggregates statistics from all components.""" + with patch("project_x_py.trading_suite.ProjectX") as MockProjectX: + # Setup mocks + mock_client = AsyncMock() + mock_client.from_env = AsyncMock(return_value=mock_client) + mock_client.authenticate = AsyncMock() + mock_client.get_instrument = AsyncMock( + return_value=MagicMock(id="MNQ123", tickSize=0.25) + ) + mock_client.account_info = MagicMock(id=12345, name="Test") + MockProjectX.from_env.return_value = mock_client + + # Create suite + suite = TradingSuite(instrument="MNQ") + suite.client = mock_client + suite.instrument_info = MagicMock(id="MNQ123") + + # Mock components + suite.orders = MagicMock() + suite.orders.get_order_statistics = AsyncMock( + return_value={"orders_placed": 10, "orders_filled": 8, "fill_rate": 0.8} + ) + + suite.positions = MagicMock() + suite.positions.stats = {"positions_tracked": 5} + + suite.data = MagicMock() + suite.data.get_memory_stats = MagicMock( + return_value={"memory_usage_mb": 2.5, "bars_processed": 1000} + ) + + # Get aggregated stats + stats = await suite.get_stats() + + # Verify stats structure + assert isinstance(stats, dict) + assert "orders_placed" in stats + assert "positions_tracked" in stats + assert "data_memory_usage_mb" in stats + + # Verify aggregation includes all components + assert stats["orders_placed"] == 10 + assert stats["positions_tracked"] == 5 + assert stats["data_memory_usage_mb"] == 2.5 + + @pytest.mark.asyncio + async def 
diff --git a/tests/test_statistics_integration.py b/tests/test_statistics_integration.py
new file mode 100644
index 0000000..a0225a1
--- /dev/null
+++ b/tests/test_statistics_integration.py
@@ -0,0 +1,317 @@
+"""
+Integration tests for the enhanced statistics system.
+
+Tests that all components properly expose statistics and that
+the data is easily accessible for monitoring and alerting systems.
+"""
+
+import asyncio
+from unittest.mock import AsyncMock, MagicMock, patch
+
+import pytest
+
+from project_x_py import TradingSuite
+from project_x_py.order_manager import OrderManager
+from project_x_py.position_manager import PositionManager
+from project_x_py.realtime_data_manager import RealtimeDataManager
+from project_x_py.risk_manager import RiskManager
+
+
+class TestStatisticsIntegration:
+    """Integration tests for statistics across all components."""
+
+    @pytest.mark.asyncio
+    async def test_all_components_have_stats_methods(self):
+        """Verify all components expose the required statistics methods."""
+        # Create mock clients
+        mock_client = MagicMock()
+        mock_client.account_info = MagicMock(id=12345)
+        mock_event_bus = MagicMock()
+        mock_realtime = MagicMock()
+
+        # Create components
+        order_manager = OrderManager(mock_client, mock_event_bus)
+        position_manager = PositionManager(mock_client, mock_event_bus)
+        risk_manager = RiskManager(
+            mock_client,
+            order_manager,
+            mock_event_bus,
+            position_manager=position_manager,
+        )
+
+        # Check OrderManager has enhanced stats methods
+        assert hasattr(order_manager, "track_operation")
+        assert hasattr(order_manager, "track_error")
+        assert hasattr(order_manager, "get_performance_metrics")
+        assert hasattr(order_manager, "get_error_stats")
+        assert hasattr(order_manager, "get_enhanced_memory_stats")
+        assert hasattr(order_manager, "export_stats")
+
+        # Check PositionManager has enhanced stats methods
+        assert hasattr(position_manager, "track_operation")
+        assert hasattr(position_manager, "track_error")
+        assert hasattr(position_manager, "get_performance_metrics")
+        assert hasattr(position_manager, "get_error_stats")
+        assert hasattr(position_manager, "get_enhanced_memory_stats")
+
+        # Check RiskManager has enhanced stats methods
+        assert hasattr(risk_manager, "track_operation")
+        assert hasattr(risk_manager, "track_error")
+        assert hasattr(risk_manager, "get_performance_metrics")
+
+    @pytest.mark.asyncio
+    async def test_trading_suite_aggregates_stats(self):
+        """Test that TradingSuite properly aggregates statistics from all components."""
+        with patch("project_x_py.trading_suite.ProjectX") as MockProjectX:
+            # Setup mocks
+            mock_client = AsyncMock()
+            mock_client.from_env = AsyncMock(return_value=mock_client)
+            mock_client.authenticate = AsyncMock()
+            mock_client.get_instrument = AsyncMock(
+                return_value=MagicMock(id="MNQ123", tickSize=0.25)
+            )
+            mock_client.account_info = MagicMock(id=12345, name="Test")
+            MockProjectX.from_env.return_value = mock_client
+
+            # Create suite
+            suite = TradingSuite(instrument="MNQ")
+            suite.client = mock_client
+            suite.instrument_info = MagicMock(id="MNQ123")
+
+            # Mock components
+            suite.orders = MagicMock()
+            suite.orders.get_order_statistics = AsyncMock(
+                return_value={"orders_placed": 10, "orders_filled": 8, "fill_rate": 0.8}
+            )
+
+            suite.positions = MagicMock()
+            suite.positions.stats = {"positions_tracked": 5}
+
+            suite.data = MagicMock()
+            suite.data.get_memory_stats = MagicMock(
+                return_value={"memory_usage_mb": 2.5, "bars_processed": 1000}
+            )
+
+            # Get aggregated stats
+            stats = await suite.get_stats()
+
+            # Verify stats structure
+            assert isinstance(stats, dict)
+            assert "orders_placed" in stats
+            assert "positions_tracked" in stats
+            assert "data_memory_usage_mb" in stats
+
+            # Verify aggregation includes all components
+            assert stats["orders_placed"] == 10
+            assert stats["positions_tracked"] == 5
+            assert stats["data_memory_usage_mb"] == 2.5
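The aggregation contract this test pins down is a flattening step: component dicts merge into one suite-level dict, with the data manager's keys prefixed (`memory_usage_mb` becomes `data_memory_usage_mb`) to avoid collisions. A runnable sketch of that step, using stub components shaped like the mocks above:

```python
import asyncio


class _Orders:
    async def get_order_statistics(self) -> dict:
        return {"orders_placed": 10, "orders_filled": 8, "fill_rate": 0.8}


class _Positions:
    stats = {"positions_tracked": 5}


class _Data:
    def get_memory_stats(self) -> dict:
        return {"memory_usage_mb": 2.5, "bars_processed": 1000}


async def aggregate_suite_stats(orders, positions, data) -> dict:
    """Illustrative flattening only; the real aggregator covers more components."""
    stats: dict = {}
    stats.update(await orders.get_order_statistics())
    stats.update(positions.stats)
    for key, value in data.get_memory_stats().items():
        stats[f"data_{key}"] = value  # prefix avoids key collisions
    return stats


merged = asyncio.run(aggregate_suite_stats(_Orders(), _Positions(), _Data()))
assert merged["orders_placed"] == 10
assert merged["data_memory_usage_mb"] == 2.5
```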
+
+    @pytest.mark.asyncio
+    async def test_stats_accessible_during_operation(self):
+        """Test that statistics remain accessible during active trading operations."""
+        # Create mock component
+        mock_client = MagicMock()
+        mock_client.account_info = MagicMock(id=12345)
+        mock_event_bus = MagicMock()
+
+        order_manager = OrderManager(mock_client, mock_event_bus)
+
+        # Simulate operations
+        await order_manager.track_operation("test_op", 10.5, success=True)
+        await order_manager.track_operation("test_op", 20.3, success=True)
+        await order_manager.track_error(
+            ValueError("Test error"), context="test_context"
+        )
+
+        # Verify stats are accessible
+        perf_stats = await order_manager.get_performance_metrics()
+        assert "operation_stats" in perf_stats
+        assert "test_op" in perf_stats["operation_stats"]
+        assert perf_stats["operation_stats"]["test_op"]["count"] == 2
+
+        error_stats = await order_manager.get_error_stats()
+        assert error_stats["total_errors"] == 1
+        assert "ValueError" in error_stats["error_types"]
+
+    @pytest.mark.asyncio
+    async def test_stats_export_formats(self):
+        """Test that statistics can be exported in different formats."""
+        mock_client = MagicMock()
+        mock_client.account_info = MagicMock(id=12345)
+        mock_event_bus = MagicMock()
+
+        order_manager = OrderManager(mock_client, mock_event_bus)
+
+        # Add some data
+        await order_manager.track_operation("export_test", 15.0)
+
+        # Test JSON export
+        json_export = await order_manager.export_stats("json")
+        assert isinstance(json_export, dict)
+        assert "timestamp" in json_export
+        assert "component" in json_export
+        assert "performance" in json_export
+
+        # Test Prometheus export
+        prom_export = await order_manager.export_stats("prometheus")
+        assert isinstance(prom_export, str)
+        assert "TYPE" in prom_export or "HELP" in prom_export or len(prom_export) > 0
+
+    @pytest.mark.asyncio
+    async def test_memory_stats_tracking(self):
+        """Test that memory statistics are properly tracked."""
+        mock_client = MagicMock()
+        mock_client.account_info = MagicMock(id=12345)
+        mock_event_bus = MagicMock()
+
+        position_manager = PositionManager(mock_client, mock_event_bus)
+
+        # Get memory stats
+        mem_stats = await position_manager.get_enhanced_memory_stats()
+
+        assert "current_memory_mb" in mem_stats
+        assert "memory_trend" in mem_stats
+        assert isinstance(mem_stats["current_memory_mb"], float)
+        assert mem_stats["current_memory_mb"] >= 0
+
+    @pytest.mark.asyncio
+    async def test_data_quality_tracking(self):
+        """Test data quality metrics tracking."""
+        mock_client = MagicMock()
+        mock_event_bus = MagicMock()
+        mock_realtime = MagicMock()
+
+        # Create data manager with minimal setup
+        data_manager = RealtimeDataManager(
+            instrument="MNQ",
+            project_x=mock_client,
+            realtime_client=mock_realtime,
+            event_bus=mock_event_bus,
+        )
+
+        # Track data quality
+        await data_manager.track_data_quality(
+            total_points=1000, invalid_points=10, missing_points=5, duplicate_points=2
+        )
+
+        # Get quality stats
+        quality_stats = await data_manager.get_data_quality_stats()
+
+        assert "quality_score" in quality_stats
+        assert "invalid_rate" in quality_stats
+        assert quality_stats["total_data_points"] == 1000
+        assert quality_stats["invalid_data_points"] == 10
+
+    @pytest.mark.asyncio
+    async def test_cross_component_metrics(self):
+        """Test that cross-component metrics are calculated correctly."""
+        with patch("project_x_py.trading_suite.ProjectX"):
+            suite = TradingSuite(instrument="MNQ")
+
+            # Mock components with stats
+            suite.orders = MagicMock()
+            suite.orders.get_performance_metrics = AsyncMock(
+                return_value={
+                    "network_stats": {"total_requests": 100, "successful_requests": 95}
+                }
+            )
+            suite.orders.get_error_stats = AsyncMock(return_value={"total_errors": 5})
+
+            suite.positions = MagicMock()
+            suite.positions.get_performance_metrics = AsyncMock(
+                return_value={
+                    "network_stats": {"total_requests": 50, "successful_requests": 48}
+                }
+            )
+            suite.positions.get_error_stats = AsyncMock(
+                return_value={"total_errors": 2}
+            )
+
+            # Create aggregator
+            from project_x_py.utils.statistics_aggregator import StatisticsAggregator
+
+            aggregator = StatisticsAggregator(suite)
+
+            # Get aggregated stats
+            stats = await aggregator.get_aggregated_stats()
+
+            # Verify cross-component metrics
+            assert "health_score" in stats
+            assert "total_operations" in stats
+            assert "overall_error_rate" in stats
+
+            # Health score should be between 0 and 100
+            assert 0 <= stats["health_score"] <= 100
+
+    @pytest.mark.asyncio
+    async def test_stats_persistence_across_operations(self):
+        """Test that statistics persist across multiple operations."""
+        mock_client = MagicMock()
+        mock_client.account_info = MagicMock(id=12345)
+        mock_event_bus = MagicMock()
+
+        order_manager = OrderManager(mock_client, mock_event_bus)
+
+        # Track multiple operations
+        for i in range(10):
+            await order_manager.track_operation(f"op_{i % 3}", float(i * 10))
+
+        # Get stats
+        perf_stats = await order_manager.get_performance_metrics()
+
+        # Verify all operations are tracked
+        assert len(perf_stats["operation_stats"]) == 3  # 3 unique operation types
+        total_ops = sum(
+            stats["count"] for stats in perf_stats["operation_stats"].values()
+        )
+        assert total_ops == 10
+
+    @pytest.mark.asyncio
+    async def test_stats_cleanup_mechanism(self):
+        """Test that old statistics are properly cleaned up."""
+        mock_client = MagicMock()
+        mock_client.account_info = MagicMock(id=12345)
+        mock_event_bus = MagicMock()
+
+        order_manager = OrderManager(mock_client, mock_event_bus)
+
+        # Track operations
+        for i in range(100):
+            await order_manager.track_operation("cleanup_test", float(i))
+
+        # Manually trigger cleanup
+        await order_manager.cleanup_old_stats()
+
+        # Stats should still be accessible
+        perf_stats = await order_manager.get_performance_metrics()
+        assert "operation_stats" in perf_stats
+
+    @pytest.mark.asyncio
+    async def test_concurrent_stats_access(self):
+        """Test that statistics can be accessed concurrently without issues."""
+        mock_client = MagicMock()
+        mock_client.account_info = MagicMock(id=12345)
+        mock_event_bus = MagicMock()
+
+        order_manager = OrderManager(mock_client, mock_event_bus)
+
+        async def track_operations():
+            for i in range(50):
+                await order_manager.track_operation(f"concurrent_{i % 5}", float(i))
+
+        async def read_stats():
+            for _ in range(50):
+                stats = await order_manager.get_performance_metrics()
+                assert "operation_stats" in stats
+                await asyncio.sleep(0.001)
+
+        # Run concurrently
+        await asyncio.gather(track_operations(), read_stats(), read_stats())
+
+        # Verify final state
+        final_stats = await order_manager.get_performance_metrics()
+        assert len(final_stats["operation_stats"]) == 5
+
+
+if __name__ == "__main__":
+    pytest.main([__file__, "-v"])
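Concurrent writers and readers are safe in these tests because everything runs on one event loop and the mixin presumably serializes mutation internally. A common way to get the same guarantee explicitly is an `asyncio.Lock` around the shared structures; the sketch below shows the pattern, not the mixin's actual internals:

```python
import asyncio
from collections import defaultdict, deque


class LockedStats:
    """Stats store whose reads and writes are serialized by one asyncio.Lock."""

    def __init__(self) -> None:
        self._lock = asyncio.Lock()
        self._operation_timings: dict[str, deque[float]] = defaultdict(
            lambda: deque(maxlen=100)
        )

    async def track_operation(self, operation: str, duration_ms: float) -> None:
        async with self._lock:  # one mutation at a time
            self._operation_timings[operation].append(duration_ms)

    async def snapshot(self) -> dict[str, int]:
        async with self._lock:  # consistent read
            return {op: len(t) for op, t in self._operation_timings.items()}


async def demo() -> None:
    stats = LockedStats()
    await asyncio.gather(
        *(stats.track_operation(f"op{i % 3}", float(i)) for i in range(90))
    )
    print(await stats.snapshot())  # {'op0': 30, 'op1': 30, 'op2': 30}


asyncio.run(demo())
```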
diff --git a/tests/test_statistics_performance.py b/tests/test_statistics_performance.py
new file mode 100644
index 0000000..e958141
--- /dev/null
+++ b/tests/test_statistics_performance.py
@@ -0,0 +1,375 @@
+"""
+Performance benchmarking suite for enhanced statistics system.
+
+This module tests the performance impact of the enhanced statistics tracking
+to ensure it meets the < 2% CPU overhead and < 5% memory overhead targets.
+"""
+
+import asyncio
+import gc
+import time
+from datetime import datetime
+from typing import Any
+
+import pytest
+
+from project_x_py.utils.enhanced_stats_tracking import EnhancedStatsTrackingMixin
+from project_x_py.utils.statistics_aggregator import StatisticsAggregator
+
+
+class MockComponent(EnhancedStatsTrackingMixin):
+    """Mock component for performance testing."""
+
+    def __init__(self):
+        self._init_enhanced_stats()
+        self.operations_executed = 0
+
+    async def execute_operation(self, duration_ms: float = 1.0) -> None:
+        """Simulate an operation with timing."""
+        start_time = time.time()
+        # Simulate work
+        await asyncio.sleep(duration_ms / 1000)
+        actual_duration = (time.time() - start_time) * 1000
+
+        # Track the operation
+        await self.track_operation("test_operation", actual_duration, success=True)
+        self.operations_executed += 1
+
+
+class TestStatisticsPerformance:
+    """Performance benchmarking tests for statistics system."""
+
+    @pytest.mark.asyncio
+    async def test_cpu_overhead(self):
+        """Test that statistics tracking adds < 2% CPU overhead."""
+        # Stats-tracking component; the baseline loop below runs without tracking
+        component_with_stats = MockComponent()
+
+        # Benchmark without stats tracking (baseline)
+        start_time = time.perf_counter()
+        operations = 1000
+
+        for _ in range(operations):
+            await asyncio.sleep(0.001)  # 1ms operation
+
+        baseline_duration = time.perf_counter() - start_time
+
+        # Benchmark with stats tracking
+        start_time = time.perf_counter()
+
+        for _ in range(operations):
+            await component_with_stats.execute_operation(1.0)
+
+        stats_duration = time.perf_counter() - start_time
+
+        # Calculate overhead
+        overhead_percent = (
+            (stats_duration - baseline_duration) / baseline_duration
+        ) * 100
+
+        print(f"Baseline: {baseline_duration:.3f}s")
+        print(f"With stats: {stats_duration:.3f}s")
+        print(f"Overhead: {overhead_percent:.2f}%")
+
+        # Assert overhead is less than 2%
+        assert overhead_percent < 2.0, (
+            f"CPU overhead {overhead_percent:.2f}% exceeds 2% target"
+        )
+
+    @pytest.mark.asyncio
+    async def test_memory_overhead(self):
+        """Test that statistics tracking adds < 5% memory overhead."""
+        import sys
+
+        # Force garbage collection
+        gc.collect()
+
+        # Create a component and measure base memory
+        component = MockComponent()
+        base_size = sys.getsizeof(component)
+
+        # Execute many operations to populate statistics
+        for i in range(10000):
+            await component.track_operation(f"op_{i % 10}", float(i % 100))
+            if i % 100 == 0:
+                await component.track_error(
+                    ValueError(f"Test error {i}"),
+                    f"context_{i}",
+                    {"detail": f"value_{i}"},
+                )
+
+        # Measure memory with statistics
+        gc.collect()
+
+        # Calculate actual memory usage
+        memory_stats = await component.get_enhanced_memory_stats()
+        stats_memory_mb = memory_stats["current_memory_mb"]
+
+        # Base object is small, so we check absolute memory usage
+        print(f"Base size: {base_size} bytes")
+        print(f"Stats memory: {stats_memory_mb:.2f} MB")
+
+        # Assert memory usage is reasonable (< 10MB for 10K operations)
+        assert stats_memory_mb < 10.0, (
+            f"Memory usage {stats_memory_mb:.2f}MB exceeds limit"
+        )
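`execute_operation` above shows the measure-then-track pattern inline. In production code it is often factored into a decorator so every manager method reports timings the same way; the sketch below assumes only that the decorated class provides the `track_operation(operation, duration_ms, success=...)` coroutine used throughout this suite:

```python
import functools
import time
from collections.abc import Awaitable, Callable
from typing import Any


def timed(operation: str) -> Callable:
    """Hypothetical decorator form of the measure-then-track pattern."""

    def wrap(fn: Callable[..., Awaitable[Any]]) -> Callable[..., Awaitable[Any]]:
        @functools.wraps(fn)
        async def inner(self, *args: Any, **kwargs: Any) -> Any:
            start = time.perf_counter()
            ok = True
            try:
                return await fn(self, *args, **kwargs)
            except Exception:
                ok = False  # record the failure, then re-raise
                raise
            finally:
                elapsed_ms = (time.perf_counter() - start) * 1000
                # self.track_operation is assumed to come from the stats mixin
                await self.track_operation(operation, elapsed_ms, success=ok)

        return inner

    return wrap


# Usage sketch:
#
# class OrderManager(EnhancedStatsTrackingMixin):
#     @timed("place_order")
#     async def place_order(self, ...): ...
```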
+
+    @pytest.mark.asyncio
+    async def test_high_frequency_operations(self):
+        """Test statistics handling at 1000+ operations/second."""
+        component = MockComponent()
+        operations_per_second = 1000
+        duration_seconds = 2
+
+        start_time = time.time()
+        tasks = []
+
+        # Create concurrent operations
+        for i in range(operations_per_second * duration_seconds):
+            task = component.track_operation(
+                f"high_freq_op_{i % 10}",
+                0.1,  # 0.1ms operations
+                success=i % 100 != 0,  # 1% failure rate
+            )
+            tasks.append(task)
+
+            # Yield control periodically
+            if i % 100 == 0:
+                await asyncio.sleep(0)
+
+        # Wait for all operations to complete
+        await asyncio.gather(*tasks)
+
+        elapsed = time.time() - start_time
+        actual_ops_per_second = len(tasks) / elapsed
+
+        print(f"Target: {operations_per_second} ops/sec")
+        print(f"Actual: {actual_ops_per_second:.0f} ops/sec")
+        print(f"Elapsed: {elapsed:.2f}s")
+
+        # Verify we achieved target throughput
+        assert (
+            actual_ops_per_second >= operations_per_second * 0.9
+        )  # Allow 10% variance
+
+        # Verify statistics are accurate
+        perf_metrics = await component.get_performance_metrics()
+        total_ops = sum(
+            stats["count"] for stats in perf_metrics["operation_stats"].values()
+        )
+        assert total_ops == len(tasks)
+
+    @pytest.mark.asyncio
+    async def test_circular_buffer_memory_bounds(self):
+        """Test that circular buffers prevent unbounded memory growth."""
+        component = MockComponent()
+
+        # Get initial memory
+        initial_stats = await component.get_enhanced_memory_stats()
+        initial_memory = initial_stats["current_memory_mb"]
+
+        # Execute many more operations than buffer size (default 1000)
+        for i in range(5000):
+            await component.track_operation("buffer_test", float(i % 100))
+
+        # Check memory after many operations
+        mid_stats = await component.get_enhanced_memory_stats()
+        mid_memory = mid_stats["current_memory_mb"]
+
+        # Execute many more operations
+        for i in range(5000, 10000):
+            await component.track_operation("buffer_test", float(i % 100))
+
+        # Check final memory
+        final_stats = await component.get_enhanced_memory_stats()
+        final_memory = final_stats["current_memory_mb"]
+
+        print(f"Initial memory: {initial_memory:.2f} MB")
+        print(f"Mid memory: {mid_memory:.2f} MB")
+        print(f"Final memory: {final_memory:.2f} MB")
+
+        # Memory should not grow significantly after buffer is full
+        memory_growth = final_memory - mid_memory
+        assert memory_growth < 0.5, (
+            f"Memory grew by {memory_growth:.2f}MB after buffer full"
+        )
+
+    @pytest.mark.asyncio
+    async def test_concurrent_access_performance(self):
+        """Test performance with multiple concurrent accessors."""
+        component = MockComponent()
+        num_workers = 10
+        operations_per_worker = 100
+
+        async def worker(worker_id: int):
+            """Simulate a worker accessing statistics."""
+            for i in range(operations_per_worker):
+                # Mix of operations
+                await component.track_operation(f"worker_{worker_id}_op", float(i))
+
+                if i % 10 == 0:
+                    # Periodically read stats
+                    _ = await component.get_performance_metrics()
+
+                if i % 20 == 0:
+                    # Occasionally track errors
+                    await component.track_error(
+                        RuntimeError(f"Worker {worker_id} error"), f"operation_{i}"
+                    )
+
+        # Run workers concurrently
+        start_time = time.time()
+        await asyncio.gather(*[worker(i) for i in range(num_workers)])
+        elapsed = time.time() - start_time
+
+        total_operations = num_workers * operations_per_worker
+        ops_per_second = total_operations / elapsed
+
+        print(f"Workers: {num_workers}")
+        print(f"Total operations: {total_operations}")
+        print(f"Elapsed: {elapsed:.2f}s")
+        print(f"Throughput: {ops_per_second:.0f} ops/sec")
+
+        # Should handle concurrent access efficiently
+        assert ops_per_second > 500, (
+            f"Concurrent throughput {ops_per_second:.0f} too low"
+        )
+
+    @pytest.mark.asyncio
+    async def test_statistics_aggregation_performance(self):
+        """Test performance of cross-component statistics aggregation."""
+
+        # Create mock suite with multiple components
+        class MockSuite:
+            def __init__(self):
+                self.orders = MockComponent()
+                self.positions = MockComponent()
+                self.data = MockComponent()
+                self.risk_manager = MockComponent()
+                self.orderbook = MockComponent()
+
+        suite = MockSuite()
+        aggregator = StatisticsAggregator(suite)
+
+        # Populate components with data
+        components = [suite.orders, suite.positions, suite.data]
+        for component in components:
+            for i in range(100):
+                await component.track_operation(f"op_{i}", float(i % 50))
+
+        # Benchmark aggregation
+        iterations = 100
+        start_time = time.time()
+
+        for _ in range(iterations):
+            stats = await aggregator.get_aggregated_stats()
+            assert "health_score" in stats
+
+        elapsed = time.time() - start_time
+        avg_time_ms = (elapsed / iterations) * 1000
+
+        print(f"Aggregation iterations: {iterations}")
+        print(f"Total time: {elapsed:.3f}s")
+        print(f"Average time: {avg_time_ms:.2f}ms")
+
+        # Aggregation should be fast (< 10ms)
+        assert avg_time_ms < 10, (
+            f"Aggregation time {avg_time_ms:.2f}ms exceeds 10ms target"
+        )
+
+    @pytest.mark.asyncio
+    async def test_cleanup_performance(self):
+        """Test that cleanup operations don't block or slow down operations."""
+        component = MockComponent()
+
+        # Fill up statistics
+        for i in range(1000):
+            await component.track_operation("cleanup_test", float(i))
+            if i % 10 == 0:
+                await component.track_error(Exception(f"Error {i}"), f"context_{i}")
+
+        # Measure cleanup time
+        start_time = time.time()
+        await component.cleanup_old_stats()
+        cleanup_time = time.time() - start_time
+
+        print(f"Cleanup time: {cleanup_time * 1000:.2f}ms")
+
+        # Cleanup should be fast (< 100ms)
+        assert cleanup_time < 0.1, (
+            f"Cleanup took {cleanup_time * 1000:.2f}ms, exceeds 100ms"
+        )
+
+        # Operations should continue to work during/after cleanup
+        await component.track_operation("post_cleanup", 1.0)
+        stats = await component.get_performance_metrics()
+        assert "operation_stats" in stats
+
+    @pytest.mark.asyncio
+    async def test_export_performance(self):
+        """Test performance of statistics export in different formats."""
+        component = MockComponent()
+
+        # Populate with data
+        for i in range(500):
+            await component.track_operation(f"export_test_{i % 20}", float(i % 100))
+
+        # Test JSON export
+        start_time = time.time()
+        json_export = await component.export_stats("json")
+        json_time = time.time() - start_time
+
+        # Test Prometheus export
+        start_time = time.time()
+        prometheus_export = await component.export_stats("prometheus")
+        prometheus_time = time.time() - start_time
+
+        print(f"JSON export time: {json_time * 1000:.2f}ms")
+        print(f"Prometheus export time: {prometheus_time * 1000:.2f}ms")
+
+        # Exports should be fast (< 50ms)
+        assert json_time < 0.05, f"JSON export took {json_time * 1000:.2f}ms"
+        assert prometheus_time < 0.05, (
+            f"Prometheus export took {prometheus_time * 1000:.2f}ms"
+        )
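Cleanup stays cheap because retention pruning only walks timestamped entries from the oldest end and drops those outside the window. A sketch of that cutoff logic, assuming entries are dicts with a `timestamp` key as in the unit tests above (the mixin's own field names may differ):

```python
from collections import deque
from datetime import datetime, timedelta


def cleanup_old_entries(
    history: deque, retention_hours: float, now: datetime | None = None
) -> int:
    """Drop entries whose 'timestamp' falls outside the retention window."""
    now = now or datetime.now()
    cutoff = now - timedelta(hours=retention_hours)
    removed = 0
    while history and history[0]["timestamp"] < cutoff:  # oldest entries first
        history.popleft()
        removed += 1
    return removed


errors: deque = deque(
    {"timestamp": datetime.now() - timedelta(hours=h), "msg": f"e{h}"}
    for h in (30, 2, 0)  # 30h-old, 2h-old, and fresh entries
)
assert cleanup_old_entries(errors, retention_hours=24) == 1  # only the 30h entry
```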
+
+    @pytest.mark.asyncio
+    async def test_percentile_calculation_performance(self):
+        """Test performance of percentile calculations with large datasets."""
+        component = MockComponent()
+
+        # Add many timing samples
+        for i in range(1000):
+            component._api_timings.append(float(i))
+
+        # Benchmark percentile calculations
+        iterations = 1000
+        start_time = time.time()
+
+        for _ in range(iterations):
+            p50 = component._calculate_percentile(component._api_timings, 50)
+            p95 = component._calculate_percentile(component._api_timings, 95)
+            p99 = component._calculate_percentile(component._api_timings, 99)
+
+        elapsed = time.time() - start_time
+        avg_time_us = (elapsed / iterations) * 1_000_000
+
+        print(f"Percentile calculations: {iterations}")
+        print(f"Average time: {avg_time_us:.2f}μs")
+
+        # Percentile calculation should be fast (< 100μs)
+        assert avg_time_us < 100, f"Percentile calc took {avg_time_us:.2f}μs"
+
+
+if __name__ == "__main__":
+    # Run performance tests
+    asyncio.run(TestStatisticsPerformance().test_cpu_overhead())
+    asyncio.run(TestStatisticsPerformance().test_memory_overhead())
+    asyncio.run(TestStatisticsPerformance().test_high_frequency_operations())
+    asyncio.run(TestStatisticsPerformance().test_circular_buffer_memory_bounds())
+    asyncio.run(TestStatisticsPerformance().test_concurrent_access_performance())
+    asyncio.run(TestStatisticsPerformance().test_statistics_aggregation_performance())
+    asyncio.run(TestStatisticsPerformance().test_cleanup_performance())
+    asyncio.run(TestStatisticsPerformance().test_export_performance())
+    asyncio.run(TestStatisticsPerformance().test_percentile_calculation_performance())
+    print("\n✅ All performance tests passed!")
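For scale, a straightforward percentile over a sorted copy is O(n log n) per call and still lands well under the 100 μs budget for 1,000 samples. A common nearest-rank variant is sketched below; the mixin's `_calculate_percentile` may round or interpolate differently, so treat this as one reasonable choice rather than the shipped algorithm:

```python
import math
from collections.abc import Sequence


def percentile(samples: Sequence[float], pct: float) -> float:
    """Nearest-rank percentile; implementations differ in how they round the rank."""
    if not samples:
        raise ValueError("no samples")
    ordered = sorted(samples)
    rank = math.ceil(pct / 100 * len(ordered))  # 1-based nearest rank
    return ordered[max(rank, 1) - 1]


data = [float(i) for i in range(1000)]
assert percentile(data, 50) == 499.0
assert percentile(data, 99) == 989.0
```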