Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
117 changes: 117 additions & 0 deletions BUG_REPORT_DEADLOCK.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
# Bug Report: Deadlock in Event Handler Callbacks

## Summary
Calling `suite.data.get_current_price()` or `suite.data.get_data()` from within an EventBus event handler callback causes a deadlock, preventing the methods from completing.

## Severity
**HIGH** - This affects the usability of the event-driven API pattern

## Reproduction Steps

1. Register an event handler using `suite.on(EventType.NEW_BAR, handler)`
2. Within the handler, call any async method on `suite.data`:
- `await suite.data.get_current_price()`
- `await suite.data.get_data(timeframe, bars)`
3. The handler will receive the event but hang indefinitely on the async call

## Example Code That Reproduces The Issue

```python
async def on_new_bar(event):
"""This handler will deadlock"""
current_price = await suite.data.get_current_price() # HANGS HERE
print(f"Current price: {current_price}")

await suite.on(EventType.NEW_BAR, on_new_bar)
```

## Workaround

Use a queue to decouple event reception from processing:

```python
event_queue = asyncio.Queue()

async def on_new_bar(event):
"""Queue the event instead of processing it"""
await event_queue.put(event)

await suite.on(EventType.NEW_BAR, on_new_bar)

# Process events outside the handler context
while True:
event = await event_queue.get()
current_price = await suite.data.get_current_price() # Works fine here
print(f"Current price: {current_price}")
```

## Root Cause Analysis

The deadlock appears to occur because:

1. The event handler is executed within the EventBus's event processing context
2. The RealtimeDataManager's `get_current_price()` and `get_data()` methods may be trying to acquire locks or access resources that are held during event processing
3. This creates a circular wait condition where:
- The event handler is waiting for the data manager methods to complete
- The data manager methods are waiting for resources locked by the event processing

## Affected Components

- `TradingSuite`
- `RealtimeDataManager`
- `EventBus`

## Suggested Fixes

1. **Short-term**: Document this limitation and provide the queue-based workaround pattern
2. **Medium-term**: Make data access methods non-blocking when called from event handlers
3. **Long-term**: Refactor the event processing to avoid holding locks during callback execution

## Files Demonstrating the Issue

- `/examples/realtime_data_manager/01_events_with_on.py` - Shows the deadlock
- `/examples/realtime_data_manager/01_events_with_on_simple.py` - Shows the workaround

## Discovery Date
2025-01-12

## Discovered By
User testing of event handler examples

## ✅ FIXED

### The Solution
Modified `src/project_x_py/realtime_data_manager/data_processing.py` to:

1. **Changed `_update_timeframe_data()` to return event data instead of triggering it directly**
2. **Moved event triggering outside the lock in `_process_tick_data()`**
3. **Made event emission non-blocking using `asyncio.create_task()`**
4. **Added missing `asyncio` import**

### Key Changes
```python
# Before (inside lock):
async with self.data_lock:
for tf_key in self.timeframes:
await self._update_timeframe_data(tf_key, timestamp, price, volume)
# Event was triggered inside _update_timeframe_data while holding lock

# After (events triggered outside lock):
events_to_trigger = []
async with self.data_lock:
for tf_key in self.timeframes:
new_bar_event = await self._update_timeframe_data(tf_key, timestamp, price, volume)
if new_bar_event:
events_to_trigger.append(new_bar_event)

# Trigger events outside lock, non-blocking
for event in events_to_trigger:
asyncio.create_task(self._trigger_callbacks("new_bar", event))
```

### Test Results
- ✅ Event handlers are called successfully
- ✅ `suite.data.get_current_price()` works from within handlers
- ✅ `suite.data.get_data()` works from within handlers
- ✅ No deadlock occurs
- ✅ API remains unchanged
9 changes: 8 additions & 1 deletion CLAUDE.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

This file provides guidance to Claude Code (claude.ai/code) when working with code in this repository.

## Project Status: v3.1.5 - Stable Production Release
## Project Status: v3.1.6 - Stable Production Release

**IMPORTANT**: This project uses a fully asynchronous architecture. All APIs are async-only, optimized for high-performance futures trading.

Expand Down Expand Up @@ -300,6 +300,13 @@ async with ProjectX.from_env() as client:

## Recent Changes

