Skip to content

Commit 4f4be83

Browse files
feat(hermes): add staleness histograms for publish→receive and receive→broadcast latencies
Co-Authored-By: Tejas Badadare <[email protected]>
1 parent f1c82b6 commit 4f4be83

File tree

6 files changed

+90
-1
lines changed

6 files changed

+90
-1
lines changed

apps/hermes/server/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[package]
22
name = "hermes"
3-
version = "0.10.4"
3+
version = "0.10.5"
44
description = "Hermes is an agent that provides Verified Prices from the Pythnet Pyth Oracle."
55
edition = "2021"
66

apps/hermes/server/src/api/metrics_middleware.rs

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ use {
1818
pub struct ApiMetrics {
1919
pub requests: Family<Labels, Counter>,
2020
pub latencies: Family<Labels, Histogram>,
21+
pub sse_broadcast_latency: Histogram,
2122
}
2223

2324
impl ApiMetrics {
@@ -36,6 +37,12 @@ impl ApiMetrics {
3637
.into_iter(),
3738
)
3839
}),
40+
sse_broadcast_latency: Histogram::new(
41+
[
42+
0.001, 0.005, 0.01, 0.05, 0.1, 0.5, 1.0, 2.0, 5.0, 10.0, 20.0,
43+
]
44+
.into_iter(),
45+
),
3946
};
4047

4148
{
@@ -58,6 +65,16 @@ impl ApiMetrics {
5865
),
5966
)
6067
.await;
68+
69+
Metrics::register(
70+
&*state,
71+
(
72+
"sse_broadcast_latency_seconds",
73+
"Latency from Hermes receive_time to SSE send in seconds",
74+
new.sse_broadcast_latency.clone(),
75+
),
76+
)
77+
.await;
6178
});
6279
}
6380

apps/hermes/server/src/api/rest/v2/sse.rs

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -206,6 +206,20 @@ where
206206
return Ok(None);
207207
}
208208

209+
let now_secs = std::time::SystemTime::now()
210+
.duration_since(std::time::UNIX_EPOCH)
211+
.map(|d| d.as_secs_f64())
212+
.unwrap_or(0.0);
213+
for pu in &parsed_price_updates {
214+
if let Some(receive_time) = pu.metadata.proof_available_time {
215+
let latency = now_secs - (receive_time as f64);
216+
state
217+
.metrics
218+
.sse_broadcast_latency
219+
.observe(latency.max(0.0));
220+
}
221+
}
222+
209223
let price_update_data = price_feeds_with_update_data.update_data;
210224
let encoded_data: Vec<String> = price_update_data
211225
.into_iter()

apps/hermes/server/src/api/ws.rs

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,7 @@ pub struct Labels {
8585

8686
pub struct WsMetrics {
8787
pub interactions: Family<Labels, Counter>,
88+
pub broadcast_latency: prometheus_client::metrics::histogram::Histogram,
8889
}
8990

9091
impl WsMetrics {
@@ -95,6 +96,12 @@ impl WsMetrics {
9596
{
9697
let new = Self {
9798
interactions: Family::default(),
99+
broadcast_latency: prometheus_client::metrics::histogram::Histogram::new(
100+
[
101+
0.001, 0.005, 0.01, 0.05, 0.1, 0.5, 1.0, 2.0, 5.0, 10.0, 20.0,
102+
]
103+
.into_iter(),
104+
),
98105
};
99106

100107
{
@@ -110,6 +117,16 @@ impl WsMetrics {
110117
),
111118
)
112119
.await;
120+
121+
Metrics::register(
122+
&*state,
123+
(
124+
"ws_broadcast_latency_seconds",
125+
"Latency from Hermes receive_time to WS send in seconds",
126+
new.broadcast_latency.clone(),
127+
),
128+
)
129+
.await;
113130
});
114131
}
115132

@@ -415,6 +432,18 @@ where
415432
}
416433
}
417434

435+
let now_secs = std::time::SystemTime::now()
436+
.duration_since(std::time::UNIX_EPOCH)
437+
.map(|d| d.as_secs_f64())
438+
.unwrap_or(0.0);
439+
if let Some(received_at) = update.received_at {
440+
let latency = now_secs - (received_at as f64);
441+
self.ws_state
442+
.metrics
443+
.broadcast_latency
444+
.observe(latency.max(0.0));
445+
}
446+
418447
let message = serde_json::to_string(&ServerMessage::PriceUpdate {
419448
price_feed: RpcPriceFeed::from_price_feed_update(
420449
update,

apps/hermes/server/src/state/aggregate.rs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -367,6 +367,16 @@ where
367367
// we can build the message states
368368
let message_states = build_message_states(accumulator_messages, wormhole_merkle_state)?;
369369

370+
{
371+
let mut data = self.into().data.write().await;
372+
for ms in &message_states {
373+
let publish = ms.message.publish_time() as f64;
374+
let receive = ms.received_at as f64;
375+
let latency = receive - publish;
376+
data.metrics.observe_publish_to_receive(latency);
377+
}
378+
}
379+
370380
let message_state_keys = message_states
371381
.iter()
372382
.map(|message_state| message_state.key())

apps/hermes/server/src/state/aggregate/metrics.rs

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ struct ObservedSlotLabels {
3434
pub struct Metrics {
3535
observed_slot: Family<ObservedSlotLabels, Counter>,
3636
observed_slot_latency: Family<ObservedSlotLabels, Histogram>,
37+
publish_to_receive_latency: Histogram,
3738
first_observed_time_of_slot: BTreeMap<Slot, Instant>,
3839
newest_observed_slot: HashMap<Event, Slot>,
3940
}
@@ -50,6 +51,12 @@ impl Metrics {
5051
.into_iter(),
5152
)
5253
}),
54+
publish_to_receive_latency: Histogram::new(
55+
[
56+
0.1, 0.2, 0.3, 0.4, 0.5, 0.7, 1.0, 1.3, 1.7, 2.0, 3.0, 5.0, 10.0, 20.0,
57+
]
58+
.into_iter(),
59+
),
5360
first_observed_time_of_slot: BTreeMap::new(),
5461
newest_observed_slot: HashMap::new(),
5562
};
@@ -69,11 +76,23 @@ impl Metrics {
6976
"Latency of observed slots in seconds",
7077
observed_slot_latency,
7178
);
79+
80+
metrics_registry.register(
81+
"publish_to_receive_latency_seconds",
82+
"Latency from message publish_time to Hermes receive_time in seconds",
83+
new.publish_to_receive_latency.clone(),
84+
);
7285
}
7386

7487
new
7588
}
7689

90+
pub fn observe_publish_to_receive(&mut self, latency_secs: f64) {
91+
if latency_secs.is_finite() && latency_secs >= 0.0 {
92+
self.publish_to_receive_latency.observe(latency_secs);
93+
}
94+
}
95+
7796
/// Observe a slot and event. An event at a slot should be observed only once.
7897
pub fn observe(&mut self, slot: Slot, event: Event) {
7998
let order = if self

0 commit comments

Comments
 (0)