diff --git a/CHANGELOG.md b/CHANGELOG.md index dad171e..0f93ae7 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -14,6 +14,37 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - Migration guides will be provided for all breaking changes - Semantic versioning (MAJOR.MINOR.PATCH) is strictly followed +## [3.1.8] - 2025-08-12 + +### Fixed +- **πŸ”§ Real-time Data Processing**: Fixed real-time data not being processed for E-mini contracts (NQ/ES) + - Symbol matching now handles contract resolution (e.g., NQ resolves to ENQ) + - Stores both original instrument and resolved symbol ID for proper matching + - Affects all contracts where user symbol differs from exchange symbol + +### Added +- **⏱️ Bar Timer Mechanism**: Automatic bar creation during low-volume periods + - Creates empty bars (volume=0) at regular intervals when no ticks arrive + - Ensures consistent bar generation for all instruments regardless of trading activity + - Particularly important for low-volume contracts and after-hours trading + - Empty bars maintain price continuity using the last close price + +### Improved +- Enhanced symbol validation to support both user-specified and exchange-resolved symbols +- Better handling of futures contract name resolution (NQβ†’ENQ, ESβ†’EP, etc.) +- More robust real-time data pipeline for all futures contracts + +## [3.1.7] - 2025-08-12 + +### Changed +- Updated documentation and examples for better clarity +- Minor code improvements and optimizations + +### Documentation +- Updated CLAUDE.md with current v3.1.7 information +- Corrected code examples to use TradingSuite API +- Removed references to deprecated factory functions + ## [3.1.6] - 2025-08-12 ### Fixed diff --git a/CLAUDE.md b/CLAUDE.md index 7eaabfd..e51d78f 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -2,7 +2,7 @@ This file provides guidance to Claude Code (claude.ai/code) when working with code in this repository. -## Project Status: v3.1.6 - Stable Production Release +## Project Status: v3.1.8 - Stable Production Release **IMPORTANT**: This project uses a fully asynchronous architecture. All APIs are async-only, optimized for high-performance futures trading. @@ -166,29 +166,17 @@ uv run python -m build # Alternative build command **Async Factory Functions**: Use async `create_*` functions for component initialization: ```python -# Async factory pattern (v2.0.0+) +# TradingSuite - Recommended approach (v3.0.0+) async def setup_trading(): - async with ProjectX.from_env() as client: - await client.authenticate() - - # Create managers with async patterns - realtime_client = await create_realtime_client( - client.jwt_token, - str(client.account_id) - ) - - order_manager = create_order_manager(client, realtime_client) - position_manager = create_position_manager(client, realtime_client) - - # Or use the all-in-one factory - suite = await create_trading_suite( - instrument="MNQ", - project_x=client, - jwt_token=client.jwt_token, - account_id=client.account_id - ) - - return suite + # Simple one-line setup with TradingSuite + suite = await TradingSuite.create( + "MNQ", + timeframes=["1min", "5min"], + features=["orderbook"] + ) + + # Everything is ready - client authenticated, realtime connected + return suite ``` **Dependency Injection**: Managers receive their dependencies (ProjectX client, realtime client) rather than creating them. @@ -300,6 +288,16 @@ async with ProjectX.from_env() as client: ## Recent Changes +### v3.1.8 - Latest Release +- **Fixed**: Real-time data processing for E-mini contracts (NQ/ES) that resolve to different symbols +- **Added**: Bar timer mechanism to create empty bars during low-volume periods +- **Improved**: Symbol matching to handle contract resolution (e.g., NQβ†’ENQ) +- **Enhanced**: Real-time data manager now properly processes all futures contracts + +### v3.1.7 - Previous Release +- Minor updates and improvements +- Documentation enhancements + ### v3.1.6 - Critical Deadlock Fix - **Fixed**: Deadlock when calling `suite.data` methods from event handler callbacks (Issue #39) - **Improved**: Event emission now non-blocking to prevent handler deadlocks @@ -343,29 +341,31 @@ async with ProjectX.from_env() as client: - All core modules organized as packages with focused submodules - Improved code organization and maintainability -### Trading Suite Usage +### Trading Suite Usage (v3.0.0+) ```python # Complete trading suite with all managers -from project_x_py import create_trading_suite +from project_x_py import TradingSuite async def main(): - suite = await create_trading_suite( - instrument="MNQ", + suite = await TradingSuite.create( + "MNQ", timeframes=["1min", "5min"], - enable_orderbook=True, - enable_risk_management=True + features=["orderbook", "risk_manager"], + initial_days=5 ) # All managers are integrated and ready - await suite.start() + # No need to call start() - already connected # Access individual managers - order = await suite.order_manager.place_market_order( - "MNQ", 1, "BUY" + order = await suite.orders.place_market_order( + contract_id=suite.instrument_info.id, + side=0, # Buy + size=1 ) - position = suite.position_manager.get_position("MNQ") - bars = suite.data_manager.get_bars("MNQ", "1min") + position = await suite.positions.get_position("MNQ") + bars = await suite.data.get_data("1min") ``` ### Key Async Examples @@ -375,24 +375,23 @@ async with ProjectX.from_env() as client: await client.authenticate() bars = await client.get_bars("MNQ", days=5) -# Real-time data +# Real-time data with TradingSuite async def stream_data(): - async with ProjectX.from_env() as client: - await client.authenticate() - - realtime = await create_realtime_client( - client.jwt_token, - str(client.account_id) - ) - - data_manager = create_realtime_data_manager( - "MNQ", client, realtime - ) - - # Set up callbacks - data_manager.on_bar_received = handle_bar - - # Start streaming - await realtime.connect() - await data_manager.start_realtime_feed() + suite = await TradingSuite.create( + "MNQ", + timeframes=["1min", "5min"] + ) + + # Register event handlers + from project_x_py import EventType + + async def handle_bar(event): + print(f"New bar: {event.data}") + + await suite.on(EventType.NEW_BAR, handle_bar) + + # Data is already streaming + # Access current data + current_price = await suite.data.get_current_price() + bars = await suite.data.get_data("1min") ``` \ No newline at end of file diff --git a/GEMINI.md b/GEMINI.md index 00bf74c..8a6e1f6 100644 --- a/GEMINI.md +++ b/GEMINI.md @@ -2,7 +2,7 @@ This file provides guidance to Google's Gemini models when working with code in this repository. -## Project Status: v3.1.1 - Stable Production Release +## Project Status: v3.1.7 - Stable Production Release **IMPORTANT**: This project uses a fully asynchronous architecture. All APIs are async-only, optimized for high-performance futures trading. @@ -96,7 +96,7 @@ uv run python -m build # Alternative build command ## Project Architecture -### Core Components (v3.0.1 - Multi-file Packages) +### Core Components (v3.0.2 - Multi-file Packages) **ProjectX Client (`src/project_x_py/client/`)** - Main async API client for TopStepX ProjectX Gateway @@ -166,29 +166,17 @@ uv run python -m build # Alternative build command **Async Factory Functions**: Use async `create_*` functions for component initialization: ```python -# Async factory pattern (v2.0.0+) +# TradingSuite - Recommended approach (v3.0.0+) async def setup_trading(): - async with ProjectX.from_env() as client: - await client.authenticate() - - # Create managers with async patterns - realtime_client = await create_realtime_client( - client.jwt_token, - str(client.account_id) - ) - - order_manager = create_order_manager(client, realtime_client) - position_manager = create_position_manager(client, realtime_client) - - # Or use the all-in-one factory - suite = await create_trading_suite( - instrument="MNQ", - project_x=client, - jwt_token=client.jwt_token, - account_id=client.account_id - ) - - return suite + # Simple one-line setup with TradingSuite + suite = await TradingSuite.create( + "MNQ", + timeframes=["1min", "5min"], + features=["orderbook"] + ) + + # Everything is ready - client authenticated, realtime connected + return suite ``` **Dependency Injection**: Managers receive their dependencies (ProjectX client, realtime client) rather than creating them. @@ -300,6 +288,34 @@ async with ProjectX.from_env() as client: ## Recent Changes +### v3.1.7 - Latest Release +- Minor updates and improvements +- Documentation enhancements + +### v3.1.6 - Critical Deadlock Fix +- **Fixed**: Deadlock when calling `suite.data` methods from event handler callbacks (Issue #39) +- **Improved**: Event emission now non-blocking to prevent handler deadlocks +- **Enhanced**: Event triggering moved outside lock scope for better concurrency +- **Added**: Missing asyncio import in data_processing module +- **Maintained**: Full API compatibility - no breaking changes + +### v3.1.5 - Enhanced Bar Data Retrieval +- **Added**: Optional `start_time` and `end_time` parameters to `get_bars()` method +- **Improved**: Precise time range specification for historical data queries +- **Enhanced**: Full timezone support with automatic UTC conversion +- **Maintained**: Complete backward compatibility with existing `days` parameter + +### v3.1.4 - WebSocket Connection Fix +- **Fixed**: Critical WebSocket error with missing `_use_batching` attribute +- **Improved**: Proper mixin initialization in ProjectXRealtimeClient +- **Enhanced**: More robust real-time connection handling + +### v3.0.2 - Bug Fixes and Improvements +- **Order Lifecycle Tracking**: Fixed asyncio concurrency and field reference issues +- **Order Templates**: Fixed instrument lookup to use cached object +- **Cleanup Functionality**: Added comprehensive order/position cleanup +- **Documentation**: Updated all docs to reflect current version + ### v3.0.1 - Production Ready - **Performance Optimizations**: Enhanced connection pooling and caching - **Event Bus System**: Unified event handling across all components @@ -319,29 +335,31 @@ async with ProjectX.from_env() as client: - All core modules organized as packages with focused submodules - Improved code organization and maintainability -### Trading Suite Usage +### Trading Suite Usage (v3.0.0+) ```python # Complete trading suite with all managers -from project_x_py import create_trading_suite +from project_x_py import TradingSuite async def main(): - suite = await create_trading_suite( - instrument="MNQ", + suite = await TradingSuite.create( + "MNQ", timeframes=["1min", "5min"], - enable_orderbook=True, - enable_risk_management=True + features=["orderbook", "risk_manager"], + initial_days=5 ) # All managers are integrated and ready - await suite.start() + # No need to call start() - already connected # Access individual managers - order = await suite.order_manager.place_market_order( - "MNQ", 1, "BUY" + order = await suite.orders.place_market_order( + contract_id=suite.instrument_info.id, + side=0, # Buy + size=1 ) - position = suite.position_manager.get_position("MNQ") - bars = suite.data_manager.get_bars("MNQ", "1min") + position = await suite.positions.get_position("MNQ") + bars = await suite.data.get_data("1min") ``` ### Key Async Examples @@ -351,24 +369,23 @@ async with ProjectX.from_env() as client: await client.authenticate() bars = await client.get_bars("MNQ", days=5) -# Real-time data +# Real-time data with TradingSuite async def stream_data(): - async with ProjectX.from_env() as client: - await client.authenticate() - - realtime = await create_realtime_client( - client.jwt_token, - str(client.account_id) - ) - - data_manager = create_realtime_data_manager( - "MNQ", client, realtime - ) - - # Set up callbacks - data_manager.on_bar_received = handle_bar - - # Start streaming - await realtime.connect() - await data_manager.start_realtime_feed() + suite = await TradingSuite.create( + "MNQ", + timeframes=["1min", "5min"] + ) + + # Register event handlers + from project_x_py import EventType + + async def handle_bar(event): + print(f"New bar: {event.data}") + + await suite.on(EventType.NEW_BAR, handle_bar) + + # Data is already streaming + # Access current data + current_price = await suite.data.get_current_price() + bars = await suite.data.get_data("1min") ``` \ No newline at end of file diff --git a/RELEASE_NOTES_3.1.8.md b/RELEASE_NOTES_3.1.8.md new file mode 100644 index 0000000..d17d8fc --- /dev/null +++ b/RELEASE_NOTES_3.1.8.md @@ -0,0 +1,79 @@ +# Release v3.1.8 - Real-time Data Reliability Improvements + +## 🎯 Overview +This release addresses critical issues with real-time data processing for futures contracts, particularly improving support for E-mini futures (NQ, ES) and ensuring consistent bar generation during low-volume trading periods. + +## πŸ› Bug Fixes + +### Fixed Symbol Resolution for E-mini Futures +- **Issue**: E-mini futures like NQ (which resolves to ENQ) and ES (which resolves to EP) were not receiving real-time updates +- **Root Cause**: Symbol matching logic only checked the original instrument name, not the resolved symbol ID +- **Solution**: Enhanced symbol validation to check both the user-specified instrument and the resolved symbol ID +- **Impact**: All futures contracts now work correctly with real-time data feeds + +### Fixed Bar Generation During Low-Volume Periods +- **Issue**: OHLCV bars were only created when new ticks arrived, causing gaps during low-volume periods +- **Root Cause**: No periodic bar creation mechanism for quiet market periods +- **Solution**: Implemented a bar timer task that creates empty bars at proper intervals even without trading activity +- **Impact**: Consistent bar generation across all timeframes regardless of trading volume + +## πŸ”§ Technical Improvements + +### Enhanced Error Handling +- Added robust error handling for bar time calculations +- Improved error recovery in bar timer loop to prevent task termination +- Better logging for debugging symbol resolution issues + +### Code Quality Improvements +- Fixed asyncio task warnings by properly storing task references +- Improved unit conversion clarity with explicit mapping dictionary +- Optimized DataFrame operations for better performance + +## πŸ“Š Affected Components +- `RealtimeDataManager` - Core real-time data processing +- Symbol validation and matching logic +- Bar creation and timing mechanisms +- Event emission system + +## πŸš€ Migration Guide +No breaking changes. This version is fully backward compatible with v3.1.x. + +Simply update to v3.1.8 to benefit from improved real-time data reliability: +```bash +pip install --upgrade project-x-py==3.1.8 +``` + +## πŸ“ Example Usage +```python +from project_x_py import TradingSuite + +# Works with all futures contracts now +suite = await TradingSuite.create( + instrument="NQ", # E-mini Nasdaq now works correctly + timeframes=["1min", "5min"], + initial_days=1 +) + +# Bars will be created consistently even during low-volume periods +# No gaps in your data during quiet market times +``` + +## πŸŽ‰ Benefits +- **Improved Reliability**: All futures contracts now receive real-time updates correctly +- **Consistent Data**: No more gaps in bar data during low-volume periods +- **Better Trading**: More reliable data for trading strategies, especially for E-mini futures +- **Zero Breaking Changes**: Drop-in replacement for v3.1.7 + +## πŸ™ Acknowledgments +Special thanks to our users for reporting these issues and helping us improve the real-time data system. + +## πŸ“š Documentation +For more details, see: +- [CHANGELOG.md](./CHANGELOG.md) - Complete change history +- [Real-time Data Manager Documentation](./docs/realtime_data_manager.md) +- [Example Scripts](./examples/realtime_data_manager/) + +--- +*Released: January 2025* +*Version: 3.1.8* +*Status: Production Ready* \ No newline at end of file diff --git a/docs/conf.py b/docs/conf.py index b5c7d7e..85f342a 100644 --- a/docs/conf.py +++ b/docs/conf.py @@ -23,8 +23,8 @@ project = "project-x-py" copyright = "2025, Jeff West" author = "Jeff West" -release = "3.1.7" -version = "3.1.7" +release = "3.1.8" +version = "3.1.8" # -- General configuration --------------------------------------------------- diff --git a/examples/realtime_data_manager/01_events_with_on.py b/examples/realtime_data_manager/01_events_with_on.py index bb4d8b3..c4acf53 100644 --- a/examples/realtime_data_manager/01_events_with_on.py +++ b/examples/realtime_data_manager/01_events_with_on.py @@ -7,8 +7,10 @@ async def main(): print("Creating TradingSuite...") + # Note: Use "MNQ" for Micro E-mini Nasdaq-100 futures + # "NQ" resolves to E-mini Nasdaq (ENQ) which may have different data characteristics suite = await TradingSuite.create( - instrument="MNQ", + instrument="MNQ", # Works best with MNQ for consistent real-time updates timeframes=["15sec"], ) print("TradingSuite created!") @@ -19,7 +21,7 @@ async def main(): # Set up signal handler for clean exit shutdown_event = asyncio.Event() - def signal_handler(signum, frame): + def signal_handler(_signum, _frame): print("\n\nReceived interrupt signal. Shutting down gracefully...") shutdown_event.set() diff --git a/pyproject.toml b/pyproject.toml index 45fec00..9daac40 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [project] name = "project-x-py" -version = "3.1.7" +version = "3.1.8" description = "High-performance Python SDK for futures trading with real-time WebSocket data, technical indicators, order management, and market depth analysis" readme = "README.md" license = { text = "MIT" } diff --git a/src/project_x_py/__init__.py b/src/project_x_py/__init__.py index 03fdb55..d66a95d 100644 --- a/src/project_x_py/__init__.py +++ b/src/project_x_py/__init__.py @@ -78,7 +78,7 @@ It provides the infrastructure to help developers create their own trading applications that integrate with the ProjectX platform. -Version: 3.0.0 +Version: 3.1.7 Author: TexasCoding See Also: @@ -95,7 +95,7 @@ from project_x_py.client.base import ProjectXBase -__version__ = "3.1.7" +__version__ = "3.1.8" __author__ = "TexasCoding" # Core client classes - renamed from Async* to standard names diff --git a/src/project_x_py/client/__init__.py b/src/project_x_py/client/__init__.py index 5545916..452ed29 100644 --- a/src/project_x_py/client/__init__.py +++ b/src/project_x_py/client/__init__.py @@ -88,38 +88,35 @@ class ProjectX(ProjectXBase): >>> >>> asyncio.run(main()) - For advanced async trading applications, combine with specialized managers: - >>> # V3: Advanced trading with specialized managers - >>> from project_x_py import ( - ... ProjectX, - ... create_realtime_client, - ... create_order_manager, - ... create_position_manager, - ... create_realtime_data_manager, - ... ) + For advanced async trading applications, use the `TradingSuite`: + >>> # V3: Advanced trading with the TradingSuite + >>> import asyncio + >>> from project_x_py import TradingSuite >>> >>> async def trading_app(): - >>> async with ProjectX.from_env() as client: - >>> await client.authenticate() + >>> # The TradingSuite simplifies setup and integrates all managers + >>> suite = await TradingSuite.create( + ... "MNQ", + ... timeframes=["1min", "5min"], + ... features=["orderbook", "risk_manager"] + ... ) + >>> + >>> # Client is authenticated and real-time data is streaming. + >>> + >>> # Access integrated managers easily + >>> order = await suite.orders.place_market_order( + ... contract_id=suite.instrument_info.id, + ... side=0, # Buy + ... size=1 + ... ) >>> - >>> # V3: Create specialized async managers with dependency injection - >>> jwt_token = client.get_session_token() - >>> account_id = str(client.get_account_info().id) + >>> position = await suite.positions.get_position("MNQ") + >>> bars = await suite.data.get_data("1min") >>> - >>> # V3: Real-time WebSocket client for all managers - >>> realtime_client = await create_realtime_client(jwt_token, account_id) - >>> # V3: Create managers with shared realtime client - >>> order_manager = create_order_manager(client, realtime_client) - >>> position_manager = create_position_manager(client, realtime_client) - >>> data_manager = create_realtime_data_manager( - ... "MNQ", client, realtime_client, timeframes=["1min", "5min"] - ... ) + >>> print(f"Placed order {order.id}, current position: {position.netPos}") + >>> print(f"Latest 1-min bar: {bars.tail(1)}") >>> - >>> # V3: Connect and start real-time trading - >>> await realtime_client.connect() - >>> await data_manager.start_realtime_feed() - >>> # V3: Now ready for real-time trading with all managers - >>> # ... your trading logic here ... + >>> asyncio.run(trading_app()) """ diff --git a/src/project_x_py/client/cache.py b/src/project_x_py/client/cache.py index 870adf1..19e35ef 100644 --- a/src/project_x_py/client/cache.py +++ b/src/project_x_py/client/cache.py @@ -1,10 +1,19 @@ """ -Optimized caching with msgpack serialization and lz4 compression. - -This module provides high-performance caching using: -- msgpack for 2-5x faster serialization -- lz4 for fast compression (70% size reduction) -- cachetools for intelligent cache management +Optimized caching with msgpack serialization and lz4 compression for ProjectX. + +This module provides a high-performance caching layer (`CacheMixin`) designed to +significantly reduce latency and memory usage for the ProjectX async client. It +replaces standard pickle/JSON serialization with faster and more efficient alternatives. + +Key Features: +- msgpack: For serialization that is 2-5x faster than pickle. +- lz4: For high-speed data compression, achieving up to 70% size reduction on market data. +- cachetools: Implements intelligent LRU (Least Recently Used) and TTL (Time-to-Live) + cache eviction policies for instruments and market data respectively. +- Automatic Compression: Data payloads exceeding a configurable threshold (default 1KB) + are automatically compressed. +- Performance-Tuned: Optimized for handling Polars DataFrames and other data models + used within the SDK. """ import gc diff --git a/src/project_x_py/indicators/__init__.py b/src/project_x_py/indicators/__init__.py index 40afe46..b103d07 100644 --- a/src/project_x_py/indicators/__init__.py +++ b/src/project_x_py/indicators/__init__.py @@ -202,7 +202,7 @@ ) # Version info -__version__ = "3.1.7" +__version__ = "3.1.8" __author__ = "TexasCoding" diff --git a/src/project_x_py/position_manager/__init__.py b/src/project_x_py/position_manager/__init__.py index 94c54b1..ed7622e 100644 --- a/src/project_x_py/position_manager/__init__.py +++ b/src/project_x_py/position_manager/__init__.py @@ -28,6 +28,13 @@ - Direct position operations through ProjectX API - Comprehensive reporting and historical analysis +Note: + While this module provides direct access to the `PositionManager`, for most + trading applications, it is recommended to use the `TradingSuite`. The suite + automatically creates, configures, and manages the position manager, providing + simplified access to its functionality via `suite.positions`. + The example below shows the lower-level manual setup. + Example Usage: ```python # V3: Comprehensive position management with EventBus integration diff --git a/src/project_x_py/position_manager/core.py b/src/project_x_py/position_manager/core.py index bbd1b24..7df3961 100644 --- a/src/project_x_py/position_manager/core.py +++ b/src/project_x_py/position_manager/core.py @@ -20,13 +20,11 @@ - Comprehensive position operations (close, partial close) - Statistics, history, and report generation -Position Manager Components: - - PositionTrackingMixin: Real-time position tracking and callbacks - - PositionAnalyticsMixin: P&L calculations and portfolio analytics - - RiskManagementMixin: Risk metrics and position sizing - - PositionMonitoringMixin: Position monitoring and alerts - - PositionOperationsMixin: Direct position operations - - PositionReportingMixin: Statistics, history, and reports +Note: + This class is the core implementation of the position manager. For most + applications, it is recommended to interact with it through the `TradingSuite` + (`suite.positions`), which handles its lifecycle and integration automatically. + The example below demonstrates direct, low-level instantiation and usage. Example Usage: ```python diff --git a/src/project_x_py/position_manager/tracking.py b/src/project_x_py/position_manager/tracking.py index 6297acb..6081075 100644 --- a/src/project_x_py/position_manager/tracking.py +++ b/src/project_x_py/position_manager/tracking.py @@ -27,27 +27,30 @@ Example Usage: ```python - # Register position update callbacks - async def on_position_update(data): - pos = data.get("data", {}) - print(f"Position updated: {pos.get('contractId')} size: {pos.get('size')}") + # V3: Using EventBus for unified event handling is the recommended approach. + # The old add_callback method is deprecated. + from project_x_py import EventBus, EventType + + event_bus = EventBus() + # position_manager would be initialized with this event_bus instance. - await position_manager.add_callback("position_update", on_position_update) + # Register for position update events + @event_bus.on(EventType.POSITION_UPDATED) + async def on_position_update(data): + print(f"Position updated: {data.get('contractId')} size: {data.get('size')}") - # Register position closure callbacks + # Register for position closure events + @event_bus.on(EventType.POSITION_CLOSED) async def on_position_closed(data): - pos = data.get("data", {}) - print(f"Position closed: {pos.get('contractId')}") - + print(f"Position closed: {data.get('contractId')}") - await position_manager.add_callback("position_closed", on_position_closed) # Get position history - history = await position_manager.get_position_history("MGC", limit=10) - for entry in history: - print(f"{entry['timestamp']}: {entry['size_change']:+d} contracts") + # history = await position_manager.get_position_history("MGC", limit=10) + # for entry in history: + # print(f"{entry['timestamp']}: {entry['size_change']:+d} contracts") ``` See Also: diff --git a/src/project_x_py/realtime/__init__.py b/src/project_x_py/realtime/__init__.py index cb680d5..631f8ec 100644 --- a/src/project_x_py/realtime/__init__.py +++ b/src/project_x_py/realtime/__init__.py @@ -26,7 +26,15 @@ - Subscription management for specific contracts - Connection health monitoring and statistics +Note: + While this module provides direct access to the real-time client, for most + trading applications, it is recommended to use the `TradingSuite`. The suite + manages the real-time client, data processing, and event handling automatically, + offering a simpler and more robust development experience. + Example Usage: + The example below demonstrates the low-level usage of the `ProjectXRealtimeClient`. + ```python # V3: Real-time WebSocket client with async callbacks import asyncio diff --git a/src/project_x_py/realtime/connection_management.py b/src/project_x_py/realtime/connection_management.py index 0c10f21..713b7da 100644 --- a/src/project_x_py/realtime/connection_management.py +++ b/src/project_x_py/realtime/connection_management.py @@ -26,24 +26,32 @@ - Statistics tracking and health reporting Example Usage: + The functionality of this mixin is consumed through a `ProjectXRealtimeClient` instance. + For most use cases, this is handled automatically by the `TradingSuite`. + ```python - # Setup and connect - client = ProjectXRealtimeClient(jwt_token, account_id) - await client.setup_connections() + # The following demonstrates the low-level connection lifecycle managed by this mixin. + # Note: In a typical application, you would use TradingSuite, which handles this. + from project_x_py import create_realtime_client + + # 1. Initialization (handled by factory or TradingSuite) + # realtime_client = await create_realtime_client(jwt, account_id) - if await client.connect(): - print("Connected to ProjectX Gateway") + # 2. Connection (handled by TradingSuite.create() or client.connect()) + # if await realtime_client.connect(): + # print("Successfully connected to both User and Market hubs.") - # Check connection status - if client.is_connected(): - print("Both hubs connected") + # # 3. Health Monitoring + # if realtime_client.is_connected(): + # stats = realtime_client.get_stats() + # print(f"Events received so far: {stats['events_received']}") - # Get connection statistics - stats = client.get_stats() - print(f"Events received: {stats['events_received']}") + # # 4. Token Refresh (if managing tokens manually) + # # new_token = await get_new_token() + # # await realtime_client.update_jwt_token(new_token) - # Update JWT token on refresh - await client.update_jwt_token(new_jwt_token) + # # 5. Disconnection (handled by TradingSuite context manager or client.disconnect()) + # await realtime_client.disconnect() ``` See Also: diff --git a/src/project_x_py/realtime/core.py b/src/project_x_py/realtime/core.py index ee356e7..41fd3e4 100644 --- a/src/project_x_py/realtime/core.py +++ b/src/project_x_py/realtime/core.py @@ -31,7 +31,16 @@ - User Hub: Account, position, and order updates - Market Hub: Quote, trade, and market depth data +Note: + This class forms the low-level foundation for real-time data. For most applications, + the `TradingSuite` is the recommended entry point as it abstracts away the direct + management of this client, its connections, and its events. + Example Usage: + The example below shows how to use the client directly. In a real application, + the `position_manager`, `order_manager`, and `data_manager` would be instances + of the SDK's manager classes. + ```python # V3: Create async client with ProjectX Gateway URLs import asyncio @@ -43,9 +52,9 @@ async def main(): client = await create_realtime_client(jwt_token, account_id) # V3: Register async managers for event handling - await client.add_callback("position_update", position_manager.handle_update) - await client.add_callback("order_update", order_manager.handle_update) - await client.add_callback("quote_update", data_manager.handle_quote) + # await client.add_callback("position_update", position_manager.handle_update) + # await client.add_callback("order_update", order_manager.handle_update) + # await client.add_callback("quote_update", data_manager.handle_quote) # V3: Connect and check both hub connections if await client.connect(): @@ -63,7 +72,7 @@ async def main(): await client.disconnect() - asyncio.run(main()) + # asyncio.run(main()) ``` Event Types (per ProjectX Gateway docs): diff --git a/src/project_x_py/realtime_data_manager/__init__.py b/src/project_x_py/realtime_data_manager/__init__.py index b266242..3baab2f 100644 --- a/src/project_x_py/realtime_data_manager/__init__.py +++ b/src/project_x_py/realtime_data_manager/__init__.py @@ -27,6 +27,13 @@ - Memory management with automatic data cleanup - Performance monitoring and statistics +Note: + While this module provides direct access to the `RealtimeDataManager`, for most + trading applications, it is recommended to use the `TradingSuite`. The suite + automatically creates, configures, and manages the data manager, providing + simplified access to its data and events via `suite.data` and `suite.on()`. + The example below shows the lower-level manual setup. + Example Usage: ```python # V3: Uses factory functions and EventBus integration diff --git a/src/project_x_py/realtime_data_manager/core.py b/src/project_x_py/realtime_data_manager/core.py index 7a08684..1e80938 100644 --- a/src/project_x_py/realtime_data_manager/core.py +++ b/src/project_x_py/realtime_data_manager/core.py @@ -33,6 +33,12 @@ - Minimal latency for trading signals - Resilience to network issues +Note: + This class is the core implementation of the real-time data manager. For most + applications, it is recommended to interact with it through the `TradingSuite` + (`suite.data`), which handles its lifecycle and integration automatically. + The example below demonstrates direct, low-level instantiation and usage. + Example Usage: ```python # V3: Create shared async realtime client with factory @@ -111,6 +117,7 @@ async def on_new_bar(data): """ import asyncio +import contextlib import time from collections import defaultdict from datetime import datetime @@ -393,6 +400,8 @@ def __init__( # Contract ID for real-time subscriptions self.contract_id: str | None = None + # Actual symbol ID from the resolved instrument (e.g., "ENQ" when user specifies "NQ") + self.instrument_symbol_id: str | None = None # Memory management settings are set in _apply_config_defaults() self.last_cleanup: float = time.time() @@ -425,6 +434,9 @@ def __init__( # Background cleanup task self._cleanup_task: asyncio.Task[None] | None = None + # Background bar timer task for low-volume periods + self._bar_timer_task: asyncio.Task[None] | None = None + self.logger.info( "RealtimeDataManager initialized", extra={"instrument": instrument} ) @@ -523,6 +535,16 @@ async def initialize(self, initial_days: int = 1) -> bool: # Store the exact contract ID for real-time subscriptions self.contract_id = instrument_info.id + # Store the actual symbol ID for matching (e.g., "ENQ" when user specifies "NQ") + # Extract from symbolId like "F.US.ENQ" -> "ENQ" + if instrument_info.symbolId and "." in instrument_info.symbolId: + parts = instrument_info.symbolId.split(".") + self.instrument_symbol_id = ( + parts[-1] if parts else instrument_info.symbolId + ) + else: + self.instrument_symbol_id = instrument_info.symbolId or self.instrument + # Load initial data for all timeframes async with self.data_lock: for tf_key, tf_config in self.timeframes.items(): @@ -662,6 +684,9 @@ async def on_new_bar(data): # Start cleanup task self.start_cleanup_task() + # Start bar timer task for low-volume periods + self._start_bar_timer_task() + self.logger.debug( LogMessages.DATA_SUBSCRIBE, extra={"status": "feed_started", "instrument": self.instrument}, @@ -684,6 +709,9 @@ async def stop_realtime_feed(self) -> None: # Cancel cleanup task await self.stop_cleanup_task() + # Cancel bar timer task + await self._stop_bar_timer_task() + # Unsubscribe from market data # Note: unsubscribe_market_data will be implemented in ProjectXRealtimeClient if self.contract_id: @@ -724,3 +752,148 @@ async def cleanup(self) -> None: dom_attr[_k] = [] self.logger.info("βœ… RealtimeDataManager cleanup completed") + + def _start_bar_timer_task(self) -> None: + """Start the bar timer task for creating bars during low-volume periods.""" + if self._bar_timer_task is None or self._bar_timer_task.done(): + self._bar_timer_task = asyncio.create_task(self._bar_timer_loop()) + self.logger.debug("Bar timer task started") + + async def _stop_bar_timer_task(self) -> None: + """Stop the bar timer task.""" + if self._bar_timer_task and not self._bar_timer_task.done(): + self._bar_timer_task.cancel() + with contextlib.suppress(asyncio.CancelledError): + await self._bar_timer_task + self.logger.debug("Bar timer task stopped") + + async def _bar_timer_loop(self) -> None: + """ + Periodic task to create empty bars during low-volume periods. + + This ensures bars are created at regular intervals even when + there's no trading activity (important for low-volume instruments). + """ + try: + # Find the shortest timeframe interval to check + min_seconds = float("inf") + for tf_config in self.timeframes.values(): + interval = tf_config["interval"] + unit = tf_config["unit"] + + # Convert to seconds based on numeric unit value + # Unit mapping: {1: seconds, 2: minutes, 4: days, 5: weeks, 6: months} + unit_seconds_map = { + 1: 1, # seconds + 2: 60, # minutes + 4: 86400, # days + 5: 604800, # weeks + 6: 2629746, # months (approximate) + } + + if unit in unit_seconds_map: + seconds = interval * unit_seconds_map[unit] + else: + continue # Skip unsupported units + + min_seconds = min(min_seconds, seconds) + + # Check at least every 5 seconds, but no more than the shortest interval + check_interval = min(5.0, min_seconds / 3) + + self.logger.debug(f"Bar timer checking every {check_interval} seconds") + + while self.is_running: + await asyncio.sleep(check_interval) + + if not self.is_running: + break + + # Check each timeframe for stale bars + await self._check_and_create_empty_bars() + + except asyncio.CancelledError: + self.logger.debug("Bar timer task cancelled") + raise + except Exception as e: + self.logger.error(f"Error in bar timer loop: {e}") + + async def _check_and_create_empty_bars(self) -> None: + """ + Check each timeframe and create empty bars if needed. + + This handles low-volume periods where no ticks are coming in, + ensuring bars are still created at the proper intervals. + """ + try: + current_time = datetime.now(self.timezone) + events_to_trigger = [] + + async with self.data_lock: + for tf_key, tf_config in self.timeframes.items(): + if tf_key not in self.data: + continue + + current_data = self.data[tf_key] + if current_data.height == 0: + continue + + # Get the last bar time + last_bar_time = ( + current_data.select(pl.col("timestamp")).tail(1).item() + ) + + try: + # Calculate what the current bar time should be + expected_bar_time = self._calculate_bar_time( + current_time, tf_config["interval"], tf_config["unit"] + ) + except Exception as e: + self.logger.error( + f"Error calculating bar time for {tf_key}: {e}" + ) + continue # Skip this timeframe if calculation fails + + # If we're missing bars, create empty ones + if expected_bar_time > last_bar_time: + # Get the last close price to use for empty bars + last_close = current_data.select(pl.col("close")).tail(1).item() + + # Create empty bar with last close as OHLC, volume=0 + # Using DataFrame constructor is efficient for single rows + new_bar = pl.DataFrame( + { + "timestamp": [expected_bar_time], + "open": [last_close], + "high": [last_close], + "low": [last_close], + "close": [last_close], + "volume": [0], # Zero volume for empty bars + } + ) + + self.data[tf_key] = pl.concat([current_data, new_bar]) + self.last_bar_times[tf_key] = expected_bar_time + + self.logger.debug( + f"Created empty bar for {tf_key} at {expected_bar_time} " + f"(low volume period)" + ) + + # Prepare event to trigger + events_to_trigger.append( + { + "timeframe": tf_key, + "bar_time": expected_bar_time, + "data": new_bar.to_dicts()[0], + } + ) + + # Trigger events outside the lock (non-blocking) + for event in events_to_trigger: + # Store task reference to avoid warning (though we don't need to track it) + _ = asyncio.create_task(self._trigger_callbacks("new_bar", event)) + + except Exception as e: + self.logger.error(f"Error checking/creating empty bars: {e}") + # Don't re-raise - bar timer should continue even if one check fails diff --git a/src/project_x_py/realtime_data_manager/validation.py b/src/project_x_py/realtime_data_manager/validation.py index 3a048ba..044bfcc 100644 --- a/src/project_x_py/realtime_data_manager/validation.py +++ b/src/project_x_py/realtime_data_manager/validation.py @@ -248,8 +248,19 @@ def _symbol_matches_instrument( else: base_symbol = symbol - # Compare with our instrument (case-insensitive) - return base_symbol.upper() == self.instrument.upper() + # Compare with both our original instrument and the resolved symbol ID + # This handles cases like NQ -> ENQ resolution + base_upper = base_symbol.upper() + + # Check against original instrument (e.g., "NQ") + if base_upper == self.instrument.upper(): + return True + + # Check against resolved symbol ID (e.g., "ENQ" when user specified "NQ") + if hasattr(self, "instrument_symbol_id") and self.instrument_symbol_id: + return base_upper == self.instrument_symbol_id.upper() + + return False def get_realtime_validation_status( self: "RealtimeDataManagerProtocol", diff --git a/uv.lock b/uv.lock index 2ba3f6b..e523de7 100644 --- a/uv.lock +++ b/uv.lock @@ -943,7 +943,7 @@ wheels = [ [[package]] name = "project-x-py" -version = "3.1.6" +version = "3.1.8" source = { editable = "." } dependencies = [ { name = "cachetools" },