Skip to content

Commit aa05ca1

Browse files
TexasCodingclaude
andcommitted
feat(order-management): comprehensive order tracking and management improvements
## Key Improvements ### Real-time Order Tracking - Fix WebSocket payload parsing to handle ProjectX Gateway format {'action': X, 'data': {...}} - Add automatic connection and subscription to user updates in OrderManager.initialize() - Implement intelligent cache lookup with retry logic for real-time updates - Fix string/integer type conversion issue in cache lookups ### Custom Tag Management - Fix bracket order custom tag duplicate errors by generating unique timestamp-based tags - Only include customTag in API payload when provided (not null/empty) - Prevent "already in use" errors with automatic unique tag generation ### Enhanced Cleanup & Safety - Implement comprehensive cleanup that cancels ALL open orders (not just demo orders) - Add automatic position closing with market orders during cleanup - Improve error handling and logging throughout cleanup process - Provide detailed status reporting of cleanup operations ### Improved User Experience - Add intelligent fallback from real-time cache to API for order status - Enhance monitoring with detailed order and position status display - Improve status messages to show data source (cache vs API) - Add comprehensive order statistics and tracking information ### Performance Optimizations - Real-time WebSocket order tracking instead of polling APIs - Built-in retry logic for cache population timing - Efficient payload extraction and processing - Reduced API calls through intelligent caching ## Technical Details - All orders now properly tracked in real-time cache with status updates - WebSocket connection automatically established and maintained - Bracket orders work reliably without custom tag conflicts - Enhanced cleanup ensures no orphaned orders or positions - Comprehensive error handling with graceful fallbacks 🤖 Generated with [Claude Code](https://claude.ai/code) Co-Authored-By: Claude <[email protected]>
1 parent 9babaf2 commit aa05ca1

File tree

2 files changed

+162
-35
lines changed

2 files changed

+162
-35
lines changed

examples/02_order_management.py

Lines changed: 95 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -49,16 +49,19 @@ def show_order_status(order_manager, order_id: int, description: str):
4949
"""Show detailed order status information."""
5050
print(f"\n📋 {description} Status:")
5151

52-
# Check if order is tracked
53-
order_data = order_manager.get_tracked_order_status(order_id)
52+
# Check if order is tracked in real-time cache (with built-in wait)
53+
order_data = order_manager.get_tracked_order_status(
54+
str(order_id), wait_for_cache=True
55+
)
56+
5457
if order_data:
5558
status_map = {1: "Open", 2: "Filled", 3: "Cancelled", 4: "Partially Filled"}
5659
status = status_map.get(
5760
order_data.get("status", 0), f"Unknown ({order_data.get('status')})"
5861
)
5962

6063
print(f" Order ID: {order_id}")
61-
print(f" Status: {status}")
64+
print(f" Status: {status} (from real-time cache)")
6265
print(f" Side: {'BUY' if order_data.get('side') == 0 else 'SELL'}")
6366
print(f" Size: {order_data.get('size', 0)}")
6467
print(f" Fill Volume: {order_data.get('fillVolume', 0)}")
@@ -70,7 +73,18 @@ def show_order_status(order_manager, order_id: int, description: str):
7073
if order_data.get("filledPrice"):
7174
print(f" Filled Price: ${order_data['filledPrice']:.2f}")
7275
else:
73-
print(f" Order {order_id} not found in tracking cache")
76+
# Fall back to API check for status
77+
print(f" Order {order_id} not in real-time cache, checking API...")
78+
api_order = order_manager.get_order_by_id(order_id)
79+
if api_order:
80+
status_map = {1: "Open", 2: "Filled", 3: "Cancelled", 4: "Partially Filled"}
81+
status = status_map.get(api_order.status, f"Unknown ({api_order.status})")
82+
print(f" Status: {status} (from API)")
83+
print(f" Side: {'BUY' if api_order.side == 0 else 'SELL'}")
84+
print(f" Size: {api_order.size}")
85+
print(f" Fill Volume: {api_order.fillVolume}")
86+
else:
87+
print(f" Order {order_id} not found in API either")
7488

