Skip to content

Commit cbb8dea

Browse files
TexasCodingclaude
andcommitted
fix: Real-time data processing for E-mini contracts and low-volume periods (v3.1.8)
## Fixed - Real-time data not being processed for E-mini contracts (NQ/ES) that resolve to different exchange symbols - Symbol matching now properly handles contract resolution (e.g., NQ→ENQ, ES→EP) - Stores both user-specified instrument and resolved symbol ID for accurate matching ## Added - Bar timer mechanism that creates empty bars during low-volume periods - Ensures consistent bar generation at regular intervals regardless of trading activity - Empty bars (volume=0) maintain price continuity using the last close price - Particularly important for low-volume contracts and after-hours trading ## Technical Details - RealtimeDataManager now tracks `instrument_symbol_id` alongside `instrument` - Symbol validation checks both original and resolved symbols - Bar timer runs periodically based on shortest configured timeframe - Empty bars are distinguishable by volume=0 This fixes issues where NQ and other E-mini contracts were not updating in real-time. 🤖 Generated with [Claude Code](https://claude.ai/code) Co-Authored-By: Claude <[email protected]>
1 parent f830644 commit cbb8dea

File tree

10 files changed

+204
-12
lines changed

10 files changed

+204
-12
lines changed

CHANGELOG.md

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,26 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
1414
- Migration guides will be provided for all breaking changes
1515
- Semantic versioning (MAJOR.MINOR.PATCH) is strictly followed
1616

17+
## [3.1.8] - 2025-08-12
18+
19+
### Fixed
20+
- **🔧 Real-time Data Processing**: Fixed real-time data not being processed for E-mini contracts (NQ/ES)
21+
- Symbol matching now handles contract resolution (e.g., NQ resolves to ENQ)
22+
- Stores both original instrument and resolved symbol ID for proper matching
23+
- Affects all contracts where user symbol differs from exchange symbol
24+
25+
### Added
26+
- **⏱️ Bar Timer Mechanism**: Automatic bar creation during low-volume periods
27+
- Creates empty bars (volume=0) at regular intervals when no ticks arrive
28+
- Ensures consistent bar generation for all instruments regardless of trading activity
29+
- Particularly important for low-volume contracts and after-hours trading
30+
- Empty bars maintain price continuity using the last close price
31+
32+
### Improved
33+
- Enhanced symbol validation to support both user-specified and exchange-resolved symbols
34+
- Better handling of futures contract name resolution (NQ→ENQ, ES→EP, etc.)
35+
- More robust real-time data pipeline for all futures contracts
36+
1737
## [3.1.7] - 2025-08-12
1838

1939
### Changed

CLAUDE.md

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22

33
This file provides guidance to Claude Code (claude.ai/code) when working with code in this repository.
44

5-
## Project Status: v3.1.7 - Stable Production Release
5+
## Project Status: v3.1.8 - Stable Production Release
66

77
**IMPORTANT**: This project uses a fully asynchronous architecture. All APIs are async-only, optimized for high-performance futures trading.
88

@@ -288,7 +288,13 @@ async with ProjectX.from_env() as client:
288288

289289
## Recent Changes
290290

291-
### v3.1.7 - Latest Release
291+
### v3.1.8 - Latest Release
292+
- **Fixed**: Real-time data processing for E-mini contracts (NQ/ES) that resolve to different symbols
293+
- **Added**: Bar timer mechanism to create empty bars during low-volume periods
294+
- **Improved**: Symbol matching to handle contract resolution (e.g., NQ→ENQ)
295+
- **Enhanced**: Real-time data manager now properly processes all futures contracts
296+
297+
### v3.1.7 - Previous Release
292298
- Minor updates and improvements
293299
- Documentation enhancements
294300

docs/conf.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,8 +23,8 @@
2323
project = "project-x-py"
2424
copyright = "2025, Jeff West"
2525
author = "Jeff West"
26-
release = "3.1.7"
27-
version = "3.1.7"
26+
release = "3.1.8"
27+
version = "3.1.8"
2828

2929
# -- General configuration ---------------------------------------------------
3030

examples/realtime_data_manager/01_events_with_on.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,10 @@
77

88
async def main():
99
print("Creating TradingSuite...")
10+
# Note: Use "MNQ" for Micro E-mini Nasdaq-100 futures
11+
# "NQ" resolves to E-mini Nasdaq (ENQ) which may have different data characteristics
1012
suite = await TradingSuite.create(
11-
instrument="MNQ",
13+
instrument="ES", # Works best with MNQ for consistent real-time updates
1214
timeframes=["15sec"],
1315
)
1416
print("TradingSuite created!")
@@ -19,7 +21,7 @@ async def main():
1921
# Set up signal handler for clean exit
2022
shutdown_event = asyncio.Event()
2123

