Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
131 changes: 113 additions & 18 deletions backend_api_python/app/services/pending_order_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -130,22 +130,29 @@ def _maybe_sync_positions(self) -> None:
except Exception as e:
logger.info(f"position sync skipped/failed: {e}")

def _sync_positions_best_effort(self) -> None:
def _sync_positions_best_effort(self, target_strategy_id: Optional[int] = None) -> None:
"""
Best-effort reconciliation:
- If exchange position is flat, delete local row from qd_strategy_positions.
- If exchange position size differs, update local size (optional best-effort).

This prevents "ghost positions" when positions are closed externally on the exchange.
"""
# 1) Load local positions
# 1) Load local positions (filtered if target_strategy_id provided)
with get_db_connection() as db:
cur = db.cursor()
cur.execute("SELECT id, strategy_id, symbol, side, size, entry_price FROM qd_strategy_positions ORDER BY updated_at DESC")
if target_strategy_id:
cur.execute(
"SELECT id, strategy_id, symbol, side, size, entry_price FROM qd_strategy_positions WHERE strategy_id = %s ORDER BY updated_at DESC",
(int(target_strategy_id),)
)
else:
cur.execute("SELECT id, strategy_id, symbol, side, size, entry_price FROM qd_strategy_positions ORDER BY updated_at DESC")
rows = cur.fetchall() or []
cur.close()

if not rows:
if not rows and not target_strategy_id:
# If global sync and no local positions, do nothing.
return

# Group by strategy_id for efficient exchange queries.
Expand All @@ -155,9 +162,20 @@ def _sync_positions_best_effort(self) -> None:
if sid <= 0:
continue
sid_to_rows.setdefault(sid, []).append(r)

# If targeted sync but no local rows found, we assume user might have opened position externally
# but DB is empty. However, without knowing *which* symbol to check, we can't easily auto-discover
# unless we fetch ALL positions from exchange for that strategy.
# But `load_strategy_configs(sid)` gives us the exchange keys.
# So if target_strategy_id is set but `sid_to_rows` is empty, we SHOULD explicitly add it to `sid_to_rows`
# so logic below enters and calls `client.get_positions()`.
if target_strategy_id and target_strategy_id not in sid_to_rows:
sid_to_rows[target_strategy_id] = []

# 2) Reconcile per strategy
for sid, plist in sid_to_rows.items():
if target_strategy_id and sid != target_strategy_id:
continue
try:
sc = load_strategy_configs(int(sid))
if (sc.get("execution_mode") or "").strip().lower() != "live":
Expand All @@ -169,20 +187,36 @@ def _sync_positions_best_effort(self) -> None:
if market_type in ("futures", "future", "perp", "perpetual"):
market_type = "swap"

client = create_client(exchange_config, market_type=market_type)
# Lazy import MT5 here to allow elif chain later
global MT5Client
if MT5Client is None:
try:
from app.services.mt5_trading import MT5Client as _MT5Client
MT5Client = _MT5Client
except ImportError:
pass

client = create_client(exchange_config, market_type=market_type)

# Build an "exchange snapshot" per symbol+side
exch_size: Dict[str, Dict[str, float]] = {} # {symbol: {long: size, short: size}}
exch_entry_price: Dict[str, Dict[str, float]] = {} # {symbol: {long: px, short: px}}

if isinstance(client, BinanceFuturesClient) and market_type == "swap":
all_pos = client.get_positions() or []
# Handle dict response if needed (wrapper)
if isinstance(all_pos, dict) and "raw" in all_pos:
all_pos = all_pos["raw"]

if isinstance(all_pos, list):
for p in all_pos:
sym = str(p.get("symbol") or "").strip().upper()
try:
amt = float(p.get("positionAmt") or 0.0)
ep = float(p.get("entryPrice") or 0.0)
except Exception:
amt = 0.0
ep = 0.0
if not sym or abs(amt) <= 0:
continue
# Map to our symbol format: BTCUSDT -> BTC/USDT (best-effort)
Expand All @@ -191,6 +225,7 @@ def _sync_positions_best_effort(self) -> None:
hb_sym = f"{hb_sym[:-4]}/USDT"
side = "long" if amt > 0 else "short"
exch_size.setdefault(hb_sym, {"long": 0.0, "short": 0.0})[side] = abs(float(amt))
exch_entry_price.setdefault(hb_sym, {"long": 0.0, "short": 0.0})[side] = abs(float(ep))

elif isinstance(client, OkxClient) and market_type == "swap":
resp = client.get_positions()
Expand Down Expand Up @@ -351,15 +386,7 @@ def _sync_positions_best_effort(self) -> None:
except Exception:
continue

# Check for MT5 client (forex)
global MT5Client
if MT5Client is None:
try:
from app.services.mt5_trading import MT5Client as _MT5Client
MT5Client = _MT5Client
except ImportError:
pass
if MT5Client is not None and isinstance(client, MT5Client):
elif MT5Client is not None and isinstance(client, MT5Client):
# MT5 forex positions
positions = client.get_positions()
if isinstance(positions, list):
Expand All @@ -383,6 +410,9 @@ def _sync_positions_best_effort(self) -> None:
logger.debug(f"position sync: skip unsupported market/client: sid={sid}, cfg={safe_cfg}, market_type={market_type}, client={type(client)}")
continue

