diff --git a/src/project_x_py/orderbook.py b/src/project_x_py/orderbook.py index 4f0850c..64f99c8 100644 --- a/src/project_x_py/orderbook.py +++ b/src/project_x_py/orderbook.py @@ -131,6 +131,8 @@ def __init__(self, instrument: str, timezone: str = "America/Chicago"): "type_9_count": 0, # Order modifications "type_10_count": 0, # Order modifications/cancellations "other_types": 0, # Unknown types + "skipped_updates": 0, # Added for skipped updates + "integrity_fixes": 0, # Added for orderbook integrity fixes } # Callbacks for orderbook events @@ -313,48 +315,124 @@ def process_market_depth(self, data: dict) -> None: # We need to determine which side based on price relative to current mid best_prices = self.get_best_bid_ask() mid_price = best_prices.get("mid") - - if mid_price and price != 0: - if price <= mid_price: # Likely a bid modification - bid_updates.append( - { + current_best_bid = best_prices.get("bid") + current_best_ask = best_prices.get("ask") + + side_determined = False + + # Method 1: Use current best bid/ask for more accurate classification + if not side_determined and current_best_bid is not None and current_best_ask is not None: + try: + # Create a larger buffer zone around the current spread + spread = current_best_ask - current_best_bid + buffer = max(0.1, spread * 0.5) # At least 0.1 points or 50% of spread + + bid_max_threshold = current_best_bid + buffer + ask_min_threshold = current_best_ask - buffer + + if price <= bid_max_threshold: + bid_updates.append({ "price": float(price), - "volume": int( - volume - ), # Could be 0 for cancellation + "volume": int(volume), "timestamp": timestamp, "type": f"bid_mod_{entry_type}", - } - ) - else: # Likely an ask modification - ask_updates.append( - { + }) + side_determined = True + elif price >= ask_min_threshold: + ask_updates.append({ "price": float(price), - "volume": int( - volume - ), # Could be 0 for cancellation + "volume": int(volume), "timestamp": timestamp, "type": f"ask_mod_{entry_type}", - } 
- ) - else: - # If we can't determine side, try both (safer approach) - bid_updates.append( - { + }) + side_determined = True + except Exception: + pass + + # Method 2: If we have a mid price but no current best prices + if not side_determined and mid_price is not None and price != 0: + if price <= mid_price: + bid_updates.append({ "price": float(price), "volume": int(volume), "timestamp": timestamp, "type": f"bid_mod_{entry_type}", - } - ) - ask_updates.append( - { + }) + side_determined = True + else: + ask_updates.append({ "price": float(price), "volume": int(volume), "timestamp": timestamp, "type": f"ask_mod_{entry_type}", - } + }) + side_determined = True + + # Method 3: Check if this price level already exists on either side + if not side_determined: + try: + bid_exists = len(self.orderbook_bids.filter(pl.col("price") == price)) > 0 + ask_exists = len(self.orderbook_asks.filter(pl.col("price") == price)) > 0 + + if bid_exists and not ask_exists: + bid_updates.append({ + "price": float(price), + "volume": int(volume), + "timestamp": timestamp, + "type": f"bid_mod_{entry_type}", + }) + side_determined = True + elif ask_exists and not bid_exists: + ask_updates.append({ + "price": float(price), + "volume": int(volume), + "timestamp": timestamp, + "type": f"ask_mod_{entry_type}", + }) + side_determined = True + except Exception: + pass + + # Method 4: Use historical price patterns if available + if not side_determined and len(self.orderbook_bids) > 0 and len(self.orderbook_asks) > 0: + try: + # Get the median of current bid and ask prices for better classification + bid_prices = self.orderbook_bids.select(pl.col("price")).to_series().to_list() + ask_prices = self.orderbook_asks.select(pl.col("price")).to_series().to_list() + + if bid_prices and ask_prices: + max_bid = max(bid_prices) + min_ask = min(ask_prices) + + # If price is clearly in bid territory + if price <= max_bid: + bid_updates.append({ + "price": float(price), + "volume": int(volume), + "timestamp": 
def _validate_orderbook_integrity(self) -> None:
    """
    Detect a crossed or locked orderbook and prune the offending price levels.

    The book is considered invalid when the highest bid price is greater than
    or equal to the lowest ask price. When that happens, every bid priced at or
    above the lowest ask and every ask priced at or below the highest bid is
    dropped, and ``order_type_stats["integrity_fixes"]`` is incremented.

    The best prices are computed with max/min over the ``price`` column, so the
    check does not depend on the row order of ``orderbook_bids`` /
    ``orderbook_asks``.

    Returns:
        None. ``self.orderbook_bids``, ``self.orderbook_asks`` and
        ``self.order_type_stats`` are updated in place when a fix is applied.

    Any exception is logged and swallowed: this is a best-effort safety net and
    must not interrupt the caller.
    """
    try:
        bids = self.orderbook_bids
        asks = self.orderbook_asks

        # Nothing to compare unless both sides have at least one level.
        if len(bids) == 0 or len(asks) == 0:
            return

        # Highest bid / lowest ask, independent of how the frames are sorted.
        best_bid = float(bids.select(pl.col("price").max()).item())
        best_ask = float(asks.select(pl.col("price").min()).item())

        # Healthy book: strictly positive spread.
        if best_bid < best_ask:
            return

        self.logger.warning(
            f"Negative spread detected: best_bid={best_bid}, best_ask={best_ask}. "
            f"Cleaning problematic entries."
        )

        # Compute both filtered sides before assigning either, so an error
        # while filtering cannot leave the book half-pruned.
        # NOTE(review): both crossing sides are dropped because nothing here
        # tells us which side holds the stale level.
        kept_bids = bids.filter(pl.col("price") < best_ask)
        kept_asks = asks.filter(pl.col("price") > best_bid)
        removed_bids = len(bids) - len(kept_bids)
        removed_asks = len(asks) - len(kept_asks)

        self.orderbook_bids = kept_bids
        self.orderbook_asks = kept_asks

        # Update statistics
        self.order_type_stats["integrity_fixes"] = (
            self.order_type_stats.get("integrity_fixes", 0) + 1
        )

        if removed_bids > 0 or removed_asks > 0:
            self.logger.info(
                f"Orderbook integrity fix: removed {removed_bids} problematic bid entries "
                f"and {removed_asks} problematic ask entries to maintain positive spread."
            )

        # Verify the fix worked; only possible if both sides still have levels.
        if len(kept_bids) > 0 and len(kept_asks) > 0:
            new_best_bid = float(kept_bids.select(pl.col("price").max()).item())
            new_best_ask = float(kept_asks.select(pl.col("price").min()).item())
            new_spread = new_best_ask - new_best_bid

            # Strictly positive, matching the detection condition above
            # (a zero spread is treated as invalid there, so it is not
            # reported as "restored" here).
            if new_spread > 0:
                self.logger.info(f"Orderbook integrity restored: new spread = {new_spread}")
            else:
                self.logger.error(f"Failed to fix negative spread: {new_spread}")

    except Exception as e:
        self.logger.error(f"Error in orderbook integrity validation: {e}")