From 8a29892743cdd444a7ff41429f6553e602b1c4d9 Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Wed, 20 Aug 2025 23:53:16 +0000 Subject: [PATCH 1/3] feat(hermes): add latency histograms for SSE/WS send paths Co-Authored-By: Tejas Badadare --- apps/hermes/server/Cargo.toml | 2 +- apps/hermes/server/src/api.rs | 3 + apps/hermes/server/src/api/rest/v2/sse.rs | 68 +++++++++++++++++++++++ apps/hermes/server/src/api/ws.rs | 54 ++++++++++++++++-- 4 files changed, 120 insertions(+), 7 deletions(-) diff --git a/apps/hermes/server/Cargo.toml b/apps/hermes/server/Cargo.toml index a21455ec94..ae002fa959 100644 --- a/apps/hermes/server/Cargo.toml +++ b/apps/hermes/server/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "hermes" -version = "0.10.4" +version = "0.10.5" description = "Hermes is an agent that provides Verified Prices from the Pythnet Pyth Oracle." edition = "2021" diff --git a/apps/hermes/server/src/api.rs b/apps/hermes/server/src/api.rs index 7a9807de17..81e816a24f 100644 --- a/apps/hermes/server/src/api.rs +++ b/apps/hermes/server/src/api.rs @@ -23,6 +23,7 @@ pub struct ApiState { pub state: Arc, pub ws: Arc, pub metrics: Arc, + pub sse_metrics: Arc, } /// Manually implement `Clone` as the derive macro will try and slap `Clone` on @@ -33,6 +34,7 @@ impl Clone for ApiState { state: self.state.clone(), ws: self.ws.clone(), metrics: self.metrics.clone(), + sse_metrics: self.sse_metrics.clone(), } } } @@ -50,6 +52,7 @@ impl ApiState { requester_ip_header_name, state.clone(), )), + sse_metrics: Arc::new(crate::api::rest::SseMetrics::new(state.clone())), state, } } diff --git a/apps/hermes/server/src/api/rest/v2/sse.rs b/apps/hermes/server/src/api/rest/v2/sse.rs index e67f7b1825..844d38c964 100644 --- a/apps/hermes/server/src/api/rest/v2/sse.rs +++ b/apps/hermes/server/src/api/rest/v2/sse.rs @@ -22,9 +22,51 @@ use { std::{convert::Infallible, time::Duration}, tokio::{sync::broadcast, time::Instant}, tokio_stream::{wrappers::BroadcastStream, StreamExt as _}, + prometheus_client::metrics::histogram::Histogram, utoipa::IntoParams, + std::sync::Arc, + crate::state::metrics::Metrics, }; + +pub struct SseMetrics { + pub receive_to_sse_send_latency: Histogram, +} + +impl SseMetrics { + pub fn new(state: Arc) -> Self + where + S: Metrics, + S: Send + Sync + 'static, + { + let receive_to_sse_send_latency = Histogram::new( + [0.1, 0.2, 0.3, 0.4, 0.5, 0.7, 1.0, 1.3, 1.7, 2.0, 3.0, 5.0, 10.0, 20.0].into_iter(), + ); + + let new = Self { + receive_to_sse_send_latency: receive_to_sse_send_latency.clone(), + }; + + { + + tokio::spawn(async move { + Metrics::register( + &*state, + ( + "receive_to_sse_send_latency", + "Latency from receive_time to SSE send time (seconds)", + receive_to_sse_send_latency, + ), + ) + .await; + }); + } + + new + } +} + + // Constants const MAX_CONNECTION_DURATION: Duration = Duration::from_secs(24 * 60 * 60); // 24 hours @@ -173,8 +215,34 @@ where ) .await?; + let now_secs = std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .ok() + .and_then(|d| i64::try_from(d.as_secs()).ok()); + + for pf in &price_feeds_with_update_data.price_feeds { + if let Some(received_at) = pf.received_at { + let publish_time = pf.price_feed.get_price_unchecked().publish_time; + let pub_to_recv = (received_at - publish_time).max(0) as f64; + state + .ws + .metrics + .publish_to_receive_latency + .observe(pub_to_recv); + + if let Some(now) = now_secs { + let recv_to_send = (now - received_at).max(0) as f64; + state + .sse_metrics + .receive_to_sse_send_latency + .observe(recv_to_send); + } + } + } + let mut parsed_price_updates: Vec = price_feeds_with_update_data .price_feeds + .into_iter() .map(|price_feed| price_feed.into()) .collect(); diff --git a/apps/hermes/server/src/api/ws.rs b/apps/hermes/server/src/api/ws.rs index 2d1098f6c0..d1c38aecd4 100644 --- a/apps/hermes/server/src/api/ws.rs +++ b/apps/hermes/server/src/api/ws.rs @@ -26,7 +26,7 @@ use { nonzero_ext::nonzero, prometheus_client::{ encoding::{EncodeLabelSet, EncodeLabelValue}, - metrics::{counter::Counter, family::Family}, + metrics::{counter::Counter, family::Family, histogram::Histogram}, }, pyth_sdk::PriceIdentifier, serde::{Deserialize, Serialize}, @@ -85,6 +85,8 @@ pub struct Labels { pub struct WsMetrics { pub interactions: Family, + pub publish_to_receive_latency: Histogram, + pub receive_to_ws_send_latency: Histogram, } impl WsMetrics { @@ -93,20 +95,58 @@ impl WsMetrics { S: Metrics, S: Send + Sync + 'static, { + let publish_to_receive_latency = Histogram::new( + [0.1, 0.2, 0.3, 0.4, 0.5, 0.7, 1.0, 1.3, 1.7, 2.0, 3.0, 5.0, 10.0, 20.0].into_iter(), + ); + let receive_to_ws_send_latency = Histogram::new( + [0.1, 0.2, 0.3, 0.4, 0.5, 0.7, 1.0, 1.3, 1.7, 2.0, 3.0, 5.0, 10.0, 20.0].into_iter(), + ); let new = Self { interactions: Family::default(), + publish_to_receive_latency: publish_to_receive_latency.clone(), + receive_to_ws_send_latency: receive_to_ws_send_latency.clone(), }; { let interactions = new.interactions.clone(); + tokio::spawn({ + let state = state.clone(); + async move { + Metrics::register( + &*state, + ( + "ws_interactions", + "Total number of websocket interactions", + interactions, + ), + ) + .await; + } + }); + + tokio::spawn({ + let state = state.clone(); + async move { + Metrics::register( + &*state, + ( + "publish_to_receive_latency", + "Latency from publish_time to receive_time (seconds)", + publish_to_receive_latency, + ), + ) + .await; + } + }); + tokio::spawn(async move { Metrics::register( &*state, ( - "ws_interactions", - "Total number of websocket interactions", - interactions, + "receive_to_ws_send_latency", + "Latency from receive_time to WebSocket send time (seconds)", + receive_to_ws_send_latency, ), ) .await; @@ -414,6 +454,8 @@ where continue; } } + let received_at_opt = update.received_at; + let publish_time = update.price_feed.get_price_unchecked().publish_time; let message = serde_json::to_string(&ServerMessage::PriceUpdate { price_feed: RpcPriceFeed::from_price_feed_update( @@ -425,6 +467,7 @@ where // Close the connection if rate limit is exceeded and the ip is not whitelisted. // If the ip address is None no rate limiting is applied. + if let Some(ip_addr) = self.ip_addr { if !self .ws_state @@ -465,8 +508,7 @@ where } } - // `sender.feed` buffers a message to the client but does not flush it, so we can send - // multiple messages and flush them all at once. + self.sender.feed(message.into()).await?; self.ws_state From caf71e4941ba90974498a997b63a549e95a4c7f5 Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Thu, 21 Aug 2025 00:02:15 +0000 Subject: [PATCH 2/3] fix(hermes): observe WS latency histograms and satisfy clippy/fmt Co-Authored-By: Tejas Badadare --- apps/hermes/server/src/api/rest/v2/sse.rs | 15 ++++++------ apps/hermes/server/src/api/ws.rs | 30 ++++++++++++++++++++--- 2 files changed, 34 insertions(+), 11 deletions(-) diff --git a/apps/hermes/server/src/api/rest/v2/sse.rs b/apps/hermes/server/src/api/rest/v2/sse.rs index 844d38c964..b8a4e208b5 100644 --- a/apps/hermes/server/src/api/rest/v2/sse.rs +++ b/apps/hermes/server/src/api/rest/v2/sse.rs @@ -1,4 +1,5 @@ use { + crate::state::metrics::Metrics, crate::{ api::{ rest::{validate_price_ids, RestError}, @@ -16,19 +17,17 @@ use { response::sse::{Event, KeepAlive, Sse}, }, futures::Stream, + prometheus_client::metrics::histogram::Histogram, pyth_sdk::PriceIdentifier, serde::Deserialize, serde_qs::axum::QsQuery, + std::sync::Arc, std::{convert::Infallible, time::Duration}, tokio::{sync::broadcast, time::Instant}, tokio_stream::{wrappers::BroadcastStream, StreamExt as _}, - prometheus_client::metrics::histogram::Histogram, utoipa::IntoParams, - std::sync::Arc, - crate::state::metrics::Metrics, }; - pub struct SseMetrics { pub receive_to_sse_send_latency: Histogram, } @@ -40,7 +39,10 @@ impl SseMetrics { S: Send + Sync + 'static, { let receive_to_sse_send_latency = Histogram::new( - [0.1, 0.2, 0.3, 0.4, 0.5, 0.7, 1.0, 1.3, 1.7, 2.0, 3.0, 5.0, 10.0, 20.0].into_iter(), + [ + 0.1, 0.2, 0.3, 0.4, 0.5, 0.7, 1.0, 1.3, 1.7, 2.0, 3.0, 5.0, 10.0, 20.0, + ] + .into_iter(), ); let new = Self { @@ -48,7 +50,6 @@ impl SseMetrics { }; { - tokio::spawn(async move { Metrics::register( &*state, @@ -66,7 +67,6 @@ impl SseMetrics { } } - // Constants const MAX_CONNECTION_DURATION: Duration = Duration::from_secs(24 * 60 * 60); // 24 hours @@ -242,7 +242,6 @@ where let mut parsed_price_updates: Vec = price_feeds_with_update_data .price_feeds - .into_iter() .map(|price_feed| price_feed.into()) .collect(); diff --git a/apps/hermes/server/src/api/ws.rs b/apps/hermes/server/src/api/ws.rs index d1c38aecd4..87832d9f1c 100644 --- a/apps/hermes/server/src/api/ws.rs +++ b/apps/hermes/server/src/api/ws.rs @@ -96,10 +96,16 @@ impl WsMetrics { S: Send + Sync + 'static, { let publish_to_receive_latency = Histogram::new( - [0.1, 0.2, 0.3, 0.4, 0.5, 0.7, 1.0, 1.3, 1.7, 2.0, 3.0, 5.0, 10.0, 20.0].into_iter(), + [ + 0.1, 0.2, 0.3, 0.4, 0.5, 0.7, 1.0, 1.3, 1.7, 2.0, 3.0, 5.0, 10.0, 20.0, + ] + .into_iter(), ); let receive_to_ws_send_latency = Histogram::new( - [0.1, 0.2, 0.3, 0.4, 0.5, 0.7, 1.0, 1.3, 1.7, 2.0, 3.0, 5.0, 10.0, 20.0].into_iter(), + [ + 0.1, 0.2, 0.3, 0.4, 0.5, 0.7, 1.0, 1.3, 1.7, 2.0, 3.0, 5.0, 10.0, 20.0, + ] + .into_iter(), ); let new = Self { interactions: Family::default(), @@ -503,12 +509,30 @@ where ) .await?; self.sender.close().await?; + if let Some(received_at) = received_at_opt { + let pub_to_recv = (received_at - publish_time).max(0) as f64; + self.ws_state + .metrics + .publish_to_receive_latency + .observe(pub_to_recv); + + let now_secs = std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .ok() + .and_then(|d| i64::try_from(d.as_secs()).ok()) + .unwrap_or(received_at); + let recv_to_send = (now_secs - received_at).max(0) as f64; + self.ws_state + .metrics + .receive_to_ws_send_latency + .observe(recv_to_send); + } + self.closed = true; return Ok(()); } } - self.sender.feed(message.into()).await?; self.ws_state From e3617d42435be90d77ab9afac20e2a8bd5fa90cc Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Thu, 21 Aug 2025 00:18:14 +0000 Subject: [PATCH 3/3] fix(ws): observe publish_to_receive and receive_to_ws_send latencies after flush Co-Authored-By: Tejas Badadare --- apps/hermes/server/Cargo.lock | 2 +- apps/hermes/server/src/api/ws.rs | 44 ++++++++++++++++++-------------- 2 files changed, 26 insertions(+), 20 deletions(-) diff --git a/apps/hermes/server/Cargo.lock b/apps/hermes/server/Cargo.lock index 00414c68f8..d2b8507cd5 100644 --- a/apps/hermes/server/Cargo.lock +++ b/apps/hermes/server/Cargo.lock @@ -1880,7 +1880,7 @@ checksum = "2304e00983f87ffb38b55b444b5e3b60a884b5d30c0fca7d82fe33449bbe55ea" [[package]] name = "hermes" -version = "0.10.4" +version = "0.10.5" dependencies = [ "anyhow", "async-trait", diff --git a/apps/hermes/server/src/api/ws.rs b/apps/hermes/server/src/api/ws.rs index 87832d9f1c..7bb7b1cc95 100644 --- a/apps/hermes/server/src/api/ws.rs +++ b/apps/hermes/server/src/api/ws.rs @@ -446,6 +446,7 @@ where .await? } }; + let mut pending_latency: Vec<(i64, i64)> = Vec::new(); for update in updates.price_feeds { let config = self @@ -463,6 +464,10 @@ where let received_at_opt = update.received_at; let publish_time = update.price_feed.get_price_unchecked().publish_time; + if let Some(received_at) = received_at_opt { + pending_latency.push((received_at, publish_time)); + } + let message = serde_json::to_string(&ServerMessage::PriceUpdate { price_feed: RpcPriceFeed::from_price_feed_update( update, @@ -509,25 +514,6 @@ where ) .await?; self.sender.close().await?; - if let Some(received_at) = received_at_opt { - let pub_to_recv = (received_at - publish_time).max(0) as f64; - self.ws_state - .metrics - .publish_to_receive_latency - .observe(pub_to_recv); - - let now_secs = std::time::SystemTime::now() - .duration_since(std::time::UNIX_EPOCH) - .ok() - .and_then(|d| i64::try_from(d.as_secs()).ok()) - .unwrap_or(received_at); - let recv_to_send = (now_secs - received_at).max(0) as f64; - self.ws_state - .metrics - .receive_to_ws_send_latency - .observe(recv_to_send); - } - self.closed = true; return Ok(()); } @@ -546,6 +532,26 @@ where } self.sender.flush().await?; + + let now_secs = std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .ok() + .and_then(|d| i64::try_from(d.as_secs()).ok()); + if let Some(now) = now_secs { + for (received_at, publish_time) in pending_latency { + let pub_to_recv = (received_at - publish_time).max(0) as f64; + self.ws_state + .metrics + .publish_to_receive_latency + .observe(pub_to_recv); + let recv_to_send = (now - received_at).max(0) as f64; + self.ws_state + .metrics + .receive_to_ws_send_latency + .observe(recv_to_send); + } + } + Ok(()) }