@@ -446,6 +446,7 @@ where
446
446
. await ?
447
447
}
448
448
} ;
449
+ let mut pending_latency: Vec < ( i64 , i64 ) > = Vec :: new ( ) ;
449
450
450
451
for update in updates. price_feeds {
451
452
let config = self
@@ -463,6 +464,10 @@ where
463
464
let received_at_opt = update. received_at ;
464
465
let publish_time = update. price_feed . get_price_unchecked ( ) . publish_time ;
465
466
467
+ if let Some ( received_at) = received_at_opt {
468
+ pending_latency. push ( ( received_at, publish_time) ) ;
469
+ }
470
+
466
471
let message = serde_json:: to_string ( & ServerMessage :: PriceUpdate {
467
472
price_feed : RpcPriceFeed :: from_price_feed_update (
468
473
update,
@@ -509,25 +514,6 @@ where
509
514
)
510
515
. await ?;
511
516
self . sender . close ( ) . await ?;
512
- if let Some ( received_at) = received_at_opt {
513
- let pub_to_recv = ( received_at - publish_time) . max ( 0 ) as f64 ;
514
- self . ws_state
515
- . metrics
516
- . publish_to_receive_latency
517
- . observe ( pub_to_recv) ;
518
-
519
- let now_secs = std:: time:: SystemTime :: now ( )
520
- . duration_since ( std:: time:: UNIX_EPOCH )
521
- . ok ( )
522
- . and_then ( |d| i64:: try_from ( d. as_secs ( ) ) . ok ( ) )
523
- . unwrap_or ( received_at) ;
524
- let recv_to_send = ( now_secs - received_at) . max ( 0 ) as f64 ;
525
- self . ws_state
526
- . metrics
527
- . receive_to_ws_send_latency
528
- . observe ( recv_to_send) ;
529
- }
530
-
531
517
self . closed = true ;
532
518
return Ok ( ( ) ) ;
533
519
}
@@ -546,6 +532,26 @@ where
546
532
}
547
533
548
534
self . sender . flush ( ) . await ?;
535
+
536
+ let now_secs = std:: time:: SystemTime :: now ( )
537
+ . duration_since ( std:: time:: UNIX_EPOCH )
538
+ . ok ( )
539
+ . and_then ( |d| i64:: try_from ( d. as_secs ( ) ) . ok ( ) ) ;
540
+ if let Some ( now) = now_secs {
541
+ for ( received_at, publish_time) in pending_latency {
542
+ let pub_to_recv = ( received_at - publish_time) . max ( 0 ) as f64 ;
543
+ self . ws_state
544
+ . metrics
545
+ . publish_to_receive_latency
546
+ . observe ( pub_to_recv) ;
547
+ let recv_to_send = ( now - received_at) . max ( 0 ) as f64 ;
548
+ self . ws_state
549
+ . metrics
550
+ . receive_to_ws_send_latency
551
+ . observe ( recv_to_send) ;
552
+ }
553
+ }
554
+
549
555
Ok ( ( ) )
550
556
}
551
557
0 commit comments