@@ -131,6 +131,8 @@ def __init__(self, instrument: str, timezone: str = "America/Chicago"):
131131 "type_9_count" : 0 , # Order modifications
132132 "type_10_count" : 0 , # Order modifications/cancellations
133133 "other_types" : 0 , # Unknown types
134+ "skipped_updates" : 0 , # Added for skipped updates
135+ "integrity_fixes" : 0 , # Added for orderbook integrity fixes
134136 }
135137
136138 # Callbacks for orderbook events
@@ -313,48 +315,124 @@ def process_market_depth(self, data: dict) -> None:
313315 # We need to determine which side based on price relative to current mid
314316 best_prices = self .get_best_bid_ask ()
315317 mid_price = best_prices .get ("mid" )
316-
317- if mid_price and price != 0 :
318- if price <= mid_price : # Likely a bid modification
319- bid_updates .append (
320- {
318+ current_best_bid = best_prices .get ("bid" )
319+ current_best_ask = best_prices .get ("ask" )
320+
321+ side_determined = False
322+
323+ # Method 1: Use current best bid/ask for more accurate classification
324+ if not side_determined and current_best_bid is not None and current_best_ask is not None :
325+ try :
326+ # Create a larger buffer zone around the current spread
327+ spread = current_best_ask - current_best_bid
328+ buffer = max (0.1 , spread * 0.5 ) # At least 0.1 points or 50% of spread
329+
330+ bid_max_threshold = current_best_bid + buffer
331+ ask_min_threshold = current_best_ask - buffer
332+
333+ if price <= bid_max_threshold :
334+ bid_updates .append ({
321335 "price" : float (price ),
322- "volume" : int (
323- volume
324- ), # Could be 0 for cancellation
336+ "volume" : int (volume ),
325337 "timestamp" : timestamp ,
326338 "type" : f"bid_mod_{ entry_type } " ,
327- }
328- )
329- else : # Likely an ask modification
330- ask_updates .append (
331- {
339+ })
340+ side_determined = True
341+ elif price >= ask_min_threshold :
342+ ask_updates .append ({
332343 "price" : float (price ),
333- "volume" : int (
334- volume
335- ), # Could be 0 for cancellation
344+ "volume" : int (volume ),
336345 "timestamp" : timestamp ,
337346 "type" : f"ask_mod_{ entry_type } " ,
338- }
339- )
340- else :
341- # If we can't determine side, try both (safer approach)
342- bid_updates .append (
343- {
347+ })
348+ side_determined = True
349+ except Exception :
350+ pass
351+
352+ # Method 2: If we have a mid price but no current best prices
353+ if not side_determined and mid_price is not None and price != 0 :
354+ if price <= mid_price :
355+ bid_updates .append ({
344356 "price" : float (price ),
345357 "volume" : int (volume ),
346358 "timestamp" : timestamp ,
347359 "type" : f"bid_mod_{ entry_type } " ,
348- }
349- )
350- ask_updates . append (
351- {
360+ })
361+ side_determined = True
362+ else :
363+ ask_updates . append ( {
352364 "price" : float (price ),
353365 "volume" : int (volume ),
354366 "timestamp" : timestamp ,
355367 "type" : f"ask_mod_{ entry_type } " ,
356- }
368+ })
369+ side_determined = True
370+
371+ # Method 3: Check if this price level already exists on either side
372+ if not side_determined :
373+ try :
374+ bid_exists = len (self .orderbook_bids .filter (pl .col ("price" ) == price )) > 0
375+ ask_exists = len (self .orderbook_asks .filter (pl .col ("price" ) == price )) > 0
376+
377+ if bid_exists and not ask_exists :
378+ bid_updates .append ({
379+ "price" : float (price ),
380+ "volume" : int (volume ),
381+ "timestamp" : timestamp ,
382+ "type" : f"bid_mod_{ entry_type } " ,
383+ })
384+ side_determined = True
385+ elif ask_exists and not bid_exists :
386+ ask_updates .append ({
387+ "price" : float (price ),
388+ "volume" : int (volume ),
389+ "timestamp" : timestamp ,
390+ "type" : f"ask_mod_{ entry_type } " ,
391+ })
392+ side_determined = True
393+ except Exception :
394+ pass
395+
396+ # Method 4: Use historical price patterns if available
397+ if not side_determined and len (self .orderbook_bids ) > 0 and len (self .orderbook_asks ) > 0 :
398+ try :
399+ # Get the median of current bid and ask prices for better classification
400+ bid_prices = self .orderbook_bids .select (pl .col ("price" )).to_series ().to_list ()
401+ ask_prices = self .orderbook_asks .select (pl .col ("price" )).to_series ().to_list ()
402+
403+ if bid_prices and ask_prices :
404+ max_bid = max (bid_prices )
405+ min_ask = min (ask_prices )
406+
407+ # If price is clearly in bid territory
408+ if price <= max_bid :
409+ bid_updates .append ({
410+ "price" : float (price ),
411+ "volume" : int (volume ),
412+ "timestamp" : timestamp ,
413+ "type" : f"bid_mod_{ entry_type } " ,
414+ })
415+ side_determined = True
416+ # If price is clearly in ask territory
417+ elif price >= min_ask :
418+ ask_updates .append ({
419+ "price" : float (price ),
420+ "volume" : int (volume ),
421+ "timestamp" : timestamp ,
422+ "type" : f"ask_mod_{ entry_type } " ,
423+ })
424+ side_determined = True
425+ except Exception :
426+ pass
427+
428+ # If still can't determine side, skip this update to avoid corruption
429+ if not side_determined :
430+ self .logger .warning (
431+ f"Unable to classify order modification type { entry_type } "
432+ f"at price { price } with volume { volume } . Skipping to avoid orderbook corruption."
357433 )
434+ # Update statistics for skipped updates
435+ self .order_type_stats ["skipped_updates" ] = self .order_type_stats .get ("skipped_updates" , 0 ) + 1
358436
359437 # Update bid levels
360438 if bid_updates :
@@ -366,6 +444,9 @@ def process_market_depth(self, data: dict) -> None:
366444 updates_df = pl .from_dicts (ask_updates )
367445 self ._update_orderbook_side (updates_df , "ask" )
368446
447+ # Validate orderbook integrity - check for negative spreads
448+ self ._validate_orderbook_integrity ()
449+
369450 # Update trade flow data
370451 if trade_updates :
371452 updates_df = pl .from_dicts (trade_updates )
@@ -477,6 +558,60 @@ def _update_trade_flow(self, trade_updates: pl.DataFrame) -> None:
477558 except Exception as e :
478559 self .logger .error (f"❌ Error updating trade flow: { e } " )
479560
561+ def _validate_orderbook_integrity (self ) -> None :
562+ """
563+ Validate orderbook integrity and fix any negative spreads by removing problematic entries.
564+ This is a safety net to ensure market data integrity.
565+ """
566+ try :
567+ if len (self .orderbook_bids ) == 0 or len (self .orderbook_asks ) == 0 :
568+ return
569+
570+ # Get current best bid and ask
571+ best_bid = float (self .orderbook_bids .select (pl .col ("price" )).head (1 ).item ())
572+ best_ask = float (self .orderbook_asks .select (pl .col ("price" )).head (1 ).item ())
573+
574+ # If we have a negative spread, we need to fix it
575+ if best_bid >= best_ask :
576+ self .logger .warning (
577+ f"Negative spread detected: best_bid={ best_bid } , best_ask={ best_ask } . "
578+ f"Cleaning problematic entries."
579+ )
580+
581+ # Remove any bid entries that are >= best ask
582+ original_bid_count = len (self .orderbook_bids )
583+ self .orderbook_bids = self .orderbook_bids .filter (pl .col ("price" ) < best_ask )
584+ removed_bids = original_bid_count - len (self .orderbook_bids )
585+
586+ # Remove any ask entries that are <= best bid
587+ original_ask_count = len (self .orderbook_asks )
588+ self .orderbook_asks = self .orderbook_asks .filter (pl .col ("price" ) > best_bid )
589+ removed_asks = original_ask_count - len (self .orderbook_asks )
590+
591+ # Update statistics
592+ self .order_type_stats ["integrity_fixes" ] = self .order_type_stats .get ("integrity_fixes" , 0 ) + 1
593+
594+ # If we removed entries, log the action
595+ if removed_bids > 0 or removed_asks > 0 :
596+ self .logger .info (
597+ f"Orderbook integrity fix: removed { removed_bids } problematic bid entries "
598+ f"and { removed_asks } problematic ask entries to maintain positive spread."
599+ )
600+
601+ # Verify the fix worked
602+ if len (self .orderbook_bids ) > 0 and len (self .orderbook_asks ) > 0 :
603+ new_best_bid = float (self .orderbook_bids .select (pl .col ("price" )).head (1 ).item ())
604+ new_best_ask = float (self .orderbook_asks .select (pl .col ("price" )).head (1 ).item ())
605+ new_spread = new_best_ask - new_best_bid
606+
607+ if new_spread >= 0 :
608+ self .logger .info (f"Orderbook integrity restored: new spread = { new_spread } " )
609+ else :
610+ self .logger .error (f"Failed to fix negative spread: { new_spread } " )
611+
612+ except Exception as e :
613+ self .logger .error (f"Error in orderbook integrity validation: { e } " )
614+
480615 def _process_level2_data (self , depth_data : list ) -> dict :
481616 """
482617 Process raw Level 2 data into structured bid/ask format.
0 commit comments