### v3.1.6 - Critical Deadlock Fix
- **Fixed**: Deadlock when calling `suite.data` methods from event handler callbacks (Issue #39)
- **Improved**: Event emission now non-blocking to prevent handler deadlocks
- **Enhanced**: Event triggering moved outside lock scope for better concurrency
- **Added**: Missing asyncio import in data_processing module
- **Maintained**: Full API compatibility - no breaking changes

### v3.1.5 - Enhanced Bar Data Retrieval
- **Added**: Optional `start_time` and `end_time` parameters to `get_bars()` method
- **Improved**: Precise time range specification for historical data queries
Expand Down
4 changes: 2 additions & 2 deletions docs/conf.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@
project = "project-x-py"
copyright = "2025, Jeff West"
author = "Jeff West"
release = "3.1.5"
version = "3.1.5"
release = "3.1.6"
version = "3.1.6"

# -- General configuration ---------------------------------------------------

Expand Down
73 changes: 73 additions & 0 deletions examples/realtime_data_manager/00_events_with_wait_for.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
import asyncio
import signal

from project_x_py import TradingSuite
from project_x_py.event_bus import EventType


async def on_new_bar(suite: TradingSuite):
current_price = await suite.data.get_current_price()
last_bars = await suite.data.get_data(timeframe="15sec", bars=5)
print(f"\nCurrent price: {current_price}")
print("=" * 80)

if last_bars is not None and not last_bars.is_empty():
print("Last 5 bars (oldest to newest):")
print("-" * 80)

# Get the last 5 bars and iterate through them
for row in last_bars.tail(5).iter_rows(named=True):
timestamp = row["timestamp"]
open_price = row["open"]
high = row["high"]
low = row["low"]
close = row["close"]
volume = row["volume"]

print(
f"Time: {timestamp} | O: ${open_price:,.2f} | H: ${high:,.2f} | L: ${low:,.2f} | C: ${close:,.2f} | Vol: {volume:,}"
)
else:
print("No bar data available yet")


async def main():
suite = await TradingSuite.create(
instrument="MNQ",
timeframes=["15sec"],
)

await suite.connect()

# Set up signal handler for clean exit
shutdown_event = asyncio.Event()

def signal_handler(signum, frame):
print("\n\nReceived interrupt signal. Shutting down gracefully...")
shutdown_event.set()

signal.signal(signal.SIGINT, signal_handler)
signal.signal(signal.SIGTERM, signal_handler)

print("Monitoring MNQ 15-second bars. Press CTRL+C to exit.\n")

try:
while not shutdown_event.is_set():
try:
# Wait for new bar with timeout to check shutdown event
new_bar = await asyncio.wait_for(
suite.wait_for(EventType.NEW_BAR), timeout=1.0
)
if new_bar:
await on_new_bar(suite)
except TimeoutError:
# Timeout is expected, just check shutdown_event again
pass
finally:
print("Disconnecting from real-time feeds...")
await suite.disconnect()
print("Clean shutdown complete.")


if __name__ == "__main__":
asyncio.run(main())
89 changes: 89 additions & 0 deletions examples/realtime_data_manager/01_events_with_on.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
import asyncio
import signal

from project_x_py import TradingSuite
from project_x_py.event_bus import EventType


async def main():
print("Creating TradingSuite...")
suite = await TradingSuite.create(
instrument="MNQ",
timeframes=["15sec"],
)
print("TradingSuite created!")

# No need to call connect() - it's already connected via auto_connect=True
print("Suite is already connected!")

# Set up signal handler for clean exit
shutdown_event = asyncio.Event()

def signal_handler(signum, frame):
print("\n\nReceived interrupt signal. Shutting down gracefully...")
shutdown_event.set()

signal.signal(signal.SIGINT, signal_handler)
signal.signal(signal.SIGTERM, signal_handler)

# Define the event handler as an async function
async def on_new_bar(event):
"""Handle new bar events"""
print(f"New bar event received: {event}")
print("About to call get_current_price...")
try:
current_price = await suite.data.get_current_price()
print(f"Got current price: {current_price}")
except Exception as e:
print(f"Error getting current price: {e}")
return

print("About to call get_data...")
try:
last_bars = await suite.data.get_data(timeframe="15sec", bars=5)
print("Got data")
except Exception as e:
print(f"Error getting data: {e}")
return
print(f"\nCurrent price: {current_price}")
print("=" * 80)

if last_bars is not None and not last_bars.is_empty():
print("Last 5 bars (oldest to newest):")
print("-" * 80)

# Get the last 5 bars and iterate through them
for row in last_bars.tail(5).iter_rows(named=True):
timestamp = row["timestamp"]
open_price = row["open"]
high = row["high"]
low = row["low"]
close = row["close"]
volume = row["volume"]

print(
f"Time: {timestamp} | O: ${open_price:,.2f} | H: ${high:,.2f} | L: ${low:,.2f} | C: ${close:,.2f} | Vol: {volume:,}"
)
else:
print("No bar data available yet")

# Register the event handler
print("About to register event handler...")
await suite.on(EventType.NEW_BAR, on_new_bar)
print("Event handler registered!")

print("Monitoring MNQ 15-second bars. Press CTRL+C to exit.")
print("Event handler registered and waiting for new bars...\n")

try:
# Keep the program running
while not shutdown_event.is_set():
await asyncio.sleep(1)
finally:
print("Disconnecting from real-time feeds...")
await suite.disconnect()
print("Clean shutdown complete.")


if __name__ == "__main__":
asyncio.run(main())
Loading
Loading