From 4f4be8367463d9b6d8f63fa559cb0fea80314faf Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Wed, 20 Aug 2025 22:13:23 +0000 Subject: [PATCH 1/4] =?UTF-8?q?feat(hermes):=20add=20staleness=20histogram?= =?UTF-8?q?s=20for=20publish=E2=86=92receive=20and=20receive=E2=86=92broad?= =?UTF-8?q?cast=20latencies?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-Authored-By: Tejas Badadare --- apps/hermes/server/Cargo.toml | 2 +- .../server/src/api/metrics_middleware.rs | 17 +++++++++++ apps/hermes/server/src/api/rest/v2/sse.rs | 14 +++++++++ apps/hermes/server/src/api/ws.rs | 29 +++++++++++++++++++ apps/hermes/server/src/state/aggregate.rs | 10 +++++++ .../server/src/state/aggregate/metrics.rs | 19 ++++++++++++ 6 files changed, 90 insertions(+), 1 deletion(-) diff --git a/apps/hermes/server/Cargo.toml b/apps/hermes/server/Cargo.toml index a21455ec94..ae002fa959 100644 --- a/apps/hermes/server/Cargo.toml +++ b/apps/hermes/server/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "hermes" -version = "0.10.4" +version = "0.10.5" description = "Hermes is an agent that provides Verified Prices from the Pythnet Pyth Oracle." edition = "2021" diff --git a/apps/hermes/server/src/api/metrics_middleware.rs b/apps/hermes/server/src/api/metrics_middleware.rs index aef971e48c..9076a2dfdd 100644 --- a/apps/hermes/server/src/api/metrics_middleware.rs +++ b/apps/hermes/server/src/api/metrics_middleware.rs @@ -18,6 +18,7 @@ use { pub struct ApiMetrics { pub requests: Family, pub latencies: Family, + pub sse_broadcast_latency: Histogram, } impl ApiMetrics { @@ -36,6 +37,12 @@ impl ApiMetrics { .into_iter(), ) }), + sse_broadcast_latency: Histogram::new( + [ + 0.001, 0.005, 0.01, 0.05, 0.1, 0.5, 1.0, 2.0, 5.0, 10.0, 20.0, + ] + .into_iter(), + ), }; { @@ -58,6 +65,16 @@ impl ApiMetrics { ), ) .await; + + Metrics::register( + &*state, + ( + "sse_broadcast_latency_seconds", + "Latency from Hermes receive_time to SSE send in seconds", + new.sse_broadcast_latency.clone(), + ), + ) + .await; }); } diff --git a/apps/hermes/server/src/api/rest/v2/sse.rs b/apps/hermes/server/src/api/rest/v2/sse.rs index e67f7b1825..5be1e902d4 100644 --- a/apps/hermes/server/src/api/rest/v2/sse.rs +++ b/apps/hermes/server/src/api/rest/v2/sse.rs @@ -206,6 +206,20 @@ where return Ok(None); } + let now_secs = std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .map(|d| d.as_secs_f64()) + .unwrap_or(0.0); + for pu in &parsed_price_updates { + if let Some(receive_time) = pu.metadata.proof_available_time { + let latency = now_secs - (receive_time as f64); + state + .metrics + .sse_broadcast_latency + .observe(latency.max(0.0)); + } + } + let price_update_data = price_feeds_with_update_data.update_data; let encoded_data: Vec = price_update_data .into_iter() diff --git a/apps/hermes/server/src/api/ws.rs b/apps/hermes/server/src/api/ws.rs index 2d1098f6c0..ca16928dae 100644 --- a/apps/hermes/server/src/api/ws.rs +++ b/apps/hermes/server/src/api/ws.rs @@ -85,6 +85,7 @@ pub struct Labels { pub struct WsMetrics { pub interactions: Family, + pub broadcast_latency: prometheus_client::metrics::histogram::Histogram, } impl WsMetrics { @@ -95,6 +96,12 @@ impl WsMetrics { { let new = Self { interactions: Family::default(), + broadcast_latency: prometheus_client::metrics::histogram::Histogram::new( + [ + 0.001, 0.005, 0.01, 0.05, 0.1, 0.5, 1.0, 2.0, 5.0, 10.0, 20.0, + ] + .into_iter(), + ), }; { @@ -110,6 +117,16 @@ impl WsMetrics { ), ) .await; + + Metrics::register( + &*state, + ( + "ws_broadcast_latency_seconds", + "Latency from Hermes receive_time to WS send in seconds", + new.broadcast_latency.clone(), + ), + ) + .await; }); } @@ -415,6 +432,18 @@ where } } + let now_secs = std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .map(|d| d.as_secs_f64()) + .unwrap_or(0.0); + if let Some(received_at) = update.received_at { + let latency = now_secs - (received_at as f64); + self.ws_state + .metrics + .broadcast_latency + .observe(latency.max(0.0)); + } + let message = serde_json::to_string(&ServerMessage::PriceUpdate { price_feed: RpcPriceFeed::from_price_feed_update( update, diff --git a/apps/hermes/server/src/state/aggregate.rs b/apps/hermes/server/src/state/aggregate.rs index 01e0f1e36d..5936e13bf5 100644 --- a/apps/hermes/server/src/state/aggregate.rs +++ b/apps/hermes/server/src/state/aggregate.rs @@ -367,6 +367,16 @@ where // we can build the message states let message_states = build_message_states(accumulator_messages, wormhole_merkle_state)?; + { + let mut data = self.into().data.write().await; + for ms in &message_states { + let publish = ms.message.publish_time() as f64; + let receive = ms.received_at as f64; + let latency = receive - publish; + data.metrics.observe_publish_to_receive(latency); + } + } + let message_state_keys = message_states .iter() .map(|message_state| message_state.key()) diff --git a/apps/hermes/server/src/state/aggregate/metrics.rs b/apps/hermes/server/src/state/aggregate/metrics.rs index 77009f7ecd..19122c93a0 100644 --- a/apps/hermes/server/src/state/aggregate/metrics.rs +++ b/apps/hermes/server/src/state/aggregate/metrics.rs @@ -34,6 +34,7 @@ struct ObservedSlotLabels { pub struct Metrics { observed_slot: Family, observed_slot_latency: Family, + publish_to_receive_latency: Histogram, first_observed_time_of_slot: BTreeMap, newest_observed_slot: HashMap, } @@ -50,6 +51,12 @@ impl Metrics { .into_iter(), ) }), + publish_to_receive_latency: Histogram::new( + [ + 0.1, 0.2, 0.3, 0.4, 0.5, 0.7, 1.0, 1.3, 1.7, 2.0, 3.0, 5.0, 10.0, 20.0, + ] + .into_iter(), + ), first_observed_time_of_slot: BTreeMap::new(), newest_observed_slot: HashMap::new(), }; @@ -69,11 +76,23 @@ impl Metrics { "Latency of observed slots in seconds", observed_slot_latency, ); + + metrics_registry.register( + "publish_to_receive_latency_seconds", + "Latency from message publish_time to Hermes receive_time in seconds", + new.publish_to_receive_latency.clone(), + ); } new } + pub fn observe_publish_to_receive(&mut self, latency_secs: f64) { + if latency_secs.is_finite() && latency_secs >= 0.0 { + self.publish_to_receive_latency.observe(latency_secs); + } + } + /// Observe a slot and event. An event at a slot should be observed only once. pub fn observe(&mut self, slot: Slot, event: Event) { let order = if self From bb80ea5539dea1741f3c1a8cd9f27cc05a7e03c3 Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Wed, 20 Aug 2025 22:33:25 +0000 Subject: [PATCH 2/4] fix(hermes): clone histograms before registering to avoid moving into async tasks Co-Authored-By: Tejas Badadare --- apps/hermes/server/src/api/metrics_middleware.rs | 3 ++- apps/hermes/server/src/api/ws.rs | 3 ++- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/apps/hermes/server/src/api/metrics_middleware.rs b/apps/hermes/server/src/api/metrics_middleware.rs index 9076a2dfdd..9d63901607 100644 --- a/apps/hermes/server/src/api/metrics_middleware.rs +++ b/apps/hermes/server/src/api/metrics_middleware.rs @@ -48,6 +48,7 @@ impl ApiMetrics { { let requests = new.requests.clone(); let latencies = new.latencies.clone(); + let sse_broadcast_latency = new.sse_broadcast_latency.clone(); tokio::spawn(async move { Metrics::register( @@ -71,7 +72,7 @@ impl ApiMetrics { ( "sse_broadcast_latency_seconds", "Latency from Hermes receive_time to SSE send in seconds", - new.sse_broadcast_latency.clone(), + sse_broadcast_latency, ), ) .await; diff --git a/apps/hermes/server/src/api/ws.rs b/apps/hermes/server/src/api/ws.rs index ca16928dae..31b34005ef 100644 --- a/apps/hermes/server/src/api/ws.rs +++ b/apps/hermes/server/src/api/ws.rs @@ -106,6 +106,7 @@ impl WsMetrics { { let interactions = new.interactions.clone(); + let ws_broadcast_latency = new.broadcast_latency.clone(); tokio::spawn(async move { Metrics::register( @@ -123,7 +124,7 @@ impl WsMetrics { ( "ws_broadcast_latency_seconds", "Latency from Hermes receive_time to WS send in seconds", - new.broadcast_latency.clone(), + ws_broadcast_latency, ), ) .await; From 6f86bc9288fcb475cfe0472bea1e623444c60f60 Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Wed, 20 Aug 2025 23:09:28 +0000 Subject: [PATCH 3/4] feat(hermes): observe WS latency after flush; observe SSE latency post-encoding Co-Authored-By: Tejas Badadare --- apps/hermes/server/src/api/rest/v2/sse.rs | 24 +++++------ apps/hermes/server/src/api/ws.rs | 51 +++++++++++++++++------ 2 files changed, 50 insertions(+), 25 deletions(-) diff --git a/apps/hermes/server/src/api/rest/v2/sse.rs b/apps/hermes/server/src/api/rest/v2/sse.rs index 5be1e902d4..0f0100d99c 100644 --- a/apps/hermes/server/src/api/rest/v2/sse.rs +++ b/apps/hermes/server/src/api/rest/v2/sse.rs @@ -206,30 +206,30 @@ where return Ok(None); } + + let price_update_data = price_feeds_with_update_data.update_data; + let encoded_data: Vec = price_update_data + .into_iter() + .map(|data| encoding.encode_str(&data)) + .collect(); + let binary_price_update = BinaryUpdate { + encoding, + data: encoded_data, + }; + let now_secs = std::time::SystemTime::now() .duration_since(std::time::UNIX_EPOCH) .map(|d| d.as_secs_f64()) .unwrap_or(0.0); for pu in &parsed_price_updates { if let Some(receive_time) = pu.metadata.proof_available_time { - let latency = now_secs - (receive_time as f64); state .metrics .sse_broadcast_latency - .observe(latency.max(0.0)); + .observe((now_secs - (receive_time as f64)).max(0.0)); } } - let price_update_data = price_feeds_with_update_data.update_data; - let encoded_data: Vec = price_update_data - .into_iter() - .map(|data| encoding.encode_str(&data)) - .collect(); - let binary_price_update = BinaryUpdate { - encoding, - data: encoded_data, - }; - Ok(Some(PriceUpdate { binary: binary_price_update, parsed: if parsed { diff --git a/apps/hermes/server/src/api/ws.rs b/apps/hermes/server/src/api/ws.rs index 31b34005ef..199ad28bbe 100644 --- a/apps/hermes/server/src/api/ws.rs +++ b/apps/hermes/server/src/api/ws.rs @@ -419,6 +419,28 @@ where } }; + let batch_min_received_at = updates + .price_feeds + .iter() + .filter_map(|u| u.received_at) + .min(); + let batch_min_publish_time = updates + .price_feeds + .iter() + .map(|u| u.price_feed.get_price_unchecked().publish_time) + .min(); + + let batch_min_received_at = updates + .price_feeds + .iter() + .filter_map(|u| u.received_at) + .min(); + let batch_min_publish_time = updates + .price_feeds + .iter() + .map(|u| u.price_feed.get_price_unchecked().publish_time) + .min(); + for update in updates.price_feeds { let config = self .price_feeds_with_config @@ -433,17 +455,6 @@ where } } - let now_secs = std::time::SystemTime::now() - .duration_since(std::time::UNIX_EPOCH) - .map(|d| d.as_secs_f64()) - .unwrap_or(0.0); - if let Some(received_at) = update.received_at { - let latency = now_secs - (received_at as f64); - self.ws_state - .metrics - .broadcast_latency - .observe(latency.max(0.0)); - } let message = serde_json::to_string(&ServerMessage::PriceUpdate { price_feed: RpcPriceFeed::from_price_feed_update( @@ -510,6 +521,21 @@ where } self.sender.flush().await?; + let now_secs = std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .map(|d| d.as_secs_f64()) + .unwrap_or(0.0); + if let Some(min_recv) = batch_min_received_at { + self.ws_state + .metrics + .broadcast_latency + .observe((now_secs - (min_recv as f64)).max(0.0)); + } else if let Some(min_pub) = batch_min_publish_time { + self.ws_state + .metrics + .broadcast_latency + .observe((now_secs - (min_pub as f64)).max(0.0)); + } Ok(()) } @@ -518,8 +544,7 @@ where let maybe_client_message = match message { Message::Close(_) => { // Closing the connection. We don't remove it from the subscribers - // list, instead when the Subscriber struct is dropped the channel - // to subscribers list will be closed and it will eventually get + // removed. tracing::trace!(id = self.id, "Subscriber Closed Connection."); self.ws_state From 3acab8bdaa6732f8dd9e9871853c21a472652952 Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Wed, 20 Aug 2025 23:21:30 +0000 Subject: [PATCH 4/4] fix(hermes): remove duplicate WS batch minima; format; clippy clean Co-Authored-By: Tejas Badadare --- apps/hermes/server/src/api/rest/v2/sse.rs | 1 - apps/hermes/server/src/api/ws.rs | 12 ------------ 2 files changed, 13 deletions(-) diff --git a/apps/hermes/server/src/api/rest/v2/sse.rs b/apps/hermes/server/src/api/rest/v2/sse.rs index 0f0100d99c..02e10426bd 100644 --- a/apps/hermes/server/src/api/rest/v2/sse.rs +++ b/apps/hermes/server/src/api/rest/v2/sse.rs @@ -206,7 +206,6 @@ where return Ok(None); } - let price_update_data = price_feeds_with_update_data.update_data; let encoded_data: Vec = price_update_data .into_iter() diff --git a/apps/hermes/server/src/api/ws.rs b/apps/hermes/server/src/api/ws.rs index 199ad28bbe..2b34f5cac2 100644 --- a/apps/hermes/server/src/api/ws.rs +++ b/apps/hermes/server/src/api/ws.rs @@ -419,17 +419,6 @@ where } }; - let batch_min_received_at = updates - .price_feeds - .iter() - .filter_map(|u| u.received_at) - .min(); - let batch_min_publish_time = updates - .price_feeds - .iter() - .map(|u| u.price_feed.get_price_unchecked().publish_time) - .min(); - let batch_min_received_at = updates .price_feeds .iter() @@ -455,7 +444,6 @@ where } } - let message = serde_json::to_string(&ServerMessage::PriceUpdate { price_feed: RpcPriceFeed::from_price_feed_update( update,