Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
189 changes: 162 additions & 27 deletions src/project_x_py/orderbook.py
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,8 @@ def __init__(self, instrument: str, timezone: str = "America/Chicago"):
"type_9_count": 0, # Order modifications
"type_10_count": 0, # Order modifications/cancellations
"other_types": 0, # Unknown types
"skipped_updates": 0, # Added for skipped updates
"integrity_fixes": 0, # Added for orderbook integrity fixes
}

# Callbacks for orderbook events
Expand Down Expand Up @@ -313,48 +315,124 @@ def process_market_depth(self, data: dict) -> None:
# We need to determine which side based on price relative to current mid
best_prices = self.get_best_bid_ask()
mid_price = best_prices.get("mid")

if mid_price and price != 0:
if price <= mid_price: # Likely a bid modification
bid_updates.append(
{
current_best_bid = best_prices.get("bid")
current_best_ask = best_prices.get("ask")

side_determined = False

# Method 1: Use current best bid/ask for more accurate classification
if not side_determined and current_best_bid is not None and current_best_ask is not None:
try:
# Create a larger buffer zone around the current spread
spread = current_best_ask - current_best_bid
buffer = max(0.1, spread * 0.5) # At least 0.1 points or 50% of spread

bid_max_threshold = current_best_bid + buffer
ask_min_threshold = current_best_ask - buffer

if price <= bid_max_threshold:
bid_updates.append({
"price": float(price),
"volume": int(
volume
), # Could be 0 for cancellation
"volume": int(volume),
"timestamp": timestamp,
"type": f"bid_mod_{entry_type}",
}
)
else: # Likely an ask modification
ask_updates.append(
{
})
side_determined = True
elif price >= ask_min_threshold:
ask_updates.append({
"price": float(price),
"volume": int(
volume
), # Could be 0 for cancellation
"volume": int(volume),
"timestamp": timestamp,
"type": f"ask_mod_{entry_type}",
}
)
else:
# If we can't determine side, try both (safer approach)
bid_updates.append(
{
})
side_determined = True
except Exception:
pass

# Method 2: If we have a mid price but no current best prices
if not side_determined and mid_price is not None and price != 0:
if price <= mid_price:
bid_updates.append({
"price": float(price),
"volume": int(volume),
"timestamp": timestamp,
"type": f"bid_mod_{entry_type}",
}
)
ask_updates.append(
{
})
side_determined = True
else:
ask_updates.append({
"price": float(price),
"volume": int(volume),
"timestamp": timestamp,
"type": f"ask_mod_{entry_type}",
}
})
side_determined = True

# Method 3: Check if this price level already exists on either side
if not side_determined:
try:
bid_exists = len(self.orderbook_bids.filter(pl.col("price") == price)) > 0
ask_exists = len(self.orderbook_asks.filter(pl.col("price") == price)) > 0

if bid_exists and not ask_exists:
bid_updates.append({
"price": float(price),
"volume": int(volume),
"timestamp": timestamp,
"type": f"bid_mod_{entry_type}",
})
side_determined = True
elif ask_exists and not bid_exists:
ask_updates.append({
"price": float(price),
"volume": int(volume),
"timestamp": timestamp,
"type": f"ask_mod_{entry_type}",
})
side_determined = True
except Exception:
pass

# Method 4: Use historical price patterns if available
if not side_determined and len(self.orderbook_bids) > 0 and len(self.orderbook_asks) > 0:
try:
# Get the median of current bid and ask prices for better classification
bid_prices = self.orderbook_bids.select(pl.col("price")).to_series().to_list()
ask_prices = self.orderbook_asks.select(pl.col("price")).to_series().to_list()

if bid_prices and ask_prices:
max_bid = max(bid_prices)
min_ask = min(ask_prices)

# If price is clearly in bid territory
if price <= max_bid:
bid_updates.append({
"price": float(price),
"volume": int(volume),
"timestamp": timestamp,
"type": f"bid_mod_{entry_type}",
})
side_determined = True
# If price is clearly in ask territory
elif price >= min_ask:
ask_updates.append({
"price": float(price),
"volume": int(volume),
"timestamp": timestamp,
"type": f"ask_mod_{entry_type}",
})
side_determined = True
except Exception:
pass

# If still can't determine side, skip this update to avoid corruption
if not side_determined:
self.logger.warning(
f"Unable to classify order modification type {entry_type} "
f"at price {price} with volume {volume}. Skipping to avoid orderbook corruption."
)
# Update statistics for skipped updates
self.order_type_stats["skipped_updates"] = self.order_type_stats.get("skipped_updates", 0) + 1

# Update bid levels
if bid_updates:
Expand All @@ -366,6 +444,9 @@ def process_market_depth(self, data: dict) -> None:
updates_df = pl.from_dicts(ask_updates)
self._update_orderbook_side(updates_df, "ask")

# Validate orderbook integrity - check for negative spreads
self._validate_orderbook_integrity()

# Update trade flow data
if trade_updates:
updates_df = pl.from_dicts(trade_updates)
Expand Down Expand Up @@ -477,6 +558,60 @@ def _update_trade_flow(self, trade_updates: pl.DataFrame) -> None:
except Exception as e:
self.logger.error(f"❌ Error updating trade flow: {e}")

def _validate_orderbook_integrity(self) -> None:
    """
    Detect a crossed or locked book and remove the price levels causing it.

    The first bid row is compared with the first ask row. When the bid is
    greater than or equal to the ask, every bid priced at or above that ask
    and every ask priced at or below that bid is dropped, the
    ``integrity_fixes`` counter in ``order_type_stats`` is incremented, and
    the outcome is logged. Nothing happens when either side is empty or the
    spread is already positive.

    Any ``Exception`` raised while validating is logged and not re-raised.
    """
    try:
        if len(self.orderbook_bids) == 0 or len(self.orderbook_asks) == 0:
            return

        # NOTE(review): row 0 is taken as the best price on each side. This
        # presumably relies on bids being stored high-to-low and asks
        # low-to-high - confirm against _update_orderbook_side.
        best_bid = float(self.orderbook_bids.select(pl.col("price")).head(1).item())
        best_ask = float(self.orderbook_asks.select(pl.col("price")).head(1).item())

        # Nothing to repair unless the book is crossed or locked. Written as
        # a negated >= so NaN prices fall through without triggering a fix.
        if not (best_bid >= best_ask):
            return

        self.logger.warning(
            f"Negative spread detected: best_bid={best_bid}, best_ask={best_ask}. "
            f"Cleaning problematic entries."
        )

        # Drop every bid priced at or above the best ask.
        original_bid_count = len(self.orderbook_bids)
        self.orderbook_bids = self.orderbook_bids.filter(pl.col("price") < best_ask)
        removed_bids = original_bid_count - len(self.orderbook_bids)

        # Drop every ask priced at or below the best bid.
        original_ask_count = len(self.orderbook_asks)
        self.orderbook_asks = self.orderbook_asks.filter(pl.col("price") > best_bid)
        removed_asks = original_ask_count - len(self.orderbook_asks)

        # Count the repair; .get() tolerates a stats dict without the key.
        self.order_type_stats["integrity_fixes"] = (
            self.order_type_stats.get("integrity_fixes", 0) + 1
        )

        if removed_bids > 0 or removed_asks > 0:
            self.logger.info(
                f"Orderbook integrity fix: removed {removed_bids} problematic bid entries "
                f"and {removed_asks} problematic ask entries to maintain positive spread."
            )

        # The cleanup may have emptied one side; then there is no spread to check.
        if len(self.orderbook_bids) == 0 or len(self.orderbook_asks) == 0:
            return

        new_best_bid = float(self.orderbook_bids.select(pl.col("price")).head(1).item())
        new_best_ask = float(self.orderbook_asks.select(pl.col("price")).head(1).item())
        new_spread = new_best_ask - new_best_bid

        # Require a strictly positive spread so this check agrees with the
        # `best_bid >= best_ask` trigger above, which treats zero as invalid.
        if new_spread > 0:
            self.logger.info(f"Orderbook integrity restored: new spread = {new_spread}")
        else:
            self.logger.error(f"Failed to fix negative spread: {new_spread}")

    except Exception as e:
        self.logger.error(f"Error in orderbook integrity validation: {e}")

def _process_level2_data(self, depth_data: list) -> dict:
"""
Process raw Level 2 data into structured bid/ask format.
Expand Down
Loading