Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
213 changes: 188 additions & 25 deletions backend_api_python/app/services/pending_order_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ def __init__(self, poll_interval_sec: float = 1.0, batch_size: int = 50):
self._position_sync_enabled = os.getenv("POSITION_SYNC_ENABLED", "true").lower() == "true"
self._position_sync_interval_sec = float(os.getenv("POSITION_SYNC_INTERVAL_SEC", "10"))
self._last_position_sync_ts = 0.0
logger.info(f"PendingOrderWorker: sync_enabled={self._position_sync_enabled}, interval={self._position_sync_interval_sec}s")

def start(self) -> bool:
with self._lock:
Expand Down Expand Up @@ -95,7 +96,9 @@ def _run_loop(self) -> None:
time.sleep(self.poll_interval_sec)

def _tick(self) -> None:
# logger.info(f"[PendingOrderWorker] _tick start. last_sync={self._last_position_sync_ts}")
orders = self._fetch_pending_orders(limit=self.batch_size)
# logger.info(f"[PendingOrderWorker] orders fetched: {len(orders)}")
if not orders:
self._maybe_sync_positions()
return
Expand Down Expand Up @@ -124,29 +127,38 @@ def _maybe_sync_positions(self) -> None:
return
if now - float(self._last_position_sync_ts or 0.0) < float(self._position_sync_interval_sec):
return
logger.debug(f"[PendingOrderWorker] Triggering sync... (now={now}, last={self._last_position_sync_ts})")
self._last_position_sync_ts = now
try:
self._sync_positions_best_effort()
except Exception as e:
logger.info(f"position sync skipped/failed: {e}")
logger.debug(f"position sync skipped/failed: {e}")

def _sync_positions_best_effort(self) -> None:
def _sync_positions_best_effort(self, target_strategy_id: Optional[int] = None) -> None:
"""
Best-effort reconciliation:
- If exchange position is flat, delete local row from qd_strategy_positions.
- If exchange position size differs, update local size (optional best-effort).

This prevents "ghost positions" when positions are closed externally on the exchange.
"""
# 1) Load local positions
# 1) Load local positions (filtered if target_strategy_id provided)
logger.debug(f"[PositionSync] Entering _sync_positions_best_effort for target={target_strategy_id}")
with get_db_connection() as db:
cur = db.cursor()
cur.execute("SELECT id, strategy_id, symbol, side, size, entry_price FROM qd_strategy_positions ORDER BY updated_at DESC")
if target_strategy_id:
cur.execute(
"SELECT id, strategy_id, symbol, side, size, entry_price FROM qd_strategy_positions WHERE strategy_id = %s ORDER BY updated_at DESC",
(int(target_strategy_id),)
)
else:
cur.execute("SELECT id, strategy_id, symbol, side, size, entry_price FROM qd_strategy_positions ORDER BY updated_at DESC")
rows = cur.fetchall() or []
cur.close()

if not rows:
return
# [Defect Fix] Removed early return to allow syncing active strategies even if local DB is empty.
# if not rows and not target_strategy_id:
# return

# Group by strategy_id for efficient exchange queries.
sid_to_rows: Dict[int, List[Dict[str, Any]]] = {}
Expand All @@ -155,12 +167,45 @@ def _sync_positions_best_effort(self) -> None:
if sid <= 0:
continue
sid_to_rows.setdefault(sid, []).append(r)

# If targeted sync but no local rows found, we assume user might have opened position externally
# but DB is empty. However, without knowing *which* symbol to check, we can't easily auto-discover
# unless we fetch ALL positions from exchange for that strategy.
# But `load_strategy_configs(sid)` gives us the exchange keys.
# So if target_strategy_id is set but `sid_to_rows` is empty, we SHOULD explicitly add it to `sid_to_rows`
# so logic below enters and calls `client.get_positions()`.
if target_strategy_id and target_strategy_id not in sid_to_rows:
sid_to_rows[target_strategy_id] = []

