diff --git a/examples/15_order_lifecycle_tracking.py b/examples/15_order_lifecycle_tracking.py index 11ab4fa..47263d1 100644 --- a/examples/15_order_lifecycle_tracking.py +++ b/examples/15_order_lifecycle_tracking.py @@ -1,384 +1,638 @@ #!/usr/bin/env python3 """ -Example: Order Lifecycle Tracking with OrderTracker v3.0.0 +Example: Order Lifecycle Tracking with EventBus v4.0.0 -This example demonstrates the new OrderTracker functionality that provides -comprehensive order lifecycle management with automatic state tracking and -async waiting capabilities. +This example demonstrates modern order lifecycle management using the EventBus +system and OrderManager bracket orders, replacing the deprecated OrderTracker +and OrderChainBuilder from v3.x. Key features shown: -- OrderTracker context manager for automatic cleanup -- Async waiting for order fills and status changes -- Order modification and cancellation helpers -- Order chain builder for complex orders -- Common order templates - -Author: SDK v3.0.2 Examples +- EventBus-based order event tracking with suite.on(EventType.ORDER_FILLED, callback) +- Async order state monitoring with timeouts +- OrderManager.place_bracket_order() for complex orders (replaces OrderChainBuilder) +- Manual order modification and cancellation +- Event-driven order lifecycle management +- Multiple order tracking patterns + +Migration from v3.x: +- OrderTracker → Custom OrderTracker class using EventBus +- OrderChainBuilder → OrderManager.place_bracket_order() +- suite.track_order() → Custom event tracking with suite.on() +- suite.order_chain() → Direct OrderManager methods + +Note: This example may show "Outside of trading hours" errors when markets are closed. +This is expected behavior and demonstrates proper error handling. + +Author: SDK v4.0.0 Examples """ import asyncio -from typing import Any +from decimal import Decimal +from typing import Any, Dict, List, Optional + +from project_x_py import EventType, OrderSide, ProjectXOrderError, TradingSuite + + +class OrderTracker: + """Modern order tracker using EventBus for lifecycle monitoring.""" + + def __init__(self, suite: TradingSuite): + self.suite = suite + self.tracked_orders: Dict[int, Dict[str, Any]] = {} + self.order_events: Dict[int, List[Any]] = {} + self.event_handlers: List[tuple] = [] + + async def __aenter__(self) -> "OrderTracker": + """Enter context and register event handlers.""" + # Register for all order-related events + await self.suite.on(EventType.ORDER_PLACED, self._handle_order_event) + await self.suite.on(EventType.ORDER_FILLED, self._handle_order_event) + await self.suite.on(EventType.ORDER_PARTIAL_FILL, self._handle_order_event) + await self.suite.on(EventType.ORDER_CANCELLED, self._handle_order_event) + await self.suite.on(EventType.ORDER_MODIFIED, self._handle_order_event) + await self.suite.on(EventType.ORDER_REJECTED, self._handle_order_event) + + self.event_handlers = [ + (EventType.ORDER_PLACED, self._handle_order_event), + (EventType.ORDER_FILLED, self._handle_order_event), + (EventType.ORDER_PARTIAL_FILL, self._handle_order_event), + (EventType.ORDER_CANCELLED, self._handle_order_event), + (EventType.ORDER_MODIFIED, self._handle_order_event), + (EventType.ORDER_REJECTED, self._handle_order_event), + ] -from project_x_py import EventType, OrderLifecycleError, TradingSuite, get_template + return self + async def __aexit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None: + """Exit context and unregister event handlers.""" + for event_type, handler in self.event_handlers: + await self.suite.off(event_type, handler) -async def demonstrate_order_tracker() -> None: - """Show basic OrderTracker functionality.""" + async def _handle_order_event(self, event: Any) -> None: + """Handle incoming order events.""" + order_data = event.data + order_id = order_data.get("order_id") or order_data.get("id") - async with await TradingSuite.create("MNQ") as suite: - print("=== OrderTracker Demo ===\n") + if order_id and order_id in self.tracked_orders: + # Update order status + self.tracked_orders[order_id]["last_event"] = event + self.tracked_orders[order_id]["status"] = order_data.get( + "status", "UNKNOWN" + ) - # Get current price - price = await suite.data.get_latest_price() - if price is None: - print("No price data available") - return + # Store event history + if order_id not in self.order_events: + self.order_events[order_id] = [] + self.order_events[order_id].append(event) + + def track(self, order: Any) -> None: + """Start tracking an order.""" + order_id = order.orderId if hasattr(order, "orderId") else order.id + self.tracked_orders[order_id] = { + "order": order, + "status": "PENDING", + "last_event": None, + "created_at": asyncio.get_event_loop().time(), + } + + async def wait_for_fill( + self, order_id: Optional[int] = None, timeout: float = 30 + ) -> Any: + """Wait for an order to fill.""" + target_order_id = ( + order_id or list(self.tracked_orders.keys())[0] + if self.tracked_orders + else None + ) - print(f"Current price: ${price:,.2f}") - print(f"Using contract: {suite.instrument_id}\n") + if not target_order_id: + raise ProjectXOrderError("No order being tracked") - # 1. Basic order tracking with automatic fill detection - print("1. Basic Order Tracking:") - async with suite.track_order() as tracker: - # Place a limit order below market - assert suite.instrument_id is not None - order = await suite.orders.place_limit_order( - contract_id=suite.instrument_id, - side=0, # BUY - size=1, - limit_price=price - 50, # 50 points below market - ) + start_time = asyncio.get_event_loop().time() - if not order.success: - print(f"Order failed: {order.errorMessage}") - return + while asyncio.get_event_loop().time() - start_time < timeout: + if target_order_id in self.tracked_orders: + last_event = self.tracked_orders[target_order_id]["last_event"] + if last_event and last_event.event_type == EventType.ORDER_FILLED: + return last_event.data - # Track the order - tracker.track(order) - print(f"Placed BUY limit order at ${price - 50:,.2f}") - print(f"Order ID: {order.orderId}") + await asyncio.sleep(0.1) - # Wait for fill or timeout - try: - print("Waiting for fill (10s timeout)...") - filled_order = await tracker.wait_for_fill(timeout=10) - print(f"✅ Order filled at ${filled_order.filledPrice:,.2f}!") + raise TimeoutError(f"Order {target_order_id} not filled within {timeout}s") - except TimeoutError: - print("⏱️ Order not filled in 10 seconds") + async def wait_for_status( + self, target_status: int, order_id: Optional[int] = None, timeout: float = 30 + ) -> Any: + """Wait for an order to reach a specific status.""" + target_order_id = ( + order_id or list(self.tracked_orders.keys())[0] + if self.tracked_orders + else None + ) - # Try to improve the price - print("Modifying order price...") - success = await tracker.modify_or_cancel(new_price=price - 25) + if not target_order_id: + raise ProjectXOrderError("No order being tracked") - if success: - print(f"✅ Order modified to ${price - 25:,.2f}") - else: - print("❌ Order cancelled") + start_time = asyncio.get_event_loop().time() - except OrderLifecycleError as e: - print(f"❌ Order error: {e}") + while asyncio.get_event_loop().time() - start_time < timeout: + if target_order_id in self.tracked_orders: + status = self.tracked_orders[target_order_id].get("status") + if status == target_status: + return self.tracked_orders[target_order_id]["last_event"] - print("\n" + "-" * 50 + "\n") + await asyncio.sleep(0.1) - # 2. Wait for specific status - print("2. Waiting for Specific Status:") - async with suite.track_order() as tracker: - # Place a marketable limit order - assert suite.instrument_id is not None - order = await suite.orders.place_limit_order( - contract_id=suite.instrument_id, - side=1, # SELL - size=1, - limit_price=price + 10, # Slightly above market for quick fill - ) + raise TimeoutError( + f"Order {target_order_id} did not reach status {target_status} within {timeout}s" + ) - if order.success: - tracker.track(order) - print(f"Placed SELL limit order at ${price + 10:,.2f}") + async def modify_or_cancel( + self, order_id: Optional[int] = None, new_price: Optional[Decimal] = None + ) -> bool: + """Modify order price or cancel if modification fails.""" + target_order_id = ( + order_id or list(self.tracked_orders.keys())[0] + if self.tracked_orders + else None + ) - try: - # Wait for any terminal status - print("Waiting for order completion...") - _completed = await tracker.wait_for_status(2, timeout=5) # FILLED - print("✅ Order reached FILLED status") + if not target_order_id: + return False - except TimeoutError: - print("⏱️ Order still pending") + if new_price: + try: + # Attempt to modify the order + success = await self.suite.orders.modify_order( + target_order_id, limit_price=new_price + ) + return success + except Exception: + # If modification fails, try to cancel + pass + + # Cancel the order + try: + return await self.suite.orders.cancel_order(target_order_id) + except Exception: + return False + + async def get_current_status( + self, order_id: Optional[int] = None + ) -> Optional[Dict[str, Any]]: + """Get current order status.""" + target_order_id = ( + order_id or list(self.tracked_orders.keys())[0] + if self.tracked_orders + else None + ) + + if target_order_id and target_order_id in self.tracked_orders: + return self.tracked_orders[target_order_id] - # Check current status - current = await tracker.get_current_status() - if current: - print(f"Current status: {current.status_str}") + return None -async def demonstrate_order_chain() -> None: - """Show OrderChainBuilder functionality.""" +async def demonstrate_basic_order_tracking() -> None: + """Show basic order tracking with EventBus.""" async with await TradingSuite.create("MNQ") as suite: - print("\n=== Order Chain Builder Demo ===\n") + print("=== Basic Order Tracking with EventBus ===\n") - # 1. Market order with bracket - print("1. Market Order with Stops and Targets:") + # Get current price + price = await suite.data.get_latest_price() + if price is None: + print("No price data available") + return - order_chain = ( - suite.order_chain() - .market_order(size=1, side=0) # BUY - .with_stop_loss(offset=20) # 20 points stop - .with_take_profit(offset=40) # 40 points target - ) + print(f"Current price: ${price:,.2f}") + print(f"Using contract: {suite.instrument_id}\n") - print("Executing bracket order...") - result = await order_chain.execute() + # Create order tracker + async with OrderTracker(suite) as tracker: + try: + # Place a limit order below market + assert suite.instrument_id is not None + order = await suite.orders.place_limit_order( + contract_id=suite.instrument_id, + side=0, # BUY + size=1, + limit_price=price - 50, # 50 points below market + ) + + if not order.success: + print(f"Order failed: {order.errorMessage}") + print( + "Note: This is likely due to market being closed or invalid parameters" + ) + return + + # Track the order + tracker.track(order) + print(f"Placed BUY limit order at ${price - 50:,.2f}") + print(f"Order ID: {order.orderId}") - if result.success: - print("✅ Bracket order placed successfully:") - print(f" Entry: Market order (ID: {result.entry_order_id})") - print( - f" Stop: ${result.stop_loss_price:,.2f} (ID: {result.stop_order_id})" - ) - print( - f" Target: ${result.take_profit_price:,.2f} (ID: {result.target_order_id})" - ) - else: - print(f"❌ Bracket order failed: {result.error_message}") + # Wait for fill or timeout + try: + print("Waiting for fill (10s timeout)...") + filled_order = await tracker.wait_for_fill(timeout=10) + print( + f"✅ Order filled at ${filled_order.get('filledPrice', 'N/A'):,.2f}!" + ) - print("\n" + "-" * 50 + "\n") + except TimeoutError: + print("⏱️ Order not filled in 10 seconds") - # 2. Limit order with dynamic stops - print("2. Limit Order with Price-Based Stops:") + # Try to improve the price + print("Modifying order price...") + success = await tracker.modify_or_cancel( + new_price=Decimal(str(price - 25)) + ) - current_price = await suite.data.get_latest_price() - if current_price is not None: - order_chain = ( - suite.order_chain() - .limit_order(size=1, price=current_price - 10, side=0) - .with_stop_loss(price=current_price - 30) - .with_take_profit(price=current_price + 20) - ) + if success: + print(f"✅ Order modified to ${price - 25:,.2f}") + else: + print("❌ Order cancelled") - print("Building order:") - print(f" Entry: Limit BUY at ${current_price - 10:,.2f}") - print(f" Stop: ${current_price - 30:,.2f}") - print(f" Target: ${current_price + 20:,.2f}") + except ProjectXOrderError as e: + print(f"❌ Order error: {e}") - result = await order_chain.execute() - print(f"Result: {'✅ Success' if result.success else '❌ Failed'}") + except ProjectXOrderError as e: + print(f"❌ Order placement failed: {e}") + print( + "This is expected when markets are closed. The example shows proper error handling." + ) -async def demonstrate_order_templates() -> None: - """Show pre-configured order templates.""" +async def demonstrate_bracket_orders() -> None: + """Show modern bracket order functionality.""" async with await TradingSuite.create("MNQ") as suite: - print("\n=== Order Templates Demo ===\n") + print("\n=== Modern Bracket Orders ===\n") - # 1. Risk/Reward Template - print("1. Risk/Reward Template (2:1):") + # Get current price + price = await suite.data.get_latest_price() + if price is None: + print("No price data available") + return - template = get_template("standard_rr") + print(f"Current price: ${price:,.2f}") try: - # Risk $100 with 2:1 risk/reward - result = await template.create_order( - suite, + # 1. Market bracket order + print("\n1. Market Order with Bracket:") + assert suite.instrument_id is not None + + # For market bracket order, use current price as entry_price + result = await suite.orders.place_bracket_order( + contract_id=suite.instrument_id, side=0, # BUY - risk_amount=100, + size=1, + entry_price=price, # Market entry + stop_loss_price=price - 20, # 20 points stop + take_profit_price=price + 40, # 40 points target + entry_type="market", # Market entry order ) if result.success: - print("✅ 2:1 R/R order placed:") - print(f" Entry: ${result.entry_price:,.2f}") - print(f" Stop: ${result.stop_loss_price:,.2f}") - print(f" Target: ${result.take_profit_price:,.2f}") - - # Calculate actual R/R - risk = abs(result.entry_price - result.stop_loss_price) - reward = abs(result.take_profit_price - result.entry_price) - print(f" Actual R/R: {reward / risk:.2f}:1") + print("✅ Bracket order placed successfully:") + print(f" Entry: Market order (ID: {result.entry_order_id})") + print(" Stop and target orders will be placed after fill") else: - print(f"❌ Order failed: {result.error_message}") - - except Exception as e: - print(f"❌ Template error: {e}") + print(f"❌ Bracket order failed: {result.error_message}") + print("Note: This is expected when markets are closed") - print("\n" + "-" * 50 + "\n") + await asyncio.sleep(2) # Allow processing time - # 2. ATR-based Template - print("2. ATR-Based Stop Template:") + print("\n" + "-" * 50) - # Make sure we have enough data for ATR - await asyncio.sleep(2) + # 2. Limit bracket order with specific prices + print("\n2. Limit Order with Specific Stop/Target Prices:") - atr_template = get_template("standard_atr") + entry_price = price - 10 + stop_price = price - 30 + target_price = price + 20 - try: - result = await atr_template.create_order( - suite, - side=1, # SELL + result = await suite.orders.place_bracket_order( + contract_id=suite.instrument_id, + side=0, # BUY size=1, + entry_price=entry_price, # Limit entry + stop_loss_price=stop_price, + take_profit_price=target_price, + entry_type="limit", # Limit entry order ) - if result.success: - print("✅ ATR-based order placed:") - print(" Stop distance based on 2x ATR") - print(" Target distance based on 3x ATR") - print(f" Entry: ${result.entry_price:,.2f}") - print(f" Stop: ${result.stop_loss_price:,.2f}") - print(f" Target: ${result.take_profit_price:,.2f}") - else: - print(f"❌ Order failed: {result.error_message}") - - except Exception as e: - print(f"❌ Template error: {e}") + print(f"Entry: Limit BUY at ${entry_price:,.2f}") + print(f"Stop: ${stop_price:,.2f}") + print(f"Target: ${target_price:,.2f}") + print(f"Result: {'✅ Success' if result.success else '❌ Failed'}") - print("\n" + "-" * 50 + "\n") + if not result.success: + print(f"Error: {result.error_message}") + print("Note: This is expected when markets are closed") - # 3. Scalping Template - print("3. Scalping Template:") + except ProjectXOrderError as e: + print(f"❌ Bracket order placement failed: {e}") + print( + "This demonstrates the EventBus pattern even when orders fail due to market hours." + ) - scalp_template = get_template("normal_scalp") - try: - result = await scalp_template.create_order( - suite, - side=0, # BUY - size=2, - check_spread=False, # Skip spread check for demo - ) +async def demonstrate_event_monitoring() -> None: + """Show comprehensive event monitoring.""" - if result.success: - print("✅ Scalp order placed:") - print(" 4 tick stop, 8 tick target") - print(" Entry: Market order") - print(f" Stop: ${result.stop_loss_price:,.2f}") - print(f" Target: ${result.take_profit_price:,.2f}") - else: - print(f"❌ Order failed: {result.error_message}") + async with await TradingSuite.create("MNQ") as suite: + print("\n=== Event-Driven Order Monitoring ===\n") - except Exception as e: - print(f"❌ Template error: {e}") + # Track all order events + events_received: List[Any] = [] + order_states: Dict[int, str] = {} + async def on_order_placed(event: Any) -> None: + events_received.append(event) + order_id = event.data.get("order_id", event.data.get("id")) + order_states[order_id] = "PLACED" + print(f"📨 ORDER_PLACED - Order {order_id}") -async def demonstrate_advanced_tracking() -> None: - """Show advanced order tracking scenarios.""" + async def on_order_filled(event: Any) -> None: + events_received.append(event) + order_id = event.data.get("order_id", event.data.get("id")) + order_states[order_id] = "FILLED" + fill_price = event.data.get("filledPrice", "N/A") + print(f"✅ ORDER_FILLED - Order {order_id} at ${fill_price}") - async with await TradingSuite.create("MNQ") as suite: - print("\n=== Advanced Order Tracking ===\n") + async def on_order_cancelled(event: Any) -> None: + events_received.append(event) + order_id = event.data.get("order_id", event.data.get("id")) + order_states[order_id] = "CANCELLED" + print(f"❌ ORDER_CANCELLED - Order {order_id}") - # Track multiple orders - print("1. Tracking Multiple Orders:") + async def on_order_modified(event: Any) -> None: + events_received.append(event) + order_id = event.data.get("order_id", event.data.get("id")) + print(f"📝 ORDER_MODIFIED - Order {order_id}") - trackers: list[Any] = [] - order_ids: list[int] = [] + # Register event handlers + await suite.on(EventType.ORDER_PLACED, on_order_placed) + await suite.on(EventType.ORDER_FILLED, on_order_filled) + await suite.on(EventType.ORDER_CANCELLED, on_order_cancelled) + await suite.on(EventType.ORDER_MODIFIED, on_order_modified) - current_price = await suite.data.get_latest_price() - if current_price is None: - return + try: + # Get current price + price = await suite.data.get_latest_price() + if price is None: + return - # Place multiple orders - for i in range(3): - tracker = suite.track_order() + print(f"Current price: ${price:,.2f}") + # Place order far from market assert suite.instrument_id is not None - order = await suite.orders.place_limit_order( - contract_id=suite.instrument_id, - side=0, # BUY - size=1, - limit_price=current_price - (10 * (i + 1)), # Staggered prices - ) + try: + order = await suite.orders.place_limit_order( + contract_id=suite.instrument_id, + side=1, # SELL + size=1, + limit_price=price + 100, # Far from market + ) + except ProjectXOrderError as e: + print(f"❌ Order placement failed: {e}") + print( + "This is expected when markets are closed. Event monitoring still demonstrated." + ) + return if order.success: - await tracker.__aenter__() # Enter context manually - tracker.track(order) - trackers.append(tracker) - order_ids.append(order.orderId) - print(f"Order {i + 1}: BUY at ${current_price - (10 * (i + 1)):,.2f}") + print(f"Placed SELL limit order at ${price + 100:,.2f}") + + # Wait for placed event + await asyncio.sleep(1) - print(f"\nTracking {len(trackers)} orders...") + # Modify the order + print("Modifying order price...") + await suite.orders.modify_order(order.orderId, limit_price=price + 50) - # Wait for any to fill - print("Waiting for first fill...") + await asyncio.sleep(1) - # Create tasks instead of coroutines - fill_tasks = [ - asyncio.create_task(tracker.wait_for_fill(timeout=5)) - for tracker in trackers - ] + # Cancel the order + print("Cancelling order...") + await suite.orders.cancel_order(order.orderId) + + await asyncio.sleep(1) + + print("\nEvent Summary:") + print(f"Total events received: {len(events_received)}") + for order_id, state in order_states.items(): + print(f"Order {order_id}: {state}") + + finally: + # Unregister event handlers + await suite.off(EventType.ORDER_PLACED, on_order_placed) + await suite.off(EventType.ORDER_FILLED, on_order_filled) + await suite.off(EventType.ORDER_CANCELLED, on_order_cancelled) + await suite.off(EventType.ORDER_MODIFIED, on_order_modified) + + +async def demonstrate_multiple_order_tracking() -> None: + """Show tracking multiple orders simultaneously.""" + + async with await TradingSuite.create("MNQ") as suite: + print("\n=== Multiple Order Tracking ===\n") + + # Get current price + price = await suite.data.get_latest_price() + if price is None: + return + + print(f"Current price: ${price:,.2f}") + + # Create multiple orders with event tracking + order_trackers: Dict[int, Dict[str, Any]] = {} + + async def track_order_event(event: Any) -> None: + """Track events for all our orders.""" + order_data = event.data + order_id = order_data.get("order_id", order_data.get("id")) + + if order_id in order_trackers: + order_trackers[order_id]["events"].append(event) + order_trackers[order_id]["last_status"] = event.event_type.name + print(f"📨 {event.event_type.name} - Order {order_id}") + + # Register event handler + await suite.on(EventType.ORDER_PLACED, track_order_event) + await suite.on(EventType.ORDER_FILLED, track_order_event) + await suite.on(EventType.ORDER_CANCELLED, track_order_event) try: - # Wait for first fill - done, pending = await asyncio.wait( - fill_tasks, return_when=asyncio.FIRST_COMPLETED - ) + # Place multiple staggered orders + orders_placed = [] - if done: - filled = done.pop() + for i in range(3): + assert suite.instrument_id is not None try: - result = await filled - print(f"✅ First order filled: {result.id}") - except Exception as e: - print(f"No fills: {e}") + order = await suite.orders.place_limit_order( + contract_id=suite.instrument_id, + side=0, # BUY + size=1, + limit_price=price - (10 * (i + 1)), # Staggered prices + ) + except ProjectXOrderError: + print(f"Order {i + 1}: Failed (market closed)") + continue + + if order.success: + orders_placed.append(order) + order_trackers[order.orderId] = { + "order": order, + "events": [], + "last_status": "PENDING", + "price": price - (10 * (i + 1)), + } + print( + f"Order {i + 1}: BUY at ${price - (10 * (i + 1)):,.2f} (ID: {order.orderId})" + ) + + print(f"\nTracking {len(orders_placed)} orders...") + await asyncio.sleep(2) # Allow events to process + + # Check if any filled (unlikely with prices far from market) + filled_orders = [ + order_id + for order_id, data in order_trackers.items() + if data["last_status"] == "ORDER_FILLED" + ] + + if filled_orders: + print(f"✅ {len(filled_orders)} orders filled!") + else: + print("⏱️ No orders filled (as expected with off-market prices)") + + # Cancel remaining orders + print("Cancelling remaining orders...") + for order in orders_placed: + if order.orderId in order_trackers: + await suite.orders.cancel_order(order.orderId) - # Cancel remaining - for task in pending: - task.cancel() + await asyncio.sleep(1) # Allow cancel events - except TimeoutError: - print("⏱️ No orders filled") + # Summary + print("\nTracking Summary:") + for order_id, data in order_trackers.items(): + print( + f"Order {order_id}: {len(data['events'])} events, status: {data['last_status']}" + ) finally: - # Cleanup trackers - for tracker in trackers: - await tracker.__aexit__(None, None, None) + # Cleanup event handlers + await suite.off(EventType.ORDER_PLACED, track_order_event) + await suite.off(EventType.ORDER_FILLED, track_order_event) + await suite.off(EventType.ORDER_CANCELLED, track_order_event) - print("\n" + "-" * 50 + "\n") - # 2. Complex order with event monitoring - print("2. Order with Event Monitoring:") +async def demonstrate_advanced_bracket_patterns() -> None: + """Show advanced bracket order patterns.""" - # Register for order events - events_received = [] + async with await TradingSuite.create("MNQ") as suite: + print("\n=== Advanced Bracket Patterns ===\n") - async def on_order_event(event: Any) -> None: - events_received.append(event) - print( - f"📨 Event: {event.event_type.name} - Order {event.data.get('order_id')}" - ) + # Get current price + price = await suite.data.get_latest_price() + if price is None: + return - await suite.on(EventType.ORDER_PLACED, on_order_event) - await suite.on(EventType.ORDER_FILLED, on_order_event) - await suite.on(EventType.ORDER_CANCELLED, on_order_event) + print(f"Current price: ${price:,.2f}") - # Place and track order - async with suite.track_order() as tracker: - assert suite.instrument_id is not None - order = await suite.orders.place_limit_order( + # 1. Risk-based bracket (risking $100) + print("\n1. Risk-Based Bracket Order:") + + # Calculate position size for $100 risk with 20-point stop + tick_value = 5.0 # MNQ tick value + risk_points = 20 + risk_amount = 100.0 + position_size = int(risk_amount / (risk_points * tick_value)) + + assert suite.instrument_id is not None + result = await suite.orders.place_bracket_order( + contract_id=suite.instrument_id, + side=0, # BUY + size=position_size, + entry_price=price - 5, # 5 points below market + stop_loss_price=price - 5 - risk_points, # 20 points stop + take_profit_price=price + - 5 + + (risk_points * 2), # 40 points target (2:1 R/R) + entry_type="limit", # Limit entry order + ) + + print(f"Risk Amount: ${risk_amount}") + print(f"Position Size: {position_size} contracts") + print(f"Entry: ${price - 5:,.2f}") + print(f"Stop: ${price - 5 - risk_points:,.2f}") + print(f"Target: ${price - 5 + (risk_points * 2):,.2f}") + print("Risk/Reward: 1:2") + print(f"Result: {'✅ Success' if result.success else '❌ Failed'}") + + if not result.success: + print(f"Error: {result.error_message}") + + await asyncio.sleep(1) + + print("\n" + "-" * 50) + + # 2. Trailing stop simulation (manual approach) + print("\n2. Manual Trailing Stop Pattern:") + + # Place initial position + try: + entry_order = await suite.orders.place_market_order( contract_id=suite.instrument_id, - side=1, # SELL + side=0, # BUY size=1, - limit_price=current_price + 100, # Far from market ) + except ProjectXOrderError as e: + print(f"❌ Entry order failed: {e}") + print( + "This is expected when markets are closed. Example shows trailing stop concept." + ) + return - if order.success: - tracker.track(order) - print(f"Placed order at ${current_price + 100:,.2f}") + if entry_order.success: + print(f"✅ Market order placed (ID: {entry_order.orderId})") - # Give events time to arrive - await asyncio.sleep(1) + # Simulate trailing stop logic + initial_stop = price - 20 + trail_amount = 10 - # Cancel the order - print("Cancelling order...") - await suite.orders.cancel_order(order.orderId) + print(f"Initial stop: ${initial_stop:,.2f}") + print(f"Trail amount: {trail_amount} points") + print("(In real trading, this would monitor price and adjust stop)") - # Wait a bit for cancel event - await asyncio.sleep(1) + # Place initial stop + try: + stop_order = await suite.orders.place_stop_order( + contract_id=suite.instrument_id, + side=1, # SELL + size=1, + stop_price=initial_stop, + ) + + if stop_order.success: + print(f"✅ Initial stop placed at ${initial_stop:,.2f}") + else: + print(f"❌ Stop order failed: {stop_order.errorMessage}") + except ProjectXOrderError as e: + print(f"❌ Stop order failed: {e}") + print("This is expected when markets are closed") - print(f"\nReceived {len(events_received)} events") + # In a real implementation, you would: + # 1. Monitor price updates via WebSocket + # 2. Calculate new stop level when price moves favorably + # 3. Modify stop order when trail threshold is hit + # 4. Handle fill events and cleanup - # Cleanup event handlers - await suite.off(EventType.ORDER_PLACED, on_order_event) - await suite.off(EventType.ORDER_FILLED, on_order_event) - await suite.off(EventType.ORDER_CANCELLED, on_order_event) + print("(Trailing logic would run here in production)") async def cleanup_demo_orders_and_positions() -> None: @@ -392,101 +646,113 @@ async def cleanup_demo_orders_and_positions() -> None: # 1. Cancel all open orders print("1. Checking for open orders...") - open_orders = await suite.orders.search_open_orders() + try: + open_orders = await suite.orders.search_open_orders() - if open_orders: - print(f" Found {len(open_orders)} open orders to cancel:") - for order in open_orders: - try: - success = await suite.orders.cancel_order(order.id) - if success: - # Get order type and side names safely - order_type = ( - "LIMIT" - if order.type == 1 - else "MARKET" - if order.type == 2 - else "STOP" - if order.type == 4 - else str(order.type) - ) - side = ( - "BUY" - if order.side == 0 - else "SELL" - if order.side == 1 - else str(order.side) - ) - print(f" ✅ Cancelled order {order.id} ({order_type} {side})") - else: - print(f" ⚠️ Failed to cancel order {order.id}") - except Exception as e: - print(f" ⚠️ Error cancelling order {order.id}: {e}") - else: - print(" No open orders found") + if open_orders: + print(f" Found {len(open_orders)} open orders to cancel:") + for order in open_orders: + try: + success = await suite.orders.cancel_order(order.id) + if success: + # Get order type and side names safely + order_type = ( + "LIMIT" + if order.type == 1 + else "MARKET" + if order.type == 2 + else "STOP" + if order.type == 4 + else str(order.type) + ) + side = ( + "BUY" + if order.side == 0 + else "SELL" + if order.side == 1 + else str(order.side) + ) + print( + f" ✅ Cancelled order {order.id} ({order_type} {side})" + ) + else: + print(f" ⚠️ Failed to cancel order {order.id}") + except Exception as e: + print(f" ⚠️ Error cancelling order {order.id}: {e}") + else: + print(" No open orders found") + except Exception as e: + print(f" ⚠️ Error retrieving open orders: {e}") print() # 2. Close all open positions print("2. Checking for open positions...") - positions = await suite.positions.get_all_positions() - - if positions: - print(f" Found {len(positions)} open positions to close:") - for position in positions: - if position.size != 0: - try: - # Place a market order to close the position - # Position type: 1=LONG, 2=SHORT - side = ( - 1 if position.type == 1 else 0 - ) # SELL if long, BUY if short - size = position.size # size is always positive - - result = await suite.orders.place_market_order( - contract_id=position.contractId, side=side, size=size - ) - - if result.success: - position_type = ( - "LONG" - if position.type == 1 - else "SHORT" - if position.type == 2 - else "UNKNOWN" - ) - print( - f" ✅ Closed {position_type} position in {position.contractId} (Size: {position.size})" + try: + positions = await suite.positions.get_all_positions() + + if positions: + print(f" Found {len(positions)} positions to check:") + for position in positions: + if position.size != 0: + try: + # Place a market order to close the position + # Position type: 1=LONG, 2=SHORT + order_side = int( + OrderSide.SELL if position.type == 1 else OrderSide.BUY + ) # SELL if long, BUY if short + size = position.size # size is always positive + + result = await suite.orders.place_market_order( + contract_id=position.contractId, + side=order_side, + size=size, ) - else: + + if result.success: + position_type = ( + "LONG" + if position.type == 1 + else "SHORT" + if position.type == 2 + else "UNKNOWN" + ) + print( + f" ✅ Closed {position_type} position in {position.contractId} (Size: {position.size})" + ) + else: + print( + f" ⚠️ Failed to close position in {position.contractId}: {result.errorMessage}" + ) + except Exception as e: print( - f" ⚠️ Failed to close position in {position.contractId}: {result.errorMessage}" + f" ⚠️ Error closing position in {position.contractId}: {e}" ) - except Exception as e: - print( - f" ⚠️ Error closing position in {position.contractId}: {e}" - ) - else: - print(" No open positions found") + else: + print(" No open positions found") + except Exception as e: + print(f" ⚠️ Error retrieving positions: {e}") print("\n✅ Demo cleanup complete!") async def main() -> None: """Run all demonstrations.""" - _suite = None try: - # Basic order tracking - await demonstrate_order_tracker() + # Basic order tracking with EventBus + await demonstrate_basic_order_tracking() + + # Modern bracket orders + await demonstrate_bracket_orders() - # Order chain builder - await demonstrate_order_chain() + # Event monitoring + await demonstrate_event_monitoring() - # Order templates - await demonstrate_order_templates() + # Multiple order tracking + await demonstrate_multiple_order_tracking() - # Advanced scenarios - await demonstrate_advanced_tracking() + # Advanced bracket patterns + await demonstrate_advanced_bracket_patterns() except KeyboardInterrupt: print("\n\nDemo interrupted by user") @@ -504,6 +770,12 @@ async def main() -> None: if __name__ == "__main__": - print("ProjectX SDK v3.0.2 - Order Lifecycle Tracking") - print("=" * 50) + print("ProjectX SDK v4.0.0 - Order Lifecycle Tracking with EventBus") + print("=" * 60) + print("This example demonstrates migration from v3.x deprecated features:") + print("• OrderTracker → Custom EventBus-based tracking") + print("• OrderChainBuilder → OrderManager.place_bracket_order()") + print("• suite.track_order() → suite.on(EventType.ORDER_FILLED, callback)") + print("• suite.order_chain() → Direct OrderManager methods") + print("=" * 60) asyncio.run(main()) diff --git a/scripts/check_async.py b/scripts/check_async.py index 6914157..687eaab 100644 --- a/scripts/check_async.py +++ b/scripts/check_async.py @@ -28,9 +28,9 @@ def visit_ClassDef(self, node: ast.ClassDef) -> None: # Explicitly exclude utility classes that don't need to be async excluded_classes = [ "ConfigManager", # Configuration management - sync utility - "ErrorContext", # Error context manager - has async context but methods can be sync - "Deprecation", # Deprecation utilities - "Logger", # Logging utilities + "ErrorContext", # Error context manager - has async context but methods can be sync + "Deprecation", # Deprecation utilities + "Logger", # Logging utilities ] if node.name in excluded_classes: @@ -38,7 +38,9 @@ def visit_ClassDef(self, node: ast.ClassDef) -> None: else: # Check if it's a class that should be async based on patterns is_async_class = ( - ("Manager" in node.name and node.name != "ConfigManager") # Managers except ConfigManager + ( + "Manager" in node.name and node.name != "ConfigManager" + ) # Managers except ConfigManager or "Client" in node.name or "Suite" in node.name or "Realtime" in node.name @@ -101,7 +103,14 @@ def _has_io_operations(self, node: ast.FunctionDef) -> bool: # Check for known I/O objects and their methods if obj_name in ["httpx", "aiohttp", "requests", "urllib"]: - if attr_name in ["get", "post", "put", "delete", "patch", "request"]: + if attr_name in [ + "get", + "post", + "put", + "delete", + "patch", + "request", + ]: return True elif obj_name in ["socket", "websocket", "ws"]: if attr_name in ["connect", "send", "recv", "close"]: @@ -109,7 +118,13 @@ def _has_io_operations(self, node: ast.FunctionDef) -> bool: elif obj_name in ["file", "f", "fp"]: if attr_name in ["read", "write", "seek", "tell"]: return True - elif obj_name in ["db", "database", "conn", "connection", "cursor"]: + elif obj_name in [ + "db", + "database", + "conn", + "connection", + "cursor", + ]: if attr_name in ["execute", "fetch", "commit", "rollback"]: return True @@ -117,13 +132,35 @@ def _has_io_operations(self, node: ast.FunctionDef) -> bool: if isinstance(item.func.value, ast.Attribute): if hasattr(item.func.value, "attr"): obj_attr = item.func.value.attr - if obj_attr in ["client", "http", "session", "api", "_client", "_http"]: - if attr_name in ["get", "post", "put", "delete", "patch", "request", "fetch"]: + if obj_attr in [ + "client", + "http", + "session", + "api", + "_client", + "_http", + ]: + if attr_name in [ + "get", + "post", + "put", + "delete", + "patch", + "request", + "fetch", + ]: return True # Check for common async I/O patterns that should be async - if attr_name in ["request", "fetch_data", "api_call", "send_request", - "make_request", "http_get", "http_post"]: + if attr_name in [ + "request", + "fetch_data", + "api_call", + "send_request", + "make_request", + "http_get", + "http_post", + ]: return True elif isinstance(item.func, ast.Name): @@ -148,7 +185,13 @@ def _is_simple_getter(self, node: ast.FunctionDef) -> bool: if isinstance(item, ast.Call): # Check if any calls might be I/O if isinstance(item.func, ast.Attribute): - if item.func.attr in ["request", "fetch", "api_call", "http_get", "http_post"]: + if item.func.attr in [ + "request", + "fetch", + "api_call", + "http_get", + "http_post", + ]: has_io_call = True break elif isinstance(item.func, ast.Name): diff --git a/src/project_x_py/__init__.py b/src/project_x_py/__init__.py index adf1221..eabfb9f 100644 --- a/src/project_x_py/__init__.py +++ b/src/project_x_py/__init__.py @@ -180,18 +180,7 @@ ScalpingTemplate, get_template, ) - -# Deprecated: These are re-exported for backward compatibility only -# Use TradingSuite.track_order() and TradingSuite.order_chain() instead -from project_x_py.order_tracker import ( - OrderChainBuilder, # Deprecated: Use TradingSuite.order_chain() - OrderLifecycleError, - OrderTracker, # Deprecated: Use TradingSuite.track_order() -) -from project_x_py.orderbook import ( - OrderBook, - create_orderbook, -) +from project_x_py.orderbook import OrderBook from project_x_py.position_manager import PositionManager from project_x_py.realtime import ProjectXRealtimeClient as ProjectXRealtimeClient from project_x_py.realtime_data_manager import RealtimeDataManager @@ -249,13 +238,10 @@ "Order", # Core classes (now async-only but with original names) "OrderBook", - "OrderChainBuilder", - "OrderLifecycleError", "OrderManager", "OrderManagerConfig", "OrderPlaceResponse", "OrderTemplate", - "OrderTracker", "OrderSide", "OrderStatus", "OrderType", @@ -314,7 +300,6 @@ "calculate_vwap", "calculate_williams_r", "create_custom_config", - "create_orderbook", "get_env_var", "load_default_config", "load_topstepx_config", diff --git a/src/project_x_py/client/trading.py b/src/project_x_py/client/trading.py index 9761784..035b545 100644 --- a/src/project_x_py/client/trading.py +++ b/src/project_x_py/client/trading.py @@ -73,7 +73,6 @@ async def main(): from project_x_py.exceptions import ProjectXError from project_x_py.models import Position, Trade -from project_x_py.utils.deprecation import deprecated logger = logging.getLogger(__name__) @@ -99,29 +98,6 @@ async def _make_request( """Provided by HttpMixin.""" _ = (method, endpoint, data, params, headers, retry_count) - @deprecated( - reason="Method renamed for API consistency", - version="3.0.0", - removal_version="4.0.0", - replacement="search_open_positions()", - ) - async def get_positions(self) -> list[Position]: - """ - DEPRECATED: Get all open positions for the authenticated account. - - This method is deprecated and will be removed in a future version. - Please use `search_open_positions()` instead, which provides the same - functionality with a more consistent API endpoint. - - Args: - self: The client instance. - - Returns: - A list of Position objects representing current holdings. - """ - # Deprecation warning handled by decorator - return await self.search_open_positions() - async def search_open_positions( self, account_id: int | None = None ) -> list[Position]: diff --git a/src/project_x_py/order_manager/tracking.py b/src/project_x_py/order_manager/tracking.py index 25da630..d1b7fec 100644 --- a/src/project_x_py/order_manager/tracking.py +++ b/src/project_x_py/order_manager/tracking.py @@ -52,13 +52,11 @@ def on_order_fill(order_data): import logging import time from collections import defaultdict, deque -from collections.abc import Callable, Coroutine +from collections.abc import Coroutine from typing import TYPE_CHECKING, Any, cast from cachetools import TTLCache -from project_x_py.utils.deprecation import deprecated - if TYPE_CHECKING: from project_x_py.event_bus import EventBus from project_x_py.types import OrderManagerProtocol @@ -1024,24 +1022,6 @@ async def get_tracked_order_status( order_data = self.tracked_orders.get(order_id) return order_data if order_data is not None else None - @deprecated( - reason="Use TradingSuite.on() with EventType enum for event handling", - version="3.1.0", - removal_version="4.0.0", - replacement="TradingSuite.on(EventType.ORDER_FILLED, callback)", - ) - def add_callback( - self, - _event_type: str, - _callback: Callable[[dict[str, Any]], None], - ) -> None: - """ - DEPRECATED: Use TradingSuite.on() with EventType enum instead. - - This method is provided for backward compatibility only and will be removed in v4.0. - """ - # Deprecation warning handled by decorator - async def _trigger_callbacks(self, event_type: str, data: Any) -> None: """ Trigger all callbacks registered for a specific event type. diff --git a/src/project_x_py/order_templates.py b/src/project_x_py/order_templates.py index d3b548e..e7d8fe6 100644 --- a/src/project_x_py/order_templates.py +++ b/src/project_x_py/order_templates.py @@ -39,7 +39,8 @@ from project_x_py.indicators import ATR from project_x_py.models import BracketOrderResponse -from project_x_py.order_tracker import OrderChainBuilder + +# OrderChainBuilder was deprecated and removed - using OrderManager directly if TYPE_CHECKING: from project_x_py.trading_suite import TradingSuite @@ -153,33 +154,51 @@ async def create_order( else: raise ValueError("Must provide size, risk_amount, or risk_percent") - # Build order chain - builder = OrderChainBuilder(suite) - - if size is None: + # Calculate stop and target prices + # Defensive check - size should be set by now, but verify for safety + if size is None: # pragma: no cover raise ValueError("Size is required") - if self.use_limit_entry: - builder.limit_order(size=size, price=entry_price, side=side) - else: - builder.market_order(size=size, side=side) + if not suite.instrument_id: + raise ValueError("No instrument ID available") - # Add stop loss and take profit target_dist = stop_dist * self.risk_reward_ratio - builder.with_stop_loss(offset=stop_dist) - builder.with_take_profit(offset=target_dist) + if side == 0: # Buy + stop_loss_price = entry_price - stop_dist + take_profit_price = entry_price + target_dist + else: # Sell + stop_loss_price = entry_price + stop_dist + take_profit_price = entry_price - target_dist - # Execute order - result = await builder.execute() - - if result.success: - logger.info( - f"Created {self.risk_reward_ratio}:1 R/R order - " - f"Entry: ${entry_price:.2f}, Stop: ${result.stop_loss_price:.2f}, " - f"Target: ${result.take_profit_price:.2f}" + # Use OrderManager's bracket order functionality + if self.use_limit_entry: + result = await suite.orders.place_bracket_order( + contract_id=suite.instrument_id, + side=side, + size=size, + entry_price=entry_price, + stop_loss_price=stop_loss_price, + take_profit_price=take_profit_price, + entry_type="limit", + ) + else: + result = await suite.orders.place_bracket_order( + contract_id=suite.instrument_id, + side=side, + size=size, + entry_price=entry_price, + stop_loss_price=stop_loss_price, + take_profit_price=take_profit_price, + entry_type="market", ) + logger.info( + f"Created {self.risk_reward_ratio}:1 R/R bracket order - " + f"Entry: ${entry_price:.2f}, Stop: ${stop_loss_price:.2f}, " + f"Target: ${take_profit_price:.2f}" + ) + return result @@ -258,25 +277,52 @@ async def create_order( f"Target distance={target_distance:.2f}" ) - # Build order - builder = OrderChainBuilder(suite) - + # Calculate entry and stop prices if size is None: raise ValueError("Size is required") + if not suite.instrument_id: + raise ValueError("No instrument ID available") + if use_limit_entry: if side == 0: # BUY entry_price = current_price - entry_offset + stop_loss_price = entry_price - stop_distance + take_profit_price = entry_price + target_distance else: # SELL entry_price = current_price + entry_offset - builder.limit_order(size=size, price=entry_price, side=side) + stop_loss_price = entry_price + stop_distance + take_profit_price = entry_price - target_distance + + # Place bracket order with limit entry + return await suite.orders.place_bracket_order( + contract_id=suite.instrument_id, + side=side, + size=size, + entry_price=entry_price, + stop_loss_price=stop_loss_price, + take_profit_price=take_profit_price, + entry_type="limit", + ) else: - builder.market_order(size=size, side=side) - - builder.with_stop_loss(offset=stop_distance) - builder.with_take_profit(offset=target_distance) - - return await builder.execute() + # Market entry - use current price as reference + if side == 0: # BUY + stop_loss_price = current_price - stop_distance + take_profit_price = current_price + target_distance + else: # SELL + stop_loss_price = current_price + stop_distance + take_profit_price = current_price - target_distance + + # Place bracket order with market entry + return await suite.orders.place_bracket_order( + contract_id=suite.instrument_id, + side=side, + size=size, + entry_price=current_price, + stop_loss_price=stop_loss_price, + take_profit_price=take_profit_price, + entry_type="market", + ) class BreakoutTemplate(OrderTemplate): @@ -368,19 +414,24 @@ async def create_order( f"Target={target_price:.2f}" ) - # Build order + # Build breakout order using bracket order if size is None: raise ValueError("Size is required") - builder = ( - OrderChainBuilder(suite) - .stop_order(size=size, price=entry_price, side=side) - .with_stop_loss(price=stop_price) - .with_take_profit(price=target_price) + if not suite.instrument_id: + raise ValueError("No instrument ID available") + + # Use stop order as entry type for breakouts + return await suite.orders.place_bracket_order( + contract_id=suite.instrument_id, + side=side, + size=size, + entry_price=entry_price, + stop_loss_price=stop_price, + take_profit_price=target_price, + entry_type="stop", # Stop order for breakout entry ) - return await builder.execute() - class ScalpingTemplate(OrderTemplate): """ @@ -456,32 +507,65 @@ async def create_order( stop_distance = self.stop_ticks * tick_size target_distance = self.target_ticks * tick_size - # Build order - builder = OrderChainBuilder(suite) - + # Build scalping order using bracket order if size is None: raise ValueError("Size is required") + if not suite.instrument_id: + raise ValueError("No instrument ID available") + if self.use_market_entry: - builder.market_order(size=size, side=side) - else: - # Use limit at best bid/ask + # Get current price for market entry calculations current_price = await suite.data.get_current_price() if not current_price: raise ValueError("Cannot get current price") - builder.limit_order(size=size, price=current_price, side=side) - builder.with_stop_loss(offset=stop_distance) - builder.with_take_profit(offset=target_distance) - - result = await builder.execute() + # Calculate stop and target prices based on side + if side == 0: # BUY + stop_loss_price = current_price - stop_distance + take_profit_price = current_price + target_distance + else: # SELL + stop_loss_price = current_price + stop_distance + take_profit_price = current_price - target_distance + + result = await suite.orders.place_bracket_order( + contract_id=suite.instrument_id, + side=side, + size=size, + entry_price=current_price, + stop_loss_price=stop_loss_price, + take_profit_price=take_profit_price, + entry_type="market", + ) + else: + # Use limit at current price + current_price = await suite.data.get_current_price() + if not current_price: + raise ValueError("Cannot get current price") - if result.success: - logger.info( - f"Scalp order placed: {self.stop_ticks} tick stop, " - f"{self.target_ticks} tick target" + # Calculate stop and target prices + if side == 0: # BUY + stop_loss_price = current_price - stop_distance + take_profit_price = current_price + target_distance + else: # SELL + stop_loss_price = current_price + stop_distance + take_profit_price = current_price - target_distance + + result = await suite.orders.place_bracket_order( + contract_id=suite.instrument_id, + side=side, + size=size, + entry_price=current_price, + stop_loss_price=stop_loss_price, + take_profit_price=take_profit_price, + entry_type="limit", ) + logger.info( + f"Scalping bracket order placed: {self.stop_ticks} tick stop, " + f"{self.target_ticks} tick target" + ) + return result diff --git a/src/project_x_py/order_tracker.py b/src/project_x_py/order_tracker.py deleted file mode 100644 index 808a9db..0000000 --- a/src/project_x_py/order_tracker.py +++ /dev/null @@ -1,660 +0,0 @@ -""" -Order lifecycle tracking and management for ProjectX SDK v3.0.0. - -DEPRECATED: This module is deprecated as of v3.1.14 and will be removed in v4.0.0. - Use TradingSuite.track_order() and TradingSuite.order_chain() instead. - -Author: SDK v3.0.0 -Date: 2025-08-04 - -Overview: - Provides a context manager for comprehensive order lifecycle tracking with - automatic state management, async waiting mechanisms, and simplified error - handling. Eliminates the need for manual order state tracking in strategies. - -Key Features: - - Context manager for automatic cleanup - - Async waiting for order fills/status changes - - Automatic timeout handling - - Order modification and cancellation helpers - - Order chain builder for complex orders - - Common order templates - - Integration with EventBus for real-time updates - -Example Usage: - ```python - # Simple order tracking - async with suite.track_order() as tracker: - order = await suite.orders.place_limit_order( - contract_id=instrument.id, - side=OrderSide.BUY, - size=1, - price=current_price - 10, - ) - - try: - filled_order = await tracker.wait_for_fill(timeout=60) - print(f"Order filled at {filled_order.filledPrice}") - except TimeoutError: - await tracker.modify_or_cancel(new_price=current_price - 5) - - # Order chain builder - order_chain = ( - suite.orders.market_order(size=1) - .with_stop_loss(offset=50) - .with_take_profit(offset=100) - .with_trail_stop(offset=25, trigger_offset=50) - ) - - result = await order_chain.execute() - ``` - -See Also: - - `order_manager.core.OrderManager` - - `event_bus.EventBus` - - `models.Order` -""" - -import asyncio -import logging -from types import TracebackType -from typing import TYPE_CHECKING, Any, Union - -from project_x_py.event_bus import EventType -from project_x_py.models import BracketOrderResponse, Order, OrderPlaceResponse -from project_x_py.utils.deprecation import deprecated, deprecated_class - -if TYPE_CHECKING: - from project_x_py.trading_suite import TradingSuite - -logger = logging.getLogger(__name__) - - -@deprecated_class( - reason="Use TradingSuite.track_order() for integrated order tracking", - version="3.1.14", - removal_version="4.0.0", - replacement="TradingSuite.track_order()", -) -class OrderTracker: - """ - Context manager for comprehensive order lifecycle tracking. - - DEPRECATED: Use TradingSuite.track_order() instead. Will be removed in v4.0.0. - - Provides automatic order state management with async waiting capabilities, - eliminating the need for manual order status polling and complex state - tracking in trading strategies. - - Features: - - Automatic order status tracking via EventBus - - Async waiting for specific order states - - Timeout handling with automatic cleanup - - Order modification and cancellation helpers - - Fill detection and reporting - - Thread-safe operation - """ - - def __init__(self, trading_suite: "TradingSuite", order: Order | None = None): - """ - Initialize OrderTracker. - - Args: - trading_suite: TradingSuite instance for access to components - order: Optional order to track immediately - """ - self.suite = trading_suite - self.order_manager = trading_suite.orders - self.event_bus = trading_suite.events - self.order = order - self.order_id: int | None = order.id if order else None - - # State tracking - self._fill_event = asyncio.Event() - self._status_events: dict[int, asyncio.Event] = {} - self._current_status: int | None = order.status if order else None - self._filled_order: Order | None = None - self._error: Exception | None = None - - # Event handlers - self._event_handlers: list[tuple[EventType, Any]] = [] - - async def __aenter__(self) -> "OrderTracker": - """Enter the context manager and set up tracking.""" - # Register event handlers - await self._setup_event_handlers() - return self - - async def __aexit__( - self, - exc_type: type[BaseException] | None, - exc_val: BaseException | None, - exc_tb: TracebackType | None, - ) -> None: - """Exit the context manager and clean up.""" - await self.cleanup() - - async def _setup_event_handlers(self) -> None: - """Set up EventBus handlers for order tracking.""" - - # Handler for order fills - async def on_fill(data: dict[str, Any]) -> None: - order = data.get("order") - if order and order.id == self.order_id: - self._filled_order = order - self._current_status = 2 # FILLED - self._fill_event.set() - - # Handler for status changes - async def on_status_change(data: dict[str, Any]) -> None: - order = data.get("order") - if order and order.id == self.order_id: - new_status = order.status - self._current_status = new_status - - # Set status-specific events - if new_status in self._status_events: - self._status_events[new_status].set() - - # Handle terminal states - if new_status in (3, 4, 5): # CANCELLED, EXPIRED, REJECTED - self._error = OrderLifecycleError( - f"Order {self.order_id} reached terminal state: {new_status}" - ) - - # Register handlers - self._event_handlers = [ - (EventType.ORDER_FILLED, on_fill), - (EventType.ORDER_CANCELLED, on_status_change), - (EventType.ORDER_REJECTED, on_status_change), - (EventType.ORDER_EXPIRED, on_status_change), - ] - - for event_type, handler in self._event_handlers: - await self.event_bus.on(event_type, handler) - - async def cleanup(self) -> None: - """Clean up event handlers and resources.""" - # Unregister event handlers - for event_type, handler in self._event_handlers: - await self.event_bus.off(event_type, handler) - - self._event_handlers.clear() - - def track(self, order: Union[Order, OrderPlaceResponse, int]) -> "OrderTracker": - """ - Start tracking a specific order. - - Args: - order: Order object, OrderPlaceResponse, or order ID to track - - Returns: - Self for method chaining - """ - if isinstance(order, Order): - self.order = order - self.order_id = order.id - self._current_status = order.status - elif isinstance(order, OrderPlaceResponse): - self.order_id = order.orderId - self._current_status = 1 # OPEN (assumed for new orders) - else: # int - self.order_id = order - self._current_status = None - - return self - - async def wait_for_fill(self, timeout: float = 30.0) -> Order: - """ - Wait for the order to be filled. - - Args: - timeout: Maximum time to wait in seconds - - Returns: - Filled Order object - - Raises: - TimeoutError: If order is not filled within timeout - OrderLifecycleError: If order reaches terminal non-filled state - """ - if not self.order_id: - raise ValueError("No order is being tracked") - - try: - await asyncio.wait_for(self._fill_event.wait(), timeout=timeout) - - if self._error: - raise self._error - - if self._filled_order: - return self._filled_order - else: - # Fetch latest order data - order = await self.order_manager.get_order_by_id(self.order_id) - if order and order.status == 2: # FILLED - return order - else: - raise OrderLifecycleError( - "Order fill event received but order not filled" - ) - - except TimeoutError: - raise TimeoutError( - f"Order {self.order_id} not filled within {timeout} seconds" - ) from None - - async def wait_for_status(self, status: int, timeout: float = 30.0) -> Order: - """ - Wait for the order to reach a specific status. - - Args: - status: Target order status to wait for - timeout: Maximum time to wait in seconds - - Returns: - Order object with the target status - - Raises: - TimeoutError: If status is not reached within timeout - OrderLifecycleError: If order reaches incompatible terminal state - """ - if not self.order_id: - raise ValueError("No order is being tracked") - - # Create event for this status if not exists - if status not in self._status_events: - self._status_events[status] = asyncio.Event() - - # Check current status before waiting - if self._current_status == status: - order = await self.order_manager.get_order_by_id(self.order_id) - if order and order.status == status: - return order - - # Wait for the event - try: - await asyncio.wait_for(self._status_events[status].wait(), timeout=timeout) - except TimeoutError: - # After timeout, check the status one last time via API - order = await self.order_manager.get_order_by_id(self.order_id) - if order and order.status == status: - return order - raise TimeoutError( - f"Order {self.order_id} did not reach status {status} within {timeout} seconds" - ) from None - - # After event is received - if self._error and status != self._current_status: - raise self._error - - order = await self.order_manager.get_order_by_id(self.order_id) - if order and order.status == status: - return order - else: - # This can happen if event fires but API state is not yet consistent, - # or if another status update arrived quickly. - raise OrderLifecycleError( - f"Status event received but order not in expected state {status}. Current state: {order.status if order else 'not found'}" - ) - - async def modify_or_cancel( - self, new_price: float | None = None, new_size: int | None = None - ) -> bool: - """ - Attempt to modify the order, or cancel if modification fails. - - Args: - new_price: New limit price for the order - new_size: New size for the order - - Returns: - True if modification succeeded, False if order was cancelled - """ - if not self.order_id: - raise ValueError("No order is being tracked") - - try: - if new_price is not None or new_size is not None: - # Attempt modification - success = await self.order_manager.modify_order( - self.order_id, limit_price=new_price, size=new_size - ) - - if success: - logger.info(f"Order {self.order_id} modified successfully") - return True - - except Exception as e: - logger.warning(f"Failed to modify order {self.order_id}: {e}") - - # Modification failed, cancel the order - try: - await self.order_manager.cancel_order(self.order_id) - logger.info(f"Order {self.order_id} cancelled") - return False - except Exception as e: - logger.error(f"Failed to cancel order {self.order_id}: {e}") - raise - - async def get_current_status(self) -> Order | None: - """ - Get the current order status. - - Returns: - Current Order object or None if not found - """ - if not self.order_id: - return None - - return await self.order_manager.get_order_by_id(self.order_id) - - @property - def is_filled(self) -> bool: - """Check if the order has been filled.""" - return self._current_status == 2 - - @property - def is_working(self) -> bool: - """Check if the order is still working (open or pending).""" - return self._current_status in (1, 6) # OPEN or PENDING - - @property - def is_terminal(self) -> bool: - """Check if the order is in a terminal state.""" - return self._current_status in ( - 2, - 3, - 4, - 5, - ) # FILLED, CANCELLED, EXPIRED, REJECTED - - -@deprecated_class( - reason="Use TradingSuite.order_chain() for integrated order chain building", - version="3.1.14", - removal_version="4.0.0", - replacement="TradingSuite.order_chain()", -) -class OrderChainBuilder: - """ - Fluent API for building complex order chains. - - DEPRECATED: Use TradingSuite.order_chain() instead. Will be removed in v4.0.0. - - Allows creating multi-part orders (entry + stops + targets) with a - clean, chainable syntax that's easy to read and maintain. - - Example: - ```python - order_chain = ( - OrderChainBuilder(suite) - .market_order(size=2) - .with_stop_loss(offset=50) - .with_take_profit(offset=100) - .with_trail_stop(offset=25, trigger_offset=50) - ) - - result = await order_chain.execute() - ``` - """ - - def __init__(self, trading_suite: "TradingSuite"): - """Initialize the order chain builder.""" - self.suite = trading_suite - self.order_manager = trading_suite.orders - - # Order configuration - self.entry_type = "market" - self.side: int | None = None - self.size: int | None = None - self.entry_price: float | None = None - self.contract_id: str | None = None - - # Risk orders - self.stop_loss: dict[str, Any] | None = None - self.take_profit: dict[str, Any] | None = None - self.trail_stop: dict[str, Any] | None = None - - def market_order(self, size: int, side: int = 0) -> "OrderChainBuilder": - """Configure a market order as entry.""" - self.entry_type = "market" - self.size = size - self.side = side - return self - - def limit_order( - self, size: int, price: float, side: int = 0 - ) -> "OrderChainBuilder": - """Configure a limit order as entry.""" - self.entry_type = "limit" - self.size = size - self.entry_price = price - self.side = side - return self - - def stop_order(self, size: int, price: float, side: int = 0) -> "OrderChainBuilder": - """Configure a stop order as entry.""" - self.entry_type = "stop" - self.size = size - self.entry_price = price - self.side = side - return self - - def for_instrument(self, contract_id: str) -> "OrderChainBuilder": - """Set the instrument for the order chain.""" - self.contract_id = contract_id - return self - - def with_stop_loss( - self, offset: float | None = None, price: float | None = None - ) -> "OrderChainBuilder": - """Add a stop loss to the order chain.""" - self.stop_loss = {"offset": offset, "price": price} - return self - - def with_take_profit( - self, - offset: float | None = None, - price: float | None = None, - ) -> "OrderChainBuilder": - """Add a take profit to the order chain.""" - self.take_profit = {"offset": offset, "price": price} - return self - - def with_trail_stop( - self, offset: float, trigger_offset: float | None = None - ) -> "OrderChainBuilder": - """Add a trailing stop to the order chain.""" - self.trail_stop = {"offset": offset, "trigger_offset": trigger_offset} - return self - - async def execute(self) -> BracketOrderResponse: - """ - Execute the order chain. - - Returns: - BracketOrderResponse with all order IDs - - Raises: - ValueError: If required parameters are missing - OrderLifecycleError: If order placement fails - """ - # Validate configuration - if self.size is None: - raise ValueError("Order size is required") - if self.side is None: - raise ValueError("Order side is required") - if not self.contract_id and not self.suite.instrument_id: - raise ValueError("Contract ID is required") - - contract_id = self.contract_id or self.suite.instrument_id - if not contract_id: - raise ValueError("Contract ID is required") - - # Calculate risk order prices if needed - current_price = await self.suite.data.get_current_price() - if not current_price: - raise ValueError("Cannot get current price for risk calculations") - - # Build bracket order parameters - if self.entry_type == "market": - # For market orders, use current price for risk calculations - entry_price = current_price - else: - entry_price = self.entry_price or current_price - - # Calculate stop loss price - stop_loss_price = None - if self.stop_loss: - if self.stop_loss["price"]: - stop_loss_price = self.stop_loss["price"] - elif self.stop_loss["offset"]: - if self.side == 0: # BUY - stop_loss_price = entry_price - self.stop_loss["offset"] - else: # SELL - stop_loss_price = entry_price + self.stop_loss["offset"] - - # Calculate take profit price - take_profit_price = None - if self.take_profit: - if self.take_profit["price"]: - take_profit_price = self.take_profit["price"] - elif self.take_profit["offset"]: - if self.side == 0: # BUY - take_profit_price = entry_price + self.take_profit["offset"] - else: # SELL - take_profit_price = entry_price - self.take_profit["offset"] - - # Execute the appropriate order type - if stop_loss_price or take_profit_price: - # Use bracket order - # For market orders, pass the current price as entry_price for validation - bracket_entry_price = ( - entry_price if self.entry_type != "market" else current_price - ) - assert self.side is not None # Already checked above - assert self.size is not None # Already checked above - result = await self.order_manager.place_bracket_order( - contract_id=contract_id, - side=self.side, - size=self.size, - entry_price=bracket_entry_price, - stop_loss_price=stop_loss_price or 0.0, - take_profit_price=take_profit_price or 0.0, - entry_type=self.entry_type, - ) - - # Add trailing stop if configured - if self.trail_stop and result.success and result.stop_order_id: - logger.info( - f"Replacing stop order {result.stop_order_id} with trailing stop." - ) - try: - await self.order_manager.cancel_order(result.stop_order_id) - trail_offset = self.trail_stop["offset"] - stop_side = 1 if self.side == 0 else 0 # Opposite of entry - - trail_response = await self.order_manager.place_trailing_stop_order( - contract_id=contract_id, - side=stop_side, - size=self.size, - trail_price=trail_offset, - ) - if trail_response.success: - logger.info( - f"Trailing stop order placed: {trail_response.orderId}" - ) - # Note: The BracketOrderResponse does not have a field for the trailing stop ID. - # The original stop_order_id will remain in the response. - else: - logger.error( - f"Failed to place trailing stop: {trail_response.errorMessage}" - ) - except Exception as e: - logger.error(f"Error replacing stop with trailing stop: {e}") - - return result - - else: - # Simple order without brackets - if self.entry_type == "market": - response = await self.order_manager.place_market_order( - contract_id=contract_id, side=self.side, size=self.size - ) - elif self.entry_type == "limit": - if self.entry_price is None: - raise ValueError("Entry price is required for limit orders") - response = await self.order_manager.place_limit_order( - contract_id=contract_id, - side=self.side, - size=self.size, - limit_price=self.entry_price, - ) - else: # stop - if self.entry_price is None: - raise ValueError("Entry price is required for stop orders") - response = await self.order_manager.place_stop_order( - contract_id=contract_id, - side=self.side, - size=self.size, - stop_price=self.entry_price, - ) - - # Convert to BracketOrderResponse format - return BracketOrderResponse( - success=response.success, - entry_order_id=response.orderId if response.success else None, - stop_order_id=None, - target_order_id=None, - entry_price=entry_price, - stop_loss_price=stop_loss_price or 0.0, - take_profit_price=take_profit_price or 0.0, - entry_response=response, - stop_response=None, - target_response=None, - error_message=response.errorMessage, - ) - - -class OrderLifecycleError(Exception): - """Exception raised when order lifecycle encounters an error.""" - - -# Convenience function for creating order trackers -@deprecated( - reason="Use TradingSuite.track_order() for integrated tracking", - version="3.1.14", - removal_version="4.0.0", - replacement="TradingSuite.track_order()", -) -def track_order( - trading_suite: "TradingSuite", - order: Union[Order, OrderPlaceResponse, int] | None = None, -) -> OrderTracker: - """ - Create an OrderTracker instance. - - Args: - trading_suite: TradingSuite instance - order: Optional order to track immediately - - Returns: - OrderTracker instance - - Example: - ```python - async with track_order(suite) as tracker: - order = await suite.orders.place_limit_order(...) - tracker.track(order) - filled = await tracker.wait_for_fill() - ``` - """ - # Deprecation warning handled by decorator - tracker = OrderTracker(trading_suite) - if order: - if isinstance(order, Order | OrderPlaceResponse): - tracker.track(order) - else: # int - tracker.order_id = order - return tracker diff --git a/src/project_x_py/orderbook/__init__.py b/src/project_x_py/orderbook/__init__.py index 9e9977e..f6ed989 100644 --- a/src/project_x_py/orderbook/__init__.py +++ b/src/project_x_py/orderbook/__init__.py @@ -114,7 +114,6 @@ async def on_depth_update(event): OrderbookAnalysisResponse, SpoofingDetectionResponse, ) -from project_x_py.utils.deprecation import deprecated __all__ = [ # Types @@ -134,7 +133,8 @@ async def on_depth_update(event): "TradeDict", # Profile components "VolumeProfile", - "create_orderbook", + # Memory management + "MemoryManager", ] @@ -509,66 +509,3 @@ async def cleanup(self) -> None: # Call parent cleanup await super().cleanup() - - -@deprecated( - reason="Use TradingSuite.create() with orderbook feature for integrated orderbook", - version="3.1.0", - removal_version="4.0.0", - replacement='TradingSuite.create(instrument, features=["orderbook"])', -) -def create_orderbook( - instrument: str, - event_bus: Any, - project_x: "ProjectXBase | None" = None, - realtime_client: "ProjectXRealtimeClient | None" = None, - timezone_str: str = DEFAULT_TIMEZONE, -) -> OrderBook: - """ - Factory function to create an orderbook. - - This factory function creates and returns an OrderBook instance for the specified - instrument. It simplifies the process of creating an orderbook by handling the initial - configuration. Note that the returned orderbook is not yet initialized - you must call - the initialize() method separately to start the orderbook's functionality. - - The factory approach provides several benefits: - 1. Ensures consistent orderbook creation across the application - 2. Allows for future extension with pre-configured orderbook variants - 3. Simplifies the API for common use cases - - Args: - instrument: Trading instrument symbol (e.g., "ES", "NQ", "MES", "MNQ"). - This should be the base symbol without contract-specific extensions. - project_x: Optional AsyncProjectX client for tick size lookup and API access. - If provided, the orderbook will be able to look up tick sizes and other - contract details automatically. - realtime_client: Optional real-time client for WebSocket data. This is kept - for compatibility but should be passed to initialize() instead. - timezone_str: Timezone for timestamps (default: "America/Chicago"). - All timestamps in the orderbook will be converted to this timezone. - - Returns: - OrderBook: Orderbook instance that must be initialized with a call - to initialize() before use. - - Example: - >>> # V3.1: Use TradingSuite instead of factory function - >>> # This function is deprecated - use TradingSuite.create() - >>> suite = await TradingSuite.create( - ... instrument="MNQ", - ... features=["orderbook"], - ... timezone_str="America/Chicago", # CME timezone - ... ) - >>> - >>> # Orderbook is automatically initialized - >>> # Access via suite.orderbook - >>> snapshot = await suite.orderbook.get_orderbook_snapshot() - >>> - >>> # Note: create_orderbook is maintained for backward compatibility - >>> # but TradingSuite is the recommended approach - """ - # Note: realtime_client is passed to initialize() separately to allow - # for async initialization - _ = realtime_client # Mark as intentionally unused - return OrderBook(instrument, event_bus, project_x, timezone_str) diff --git a/src/project_x_py/orderbook/base.py b/src/project_x_py/orderbook/base.py index 9b78703..0b50772 100644 --- a/src/project_x_py/orderbook/base.py +++ b/src/project_x_py/orderbook/base.py @@ -79,7 +79,6 @@ async def on_depth(data): from project_x_py.statistics.base import BaseStatisticsTracker from project_x_py.types import ( DEFAULT_TIMEZONE, - CallbackType, DomType, MemoryConfig, ) @@ -93,7 +92,6 @@ async def on_depth(data): ProjectXLogger, handle_errors, ) -from project_x_py.utils.deprecation import deprecated logger = ProjectXLogger.get_logger(__name__) @@ -693,78 +691,6 @@ async def get_order_type_statistics(self) -> dict[str, int]: async with self.orderbook_lock: return self.order_type_stats.copy() - @deprecated( - reason="Use TradingSuite.on() with EventType enum for event handling", - version="3.1.0", - removal_version="4.0.0", - replacement="TradingSuite.on(EventType.MARKET_DEPTH_UPDATE, callback)", - ) - @handle_errors("add callback", reraise=False) - async def add_callback(self, event_type: str, callback: CallbackType) -> None: - """ - Register a callback for orderbook events. - - This method allows client code to register callbacks that will be triggered when - specific orderbook events occur. Callbacks can be either synchronous functions or - asynchronous coroutines. When an event occurs, all registered callbacks for that - event type will be executed with the event data. - - Supported event types: - - "depth_update": Triggered when a price level is updated - - "trade": Triggered when a new trade is processed - - "best_bid_change": Triggered when the best bid price changes - - "best_ask_change": Triggered when the best ask price changes - - "spread_change": Triggered when the bid-ask spread changes - - "reset": Triggered when the orderbook is reset - - Args: - event_type: The type of event to listen for (from the list above) - callback: A callable function or coroutine that will receive the event data. - The callback should accept a single parameter: a dictionary containing - the event data specific to that event type. - - Example: - >>> # Use TradingSuite with EventBus for callbacks - >>> from project_x_py import TradingSuite, EventType - >>> - >>> suite = await TradingSuite.create("MNQ", features=["orderbook"]) - >>> - >>> @suite.events.on(EventType.TRADE_TICK) - >>> async def on_trade(event): - ... data = event.data - ... print(f"Trade: {data['size']} @ {data['price']} ({data['side']})") - >>> - >>> @suite.events.on(EventType.MARKET_DEPTH_UPDATE) - >>> async def on_depth_change(event): - ... data = event.data - ... print( - ... f"New best bid: {data['bids'][0]['price'] if data['bids'] else 'None'}" - ... ) - >>> # Events automatically flow through EventBus - """ - async with self._callback_lock: - # Deprecation warning handled by decorator - logger.debug( - LogMessages.CALLBACK_REGISTERED, - extra={"event_type": event_type, "component": "orderbook"}, - ) - - @deprecated( - reason="Use TradingSuite.off() with EventType enum for event handling", - version="3.1.0", - removal_version="4.0.0", - replacement="TradingSuite.off(EventType.MARKET_DEPTH_UPDATE, callback)", - ) - @handle_errors("remove callback", reraise=False) - async def remove_callback(self, event_type: str, callback: CallbackType) -> None: - """Remove a registered callback.""" - async with self._callback_lock: - # Deprecation warning handled by decorator - logger.debug( - LogMessages.CALLBACK_REMOVED, - extra={"event_type": event_type, "component": "orderbook"}, - ) - async def _trigger_callbacks(self, event_type: str, data: dict[str, Any]) -> None: """ Trigger all callbacks for a specific event type. @@ -814,7 +740,7 @@ async def track_ask_update(self, levels: int = 1) -> None: self._ask_updates += levels await self._track_update_frequency() - async def track_trade_processed(self, volume: int, price: float) -> None: + async def track_trade_processed(self, volume: int, _price: float) -> None: """Track trade execution processing.""" await self.increment("trades_processed", 1) await self.increment("total_volume", volume) diff --git a/src/project_x_py/position_manager/tracking.py b/src/project_x_py/position_manager/tracking.py index 68df75b..83d01bb 100644 --- a/src/project_x_py/position_manager/tracking.py +++ b/src/project_x_py/position_manager/tracking.py @@ -63,14 +63,12 @@ async def on_position_closed(data): import contextlib import logging from collections import defaultdict, deque -from collections.abc import Callable, Coroutine from datetime import datetime from decimal import Decimal from typing import TYPE_CHECKING, Any from project_x_py.models import Position from project_x_py.types.trading import PositionType -from project_x_py.utils.deprecation import deprecated if TYPE_CHECKING: from asyncio import Lock @@ -577,52 +575,6 @@ async def cleanup_tracking(self) -> None: self.logger.info("✅ Position tracking cleanup completed") - @deprecated( - reason="Use TradingSuite.on() with EventType enum for event handling", - version="3.1.0", - removal_version="4.0.0", - replacement="TradingSuite.on(EventType.POSITION_UPDATED, callback)", - ) - async def add_callback( - self, - event_type: str, - callback: Callable[[dict[str, Any]], Coroutine[Any, Any, None] | None], - ) -> None: - """ - Register a callback function for specific position events. - - Allows you to listen for position updates, closures, account changes, and alerts - to build custom monitoring and notification systems. - - Args: - event_type: Type of event to listen for - - "position_update": Position size or price changes - - "position_closed": Position fully closed (size = 0) - - "account_update": Account-level changes - - "position_alert": Position alert triggered - callback: Async function to call when event occurs - Should accept one argument: the event data dict - - Example: - >>> async def on_position_update(data): - ... pos = data.get("data", {}) - ... print( - ... f"Position updated: {pos.get('contractId')} size: {pos.get('size')}" - ... ) - >>> await position_manager.add_callback( - ... "position_update", on_position_update - ... ) - >>> async def on_position_closed(data): - ... pos = data.get("data", {}) - ... print(f"Position closed: {pos.get('contractId')}") - >>> await position_manager.add_callback( - ... "position_closed", on_position_closed - ... ) - """ - self.logger.warning( - "add_callback is deprecated. Use TradingSuite.on() with EventType enum instead." - ) - async def get_position_history_size(self, contract_id: str) -> int: """Get the current size of position history for a contract.""" return len(self.position_history.get(contract_id, deque())) diff --git a/src/project_x_py/trading_suite.py b/src/project_x_py/trading_suite.py index ccc1528..3f2dbb0 100644 --- a/src/project_x_py/trading_suite.py +++ b/src/project_x_py/trading_suite.py @@ -49,7 +49,6 @@ from project_x_py.event_bus import EventBus, EventType from project_x_py.models import Instrument from project_x_py.order_manager import OrderManager -from project_x_py.order_tracker import OrderChainBuilder, OrderTracker from project_x_py.orderbook import OrderBook from project_x_py.position_manager import PositionManager from project_x_py.realtime import ProjectXRealtimeClient @@ -65,7 +64,6 @@ from project_x_py.types.protocols import ProjectXClientProtocol from project_x_py.types.stats_types import TradingSuiteStats from project_x_py.utils import ProjectXLogger -from project_x_py.utils.deprecation import deprecated logger = ProjectXLogger.get_logger(__name__) @@ -695,78 +693,6 @@ async def off( """ await self.events.off(event, handler) - def track_order(self, order: Any = None) -> OrderTracker: - """ - Create an OrderTracker for comprehensive order lifecycle management. - - This provides automatic order state tracking with async waiting capabilities, - eliminating the need for manual order status polling. - - Args: - order: Optional order to track immediately (Order, OrderPlaceResponse, or order ID) - - Returns: - OrderTracker instance (use as context manager) - - Example: - ```python - from project_x_py.types.trading import OrderSide - - # Track a new order - async with suite.track_order() as tracker: - order = await suite.orders.place_limit_order( - contract_id=suite.instrument_id, - side=OrderSide.BUY, - size=1, - price=current_price - 10, - ) - tracker.track(order) - - try: - filled = await tracker.wait_for_fill(timeout=60) - print(f"Order filled at {filled.filledPrice}") - except TimeoutError: - await tracker.modify_or_cancel(new_price=current_price - 5) - ``` - """ - tracker = OrderTracker(self, order) - return tracker - - def order_chain(self) -> OrderChainBuilder: - """ - Create an order chain builder for complex order structures. - - Provides a fluent API for building multi-part orders (entry + stops + targets) - with clean, readable syntax. - - Returns: - OrderChainBuilder instance - - Example: - ```python - # Build a bracket order with stops and targets - # Note: side=0 for BUY, side=1 for SELL - order_chain = ( - suite.order_chain() - .market_order(size=2, side=0) # BUY 2 contracts - .with_stop_loss(offset=50) - .with_take_profit(offset=100) - .with_trail_stop(offset=25, trigger_offset=50) - ) - - result = await order_chain.execute() - - # Or use a limit entry - order_chain = ( - suite.order_chain() - .limit_order(size=1, price=16000, side=0) # BUY limit - .with_stop_loss(price=15950) - .with_take_profit(price=16100) - ) - ``` - """ - return OrderChainBuilder(self) - def managed_trade( self, max_risk_percent: float | None = None, @@ -853,28 +779,3 @@ async def get_stats(self) -> TradingSuiteStats: Structured statistics from all active components with accurate metrics """ return await self._stats_aggregator.aggregate_stats() - - @deprecated( - reason="Synchronous methods are being phased out in favor of async-only API", - version="3.3.0", - removal_version="4.0.0", - replacement="await get_stats()", - ) - def get_stats_sync(self) -> TradingSuiteStats: - """ - Synchronous wrapper for get_stats for backward compatibility. - - Returns: - Structured statistics from all active components - """ - import asyncio - - # Try to get or create event loop - try: - loop = asyncio.get_event_loop() - except RuntimeError: - loop = asyncio.new_event_loop() - asyncio.set_event_loop(loop) - - # Run the async method - return loop.run_until_complete(self.get_stats()) diff --git a/tests/benchmarks/test_performance.py b/tests/benchmarks/test_performance.py index d3ccbec..88231d4 100644 --- a/tests/benchmarks/test_performance.py +++ b/tests/benchmarks/test_performance.py @@ -35,7 +35,9 @@ async def place_order() -> Any: suite = TradingSuite.__new__(TradingSuite) suite.client = mock_client suite.orders = MagicMock() - suite.orders.place_market_order = AsyncMock(return_value={"order_id": "123"}) + suite.orders.place_market_order = AsyncMock( + return_value={"order_id": "123"} + ) return await suite.orders.place_market_order( contract_id="MNQ", side=0, size=1 @@ -61,11 +63,13 @@ async def place_bracket() -> Any: suite = TradingSuite.__new__(TradingSuite) suite.client = mock_client suite.orders = MagicMock() - suite.orders.place_bracket_order = AsyncMock(return_value={ - "main_order": {"order_id": "123"}, - "stop_order": {"order_id": "124"}, - "target_order": {"order_id": "125"} - }) + suite.orders.place_bracket_order = AsyncMock( + return_value={ + "main_order": {"order_id": "123"}, + "stop_order": {"order_id": "124"}, + "target_order": {"order_id": "125"}, + } + ) return await suite.orders.place_bracket_order( contract_id="MNQ", diff --git a/tests/client/test_client_integration.py b/tests/client/test_client_integration.py index 80a054c..62234d9 100644 --- a/tests/client/test_client_integration.py +++ b/tests/client/test_client_integration.py @@ -112,7 +112,7 @@ async def test_trading_workflow( assert client._authenticated is True # Step 2: Get positions - positions = await client.get_positions() + positions = await client.search_open_positions() assert len(positions) == 2 assert positions[0].contractId == "MGC" assert positions[1].contractId == "MNQ" diff --git a/tests/client/test_trading.py b/tests/client/test_trading.py index 6a48467..e00c130 100644 --- a/tests/client/test_trading.py +++ b/tests/client/test_trading.py @@ -15,7 +15,7 @@ class TestTrading: """Tests for the trading functionality of the ProjectX client.""" @pytest.mark.asyncio - async def test_get_positions( + async def test_search_open_positions( self, mock_httpx_client, mock_auth_response, mock_positions_response ): """Test getting positions.""" @@ -33,7 +33,7 @@ async def test_get_positions( client.rate_limiter = RateLimiter(max_requests=100, window_seconds=60) await client.authenticate() - positions = await client.get_positions() + positions = await client.search_open_positions() assert len(positions) == 2 assert positions[0].contractId == "MGC" @@ -42,7 +42,7 @@ async def test_get_positions( assert positions[1].size == 2 # Short position has positive size @pytest.mark.asyncio - async def test_get_positions_empty( + async def test_search_open_positions_empty( self, mock_httpx_client, mock_auth_response, mock_response ): """Test getting positions with empty response.""" @@ -62,12 +62,12 @@ async def test_get_positions_empty( client.rate_limiter = RateLimiter(max_requests=100, window_seconds=60) await client.authenticate() - positions = await client.get_positions() + positions = await client.search_open_positions() assert len(positions) == 0 @pytest.mark.asyncio - async def test_get_positions_no_account(self, mock_httpx_client): + async def test_search_open_positions_no_account(self, mock_httpx_client): """Test error when getting positions without account.""" with patch("httpx.AsyncClient", return_value=mock_httpx_client): async with ProjectX("testuser", "test-api-key") as client: @@ -76,13 +76,13 @@ async def test_get_positions_no_account(self, mock_httpx_client): client.rate_limiter = RateLimiter(max_requests=100, window_seconds=60) # No authentication, no account info with pytest.raises(ProjectXError): - await client.get_positions() + await client.search_open_positions() @pytest.mark.asyncio - async def test_search_open_positions( + async def test_search_open_positions_basic( self, mock_httpx_client, mock_auth_response, mock_response ): - """Test searching open positions.""" + """Test searching open positions with basic functionality.""" auth_response, accounts_response = mock_auth_response positions_response = mock_response( json_data={ @@ -173,7 +173,7 @@ async def test_search_open_positions_with_account_id( assert last_call[1]["json"]["accountId"] == 67890 @pytest.mark.asyncio - async def test_search_open_positions_no_account(self, mock_httpx_client): + async def test_search_open_positions_no_account_error(self, mock_httpx_client): """Test error when searching positions without account.""" with patch("httpx.AsyncClient", return_value=mock_httpx_client): async with ProjectX("testuser", "test-api-key") as client: @@ -185,7 +185,7 @@ async def test_search_open_positions_no_account(self, mock_httpx_client): await client.search_open_positions() @pytest.mark.asyncio - async def test_search_open_positions_empty( + async def test_search_open_positions_empty_response( self, mock_httpx_client, mock_auth_response, mock_response ): """Test searching open positions with empty response.""" diff --git a/tests/realtime/test_circuit_breaker.py b/tests/realtime/test_circuit_breaker.py index 11e8d25..e7c5f24 100644 --- a/tests/realtime/test_circuit_breaker.py +++ b/tests/realtime/test_circuit_breaker.py @@ -668,8 +668,7 @@ async def test_cleanup(self): handler = MockEventHandler() # Configure with per-event circuits only await handler.configure_circuit_breaker( - enable_global_circuit=False, - enable_per_event_circuits=True + enable_global_circuit=False, enable_per_event_circuits=True ) # Create some circuit breakers @@ -946,8 +945,11 @@ async def test_overhead_measurement(self): # In CI environments, overhead can be higher due to resource constraints # Allow up to 500% overhead in CI, 100% locally import os + max_overhead = 5.0 if os.environ.get("CI") else 1.0 - assert overhead < max_overhead, f"Circuit breaker overhead too high: {overhead:.2%} (max: {max_overhead*100:.0f}%)" + assert overhead < max_overhead, ( + f"Circuit breaker overhead too high: {overhead:.2%} (max: {max_overhead * 100:.0f}%)" + ) print(f"Circuit breaker overhead: {overhead:.2%}") diff --git a/tests/test_client_trading.py b/tests/test_client_trading.py index a9815c6..b3716e4 100644 --- a/tests/test_client_trading.py +++ b/tests/test_client_trading.py @@ -32,40 +32,6 @@ def trading_client(self): """Create a mock client with TradingMixin for testing.""" return MockTradingClient() - @pytest.mark.asyncio - async def test_get_positions_deprecated(self, trading_client): - """Test that get_positions shows deprecation warning.""" - trading_client.account_info = Account( - id=12345, - name="Test Account", - balance=10000.0, - canTrade=True, - isVisible=True, - simulated=False, - ) - - mock_response = { - "success": True, - "positions": [ - { - "id": "pos1", - "accountId": 12345, - "contractId": "MNQ", - "creationTimestamp": datetime.datetime.now(pytz.UTC).isoformat(), - "size": 2, - "averagePrice": 15000.0, - "type": 1, - } - ], - } - trading_client._make_request.return_value = mock_response - - with pytest.warns(DeprecationWarning, match="(get_positions|Method renamed)"): - positions = await trading_client.get_positions() - - assert len(positions) == 1 - assert positions[0].contractId == "MNQ" - @pytest.mark.asyncio async def test_search_open_positions_success(self, trading_client): """Test successful position search.""" @@ -588,7 +554,6 @@ async def test_authentication_called_for_all_methods(self, trading_client): await trading_client.search_trades() assert trading_client._ensure_authenticated.call_count == 2 - # Test get_positions (deprecated) - with pytest.warns(DeprecationWarning, match="(get_positions|Method renamed)"): - await trading_client.get_positions() + # Test search_open_positions + await trading_client.search_open_positions() assert trading_client._ensure_authenticated.call_count == 3 diff --git a/tests/test_enhanced_statistics.py b/tests/test_enhanced_statistics.py index 9e23830..aa7a4f5 100644 --- a/tests/test_enhanced_statistics.py +++ b/tests/test_enhanced_statistics.py @@ -103,6 +103,7 @@ async def test_pii_sanitization(self): # Parse the JSON export import json + exported_data = json.loads(exported) # Check that PII is sanitized in error details @@ -188,9 +189,7 @@ async def test_data_quality_tracking(self): event_bus = EventBus() orderbook = OrderBook( - instrument="TEST", - event_bus=event_bus, - project_x=mock_client + instrument="TEST", event_bus=event_bus, project_x=mock_client ) # Track data quality issues using the actual keys that OrderBook initializes @@ -248,36 +247,31 @@ async def test_prometheus_export_format(self): comprehensive_stats = { "health": { "overall_score": 95.0, - "component_scores": { - "test_component": 95.0 - } + "component_scores": {"test_component": 95.0}, }, "performance": { "api_calls_total": 10, "cache_hit_rate": 0.8, - "avg_response_time": 0.1 + "avg_response_time": 0.1, }, "memory": { "total_memory_mb": stats.get("memory_usage_mb", 0.1), "component_memory": { "test_component": stats.get("memory_usage_mb", 0.1) - } + }, }, "errors": { "total_errors": stats.get("error_count", 0), "error_rate": 0.01, - "errors_by_component": { - "test_component": stats.get("error_count", 0) - } - } + "errors_by_component": {"test_component": stats.get("error_count", 0)}, + }, } exporter = StatsExporter() # Export in Prometheus format prom_export = await exporter.export( - comprehensive_stats, - export_format="prometheus" + comprehensive_stats, export_format="prometheus" ) # Check format @@ -434,7 +428,12 @@ async def test_empty_component_stats_structure(self): stats = await component.get_stats() assert stats["name"] == "test_component" - assert stats["status"] in ["initializing", "connecting", "disconnected", "connected"] + assert stats["status"] in [ + "initializing", + "connecting", + "disconnected", + "connected", + ] assert "uptime_seconds" in stats assert stats["error_count"] == 0 assert stats["memory_usage_mb"] >= 0.0 @@ -459,16 +458,14 @@ async def test_stats_under_load(): # Use record_timing for timing operations tasks.append( component.record_timing( - operation=f"trade_{i % 10}", - duration_ms=float(i % 100) + operation=f"trade_{i % 10}", duration_ms=float(i % 100) ) ) # Track errors for some operations (10% failure rate) if i % 10 == 0: tasks.append( component.track_error( - Exception(f"Trade error {i}"), - context=f"trade_{i % 10}" + Exception(f"Trade error {i}"), context=f"trade_{i % 10}" ) ) @@ -573,10 +570,7 @@ async def test_realtime_data_manager_stats_integration(): await data_manager.record_timing("process_tick", 0.3) # Track an error - await data_manager.track_error( - Exception("Test error"), - context="process_tick" - ) + await data_manager.track_error(Exception("Test error"), context="process_tick") # Get stats stats = await data_manager.get_stats() diff --git a/tests/test_mmap_integration.py b/tests/test_mmap_integration.py index f453b6f..ed48d32 100644 --- a/tests/test_mmap_integration.py +++ b/tests/test_mmap_integration.py @@ -222,7 +222,9 @@ async def test_memory_cleanup_with_overflow( # Verify data was sampled, not completely lost assert len(manager.data["1hr"]) > 0 - assert len(manager.data["1hr"]) == int(manager.max_bars_per_timeframe * 0.7) # Should be 70% of max + assert len(manager.data["1hr"]) == int( + manager.max_bars_per_timeframe * 0.7 + ) # Should be 70% of max @pytest.mark.asyncio async def test_restore_from_overflow(self, mock_project_x, mock_realtime_client): diff --git a/tests/test_order_tracker_deprecation.py b/tests/test_order_tracker_deprecation.py deleted file mode 100644 index f35b0b3..0000000 --- a/tests/test_order_tracker_deprecation.py +++ /dev/null @@ -1,98 +0,0 @@ -"""Test deprecation warnings for order_tracker module.""" - -import warnings -from unittest.mock import AsyncMock, MagicMock - -from project_x_py.order_tracker import OrderChainBuilder, OrderTracker - - -def test_order_tracker_deprecation_warning(): - """Test that OrderTracker raises deprecation warning.""" - suite = MagicMock() - suite.orders = MagicMock() - suite.events = MagicMock() - - with warnings.catch_warnings(record=True) as w: - warnings.simplefilter("always") - tracker = OrderTracker(suite) - - # Check that a deprecation warning was raised - assert len(w) == 1 - assert issubclass(w[0].category, DeprecationWarning) - assert "OrderTracker is deprecated" in str(w[0].message) - assert "TradingSuite.track_order()" in str(w[0].message) - - -def test_order_chain_builder_deprecation_warning(): - """Test that OrderChainBuilder raises deprecation warning.""" - suite = MagicMock() - suite.orders = MagicMock() - suite.data = AsyncMock() - suite.instrument_id = "TEST" - - with warnings.catch_warnings(record=True) as w: - warnings.simplefilter("always") - chain = OrderChainBuilder(suite) - - # Check that a deprecation warning was raised - assert len(w) == 1 - assert issubclass(w[0].category, DeprecationWarning) - assert "OrderChainBuilder is deprecated" in str(w[0].message) - assert "TradingSuite.order_chain()" in str(w[0].message) - - -def test_track_order_function_deprecation(): - """Test that track_order function raises deprecation warning.""" - from project_x_py.order_tracker import track_order - - suite = MagicMock() - suite.orders = MagicMock() - suite.events = MagicMock() - - with warnings.catch_warnings(record=True) as w: - warnings.simplefilter("always") - tracker = track_order(suite) - - # Check that at least two deprecation warnings were raised - # One from the function itself, one from OrderTracker class - # There may be additional warnings from the @deprecated decorator - assert len(w) >= 2 - # Check the function deprecation - # The warning format may vary - check for any of these - assert any( - "track_order" in str(warning.message) or - "Integrated into TradingSuite" in str(warning.message) - for warning in w - ) - assert any( - "OrderTracker is deprecated" in str(warning.message) for warning in w - ) - - -def test_trading_suite_methods_no_deprecation(): - """Test that TradingSuite methods don't raise deprecation warnings.""" - from project_x_py.trading_suite import TradingSuite - - # Create a mock suite with minimal required attributes - suite = MagicMock(spec=TradingSuite) - suite.orders = MagicMock() - suite.events = MagicMock() - suite.data = AsyncMock() - suite.instrument_id = "TEST" - - # Mock the methods to avoid actual implementation - suite.track_order = MagicMock(return_value=MagicMock()) - suite.order_chain = MagicMock(return_value=MagicMock()) - - with warnings.catch_warnings(record=True) as w: - warnings.simplefilter("always") - - # These should not raise deprecation warnings - tracker = suite.track_order() - chain = suite.order_chain() - - # No deprecation warnings should be raised - deprecation_warnings = [ - w for w in w if issubclass(w.category, DeprecationWarning) - ] - assert len(deprecation_warnings) == 0 diff --git a/tests/utils/test_async_rate_limiter.py b/tests/utils/test_async_rate_limiter.py index edcddba..53f5c1a 100644 --- a/tests/utils/test_async_rate_limiter.py +++ b/tests/utils/test_async_rate_limiter.py @@ -205,6 +205,7 @@ async def make_request(): # Check rate limiting - for any 0.5s window, we should have at most 10 requests # In CI environments, allow up to 30% more due to timing variations import os + max_allowed = 13 if os.environ.get("CI") else 10 for i in range(len(times)):