# [DEBUG] Log all normalized exchange keys for inspection
logger.debug(f"[PositionSync] Strategy {sid} Exchange Keys: {list(exch_size.keys())}")

# 3) Apply reconciliation to local rows.
to_delete_ids: List[int] = []
to_update: List[Dict[str, Any]] = []
Expand All @@ -402,13 +432,31 @@ def _sync_positions_best_effort(self) -> None:
exch = exch_size.get(sym) or {}
exch_qty = float(exch.get(side) or 0.0)

# Lookup entry price
exch_ep_map = exch_entry_price.get(sym) or {}
exch_price = float(exch_ep_map.get(side) or 0.0)

try:
local_price = float(r.get("entry_price") or 0.0)
except Exception:
local_price = 0.0
logger.debug(f"[PositionSync] Check ID={rid} {sym} {side}: local_sz={local_size} px={local_price}, exch_sz={exch_qty} px={exch_price}")

if exch_qty <= eps:
# Exchange is flat -> delete local position (self-heal).
to_delete_ids.append(rid)
else:
# Update local size if it diverged materially (best-effort).
if local_size <= 0 or abs(exch_qty - local_size) / max(1.0, local_size) > 0.01:
to_update.append({"id": rid, "size": exch_qty})
# Update local size if it diverged materially (best-effort), OR if entry_price changed significantly (>0.5% diff)
# or if local_price is 0 (first sync)
price_diff_ratio = 0.0
if local_price > 0:
price_diff_ratio = abs(exch_price - local_price) / local_price
else:
price_diff_ratio = 1.0 if exch_price > 0 else 0.0

if (local_size <= 0 or abs(exch_qty - local_size) / max(1.0, local_size) > 0.01) or (price_diff_ratio > 0.005):
logger.info(f"[PositionSync] -> Flagged for UPDATE: {sym} (local_sz={local_size}->{exch_qty}, px={local_price}->{exch_price})")
to_update.append({"id": rid, "size": exch_qty, "entry_price": exch_price})

if not to_delete_ids and not to_update:
continue
Expand All @@ -418,7 +466,10 @@ def _sync_positions_best_effort(self) -> None:
for rid in to_delete_ids:
cur.execute("DELETE FROM qd_strategy_positions WHERE id = %s", (int(rid),))
for u in to_update:
cur.execute("UPDATE qd_strategy_positions SET size = %s, updated_at = NOW() WHERE id = %s", (float(u["size"]), int(u["id"])))
cur.execute(
"UPDATE qd_strategy_positions SET size = %s, entry_price = %s, updated_at = NOW() WHERE id = %s",
(float(u["size"]), float(u["entry_price"]), int(u["id"]))
)
db.commit()
cur.close()

Expand Down Expand Up @@ -876,6 +927,50 @@ def _signal_to_side_pos_reduce(sig_type: str):
if leverage <= 0:
leverage = 1.0

# [FEATURE] Sync positions before execution to ensure size is checking against reality
# The user requested to sync before EVERY live order to prevent mismatch.
try:
logger.info(f"[Sync] Triggering pre-execution sync for strategy {strategy_id} before order {order_id}")
self._sync_positions_best_effort(target_strategy_id=strategy_id)
except Exception as e:
logger.warning(f"Pre-execution sync failed: {e}")

# [FEATURE] Auto-correct amount for Close/Reduce signals if we hold less than requested
if reduce_only:
try:
with get_db_connection() as db:
cur = db.cursor()
# We need to find the specific position.
# Symbol stored in DB is normalized (e.g. BTC/USDT).
# The payload 'symbol' might be 'BTCUSDT' or 'BTC/USDT'.
# We try to match what stored in DB.
# Best effort: try exact match, then normalized.
qry_sym = str(symbol or "").strip().upper()
# Mapping logic similar to _sync:
if qry_sym.endswith("USDT") and "/" not in qry_sym:
qry_sym = f"{qry_sym[:-4]}/USDT"

cur.execute(
"SELECT size FROM qd_strategy_positions WHERE strategy_id = %s AND symbol = %s AND side = %s",
(strategy_id, qry_sym, pos_side)
)
row = cur.fetchone()
cur.close()

if row:
held_size = float(row["size"] or 0.0)
if amount > held_size:
logger.warning(f"[RiskControl] Adjusting Close amount from {amount} to {held_size} (Held) for {symbol}")
amount = held_size
else:
# No position found in DB?
# If reduce_only, and no position, maybe it's 0.
logger.warning(f"[RiskControl] Close signal for {symbol} but NO position found in DB. Setting amount=0.")
amount = 0.0
except Exception as e:
logger.error(f"[RiskControl] Failed to check DB position logic: {e}")


# Collect raw exchange interactions / intermediate states for debugging & persistence.
phases: Dict[str, Any] = {}

Expand Down
Loading
Loading