39 changes: 39 additions & 0 deletions AGENTS.md
@@ -0,0 +1,39 @@
+# Repository Guidelines
+
+## Project Structure & Module Organization
+- Source: `src/project_x_py/` (core SDK: client, realtime, orderbook, indicators, utils, etc.).
+- Tests: `tests/` (pytest; async tests supported).
+- Examples: `examples/` (runnable end-to-end samples).
+- Docs: `docs/` (Sphinx); helper script `scripts/build-docs.py`.
+- Scripts: `scripts/` (build, docs, versioning). Build artifacts in `dist/` and coverage in `htmlcov/`.
+
+## Build, Test, and Development Commands
+- Install dev env: `uv sync` (or `pip install -e ".[dev]"`).
+- Run tests + coverage: `uv run pytest` (HTML at `htmlcov/index.html`).
+- Lint: `uv run ruff check .` Format: `uv run ruff format .` Types: `uv run mypy src`.
+- Docs: `uv run python scripts/build-docs.py --clean --open`.
+- CLI helpers: `uv run projectx-check` and `uv run projectx-config`.
+- Run an example: `uv run python examples/01_basic_client_connection.py`.
+
+## Coding Style & Naming Conventions
+- Python 3.12+, 4-space indents, max line length 88.
+- Format with Ruff formatter (Black-compatible); import order via Ruff/isort.
+- Naming follows PEP 8; uppercase class names allowed in `indicators/` (see Ruff per-file ignores).
+- Keep functions small, typed, and documented where behavior is non-obvious.
+
+## Testing Guidelines
+- Framework: pytest (+ pytest-asyncio). Place tests under `tests/` as `test_*.py`.
+- Marks: `unit`, `integration`, `slow`, `realtime` (see `pyproject.toml`).
+- Aim for meaningful coverage of public APIs; coverage reports are produced automatically.
+- Prefer async-safe patterns; use fixtures and markers to isolate realtime or networked tests.
+
+## Commit & Pull Request Guidelines
+- Use Conventional Commits: `feat:`, `fix:`, `perf:`, `docs:`, `chore:`, etc. Add scope when helpful: `fix(orderbook): ...`.
+- Keep subject ≀ 72 chars; body explains what/why and migration notes if breaking.
+- Before PR: run `uv run ruff format . && uv run ruff check . && uv run mypy src && uv run pytest`.
+- PRs include: clear description, linked issues, test updates, docs updates (if user-facing), and screenshots/logs when relevant.
+
+## Security & Configuration Tips
+- Auth via env vars `PROJECT_X_API_KEY`, `PROJECT_X_USERNAME`, or config at `~/.config/projectx/config.json`.
+- Avoid committing secrets; prefer `.env` locally and CI secrets in GitHub.
+- When adding realtime features, guard network calls in tests with markers.
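As a concrete illustration of the marker guidance above, here is a minimal sketch assuming only the markers registered in `pyproject.toml`; the `realtime_client` fixture and `is_connected` attribute are hypothetical:

```python
import pytest


# Hypothetical fixture and attribute names; the `realtime` marker is the
# one listed in the Testing Guidelines above.
@pytest.mark.realtime
@pytest.mark.asyncio
async def test_quote_stream_connects(realtime_client):
    await realtime_client.connect()
    assert realtime_client.is_connected
```

Tests tagged this way can be deselected locally with `uv run pytest -m "not realtime"`.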
16 changes: 16 additions & 0 deletions CHANGELOG.md
@@ -13,6 +13,22 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
 - Old implementations are removed when improved
 - Clean, modern code architecture is prioritized
 
+## [3.1.1] - 2025-08-10
+
+### Fixed
+- **πŸ› Test Suite Compatibility**: Fixed all failing tests for the optimized cache implementation
+  - Updated test references from old cache variables (`_instrument_cache`) to new optimized ones (`_opt_instrument_cache`)
+  - Fixed datetime serialization/deserialization in cached DataFrames to properly preserve timezone information
+  - Resolved BatchedWebSocketHandler flush mechanism with event-based signaling for immediate message processing
+  - Fixed race condition in BatchedWebSocketHandler task creation
+  - Corrected SignalR mock methods in connection management tests (changed from AsyncMock to MagicMock for synchronous methods)
+
+### Improved
+- **✨ Cache Serialization**: Enhanced datetime handling in msgpack cache
+  - Proper timezone preservation for datetime columns in Polars DataFrames
+  - More robust deserialization with fallback handling
+  - Better datetime string format compatibility
+
 ## [3.1.0] - 2025-08-09
 
 ### Added
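The AsyncMock-to-MagicMock fix noted in the changelog above is a common pytest pitfall worth spelling out: mocking a synchronous method with `AsyncMock` hands callers an unawaited coroutine. A minimal sketch, with hypothetical SignalR-style method names:

```python
from unittest.mock import AsyncMock, MagicMock

# Hypothetical SignalR-style connection double (method names illustrative).
connection = MagicMock()
connection.on = MagicMock()     # synchronous handler registration
connection.start = AsyncMock()  # awaited lifecycle method

connection.on("GatewayQuote", lambda args: None)  # plain call, no coroutine leaked
```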
10 changes: 8 additions & 2 deletions README.md
@@ -21,9 +21,15 @@ A **high-performance async Python SDK** for the [ProjectX Trading Platform](http
 
 This Python SDK acts as a bridge between your trading strategies and the ProjectX platform, handling all the complex API interactions, data processing, and real-time connectivity.
 
-## πŸš€ v3.1.0 - High-Performance Production Suite
+## πŸš€ v3.1.1 - High-Performance Production Suite
 
-**Latest Update (v3.1.0)**: Major performance optimizations delivering 2-5x improvements across the board with automatic memory management and enterprise-grade caching.
+**Latest Update (v3.1.1)**: Bug fixes for test suite compatibility with the optimized cache implementation, enhanced datetime serialization, and a more reliable batched WebSocket handler.
+
+### What's New in v3.1.1
+- **Fixed**: Test suite compatibility with optimized cache implementation
+- **Fixed**: Datetime serialization/deserialization in cached DataFrames
+- **Fixed**: BatchedWebSocketHandler flush and race condition issues
+- **Fixed**: SignalR mock methods in connection management tests
 
 ### What's New in v3.1.0
 
4 changes: 2 additions & 2 deletions docs/conf.py
@@ -23,8 +23,8 @@
 project = "project-x-py"
 copyright = "2025, Jeff West"
 author = "Jeff West"
-release = "3.1.0"
-version = "3.1.0"
+release = "3.1.1"
+version = "3.1.1"
 
 # -- General configuration ---------------------------------------------------
 
2 changes: 1 addition & 1 deletion pyproject.toml
@@ -1,6 +1,6 @@
 [project]
 name = "project-x-py"
-version = "3.1.0"
+version = "3.1.1"
 description = "High-performance Python SDK for futures trading with real-time WebSocket data, technical indicators, order management, and market depth analysis"
 readme = "README.md"
 license = { text = "MIT" }
2 changes: 1 addition & 1 deletion src/project_x_py/__init__.py
@@ -95,7 +95,7 @@
 
 from project_x_py.client.base import ProjectXBase
 
-__version__ = "3.1.0"
+__version__ = "3.1.1"
 __author__ = "TexasCoding"
 
 # Core client classes - renamed from Async* to standard names
44 changes: 41 additions & 3 deletions src/project_x_py/client/cache.py
@@ -9,6 +9,7 @@
 
 import gc
 import logging
+import re
 import time
 from typing import TYPE_CHECKING, Any
 
@@ -75,9 +76,20 @@ def _serialize_dataframe(self, df: pl.DataFrame) -> bytes:
             return b""
 
         # Convert to dictionary format for msgpack
+        columns_data = {}
+        for col in df.columns:
+            col_data = df[col]
+            # Convert datetime columns to ISO strings for msgpack serialization
+            if col_data.dtype in [pl.Datetime, pl.Date]:
+                columns_data[col] = col_data.dt.to_string(
+                    "%Y-%m-%d %H:%M:%S%.f"
+                ).to_list()
+            else:
+                columns_data[col] = col_data.to_list()
+
         data = {
             "schema": {name: str(dtype) for name, dtype in df.schema.items()},
-            "columns": {col: df[col].to_list() for col in df.columns},
+            "columns": columns_data,
             "shape": df.shape,
         }
 
@@ -132,8 +144,34 @@ def _deserialize_dataframe(self, data: bytes) -> pl.DataFrame | None:
             if not unpacked or "columns" not in unpacked:
                 return None
 
-            # Reconstruct DataFrame
-            return pl.DataFrame(unpacked["columns"])
+            # Reconstruct DataFrame with proper schema
+            df = pl.DataFrame(unpacked["columns"])
+
+            # Restore datetime columns based on stored schema
+            if "schema" in unpacked:
+                for col_name, dtype_str in unpacked["schema"].items():
+                    if "datetime" in dtype_str.lower() and col_name in df.columns:
+                        # Parse timezone from dtype string (e.g., "Datetime(time_unit='us', time_zone='UTC')")
+                        time_zone = None
+                        if "time_zone=" in dtype_str:
+                            # Extract timezone
+                            tz_match = re.search(r"time_zone='([^']+)'", dtype_str)
+                            if tz_match:
+                                time_zone = tz_match.group(1)
+
+                        # Convert string column to datetime
+                        if df[col_name].dtype == pl.Utf8:
+                            df = df.with_columns(
+                                pl.col(col_name)
+                                .str.strptime(
+                                    pl.Datetime("us", time_zone),
+                                    "%Y-%m-%d %H:%M:%S%.f",
+                                    strict=False,
+                                )
+                                .alias(col_name)
+                            )
+
+            return df
         except Exception as e:
             logger.debug(f"Failed to deserialize DataFrame: {e}")
             return None
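A minimal round-trip sketch of the serialization scheme above, assuming only `polars` and `msgpack` are installed; the column names and values are illustrative, and the timezone is hard-coded to UTC rather than recovered from the schema string:

```python
from datetime import datetime, timezone

import msgpack
import polars as pl

df = pl.DataFrame(
    {"ts": [datetime(2025, 8, 10, 12, 0, tzinfo=timezone.utc)], "price": [1943.5]}
)

# Serialize: datetime columns become strings; the schema records each dtype.
columns = {
    col: (
        df[col].dt.to_string("%Y-%m-%d %H:%M:%S%.f").to_list()
        if df[col].dtype in [pl.Datetime, pl.Date]
        else df[col].to_list()
    )
    for col in df.columns
}
payload = msgpack.packb(
    {"schema": {n: str(d) for n, d in df.schema.items()}, "columns": columns}
)

# Deserialize: rebuild the frame, then re-parse the string column into a
# timezone-aware dtype, mirroring the deserialization code above.
unpacked = msgpack.unpackb(payload)
restored = pl.DataFrame(unpacked["columns"]).with_columns(
    pl.col("ts").str.strptime(
        pl.Datetime("us", "UTC"), "%Y-%m-%d %H:%M:%S%.f", strict=False
    )
)
assert restored["ts"].dtype == pl.Datetime("us", "UTC")
```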
2 changes: 1 addition & 1 deletion src/project_x_py/indicators/__init__.py
@@ -202,7 +202,7 @@
 )
 
 # Version info
-__version__ = "3.1.0"
+__version__ = "3.1.1"
 __author__ = "TexasCoding"
 
 
61 changes: 47 additions & 14 deletions src/project_x_py/realtime/batched_handler.py
@@ -65,6 +65,9 @@ def __init__(
         # Lock for thread safety
         self._lock = asyncio.Lock()
 
+        # Event to signal immediate flush
+        self._flush_event = asyncio.Event()
+
     async def handle_message(self, message: dict[str, Any]) -> None:
         """
         Add a message to the batch queue for processing.
@@ -76,7 +79,9 @@ async def handle_message(self, message: dict[str, Any]) -> None:
             self.message_queue.append(message)
 
             # Start batch processing if not already running
-            if not self.processing:
+            if not self.processing and (
+                not self._processing_task or self._processing_task.done()
+            ):
                 self._processing_task = asyncio.create_task(self._process_batch())
 
     async def _process_batch(self) -> None:
@@ -90,17 +95,39 @@ async def _process_batch(self) -> None:
             batch: list[dict[str, Any]] = []
             deadline = time.time() + self.batch_timeout
 
-            # Collect messages until batch is full or timeout
-            while time.time() < deadline and len(batch) < self.batch_size:
+            # Collect messages until batch is full or timeout or flush is requested
+            while (
+                time.time() < deadline
+                and len(batch) < self.batch_size
+                and not self._flush_event.is_set()
+            ):
                 if self.message_queue:
                     # Get all available messages up to batch size
                     while self.message_queue and len(batch) < self.batch_size:
                         batch.append(self.message_queue.popleft())
                 else:
-                    # Wait a bit for more messages
+                    # Wait a bit for more messages or flush event
                     remaining = deadline - time.time()
                     if remaining > 0:
-                        await asyncio.sleep(min(0.001, remaining))
+                        try:
+                            # Wait for either timeout or flush event
+                            await asyncio.wait_for(
+                                self._flush_event.wait(),
+                                timeout=min(
+                                    0.01, remaining
+                                ),  # Increased from 0.001 to 0.01
+                            )
+                            # Flush was triggered, break the loop
+                            break
+                        except TimeoutError:
+                            # Normal timeout, continue
+                            pass
+
+            # If flush was triggered, get any remaining messages
+            if self._flush_event.is_set():
+                while self.message_queue and len(batch) < 10000:  # Safety limit
+                    batch.append(self.message_queue.popleft())
+                self._flush_event.clear()
 
             # Process the batch if we have messages
             if batch:
@@ -148,17 +175,16 @@
 
     async def flush(self) -> None:
         """Force processing of all queued messages immediately."""
-        # Wait for any current processing to complete first
+        # Signal the processing task to flush immediately
+        self._flush_event.set()
+
+        # Wait for the current processing task to complete if it exists
         if self._processing_task and not self._processing_task.done():
-            with contextlib.suppress(TimeoutError, asyncio.CancelledError):
+            with contextlib.suppress(asyncio.TimeoutError, asyncio.CancelledError):
                 await asyncio.wait_for(self._processing_task, timeout=1.0)
 
-        # Give the processing task a chance to actually process
-        await asyncio.sleep(0)  # Yield to let other tasks run
-
-        # Now process any remaining messages
-        while self.message_queue:
-            # Process all remaining messages
+        # Process any remaining messages that weren't picked up
+        if self.message_queue:
             batch = list(self.message_queue)
             self.message_queue.clear()
 
@@ -170,6 +196,10 @@ async def flush(self) -> None:
             except Exception as e:
                 logger.error(f"Error flushing batch of {len(batch)} messages: {e}")
 
+        # Clear the flush event for next time
+        self._flush_event.clear()
+        self.processing = False
+
     def get_stats(self) -> dict[str, Any]:
         """
         Get performance statistics for the batch handler.
@@ -202,6 +232,9 @@ def get_stats(self) -> dict[str, Any]:
 
     async def stop(self) -> None:
         """Stop the batch handler and process remaining messages."""
+        # Signal flush to trigger immediate processing
+        self._flush_event.set()
+
         # Wait for current processing to complete
         if self._processing_task and not self._processing_task.done():
             try:
@@ -211,7 +244,7 @@ async def stop(self) -> None:
                 with contextlib.suppress(asyncio.CancelledError):
                     await self._processing_task
 
-        # Flush remaining messages
+        # Process any remaining messages that weren't handled
         await self.flush()
 
         logger.info(
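The interplay between the collection loop and `flush()` above is easier to see outside diff form. Below is a condensed, self-contained sketch of the same event-based signaling pattern; the class and method names are illustrative, not the SDK's API:

```python
import asyncio
import time
from collections import deque
from typing import Any


class MiniBatcher:
    """Illustrative only: an asyncio.Event lets flush() interrupt the wait."""

    def __init__(self, batch_size: int = 100, batch_timeout: float = 0.05) -> None:
        self.queue: deque[Any] = deque()
        self.batch_size = batch_size
        self.batch_timeout = batch_timeout
        self._flush_event = asyncio.Event()

    async def collect_batch(self) -> list[Any]:
        """Gather until the batch fills, the deadline passes, or flush fires."""
        batch: list[Any] = []
        deadline = time.monotonic() + self.batch_timeout
        while (
            time.monotonic() < deadline
            and len(batch) < self.batch_size
            and not self._flush_event.is_set()
        ):
            if self.queue:
                while self.queue and len(batch) < self.batch_size:
                    batch.append(self.queue.popleft())
            else:
                try:
                    # Sleep briefly, but wake immediately if flush() signals.
                    await asyncio.wait_for(self._flush_event.wait(), timeout=0.01)
                    break
                except TimeoutError:
                    pass  # normal poll timeout; keep collecting
        if self._flush_event.is_set():
            batch.extend(self.queue)  # drain whatever is left
            self.queue.clear()
            self._flush_event.clear()
        return batch

    def request_flush(self) -> None:
        """Signal the collector to stop waiting and drain the queue."""
        self._flush_event.set()
```

As in the real handler, a caller that wants an immediate flush sets the event and then awaits the in-flight processing task.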
4 changes: 2 additions & 2 deletions tests/client/test_client_integration.py
@@ -75,7 +75,7 @@ async def test_auth_market_data_workflow(
         # Step 2: Get instrument data
         instrument = await client.get_instrument("MGC")
         assert instrument is not None
-        assert "MGC" in client._instrument_cache
+        assert "MGC" in client._opt_instrument_cache
 
         # Step 3: Get market data
         bars = await client.get_bars("MGC", days=5, interval=5)
@@ -85,7 +85,7 @@
 
         # Step 4: Verify cache is populated
         cache_key = "MGC_5_5_2_True"
-        assert cache_key in client._market_data_cache
+        assert cache_key in client._opt_market_data_cache
 
     @pytest.mark.asyncio
     async def test_trading_workflow(