This file provides guidance to Google's Gemini models when working with code in this repository.
IMPORTANT: This project uses a fully asynchronous architecture. All APIs are async-only, optimized for high-performance futures trading.
IMPORTANT: This project has reached stable production status. When making changes:
- Maintain Backward Compatibility: Keep existing APIs functional with deprecation warnings
- Deprecation Policy: Mark deprecated features with warnings, remove after 2 minor versions
- Semantic Versioning: Follow semver strictly (MAJOR.MINOR.PATCH)
- Migration Paths: Provide clear migration guides for breaking changes
- Modern Patterns: Use the latest Python patterns while maintaining compatibility
- Gradual Refactoring: Improve code quality without breaking existing interfaces
- Async-First: All new code must use async/await patterns
Example approach:
- ✅ DO: Keep old method signatures with deprecation warnings
- ✅ DO: Provide new improved APIs alongside old ones
- ✅ DO: Add compatibility shims when necessary
- ✅ DO: Document migration paths clearly
- ❌ DON'T: Break existing APIs without major version bump
- ❌ DON'T: Remove deprecated features without proper notice period
- Mark as deprecated with warnings.warn() and the @deprecated decorator
- Document replacement in deprecation message
- Keep deprecated feature for at least 2 minor versions
- Remove only in major version releases (4.0.0, 5.0.0, etc.)
Example:
import warnings
from typing_extensions import deprecated  # not in typing; Python 3.13+ also offers warnings.deprecated

@deprecated("Use new_method() instead. Will be removed in v4.0.0")
def old_method(self):
    warnings.warn(
        "old_method() is deprecated, use new_method() instead. "
        "Will be removed in v4.0.0",
        DeprecationWarning,
        stacklevel=2,
    )
    return self.new_method()

uv add [package] # Add a dependency
uv add --dev [package] # Add a development dependency
uv sync # Install/sync dependencies
uv run [command] # Run command in virtual environment

uv run pytest # Run all tests
uv run pytest tests/test_client.py # Run specific test file
uv run pytest -m "not slow" # Run tests excluding slow ones
uv run pytest --cov=project_x_py --cov-report=html # Generate coverage report
uv run pytest -k "async" # Run only async tests

# Test async methods with pytest-asyncio
import pytest
from project_x_py import ProjectX

@pytest.mark.asyncio
async def test_async_method():
    async with ProjectX.from_env() as client:
        await client.authenticate()
        result = await client.get_bars("MNQ", days=1)
        assert result is not None

uv run ruff check . # Lint code
uv run ruff check . --fix # Auto-fix linting issues
uv run ruff format . # Format code
uv run mypy src/ # Type checking

uv build # Build wheel and source distribution
uv run python -m build # Alternative build command

ProjectX Client (src/project_x_py/client/)
- Main async API client for TopStepX ProjectX Gateway
- Modular architecture with specialized modules:
  - auth.py: Authentication and JWT token management
  - http.py: Async HTTP client with retry logic
  - cache.py: Intelligent caching for instruments
  - market_data.py: Market data operations
  - trading.py: Trading operations
  - rate_limiter.py: Async rate limiting
  - base.py: Base class combining all mixins
Specialized Managers (All Async)
- OrderManager (order_manager/): Comprehensive async order operations
  - core.py: Main order operations
  - bracket_orders.py: OCO and bracket order logic
  - position_orders.py: Position-based order management
  - tracking.py: Order state tracking
  - templates.py: Order templates for common strategies
- PositionManager (position_manager/): Async position tracking and risk management
  - core.py: Position management core
  - risk.py: Risk calculations and limits
  - analytics.py: Performance analytics
  - monitoring.py: Real-time position monitoring
  - tracking.py: Position lifecycle tracking
- RiskManager (risk_manager/): Integrated risk management
  - core.py: Risk limits and validation
  - monitoring.py: Real-time risk monitoring
  - analytics.py: Risk metrics and reporting
- ProjectXRealtimeDataManager (realtime_data_manager/): Async WebSocket data
  - core.py: Main data manager
  - callbacks.py: Event callback handling
  - data_processing.py: OHLCV bar construction
  - memory_management.py: Efficient data storage
- OrderBook (orderbook/): Async Level 2 market depth
  - base.py: Core orderbook functionality
  - analytics.py: Market microstructure analysis
  - detection.py: Iceberg and spoofing detection
  - profile.py: Volume profile analysis
Technical Indicators (src/project_x_py/indicators/)
- TA-Lib compatible indicator library built on Polars
- 58+ indicators including pattern recognition:
- Momentum: RSI, MACD, Stochastic, etc.
- Overlap: SMA, EMA, Bollinger Bands, etc.
- Volatility: ATR, Keltner Channels, etc.
- Volume: OBV, VWAP, Money Flow, etc.
- Pattern Recognition (NEW):
- Fair Value Gap (FVG): Price imbalance detection
- Order Block: Institutional order zone identification
- Waddah Attar Explosion: Volatility-based trend strength
- All indicators work with Polars DataFrames for performance
Configuration System
- Environment variable based configuration
- JSON config file support (~/.config/projectx/config.json)
- ProjectXConfig dataclass for type safety
- ConfigManager for centralized configuration handling
Event System
- Unified EventBus for cross-component communication
- Type-safe event definitions
- Async event handlers with priority support
- Built-in event types for all trading events
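To make the event system bullets concrete, here is a minimal sketch of an async event bus with handler priorities. The names MiniEventBus, on, and emit are hypothetical and chosen for illustration; they are not the project's EventBus API. The sketch assumes only that handlers are coroutines and that a higher priority value runs first.

```python
import asyncio
from collections import defaultdict
from typing import Any, Awaitable, Callable, DefaultDict, List, Tuple

Handler = Callable[[Any], Awaitable[None]]


class MiniEventBus:
    """Hypothetical stand-in for illustration; not the project's EventBus."""

    def __init__(self) -> None:
        # Maps an event name to a list of (priority, handler) pairs.
        self._handlers: DefaultDict[str, List[Tuple[int, Handler]]] = defaultdict(list)

    def on(self, event: str, handler: Handler, priority: int = 0) -> None:
        self._handlers[event].append((priority, handler))
        # Stable sort: higher priority first, registration order kept on ties.
        self._handlers[event].sort(key=lambda item: -item[0])

    async def emit(self, event: str, data: Any) -> None:
        # Iterate over a copy so handlers may register new handlers safely.
        for _, handler in list(self._handlers[event]):
            await handler(data)


async def demo() -> List[str]:
    calls: List[str] = []
    bus = MiniEventBus()

    async def log_bar(data: Any) -> None:
        calls.append(f"log:{data}")

    async def risk_check(data: Any) -> None:
        calls.append(f"risk:{data}")

    bus.on("new_bar", log_bar)                  # default priority 0
    bus.on("new_bar", risk_check, priority=10)  # runs before log_bar
    await bus.emit("new_bar", "MNQ")
    return calls


result = asyncio.run(demo())
```

In this sketch handlers run sequentially in priority order; a real bus might instead dispatch them concurrently, which trades ordering guarantees for throughput.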
Async Factory Functions: Use async create_* functions for component initialization:
# TradingSuite - Recommended approach (v3.0.0+)
from project_x_py import TradingSuite

async def setup_trading():
    # Simple one-line setup with TradingSuite
    suite = await TradingSuite.create(
        "MNQ",
        timeframes=["1min", "5min"],
        features=["orderbook"]
    )
    # Everything is ready - client authenticated, realtime connected
    return suite

Dependency Injection: Managers receive their dependencies (ProjectX client, realtime client) rather than creating them.
Real-time Integration: Single ProjectXRealtimeClient instance shared across managers for WebSocket connection efficiency.
Context Managers: Always use async context managers for proper resource cleanup:
async with ProjectX.from_env() as client:
    # Client automatically handles auth, cleanup
    pass

- Authentication: ProjectX client authenticates and provides JWT tokens
- Real-time Setup: Create ProjectXRealtimeClient with JWT for WebSocket connections
- Manager Initialization: Pass clients to specialized managers via dependency injection
- Data Processing: Polars DataFrames used throughout for performance
- Event Handling: Real-time updates flow through WebSocket to respective managers
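The dependency injection and shared real-time client described above can be sketched with stub classes. Every class and function name below (StubRealtimeClient, StubOrderManager, StubPositionManager, build_managers) is hypothetical; the real managers take more arguments and open actual WebSocket connections. The point shown is only that managers receive one shared client instead of constructing their own.

```python
from dataclasses import dataclass, field
from typing import List, Tuple


@dataclass
class StubRealtimeClient:
    """Hypothetical stand-in for a single shared WebSocket client."""

    jwt_token: str
    subscribers: List[str] = field(default_factory=list)

    def subscribe(self, name: str) -> None:
        self.subscribers.append(name)


class StubOrderManager:
    def __init__(self, realtime: StubRealtimeClient) -> None:
        # The dependency is injected by the caller, not created here.
        self.realtime = realtime
        realtime.subscribe("orders")


class StubPositionManager:
    def __init__(self, realtime: StubRealtimeClient) -> None:
        self.realtime = realtime
        realtime.subscribe("positions")


def build_managers(jwt_token: str) -> Tuple[StubOrderManager, StubPositionManager]:
    # One realtime client is created and passed to every manager.
    realtime = StubRealtimeClient(jwt_token)
    return StubOrderManager(realtime), StubPositionManager(realtime)


orders, positions = build_managers("example-token")
```

Because both managers hold the same object, tests can swap in a fake client at a single construction point.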
- All indicators follow TA-Lib naming conventions (uppercase function names allowed in indicators/__init__.py)
- Use Polars pipe() method for chaining: data.pipe(SMA, period=20).pipe(RSI, period=14)
- Indicators support both class instantiation and direct function calls
- All price handling uses Decimal for precision
- Automatic tick size alignment in OrderManager
- Price formatting utilities in utils.py
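As an illustration of Decimal-based tick alignment, the sketch below rounds a price to the nearest multiple of a tick size. The helper name align_to_tick, the half-up rounding mode, and the 0.25 tick size used in the usage note are assumptions made for this example; the project's own utilities may differ.

```python
from decimal import Decimal, ROUND_HALF_UP
from typing import Union

Number = Union[Decimal, float, int, str]


def align_to_tick(price: Number, tick_size: Number) -> Decimal:
    """Round price to the nearest multiple of tick_size (hypothetical helper)."""
    # Going through str() avoids importing binary float noise into Decimal.
    price_d = Decimal(str(price))
    tick_d = Decimal(str(tick_size))
    # Count whole ticks, rounding halves up, then scale back to a price.
    ticks = (price_d / tick_d).quantize(Decimal("1"), rounding=ROUND_HALF_UP)
    return ticks * tick_d


aligned_up = align_to_tick("21000.13", "0.25")
aligned_down = align_to_tick(21000.10, "0.25")
```

With an assumed tick size of 0.25, 21000.13 aligns up to 21000.25 and 21000.10 aligns down to 21000.00.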
- Custom exception hierarchy in exceptions.py
- All API errors wrapped in ProjectX-specific exceptions
- Comprehensive error context and retry logic
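One way to picture wrapped errors combined with retry logic is the sketch below. The exception names SketchError and SketchConnectionError and the helper call_with_retry are hypothetical; the project's real hierarchy lives in exceptions.py and is not reproduced here. The backoff values are also assumptions.

```python
import asyncio
from typing import Any, Awaitable, Callable, Tuple


class SketchError(Exception):
    """Hypothetical base class standing in for the project's base exception."""


class SketchConnectionError(SketchError):
    """Hypothetical subclass for transport failures."""


async def call_with_retry(
    operation: Callable[[], Awaitable[Any]],
    attempts: int = 3,
    base_delay: float = 0.01,
) -> Any:
    """Retry an async operation, wrapping the final low-level error."""
    for attempt in range(1, attempts + 1):
        try:
            return await operation()
        except OSError as exc:  # ConnectionError and TimeoutError are OSError subclasses
            if attempt == attempts:
                # Chain the original error so its context is preserved.
                raise SketchConnectionError(
                    f"operation failed after {attempts} attempts"
                ) from exc
            # Exponential backoff: base_delay, 2x, 4x, ...
            await asyncio.sleep(base_delay * 2 ** (attempt - 1))


async def demo() -> Tuple[str, int]:
    calls = {"count": 0}

    async def flaky() -> str:
        calls["count"] += 1
        if calls["count"] < 3:
            raise ConnectionError("transient failure")
        return "ok"

    value = await call_with_retry(flaky)
    return value, calls["count"]


outcome = asyncio.run(demo())
```

Callers then catch one library-specific exception type while the original cause remains available through `__cause__`.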
- Pytest with async support and mocking
- Test markers: unit, integration, slow, realtime
- High test coverage required (configured in pyproject.toml)
- Mock external API calls in unit tests
Required environment variables:
- PROJECT_X_API_KEY: TopStepX API key
- PROJECT_X_USERNAME: TopStepX username
Optional configuration:
- PROJECTX_API_URL: Custom API endpoint
- PROJECTX_TIMEOUT_SECONDS: Request timeout
- PROJECTX_RETRY_ATTEMPTS: Retry attempts
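A minimal sketch of reading these variables into a typed configuration object follows. SketchConfig, its field names, and the default values (30 seconds, 3 attempts) are assumptions for illustration and do not describe the project's ProjectXConfig; only the environment variable names come from the lists above.

```python
import os
from dataclasses import dataclass
from typing import Mapping, Optional


@dataclass(frozen=True)
class SketchConfig:
    """Hypothetical settings container; field names and defaults are assumed."""

    api_key: str
    username: str
    api_url: Optional[str] = None
    timeout_seconds: int = 30  # assumed default
    retry_attempts: int = 3    # assumed default


def load_config(env: Mapping[str, str] = os.environ) -> SketchConfig:
    required = ("PROJECT_X_API_KEY", "PROJECT_X_USERNAME")
    missing = [name for name in required if not env.get(name)]
    if missing:
        raise ValueError(
            "Missing required environment variables: " + ", ".join(missing)
        )
    return SketchConfig(
        api_key=env["PROJECT_X_API_KEY"],
        username=env["PROJECT_X_USERNAME"],
        api_url=env.get("PROJECTX_API_URL"),
        timeout_seconds=int(env.get("PROJECTX_TIMEOUT_SECONDS", "30")),
        retry_attempts=int(env.get("PROJECTX_RETRY_ATTEMPTS", "3")),
    )


example = load_config(
    {
        "PROJECT_X_API_KEY": "example-key",
        "PROJECT_X_USERNAME": "example-user",
        "PROJECTX_TIMEOUT_SECONDS": "45",
    }
)
```

Passing the mapping explicitly, as in the example call, keeps the loader testable without touching the real process environment.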
- HTTP connection pooling with retry strategies reduces connection overhead by 50-70%
- Instrument caching reduces repeated API calls by 80%
- Preemptive JWT token refresh at 80% lifetime prevents authentication delays
- Session-based requests with automatic retry on failures
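The preemptive refresh rule (refresh once 80% of the token lifetime has elapsed) reduces to a small time calculation, sketched below. The function names refresh_due_at and needs_refresh are hypothetical, and the one-hour lifetime in the example is an assumption, not a documented token lifetime.

```python
from datetime import datetime, timedelta, timezone


def refresh_due_at(
    issued_at: datetime, expires_at: datetime, fraction: float = 0.8
) -> datetime:
    """Return the moment at which `fraction` of the token lifetime has elapsed."""
    lifetime = expires_at - issued_at
    return issued_at + lifetime * fraction


def needs_refresh(
    now: datetime, issued_at: datetime, expires_at: datetime, fraction: float = 0.8
) -> bool:
    """True once the refresh threshold has been reached."""
    return now >= refresh_due_at(issued_at, expires_at, fraction)


# Example with an assumed one-hour token lifetime.
issued = datetime(2025, 1, 1, 12, 0, tzinfo=timezone.utc)
expires = issued + timedelta(hours=1)
due = refresh_due_at(issued, expires)
```

For a token issued at 12:00 with a one-hour lifetime, the refresh becomes due at 12:48, leaving a 12-minute margin before expiry.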
- OrderBook: Sliding windows with configurable limits (max 10K trades, 1K depth entries)
- RealtimeDataManager: Automatic cleanup maintains 1K bars per timeframe
- Indicators: LRU cache for repeated calculations (100 entry limit)
- Periodic garbage collection after large data operations
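The bounded-memory ideas above (sliding windows and an LRU cache) can be approximated with standard-library tools. This is a sketch under the assumption that collections.deque and functools.lru_cache are acceptable stand-ins; the project may implement its windows and indicator cache differently. The function cached_sma is hypothetical.

```python
from collections import deque
from functools import lru_cache
from typing import Deque, Tuple

MAX_TRADES = 10_000  # mirrors the trade-history limit stated above

# A deque with maxlen silently drops the oldest item when full.
trades: Deque[int] = deque(maxlen=MAX_TRADES)
for trade_id in range(MAX_TRADES + 5):
    trades.append(trade_id)


@lru_cache(maxsize=100)  # mirrors the 100-entry indicator cache limit
def cached_sma(prices: Tuple[float, ...], period: int) -> float:
    """Simple moving average of the last `period` prices (hypothetical helper)."""
    window = prices[-period:]
    return sum(window) / len(window)


first = cached_sma((1.0, 2.0, 3.0, 4.0), 2)
second = cached_sma((1.0, 2.0, 3.0, 4.0), 2)  # served from the cache
```

Arguments to an lru_cache function must be hashable, which is why the prices are passed as a tuple rather than a list.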
- Chained operations reduce intermediate DataFrame creation by 30-40%
- Lazy evaluation with Polars for better memory efficiency
- Efficient datetime parsing with cached timezone objects
- Vectorized operations in orderbook analysis
Use the built-in async methods to monitor performance:
# Client performance stats (async)
async with ProjectX.from_env() as client:
    await client.authenticate()

    # Check performance metrics
    stats = await client.get_performance_stats()
    print(f"API calls: {stats['api_calls']}")
    print(f"Cache hits: {stats['cache_hits']}")

    # Health check
    health = await client.get_health_status()

# Memory usage monitoring (orderbook and data_manager are existing manager instances)
orderbook_stats = await orderbook.get_memory_stats()
data_manager_stats = await data_manager.get_memory_stats()

- 50-70% reduction in API calls through intelligent caching
- 30-40% faster indicator calculations via chained operations
- 60% less memory usage through sliding windows and cleanup
- Sub-second response times for cached operations
- 95% reduction in polling with real-time WebSocket feeds
- max_trades = 10000 (OrderBook trade history)
- max_depth_entries = 1000 (OrderBook depth per side)
- max_bars_per_timeframe = 1000 (Real-time data per timeframe)
- tick_buffer_size = 1000 (Tick data buffer)
- cache_max_size = 100 (Indicator cache entries)
- Minor updates and improvements
- Documentation enhancements
- Fixed: Deadlock when calling suite.data methods from event handler callbacks (Issue #39)
- Improved: Event emission now non-blocking to prevent handler deadlocks
- Enhanced: Event triggering moved outside lock scope for better concurrency
- Added: Missing asyncio import in data_processing module
- Maintained: Full API compatibility - no breaking changes
- Added: Optional start_time and end_time parameters to get_bars() method
- Improved: Precise time range specification for historical data queries
- Enhanced: Full timezone support with automatic UTC conversion
- Maintained: Complete backward compatibility with existing days parameter
- Fixed: Critical WebSocket error with missing _use_batching attribute
- Improved: Proper mixin initialization in ProjectXRealtimeClient
- Enhanced: More robust real-time connection handling
- Order Lifecycle Tracking: Fixed asyncio concurrency and field reference issues
- Order Templates: Fixed instrument lookup to use cached object
- Cleanup Functionality: Added comprehensive order/position cleanup
- Documentation: Updated all docs to reflect current version
- Performance Optimizations: Enhanced connection pooling and caching
- Event Bus System: Unified event handling across all components
- Risk Management: Integrated risk manager with position limits and monitoring
- Order Tracking: Comprehensive order lifecycle tracking and management
- Memory Management: Optimized sliding windows and automatic cleanup
- Enhanced Models: Improved data models with better type safety
- Trading Suite: Unified trading suite with all managers integrated
- Advanced Order Types: OCO, bracket orders, and position-based orders
- Real-time Integration: Seamless WebSocket data flow across all components
- Protocol-based Design: Type-safe protocols for all major interfaces
- Converted monolithic modules to multi-file packages
- All core modules organized as packages with focused submodules
- Improved code organization and maintainability
# Complete trading suite with all managers
from project_x_py import TradingSuite

async def main():
    suite = await TradingSuite.create(
        "MNQ",
        timeframes=["1min", "5min"],
        features=["orderbook", "risk_manager"],
        initial_days=5
    )
    # All managers are integrated and ready
    # No need to call start() - already connected

    # Access individual managers
    order = await suite.orders.place_market_order(
        contract_id=suite.instrument_info.id,
        side=0,  # Buy
        size=1
    )
    position = await suite.positions.get_position("MNQ")
    bars = await suite.data.get_data("1min")

# Basic usage
from project_x_py import ProjectX

async with ProjectX.from_env() as client:
    await client.authenticate()
    bars = await client.get_bars("MNQ", days=5)

# Real-time data with TradingSuite
from project_x_py import EventType, TradingSuite

async def stream_data():
    suite = await TradingSuite.create(
        "MNQ",
        timeframes=["1min", "5min"]
    )

    # Register event handlers
    async def handle_bar(event):
        print(f"New bar: {event.data}")

    await suite.on(EventType.NEW_BAR, handle_bar)

    # Data is already streaming
    # Access current data
    current_price = await suite.data.get_current_price()
    bars = await suite.data.get_data("1min")