# [Log Fix] Load all ACTIVE LIVE strategies to ensure we sync/log them even if local DB is empty.
# Otherwise, if we have no local positions, we would silently skip the exchange check.
try:
with get_db_connection() as db:
cur = db.cursor()
# Fetch all strategies configured for LIVE execution
cur.execute("SELECT id FROM qd_strategies_trading WHERE status = 'running' AND execution_mode = 'live'")
active_rows = cur.fetchall() or []
cur.close()

logger.debug(f"[PositionSync] Found {len(active_rows)} active live strategies in DB.")
for _ar in active_rows:
_sid = int(_ar.get("id") or 0)
if _sid > 0 and _sid not in sid_to_rows:
if target_strategy_id and target_strategy_id != _sid:
continue
sid_to_rows[_sid] = []
except Exception as e:
logger.error(f"Failed to load active strategies for sync: {e}", exc_info=True)

# 2) Reconcile per strategy
for sid, plist in sid_to_rows.items():
if target_strategy_id and sid != target_strategy_id:
continue
try:
sc = load_strategy_configs(int(sid))
if (sc.get("execution_mode") or "").strip().lower() != "live":
exec_mode = (sc.get("execution_mode") or "").strip().lower()
if exec_mode != "live":
logger.debug(f"[PositionSync] Strategy {sid} skipped: execution_mode='{exec_mode}' (needs 'live')")
continue
exchange_config = resolve_exchange_config(sc.get("exchange_config") or {})
safe_cfg = safe_exchange_config_for_log(exchange_config)
Expand All @@ -169,20 +214,36 @@ def _sync_positions_best_effort(self) -> None:
if market_type in ("futures", "future", "perp", "perpetual"):
market_type = "swap"

client = create_client(exchange_config, market_type=market_type)
# Lazy import MT5 here to allow elif chain later
global MT5Client
if MT5Client is None:
try:
from app.services.mt5_trading import MT5Client as _MT5Client
MT5Client = _MT5Client
except ImportError:
pass

client = create_client(exchange_config, market_type=market_type)

# Build an "exchange snapshot" per symbol+side
exch_size: Dict[str, Dict[str, float]] = {} # {symbol: {long: size, short: size}}
exch_entry_price: Dict[str, Dict[str, float]] = {} # {symbol: {long: px, short: px}}

if isinstance(client, BinanceFuturesClient) and market_type == "swap":
all_pos = client.get_positions() or []
# Handle dict response if needed (wrapper)
if isinstance(all_pos, dict) and "raw" in all_pos:
all_pos = all_pos["raw"]

if isinstance(all_pos, list):
for p in all_pos:
sym = str(p.get("symbol") or "").strip().upper()
try:
amt = float(p.get("positionAmt") or 0.0)
ep = float(p.get("entryPrice") or 0.0)
except Exception:
amt = 0.0
ep = 0.0
if not sym or abs(amt) <= 0:
continue
# Map to our symbol format: BTCUSDT -> BTC/USDT (best-effort)
Expand All @@ -191,6 +252,7 @@ def _sync_positions_best_effort(self) -> None:
hb_sym = f"{hb_sym[:-4]}/USDT"
side = "long" if amt > 0 else "short"
exch_size.setdefault(hb_sym, {"long": 0.0, "short": 0.0})[side] = abs(float(amt))
exch_entry_price.setdefault(hb_sym, {"long": 0.0, "short": 0.0})[side] = abs(float(ep))

elif isinstance(client, OkxClient) and market_type == "swap":
resp = client.get_positions()
Expand Down Expand Up @@ -351,15 +413,7 @@ def _sync_positions_best_effort(self) -> None:
except Exception:
continue