7589
# Check if filled
7690
is_filled = order_manager.is_order_filled(order_id)
@@ -327,6 +341,7 @@ def main():
327341
for i in range(6): # 30 seconds, check every 5 seconds
328342
print(f"\n⏰ Check {i + 1}/6...")
329343

344+
# Check for filled orders and positions
330345
filled_orders = []
331346
for order_id in demo_orders:
332347
if order_manager.is_order_filled(order_id):
@@ -341,14 +356,41 @@ def main():
341356
else:
342357
print("📋 No orders filled yet")
343358

359+
# Check current positions (to detect fills that weren't caught)
360+
current_positions = client.search_open_positions()
361+
if current_positions:
362+
print(f"📊 Open positions: {len(current_positions)}")
363+
for pos in current_positions:
364+
side = "LONG" if pos.type == 1 else "SHORT"
365+
print(
366+
f" {pos.contractId}: {side} {pos.size} @ ${pos.averagePrice:.2f}"
367+
)
368+
344369
# Show current open orders
345370
open_orders = order_manager.search_open_orders(
346371
contract_id=contract_id
347372
)
348373
print(f"📊 Open orders: {len(open_orders)}")
374+
if open_orders:
375+
for order in open_orders:
376+
side = "BUY" if order.side == 0 else "SELL"
377+
order_type = {1: "LIMIT", 2: "MARKET", 4: "STOP"}.get(
378+
order.type, f"TYPE_{order.type}"
379+
)
380+
status = {1: "OPEN", 2: "FILLED", 3: "CANCELLED"}.get(
381+
order.status, f"STATUS_{order.status}"
382+
)
383+
price = ""
384+
if hasattr(order, "limitPrice") and order.limitPrice:
385+
price = f" @ ${order.limitPrice:.2f}"
386+
elif hasattr(order, "stopPrice") and order.stopPrice:
387+
price = f" @ ${order.stopPrice:.2f}"
388+
print(
389+
f" Order #{order.id}: {side} {order.size} {order_type}{price} - {status}"
390+
)
349391

350392
if i < 5: # Don't sleep on last iteration
351-
time.sleep(5)
393+
time.sleep(20)
352394

353395
# Show final order statistics
354396
print("\n" + "=" * 50)
@@ -365,31 +407,61 @@ def main():
365407
print(f" Real-time Enabled: {stats['realtime_enabled']}")
366408

367409
finally:
368-
# Cleanup: Cancel remaining demo orders
369-
if demo_orders:
370-
print("\n" + "=" * 50)
371-
print("🧹 CLEANUP - CANCELLING ORDERS")
372-
print("=" * 50)
410+
# Enhanced cleanup: Cancel ALL orders and close ALL positions
411+
print("\n" + "=" * 50)
412+
print("🧹 ENHANCED CLEANUP - ORDERS & POSITIONS")
413+
print("=" * 50)
373414

374-
print("Cancelling all demo orders for safety...")
415+
try:
416+
# First, get ALL open orders (not just demo orders)
417+
all_orders = order_manager.search_open_orders()
418+
print(f"Found {len(all_orders)} total open orders")
419+
420+
# Cancel all orders
375421
cancelled_count = 0
422+
for order in all_orders:
423+
try:
424+
if order_manager.cancel_order(order.id):
425+
print(f"✅ Cancelled order #{order.id}")
426+
cancelled_count += 1
427+
else:
428+
print(f"❌ Failed to cancel order #{order.id}")
429+
except Exception as e:
430+
print(f"❌ Error cancelling order #{order.id}: {e}")
431+
432+
# Check for positions and close them
433+
positions = client.search_open_positions()
434+
print(f"Found {len(positions)} open positions")
376435

