Skip to content

Commit b18d6ac

Browse files
feat(hermes): add staleness metrics (#2978)
* feat(hermes): add staleness histograms for publish→receive and receive→broadcast latencies Co-Authored-By: Tejas Badadare <[email protected]> * fix(hermes): clone histograms before registering to avoid moving into async tasks Co-Authored-By: Tejas Badadare <[email protected]> * move ws latency observation * conversion comments * comment * fmt --------- Co-authored-by: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com>
1 parent 01b2f80 commit b18d6ac

File tree

7 files changed

+105
-2
lines changed

7 files changed

+105
-2
lines changed

apps/hermes/server/Cargo.lock

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

apps/hermes/server/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[package]
22
name = "hermes"
3-
version = "0.10.4"
3+
version = "0.10.5"
44
description = "Hermes is an agent that provides Verified Prices from the Pythnet Pyth Oracle."
55
edition = "2021"
66

apps/hermes/server/src/api/metrics_middleware.rs

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ use {
1818
pub struct ApiMetrics {
1919
pub requests: Family<Labels, Counter>,
2020
pub latencies: Family<Labels, Histogram>,
21+
pub sse_broadcast_latency: Histogram,
2122
}
2223

2324
impl ApiMetrics {
@@ -36,11 +37,18 @@ impl ApiMetrics {
3637
.into_iter(),
3738
)
3839
}),
40+
sse_broadcast_latency: Histogram::new(
41+
[
42+
0.001, 0.005, 0.01, 0.05, 0.1, 0.5, 1.0, 2.0, 5.0, 10.0, 20.0,
43+
]
44+
.into_iter(),
45+
),
3946
};
4047

4148
{
4249
let requests = new.requests.clone();
4350
let latencies = new.latencies.clone();
51+
let sse_broadcast_latency = new.sse_broadcast_latency.clone();
4452

4553
tokio::spawn(async move {
4654
Metrics::register(
@@ -58,6 +66,16 @@ impl ApiMetrics {
5866
),
5967
)
6068
.await;
69+
70+
Metrics::register(
71+
&*state,
72+
(
73+
"sse_broadcast_latency_seconds",
74+
"Latency from Hermes receive_time to SSE send in seconds",
75+
sse_broadcast_latency,
76+
),
77+
)
78+
.await;
6179
});
6280
}
6381

apps/hermes/server/src/api/rest/v2/sse.rs

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -206,6 +206,20 @@ where
206206
return Ok(None);
207207
}
208208

209+
let now_secs = std::time::SystemTime::now()
210+
.duration_since(std::time::UNIX_EPOCH)
211+
.map(|d| d.as_secs_f64())
212+
.unwrap_or(0.0);
213+
for pu in &parsed_price_updates {
214+
if let Some(receive_time) = pu.metadata.proof_available_time {
215+
let latency = now_secs - (receive_time as f64);
216+
state
217+
.metrics
218+
.sse_broadcast_latency
219+
.observe(latency.max(0.0));
220+
}
221+
}
222+
209223
let price_update_data = price_feeds_with_update_data.update_data;
210224
let encoded_data: Vec<String> = price_update_data
211225
.into_iter()