# Check for MT5 client (forex)
global MT5Client
if MT5Client is None:
try:
from app.services.mt5_trading import MT5Client as _MT5Client
MT5Client = _MT5Client
except ImportError:
pass
if MT5Client is not None and isinstance(client, MT5Client):
elif MT5Client is not None and isinstance(client, MT5Client):
# MT5 forex positions
positions = client.get_positions()
if isinstance(positions, list):
Expand All @@ -383,6 +437,22 @@ def _sync_positions_best_effort(self) -> None:
logger.debug(f"position sync: skip unsupported market/client: sid={sid}, cfg={safe_cfg}, market_type={market_type}, client={type(client)}")
continue

# [DEBUG] Log all normalized exchange keys for inspection
logger.debug(f"[PositionSync] Strategy {sid} Exchange Keys: {list(exch_size.keys())}")

# [Log Optimization] Always log current positions every sync cycle (10s)
pos_summary_parts = []
for _sym, _sides in exch_size.items():
for _side_key, _qty in _sides.items():
if _qty > 0:
_ep = exch_entry_price.get(_sym, {}).get(_side_key, 0.0)
pos_summary_parts.append(f"{_sym} {_side_key} size={_qty} entry={_ep}")

if pos_summary_parts:
logger.info(f"[PositionSync] Strategy {sid} ({safe_cfg.get('exchange_id', 'unknown')}) positions: {'; '.join(pos_summary_parts)}")
else:
logger.info(f"[PositionSync] Strategy {sid} ({safe_cfg.get('exchange_id', 'unknown')}) has NO positions on exchange.")

# 3) Apply reconciliation to local rows.
to_delete_ids: List[int] = []
to_update: List[Dict[str, Any]] = []
Expand All @@ -402,30 +472,79 @@ def _sync_positions_best_effort(self) -> None:
exch = exch_size.get(sym) or {}
exch_qty = float(exch.get(side) or 0.0)

# Lookup entry price
exch_ep_map = exch_entry_price.get(sym) or {}
exch_price = float(exch_ep_map.get(side) or 0.0)

try:
local_price = float(r.get("entry_price") or 0.0)
except Exception:
local_price = 0.0
logger.debug(f"[PositionSync] Check ID={rid} {sym} {side}: local_sz={local_size} px={local_price}, exch_sz={exch_qty} px={exch_price}")

if exch_qty <= eps:
# Exchange is flat -> delete local position (self-heal).
to_delete_ids.append(rid)
else:
# Update local size if it diverged materially (best-effort).
if local_size <= 0 or abs(exch_qty - local_size) / max(1.0, local_size) > 0.01:
to_update.append({"id": rid, "size": exch_qty})

if not to_delete_ids and not to_update:
# Update local size if it diverged materially (best-effort), OR if entry_price changed significantly (>0.5% diff)
# or if local_price is 0 (first sync)
price_diff_ratio = 0.0
if local_price > 0:
price_diff_ratio = abs(exch_price - local_price) / local_price
else:
price_diff_ratio = 1.0 if exch_price > 0 else 0.0

if (local_size <= 0 or abs(exch_qty - local_size) / max(1.0, local_size) > 0.01) or (price_diff_ratio > 0.005):
logger.info(f"[PositionSync] -> Flagged for UPDATE: {sym} (local_sz={local_size}->{exch_qty}, px={local_price}->{exch_price})")
to_update.append({"id": rid, "size": exch_qty, "entry_price": exch_price})

# [New Feature] Detect positions that exist on exchange but not in local DB, and insert them.
to_insert: List[Dict[str, Any]] = []
local_symbols_sides = {(str(r.get("symbol") or "").strip(), str(r.get("side") or "").strip().lower()) for r in plist}

for _sym, _sides_map in exch_size.items():
for _side, _qty in _sides_map.items():
if _qty > 1e-12 and (_sym, _side) not in local_symbols_sides:
# Exchange has this position but local DB does not
_ep = exch_entry_price.get(_sym, {}).get(_side, 0.0)
to_insert.append({
"strategy_id": sid,
"symbol": _sym,
"side": _side,
"size": _qty,
"entry_price": _ep
})
logger.info(f"[PositionSync] -> Flagged for INSERT: {_sym} {_side} size={_qty} entry={_ep}")

