Skip to content

Commit 8a29892

Browse files
feat(hermes): add latency histograms for SSE/WS send paths
Co-Authored-By: Tejas Badadare <[email protected]>
1 parent f1c82b6 commit 8a29892

File tree

4 files changed

+120
-7
lines changed

4 files changed

+120
-7
lines changed

apps/hermes/server/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[package]
22
name = "hermes"
3-
version = "0.10.4"
3+
version = "0.10.5"
44
description = "Hermes is an agent that provides Verified Prices from the Pythnet Pyth Oracle."
55
edition = "2021"
66

apps/hermes/server/src/api.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ pub struct ApiState<S> {
2323
pub state: Arc<S>,
2424
pub ws: Arc<ws::WsState>,
2525
pub metrics: Arc<metrics_middleware::ApiMetrics>,
26+
pub sse_metrics: Arc<crate::api::rest::SseMetrics>,
2627
}
2728

2829
/// Manually implement `Clone` as the derive macro will try and slap `Clone` on
@@ -33,6 +34,7 @@ impl<S> Clone for ApiState<S> {
3334
state: self.state.clone(),
3435
ws: self.ws.clone(),
3536
metrics: self.metrics.clone(),
37+
sse_metrics: self.sse_metrics.clone(),
3638
}
3739
}
3840
}
@@ -50,6 +52,7 @@ impl<S> ApiState<S> {
5052
requester_ip_header_name,
5153
state.clone(),
5254
)),
55+
sse_metrics: Arc::new(crate::api::rest::SseMetrics::new(state.clone())),
5356
state,
5457
}
5558
}

apps/hermes/server/src/api/rest/v2/sse.rs

Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,9 +22,51 @@ use {
2222
std::{convert::Infallible, time::Duration},
2323
tokio::{sync::broadcast, time::Instant},
2424
tokio_stream::{wrappers::BroadcastStream, StreamExt as _},
25+
prometheus_client::metrics::histogram::Histogram,
2526
utoipa::IntoParams,
27+
std::sync::Arc,
28+
crate::state::metrics::Metrics,
2629
};
2730

31+
32+
pub struct SseMetrics {
33+
pub receive_to_sse_send_latency: Histogram,
34+
}
35+
36+
impl SseMetrics {
37+
pub fn new<S>(state: Arc<S>) -> Self
38+
where
39+
S: Metrics,
40+
S: Send + Sync + 'static,
41+
{
42+
let receive_to_sse_send_latency = Histogram::new(
43+
[0.1, 0.2, 0.3, 0.4, 0.5, 0.7, 1.0, 1.3, 1.7, 2.0, 3.0, 5.0, 10.0, 20.0].into_iter(),
44+
);
45+
46+
let new = Self {
47+
receive_to_sse_send_latency: receive_to_sse_send_latency.clone(),
48+
};
49+
50+
{
51+
52+
tokio::spawn(async move {
53+
Metrics::register(
54+
&*state,
55+
(
56+
"receive_to_sse_send_latency",
57+
"Latency from receive_time to SSE send time (seconds)",
58+
receive_to_sse_send_latency,
59+
),
60+
)
61+
.await;
62+
});
63+
}
64+
65+
new
66+
}
67+
}
68+
69+
2870
// Constants
2971
const MAX_CONNECTION_DURATION: Duration = Duration::from_secs(24 * 60 * 60); // 24 hours
3072

@@ -173,8 +215,34 @@ where
173215
)
174216
.await?;
175217

218+
let now_secs = std::time::SystemTime::now()
219+
.duration_since(std::time::UNIX_EPOCH)
220+
.ok()
221+
.and_then(|d| i64::try_from(d.as_secs()).ok());
222+
223+
for pf in &price_feeds_with_update_data.price_feeds {
224+
if let Some(received_at) = pf.received_at {
225+
let publish_time = pf.price_feed.get_price_unchecked().publish_time;
226+
let pub_to_recv = (received_at - publish_time).max(0) as f64;
227+
state
228+
.ws
229+
.metrics
230+
.publish_to_receive_latency
231+
.observe(pub_to_recv);
232+
233+
if let Some(now) = now_secs {
234+
let recv_to_send = (now - received_at).max(0) as f64;
235+
state
236+
.sse_metrics
237+
.receive_to_sse_send_latency
238+
.observe(recv_to_send);
239+
}
240+
}
241+
}
242+
176243
let mut parsed_price_updates: Vec<ParsedPriceUpdate> = price_feeds_with_update_data
177244
.price_feeds
245+
178246
.into_iter()
179247
.map(|price_feed| price_feed.into())
180248
.collect();

