diff --git a/CHANGELOG.md b/CHANGELOG.md index 7268fd7..f898029 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -14,6 +14,27 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - Migration guides will be provided for all breaking changes - Semantic versioning (MAJOR.MINOR.PATCH) is strictly followed +## [3.1.12] - 2025-08-15 + +### Added +- **šŸ“Š Enhanced Example**: Significantly improved `01_events_with_on.py` real-time data example + - Added CSV export functionality for bar data + - Interactive candlestick chart generation using Plotly + - Automatic prompt after 10 bars to export data and generate charts + - Non-blocking user input handling for CSV export confirmation + - Proper bar counting and display formatting + - Chart opens automatically in browser when generated + +### Improved +- Example now shows last 6 bars instead of 5 for better context +- Better formatting of price displays with proper currency formatting +- Clear visual indicators for new bar events +- More user-friendly prompts and progress indicators + +### Dependencies +- Added optional Plotly dependency for chart generation in examples +- Example gracefully handles missing Plotly installation + ## [3.1.11] - 2025-08-13 ### Fixed diff --git a/CLAUDE.md b/CLAUDE.md index 1894e26..0afcf98 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -288,7 +288,15 @@ async with ProjectX.from_env() as client: ## Recent Changes -### v3.1.11 - Latest Release +### v3.1.12 - Latest Release +- **Enhanced**: Significantly improved `01_events_with_on.py` real-time data example + - Added CSV export functionality with interactive prompts + - Plotly-based candlestick chart generation + - Non-blocking user input handling + - Better bar display formatting and visual indicators + - Automatic browser opening for generated charts + +### v3.1.11 - **Fixed**: ManagedTrade `_get_market_price()` implementation - ManagedTrade can now fetch current market prices from data manager - Automatic fallback through multiple timeframes (1sec, 15sec, 1min, 5min) diff --git a/docs/conf.py b/docs/conf.py index 2feb420..e46611b 100644 --- a/docs/conf.py +++ b/docs/conf.py @@ -23,8 +23,8 @@ project = "project-x-py" copyright = "2025, Jeff West" author = "Jeff West" -release = "3.1.11" -version = "3.1.11" +release = "3.1.12" +version = "3.1.12" # -- General configuration --------------------------------------------------- diff --git a/examples/16_risk_manager_live_demo.py b/examples/16_risk_manager_live_demo.py new file mode 100644 index 0000000..bb7228b --- /dev/null +++ b/examples/16_risk_manager_live_demo.py @@ -0,0 +1,706 @@ +#!/usr/bin/env python3 +""" +Live Risk Manager Demo - Complete Feature Demonstration + +This example demonstrates ALL risk manager features with real positions: +- Position sizing based on risk parameters +- Risk validation and trade blocking +- Stop-loss and take-profit management +- Portfolio risk monitoring +- Managed trades with automatic cleanup +- Risk analytics and metrics + +WARNING: This script will open REAL positions. Use with care! +""" + +import asyncio +import logging +from typing import Any, cast + +from project_x_py import EventType, TradingSuite +from project_x_py.models import Order +from project_x_py.risk_manager import ManagedTrade, RiskConfig +from project_x_py.types import OrderSide + +# Configure logging +logging.basicConfig( + level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s" +) +logger = logging.getLogger(__name__) + + +class RiskManagerDemo: + """Comprehensive demonstration of risk management features.""" + + def __init__(self) -> None: + """Initialize demo.""" + self.suite: TradingSuite | None = None + self.positions_opened: list[Any] = [] + self.orders_placed: list[Any] = [] + self.demo_trades_enabled = True # Set to False to skip actual trades + + async def setup(self) -> None: + """Set up trading suite with risk management.""" + print("\n" + "=" * 60) + print("RISK MANAGER LIVE DEMO") + print("=" * 60) + print("\nšŸ”§ Setting up trading suite with risk management...") + + # Create suite with risk manager enabled + self.suite = await TradingSuite.create( + "MNQ", + timeframes=["1min", "5min", "15min"], + features=["risk_manager", "orderbook"], + initial_days=5, + ) + + if not self.suite: + raise RuntimeError("Failed to create trading suite") + + # Configure risk parameters + if self.suite.risk_manager: + self.suite.risk_manager.config = RiskConfig( + max_position_size=5, # Max 5 contracts per position + max_positions=3, # Max 3 concurrent positions + max_risk_per_trade=0.02, # 2% per trade + max_daily_loss=0.05, # 5% daily loss limit + max_correlated_positions=3, # Max 3 correlated positions + use_kelly_criterion=True, # Use Kelly for sizing + use_trailing_stops=True, # Auto-adjust stops + trailing_stop_trigger=50.0, # Activate after $50 profit + trailing_stop_distance=25.0, # Trail by $25 + ) + print("āœ… Risk management configured") + + if self.suite.client.account_info: + print( + f"āœ… Suite created for account: {self.suite.client.account_info.name}" + ) + else: + print("āœ… Suite created") + + instrument = await self.suite.client.get_instrument("MNQ") + print(f"āœ… Instrument: {instrument.name}") + + # Register event handlers + await self._register_event_handlers() + + async def _register_event_handlers(self) -> None: + """Register handlers for risk events.""" + if not self.suite: + return + + async def on_position_opened(event: Any) -> None: + """Handle position opened.""" + pos = event.data.get("position") if hasattr(event, "data") else event + if pos: + logger.info(f"šŸ“Š Position Opened: {pos}") + + async def on_position_closed(event: Any) -> None: + """Handle position closed.""" + pos = event.data.get("position") if hasattr(event, "data") else event + if pos: + logger.info(f"šŸ“Š Position Closed: {pos}") + + await self.suite.on(EventType.POSITION_OPENED, on_position_opened) + await self.suite.on(EventType.POSITION_CLOSED, on_position_closed) + + async def demo_position_sizing(self) -> None: + """Demonstrate position sizing calculations.""" + print("\n" + "-" * 60) + print("1. POSITION SIZING DEMONSTRATION") + print("-" * 60) + + if not self.suite: + print("āŒ Trading suite not initialized") + return + + # Get current price + current_price = await self.suite.data.get_current_price() + if not current_price: + print("āŒ Could not get current price") + return + + print(f"\nšŸ“ˆ Current MNQ Price: ${current_price:,.2f}") + + # Calculate different position sizes + scenarios = [ + {"stop_loss": current_price - 50, "risk_percent": 0.01}, # 1% risk + {"stop_loss": current_price - 100, "risk_percent": 0.02}, # 2% risk + { + "stop_loss": current_price - 25, + "risk_amount": 100, + }, # $100 fixed risk (4 contracts) + ] + + for i, scenario in enumerate(scenarios, 1): + if not self.suite.risk_manager: + print("āŒ Risk manager not enabled") + return + + result = await self.suite.risk_manager.calculate_position_size( + entry_price=current_price, + stop_loss=scenario["stop_loss"], + risk_percent=scenario.get("risk_percent"), + risk_amount=scenario.get("risk_amount"), + instrument=await self.suite.client.get_instrument("MNQ"), + ) + + print(f"\nšŸ“Š Scenario {i}:") + print(f" Stop Distance: ${current_price - scenario['stop_loss']:.2f}") + if "risk_percent" in scenario: + print(f" Risk: {scenario['risk_percent'] * 100:.1f}% of account") + else: + print(f" Risk: ${scenario['risk_amount']:.2f} fixed") + print(f" Calculated Size: {result.get('position_size', 0)} contracts") + print(f" Total Risk: ${result.get('risk_amount', 0):.2f}") + print(f" Risk/Reward @ 2:1: ${result.get('risk_amount', 0) * 2:.2f}") + if ( + result.get("position_size", 0) + == self.suite.risk_manager.config.max_position_size + ): + ideal_size = ( + scenario.get("risk_amount", 0) + / (current_price - scenario["stop_loss"]) + if "risk_amount" in scenario + else None + ) + if ( + ideal_size + and ideal_size > self.suite.risk_manager.config.max_position_size + ): + print( + f" (Limited by max size {self.suite.risk_manager.config.max_position_size}, ideal would be {int(ideal_size)})" + ) + + async def demo_risk_validation(self) -> None: + """Demonstrate trade validation against risk rules.""" + print("\n" + "-" * 60) + print("2. RISK VALIDATION DEMONSTRATION") + print("-" * 60) + + if not self.suite: + print("āŒ Trading suite not initialized") + return + + current_price = await self.suite.data.get_current_price() + if not current_price: + return + + # Test various trade scenarios + test_trades = [ + {"size": 1, "side": OrderSide.BUY, "desc": "Small long (should pass)"}, + {"size": 10, "side": OrderSide.BUY, "desc": "Large long (should fail)"}, + {"size": 3, "side": OrderSide.SELL, "desc": "Medium short"}, + ] + + for trade in test_trades: + print(f"\nšŸ” Testing: {trade['desc']}") + print(f" Size: {trade['size']} contracts") + + if not self.suite.risk_manager: + print("āŒ Risk manager not enabled") + return + + # Create real Order object for validation + instrument = await self.suite.client.get_instrument("MNQ") + + # Create a proper Order instance + from datetime import datetime + + side_value = cast( + int, + trade["side"].value + if hasattr(trade["side"], "value") + else trade["side"], + ) + size_value = cast(int, trade["size"]) + + mock_order = Order( + id=0, + accountId=0, + contractId=instrument.id, + creationTimestamp=datetime.now().isoformat(), + updateTimestamp=None, + status=1, # Open + type=2, # Market + side=side_value, + size=size_value, + limitPrice=current_price if trade["side"] == OrderSide.BUY else None, + ) + + validation = await self.suite.risk_manager.validate_trade(order=mock_order) + + if validation.get("is_valid"): + # Calculate a simple risk score based on portfolio risk (0-10 scale) + portfolio_risk = validation.get("portfolio_risk", 0) + risk_score = min(10.0, portfolio_risk * 100) # Convert to 0-10 scale + print(f" āœ… APPROVED - Risk Score: {risk_score:.2f}/10") + else: + reasons = validation.get("reasons", []) + if reasons: + print(" āŒ REJECTED - Reasons:") + for reason in reasons: + print(f" - {reason}") + else: + print(" āŒ REJECTED") + for warning in validation.get("warnings", []): + print(f" āš ļø Warning: {warning}") + + async def demo_managed_trade(self) -> None: + """Demonstrate managed trade with automatic risk management.""" + print("\n" + "-" * 60) + print("3. MANAGED TRADE DEMONSTRATION") + print("-" * 60) + + if not self.demo_trades_enabled: + print("āš ļø Demo trades disabled - skipping actual position opening") + return + + if not self.suite: + print("āŒ Trading suite not initialized") + return + + # Check if we already have positions (from the real position demo) + existing_positions = await self.suite.positions.get_all_positions() + if existing_positions: + print( + f"āš ļø Skipping managed trade demo - already have {len(existing_positions)} positions" + ) + return + + current_price = await self.suite.data.get_current_price() + if not current_price: + return + + print("\nšŸŽÆ Executing managed trade with automatic risk controls...") + print(" Entry: Market Order") + print(f" Stop Loss: ${current_price - 50:.2f}") + print(f" Take Profit: ${current_price + 100:.2f}") + print(" Max Risk: 1% of account") + + if not self.suite.risk_manager: + print("āŒ Risk manager not enabled") + return + + instrument = await self.suite.client.get_instrument("MNQ") + + # Execute managed trade + async with ManagedTrade( + risk_manager=self.suite.risk_manager, + order_manager=self.suite.orders, + position_manager=self.suite.positions, + instrument_id=instrument.id, + data_manager=self.suite.data, + max_risk_percent=0.01, # 1% max risk + ) as trade: + # Enter long position with automatic sizing + try: + position = await trade.enter_long( + stop_loss=current_price - 50, + take_profit=current_price + 100, + ) + + if position: + print("\nāœ… Position opened:") + print(f" Size: {position.get('size', 'N/A')} contracts") + if trade._entry_order: + print(f" Entry Order: {trade._entry_order.id}") + if trade._stop_order: + print(f" Stop Order: {trade._stop_order.id}") + if trade._target_order: + print(f" Target Order: {trade._target_order.id}") + + self.positions_opened.append(position) + + # Monitor for a few seconds + print("\nā±ļø Monitoring position for 5 seconds...") + await asyncio.sleep(5) + + # Check if trailing stop activated + if self.suite.risk_manager.config.use_trailing_stops: + print("šŸ“ˆ Trailing stop monitoring active") + else: + print("āŒ Failed to open position") + except Exception as e: + print(f"āŒ Error in managed trade: {e}") + + async def demo_real_position(self) -> None: + """Open a real position to test risk features.""" + print("\n" + "-" * 60) + print("4. REAL POSITION DEMONSTRATION") + print("-" * 60) + + if not self.demo_trades_enabled: + print("āš ļø Demo trades disabled - skipping") + return + + if not self.suite: + print("āŒ Trading suite not initialized") + return + + current_price = await self.suite.data.get_current_price() + if not current_price: + return + + print("\nšŸ”„ Opening a real position for testing...") + + # Place a small market order + instrument = await self.suite.client.get_instrument("MNQ") + + try: + order = await self.suite.orders.place_market_order( + contract_id=instrument.id, + side=OrderSide.BUY, + size=1, + ) + + if order: + self.orders_placed.append(order) + print(f" āœ… Market order placed: {order.orderId}") + + # Wait for fill + await asyncio.sleep(3) + + # Get the position + positions = await self.suite.positions.get_all_positions() + if positions: + position = positions[0] + print(f" āœ… Position opened: {position.contractId}") + print(f" Size: {position.size} contracts") + + # Now attach risk orders + await self.demo_risk_orders_for_position(position) + else: + print(" āš ļø No position found after order") + except Exception as e: + print(f" āŒ Error placing order: {e}") + + async def demo_risk_orders_for_position(self, position: Any) -> None: + """Attach and manage risk orders for a position.""" + print("\nšŸ“Ž Attaching risk orders to position...") + + if not self.suite: + return + + current_price = await self.suite.data.get_current_price() + if not current_price or not self.suite.risk_manager: + return + + # Attach stop and target orders + try: + orders = await self.suite.risk_manager.attach_risk_orders( + position=position, + stop_loss=current_price - 30, # $30 stop + take_profit=current_price + 60, # $60 target + ) + + if orders: + print("āœ… Risk orders attached:") + # orders is the dict returned from attach_risk_orders + if "bracket_order" in orders: + bracket = orders["bracket_order"] + if bracket.stop_order_id: + print(f" - Stop Order ID: {bracket.stop_order_id}") + if bracket.target_order_id: + print(f" - Target Order ID: {bracket.target_order_id}") + # Store the bracket response + self.orders_placed.append(bracket) + + # Wait and then adjust stops + await asyncio.sleep(3) + + print("\nšŸ”„ Adjusting stops to breakeven...") + entry_price = position.averagePrice + # For long position, stop should be below entry (breakeven - $5 for safety) + new_stop_price = entry_price - 5 if position.is_long else entry_price + 5 + + # Add retry logic for stop adjustment + max_retries = 3 + for attempt in range(max_retries): + try: + adjusted = await self.suite.risk_manager.adjust_stops( + position=position, + new_stop=new_stop_price, + ) + + if adjusted: + print( + f" āœ… Stops adjusted successfully to ${new_stop_price:.2f}" + ) + break + else: + if attempt < max_retries - 1: + print( + f" āš ļø Stop adjustment failed, retrying... (attempt {attempt + 1}/{max_retries})" + ) + await asyncio.sleep(1) # Wait before retry + else: + print( + f" āŒ Failed to adjust stops after {max_retries} attempts" + ) + print( + f" ā„¹ļø Attempted to set stop to ${new_stop_price:.2f}" + ) + except Exception as stop_error: + if attempt < max_retries - 1: + print(f" āš ļø Stop adjustment error: {stop_error}, retrying...") + await asyncio.sleep(1) + else: + print(f" āŒ Stop adjustment failed with error: {stop_error}") + print( + f" ā„¹ļø Check that stop price ${new_stop_price:.2f} is valid for {position.direction} position" + ) + + except Exception as e: + print(f"āŒ Error managing risk orders: {e}") + + async def demo_portfolio_risk(self) -> None: + """Demonstrate portfolio-wide risk analysis.""" + print("\n" + "-" * 60) + print("5. PORTFOLIO RISK ANALYSIS") + print("-" * 60) + + if not self.suite: + print("āŒ Trading suite not initialized") + return + + # Get all positions + positions = await self.suite.positions.get_all_positions() + + print(f"\nšŸ“Š Current Positions: {len(positions)}") + for pos in positions: + size = pos.size + print(f" - {pos.contractId}: {size} contracts") + + if not self.suite.risk_manager: + print("āŒ Risk manager not enabled") + return + + # Calculate portfolio risk metrics + metrics = await self.suite.risk_manager.get_risk_metrics() + + print("\nšŸ“Š Portfolio Risk Metrics:") + print(f" Total Positions: {metrics.get('position_count', 0)}") + print(f" Current Risk: ${metrics.get('current_risk', 0):,.2f}") + print(f" Daily P&L: ${metrics.get('daily_loss', 0):.2f}") + print(f" Max Drawdown: ${metrics.get('max_drawdown', 0):.2f}") + print(f" Account Balance: ${metrics.get('account_balance', 0):,.2f}") + + # Calculate and display additional metrics + total_exposure = sum( + pos["risk_amount"] for pos in metrics.get("position_risks", []) + ) + risk_percentage = metrics.get("current_risk", 0) / max( + metrics.get("account_balance", 1), 1 + ) + print(f" Total Exposure: ${total_exposure:,.2f}") + print(f" Risk Percentage: {risk_percentage:.2%}") + + # Sharpe ratio is a standard field + if metrics.get("sharpe_ratio"): + print(f" Sharpe Ratio: {metrics['sharpe_ratio']:.2f}") + + # Check risk limits + print("\n🚦 Risk Limit Status:") + print( + f" Position Limit: {metrics.get('position_count', 0)}/{self.suite.risk_manager.config.max_positions}" + ) + daily_loss = cast(float, metrics.get("daily_loss", 0)) + account_balance = cast(float, metrics.get("account_balance", 1)) + daily_loss_limit = self.suite.risk_manager.config.max_daily_loss + if self.suite.risk_manager.config.max_daily_loss_amount: + daily_loss_limit_amount = ( + self.suite.risk_manager.config.max_daily_loss_amount + ) + else: + daily_loss_limit_amount = account_balance * daily_loss_limit + print( + f" Daily Loss Limit: ${abs(daily_loss):.2f}/${daily_loss_limit_amount:.2f}" + ) + + # Check for warnings (may not be in RiskAnalysisResponse) + warnings = cast(Any, metrics).get("warnings", []) + if warnings: + print("\nāš ļø Risk Warnings:") + for warning in warnings: + print(f" - {warning}") + + async def demo_trade_recording(self) -> None: + """Demonstrate trade result recording for Kelly criterion.""" + print("\n" + "-" * 60) + print("6. TRADE HISTORY & KELLY CRITERION") + print("-" * 60) + + if not self.suite or not self.suite.risk_manager: + print("āŒ Risk manager not enabled") + return + + # Record some sample trades for demonstration + sample_trades = [ + {"pnl": 150, "entry": 20000, "exit": 20150}, # Win + {"pnl": -75, "entry": 20100, "exit": 20025}, # Loss + {"pnl": 200, "entry": 20050, "exit": 20250}, # Win + {"pnl": -50, "entry": 20200, "exit": 20150}, # Loss + {"pnl": 100, "entry": 20150, "exit": 20250}, # Win + ] + + print("\nšŸ“ Recording sample trade history...") + for i, trade in enumerate(sample_trades, 1): + await self.suite.risk_manager.record_trade_result( + position_id=f"demo_trade_{i}", + pnl=trade["pnl"], + duration_seconds=300, # 5 minutes demo + ) + result = "WIN" if trade["pnl"] > 0 else "LOSS" + print(f" Trade {i}: {result} ${abs(trade['pnl']):.2f}") + + # Display Kelly statistics + print("\nšŸ“Š Kelly Criterion Statistics:") + print(f" Win Rate: {self.suite.risk_manager._win_rate:.1%}") + print(f" Avg Win: ${self.suite.risk_manager._avg_win:.2f}") + print(f" Avg Loss: ${abs(float(self.suite.risk_manager._avg_loss)):.2f}") + + if ( + self.suite.risk_manager._win_rate > 0 + and self.suite.risk_manager._avg_win > 0 + and self.suite.risk_manager._avg_loss != 0 + ): + # Calculate Kelly percentage + win_loss_ratio = float( + self.suite.risk_manager._avg_win + / abs(self.suite.risk_manager._avg_loss) + ) + kelly_pct = ( + self.suite.risk_manager._win_rate * win_loss_ratio + - (1 - self.suite.risk_manager._win_rate) + ) / win_loss_ratio + print(f" Kelly %: {kelly_pct:.1%}") + print( + f" Recommended Position Size: {max(0, min(kelly_pct, 0.25)):.1%} of capital" + ) + + async def cleanup(self) -> None: + """Clean up all positions and orders.""" + print("\n" + "-" * 60) + print("CLEANUP") + print("-" * 60) + + if not self.suite: + print("āŒ Trading suite not initialized") + return + + try: + # Cancel all open orders + if self.orders_placed: + print("\n🚫 Cancelling open orders...") + cancelled_count = 0 + for order_response in self.orders_placed: + try: + # Check if it's an OrderPlaceResponse + if hasattr(order_response, "orderId"): + # Get current order status + orders = await self.suite.orders.search_open_orders() + order = next( + (o for o in orders if o.id == order_response.orderId), + None, + ) + if order and order.is_working: + await self.suite.orders.cancel_order(order.id) + print(f" āœ… Cancelled order {order.id}") + cancelled_count += 1 + except Exception as e: + logger.debug(f"Could not cancel order: {e}") + if cancelled_count > 0: + print(f" Cancelled {cancelled_count} orders") + + # Close all positions + positions = await self.suite.positions.get_all_positions() + if positions: + print(f"\nšŸ“‰ Closing {len(positions)} positions...") + for position in positions: + try: + # Get position size + size = position.size + if size == 0: + continue + + # Place market order to flatten + close_side = OrderSide.SELL if size > 0 else OrderSide.BUY + close_order = await self.suite.orders.place_market_order( + contract_id=position.contractId, + side=close_side, + size=abs(size), + ) + if close_order: + print(f" āœ… Closed position: {position.contractId}") + await asyncio.sleep(1) + except Exception as e: + logger.error(f"Error closing position: {e}") + + # Final risk report + if self.suite.risk_manager: + final_metrics = await self.suite.risk_manager.get_risk_metrics() + print("\nšŸ“Š Final Risk Report:") + print(f" Daily P&L: ${final_metrics.get('daily_pnl', 0):.2f}") + print(f" Max Drawdown: ${final_metrics.get('max_drawdown', 0):.2f}") + print(f" Total Trades: {self.suite.risk_manager._daily_trades}") + + except Exception as e: + logger.error(f"Error during cleanup: {e}") + + finally: + if self.suite: + await self.suite.disconnect() + print("\nāœ… Disconnected from trading suite") + + +async def main() -> None: + """Run the complete risk manager demonstration.""" + demo = RiskManagerDemo() + + # Set this to False to skip actual trading + demo.demo_trades_enabled = True # Enable for full testing + + try: + # Setup + await demo.setup() + + # Run all demonstrations + await demo.demo_position_sizing() + await asyncio.sleep(2) + + await demo.demo_risk_validation() + await asyncio.sleep(2) + + # Open a real position to test features + await demo.demo_real_position() + await asyncio.sleep(2) + + # Try managed trade + await demo.demo_managed_trade() + await asyncio.sleep(2) + + # Portfolio analysis + await demo.demo_portfolio_risk() + await asyncio.sleep(2) + + # Record trades for Kelly + await demo.demo_trade_recording() + + print("\n" + "=" * 60) + print("DEMO COMPLETE") + print("=" * 60) + print("\nāš ļø Remember to check your account for any open positions!") + + except KeyboardInterrupt: + print("\nāš ļø Demo interrupted by user") + except Exception as e: + logger.error(f"Demo error: {e}", exc_info=True) + finally: + # Always cleanup + await demo.cleanup() + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/examples/realtime_data_manager/01_events_with_on.py b/examples/realtime_data_manager/01_events_with_on.py index 1f30a03..594573b 100644 --- a/examples/realtime_data_manager/01_events_with_on.py +++ b/examples/realtime_data_manager/01_events_with_on.py @@ -1,9 +1,202 @@ import asyncio import signal +from datetime import datetime +from pathlib import Path + +try: + import plotly.graph_objects as go + from plotly.subplots import make_subplots + + PLOTLY_AVAILABLE = True +except ImportError: + PLOTLY_AVAILABLE = False + print("āš ļø Plotly not installed. Charts will not be generated.") from project_x_py import TradingSuite from project_x_py.event_bus import EventType +TIMEFRAME = "15sec" + + +def create_candlestick_chart(bars_data, instrument: str, timeframe: str, filename: str): + """Create a candlestick chart from bar data using Plotly""" + if not PLOTLY_AVAILABLE: + return False + + try: + # Convert Polars DataFrame to dict for easier access + data_dict = bars_data.to_dict() + + # Create figure with secondary y-axis for volume + fig = make_subplots( + rows=2, + cols=1, + shared_xaxes=True, + vertical_spacing=0.03, + subplot_titles=(f"{instrument} - {timeframe} Candlestick Chart", "Volume"), + row_heights=[0.7, 0.3], + ) + + # Add candlestick chart with proper price formatting + fig.add_trace( + go.Candlestick( + x=data_dict["timestamp"], + open=data_dict["open"], + high=data_dict["high"], + low=data_dict["low"], + close=data_dict["close"], + name="OHLC", + increasing_line_color="green", + decreasing_line_color="red", + ), + row=1, + col=1, + ) + + # Add volume bars + colors = [ + "green" if close >= open_ else "red" + for close, open_ in zip(data_dict["close"], data_dict["open"], strict=False) + ] + + fig.add_trace( + go.Bar( + x=data_dict["timestamp"], + y=data_dict["volume"], + name="Volume", + marker_color=colors, + showlegend=False, + hovertemplate="%{x}
" + + "Volume: %{y:,}
" + + "", + ), + row=2, + col=1, + ) + + # Update layout + fig.update_layout( + title=f"{instrument} - Last {bars_data.height} {timeframe} Bars", + xaxis_title="Time", + yaxis_title="Price ($)", + template="plotly_dark", + xaxis_rangeslider_visible=False, + height=800, + showlegend=False, + ) + + # Update y-axes with proper formatting + fig.update_yaxes( + title_text="Price ($)", + row=1, + col=1, + tickformat="$,.2f", # Format y-axis ticks as currency with 2 decimals + ) + fig.update_yaxes(title_text="Volume", row=2, col=1, tickformat=",") + + # Generate HTML filename + html_filename = filename.replace(".csv", ".html") + + # Save the chart + fig.write_html(html_filename) + + print(f"šŸ“ˆ Candlestick chart saved to {html_filename}") + + # Also try to open in browser (optional) + try: + import webbrowser + + webbrowser.open(f"file://{Path(html_filename).absolute()}") + print("šŸ“Š Chart opened in browser") + except Exception: + pass # Silently fail if browser can't be opened + + return True + + except Exception as e: + print(f"āš ļø Could not create chart: {e}") + return False + + +async def export_bars_to_csv( + suite: TradingSuite, timeframe: str, bars_count: int = 100 +): + """Export the last N bars to a CSV file""" + try: + # Get the last 100 bars + bars_data = await suite.data.get_data(timeframe=timeframe, bars=bars_count) + + if bars_data is None or bars_data.is_empty(): + print("No data available to export.") + return False + + if suite.instrument is None: + print("Suite.instrument is None, skipping chart creation") + return True + + # Generate filename with timestamp + timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") + filename = f"bars_export_{suite.instrument.name}_{timeframe}_{timestamp}.csv" + filepath = Path(filename) + + # Write to CSV + bars_data.write_csv(filepath) + + print(f"\nāœ… Successfully exported {bars_data.height} bars to {filename}") + + if suite.instrument is None: + print("Suite.instrument is None, skipping chart creation") + return True + + # Create candlestick chart + create_candlestick_chart(bars_data, suite.instrument.name, timeframe, filename) + + return True + + except Exception as e: + print(f"āŒ Error exporting data: {e}") + return False + + +async def prompt_for_csv_export(suite, timeframe: str): + """Prompt user to export CSV in a non-blocking way""" + print("\n" + "=" * 80) + print("šŸ“Š 10 new bars have been received!") + print( + "Would you like to export the last 100 bars to CSV and generate a candlestick chart?" + ) + print("Type 'y' or 'yes' to export, or press Enter to continue monitoring...") + print("=" * 80) + + # Create a task to wait for user input without blocking + loop = asyncio.get_event_loop() + future = loop.create_future() + + def handle_input(): + try: + # Non-blocking input using asyncio + response = input().strip().lower() + future.set_result(response) + except Exception as e: + future.set_exception(e) + + # Run input in executor to avoid blocking + loop.run_in_executor(None, handle_input) + + try: + # Wait for input with a timeout + response = await asyncio.wait_for(future, timeout=10.0) + + if response in ["y", "yes"]: + await export_bars_to_csv(suite, timeframe) + return True + except TimeoutError: + print("\nNo response received. Continuing to monitor...") + except Exception as e: + print(f"\nError handling input: {e}") + + return False + async def main(): print("Creating TradingSuite...") @@ -11,7 +204,7 @@ async def main(): # "NQ" resolves to E-mini Nasdaq (ENQ) which may have different data characteristics suite = await TradingSuite.create( instrument="NQ", # Works best with MNQ for consistent real-time updates - timeframes=["15sec"], + timeframes=[TIMEFRAME], ) print("TradingSuite created!") @@ -21,6 +214,9 @@ async def main(): # Set up signal handler for clean exit shutdown_event = asyncio.Event() + # Bar counter + bar_counter = {"count": 0, "export_prompted": False} + def signal_handler(_signum, _frame): print("\n\nReceived interrupt signal. Shutting down gracefully...") shutdown_event.set() @@ -31,31 +227,33 @@ def signal_handler(_signum, _frame): # Define the event handler as an async function async def on_new_bar(event): """Handle new bar events""" - print(f"New bar event received: {event}") - print("About to call get_current_price...") + # Increment bar counter + bar_counter["count"] += 1 + + print(f"\nšŸ“Š New bar #{bar_counter['count']} received") + try: current_price = await suite.data.get_current_price() - print(f"Got current price: {current_price}") except Exception as e: print(f"Error getting current price: {e}") return - print("About to call get_data...") try: - last_bars = await suite.data.get_data(timeframe="15sec", bars=5) - print("Got data") + last_bars = await suite.data.get_data(timeframe=TIMEFRAME, bars=5) except Exception as e: print(f"Error getting data: {e}") return - print(f"\nCurrent price: {current_price}") + + print(f"Current price: ${current_price:,.2f}") print("=" * 80) if last_bars is not None and not last_bars.is_empty(): - print("Last 5 bars (oldest to newest):") + print("Last 6 bars (oldest to newest):") + print("Oldest bar is first, current bar is last") print("-" * 80) # Get the last 5 bars and iterate through them - for row in last_bars.tail(5).iter_rows(named=True): + for row in last_bars.tail(6).iter_rows(named=True): timestamp = row["timestamp"] open_price = row["open"] high = row["high"] @@ -69,12 +267,24 @@ async def on_new_bar(event): else: print("No bar data available yet") + # Check if we should prompt for CSV export + if bar_counter["count"] == 10 and not bar_counter["export_prompted"]: + bar_counter["export_prompted"] = True + # Run the prompt in a separate task to avoid blocking + asyncio.create_task(prompt_for_csv_export(suite, TIMEFRAME)) # noqa: RUF006 + + # Reset the prompt flag after 20 bars so it can prompt again + if bar_counter["count"] >= 20: + bar_counter["count"] = 0 + bar_counter["export_prompted"] = False + # Register the event handler print("About to register event handler...") await suite.on(EventType.NEW_BAR, on_new_bar) print("Event handler registered!") - print("Monitoring MNQ 15-second bars. Press CTRL+C to exit.") + print(f"\nMonitoring {suite.instrument} {TIMEFRAME} bars. Press CTRL+C to exit.") + print("šŸ“Š CSV export and chart generation will be prompted after 10 new bars.") print("Event handler registered and waiting for new bars...\n") try: diff --git a/pyproject.toml b/pyproject.toml index 90fbb48..79dfb39 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [project] name = "project-x-py" -version = "3.1.11" +version = "3.1.12" description = "High-performance Python SDK for futures trading with real-time WebSocket data, technical indicators, order management, and market depth analysis" readme = "README.md" license = { text = "MIT" } @@ -46,6 +46,7 @@ dependencies = [ "msgpack-python>=0.5.6", "lz4>=4.4.4", "cachetools>=6.1.0", + "plotly>=6.3.0", ] [project.optional-dependencies] diff --git a/src/project_x_py/__init__.py b/src/project_x_py/__init__.py index be2cc23..7dad095 100644 --- a/src/project_x_py/__init__.py +++ b/src/project_x_py/__init__.py @@ -95,7 +95,7 @@ from project_x_py.client.base import ProjectXBase -__version__ = "3.1.11" +__version__ = "3.1.12" __author__ = "TexasCoding" # Core client classes - renamed from Async* to standard names diff --git a/src/project_x_py/client/market_data.py b/src/project_x_py/client/market_data.py index e570a39..5f167a2 100644 --- a/src/project_x_py/client/market_data.py +++ b/src/project_x_py/client/market_data.py @@ -395,35 +395,38 @@ async def get_bars( # Calculate date range from datetime import timedelta + # Use the configured timezone (America/Chicago by default) + market_tz = pytz.timezone(self.config.timezone) + if start_time is not None or end_time is not None: # Use provided time range if start_time is not None: # Ensure timezone awareness if start_time.tzinfo is None: - start_date = pytz.UTC.localize(start_time) + start_date = market_tz.localize(start_time) else: - start_date = start_time.astimezone(pytz.UTC) + start_date = start_time.astimezone(market_tz) else: # Default to days parameter ago if only end_time provided - start_date = datetime.datetime.now(pytz.UTC) - timedelta(days=days) + start_date = datetime.datetime.now(market_tz) - timedelta(days=days) if end_time is not None: # Ensure timezone awareness if end_time.tzinfo is None: - end_date = pytz.UTC.localize(end_time) + end_date = market_tz.localize(end_time) else: - end_date = end_time.astimezone(pytz.UTC) + end_date = end_time.astimezone(market_tz) else: # Default to now if only start_time provided - end_date = datetime.datetime.now(pytz.UTC) + end_date = datetime.datetime.now(market_tz) # Calculate days for cache key (approximate) days_calc = int((end_date - start_date).total_seconds() / 86400) cache_key = f"{symbol}_{start_date.isoformat()}_{end_date.isoformat()}_{interval}_{unit}_{partial}" else: # Use days parameter - start_date = datetime.datetime.now(pytz.UTC) - timedelta(days=days) - end_date = datetime.datetime.now(pytz.UTC) + start_date = datetime.datetime.now(market_tz) - timedelta(days=days) + end_date = datetime.datetime.now(market_tz) days_calc = days cache_key = f"{symbol}_{days}_{interval}_{unit}_{partial}" @@ -456,12 +459,12 @@ async def get_bars( total_minutes = int((end_date - start_date).total_seconds() / 60) limit = int(total_minutes / interval) - # Prepare payload + # Prepare payload - convert to UTC for API payload = { "contractId": instrument.id, "live": False, - "startTime": start_date.isoformat(), - "endTime": end_date.isoformat(), + "startTime": start_date.astimezone(pytz.UTC).isoformat(), + "endTime": end_date.astimezone(pytz.UTC).isoformat(), "unit": unit, "unitNumber": interval, "limit": limit, diff --git a/src/project_x_py/indicators/__init__.py b/src/project_x_py/indicators/__init__.py index 66d16e9..03744c7 100644 --- a/src/project_x_py/indicators/__init__.py +++ b/src/project_x_py/indicators/__init__.py @@ -202,7 +202,7 @@ ) # Version info -__version__ = "3.1.11" +__version__ = "3.1.12" __author__ = "TexasCoding" diff --git a/src/project_x_py/realtime_data_manager/core.py b/src/project_x_py/realtime_data_manager/core.py index ec46b68..7391d74 100644 --- a/src/project_x_py/realtime_data_manager/core.py +++ b/src/project_x_py/realtime_data_manager/core.py @@ -557,6 +557,28 @@ async def initialize(self, initial_days: int = 1) -> bool: if bars is not None and not bars.is_empty(): self.data[tf_key] = bars + # Store the last bar time for proper sync with real-time data + last_bar_time = bars.select(pl.col("timestamp")).tail(1).item() + self.last_bar_times[tf_key] = last_bar_time + + # Check for potential gap between historical data and current time + from datetime import datetime + + current_time = datetime.now(self.timezone) + time_gap = current_time - last_bar_time + + # Warn if historical data is more than 5 minutes old + if time_gap.total_seconds() > 300: + self.logger.warning( + f"Historical data for {tf_key} ends at {last_bar_time}, " + f"{time_gap.total_seconds() / 60:.1f} minutes ago. " + "Gap will be filled when real-time data arrives.", + extra={ + "timeframe": tf_key, + "gap_minutes": time_gap.total_seconds() / 60, + }, + ) + self.logger.debug( LogMessages.DATA_RECEIVED, extra={"timeframe": tf_key, "bar_count": len(bars)}, diff --git a/src/project_x_py/risk_manager/managed_trade.py b/src/project_x_py/risk_manager/managed_trade.py index 94e9103..eca794c 100644 --- a/src/project_x_py/risk_manager/managed_trade.py +++ b/src/project_x_py/risk_manager/managed_trade.py @@ -68,21 +68,34 @@ async def __aenter__(self) -> "ManagedTrade": async def __aexit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> bool: """Exit managed trade context with cleanup.""" try: - # Cancel any unfilled orders + # Only cancel unfilled entry orders, NOT stop/target orders + # Stop and target orders should remain active to protect the position for order in self._orders: - if order.is_working: + # Only cancel working entry orders, not stop/target protective orders + if ( + order.is_working + and order != self._stop_order + and order != self._target_order + ): try: await self.orders.cancel_order(order.id) - logger.info(f"Cancelled unfilled order {order.id}") + logger.info(f"Cancelled unfilled entry order {order.id}") except Exception as e: logger.error(f"Error cancelling order {order.id}: {e}") # Log trade summary if self._entry_order: + active_stops = ( + 1 if self._stop_order and self._stop_order.is_working else 0 + ) + active_targets = ( + 1 if self._target_order and self._target_order.is_working else 0 + ) logger.info( f"Managed trade completed for {self.instrument_id}: " f"Entry: {self._entry_order.status_str}, " - f"Positions: {len(self._positions)}" + f"Positions: {len(self._positions)}, " + f"Active stops: {active_stops}, Active targets: {active_targets}" ) except Exception as e: @@ -172,10 +185,9 @@ async def enter_long( self._orders.append(self._entry_order) # Wait for fill if market order - if order_type == OrderType.MARKET: - # Market orders should fill immediately - # TODO: Add proper fill waiting logic - pass + if order_type == OrderType.MARKET and self._entry_order: + # Wait for market order to fill before proceeding + await self._wait_for_order_fill(self._entry_order, timeout_seconds=10) # Get position and attach risk orders positions = await self.positions.get_all_positions() @@ -195,12 +207,21 @@ async def enter_long( if "bracket_order" in risk_orders: bracket = risk_orders["bracket_order"] - if "stop_order" in bracket: - self._stop_order = bracket["stop_order"] + # BracketOrderResponse has stop_order_id and target_order_id + if bracket.stop_order_id: + # Get the actual order object + orders = await self.orders.search_open_orders() + self._stop_order = next( + (o for o in orders if o.id == bracket.stop_order_id), None + ) if self._stop_order: self._orders.append(self._stop_order) - if "limit_order" in bracket: - self._target_order = bracket["limit_order"] + if bracket.target_order_id: + # Get the actual order object + orders = await self.orders.search_open_orders() + self._target_order = next( + (o for o in orders if o.id == bracket.target_order_id), None + ) if self._target_order: self._orders.append(self._target_order) @@ -293,6 +314,11 @@ async def enter_short( if self._entry_order: self._orders.append(self._entry_order) + # Wait for fill if market order + if order_type == OrderType.MARKET and self._entry_order: + # Wait for market order to fill before proceeding + await self._wait_for_order_fill(self._entry_order, timeout_seconds=10) + # Get position and attach risk orders positions = await self.positions.get_all_positions() position = next( @@ -311,12 +337,21 @@ async def enter_short( if "bracket_order" in risk_orders: bracket = risk_orders["bracket_order"] - if "stop_order" in bracket: - self._stop_order = bracket["stop_order"] + # BracketOrderResponse has stop_order_id and target_order_id + if bracket.stop_order_id: + # Get the actual order object + orders = await self.orders.search_open_orders() + self._stop_order = next( + (o for o in orders if o.id == bracket.stop_order_id), None + ) if self._stop_order: self._orders.append(self._stop_order) - if "limit_order" in bracket: - self._target_order = bracket["limit_order"] + if bracket.target_order_id: + # Get the actual order object + orders = await self.orders.search_open_orders() + self._target_order = next( + (o for o in orders if o.id == bracket.target_order_id), None + ) if self._target_order: self._orders.append(self._target_order) @@ -579,3 +614,58 @@ async def _get_market_price(self) -> float: f"Unable to fetch current market price for {self.instrument_id} - no data available. " "Please ensure data manager is connected and receiving data." ) + + async def _wait_for_order_fill( + self, order: "Order", timeout_seconds: int = 10 + ) -> bool: + """Wait for an order to fill. + + Args: + order: Order to wait for + timeout_seconds: Maximum time to wait + + Returns: + True if order filled, False if timeout + """ + import asyncio + + start_time = asyncio.get_event_loop().time() + check_interval = 0.5 # Check every 500ms + + while (asyncio.get_event_loop().time() - start_time) < timeout_seconds: + try: + # Get updated order status + orders = await self.orders.search_open_orders() + updated_order = next((o for o in orders if o.id == order.id), None) + + if updated_order: + # Update our reference + if updated_order.is_filled: + logger.info(f"Order {order.id} filled successfully") + return True + elif updated_order.is_terminal and not updated_order.is_filled: + logger.warning( + f"Order {order.id} terminated without fill: {updated_order.status_str}" + ) + return False + else: + # Order not found in open orders, might be filled + # Check if position exists + positions = await self.positions.get_all_positions() + position = next( + (p for p in positions if p.contractId == self.instrument_id), + None, + ) + if position: + logger.info( + f"Order {order.id} appears to be filled (position found)" + ) + return True + + await asyncio.sleep(check_interval) + except Exception as e: + logger.error(f"Error checking order fill status: {e}") + await asyncio.sleep(check_interval) + + logger.warning(f"Timeout waiting for order {order.id} to fill") + return False diff --git a/uv.lock b/uv.lock index 8c41a88..abcdb34 100644 --- a/uv.lock +++ b/uv.lock @@ -754,6 +754,15 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/5f/df/76d0321c3797b54b60fef9ec3bd6f4cfd124b9e422182156a1dd418722cf/myst_parser-4.0.1-py3-none-any.whl", hash = "sha256:9134e88959ec3b5780aedf8a99680ea242869d012e8821db3126d427edc9c95d", size = 84579 }, ] +[[package]] +name = "narwhals" +version = "2.1.1" +source = { registry = "https://pypi.org/simple" } +sdist = { url = "https://files.pythonhosted.org/packages/88/2b/d1206ec57d44efd07a08c75ecbeeef05e6b0457dc6777b6bd2d1e79713cd/narwhals-2.1.1.tar.gz", hash = "sha256:308ec9d0e40616b66b61cd76ede4083a4232ae04942a3acef7e514d49641cb77", size = 529925 } +wheels = [ + { url = "https://files.pythonhosted.org/packages/5b/9d/4e9b4b3de8e5b809a41eaa22e1516ea65636ca78eb342607434155ffe0ed/narwhals-2.1.1-py3-none-any.whl", hash = "sha256:dee7d7582d456ef325cb831a65b80783041ef841bbf183180ec445d132a154c6", size = 389471 }, +] + [[package]] name = "nodeenv" version = "1.9.1" @@ -902,6 +911,19 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/fe/39/979e8e21520d4e47a0bbe349e2713c0aac6f3d853d0e5b34d76206c439aa/platformdirs-4.3.8-py3-none-any.whl", hash = "sha256:ff7059bb7eb1179e2685604f4aaf157cfd9535242bd23742eadc3c13542139b4", size = 18567 }, ] +[[package]] +name = "plotly" +version = "6.3.0" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "narwhals" }, + { name = "packaging" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/a0/64/850de5076f4436410e1ce4f6a69f4313ef6215dfea155f3f6559335cad29/plotly-6.3.0.tar.gz", hash = "sha256:8840a184d18ccae0f9189c2b9a2943923fd5cae7717b723f36eef78f444e5a73", size = 6923926 } +wheels = [ + { url = "https://files.pythonhosted.org/packages/95/a9/12e2dc726ba1ba775a2c6922d5d5b4488ad60bdab0888c337c194c8e6de8/plotly-6.3.0-py3-none-any.whl", hash = "sha256:7ad806edce9d3cdd882eaebaf97c0c9e252043ed1ed3d382c3e3520ec07806d4", size = 9791257 }, +] + [[package]] name = "pluggy" version = "1.6.0" @@ -952,6 +974,7 @@ dependencies = [ { name = "msgpack-python" }, { name = "numpy" }, { name = "orjson" }, + { name = "plotly" }, { name = "polars" }, { name = "pydantic" }, { name = "pytz" }, @@ -1049,6 +1072,7 @@ requires-dist = [ { name = "myst-parser", marker = "extra == 'docs'", specifier = ">=1.0.0" }, { name = "numpy", specifier = ">=2.3.2" }, { name = "orjson", specifier = ">=3.11.1" }, + { name = "plotly", specifier = ">=6.3.0" }, { name = "polars", specifier = ">=1.31.0" }, { name = "pre-commit", marker = "extra == 'dev'", specifier = ">=3.0.0" }, { name = "project-x-py", extras = ["realtime", "dev", "test", "docs"], marker = "extra == 'all'" },