diff --git a/apps/hermes/server/Cargo.toml b/apps/hermes/server/Cargo.toml
index a21455ec94..ae002fa959 100644
--- a/apps/hermes/server/Cargo.toml
+++ b/apps/hermes/server/Cargo.toml
@@ -1,6 +1,6 @@
 [package]
 name = "hermes"
-version = "0.10.4"
+version = "0.10.5"
 description = "Hermes is an agent that provides Verified Prices from the Pythnet Pyth Oracle."
 edition = "2021"
diff --git a/apps/hermes/server/src/api/metrics_middleware.rs b/apps/hermes/server/src/api/metrics_middleware.rs
index aef971e48c..9d63901607 100644
--- a/apps/hermes/server/src/api/metrics_middleware.rs
+++ b/apps/hermes/server/src/api/metrics_middleware.rs
@@ -18,6 +18,7 @@ use {
 pub struct ApiMetrics {
     pub requests: Family,
     pub latencies: Family,
+    pub sse_broadcast_latency: Histogram,
 }

 impl ApiMetrics {
@@ -36,11 +37,18 @@ impl ApiMetrics {
                     .into_iter(),
                 )
             }),
+            sse_broadcast_latency: Histogram::new(
+                [
+                    0.001, 0.005, 0.01, 0.05, 0.1, 0.5, 1.0, 2.0, 5.0, 10.0, 20.0,
+                ]
+                .into_iter(),
+            ),
         };

         {
             let requests = new.requests.clone();
             let latencies = new.latencies.clone();
+            let sse_broadcast_latency = new.sse_broadcast_latency.clone();

             tokio::spawn(async move {
                 Metrics::register(
@@ -58,6 +66,16 @@ impl ApiMetrics {
                     ),
                 )
                 .await;
+
+                Metrics::register(
+                    &*state,
+                    (
+                        "sse_broadcast_latency_seconds",
+                        "Latency from Hermes receive_time to SSE send in seconds",
+                        sse_broadcast_latency,
+                    ),
+                )
+                .await;
             });
         }

diff --git a/apps/hermes/server/src/api/rest/v2/sse.rs b/apps/hermes/server/src/api/rest/v2/sse.rs
index e67f7b1825..02e10426bd 100644
--- a/apps/hermes/server/src/api/rest/v2/sse.rs
+++ b/apps/hermes/server/src/api/rest/v2/sse.rs
@@ -216,6 +216,19 @@ where
             data: encoded_data,
         };

+        let now_secs = std::time::SystemTime::now()
+            .duration_since(std::time::UNIX_EPOCH)
+            .map(|d| d.as_secs_f64())
+            .unwrap_or(0.0);
+        for pu in &parsed_price_updates {
+            if let Some(receive_time) = pu.metadata.proof_available_time {
+                state
+                    .metrics
+                    .sse_broadcast_latency
+                    .observe((now_secs - (receive_time as f64)).max(0.0));
+            }
+        }
+
         Ok(Some(PriceUpdate {
             binary: binary_price_update,
             parsed: if parsed {
diff --git a/apps/hermes/server/src/api/ws.rs b/apps/hermes/server/src/api/ws.rs
index 2d1098f6c0..2b34f5cac2 100644
--- a/apps/hermes/server/src/api/ws.rs
+++ b/apps/hermes/server/src/api/ws.rs
@@ -85,6 +85,7 @@ pub struct Labels {

 pub struct WsMetrics {
     pub interactions: Family,
+    pub broadcast_latency: prometheus_client::metrics::histogram::Histogram,
 }

 impl WsMetrics {
@@ -95,10 +96,17 @@
     {
         let new = Self {
             interactions: Family::default(),
+            broadcast_latency: prometheus_client::metrics::histogram::Histogram::new(
+                [
+                    0.001, 0.005, 0.01, 0.05, 0.1, 0.5, 1.0, 2.0, 5.0, 10.0, 20.0,
+                ]
+                .into_iter(),
+            ),
         };

         {
             let interactions = new.interactions.clone();
+            let ws_broadcast_latency = new.broadcast_latency.clone();

             tokio::spawn(async move {
                 Metrics::register(
@@ -110,6 +118,16 @@
                     ),
                 )
                 .await;
+
+                Metrics::register(
+                    &*state,
+                    (
+                        "ws_broadcast_latency_seconds",
+                        "Latency from Hermes receive_time to WS send in seconds",
+                        ws_broadcast_latency,
+                    ),
+                )
+                .await;
             });
         }

@@ -401,6 +419,17 @@ where
             }
         };

+        let batch_min_received_at = updates
+            .price_feeds
+            .iter()
+            .filter_map(|u| u.received_at)
+            .min();
+        let batch_min_publish_time = updates
+            .price_feeds
+            .iter()
+            .map(|u| u.price_feed.get_price_unchecked().publish_time)
+            .min();
+
         for update in updates.price_feeds {
             let config = self
                 .price_feeds_with_config
@@ -480,6 +509,21 @@
         }

         self.sender.flush().await?;
+        let now_secs = std::time::SystemTime::now()
+            .duration_since(std::time::UNIX_EPOCH)
+            .map(|d| d.as_secs_f64())
+            .unwrap_or(0.0);
+        if let Some(min_recv) = batch_min_received_at {
+            self.ws_state
+                .metrics
+                .broadcast_latency
+                .observe((now_secs - (min_recv as f64)).max(0.0));
+        } else if let Some(min_pub) = batch_min_publish_time {
+            self.ws_state
+                .metrics
+                .broadcast_latency
+                .observe((now_secs - (min_pub as f64)).max(0.0));
+        }

         Ok(())
     }

@@ -488,8 +532,7 @@ where
         let maybe_client_message = match message {
             Message::Close(_) => {
                 // Closing the connection. We don't remove it from the subscribers
-                // list, instead when the Subscriber struct is dropped the channel
-                // to subscribers list will be closed and it will eventually get
+                // list; it is removed when the Subscriber struct is dropped and its channel closes.
                 tracing::trace!(id = self.id, "Subscriber Closed Connection.");

                 self.ws_state
diff --git a/apps/hermes/server/src/state/aggregate.rs b/apps/hermes/server/src/state/aggregate.rs
index 01e0f1e36d..5936e13bf5 100644
--- a/apps/hermes/server/src/state/aggregate.rs
+++ b/apps/hermes/server/src/state/aggregate.rs
@@ -367,6 +367,16 @@ where
     // we can build the message states
     let message_states = build_message_states(accumulator_messages, wormhole_merkle_state)?;

+    {
+        let mut data = self.into().data.write().await;
+        for ms in &message_states {
+            let publish = ms.message.publish_time() as f64;
+            let receive = ms.received_at as f64;
+            let latency = receive - publish;
+            data.metrics.observe_publish_to_receive(latency);
+        }
+    }
+
     let message_state_keys = message_states
         .iter()
         .map(|message_state| message_state.key())
diff --git a/apps/hermes/server/src/state/aggregate/metrics.rs b/apps/hermes/server/src/state/aggregate/metrics.rs
index 77009f7ecd..19122c93a0 100644
--- a/apps/hermes/server/src/state/aggregate/metrics.rs
+++ b/apps/hermes/server/src/state/aggregate/metrics.rs
@@ -34,6 +34,7 @@ struct ObservedSlotLabels {
 pub struct Metrics {
     observed_slot: Family,
     observed_slot_latency: Family,
+    publish_to_receive_latency: Histogram,
     first_observed_time_of_slot: BTreeMap,
     newest_observed_slot: HashMap,
 }
@@ -50,6 +51,12 @@ impl Metrics {
                     .into_iter(),
                 )
             }),
+            publish_to_receive_latency: Histogram::new(
+                [
+                    0.1, 0.2, 0.3, 0.4, 0.5, 0.7, 1.0, 1.3, 1.7, 2.0, 3.0, 5.0, 10.0, 20.0,
+                ]
+                .into_iter(),
+            ),
             first_observed_time_of_slot: BTreeMap::new(),
             newest_observed_slot: HashMap::new(),
         };
@@ -69,11 +76,23 @@ impl Metrics {
                 "Latency of observed slots in seconds",
                 observed_slot_latency,
             );
+
+            metrics_registry.register(
+                "publish_to_receive_latency_seconds",
+                "Latency from message publish_time to Hermes receive_time in seconds",
+                new.publish_to_receive_latency.clone(),
+            );
         }

         new
     }

+    pub fn observe_publish_to_receive(&mut self, latency_secs: f64) {
+        if latency_secs.is_finite() && latency_secs >= 0.0 {
+            self.publish_to_receive_latency.observe(latency_secs);
+        }
+    }
+
     /// Observe a slot and event. An event at a slot should be observed only once.
     pub fn observe(&mut self, slot: Slot, event: Event) {
         let order = if self
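The three histograms added by this patch (sse_broadcast_latency_seconds, ws_broadcast_latency_seconds, and publish_to_receive_latency_seconds) all follow the same prometheus-client pattern: build a Histogram with explicit buckets, register it once, and call observe() with a latency in seconds. The standalone sketch below is not part of the patch; it is a minimal illustration of that pattern using the publish-to-receive buckets above, and it assumes the prometheus-client crate at a version (0.19 or newer) whose encoding::text::encode writes into a String.

use prometheus_client::{encoding::text::encode, metrics::histogram::Histogram, registry::Registry};
use std::time::{SystemTime, UNIX_EPOCH};

fn main() {
    let mut registry = Registry::default();

    // Same buckets the patch uses for publish_to_receive_latency_seconds.
    let publish_to_receive = Histogram::new(
        [0.1, 0.2, 0.3, 0.4, 0.5, 0.7, 1.0, 1.3, 1.7, 2.0, 3.0, 5.0, 10.0, 20.0].into_iter(),
    );
    registry.register(
        "publish_to_receive_latency_seconds",
        "Latency from message publish_time to Hermes receive_time in seconds",
        publish_to_receive.clone(),
    );

    // Hypothetical timestamps: pretend a message was published 250 ms before "now".
    let now_secs = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .map(|d| d.as_secs_f64())
        .unwrap_or(0.0);
    let publish_time_secs = now_secs - 0.25;

    // Guard against clock skew before observing, as the patch does.
    let latency = now_secs - publish_time_secs;
    if latency.is_finite() && latency >= 0.0 {
        publish_to_receive.observe(latency);
    }

    // Print the text exposition so the bucket counts can be inspected.
    let mut buffer = String::new();
    encode(&mut buffer, &registry).expect("metric encoding failed");
    println!("{buffer}");
}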