Commit c71d2ba
fix: v3.1.1 - Test suite compatibility and cache improvements
## Fixed
- Updated test references from old cache variables to new optimized ones (`_instrument_cache` -> `_opt_instrument_cache`)
- Fixed datetime serialization/deserialization in cached DataFrames to preserve timezone information
- Resolved BatchedWebSocketHandler flush mechanism with event-based signaling
- Fixed race condition in BatchedWebSocketHandler task creation
- Corrected SignalR mock methods in connection management tests (AsyncMock -> MagicMock)

## Improved
- Enhanced datetime handling in msgpack cache with proper timezone preservation
- More robust cache deserialization with fallback handling
- Better datetime string format compatibility

## Tests
- All 438 tests now passing
- Added comprehensive test coverage for the new cache implementation
- Fixed all test suite compatibility issues with the optimized cache

This release ensures full test suite compatibility with the v3.1.0 performance optimizations.
1 parent 6fe4fac commit c71d2ba

File tree

13 files changed (+443, -85 lines)

CHANGELOG.md (16 additions, 0 deletions)

@@ -13,6 +13,22 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
 - Old implementations are removed when improved
 - Clean, modern code architecture is prioritized
 
+## [3.1.1] - 2025-08-10
+
+### Fixed
+- **🐛 Test Suite Compatibility**: Fixed all failing tests for the optimized cache implementation
+  - Updated test references from old cache variables (`_instrument_cache`) to new optimized ones (`_opt_instrument_cache`)
+  - Fixed datetime serialization/deserialization in cached DataFrames to properly preserve timezone information
+  - Resolved BatchedWebSocketHandler flush mechanism with event-based signaling for immediate message processing
+  - Fixed race condition in BatchedWebSocketHandler task creation
+  - Corrected SignalR mock methods in connection management tests (changed from AsyncMock to MagicMock for synchronous methods)
+
+### Improved
+- **✨ Cache Serialization**: Enhanced datetime handling in msgpack cache
+  - Proper timezone preservation for datetime columns in Polars DataFrames
+  - More robust deserialization with fallback handling
+  - Better datetime string format compatibility
+
 ## [3.1.0] - 2025-08-09
 
 ### Added

README.md (8 additions, 2 deletions)

@@ -21,9 +21,15 @@ A **high-performance async Python SDK** for the [ProjectX Trading Platform](http
 
 This Python SDK acts as a bridge between your trading strategies and the ProjectX platform, handling all the complex API interactions, data processing, and real-time connectivity.
 
-## 🚀 v3.1.0 - High-Performance Production Suite
+## 🚀 v3.1.1 - High-Performance Production Suite
 
-**Latest Update (v3.1.0)**: Major performance optimizations delivering 2-5x improvements across the board with automatic memory management and enterprise-grade caching.
+**Latest Update (v3.1.1)**: Bug fixes for test suite compatibility with the optimized cache implementation, enhanced datetime serialization, and more reliable WebSocket batching.
+
+### What's New in v3.1.1
+- **Fixed**: Test suite compatibility with the optimized cache implementation
+- **Fixed**: Datetime serialization/deserialization in cached DataFrames
+- **Fixed**: BatchedWebSocketHandler flush and race condition issues
+- **Fixed**: SignalR mock methods in connection management tests
 
 ### What's New in v3.1.0
 

docs/conf.py (2 additions, 2 deletions)

@@ -23,8 +23,8 @@
 project = "project-x-py"
 copyright = "2025, Jeff West"
 author = "Jeff West"
-release = "3.1.0"
-version = "3.1.0"
+release = "3.1.1"
+version = "3.1.1"
 
 # -- General configuration ---------------------------------------------------
 

pyproject.toml (1 addition, 1 deletion)

@@ -1,6 +1,6 @@
 [project]
 name = "project-x-py"
-version = "3.1.0"
+version = "3.1.1"
 description = "High-performance Python SDK for futures trading with real-time WebSocket data, technical indicators, order management, and market depth analysis"
 readme = "README.md"
 license = { text = "MIT" }

src/project_x_py/__init__.py (1 addition, 1 deletion)

@@ -95,7 +95,7 @@
 
 from project_x_py.client.base import ProjectXBase
 
-__version__ = "3.1.0"
+__version__ = "3.1.1"
__author__ = "TexasCoding"
 
 # Core client classes - renamed from Async* to standard names

src/project_x_py/client/cache.py (41 additions, 3 deletions)

@@ -9,6 +9,7 @@
 
 import gc
 import logging
+import re
 import time
 from typing import TYPE_CHECKING, Any
 
@@ -75,9 +76,20 @@ def _serialize_dataframe(self, df: pl.DataFrame) -> bytes:
             return b""
 
         # Convert to dictionary format for msgpack
+        columns_data = {}
+        for col in df.columns:
+            col_data = df[col]
+            # Convert datetime columns to ISO strings for msgpack serialization
+            if col_data.dtype in [pl.Datetime, pl.Date]:
+                columns_data[col] = col_data.dt.to_string(
+                    "%Y-%m-%d %H:%M:%S%.f"
+                ).to_list()
+            else:
+                columns_data[col] = col_data.to_list()
+
         data = {
             "schema": {name: str(dtype) for name, dtype in df.schema.items()},
-            "columns": {col: df[col].to_list() for col in df.columns},
+            "columns": columns_data,
             "shape": df.shape,
         }
 
@@ -132,8 +144,34 @@ def _deserialize_dataframe(self, data: bytes) -> pl.DataFrame | None:
             if not unpacked or "columns" not in unpacked:
                 return None
 
-            # Reconstruct DataFrame
-            return pl.DataFrame(unpacked["columns"])
+            # Reconstruct DataFrame with proper schema
+            df = pl.DataFrame(unpacked["columns"])
+
+            # Restore datetime columns based on stored schema
+            if "schema" in unpacked:
+                for col_name, dtype_str in unpacked["schema"].items():
+                    if "datetime" in dtype_str.lower() and col_name in df.columns:
+                        # Parse timezone from dtype string (e.g., "Datetime(time_unit='us', time_zone='UTC')")
+                        time_zone = None
+                        if "time_zone=" in dtype_str:
+                            # Extract timezone
+                            tz_match = re.search(r"time_zone='([^']+)'", dtype_str)
+                            if tz_match:
+                                time_zone = tz_match.group(1)
+
+                        # Convert string column to datetime
+                        if df[col_name].dtype == pl.Utf8:
+                            df = df.with_columns(
+                                pl.col(col_name)
+                                .str.strptime(
+                                    pl.Datetime("us", time_zone),
+                                    "%Y-%m-%d %H:%M:%S%.f",
+                                    strict=False,
+                                )
+                                .alias(col_name)
+                            )
+
+            return df
         except Exception as e:
             logger.debug(f"Failed to deserialize DataFrame: {e}")
             return None

src/project_x_py/indicators/__init__.py (1 addition, 1 deletion)

@@ -202,7 +202,7 @@
 )
 
 # Version info
-__version__ = "3.1.0"
+__version__ = "3.1.1"
 __author__ = "TexasCoding"
 

src/project_x_py/realtime/batched_handler.py (47 additions, 14 deletions)

@@ -65,6 +65,9 @@ def __init__(
         # Lock for thread safety
         self._lock = asyncio.Lock()
 
+        # Event to signal immediate flush
+        self._flush_event = asyncio.Event()
+
     async def handle_message(self, message: dict[str, Any]) -> None:
         """
         Add a message to the batch queue for processing.
@@ -76,7 +79,9 @@ async def handle_message(self, message: dict[str, Any]) -> None:
         self.message_queue.append(message)
 
         # Start batch processing if not already running
-        if not self.processing:
+        if not self.processing and (
+            not self._processing_task or self._processing_task.done()
+        ):
             self._processing_task = asyncio.create_task(self._process_batch())
 
     async def _process_batch(self) -> None:
@@ -90,17 +95,39 @@ async def _process_batch(self) -> None:
         batch: list[dict[str, Any]] = []
         deadline = time.time() + self.batch_timeout
 
-        # Collect messages until batch is full or timeout
-        while time.time() < deadline and len(batch) < self.batch_size:
+        # Collect messages until batch is full or timeout or flush is requested
+        while (
+            time.time() < deadline
+            and len(batch) < self.batch_size
+            and not self._flush_event.is_set()
+        ):
             if self.message_queue:
                 # Get all available messages up to batch size
                 while self.message_queue and len(batch) < self.batch_size:
                     batch.append(self.message_queue.popleft())
             else:
-                # Wait a bit for more messages
+                # Wait a bit for more messages or flush event
                 remaining = deadline - time.time()
                 if remaining > 0:
-                    await asyncio.sleep(min(0.001, remaining))
+                    try:
+                        # Wait for either timeout or flush event
+                        await asyncio.wait_for(
+                            self._flush_event.wait(),
+                            timeout=min(
+                                0.01, remaining
+                            ),  # Increased from 0.001 to 0.01
+                        )
+                        # Flush was triggered, break the loop
+                        break
+                    except TimeoutError:
+                        # Normal timeout, continue
+                        pass
+
+        # If flush was triggered, get any remaining messages
+        if self._flush_event.is_set():
+            while self.message_queue and len(batch) < 10000:  # Safety limit
+                batch.append(self.message_queue.popleft())
+            self._flush_event.clear()
 
         # Process the batch if we have messages
         if batch:
@@ -148,17 +175,16 @@ async def _process_batch(self) -> None:
 
     async def flush(self) -> None:
         """Force processing of all queued messages immediately."""
-        # Wait for any current processing to complete first
+        # Signal the processing task to flush immediately
+        self._flush_event.set()
+
+        # Wait for the current processing task to complete if it exists
         if self._processing_task and not self._processing_task.done():
-            with contextlib.suppress(TimeoutError, asyncio.CancelledError):
+            with contextlib.suppress(asyncio.TimeoutError, asyncio.CancelledError):
                 await asyncio.wait_for(self._processing_task, timeout=1.0)
 
-        # Give the processing task a chance to actually process
-        await asyncio.sleep(0)  # Yield to let other tasks run
-
-        # Now process any remaining messages
-        while self.message_queue:
-            # Process all remaining messages
+        # Process any remaining messages that weren't picked up
+        if self.message_queue:
             batch = list(self.message_queue)
             self.message_queue.clear()
 
@@ -170,6 +196,10 @@ async def flush(self) -> None:
         except Exception as e:
             logger.error(f"Error flushing batch of {len(batch)} messages: {e}")
 
+        # Clear the flush event for next time
+        self._flush_event.clear()
+        self.processing = False
+
     def get_stats(self) -> dict[str, Any]:
         """
         Get performance statistics for the batch handler.
@@ -202,6 +232,9 @@ def get_stats(self) -> dict[str, Any]:
 
     async def stop(self) -> None:
         """Stop the batch handler and process remaining messages."""
+        # Signal flush to trigger immediate processing
+        self._flush_event.set()
+
         # Wait for current processing to complete
         if self._processing_task and not self._processing_task.done():
             try:
@@ -211,7 +244,7 @@ async def stop(self) -> None:
             with contextlib.suppress(asyncio.CancelledError):
                 await self._processing_task
 
-        # Flush remaining messages
+        # Process any remaining messages that weren't handled
         await self.flush()
 
         logger.info(
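The batched_handler.py diff replaces a sleep-and-poll flush with an `asyncio.Event` the worker waits on, and guards task creation so `handle_message` never spawns a second worker while one is alive. A minimal, self-contained sketch of the same two patterns (the `TinyBatcher` class and its attribute names are illustrative, not the SDK's API):

```python
import asyncio
from collections import deque
from typing import Any


class TinyBatcher:
    """Toy batcher showing event-based flush; not the real BatchedWebSocketHandler."""

    def __init__(self, batch_timeout: float = 0.05) -> None:
        self.queue: deque[Any] = deque()
        self.batches: list[list[Any]] = []
        self.batch_timeout = batch_timeout
        self._flush_event = asyncio.Event()
        self._task: "asyncio.Task[None] | None" = None

    async def handle(self, msg: Any) -> None:
        self.queue.append(msg)
        # Race-condition guard: only spawn a worker when none is alive,
        # mirroring the diff's `not self._processing_task or self._processing_task.done()`
        if self._task is None or self._task.done():
            self._task = asyncio.create_task(self._drain())

    async def _drain(self) -> None:
        try:
            # Sleep-free wait: wakes immediately on flush(), or after the timeout
            await asyncio.wait_for(self._flush_event.wait(), timeout=self.batch_timeout)
        except (TimeoutError, asyncio.TimeoutError):
            pass  # normal timeout; fall through and emit what accumulated
        self._flush_event.clear()
        if self.queue:
            self.batches.append(list(self.queue))
            self.queue.clear()

    async def flush(self) -> None:
        # Signal the worker instead of polling with asyncio.sleep(), then await it
        self._flush_event.set()
        if self._task and not self._task.done():
            await self._task


if __name__ == "__main__":
    async def demo() -> None:
        b = TinyBatcher(batch_timeout=5.0)
        await b.handle("tick-1")
        await b.handle("tick-2")
        await b.flush()  # wakes the worker at once, well before the 5 s timeout
        print(b.batches)

    asyncio.run(demo())
```

The design point the diff makes: an event-based wakeup gives `flush()` deterministic latency, whereas the old `asyncio.sleep(min(0.001, remaining))` poll loop could leave messages queued until the next poll tick.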

tests/client/test_client_integration.py (2 additions, 2 deletions)

@@ -75,7 +75,7 @@ async def test_auth_market_data_workflow(
         # Step 2: Get instrument data
         instrument = await client.get_instrument("MGC")
         assert instrument is not None
-        assert "MGC" in client._instrument_cache
+        assert "MGC" in client._opt_instrument_cache
 
         # Step 3: Get market data
         bars = await client.get_bars("MGC", days=5, interval=5)
@@ -85,7 +85,7 @@ async def test_auth_market_data_workflow(
 
         # Step 4: Verify cache is populated
         cache_key = "MGC_5_5_2_True"
-        assert cache_key in client._market_data_cache
+        assert cache_key in client._opt_market_data_cache
 
     @pytest.mark.asyncio
     async def test_trading_workflow(
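On the AsyncMock-to-MagicMock correction mentioned in the commit message: SignalR-style callback registration is a synchronous call, and mocking it with `AsyncMock` silently turns every invocation into an un-awaited coroutine. A small illustration (the connection double and the `GatewayQuote` event name are generic placeholders, not the SDK's API):

```python
import asyncio
import inspect
from unittest.mock import AsyncMock, MagicMock

# Hypothetical SignalR-style connection double; `.on()` registers a callback
# synchronously, so MagicMock is the appropriate mock type for it.
conn = MagicMock()
conn.on("GatewayQuote", lambda msg: msg)  # plain call; nothing to await
assert not inspect.iscoroutine(conn.on("GatewayQuote", None))

# With AsyncMock, every attribute becomes an async callable, so calling the
# same synchronous method hands back a coroutine that nothing ever awaits.
wrong = AsyncMock()
leaked = wrong.on("GatewayQuote", None)
assert inspect.iscoroutine(leaked)
leaked.close()  # dispose of it to avoid a "coroutine was never awaited" warning

# AsyncMock remains the right choice for genuinely awaitable methods:
async def demo() -> None:
    hub = AsyncMock()
    await hub.start()  # awaitable, as a real async start() would be

asyncio.run(demo())
```

In short: mock synchronous methods with `MagicMock` and asynchronous ones with `AsyncMock`; mixing them up either leaks coroutines or makes `await` fail.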