377-
for order_id in demo_orders:
436+
closed_count = 0
437+
for position in positions:
378438
try:
379-
# Check if order is still open before trying to cancel
380-
order_data = order_manager.get_tracked_order_status(order_id)
381-
if order_data and order_data.get("status") == 1: # Open
382-
if order_manager.cancel_order(order_id):
383-
print(f"✅ Cancelled order #{order_id}")
384-
cancelled_count += 1
385-
else:
386-
print(f"❌ Failed to cancel order #{order_id}")
439+
side_text = "LONG" if position.type == 1 else "SHORT"
440+
print(
441+
f"Closing {side_text} position: {position.contractId} ({position.size} contracts)"
442+
)
443+
444+
response = order_manager.close_position(
445+
position.contractId, method="market"
446+
)
447+
448+
if response and response.success:
449+
print(
450+
f"✅ Closed position {position.contractId} (Order #{response.orderId})"
451+
)
452+
closed_count += 1
387453
else:
388-
print(f"i Order #{order_id} already closed/filled")
454+
print(f"❌ Failed to close position {position.contractId}")
389455
except Exception as e:
390-
print(f"❌ Error cancelling order #{order_id}: {e}")
456+
print(f"❌ Error closing position {position.contractId}: {e}")
457+
458+
print("\n📊 Cleanup completed:")
459+
print(f" Orders cancelled: {cancelled_count}")
460+
print(f" Positions closed: {closed_count}")
391461

392-
print(f"\n📊 Cleanup completed: {cancelled_count} orders cancelled")
462+
except Exception as e:
463+
print(f"❌ Cleanup error: {e}")
464+
print("⚠️ Manual cleanup may be required")
393465

394466
# Final status check
395467
print("\n" + "=" * 50)

src/project_x_py/order_manager.py

Lines changed: 67 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030

