@@ -419,6 +419,11 @@ where
419
419
}
420
420
} ;
421
421
422
+ // Capture the minimum receive_time from the updates batch
423
+ let min_received_at = updates. price_feeds . iter ( )
424
+ . filter_map ( |update| update. received_at )
425
+ . min ( ) ;
426
+
422
427
for update in updates. price_feeds {
423
428
let config = self
424
429
. price_feeds_with_config
@@ -433,18 +438,6 @@ where
433
438
}
434
439
}
435
440
436
- let now_secs = std:: time:: SystemTime :: now ( )
437
- . duration_since ( std:: time:: UNIX_EPOCH )
438
- . map ( |d| d. as_secs_f64 ( ) )
439
- . unwrap_or ( 0.0 ) ;
440
- if let Some ( received_at) = update. received_at {
441
- let latency = now_secs - ( received_at as f64 ) ;
442
- self . ws_state
443
- . metrics
444
- . broadcast_latency
445
- . observe ( latency. max ( 0.0 ) ) ;
446
- }
447
-
448
441
let message = serde_json:: to_string ( & ServerMessage :: PriceUpdate {
449
442
price_feed : RpcPriceFeed :: from_price_feed_update (
450
443
update,
@@ -510,6 +503,20 @@ where
510
503
}
511
504
512
505
self . sender . flush ( ) . await ?;
506
+
507
+ // Record latency from receive to ws send after flushing
508
+ if let Some ( min_received_at) = min_received_at {
509
+ let now_secs = std:: time:: SystemTime :: now ( )
510
+ . duration_since ( std:: time:: UNIX_EPOCH )
511
+ . map ( |d| d. as_secs_f64 ( ) )
512
+ . unwrap_or ( 0.0 ) ;
513
+ let latency = now_secs - ( min_received_at as f64 ) ;
514
+ self . ws_state
515
+ . metrics
516
+ . broadcast_latency
517
+ . observe ( latency. max ( 0.0 ) ) ;
518
+ }
519
+
513
520
Ok ( ( ) )
514
521
}
515
522
0 commit comments