@@ -419,6 +419,28 @@ where
419
419
}
420
420
} ;
421
421
422
+ let batch_min_received_at = updates
423
+ . price_feeds
424
+ . iter ( )
425
+ . filter_map ( |u| u. received_at )
426
+ . min ( ) ;
427
+ let batch_min_publish_time = updates
428
+ . price_feeds
429
+ . iter ( )
430
+ . map ( |u| u. price_feed . get_price_unchecked ( ) . publish_time )
431
+ . min ( ) ;
432
+
433
+ let batch_min_received_at = updates
434
+ . price_feeds
435
+ . iter ( )
436
+ . filter_map ( |u| u. received_at )
437
+ . min ( ) ;
438
+ let batch_min_publish_time = updates
439
+ . price_feeds
440
+ . iter ( )
441
+ . map ( |u| u. price_feed . get_price_unchecked ( ) . publish_time )
442
+ . min ( ) ;
443
+
422
444
for update in updates. price_feeds {
423
445
let config = self
424
446
. price_feeds_with_config
@@ -433,17 +455,6 @@ where
433
455
}
434
456
}
435
457
436
- let now_secs = std:: time:: SystemTime :: now ( )
437
- . duration_since ( std:: time:: UNIX_EPOCH )
438
- . map ( |d| d. as_secs_f64 ( ) )
439
- . unwrap_or ( 0.0 ) ;
440
- if let Some ( received_at) = update. received_at {
441
- let latency = now_secs - ( received_at as f64 ) ;
442
- self . ws_state
443
- . metrics
444
- . broadcast_latency
445
- . observe ( latency. max ( 0.0 ) ) ;
446
- }
447
458
448
459
let message = serde_json:: to_string ( & ServerMessage :: PriceUpdate {
449
460
price_feed : RpcPriceFeed :: from_price_feed_update (
@@ -510,6 +521,21 @@ where
510
521
}
511
522
512
523
self . sender . flush ( ) . await ?;
524
+ let now_secs = std:: time:: SystemTime :: now ( )
525
+ . duration_since ( std:: time:: UNIX_EPOCH )
526
+ . map ( |d| d. as_secs_f64 ( ) )
527
+ . unwrap_or ( 0.0 ) ;
528
+ if let Some ( min_recv) = batch_min_received_at {
529
+ self . ws_state
530
+ . metrics
531
+ . broadcast_latency
532
+ . observe ( ( now_secs - ( min_recv as f64 ) ) . max ( 0.0 ) ) ;
533
+ } else if let Some ( min_pub) = batch_min_publish_time {
534
+ self . ws_state
535
+ . metrics
536
+ . broadcast_latency
537
+ . observe ( ( now_secs - ( min_pub as f64 ) ) . max ( 0.0 ) ) ;
538
+ }
513
539
Ok ( ( ) )
514
540
}
515
541
@@ -518,8 +544,7 @@ where
518
544
let maybe_client_message = match message {
519
545
Message :: Close ( _) => {
520
546
// Closing the connection. We don't remove it from the subscribers
521
- // list, instead when the Subscriber struct is dropped the channel
522
- // to subscribers list will be closed and it will eventually get
547
+
523
548
// removed.
524
549
tracing:: trace!( id = self . id, "Subscriber Closed Connection." ) ;
525
550
self . ws_state
0 commit comments