22-
def signal_handler(signum, frame):
24+
def signal_handler(_signum, _frame):
2325
print("\n\nReceived interrupt signal. Shutting down gracefully...")
2426
shutdown_event.set()
2527

pyproject.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[project]
22
name = "project-x-py"
3-
version = "3.1.7"
3+
version = "3.1.8"
44
description = "High-performance Python SDK for futures trading with real-time WebSocket data, technical indicators, order management, and market depth analysis"
55
readme = "README.md"
66
license = { text = "MIT" }

src/project_x_py/__init__.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -95,7 +95,7 @@
9595

9696
from project_x_py.client.base import ProjectXBase
9797

98-
__version__ = "3.1.7"
98+
__version__ = "3.1.8"
9999
__author__ = "TexasCoding"
100100

101101
# Core client classes - renamed from Async* to standard names

src/project_x_py/indicators/__init__.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -202,7 +202,7 @@
202202
)
203203

204204
# Version info
205-
__version__ = "3.1.7"
205+
__version__ = "3.1.8"
206206
__author__ = "TexasCoding"
207207

208208

src/project_x_py/realtime_data_manager/core.py

Lines changed: 153 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -117,6 +117,7 @@ async def on_new_bar(data):
117117
"""
118118

119119
import asyncio
120+
import contextlib
120121
import time
121122
from collections import defaultdict
122123
from datetime import datetime
@@ -399,6 +400,8 @@ def __init__(
399400

400401
# Contract ID for real-time subscriptions
401402
self.contract_id: str | None = None
403+
# Actual symbol ID from the resolved instrument (e.g., "ENQ" when user specifies "NQ")
404+
self.instrument_symbol_id: str | None = None
402405

403406
# Memory management settings are set in _apply_config_defaults()
404407
self.last_cleanup: float = time.time()
@@ -431,6 +434,9 @@ def __init__(
431434
# Background cleanup task
432435
self._cleanup_task: asyncio.Task[None] | None = None
433436

437+
# Background bar timer task for low-volume periods
438+
self._bar_timer_task: asyncio.Task[None] | None = None
439+
434440
self.logger.info(
435441
"RealtimeDataManager initialized", extra={"instrument": instrument}
436442
)
@@ -529,6 +535,16 @@ async def initialize(self, initial_days: int = 1) -> bool:
529535
# Store the exact contract ID for real-time subscriptions
530536
self.contract_id = instrument_info.id
531537

538+
# Store the actual symbol ID for matching (e.g., "ENQ" when user specifies "NQ")
539+
# Extract from symbolId like "F.US.ENQ" -> "ENQ"
540+
if instrument_info.symbolId and "." in instrument_info.symbolId:
541+
parts = instrument_info.symbolId.split(".")
542+
self.instrument_symbol_id = (
543+
parts[-1] if parts else instrument_info.symbolId
544+
)
545+
else:
546+
self.instrument_symbol_id = instrument_info.symbolId or self.instrument
547+
532548
# Load initial data for all timeframes
533549
async with self.data_lock:
534550
for tf_key, tf_config in self.timeframes.items():
@@ -668,6 +684,9 @@ async def on_new_bar(data):
668684
# Start cleanup task
669685
self.start_cleanup_task()
670686

687+
# Start bar timer task for low-volume periods
688+
self._start_bar_timer_task()
689+
671690
self.logger.debug(
672691
LogMessages.DATA_SUBSCRIBE,
673692
extra={"status": "feed_started", "instrument": self.instrument},
@@ -690,6 +709,9 @@ async def stop_realtime_feed(self) -> None:
690709
# Cancel cleanup task
691710
await self.stop_cleanup_task()
692711

712+
# Cancel bar timer task
713+
await self._stop_bar_timer_task()
714+
693715
# Unsubscribe from market data
694716
# Note: unsubscribe_market_data will be implemented in ProjectXRealtimeClient
695717
if self.contract_id:
@@ -730,3 +752,134 @@ async def cleanup(self) -> None:
730752
dom_attr[_k] = []
731753

732754
self.logger.info("✅ RealtimeDataManager cleanup completed")
755+
756+
def _start_bar_timer_task(self) -> None:
757+
"""Start the bar timer task for creating bars during low-volume periods."""
758+
if self._bar_timer_task is None or self._bar_timer_task.done():
759+
self._bar_timer_task = asyncio.create_task(self._bar_timer_loop())
760+
self.logger.debug("Bar timer task started")
761+
762+
async def _stop_bar_timer_task(self) -> None:
763+
"""Stop the bar timer task."""
764+
if self._bar_timer_task and not self._bar_timer_task.done():
765+
self._bar_timer_task.cancel()
766+
with contextlib.suppress(asyncio.CancelledError):
767+
await self._bar_timer_task
768+
self.logger.debug("Bar timer task stopped")
769+
770+
async def _bar_timer_loop(self) -> None:
771+
"""
772+
Periodic task to create empty bars during low-volume periods.
773+
774+
This ensures bars are created at regular intervals even when
775+
there's no trading activity (important for low-volume instruments).
776+
"""
777+
try:
778+
# Find the shortest timeframe interval to check
779+
min_seconds = float("inf")
780+
for tf_config in self.timeframes.values():
781+
interval = tf_config["interval"]
782+
unit = tf_config["unit"]
783+
784+
# Convert to seconds
785+
if unit == "sec":
786+
seconds = interval
787+
elif unit == "min":
788+
seconds = interval * 60
789+
elif unit == "hr":
790+
seconds = interval * 3600
791+
else:
792+
continue
793+
794+
min_seconds = min(min_seconds, seconds)
795+
796+
# Check at least every 5 seconds, but no more than the shortest interval
797+
check_interval = min(5.0, min_seconds / 3)
798+
799+
self.logger.debug(f"Bar timer checking every {check_interval} seconds")
800+
801+
while self.is_running:
802+
await asyncio.sleep(check_interval)
803+
804+
if not self.is_running:
805+
break
806+
807+
# Check each timeframe for stale bars
808+
await self._check_and_create_empty_bars()
809+
810+
except asyncio.CancelledError:
811+
self.logger.debug("Bar timer task cancelled")
812+
raise
813+
except Exception as e:
814+
self.logger.error(f"Error in bar timer loop: {e}")
815+
816+
async def _check_and_create_empty_bars(self) -> None:
817+
"""
818+
Check each timeframe and create empty bars if needed.
819+
820+
This handles low-volume periods where no ticks are coming in,
821+
ensuring bars are still created at the proper intervals.
822+
"""
823+
try:
824+
current_time = datetime.now(self.timezone)
825+
events_to_trigger = []
826+
827+
async with self.data_lock:
828+
for tf_key, tf_config in self.timeframes.items():
829+
if tf_key not in self.data:
830+
continue
831+
832+
current_data = self.data[tf_key]
833+
if current_data.height == 0:
834+
continue
835+
836+
# Get the last bar time
837+
last_bar_time = (
838+
current_data.select(pl.col("timestamp")).tail(1).item()
839+
)
840+
841+
# Calculate what the current bar time should be
842+
expected_bar_time = self._calculate_bar_time(
843+
current_time, tf_config["interval"], tf_config["unit"]
844+
)
845+
846+
# If we're missing bars, create empty ones
847+
if expected_bar_time > last_bar_time:
848+
# Get the last close price to use for empty bars
849+
last_close = current_data.select(pl.col("close")).tail(1).item()
850+
851+
# Create empty bar with last close as OHLC, volume=0
852+
new_bar = pl.DataFrame(
853+
{
854+
"timestamp": [expected_bar_time],
855+
"open": [last_close],
856+
"high": [last_close],
857+
"low": [last_close],
858+
"close": [last_close],
859+
"volume": [0], # Zero volume for empty bars
860+
}
861+
)
862+
863+
self.data[tf_key] = pl.concat([current_data, new_bar])
864+
self.last_bar_times[tf_key] = expected_bar_time
865+
866+
self.logger.debug(
867+
f"Created empty bar for {tf_key} at {expected_bar_time} "
868+
f"(low volume period)"
869+
)
870+
871+
# Prepare event to trigger
872+
events_to_trigger.append(
873+
{
874+
"timeframe": tf_key,
875+
"bar_time": expected_bar_time,
876+
"data": new_bar.to_dicts()[0],
877+
}
878+
)
879+
880+
# Trigger events outside the lock (non-blocking)
881+
for event in events_to_trigger:
882+
asyncio.create_task(self._trigger_callbacks("new_bar", event))
883+
884+
except Exception as e:
885+
self.logger.error(f"Error checking/creating empty bars: {e}")

src/project_x_py/realtime_data_manager/validation.py

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -248,8 +248,19 @@ def _symbol_matches_instrument(
248248
else:
249249
base_symbol = symbol
250250

251-
# Compare with our instrument (case-insensitive)
252-
return base_symbol.upper() == self.instrument.upper()
251+
# Compare with both our original instrument and the resolved symbol ID
252+
# This handles cases like NQ -> ENQ resolution
253+
base_upper = base_symbol.upper()
254+
255+
# Check against original instrument (e.g., "NQ")
256+
if base_upper == self.instrument.upper():
257+
return True
258+
259+
# Check against resolved symbol ID (e.g., "ENQ" when user specified "NQ")
260+
if hasattr(self, "instrument_symbol_id") and self.instrument_symbol_id:
261+
return base_upper == self.instrument_symbol_id.upper()
262+
263+
return False
253264

254265
def get_realtime_validation_status(
255266
self: "RealtimeDataManagerProtocol",

uv.lock

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)