Skip to content
Closed
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion apps/hermes/server/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "hermes"
version = "0.10.4"
version = "0.10.5"
description = "Hermes is an agent that provides Verified Prices from the Pythnet Pyth Oracle."
edition = "2021"

Expand Down
3 changes: 3 additions & 0 deletions apps/hermes/server/src/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ pub struct ApiState<S> {
pub state: Arc<S>,
pub ws: Arc<ws::WsState>,
pub metrics: Arc<metrics_middleware::ApiMetrics>,
pub sse_metrics: Arc<crate::api::rest::SseMetrics>,
}

/// Manually implement `Clone` as the derive macro will try and slap `Clone` on
Expand All @@ -33,6 +34,7 @@ impl<S> Clone for ApiState<S> {
state: self.state.clone(),
ws: self.ws.clone(),
metrics: self.metrics.clone(),
sse_metrics: self.sse_metrics.clone(),
}
}
}
Expand All @@ -50,6 +52,7 @@ impl<S> ApiState<S> {
requester_ip_header_name,
state.clone(),
)),
sse_metrics: Arc::new(crate::api::rest::SseMetrics::new(state.clone())),
state,
}
}
Expand Down
67 changes: 67 additions & 0 deletions apps/hermes/server/src/api/rest/v2/sse.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use {
crate::state::metrics::Metrics,
crate::{
api::{
rest::{validate_price_ids, RestError},
Expand All @@ -16,15 +17,56 @@ use {
response::sse::{Event, KeepAlive, Sse},
},
futures::Stream,
prometheus_client::metrics::histogram::Histogram,
pyth_sdk::PriceIdentifier,
serde::Deserialize,
serde_qs::axum::QsQuery,
std::sync::Arc,
std::{convert::Infallible, time::Duration},
tokio::{sync::broadcast, time::Instant},
tokio_stream::{wrappers::BroadcastStream, StreamExt as _},
utoipa::IntoParams,
};

/// Prometheus metrics specific to the Server-Sent Events (SSE) price-streaming
/// endpoint.
pub struct SseMetrics {
    /// Latency in seconds from the time an update was received
    /// (`received_at`) to the time it is sent out over the SSE stream.
    pub receive_to_sse_send_latency: Histogram,
}

impl SseMetrics {
    /// Creates the SSE metrics and registers them with the application's
    /// metrics registry held by `state`.
    ///
    /// Registration happens in a spawned background task because
    /// `Metrics::register` is async while this constructor is not. The
    /// returned instance is usable immediately: the histogram is cloned for
    /// registration, and the code relies on clones sharing the same
    /// underlying series (NOTE(review): this is how `prometheus_client`
    /// metrics behave — confirm if the crate is ever swapped out).
    pub fn new<S>(state: Arc<S>) -> Self
    where
        S: Metrics,
        S: Send + Sync + 'static,
    {
        // Bucket boundaries in seconds, covering sub-second fast paths up to
        // multi-second stalls.
        let receive_to_sse_send_latency = Histogram::new(
            [
                0.1, 0.2, 0.3, 0.4, 0.5, 0.7, 1.0, 1.3, 1.7, 2.0, 3.0, 5.0, 10.0, 20.0,
            ]
            .into_iter(),
        );

        let metrics = Self {
            receive_to_sse_send_latency: receive_to_sse_send_latency.clone(),
        };

        // Fire-and-forget registration: if the runtime shuts down before the
        // task runs, observations are recorded but never exported.
        tokio::spawn(async move {
            Metrics::register(
                &*state,
                (
                    "receive_to_sse_send_latency",
                    "Latency from receive_time to SSE send time (seconds)",
                    receive_to_sse_send_latency,
                ),
            )
            .await;
        });

        metrics
    }
}

// Constants
const MAX_CONNECTION_DURATION: Duration = Duration::from_secs(24 * 60 * 60); // 24 hours

Expand Down Expand Up @@ -173,6 +215,31 @@ where
)
.await?;

let now_secs = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.ok()
.and_then(|d| i64::try_from(d.as_secs()).ok());

for pf in &price_feeds_with_update_data.price_feeds {
if let Some(received_at) = pf.received_at {
let publish_time = pf.price_feed.get_price_unchecked().publish_time;
let pub_to_recv = (received_at - publish_time).max(0) as f64;
state
.ws
.metrics
.publish_to_receive_latency
.observe(pub_to_recv);

if let Some(now) = now_secs {
let recv_to_send = (now - received_at).max(0) as f64;
state
.sse_metrics
.receive_to_sse_send_latency
.observe(recv_to_send);
}
}
}

let mut parsed_price_updates: Vec<ParsedPriceUpdate> = price_feeds_with_update_data
.price_feeds
.into_iter()
Expand Down
78 changes: 72 additions & 6 deletions apps/hermes/server/src/api/ws.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ use {
nonzero_ext::nonzero,
prometheus_client::{
encoding::{EncodeLabelSet, EncodeLabelValue},
metrics::{counter::Counter, family::Family},
metrics::{counter::Counter, family::Family, histogram::Histogram},
},
pyth_sdk::PriceIdentifier,
serde::{Deserialize, Serialize},
Expand Down Expand Up @@ -85,6 +85,8 @@ pub struct Labels {

/// Prometheus metrics for the WebSocket price-streaming API.
pub struct WsMetrics {
    // Count of websocket interactions, partitioned by the label set in
    // `Labels` (presumably interaction type/status — confirm against the
    // `Labels` definition above).
    pub interactions: Family<Labels, Counter>,
    // Seconds from a price update's `publish_time` to the time Hermes
    // received it (`received_at`).
    pub publish_to_receive_latency: Histogram,
    // Seconds from the time Hermes received an update (`received_at`) to the
    // time it is sent to the client over the WebSocket.
    pub receive_to_ws_send_latency: Histogram,
}

impl WsMetrics {
Expand All @@ -93,20 +95,64 @@ impl WsMetrics {
S: Metrics,
S: Send + Sync + 'static,
{
let publish_to_receive_latency = Histogram::new(
[
0.1, 0.2, 0.3, 0.4, 0.5, 0.7, 1.0, 1.3, 1.7, 2.0, 3.0, 5.0, 10.0, 20.0,
]
.into_iter(),
);
let receive_to_ws_send_latency = Histogram::new(
[
0.1, 0.2, 0.3, 0.4, 0.5, 0.7, 1.0, 1.3, 1.7, 2.0, 3.0, 5.0, 10.0, 20.0,
]
.into_iter(),
);
let new = Self {
interactions: Family::default(),
publish_to_receive_latency: publish_to_receive_latency.clone(),
receive_to_ws_send_latency: receive_to_ws_send_latency.clone(),
};

{
let interactions = new.interactions.clone();

tokio::spawn({
let state = state.clone();
async move {
Metrics::register(
&*state,
(
"ws_interactions",
"Total number of websocket interactions",
interactions,
),
)
.await;
}
});

tokio::spawn({
let state = state.clone();
async move {
Metrics::register(
&*state,
(
"publish_to_receive_latency",
"Latency from publish_time to receive_time (seconds)",
publish_to_receive_latency,
),
)
.await;
}
});

tokio::spawn(async move {
Metrics::register(
&*state,
(
"ws_interactions",
"Total number of websocket interactions",
interactions,
"receive_to_ws_send_latency",
"Latency from receive_time to WebSocket send time (seconds)",
receive_to_ws_send_latency,
),
)
.await;
Expand Down Expand Up @@ -414,6 +460,8 @@ where
continue;
}
}
let received_at_opt = update.received_at;
let publish_time = update.price_feed.get_price_unchecked().publish_time;

let message = serde_json::to_string(&ServerMessage::PriceUpdate {
price_feed: RpcPriceFeed::from_price_feed_update(
Expand All @@ -425,6 +473,7 @@ where

// Close the connection if rate limit is exceeded and the ip is not whitelisted.
// If the ip address is None no rate limiting is applied.

if let Some(ip_addr) = self.ip_addr {
if !self
.ws_state
Expand Down Expand Up @@ -460,13 +509,30 @@ where
)
.await?;
self.sender.close().await?;
if let Some(received_at) = received_at_opt {
let pub_to_recv = (received_at - publish_time).max(0) as f64;
self.ws_state
.metrics
.publish_to_receive_latency
.observe(pub_to_recv);

let now_secs = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.ok()
.and_then(|d| i64::try_from(d.as_secs()).ok())
.unwrap_or(received_at);
let recv_to_send = (now_secs - received_at).max(0) as f64;
self.ws_state
.metrics
.receive_to_ws_send_latency
.observe(recv_to_send);
}

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is in the wrong place; we should observe the latency after `self.sender.flush()` so the measurement includes the time taken to actually write the message to the socket.

self.closed = true;
return Ok(());
}
}

// `sender.feed` buffers a message to the client but does not flush it, so we can send
// multiple messages and flush them all at once.
self.sender.feed(message.into()).await?;

self.ws_state
Expand Down
Loading