Skip to content

Commit 844539b

Browse files
authored
Merge pull request #40 from TexasCoding/patches_1
fix: resolve deadlock in event handlers (v3.1.6)
2 parents 09870ea + 561f2fe commit 844539b

File tree

12 files changed

+473
-23
lines changed

12 files changed

+473
-23
lines changed

BUG_REPORT_DEADLOCK.md

Lines changed: 117 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,117 @@
1+
# Bug Report: Deadlock in Event Handler Callbacks
2+
3+
## Summary
4+
Calling `suite.data.get_current_price()` or `suite.data.get_data()` from within an EventBus event handler callback causes a deadlock, preventing the methods from completing.
5+
6+
## Severity
7+
**HIGH** - This affects the usability of the event-driven API pattern
8+
9+
## Reproduction Steps
10+
11+
1. Register an event handler using `suite.on(EventType.NEW_BAR, handler)`
12+
2. Within the handler, call any async method on `suite.data`:
13+
- `await suite.data.get_current_price()`
14+
- `await suite.data.get_data(timeframe, bars)`
15+
3. The handler will receive the event but hang indefinitely on the async call
16+
17+
## Example Code That Reproduces The Issue
18+
19+
```python
20+
async def on_new_bar(event):
21+
"""This handler will deadlock"""
22+
current_price = await suite.data.get_current_price() # HANGS HERE
23+
print(f"Current price: {current_price}")
24+
25+
await suite.on(EventType.NEW_BAR, on_new_bar)
26+
```
27+
28+
## Workaround
29+
30+
Use a queue to decouple event reception from processing:
31+
32+
```python
33+
event_queue = asyncio.Queue()
34+
35+
async def on_new_bar(event):
36+
"""Queue the event instead of processing it"""
37+
await event_queue.put(event)
38+
39+
await suite.on(EventType.NEW_BAR, on_new_bar)
40+
41+
# Process events outside the handler context
42+
while True:
43+
event = await event_queue.get()
44+
current_price = await suite.data.get_current_price() # Works fine here
45+
print(f"Current price: {current_price}")
46+
```
47+
48+
## Root Cause Analysis
49+
50+
The deadlock appears to occur because:
51+
52+
1. The event handler is executed within the EventBus's event processing context
53+
2. The RealtimeDataManager's `get_current_price()` and `get_data()` methods may be trying to acquire locks or access resources that are held during event processing
54+
3. This creates a circular wait condition where:
55+
- The event handler is waiting for the data manager methods to complete
56+
- The data manager methods are waiting for resources locked by the event processing
57+
58+
## Affected Components
59+
60+
- `TradingSuite`
61+
- `RealtimeDataManager`
62+
- `EventBus`
63+
64+
## Suggested Fixes
65+
66+
1. **Short-term**: Document this limitation and provide the queue-based workaround pattern
67+
2. **Medium-term**: Make data access methods non-blocking when called from event handlers
68+
3. **Long-term**: Refactor the event processing to avoid holding locks during callback execution
69+
70+
## Files Demonstrating the Issue
71+
72+
- `/examples/realtime_data_manager/01_events_with_on.py` - Shows the deadlock
73+
- `/examples/realtime_data_manager/01_events_with_on_simple.py` - Shows the workaround
74+
75+
## Discovery Date
76+
2025-01-12
77+
78+
## Discovered By
79+
User testing of event handler examples
80+
81+
## ✅ FIXED
82+
83+
### The Solution
84+
Modified `src/project_x_py/realtime_data_manager/data_processing.py` to:
85+
86+
1. **Changed `_update_timeframe_data()` to return event data instead of triggering it directly**
87+
2. **Moved event triggering outside the lock in `_process_tick_data()`**
88+
3. **Made event emission non-blocking using `asyncio.create_task()`**
89+
4. **Added missing `asyncio` import**
90+
91+
### Key Changes
92+
```python
93+
# Before (inside lock):
94+
async with self.data_lock:
95+
for tf_key in self.timeframes:
96+
await self._update_timeframe_data(tf_key, timestamp, price, volume)
97+
# Event was triggered inside _update_timeframe_data while holding lock
98+
99+
# After (events triggered outside lock):
100+
events_to_trigger = []
101+
async with self.data_lock:
102+
for tf_key in self.timeframes:
103+
new_bar_event = await self._update_timeframe_data(tf_key, timestamp, price, volume)
104+
if new_bar_event:
105+
events_to_trigger.append(new_bar_event)
106+
107+
# Trigger events outside lock, non-blocking
108+
for event in events_to_trigger:
109+
asyncio.create_task(self._trigger_callbacks("new_bar", event))
110+
```
111+
112+
### Test Results
113+
- ✅ Event handlers are called successfully
114+
-`suite.data.get_current_price()` works from within handlers
115+
-`suite.data.get_data()` works from within handlers
116+
- ✅ No deadlock occurs
117+
- ✅ API remains unchanged