apps/hermes/server/src/api/ws.rs

Lines changed: 48 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ use {
2626
nonzero_ext::nonzero,
2727
prometheus_client::{
2828
encoding::{EncodeLabelSet, EncodeLabelValue},
29-
metrics::{counter::Counter, family::Family},
29+
metrics::{counter::Counter, family::Family, histogram::Histogram},
3030
},
3131
pyth_sdk::PriceIdentifier,
3232
serde::{Deserialize, Serialize},
@@ -85,6 +85,8 @@ pub struct Labels {
8585

8686
pub struct WsMetrics {
8787
pub interactions: Family<Labels, Counter>,
88+
pub publish_to_receive_latency: Histogram,
89+
pub receive_to_ws_send_latency: Histogram,
8890
}
8991

9092
impl WsMetrics {
@@ -93,20 +95,58 @@ impl WsMetrics {
9395
S: Metrics,
9496
S: Send + Sync + 'static,
9597
{
98+
let publish_to_receive_latency = Histogram::new(
99+
[0.1, 0.2, 0.3, 0.4, 0.5, 0.7, 1.0, 1.3, 1.7, 2.0, 3.0, 5.0, 10.0, 20.0].into_iter(),
100+
);
101+
let receive_to_ws_send_latency = Histogram::new(
102+
[0.1, 0.2, 0.3, 0.4, 0.5, 0.7, 1.0, 1.3, 1.7, 2.0, 3.0, 5.0, 10.0, 20.0].into_iter(),
103+
);
96104
let new = Self {
97105
interactions: Family::default(),
106+
publish_to_receive_latency: publish_to_receive_latency.clone(),
107+
receive_to_ws_send_latency: receive_to_ws_send_latency.clone(),
98108
};
99109

100110
{
101111
let interactions = new.interactions.clone();
102112

113+
tokio::spawn({
114+
let state = state.clone();
115+
async move {
116+
Metrics::register(
117+
&*state,
118+
(
119+
"ws_interactions",
120+
"Total number of websocket interactions",
121+
interactions,
122+
),
123+
)
124+
.await;
125+
}
126+
});
127+
128+
tokio::spawn({
129+
let state = state.clone();
130+
async move {
131+
Metrics::register(
132+
&*state,
133+
(
134+
"publish_to_receive_latency",
135+
"Latency from publish_time to receive_time (seconds)",
136+
publish_to_receive_latency,
137+
),
138+
)
139+
.await;
140+
}
141+
});
142+
103143
tokio::spawn(async move {
104144
Metrics::register(
105145
&*state,
106146
(
107-
"ws_interactions",
108-
"Total number of websocket interactions",
109-
interactions,
147+
"receive_to_ws_send_latency",
148+
"Latency from receive_time to WebSocket send time (seconds)",
149+
receive_to_ws_send_latency,
110150
),
111151
)
112152
.await;
@@ -414,6 +454,8 @@ where
414454
continue;
415455
}
416456
}
457+
let received_at_opt = update.received_at;
458+
let publish_time = update.price_feed.get_price_unchecked().publish_time;
417459

418460
let message = serde_json::to_string(&ServerMessage::PriceUpdate {
419461
price_feed: RpcPriceFeed::from_price_feed_update(
@@ -425,6 +467,7 @@ where
425467

426468
// Close the connection if rate limit is exceeded and the ip is not whitelisted.
427469
// If the ip address is None no rate limiting is applied.
470+
428471
if let Some(ip_addr) = self.ip_addr {
429472
if !self
430473
.ws_state
@@ -465,8 +508,7 @@ where
465508
}
466509
}
467510

468-
// `sender.feed` buffers a message to the client but does not flush it, so we can send
469-
// multiple messages and flush them all at once.
511+
470512
self.sender.feed(message.into()).await?;
471513

472514
self.ws_state

0 commit comments

Comments
 (0)