From 0c70ea00987469d74894e08830e8577b4dc9eaee Mon Sep 17 00:00:00 2001 From: Jeff West Date: Fri, 25 Jul 2025 09:39:46 -0500 Subject: [PATCH] added position and order sync --- examples/order_position_sync_demo.py | 295 +++++++++++++++++++++++++ src/project_x_py/order_manager.py | 311 ++++++++++++++++++++++++++- src/project_x_py/position_manager.py | 55 ++++- 3 files changed, 654 insertions(+), 7 deletions(-) create mode 100644 examples/order_position_sync_demo.py diff --git a/examples/order_position_sync_demo.py b/examples/order_position_sync_demo.py new file mode 100644 index 0000000..40f9d9c --- /dev/null +++ b/examples/order_position_sync_demo.py @@ -0,0 +1,295 @@ +#!/usr/bin/env python3 +""" +Order-Position Synchronization Demo + +This example demonstrates the new automatic synchronization between orders and positions: +1. When positions change size, related stop/target orders are automatically updated +2. When positions are closed, related orders are automatically cancelled +3. Track which orders belong to which positions + +Author: TexasCoding +Date: June 2025 +""" + +import asyncio +import logging +from typing import Any + +from project_x_py import ProjectX +from project_x_py.order_manager import OrderManager +from project_x_py.position_manager import PositionManager +from project_x_py.realtime import ProjectXRealtimeClient + +# Configure logging +logging.basicConfig( + level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s" +) +logger = logging.getLogger(__name__) + + +async def order_position_sync_demo(): + """Demonstrate order-position synchronization features.""" + + # Initialize the clients (replace with your actual credentials) + client = ProjectX( + username="your_username", + api_key="your_api_key", + account_name="your_account_name", + ) + + account_info = client.get_account_info() + account_id = account_info.id if account_info else None + jwt_token = client.get_session_token() + + realtime_client: ProjectXRealtimeClient | None = ProjectXRealtimeClient( + jwt_token=jwt_token, + account_id=str(account_id), + ) + + order_manager = OrderManager(client) + position_manager = PositionManager(client) + + logger.info("๐Ÿš€ Starting order-position synchronization demo") + + instrument = client.get_instrument("MGC") + + try: + # Initialize all components with cross-references + # Note: Check actual method signatures in your implementation + order_manager.initialize(realtime_client=realtime_client) + position_manager.initialize( + realtime_client=realtime_client, + order_manager=order_manager, # Enable automatic synchronization + ) + + logger.info("โœ… All components initialized") + + contract_id = instrument.id if instrument else None + if not contract_id: + raise ValueError("Instrument not found") + + # 1. Place a bracket order (entry + stop + target) + logger.info(f"\n๐Ÿ“ˆ Step 1: Placing bracket order for {contract_id}") + bracket_response = order_manager.place_bracket_order( + contract_id=contract_id, + side=0, # Buy + size=2, # 2 contracts + entry_price=2045.0, + stop_loss_price=2040.0, + take_profit_price=2055.0, + ) + + if bracket_response.success: + logger.info("โœ… Bracket order placed successfully:") + logger.info(f" Entry Order: {bracket_response.entry_order_id}") + logger.info(f" Stop Order: {bracket_response.stop_order_id}") + logger.info(f" Target Order: {bracket_response.target_order_id}") + + # Show order tracking + position_orders = order_manager.get_position_orders(contract_id) + logger.info(f"๐Ÿ“Š Orders tracked for {contract_id}: {position_orders}") + else: + logger.error(f"โŒ Bracket order failed: {bracket_response.error_message}") + return + + # Wait for potential position creation (if entry order fills) + logger.info("\nโณ Waiting for potential order fills...") + await asyncio.sleep(10) + + # 2. Check current position + current_position = position_manager.get_position(contract_id) + if current_position: + logger.info(f"๐Ÿ“Š Current position: {current_position.size} contracts") + + # 3. Simulate adding to position (manual market order) + logger.info("\n๐Ÿ“ˆ Step 2: Adding to position (+1 contract)") + add_response = order_manager.place_market_order( + contract_id=contract_id, + side=0, # Buy (same direction) + size=1, + ) + + if add_response.success: + logger.info(f"โœ… Added to position with order {add_response.orderId}") + + # Wait for position update + await asyncio.sleep(5) + + # Check if orders were automatically updated + updated_position = position_manager.get_position(contract_id) + if updated_position: + logger.info( + f"๐Ÿ“Š Updated position: {updated_position.size} contracts" + ) + logger.info( + "๐Ÿ”„ Related stop/target orders should be automatically updated!" + ) + + # 4. Simulate partial position close + logger.info("\n๐Ÿ“‰ Step 3: Partially closing position (-1 contract)") + close_response = order_manager.place_market_order( + contract_id=contract_id, + side=1, # Sell (opposite direction) + size=1, + ) + + if close_response.success: + logger.info( + f"โœ… Partially closed position with order {close_response.orderId}" + ) + + # Wait for position update + await asyncio.sleep(5) + + # Check updated position and orders + updated_position = position_manager.get_position(contract_id) + if updated_position: + logger.info( + f"๐Ÿ“Š Position after partial close: {updated_position.size} contracts" + ) + logger.info( + "๐Ÿ”„ Related orders should be updated to match new position size!" + ) + + # 5. Close entire position + logger.info("\n๐Ÿ”š Step 4: Closing entire position") + final_close = position_manager.close_position_direct(contract_id) + + if final_close.get("success", False): + logger.info("โœ… Position closed completely") + + # Wait for position closure processing + await asyncio.sleep(5) + + # Check that related orders were cancelled + remaining_position = position_manager.get_position(contract_id) + if not remaining_position: + logger.info( + "โœ… Position closed - related orders should be automatically cancelled!" + ) + position_orders = order_manager.get_position_orders(contract_id) + logger.info(f"๐Ÿ“Š Remaining tracked orders: {position_orders}") + + else: + logger.info( + "๐Ÿ“Š No position currently open (entry order may not have filled)" + ) + + # Manual synchronization example + logger.info("\n๐Ÿ”„ Step 2: Manual synchronization example") + sync_result = order_manager.sync_orders_with_position(contract_id) + logger.info(f"๐Ÿ“Š Sync result: {sync_result}") + + # 6. Show final statistics + logger.info("\n๐Ÿ“Š Final Statistics:") + order_stats = order_manager.get_order_statistics() + position_stats = position_manager.get_position_statistics() + + logger.info("Order Manager - Position Order Tracking:") + logger.info( + f" Total tracked orders: {order_stats['position_order_relationships']['total_tracked_orders']}" + ) + logger.info( + f" Positions with orders: {order_stats['position_order_relationships']['positions_with_orders']}" + ) + + logger.info("Position Manager - Order Sync Status:") + logger.info(f" Order sync enabled: {position_stats['order_sync_enabled']}") + logger.info(f" Realtime enabled: {position_stats['realtime_enabled']}") + + # 7. Cleanup any remaining orders + logger.info(f"\n๐Ÿงน Cleaning up any remaining orders for {contract_id}") + cleanup_result = order_manager.cancel_position_orders(contract_id) + logger.info(f"๐Ÿ“Š Cleanup result: {cleanup_result}") + + except Exception as e: + logger.error(f"โŒ Demo error: {e}") + + finally: + # Cleanup + logger.info("\n๐Ÿงน Cleaning up connections...") + order_manager.cleanup() + position_manager.cleanup() + if realtime_client: + realtime_client.disconnect() + logger.info("โœ… Demo completed!") + + +def callback_demo(): + """Demonstrate callback-based order-position synchronization.""" + + logger.info("\n๐Ÿ”” Callback Demo - Setting up position change handlers") + + def on_position_closed(data: Any): + """Handle position closure events.""" + logger.info(f"๐Ÿ”” Position closed callback triggered: {data}") + + def on_position_alert(data: Any): + """Handle position alert events.""" + logger.info(f"๐Ÿšจ Position alert callback triggered: {data}") + + def on_order_filled(data: Any): + """Handle order fill events.""" + logger.info(f"โœ… Order filled callback triggered: {data}") + + # These callbacks would be registered in a real application: + # position_manager.add_callback("position_closed", on_position_closed) + # position_manager.add_callback("position_alert", on_position_alert) + # order_manager.add_callback("order_filled", on_order_filled) + + logger.info("๐Ÿ“ Callback setup complete (example only)") + + +def manual_sync_demo(): + """Demonstrate manual order-position synchronization methods.""" + + logger.info("\n๐Ÿ”ง Manual Synchronization Demo") + + # These are the key synchronization methods available: + + # 1. Track individual orders for positions + # order_manager.track_order_for_position(order_id=12345, contract_id="MGC", order_category="stop") + + # 2. Get all orders related to a position + # position_orders = order_manager.get_position_orders("MGC") + + # 3. Cancel all orders for a position + # result = order_manager.cancel_position_orders("MGC", categories=["stop", "target"]) + + # 4. Update order sizes to match position + # result = order_manager.update_position_order_sizes("MGC", new_position_size=3) + + # 5. Full synchronization + # result = order_manager.sync_orders_with_position("MGC") + + # 6. Position-triggered callbacks + # order_manager.on_position_changed("MGC", old_size=2, new_size=3) + # order_manager.on_position_closed("MGC") + + logger.info("๐Ÿ“ Manual synchronization methods available:") + logger.info(" - track_order_for_position()") + logger.info(" - get_position_orders()") + logger.info(" - cancel_position_orders()") + logger.info(" - update_position_order_sizes()") + logger.info(" - sync_orders_with_position()") + logger.info(" - on_position_changed() / on_position_closed()") + + +if __name__ == "__main__": + logger.info("๐Ÿ”„ Order-Position Synchronization Demo") + logger.info("=" * 50) + + # Show manual methods + manual_sync_demo() + + # Show callback setup + callback_demo() + + # Run the main demo + logger.info("\n๐Ÿš€ Starting live demo...") + try: + asyncio.run(order_position_sync_demo()) + except KeyboardInterrupt: + logger.info("\nโน๏ธ Demo interrupted by user") + except Exception as e: + logger.error(f"โŒ Demo failed: {e}") diff --git a/src/project_x_py/order_manager.py b/src/project_x_py/order_manager.py index 742f2ab..0c61d1a 100644 --- a/src/project_x_py/order_manager.py +++ b/src/project_x_py/order_manager.py @@ -120,6 +120,12 @@ def __init__(self, project_x_client: "ProjectX"): # Order callbacks (tracking is centralized in realtime client) self.order_callbacks: dict[str, list] = defaultdict(list) + # Order-Position relationship tracking for synchronization + self.position_orders: dict[str, dict[str, list[int]]] = defaultdict( + lambda: {"stop_orders": [], "target_orders": [], "entry_orders": []} + ) + self.order_to_position: dict[int, str] = {} # order_id -> contract_id + # Statistics self.stats = { "orders_placed": 0, @@ -690,6 +696,26 @@ def place_bracket_order( ) if bracket_success: + # Track order-position relationships for synchronization + with self.order_lock: + if entry_response.success: + self.position_orders[contract_id]["entry_orders"].append( + entry_response.orderId + ) + self.order_to_position[entry_response.orderId] = contract_id + + if stop_response.success: + self.position_orders[contract_id]["stop_orders"].append( + stop_response.orderId + ) + self.order_to_position[stop_response.orderId] = contract_id + + if target_response.success: + self.position_orders[contract_id]["target_orders"].append( + target_response.orderId + ) + self.order_to_position[target_response.orderId] = contract_id + self.logger.info( f"โœ… Bracket order placed successfully: Entry={entry_response.orderId}, Stop={stop_response.orderId}, Target={target_response.orderId}" ) @@ -1162,7 +1188,15 @@ def add_stop_loss( side = 1 if position.size > 0 else 0 # Sell long, Buy short size = abs(position.size) - return self.place_stop_order(contract_id, side, size, stop_price, account_id) + response = self.place_stop_order( + contract_id, side, size, stop_price, account_id + ) + + # Track the stop loss order for position synchronization + if response and response.success: + self.track_order_for_position(response.orderId, contract_id, "stop") + + return response def add_take_profit( self, contract_id: str, target_price: float, account_id: int | None = None @@ -1197,7 +1231,255 @@ def add_take_profit( side = 1 if position.size > 0 else 0 # Sell long, Buy short size = abs(position.size) - return self.place_limit_order(contract_id, side, size, target_price, account_id) + response = self.place_limit_order( + contract_id, side, size, target_price, account_id + ) + + # Track the take profit order for position synchronization + if response and response.success: + self.track_order_for_position(response.orderId, contract_id, "target") + + return response + + # ================================================================================ + # ORDER-POSITION SYNCHRONIZATION METHODS + # ================================================================================ + + def track_order_for_position( + self, order_id: int, contract_id: str, order_category: str + ): + """ + Track an order as being related to a position. + + Args: + order_id: Order ID to track + contract_id: Contract ID the order relates to + order_category: Category: 'entry', 'stop', or 'target' + """ + with self.order_lock: + if order_category in ["entry", "stop", "target"]: + category_key = f"{order_category}_orders" + self.position_orders[contract_id][category_key].append(order_id) + self.order_to_position[order_id] = contract_id + self.logger.debug( + f"๐Ÿ“Š Tracking {order_category} order {order_id} for position {contract_id}" + ) + + def untrack_order(self, order_id: int): + """ + Remove order from position tracking. + + Args: + order_id: Order ID to untrack + """ + with self.order_lock: + contract_id = self.order_to_position.pop(order_id, None) + if contract_id: + # Remove from all categories + for category in ["entry_orders", "stop_orders", "target_orders"]: + if order_id in self.position_orders[contract_id][category]: + self.position_orders[contract_id][category].remove(order_id) + self.logger.debug( + f"๐Ÿ“Š Untracked order {order_id} from position {contract_id}" + ) + + def get_position_orders(self, contract_id: str) -> dict[str, list[int]]: + """ + Get all orders related to a position. + + Args: + contract_id: Contract ID to get orders for + + Returns: + Dict with lists of order IDs by category + """ + with self.order_lock: + return { + "entry_orders": self.position_orders[contract_id][ + "entry_orders" + ].copy(), + "stop_orders": self.position_orders[contract_id]["stop_orders"].copy(), + "target_orders": self.position_orders[contract_id][ + "target_orders" + ].copy(), + } + + def cancel_position_orders( + self, + contract_id: str, + categories: list[str] | None = None, + account_id: int | None = None, + ) -> dict[str, Any]: + """ + Cancel all orders related to a position. + + Args: + contract_id: Contract ID to cancel orders for + categories: Order categories to cancel ('stop', 'target', 'entry'). All if None. + account_id: Account ID. Uses default account if None. + + Returns: + Dict with cancellation results + """ + if categories is None: + categories = ["stop", "target", "entry"] + + results = { + "total_cancelled": 0, + "failed_cancellations": 0, + "errors": [], + } + + with self.order_lock: + orders_to_cancel = [] + for category in categories: + category_key = f"{category}_orders" + if category_key in self.position_orders[contract_id]: + orders_to_cancel.extend( + self.position_orders[contract_id][category_key] + ) + + for order_id in orders_to_cancel: + try: + if self.cancel_order(order_id, account_id): + results["total_cancelled"] += 1 + self.untrack_order(order_id) + else: + results["failed_cancellations"] += 1 + except Exception as e: + results["failed_cancellations"] += 1 + results["errors"].append(f"Order {order_id}: {e!s}") + + self.logger.info( + f"โœ… Cancelled {results['total_cancelled']} position orders for {contract_id}" + ) + return results + + def update_position_order_sizes( + self, contract_id: str, new_position_size: int, account_id: int | None = None + ) -> dict[str, Any]: + """ + Update stop and target order sizes to match new position size. + + Args: + contract_id: Contract ID of the position + new_position_size: New position size (signed: positive=long, negative=short) + account_id: Account ID. Uses default account if None. + + Returns: + Dict with update results + """ + if new_position_size == 0: + # Position is closed, cancel all related orders + return self.cancel_position_orders(contract_id, account_id=account_id) + + results = { + "orders_updated": 0, + "orders_failed": 0, + "errors": [], + } + + order_size = abs(new_position_size) + position_orders = self.get_position_orders(contract_id) + + # Update stop orders + for order_id in position_orders["stop_orders"]: + try: + if self.modify_order(order_id, size=order_size, account_id=account_id): + results["orders_updated"] += 1 + else: + results["orders_failed"] += 1 + except Exception as e: + results["orders_failed"] += 1 + results["errors"].append(f"Stop order {order_id}: {e!s}") + + # Update target orders + for order_id in position_orders["target_orders"]: + try: + if self.modify_order(order_id, size=order_size, account_id=account_id): + results["orders_updated"] += 1 + else: + results["orders_failed"] += 1 + except Exception as e: + results["orders_failed"] += 1 + results["errors"].append(f"Target order {order_id}: {e!s}") + + self.logger.info( + f"๐Ÿ“Š Updated {results['orders_updated']} orders for position {contract_id} (size: {new_position_size})" + ) + return results + + def sync_orders_with_position( + self, contract_id: str, account_id: int | None = None + ) -> dict[str, Any]: + """ + Synchronize all related orders with current position state. + + Args: + contract_id: Contract ID to synchronize + account_id: Account ID. Uses default account if None. + + Returns: + Dict with synchronization results + """ + # Get current position + positions = self.project_x.search_open_positions(account_id=account_id) + current_position = None + for pos in positions: + if pos.contractId == contract_id: + current_position = pos + break + + if not current_position: + # Position is closed, cancel all related orders + self.logger.info( + f"๐Ÿ“Š Position {contract_id} closed, cancelling related orders" + ) + return self.cancel_position_orders(contract_id, account_id=account_id) + else: + # Position exists, update order sizes + self.logger.info( + f"๐Ÿ“Š Synchronizing orders for position {contract_id} (size: {current_position.size})" + ) + return self.update_position_order_sizes( + contract_id, current_position.size, account_id + ) + + def on_position_changed( + self, + contract_id: str, + old_size: int, + new_size: int, + account_id: int | None = None, + ): + """ + Callback for when a position size changes. + + Args: + contract_id: Contract ID of changed position + old_size: Previous position size + new_size: New position size + account_id: Account ID. Uses default account if None. + """ + self.logger.info(f"๐Ÿ“Š Position {contract_id} changed: {old_size} -> {new_size}") + + if new_size == 0: + # Position closed + self.cancel_position_orders(contract_id, account_id=account_id) + elif abs(new_size) != abs(old_size): + # Position size changed + self.update_position_order_sizes(contract_id, new_size, account_id) + + def on_position_closed(self, contract_id: str, account_id: int | None = None): + """ + Callback for when a position is fully closed. + + Args: + contract_id: Contract ID of closed position + account_id: Account ID. Uses default account if None. + """ + self.logger.info(f"๐Ÿ“Š Position {contract_id} closed, cancelling related orders") + self.cancel_position_orders(contract_id, account_id=account_id) # ================================================================================ # UTILITY METHODS @@ -1294,10 +1576,33 @@ def get_order_statistics(self) -> dict[str, Any]: if self.realtime_client: tracked_orders_count = len(self.realtime_client.tracked_orders) + # Count position-order relationships + total_position_orders = 0 + position_summary = {} + for contract_id, orders in self.position_orders.items(): + entry_count = len(orders["entry_orders"]) + stop_count = len(orders["stop_orders"]) + target_count = len(orders["target_orders"]) + total_count = entry_count + stop_count + target_count + total_position_orders += total_count + + if total_count > 0: + position_summary[contract_id] = { + "entry_orders": entry_count, + "stop_orders": stop_count, + "target_orders": target_count, + "total": total_count, + } + return { "statistics": self.stats.copy(), "realtime_enabled": self._realtime_enabled, "tracked_orders": tracked_orders_count, + "position_order_relationships": { + "total_tracked_orders": total_position_orders, + "positions_with_orders": len(position_summary), + "position_summary": position_summary, + }, "callbacks_registered": { event: len(callbacks) for event, callbacks in self.order_callbacks.items() @@ -1311,5 +1616,7 @@ def cleanup(self): """Clean up resources and connections.""" with self.order_lock: self.order_callbacks.clear() + self.position_orders.clear() + self.order_to_position.clear() self.logger.info("โœ… OrderManager cleanup completed") diff --git a/src/project_x_py/position_manager.py b/src/project_x_py/position_manager.py index cbc61c4..5a5605b 100644 --- a/src/project_x_py/position_manager.py +++ b/src/project_x_py/position_manager.py @@ -46,6 +46,7 @@ if TYPE_CHECKING: from .client import ProjectX + from .order_manager import OrderManager from .realtime import ProjectXRealtimeClient @@ -103,6 +104,10 @@ def __init__(self, project_x_client: "ProjectX"): self.realtime_client: ProjectXRealtimeClient | None = None self._realtime_enabled = False + # Order management integration (optional) + self.order_manager: OrderManager | None = None + self._order_sync_enabled = False + # Position tracking (maintains local state for business logic) self.tracked_positions: dict[str, Position] = {} self.position_history: dict[str, list[dict]] = defaultdict(list) @@ -136,13 +141,16 @@ def __init__(self, project_x_client: "ProjectX"): self.logger.info("PositionManager initialized") def initialize( - self, realtime_client: Optional["ProjectXRealtimeClient"] = None + self, + realtime_client: Optional["ProjectXRealtimeClient"] = None, + order_manager: Optional["OrderManager"] = None, ) -> bool: """ - Initialize the PositionManager with optional real-time capabilities. + Initialize the PositionManager with optional real-time capabilities and order synchronization. Args: realtime_client: Optional ProjectXRealtimeClient for live position tracking + order_manager: Optional OrderManager for automatic order synchronization Returns: bool: True if initialization successful @@ -159,6 +167,14 @@ def initialize( else: self.logger.info("โœ… PositionManager initialized (polling mode)") + # Set up order management integration if provided + if order_manager: + self.order_manager = order_manager + self._order_sync_enabled = True + self.logger.info( + "โœ… PositionManager initialized with order synchronization" + ) + # Load initial positions self.refresh_positions() @@ -215,6 +231,7 @@ def _process_position_data(self, position_data: dict): # Get the old position before updating old_position = self.tracked_positions.get(contract_id) + old_size = old_position.size if old_position else 0 if is_position_closed: # Position is closed - remove from tracking and trigger closure callbacks @@ -223,6 +240,10 @@ def _process_position_data(self, position_data: dict): self.logger.info(f"๐Ÿ“Š Position closed: {contract_id}") self.stats["positions_closed"] += 1 + # Synchronize orders - cancel related orders when position is closed + if self._order_sync_enabled and self.order_manager: + self.order_manager.on_position_closed(contract_id) + # Trigger position_closed callbacks with the closure data self._trigger_callbacks("position_closed", {"data": position_data}) else: @@ -230,14 +251,22 @@ def _process_position_data(self, position_data: dict): position = Position(**position_data) self.tracked_positions[contract_id] = position + # Synchronize orders - update order sizes if position size changed + if ( + self._order_sync_enabled + and self.order_manager + and old_size != position_size + ): + self.order_manager.on_position_changed( + contract_id, old_size, position_size + ) + # Track position history self.position_history[contract_id].append( { "timestamp": datetime.now(), "position": position_data.copy(), - "size_change": 0 - if not old_position - else position.size - old_position.size, + "size_change": position_size - old_size, } ) @@ -876,6 +905,10 @@ def close_position_direct( for contract_id in positions_to_remove: del self.tracked_positions[contract_id] + # Synchronize orders - cancel related orders when position is closed + if self._order_sync_enabled and self.order_manager: + self.order_manager.on_position_closed(contract_id) + self.stats["positions_closed"] += 1 else: error_msg = data.get("errorMessage", "Unknown error") @@ -948,6 +981,13 @@ def partially_close_position( ) # Trigger position refresh to get updated sizes self.refresh_positions(account_id=account_id) + + # Synchronize orders - update order sizes after partial close + if self._order_sync_enabled and self.order_manager: + self.order_manager.sync_orders_with_position( + contract_id, account_id + ) + self.stats["positions_partially_closed"] += 1 else: error_msg = data.get("errorMessage", "Unknown error") @@ -1074,6 +1114,7 @@ def get_position_statistics(self) -> dict[str, Any]: return { "statistics": self.stats.copy(), "realtime_enabled": self._realtime_enabled, + "order_sync_enabled": self._order_sync_enabled, "monitoring_active": self._monitoring_active, "tracked_positions": len(self.tracked_positions), "active_alerts": len( @@ -1147,4 +1188,8 @@ def cleanup(self): self.position_callbacks.clear() self.position_alerts.clear() + # Clear order manager integration + self.order_manager = None + self._order_sync_enabled = False + self.logger.info("โœ… PositionManager cleanup completed")