diff --git a/BUG_REPORT_DEADLOCK.md b/BUG_REPORT_DEADLOCK.md new file mode 100644 index 0000000..1c1bd45 --- /dev/null +++ b/BUG_REPORT_DEADLOCK.md @@ -0,0 +1,117 @@ +# Bug Report: Deadlock in Event Handler Callbacks + +## Summary +Calling `suite.data.get_current_price()` or `suite.data.get_data()` from within an EventBus event handler callback causes a deadlock, preventing the methods from completing. + +## Severity +**HIGH** - This affects the usability of the event-driven API pattern + +## Reproduction Steps + +1. Register an event handler using `suite.on(EventType.NEW_BAR, handler)` +2. Within the handler, call any async method on `suite.data`: + - `await suite.data.get_current_price()` + - `await suite.data.get_data(timeframe, bars)` +3. The handler will receive the event but hang indefinitely on the async call + +## Example Code That Reproduces The Issue + +```python +async def on_new_bar(event): + """This handler will deadlock""" + current_price = await suite.data.get_current_price() # HANGS HERE + print(f"Current price: {current_price}") + +await suite.on(EventType.NEW_BAR, on_new_bar) +``` + +## Workaround + +Use a queue to decouple event reception from processing: + +```python +event_queue = asyncio.Queue() + +async def on_new_bar(event): + """Queue the event instead of processing it""" + await event_queue.put(event) + +await suite.on(EventType.NEW_BAR, on_new_bar) + +# Process events outside the handler context +while True: + event = await event_queue.get() + current_price = await suite.data.get_current_price() # Works fine here + print(f"Current price: {current_price}") +``` + +## Root Cause Analysis + +The deadlock appears to occur because: + +1. The event handler is executed within the EventBus's event processing context +2. The RealtimeDataManager's `get_current_price()` and `get_data()` methods may be trying to acquire locks or access resources that are held during event processing +3. This creates a circular wait condition where: + - The event handler is waiting for the data manager methods to complete + - The data manager methods are waiting for resources locked by the event processing + +## Affected Components + +- `TradingSuite` +- `RealtimeDataManager` +- `EventBus` + +## Suggested Fixes + +1. **Short-term**: Document this limitation and provide the queue-based workaround pattern +2. **Medium-term**: Make data access methods non-blocking when called from event handlers +3. **Long-term**: Refactor the event processing to avoid holding locks during callback execution + +## Files Demonstrating the Issue + +- `/examples/realtime_data_manager/01_events_with_on.py` - Shows the deadlock +- `/examples/realtime_data_manager/01_events_with_on_simple.py` - Shows the workaround + +## Discovery Date +2025-01-12 + +## Discovered By +User testing of event handler examples + +## ✅ FIXED + +### The Solution +Modified `src/project_x_py/realtime_data_manager/data_processing.py` to: + +1. **Changed `_update_timeframe_data()` to return event data instead of triggering it directly** +2. **Moved event triggering outside the lock in `_process_tick_data()`** +3. **Made event emission non-blocking using `asyncio.create_task()`** +4. **Added missing `asyncio` import** + +### Key Changes +```python +# Before (inside lock): +async with self.data_lock: + for tf_key in self.timeframes: + await self._update_timeframe_data(tf_key, timestamp, price, volume) + # Event was triggered inside _update_timeframe_data while holding lock + +# After (events triggered outside lock): +events_to_trigger = [] +async with self.data_lock: + for tf_key in self.timeframes: + new_bar_event = await self._update_timeframe_data(tf_key, timestamp, price, volume) + if new_bar_event: + events_to_trigger.append(new_bar_event) + +# Trigger events outside lock, non-blocking +for event in events_to_trigger: + asyncio.create_task(self._trigger_callbacks("new_bar", event)) +``` + +### Test Results +- ✅ Event handlers are called successfully +- ✅ `suite.data.get_current_price()` works from within handlers +- ✅ `suite.data.get_data()` works from within handlers +- ✅ No deadlock occurs +- ✅ API remains unchanged \ No newline at end of file diff --git a/CLAUDE.md b/CLAUDE.md index 455ac9f..7eaabfd 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -2,7 +2,7 @@ This file provides guidance to Claude Code (claude.ai/code) when working with code in this repository. -## Project Status: v3.1.5 - Stable Production Release +## Project Status: v3.1.6 - Stable Production Release **IMPORTANT**: This project uses a fully asynchronous architecture. All APIs are async-only, optimized for high-performance futures trading. @@ -300,6 +300,13 @@ async with ProjectX.from_env() as client: ## Recent Changes +### v3.1.6 - Critical Deadlock Fix +- **Fixed**: Deadlock when calling `suite.data` methods from event handler callbacks (Issue #39) +- **Improved**: Event emission now non-blocking to prevent handler deadlocks +- **Enhanced**: Event triggering moved outside lock scope for better concurrency +- **Added**: Missing asyncio import in data_processing module +- **Maintained**: Full API compatibility - no breaking changes + ### v3.1.5 - Enhanced Bar Data Retrieval - **Added**: Optional `start_time` and `end_time` parameters to `get_bars()` method - **Improved**: Precise time range specification for historical data queries diff --git a/docs/conf.py b/docs/conf.py index 74035b2..94f3fd8 100644 --- a/docs/conf.py +++ b/docs/conf.py @@ -23,8 +23,8 @@ project = "project-x-py" copyright = "2025, Jeff West" author = "Jeff West" -release = "3.1.5" -version = "3.1.5" +release = "3.1.6" +version = "3.1.6" # -- General configuration --------------------------------------------------- diff --git a/examples/realtime_data_manager/00_events_with_wait_for.py b/examples/realtime_data_manager/00_events_with_wait_for.py new file mode 100644 index 0000000..beac207 --- /dev/null +++ b/examples/realtime_data_manager/00_events_with_wait_for.py @@ -0,0 +1,73 @@ +import asyncio +import signal + +from project_x_py import TradingSuite +from project_x_py.event_bus import EventType + + +async def on_new_bar(suite: TradingSuite): + current_price = await suite.data.get_current_price() + last_bars = await suite.data.get_data(timeframe="15sec", bars=5) + print(f"\nCurrent price: {current_price}") + print("=" * 80) + + if last_bars is not None and not last_bars.is_empty(): + print("Last 5 bars (oldest to newest):") + print("-" * 80) + + # Get the last 5 bars and iterate through them + for row in last_bars.tail(5).iter_rows(named=True): + timestamp = row["timestamp"] + open_price = row["open"] + high = row["high"] + low = row["low"] + close = row["close"] + volume = row["volume"] + + print( + f"Time: {timestamp} | O: ${open_price:,.2f} | H: ${high:,.2f} | L: ${low:,.2f} | C: ${close:,.2f} | Vol: {volume:,}" + ) + else: + print("No bar data available yet") + + +async def main(): + suite = await TradingSuite.create( + instrument="MNQ", + timeframes=["15sec"], + ) + + await suite.connect() + + # Set up signal handler for clean exit + shutdown_event = asyncio.Event() + + def signal_handler(signum, frame): + print("\n\nReceived interrupt signal. Shutting down gracefully...") + shutdown_event.set() + + signal.signal(signal.SIGINT, signal_handler) + signal.signal(signal.SIGTERM, signal_handler) + + print("Monitoring MNQ 15-second bars. Press CTRL+C to exit.\n") + + try: + while not shutdown_event.is_set(): + try: + # Wait for new bar with timeout to check shutdown event + new_bar = await asyncio.wait_for( + suite.wait_for(EventType.NEW_BAR), timeout=1.0 + ) + if new_bar: + await on_new_bar(suite) + except TimeoutError: + # Timeout is expected, just check shutdown_event again + pass + finally: + print("Disconnecting from real-time feeds...") + await suite.disconnect() + print("Clean shutdown complete.") + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/examples/realtime_data_manager/01_events_with_on.py b/examples/realtime_data_manager/01_events_with_on.py new file mode 100644 index 0000000..bb4d8b3 --- /dev/null +++ b/examples/realtime_data_manager/01_events_with_on.py @@ -0,0 +1,89 @@ +import asyncio +import signal + +from project_x_py import TradingSuite +from project_x_py.event_bus import EventType + + +async def main(): + print("Creating TradingSuite...") + suite = await TradingSuite.create( + instrument="MNQ", + timeframes=["15sec"], + ) + print("TradingSuite created!") + + # No need to call connect() - it's already connected via auto_connect=True + print("Suite is already connected!") + + # Set up signal handler for clean exit + shutdown_event = asyncio.Event() + + def signal_handler(signum, frame): + print("\n\nReceived interrupt signal. Shutting down gracefully...") + shutdown_event.set() + + signal.signal(signal.SIGINT, signal_handler) + signal.signal(signal.SIGTERM, signal_handler) + + # Define the event handler as an async function + async def on_new_bar(event): + """Handle new bar events""" + print(f"New bar event received: {event}") + print("About to call get_current_price...") + try: + current_price = await suite.data.get_current_price() + print(f"Got current price: {current_price}") + except Exception as e: + print(f"Error getting current price: {e}") + return + + print("About to call get_data...") + try: + last_bars = await suite.data.get_data(timeframe="15sec", bars=5) + print("Got data") + except Exception as e: + print(f"Error getting data: {e}") + return + print(f"\nCurrent price: {current_price}") + print("=" * 80) + + if last_bars is not None and not last_bars.is_empty(): + print("Last 5 bars (oldest to newest):") + print("-" * 80) + + # Get the last 5 bars and iterate through them + for row in last_bars.tail(5).iter_rows(named=True): + timestamp = row["timestamp"] + open_price = row["open"] + high = row["high"] + low = row["low"] + close = row["close"] + volume = row["volume"] + + print( + f"Time: {timestamp} | O: ${open_price:,.2f} | H: ${high:,.2f} | L: ${low:,.2f} | C: ${close:,.2f} | Vol: {volume:,}" + ) + else: + print("No bar data available yet") + + # Register the event handler + print("About to register event handler...") + await suite.on(EventType.NEW_BAR, on_new_bar) + print("Event handler registered!") + + print("Monitoring MNQ 15-second bars. Press CTRL+C to exit.") + print("Event handler registered and waiting for new bars...\n") + + try: + # Keep the program running + while not shutdown_event.is_set(): + await asyncio.sleep(1) + finally: + print("Disconnecting from real-time feeds...") + await suite.disconnect() + print("Clean shutdown complete.") + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/examples/realtime_data_manager/01_events_with_on_simple.py b/examples/realtime_data_manager/01_events_with_on_simple.py new file mode 100644 index 0000000..9e1ee58 --- /dev/null +++ b/examples/realtime_data_manager/01_events_with_on_simple.py @@ -0,0 +1,83 @@ +import asyncio +import signal + +from project_x_py import TradingSuite +from project_x_py.event_bus import EventType + + +async def main(): + suite = await TradingSuite.create( + instrument="MNQ", + timeframes=["15sec"], + ) + + await suite.connect() + + # Set up signal handler for clean exit + shutdown_event = asyncio.Event() + + def signal_handler(signum, frame): + print("\n\nReceived interrupt signal. Shutting down gracefully...") + shutdown_event.set() + + signal.signal(signal.SIGINT, signal_handler) + signal.signal(signal.SIGTERM, signal_handler) + + # Use a queue to handle events outside the callback + event_queue = asyncio.Queue() + + # Define the event handler as an async function + async def on_new_bar(event): + """Handle new bar events - just queue them""" + await event_queue.put(event) + + # Register the event handler + await suite.on(EventType.NEW_BAR, on_new_bar) + + print("Monitoring MNQ 15-second bars. Press CTRL+C to exit.") + print("Event handler registered and waiting for new bars...\n") + + try: + # Process events from the queue + while not shutdown_event.is_set(): + try: + # Wait for event with timeout + event = await asyncio.wait_for(event_queue.get(), timeout=1.0) + + # Process the event outside the handler + current_price = await suite.data.get_current_price() + last_bars = await suite.data.get_data(timeframe="15sec", bars=5) + + print(f"\nCurrent price: {current_price}") + print("=" * 80) + + if last_bars is not None and not last_bars.is_empty(): + print("Last 5 bars (oldest to newest):") + print("-" * 80) + + # Get the last 5 bars and iterate through them + for row in last_bars.tail(5).iter_rows(named=True): + timestamp = row["timestamp"] + open_price = row["open"] + high = row["high"] + low = row["low"] + close = row["close"] + volume = row["volume"] + + print( + f"Time: {timestamp} | O: ${open_price:,.2f} | H: ${high:,.2f} | L: ${low:,.2f} | C: ${close:,.2f} | Vol: {volume:,}" + ) + else: + print("No bar data available yet") + + except asyncio.TimeoutError: + # Timeout is expected, just check shutdown_event again + pass + finally: + print("Disconnecting from real-time feeds...") + await suite.disconnect() + print("Clean shutdown complete.") + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/pyproject.toml b/pyproject.toml index 4b2069c..b8ddf35 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [project] name = "project-x-py" -version = "3.1.5" +version = "3.1.6" description = "High-performance Python SDK for futures trading with real-time WebSocket data, technical indicators, order management, and market depth analysis" readme = "README.md" license = { text = "MIT" } diff --git a/src/project_x_py/__init__.py b/src/project_x_py/__init__.py index c7696cb..97de763 100644 --- a/src/project_x_py/__init__.py +++ b/src/project_x_py/__init__.py @@ -95,7 +95,7 @@ from project_x_py.client.base import ProjectXBase -__version__ = "3.1.5" +__version__ = "3.1.6" __author__ = "TexasCoding" # Core client classes - renamed from Async* to standard names diff --git a/src/project_x_py/indicators/__init__.py b/src/project_x_py/indicators/__init__.py index f141e7b..8868ee6 100644 --- a/src/project_x_py/indicators/__init__.py +++ b/src/project_x_py/indicators/__init__.py @@ -202,7 +202,7 @@ ) # Version info -__version__ = "3.1.5" +__version__ = "3.1.6" __author__ = "TexasCoding" diff --git a/src/project_x_py/realtime_data_manager/data_processing.py b/src/project_x_py/realtime_data_manager/data_processing.py index 251ea98..b11eca4 100644 --- a/src/project_x_py/realtime_data_manager/data_processing.py +++ b/src/project_x_py/realtime_data_manager/data_processing.py @@ -88,6 +88,7 @@ async def on_new_bar(data): - `realtime_data_manager.validation.ValidationMixin` """ +import asyncio import logging from collections import deque from datetime import datetime @@ -295,20 +296,33 @@ async def _process_tick_data(self, tick: dict[str, Any]) -> None: price = tick["price"] volume = tick.get("volume", 0) + # Collect events to trigger after releasing the lock + events_to_trigger = [] + # Update each timeframe async with self.data_lock: # Add to current tick data for get_current_price() self.current_tick_data.append(tick) for tf_key in self.timeframes: - await self._update_timeframe_data(tf_key, timestamp, price, volume) - - # Trigger callbacks for data updates - await self._trigger_callbacks( - "data_update", - {"timestamp": timestamp, "price": price, "volume": volume}, + new_bar_event = await self._update_timeframe_data( + tf_key, timestamp, price, volume + ) + if new_bar_event: + events_to_trigger.append(new_bar_event) + + # Trigger callbacks for data updates (outside the lock, non-blocking) + asyncio.create_task( + self._trigger_callbacks( + "data_update", + {"timestamp": timestamp, "price": price, "volume": volume}, + ) ) + # Trigger any new bar events (outside the lock, non-blocking) + for event in events_to_trigger: + asyncio.create_task(self._trigger_callbacks("new_bar", event)) + # Update memory stats and periodic cleanup self.memory_stats["ticks_processed"] += 1 await self._cleanup_old_data() @@ -322,7 +336,7 @@ async def _update_timeframe_data( timestamp: datetime, price: float, volume: int, - ) -> None: + ) -> dict[str, Any] | None: """ Update a specific timeframe with new tick data. @@ -331,6 +345,9 @@ async def _update_timeframe_data( timestamp: Timestamp of the tick price: Price of the tick volume: Volume of the tick + + Returns: + dict: New bar event data if a new bar was created, None otherwise """ try: interval = self.timeframes[tf_key]["interval"] @@ -383,15 +400,12 @@ async def _update_timeframe_data( self.data[tf_key] = pl.concat([current_data, new_bar]) self.last_bar_times[tf_key] = bar_time - # Trigger new bar callback - await self._trigger_callbacks( - "new_bar", - { - "timeframe": tf_key, - "bar_time": bar_time, - "data": new_bar.to_dicts()[0], - }, - ) + # Return new bar event data to be triggered outside the lock + return { + "timeframe": tf_key, + "bar_time": bar_time, + "data": new_bar.to_dicts()[0], + } elif bar_time == last_bar_time: # Update existing bar @@ -446,8 +460,12 @@ async def _update_timeframe_data( if self.data[tf_key].height > 1000: self.data[tf_key] = self.data[tf_key].tail(1000) + # Return None if no new bar was created + return None + except Exception as e: self.logger.error(f"Error updating {tf_key} timeframe: {e}") + return None def _calculate_bar_time( self, diff --git a/test_deadlock_fix.py b/test_deadlock_fix.py new file mode 100644 index 0000000..afb39e6 --- /dev/null +++ b/test_deadlock_fix.py @@ -0,0 +1,63 @@ +import asyncio + +from project_x_py import TradingSuite +from project_x_py.event_bus import EventType + + +async def main(): + print("Test of deadlock fix") + print("=" * 50) + + print("Creating TradingSuite...") + suite = await TradingSuite.create( + instrument="MNQ", + timeframes=["15sec"], + ) + print("✓ TradingSuite created") + + # Test flag to verify handler was called + handler_called = asyncio.Event() + data_accessed = asyncio.Event() + + async def on_new_bar(event): + """Event handler that tries to access data (would deadlock before fix)""" + print("✓ Event handler called") + handler_called.set() + + try: + # This would deadlock before the fix + current_price = await suite.data.get_current_price() + print(f"✓ Got current price: {current_price}") + + bars = await suite.data.get_data(timeframe="15sec", bars=5) + print(f"✓ Got bar data: {bars is not None}") + + data_accessed.set() + except Exception as e: + print(f"✗ Error accessing data: {e}") + + # Register handler + print("Registering event handler...") + await suite.on(EventType.NEW_BAR, on_new_bar) + print("✓ Handler registered") + + # Wait for handler to be called + print("Waiting for new bar event (max 30 seconds)...") + try: + await asyncio.wait_for(handler_called.wait(), timeout=30.0) + print("✓ Handler was called successfully") + + # Wait for data access to complete + await asyncio.wait_for(data_accessed.wait(), timeout=5.0) + print("✓ Data was accessed successfully without deadlock!") + print("\n✅ TEST PASSED: No deadlock detected!") + + except asyncio.TimeoutError: + print("\n⚠️ TEST INCONCLUSIVE: No new bar event received (market may be closed)") + + await suite.disconnect() + print("✓ Disconnected") + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/uv.lock b/uv.lock index fc072c6..6150bb5 100644 --- a/uv.lock +++ b/uv.lock @@ -943,7 +943,7 @@ wheels = [ [[package]] name = "project-x-py" -version = "3.1.4" +version = "3.1.5" source = { editable = "." } dependencies = [ { name = "cachetools" },