Skip to content

Commit ba6ae6a

Browse files
TexasCodingclaude
andcommitted
Fix real-time data streaming example bar data access
- Changed from trying to format Polars Series to direct dict access - Access bar data fields directly from event.data['data'] dictionary - Fixes 'unsupported format string passed to Series.__format__' error - Properly displays OHLC, volume, and timestamp for new bars 🤖 Generated with [Claude Code](https://claude.ai/code) Co-Authored-By: Claude <[email protected]>
1 parent 2c48618 commit ba6ae6a

File tree

1 file changed

+47
-23
lines changed

1 file changed

+47
-23
lines changed

examples/realtime_data/01_basic_realtime_data_streaming.py

Lines changed: 47 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,24 @@
11
#!/usr/bin/env python
22
"""
3-
Basic real-time data streaming example
3+
Basic real-time data streaming example.
4+
5+
This example demonstrates:
6+
- Connecting to real-time data feeds
7+
- Handling tick (quote) updates
8+
- Processing new bar events
9+
- Monitoring connection health
10+
- Displaying streaming statistics
411
"""
512

613
import asyncio
714
from datetime import datetime
815

916
from project_x_py import EventType, TradingSuite
17+
from project_x_py.event_bus import Event
1018

1119

1220
async def main():
21+
"""Main function to run real-time data streaming."""
1322
# Create suite with real-time capabilities
1423
suite = await TradingSuite.create(
1524
["MNQ"],
@@ -26,7 +35,8 @@ async def main():
2635
bar_count = 0
2736
last_price = None
2837

29-
async def on_tick(event):
38+
async def on_tick(event: Event):
39+
"""Handle tick updates."""
3040
nonlocal tick_count, last_price
3141
tick_data = event.data
3242

@@ -38,7 +48,8 @@ async def on_tick(event):
3848
timestamp = datetime.now().strftime("%H:%M:%S")
3949
print(f"[{timestamp}] Tick #{tick_count}: ${last_price:.2f}")
4050

41-
async def on_new_bar(event):
51+
async def on_new_bar(event: Event):
52+
"""Handle new bar events."""
4253
nonlocal bar_count
4354
bar_count += 1
4455

@@ -48,34 +59,41 @@ async def on_new_bar(event):
4859
event_data = event.data
4960
timeframe = event_data.get("timeframe", "unknown")
5061

51-
# The actual bar data is nested in the 'data' field
52-
bar_data = event_data.get(
53-
"data", event_data
54-
) # Fallback to event_data if not nested
62+
# Get the bar data directly from the event
63+
bar_data = event_data.get("data", {})
5564

56-
print(f"[{timestamp}] New {timeframe} bar #{bar_count}:")
65+
if bar_data:
66+
print(f"[{timestamp}] New {timeframe} bar #{bar_count}:")
67+
68+
# Access the bar data fields directly
69+
open_price = bar_data.get("open", 0)
70+
high_price = bar_data.get("high", 0)
71+
low_price = bar_data.get("low", 0)
72+
close_price = bar_data.get("close", 0)
73+
volume = bar_data.get("volume", 0)
74+
bar_timestamp = bar_data.get("timestamp", "")
5775

58-
# Check if bar_data has the expected fields
59-
if (
60-
"open" in bar_data
61-
and "high" in bar_data
62-
and "low" in bar_data
63-
and "close" in bar_data
64-
):
6576
print(
66-
f" OHLC: ${bar_data['open']:.2f} / ${bar_data['high']:.2f} / ${bar_data['low']:.2f} / ${bar_data['close']:.2f}"
77+
f" OHLC: ${open_price:.2f} / ${high_price:.2f} / "
78+
f"${low_price:.2f} / ${close_price:.2f}"
6779
)
68-
print(f" Volume: {bar_data.get('volume', 0)}")
69-
print(f" Timestamp: {bar_data.get('timestamp')}")
70-
71-
async def on_connection_status(event):
72-
status = event.data.get("status", "unknown")
73-
print(f"Connection Status: {status}")
80+
print(f" Volume: {volume}")
81+
print(f" Timestamp: {bar_timestamp}")
82+
83+
async def on_connection_status(event: Event):
84+
"""Handle connection status changes."""
85+
status = event.data.get("connected", False)
86+
print(f"Connection Status Changed: {status}")
87+
if status:
88+
print("✅ Real-time feed connected")
89+
else:
90+
print("❌ Real-time feed disconnected")
7491

7592
# Register event handlers
7693
await mnq_context.on(EventType.QUOTE_UPDATE, on_tick)
7794
await mnq_context.on(EventType.NEW_BAR, on_new_bar)
7895
await mnq_context.on(EventType.CONNECTED, on_connection_status)
96+
await mnq_context.on(EventType.DISCONNECTED, on_connection_status)
7997

8098
print("Listening for real-time data... Press Ctrl+C to exit")
8199

@@ -88,11 +106,17 @@ async def on_connection_status(event):
88106
connection_health = await mnq_context.data.get_health_score()
89107

90108
print(
91-
f"Status - Price: ${current_price:.2f} | Ticks: {tick_count} | Bars: {bar_count} | Health: {connection_health}"
109+
f"Status - Price: ${current_price:.2f} | "
110+
f"Ticks: {tick_count} | Bars: {bar_count} | "
111+
f"Health: {connection_health}"
92112
)
93113

94114
except KeyboardInterrupt:
95115
print("\nShutting down real-time stream...")
116+
finally:
117+
# Ensure proper cleanup
118+
await suite.disconnect()
119+
print("Disconnected from real-time feeds")
96120

97121

98122
if __name__ == "__main__":

0 commit comments

Comments
 (0)