Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion apps/hermes/server/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion apps/hermes/server/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "hermes"
version = "0.10.4"
version = "0.10.5"
description = "Hermes is an agent that provides Verified Prices from the Pythnet Pyth Oracle."
edition = "2021"

Expand Down
3 changes: 3 additions & 0 deletions apps/hermes/server/src/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ pub struct ApiState<S> {
pub state: Arc<S>,
pub ws: Arc<ws::WsState>,
pub metrics: Arc<metrics_middleware::ApiMetrics>,
pub sse_metrics: Arc<crate::api::rest::SseMetrics>,
}

/// Manually implement `Clone` as the derive macro will try and slap `Clone` on
Expand All @@ -33,6 +34,7 @@ impl<S> Clone for ApiState<S> {
state: self.state.clone(),
ws: self.ws.clone(),
metrics: self.metrics.clone(),
sse_metrics: self.sse_metrics.clone(),
}
}
}
Expand All @@ -50,6 +52,7 @@ impl<S> ApiState<S> {
requester_ip_header_name,
state.clone(),
)),
sse_metrics: Arc::new(crate::api::rest::SseMetrics::new(state.clone())),
state,
}
}
Expand Down
67 changes: 67 additions & 0 deletions apps/hermes/server/src/api/rest/v2/sse.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use {
crate::state::metrics::Metrics,
crate::{
api::{
rest::{validate_price_ids, RestError},
Expand All @@ -16,15 +17,56 @@ use {
response::sse::{Event, KeepAlive, Sse},
},
futures::Stream,
prometheus_client::metrics::histogram::Histogram,
pyth_sdk::PriceIdentifier,
serde::Deserialize,
serde_qs::axum::QsQuery,
std::sync::Arc,
std::{convert::Infallible, time::Duration},
tokio::{sync::broadcast, time::Instant},
tokio_stream::{wrappers::BroadcastStream, StreamExt as _},
utoipa::IntoParams,
};

pub struct SseMetrics {
pub receive_to_sse_send_latency: Histogram,
}

impl SseMetrics {
pub fn new<S>(state: Arc<S>) -> Self
where
S: Metrics,
S: Send + Sync + 'static,
{
let receive_to_sse_send_latency = Histogram::new(
[
0.1, 0.2, 0.3, 0.4, 0.5, 0.7, 1.0, 1.3, 1.7, 2.0, 3.0, 5.0, 10.0, 20.0,
]
.into_iter(),
);

let new = Self {
receive_to_sse_send_latency: receive_to_sse_send_latency.clone(),
};

{
tokio::spawn(async move {
Metrics::register(
&*state,
(
"receive_to_sse_send_latency",
"Latency from receive_time to SSE send time (seconds)",
receive_to_sse_send_latency,
),
)
.await;
});
}

new
}
}

// Constants
const MAX_CONNECTION_DURATION: Duration = Duration::from_secs(24 * 60 * 60); // 24 hours

Expand Down Expand Up @@ -173,6 +215,31 @@ where
)
.await?;

let now_secs = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.ok()
.and_then(|d| i64::try_from(d.as_secs()).ok());

for pf in &price_feeds_with_update_data.price_feeds {
if let Some(received_at) = pf.received_at {
let publish_time = pf.price_feed.get_price_unchecked().publish_time;
let pub_to_recv = (received_at - publish_time).max(0) as f64;
state
.ws
.metrics
.publish_to_receive_latency
.observe(pub_to_recv);

if let Some(now) = now_secs {
let recv_to_send = (now - received_at).max(0) as f64;
state
.sse_metrics
.receive_to_sse_send_latency
.observe(recv_to_send);
}
}
}

let mut parsed_price_updates: Vec<ParsedPriceUpdate> = price_feeds_with_update_data
.price_feeds
.into_iter()
Expand Down
84 changes: 78 additions & 6 deletions apps/hermes/server/src/api/ws.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ use {
nonzero_ext::nonzero,
prometheus_client::{
encoding::{EncodeLabelSet, EncodeLabelValue},
metrics::{counter::Counter, family::Family},
metrics::{counter::Counter, family::Family, histogram::Histogram},
},
pyth_sdk::PriceIdentifier,
serde::{Deserialize, Serialize},
Expand Down Expand Up @@ -85,6 +85,8 @@ pub struct Labels {

pub struct WsMetrics {
pub interactions: Family<Labels, Counter>,
pub publish_to_receive_latency: Histogram,
pub receive_to_ws_send_latency: Histogram,
}

impl WsMetrics {
Expand All @@ -93,20 +95,64 @@ impl WsMetrics {
S: Metrics,
S: Send + Sync + 'static,
{
let publish_to_receive_latency = Histogram::new(
[
0.1, 0.2, 0.3, 0.4, 0.5, 0.7, 1.0, 1.3, 1.7, 2.0, 3.0, 5.0, 10.0, 20.0,
]
.into_iter(),
);
let receive_to_ws_send_latency = Histogram::new(
[
0.1, 0.2, 0.3, 0.4, 0.5, 0.7, 1.0, 1.3, 1.7, 2.0, 3.0, 5.0, 10.0, 20.0,
]
.into_iter(),
);
let new = Self {
interactions: Family::default(),
publish_to_receive_latency: publish_to_receive_latency.clone(),
receive_to_ws_send_latency: receive_to_ws_send_latency.clone(),
};

{
let interactions = new.interactions.clone();

tokio::spawn({
let state = state.clone();
async move {
Metrics::register(
&*state,
(
"ws_interactions",
"Total number of websocket interactions",
interactions,
),
)
.await;
}
});

tokio::spawn({
let state = state.clone();
async move {
Metrics::register(
&*state,
(
"publish_to_receive_latency",
"Latency from publish_time to receive_time (seconds)",
publish_to_receive_latency,
),
)
.await;
}
});

tokio::spawn(async move {
Metrics::register(
&*state,
(
"ws_interactions",
"Total number of websocket interactions",
interactions,
"receive_to_ws_send_latency",
"Latency from receive_time to WebSocket send time (seconds)",
receive_to_ws_send_latency,
),
)
.await;
Expand Down Expand Up @@ -400,6 +446,7 @@ where
.await?
}
};
let mut pending_latency: Vec<(i64, i64)> = Vec::new();

for update in updates.price_feeds {
let config = self
Expand All @@ -414,6 +461,12 @@ where
continue;
}
}
let received_at_opt = update.received_at;
let publish_time = update.price_feed.get_price_unchecked().publish_time;

if let Some(received_at) = received_at_opt {
pending_latency.push((received_at, publish_time));
}

let message = serde_json::to_string(&ServerMessage::PriceUpdate {
price_feed: RpcPriceFeed::from_price_feed_update(
Expand All @@ -425,6 +478,7 @@ where

// Close the connection if rate limit is exceeded and the ip is not whitelisted.
// If the ip address is None no rate limiting is applied.

if let Some(ip_addr) = self.ip_addr {
if !self
.ws_state
Expand Down Expand Up @@ -465,8 +519,6 @@ where
}
}

// `sender.feed` buffers a message to the client but does not flush it, so we can send
// multiple messages and flush them all at once.
self.sender.feed(message.into()).await?;

self.ws_state
Expand All @@ -480,6 +532,26 @@ where
}

self.sender.flush().await?;

let now_secs = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.ok()
.and_then(|d| i64::try_from(d.as_secs()).ok());
if let Some(now) = now_secs {
for (received_at, publish_time) in pending_latency {
let pub_to_recv = (received_at - publish_time).max(0) as f64;
self.ws_state
.metrics
.publish_to_receive_latency
.observe(pub_to_recv);
let recv_to_send = (now - received_at).max(0) as f64;
self.ws_state
.metrics
.receive_to_ws_send_latency
.observe(recv_to_send);
}
}

Ok(())
}

Expand Down
Loading