Skip to content

Commit caf71e4

Browse files
fix(hermes): observe WS latency histograms and satisfy clippy/fmt
Co-Authored-By: Tejas Badadare <[email protected]>
1 parent 8a29892 commit caf71e4

File tree

2 files changed

+34
-11
lines changed

2 files changed

+34
-11
lines changed

apps/hermes/server/src/api/rest/v2/sse.rs

Lines changed: 7 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
use {
2+
crate::state::metrics::Metrics,
23
crate::{
34
api::{
45
rest::{validate_price_ids, RestError},
@@ -16,19 +17,17 @@ use {
1617
response::sse::{Event, KeepAlive, Sse},
1718
},
1819
futures::Stream,
20+
prometheus_client::metrics::histogram::Histogram,
1921
pyth_sdk::PriceIdentifier,
2022
serde::Deserialize,
2123
serde_qs::axum::QsQuery,
24+
std::sync::Arc,
2225
std::{convert::Infallible, time::Duration},
2326
tokio::{sync::broadcast, time::Instant},
2427
tokio_stream::{wrappers::BroadcastStream, StreamExt as _},
25-
prometheus_client::metrics::histogram::Histogram,
2628
utoipa::IntoParams,
27-
std::sync::Arc,
28-
crate::state::metrics::Metrics,
2929
};
3030

31-
3231
pub struct SseMetrics {
3332
pub receive_to_sse_send_latency: Histogram,
3433
}
@@ -40,15 +39,17 @@ impl SseMetrics {
4039
S: Send + Sync + 'static,
4140
{
4241
let receive_to_sse_send_latency = Histogram::new(
43-
[0.1, 0.2, 0.3, 0.4, 0.5, 0.7, 1.0, 1.3, 1.7, 2.0, 3.0, 5.0, 10.0, 20.0].into_iter(),
42+
[
43+
0.1, 0.2, 0.3, 0.4, 0.5, 0.7, 1.0, 1.3, 1.7, 2.0, 3.0, 5.0, 10.0, 20.0,
44+
]
45+
.into_iter(),
4446
);
4547

4648
let new = Self {
4749
receive_to_sse_send_latency: receive_to_sse_send_latency.clone(),
4850
};
4951

5052
{
51-
5253
tokio::spawn(async move {
5354
Metrics::register(
5455
&*state,
@@ -66,7 +67,6 @@ impl SseMetrics {
6667
}
6768
}
6869

69-
7070
// Constants
7171
const MAX_CONNECTION_DURATION: Duration = Duration::from_secs(24 * 60 * 60); // 24 hours
7272

@@ -242,7 +242,6 @@ where
242242

243243
let mut parsed_price_updates: Vec<ParsedPriceUpdate> = price_feeds_with_update_data
244244
.price_feeds
245-
246245
.into_iter()
247246
.map(|price_feed| price_feed.into())
248247
.collect();

apps/hermes/server/src/api/ws.rs

Lines changed: 27 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -96,10 +96,16 @@ impl WsMetrics {
9696
S: Send + Sync + 'static,
9797
{
9898
let publish_to_receive_latency = Histogram::new(
99-
[0.1, 0.2, 0.3, 0.4, 0.5, 0.7, 1.0, 1.3, 1.7, 2.0, 3.0, 5.0, 10.0, 20.0].into_iter(),
99+
[
100+
0.1, 0.2, 0.3, 0.4, 0.5, 0.7, 1.0, 1.3, 1.7, 2.0, 3.0, 5.0, 10.0, 20.0,
101+
]
102+
.into_iter(),
100103
);
101104
let receive_to_ws_send_latency = Histogram::new(
102-
[0.1, 0.2, 0.3, 0.4, 0.5, 0.7, 1.0, 1.3, 1.7, 2.0, 3.0, 5.0, 10.0, 20.0].into_iter(),
105+
[
106+
0.1, 0.2, 0.3, 0.4, 0.5, 0.7, 1.0, 1.3, 1.7, 2.0, 3.0, 5.0, 10.0, 20.0,
107+
]
108+
.into_iter(),
103109
);
104110
let new = Self {
105111
interactions: Family::default(),
@@ -503,12 +509,30 @@ where
503509
)
504510
.await?;
505511
self.sender.close().await?;
512+
if let Some(received_at) = received_at_opt {
513+
let pub_to_recv = (received_at - publish_time).max(0) as f64;
514+
self.ws_state
515+
.metrics
516+
.publish_to_receive_latency
517+
.observe(pub_to_recv);
518+
519+
let now_secs = std::time::SystemTime::now()
520+
.duration_since(std::time::UNIX_EPOCH)
521+
.ok()
522+
.and_then(|d| i64::try_from(d.as_secs()).ok())
523+
.unwrap_or(received_at);
524+
let recv_to_send = (now_secs - received_at).max(0) as f64;
525+
self.ws_state
526+
.metrics
527+
.receive_to_ws_send_latency
528+
.observe(recv_to_send);
529+
}
530+
506531
self.closed = true;
507532
return Ok(());
508533
}
509534
}
510535

511-
512536
self.sender.feed(message.into()).await?;
513537

514538
self.ws_state

0 commit comments

Comments
 (0)