if not to_delete_ids and not to_update and not to_insert:
continue

with get_db_connection() as db:
cur = db.cursor()
for rid in to_delete_ids:
cur.execute("DELETE FROM qd_strategy_positions WHERE id = %s", (int(rid),))
for u in to_update:
cur.execute("UPDATE qd_strategy_positions SET size = %s, updated_at = NOW() WHERE id = %s", (float(u["size"]), int(u["id"])))
cur.execute(
"UPDATE qd_strategy_positions SET size = %s, entry_price = %s, updated_at = NOW() WHERE id = %s",
(float(u["size"]), float(u["entry_price"]), int(u["id"]))
)
for ins in to_insert:
cur.execute(
"""INSERT INTO qd_strategy_positions (user_id, strategy_id, symbol, side, size, entry_price, updated_at)
VALUES (%s, %s, %s, %s, %s, %s, NOW())""",
(1, int(ins["strategy_id"]), str(ins["symbol"]), str(ins["side"]), float(ins["size"]), float(ins["entry_price"]))
)
db.commit()
cur.close()

if to_delete_ids:
logger.info(f"position sync: removed {len(to_delete_ids)} ghost positions for strategy_id={sid}")
logger.debug(f"position sync: removed {len(to_delete_ids)} ghost positions for strategy_id={sid}")
if to_update:
logger.debug(f"position sync: updated {len(to_update)} positions for strategy_id={sid}")
if to_insert:
logger.debug(f"position sync: inserted {len(to_insert)} new positions for strategy_id={sid}")
except Exception as e:
logger.info(f"position sync: strategy_id={sid} failed: {e}")
logger.error(f"position sync: strategy_id={sid} failed: {e}", exc_info=True)

def _fetch_pending_orders(self, limit: int = 50) -> List[Dict[str, Any]]:
try:
Expand Down Expand Up @@ -876,6 +995,50 @@ def _signal_to_side_pos_reduce(sig_type: str):
if leverage <= 0:
leverage = 1.0

# [FEATURE] Sync positions before execution to ensure size is checking against reality
# The user requested to sync before EVERY live order to prevent mismatch.
try:
logger.info(f"[Sync] Triggering pre-execution sync for strategy {strategy_id} before order {order_id}")
self._sync_positions_best_effort(target_strategy_id=strategy_id)
except Exception as e:
logger.warning(f"Pre-execution sync failed: {e}")

# [FEATURE] Auto-correct amount for Close/Reduce signals if we hold less than requested
if reduce_only:
try:
with get_db_connection() as db:
cur = db.cursor()
# We need to find the specific position.
# Symbol stored in DB is normalized (e.g. BTC/USDT).
# The payload 'symbol' might be 'BTCUSDT' or 'BTC/USDT'.
# We try to match what stored in DB.
# Best effort: try exact match, then normalized.
qry_sym = str(symbol or "").strip().upper()
# Mapping logic similar to _sync:
if qry_sym.endswith("USDT") and "/" not in qry_sym:
qry_sym = f"{qry_sym[:-4]}/USDT"

cur.execute(
"SELECT size FROM qd_strategy_positions WHERE strategy_id = %s AND symbol = %s AND side = %s",
(strategy_id, qry_sym, pos_side)
)
row = cur.fetchone()
cur.close()

if row:
held_size = float(row["size"] or 0.0)
if amount > held_size:
logger.warning(f"[RiskControl] Adjusting Close amount from {amount} to {held_size} (Held) for {symbol}")
amount = held_size
else:
# No position found in DB?
# If reduce_only, and no position, maybe it's 0.
logger.warning(f"[RiskControl] Close signal for {symbol} but NO position found in DB. Setting amount=0.")
amount = 0.0
except Exception as e:
logger.error(f"[RiskControl] Failed to check DB position logic: {e}")


# Collect raw exchange interactions / intermediate states for debugging & persistence.
phases: Dict[str, Any] = {}

Expand Down
Loading