diff --git a/apps/hermes/server/Cargo.lock b/apps/hermes/server/Cargo.lock index 00414c68f8..d2b8507cd5 100644 --- a/apps/hermes/server/Cargo.lock +++ b/apps/hermes/server/Cargo.lock @@ -1880,7 +1880,7 @@ checksum = "2304e00983f87ffb38b55b444b5e3b60a884b5d30c0fca7d82fe33449bbe55ea" [[package]] name = "hermes" -version = "0.10.4" +version = "0.10.5" dependencies = [ "anyhow", "async-trait", diff --git a/apps/hermes/server/Cargo.toml b/apps/hermes/server/Cargo.toml index a21455ec94..ae002fa959 100644 --- a/apps/hermes/server/Cargo.toml +++ b/apps/hermes/server/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "hermes" -version = "0.10.4" +version = "0.10.5" description = "Hermes is an agent that provides Verified Prices from the Pythnet Pyth Oracle." edition = "2021" diff --git a/apps/hermes/server/src/api.rs b/apps/hermes/server/src/api.rs index 7a9807de17..81e816a24f 100644 --- a/apps/hermes/server/src/api.rs +++ b/apps/hermes/server/src/api.rs @@ -23,6 +23,7 @@ pub struct ApiState { pub state: Arc, pub ws: Arc, pub metrics: Arc, + pub sse_metrics: Arc, } /// Manually implement `Clone` as the derive macro will try and slap `Clone` on @@ -33,6 +34,7 @@ impl Clone for ApiState { state: self.state.clone(), ws: self.ws.clone(), metrics: self.metrics.clone(), + sse_metrics: self.sse_metrics.clone(), } } } @@ -50,6 +52,7 @@ impl ApiState { requester_ip_header_name, state.clone(), )), + sse_metrics: Arc::new(crate::api::rest::SseMetrics::new(state.clone())), state, } } diff --git a/apps/hermes/server/src/api/rest/v2/sse.rs b/apps/hermes/server/src/api/rest/v2/sse.rs index e67f7b1825..b8a4e208b5 100644 --- a/apps/hermes/server/src/api/rest/v2/sse.rs +++ b/apps/hermes/server/src/api/rest/v2/sse.rs @@ -1,4 +1,5 @@ use { + crate::state::metrics::Metrics, crate::{ api::{ rest::{validate_price_ids, RestError}, @@ -16,15 +17,56 @@ use { response::sse::{Event, KeepAlive, Sse}, }, futures::Stream, + prometheus_client::metrics::histogram::Histogram, pyth_sdk::PriceIdentifier, serde::Deserialize, serde_qs::axum::QsQuery, + std::sync::Arc, std::{convert::Infallible, time::Duration}, tokio::{sync::broadcast, time::Instant}, tokio_stream::{wrappers::BroadcastStream, StreamExt as _}, utoipa::IntoParams, }; +pub struct SseMetrics { + pub receive_to_sse_send_latency: Histogram, +} + +impl SseMetrics { + pub fn new(state: Arc) -> Self + where + S: Metrics, + S: Send + Sync + 'static, + { + let receive_to_sse_send_latency = Histogram::new( + [ + 0.1, 0.2, 0.3, 0.4, 0.5, 0.7, 1.0, 1.3, 1.7, 2.0, 3.0, 5.0, 10.0, 20.0, + ] + .into_iter(), + ); + + let new = Self { + receive_to_sse_send_latency: receive_to_sse_send_latency.clone(), + }; + + { + tokio::spawn(async move { + Metrics::register( + &*state, + ( + "receive_to_sse_send_latency", + "Latency from receive_time to SSE send time (seconds)", + receive_to_sse_send_latency, + ), + ) + .await; + }); + } + + new + } +} + // Constants const MAX_CONNECTION_DURATION: Duration = Duration::from_secs(24 * 60 * 60); // 24 hours @@ -173,6 +215,31 @@ where ) .await?; + let now_secs = std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .ok() + .and_then(|d| i64::try_from(d.as_secs()).ok()); + + for pf in &price_feeds_with_update_data.price_feeds { + if let Some(received_at) = pf.received_at { + let publish_time = pf.price_feed.get_price_unchecked().publish_time; + let pub_to_recv = (received_at - publish_time).max(0) as f64; + state + .ws + .metrics + .publish_to_receive_latency + .observe(pub_to_recv); + + if let Some(now) = now_secs { + let recv_to_send = (now - received_at).max(0) as f64; + state + .sse_metrics + .receive_to_sse_send_latency + .observe(recv_to_send); + } + } + } + let mut parsed_price_updates: Vec = price_feeds_with_update_data .price_feeds .into_iter() diff --git a/apps/hermes/server/src/api/ws.rs b/apps/hermes/server/src/api/ws.rs index 2d1098f6c0..7bb7b1cc95 100644 --- a/apps/hermes/server/src/api/ws.rs +++ b/apps/hermes/server/src/api/ws.rs @@ -26,7 +26,7 @@ use { nonzero_ext::nonzero, prometheus_client::{ encoding::{EncodeLabelSet, EncodeLabelValue}, - metrics::{counter::Counter, family::Family}, + metrics::{counter::Counter, family::Family, histogram::Histogram}, }, pyth_sdk::PriceIdentifier, serde::{Deserialize, Serialize}, @@ -85,6 +85,8 @@ pub struct Labels { pub struct WsMetrics { pub interactions: Family, + pub publish_to_receive_latency: Histogram, + pub receive_to_ws_send_latency: Histogram, } impl WsMetrics { @@ -93,20 +95,64 @@ impl WsMetrics { S: Metrics, S: Send + Sync + 'static, { + let publish_to_receive_latency = Histogram::new( + [ + 0.1, 0.2, 0.3, 0.4, 0.5, 0.7, 1.0, 1.3, 1.7, 2.0, 3.0, 5.0, 10.0, 20.0, + ] + .into_iter(), + ); + let receive_to_ws_send_latency = Histogram::new( + [ + 0.1, 0.2, 0.3, 0.4, 0.5, 0.7, 1.0, 1.3, 1.7, 2.0, 3.0, 5.0, 10.0, 20.0, + ] + .into_iter(), + ); let new = Self { interactions: Family::default(), + publish_to_receive_latency: publish_to_receive_latency.clone(), + receive_to_ws_send_latency: receive_to_ws_send_latency.clone(), }; { let interactions = new.interactions.clone(); + tokio::spawn({ + let state = state.clone(); + async move { + Metrics::register( + &*state, + ( + "ws_interactions", + "Total number of websocket interactions", + interactions, + ), + ) + .await; + } + }); + + tokio::spawn({ + let state = state.clone(); + async move { + Metrics::register( + &*state, + ( + "publish_to_receive_latency", + "Latency from publish_time to receive_time (seconds)", + publish_to_receive_latency, + ), + ) + .await; + } + }); + tokio::spawn(async move { Metrics::register( &*state, ( - "ws_interactions", - "Total number of websocket interactions", - interactions, + "receive_to_ws_send_latency", + "Latency from receive_time to WebSocket send time (seconds)", + receive_to_ws_send_latency, ), ) .await; @@ -400,6 +446,7 @@ where .await? } }; + let mut pending_latency: Vec<(i64, i64)> = Vec::new(); for update in updates.price_feeds { let config = self @@ -414,6 +461,12 @@ where continue; } } + let received_at_opt = update.received_at; + let publish_time = update.price_feed.get_price_unchecked().publish_time; + + if let Some(received_at) = received_at_opt { + pending_latency.push((received_at, publish_time)); + } let message = serde_json::to_string(&ServerMessage::PriceUpdate { price_feed: RpcPriceFeed::from_price_feed_update( @@ -425,6 +478,7 @@ where // Close the connection if rate limit is exceeded and the ip is not whitelisted. // If the ip address is None no rate limiting is applied. + if let Some(ip_addr) = self.ip_addr { if !self .ws_state @@ -465,8 +519,6 @@ where } } - // `sender.feed` buffers a message to the client but does not flush it, so we can send - // multiple messages and flush them all at once. self.sender.feed(message.into()).await?; self.ws_state @@ -480,6 +532,26 @@ where } self.sender.flush().await?; + + let now_secs = std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .ok() + .and_then(|d| i64::try_from(d.as_secs()).ok()); + if let Some(now) = now_secs { + for (received_at, publish_time) in pending_latency { + let pub_to_recv = (received_at - publish_time).max(0) as f64; + self.ws_state + .metrics + .publish_to_receive_latency + .observe(pub_to_recv); + let recv_to_send = (now - received_at).max(0) as f64; + self.ws_state + .metrics + .receive_to_ws_send_latency + .observe(recv_to_send); + } + } + Ok(()) }