apps/hermes/server/src/api/ws.rs

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,7 @@ pub struct Labels {
8585

8686
pub struct WsMetrics {
8787
pub interactions: Family<Labels, Counter>,
88+
pub broadcast_latency: prometheus_client::metrics::histogram::Histogram,
8889
}
8990

9091
impl WsMetrics {
@@ -95,10 +96,17 @@ impl WsMetrics {
9596
{
9697
let new = Self {
9798
interactions: Family::default(),
99+
broadcast_latency: prometheus_client::metrics::histogram::Histogram::new(
100+
[
101+
0.001, 0.005, 0.01, 0.05, 0.1, 0.5, 1.0, 2.0, 5.0, 10.0, 20.0,
102+
]
103+
.into_iter(),
104+
),
98105
};
99106

100107
{
101108
let interactions = new.interactions.clone();
109+
let ws_broadcast_latency = new.broadcast_latency.clone();
102110

103111
tokio::spawn(async move {
104112
Metrics::register(
@@ -110,6 +118,16 @@ impl WsMetrics {
110118
),
111119
)
112120
.await;
121+
122+
Metrics::register(
123+
&*state,
124+
(
125+
"ws_broadcast_latency_seconds",
126+
"Latency from Hermes receive_time to WS send in seconds",
127+
ws_broadcast_latency,
128+
),
129+
)
130+
.await;
113131
});
114132
}
115133

@@ -401,6 +419,13 @@ where
401419
}
402420
};
403421

422+
// Capture the minimum receive_time from the updates batch
423+
let min_received_at = updates
424+
.price_feeds
425+
.iter()
426+
.filter_map(|update| update.received_at)
427+
.min();
428+
404429
for update in updates.price_feeds {
405430
let config = self
406431
.price_feeds_with_config
@@ -480,6 +505,21 @@ where
480505
}
481506

482507
self.sender.flush().await?;
508+
509+
// Record latency from receive to ws send after flushing
510+
if let Some(min_received_at) = min_received_at {
511+
let now_secs = std::time::SystemTime::now()
512+
.duration_since(std::time::UNIX_EPOCH)
513+
.map(|d| d.as_secs_f64())
514+
.unwrap_or(0.0);
515+
// Histogram only accepts f64. The conversion is safe (never panics), but very large values lose precision.
516+
let latency = now_secs - (min_received_at as f64);
517+
self.ws_state
518+
.metrics
519+
.broadcast_latency
520+
.observe(latency.max(0.0));
521+
}
522+
483523
Ok(())
484524
}
485525

apps/hermes/server/src/state/aggregate.rs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -367,6 +367,16 @@ where
367367
// we can build the message states
368368
let message_states = build_message_states(accumulator_messages, wormhole_merkle_state)?;
369369

370+
{
371+
let mut data = self.into().data.write().await;
372+
for ms in &message_states {
373+
let publish = ms.message.publish_time();
374+
let receive = ms.received_at;
375+
let latency = receive - publish;
376+
data.metrics.observe_publish_to_receive(latency);
377+
}
378+
}
379+
370380
let message_state_keys = message_states
371381
.iter()
372382
.map(|message_state| message_state.key())

apps/hermes/server/src/state/aggregate/metrics.rs

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ struct ObservedSlotLabels {
3434
pub struct Metrics {
3535
observed_slot: Family<ObservedSlotLabels, Counter>,
3636
observed_slot_latency: Family<ObservedSlotLabels, Histogram>,
37+
publish_to_receive_latency: Histogram,
3738
first_observed_time_of_slot: BTreeMap<Slot, Instant>,
3839
newest_observed_slot: HashMap<Event, Slot>,
3940
}
@@ -50,6 +51,12 @@ impl Metrics {
5051
.into_iter(),
5152
)
5253
}),
54+
publish_to_receive_latency: Histogram::new(
55+
[
56+
0.1, 0.2, 0.3, 0.4, 0.5, 0.7, 1.0, 1.3, 1.7, 2.0, 3.0, 5.0, 10.0, 20.0,
57+
]
58+
.into_iter(),
59+
),
5360
first_observed_time_of_slot: BTreeMap::new(),
5461
newest_observed_slot: HashMap::new(),
5562
};
@@ -69,11 +76,25 @@ impl Metrics {
6976
"Latency of observed slots in seconds",
7077
observed_slot_latency,
7178
);
79+
80+
metrics_registry.register(
81+
"publish_to_receive_latency_seconds",
82+
"Latency from message publish_time to Hermes receive_time in seconds",
83+
new.publish_to_receive_latency.clone(),
84+
);
7285
}
7386

7487
new
7588
}
7689

90+
pub fn observe_publish_to_receive(&mut self, latency_secs: i64) {
91+
// Histogram only accepts f64. The conversion is safe (never panics), but very large values lose precision.
92+
let latency_secs = latency_secs as f64;
93+
if latency_secs.is_finite() && latency_secs >= 0.0 {
94+
self.publish_to_receive_latency.observe(latency_secs);
95+
}
96+
}
97+
7798
/// Observe a slot and event. An event at a slot should be observed only once.
7899
pub fn observe(&mut self, slot: Slot, event: Event) {
79100
let order = if self

0 commit comments

Comments
 (0)