3131
import json
3232
import logging
33+
import time
3334
from collections import defaultdict
3435
from datetime import datetime
3536
from decimal import ROUND_HALF_UP, Decimal
@@ -158,6 +159,21 @@ def initialize(
158159
if realtime_client:
159160
self.realtime_client = realtime_client
160161
self._setup_realtime_callbacks()
162+
163+
# Connect and subscribe to user updates for order tracking
164+
if not realtime_client.user_connected:
165+
if realtime_client.connect():
166+
self.logger.info("🔌 Real-time client connected")
167+
else:
168+
self.logger.warning("⚠️ Real-time client connection failed")
169+
return False
170+
171+
# Subscribe to user updates to receive order events
172+
if realtime_client.subscribe_user_updates():
173+
self.logger.info("📡 Subscribed to user order updates")
174+
else:
175+
self.logger.warning("⚠️ Failed to subscribe to user updates")
176+
161177
self._realtime_enabled = True
162178
self.logger.info(
163179
"✅ OrderManager initialized with real-time capabilities"
@@ -187,19 +203,32 @@ def _on_order_update(self, data: dict):
187203
"""Handle real-time order updates and detect fills/cancellations."""
188204
try:
189205
with self.order_lock:
190-
# According to ProjectX docs, the payload is the order data directly
191-
# Handle both single order and list of orders
206+
# Handle ProjectX Gateway payload format: {'action': X, 'data': order_data}
192207
if isinstance(data, list):
193-
for order_data in data:
194-
self._process_order_data(order_data)
208+
for item in data:
209+
self._extract_and_process_order_data(item)
195210
elif isinstance(data, dict):
196-
self._process_order_data(data)
211+
self._extract_and_process_order_data(data)
197212

198213
# Note: No duplicate callback triggering - realtime client handles this
199214

200215
except Exception as e:
201216
self.logger.error(f"Error processing order update: {e}")
202217

218+
def _extract_and_process_order_data(self, payload):
219+
"""Extract order data from ProjectX Gateway payload format."""
220+
try:
221+
# Handle ProjectX Gateway format: {'action': X, 'data': {...}}
222+
if isinstance(payload, dict) and "data" in payload:
223+
order_data = payload["data"]
224+
self._process_order_data(order_data)
225+
else:
226+
# Direct order data (fallback)
227+
self._process_order_data(payload)
228+
except Exception as e:
229+
self.logger.error(f"Error extracting order data from payload: {e}")
230+
self.logger.debug(f"Payload that caused error: {payload}")
231+
203232
def _validate_order_payload(self, order_data: dict) -> bool:
204233
"""
205234
Validate that order payload matches ProjectX GatewayUserOrder format.
@@ -372,7 +401,9 @@ def add_callback(self, event_type: str, callback):
372401
# REALTIME ORDER TRACKING METHODS (for optimization)
373402
# ================================================================================
374403

375-
def get_tracked_order_status(self, order_id: str) -> dict[str, Any] | None:
404+
def get_tracked_order_status(
405+
self, order_id: str, wait_for_cache: bool = False
406+
) -> dict[str, Any] | None:
376407
"""
377408
Get cached order status from real-time tracking for faster access.
378409
@@ -381,6 +412,7 @@ def get_tracked_order_status(self, order_id: str) -> dict[str, Any] | None:
381412
382413
Args:
383414
order_id: Order ID to get status for (as string)
415+
wait_for_cache: If True, briefly wait for real-time cache to populate
384416
385417
Returns:
386418
dict: Complete order data if tracked in cache, None if not found
@@ -398,6 +430,17 @@ def get_tracked_order_status(self, order_id: str) -> dict[str, Any] | None:
398430
>>> else:
399431
... print("Order not found in cache")
400432
"""
433+
if wait_for_cache and self._realtime_enabled:
434+
# Brief wait for real-time cache to populate
435+
for attempt in range(3):
436+
with self.order_lock:
437+
order_data = self.tracked_orders.get(order_id)
438+
if order_data:
439+
return order_data
440+
441+
if attempt < 2: # Don't sleep on last attempt
442+
time.sleep(0.3) # Brief wait for real-time update
443+
401444
with self.order_lock:
402445
return self.tracked_orders.get(order_id)
403446

@@ -424,12 +467,16 @@ def is_order_filled(self, order_id: str | int) -> bool:
424467
"""
425468
order_id_str = str(order_id)
426469

427-
# Try cached data first (realtime optimization)
470+
# Try cached data first with brief retry for real-time updates
428471
if self._realtime_enabled:
429-
with self.order_lock:
430-
status = self.order_status_cache.get(order_id_str)
431-
if status is not None:
432-
return status == 2 # 2 = Filled
472+
for attempt in range(3): # Try 3 times with small delays
473+
with self.order_lock:
474+
status = self.order_status_cache.get(order_id_str)
475+
if status is not None:
476+
return status == 2 # 2 = Filled
477+
478+
if attempt < 2: # Don't sleep on last attempt
479+
time.sleep(0.2) # Brief wait for real-time update
433480

434481
# Fallback to API check
435482
order = self.get_order_by_id(int(order_id))
@@ -641,10 +688,13 @@ def place_order(
641688
"limitPrice": aligned_limit_price,
642689
"stopPrice": aligned_stop_price,
643690
"trailPrice": aligned_trail_price,
644-
"customTag": custom_tag,
645691
"linkedOrderId": linked_order_id,
646692
}
647693

694+
# Only include customTag if it's provided and not None/empty
695+
if custom_tag:
696+
payload["customTag"] = custom_tag
697+
648698
# 🔍 DEBUG: Log order parameters to diagnose placement issues
649699
self.logger.debug(f"🔍 Order Placement Request: {payload}")
650700

@@ -822,6 +872,11 @@ def place_bracket_order(
822872
entry_price, stop_loss_price, take_profit_price, contract_id, side
823873
)
824874

875+
# Generate unique custom tag if none provided to avoid "already in use" errors
876+
if custom_tag is None:
877+
timestamp = int(time.time() * 1000) # milliseconds for uniqueness
878+
custom_tag = f"bracket_{timestamp}"
879+
825880
entry_response = self._place_entry_order(
826881
contract_id,
827882
side,

0 commit comments

Comments
 (0)