CLAUDE.md

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22

33
This file provides guidance to Claude Code (claude.ai/code) when working with code in this repository.
44

5-
## Project Status: v3.1.5 - Stable Production Release
5+
## Project Status: v3.1.6 - Stable Production Release
66

77
**IMPORTANT**: This project uses a fully asynchronous architecture. All APIs are async-only, optimized for high-performance futures trading.
88

@@ -300,6 +300,13 @@ async with ProjectX.from_env() as client:
300300

301301
## Recent Changes
302302

303+
### v3.1.6 - Critical Deadlock Fix
304+
- **Fixed**: Deadlock when calling `suite.data` methods from event handler callbacks (Issue #39)
305+
- **Improved**: Event emission now non-blocking to prevent handler deadlocks
306+
- **Enhanced**: Event triggering moved outside lock scope for better concurrency
307+
- **Added**: Missing asyncio import in data_processing module
308+
- **Maintained**: Full API compatibility - no breaking changes
309+
303310
### v3.1.5 - Enhanced Bar Data Retrieval
304311
- **Added**: Optional `start_time` and `end_time` parameters to `get_bars()` method
305312
- **Improved**: Precise time range specification for historical data queries

docs/conf.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,8 +23,8 @@
2323
project = "project-x-py"
2424
copyright = "2025, Jeff West"
2525
author = "Jeff West"
26-
release = "3.1.5"
27-
version = "3.1.5"
26+
release = "3.1.6"
27+
version = "3.1.6"
2828

2929
# -- General configuration ---------------------------------------------------
3030

Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,73 @@
1+
import asyncio
2+
import signal
3+
4+
from project_x_py import TradingSuite
5+
from project_x_py.event_bus import EventType
6+
7+
8+
async def on_new_bar(suite: TradingSuite):
9+
current_price = await suite.data.get_current_price()
10+
last_bars = await suite.data.get_data(timeframe="15sec", bars=5)
11+
print(f"\nCurrent price: {current_price}")
12+
print("=" * 80)
13+
14+
if last_bars is not None and not last_bars.is_empty():
15+
print("Last 5 bars (oldest to newest):")
16+
print("-" * 80)
17+
18+
# Get the last 5 bars and iterate through them
19+
for row in last_bars.tail(5).iter_rows(named=True):
20+
timestamp = row["timestamp"]
21+
open_price = row["open"]
22+
high = row["high"]
23+
low = row["low"]
24+
close = row["close"]
25+
volume = row["volume"]
26+
27+
print(
28+
f"Time: {timestamp} | O: ${open_price:,.2f} | H: ${high:,.2f} | L: ${low:,.2f} | C: ${close:,.2f} | Vol: {volume:,}"
29+
)
30+
else:
31+
print("No bar data available yet")
32+
33+
34+
async def main():
35+
suite = await TradingSuite.create(
36+
instrument="MNQ",
37+
timeframes=["15sec"],
38+
)
39+
40+
await suite.connect()
41+
42+
# Set up signal handler for clean exit
43+
shutdown_event = asyncio.Event()
44+
45+
def signal_handler(signum, frame):
46+
print("\n\nReceived interrupt signal. Shutting down gracefully...")
47+
shutdown_event.set()
48+
49+
signal.signal(signal.SIGINT, signal_handler)
50+
signal.signal(signal.SIGTERM, signal_handler)
51+
52+
print("Monitoring MNQ 15-second bars. Press CTRL+C to exit.\n")
53+
54+
try:
55+
while not shutdown_event.is_set():
56+
try:
57+
# Wait for new bar with timeout to check shutdown event
58+
new_bar = await asyncio.wait_for(
59+
suite.wait_for(EventType.NEW_BAR), timeout=1.0
60+
)
61+
if new_bar:
62+
await on_new_bar(suite)
63+
except TimeoutError:
64+
# Timeout is expected, just check shutdown_event again
65+
pass
66+
finally:
67+
print("Disconnecting from real-time feeds...")
68+
await suite.disconnect()
69+
print("Clean shutdown complete.")
70+
71+
72+
if __name__ == "__main__":
73+
asyncio.run(main())
Lines changed: 89 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,89 @@
1+
import asyncio
2+
import signal
3+
4+
from project_x_py import TradingSuite
5+
from project_x_py.event_bus import EventType
6+
7+
8+
async def main():
9+
print("Creating TradingSuite...")
10+
suite = await TradingSuite.create(
11+
instrument="MNQ",
12+
timeframes=["15sec"],
13+
)
14+
print("TradingSuite created!")
15+
16+
# No need to call connect() - it's already connected via auto_connect=True
17+
print("Suite is already connected!")
18+
19+
# Set up signal handler for clean exit
20+
shutdown_event = asyncio.Event()
21+
22+
def signal_handler(signum, frame):
23+
print("\n\nReceived interrupt signal. Shutting down gracefully...")
24+
shutdown_event.set()
25+
26+
signal.signal(signal.SIGINT, signal_handler)
27+
signal.signal(signal.SIGTERM, signal_handler)
28+
29+
# Define the event handler as an async function
30+
async def on_new_bar(event):
31+
"""Handle new bar events"""
32+
print(f"New bar event received: {event}")
33+
print("About to call get_current_price...")
34+
try:
35+
current_price = await suite.data.get_current_price()
36+
print(f"Got current price: {current_price}")
37+
except Exception as e:
38+
print(f"Error getting current price: {e}")
39+
return
40+
41+
print("About to call get_data...")
42+
try:
43+
last_bars = await suite.data.get_data(timeframe="15sec", bars=5)
44+
print("Got data")
45+
except Exception as e:
46+
print(f"Error getting data: {e}")
47+
return
48+
print(f"\nCurrent price: {current_price}")
49+
print("=" * 80)
50+
51+
if last_bars is not None and not last_bars.is_empty():
52+
print("Last 5 bars (oldest to newest):")
53+
print("-" * 80)
54+
55+
# Get the last 5 bars and iterate through them
56+
for row in last_bars.tail(5).iter_rows(named=True):
57+
timestamp = row["timestamp"]
58+
open_price = row["open"]
59+
high = row["high"]
60+
low = row["low"]
61+
close = row["close"]
62+
volume = row["volume"]
63+
64+
print(
65+
f"Time: {timestamp} | O: ${open_price:,.2f} | H: ${high:,.2f} | L: ${low:,.2f} | C: ${close:,.2f} | Vol: {volume:,}"
66+
)
67+
else:
68+
print("No bar data available yet")
69+
70+
# Register the event handler
71+
print("About to register event handler...")
72+
await suite.on(EventType.NEW_BAR, on_new_bar)
73+
print("Event handler registered!")
74+
75+
print("Monitoring MNQ 15-second bars. Press CTRL+C to exit.")
76+
print("Event handler registered and waiting for new bars...\n")
77+
78+
try:
79+
# Keep the program running
80+
while not shutdown_event.is_set():
81+
await asyncio.sleep(1)
82+
finally:
83+
print("Disconnecting from real-time feeds...")
84+
await suite.disconnect()
85+
print("Clean shutdown complete.")
86+
87+
88+
if __name__ == "__main__":
89+
asyncio.run(main())

0 commit comments

Comments
 (0)