Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion apps/hermes/server/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "hermes"
version = "0.10.4"
version = "0.10.5"
description = "Hermes is an agent that provides Verified Prices from the Pythnet Pyth Oracle."
edition = "2021"

Expand Down
18 changes: 18 additions & 0 deletions apps/hermes/server/src/api/metrics_middleware.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use {
/// Metric handles used by the API layer.
///
/// Each field is a handle whose clone is handed to the metrics registry by
/// the constructor, so the value kept here is what request handlers update.
pub struct ApiMetrics {
/// Counter family keyed by `Labels`.
pub requests: Family<Labels, Counter>,
/// Histogram family keyed by `Labels`.
pub latencies: Family<Labels, Histogram>,
/// Unlabelled histogram registered as `sse_broadcast_latency_seconds`.
///
/// The SSE handler observes one sample per parsed price update that carries
/// a `proof_available_time`: wall-clock "now" (seconds since the Unix epoch)
/// minus that timestamp, clamped so it is never negative.
pub sse_broadcast_latency: Histogram,
}

impl ApiMetrics {
Expand All @@ -36,11 +37,18 @@ impl ApiMetrics {
.into_iter(),
)
}),
sse_broadcast_latency: Histogram::new(
[
0.001, 0.005, 0.01, 0.05, 0.1, 0.5, 1.0, 2.0, 5.0, 10.0, 20.0,
]
.into_iter(),
),
};

{
let requests = new.requests.clone();
let latencies = new.latencies.clone();
let sse_broadcast_latency = new.sse_broadcast_latency.clone();

tokio::spawn(async move {
Metrics::register(
Expand All @@ -58,6 +66,16 @@ impl ApiMetrics {
),
)
.await;

Metrics::register(
&*state,
(
"sse_broadcast_latency_seconds",
"Latency from Hermes receive_time to SSE send in seconds",
sse_broadcast_latency,
),
)
.await;
});
}

Expand Down
13 changes: 13 additions & 0 deletions apps/hermes/server/src/api/rest/v2/sse.rs
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,19 @@ where
data: encoded_data,
};

let now_secs = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.map(|d| d.as_secs_f64())
.unwrap_or(0.0);
for pu in &parsed_price_updates {
if let Some(receive_time) = pu.metadata.proof_available_time {
state
.metrics
.sse_broadcast_latency
.observe((now_secs - (receive_time as f64)).max(0.0));
}
}

Ok(Some(PriceUpdate {
binary: binary_price_update,
parsed: if parsed {
Expand Down
47 changes: 45 additions & 2 deletions apps/hermes/server/src/api/ws.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ pub struct Labels {

/// Metric handles used by the WebSocket API.
pub struct WsMetrics {
/// Counter family keyed by `Labels`.
pub interactions: Family<Labels, Counter>,
/// Unlabelled histogram registered as `ws_broadcast_latency_seconds`.
///
/// After a batch of updates has been flushed to the client, at most one
/// sample is observed: wall-clock "now" minus the smallest `received_at` in
/// the batch, or, when no feed in the batch has a `received_at`, minus the
/// smallest `publish_time`. The sample is clamped so it is never negative.
pub broadcast_latency: prometheus_client::metrics::histogram::Histogram,
}

impl WsMetrics {
Expand All @@ -95,10 +96,17 @@ impl WsMetrics {
{
let new = Self {
interactions: Family::default(),
broadcast_latency: prometheus_client::metrics::histogram::Histogram::new(
[
0.001, 0.005, 0.01, 0.05, 0.1, 0.5, 1.0, 2.0, 5.0, 10.0, 20.0,
]
.into_iter(),
),
};

{
let interactions = new.interactions.clone();
let ws_broadcast_latency = new.broadcast_latency.clone();

tokio::spawn(async move {
Metrics::register(
Expand All @@ -110,6 +118,16 @@ impl WsMetrics {
),
)
.await;

Metrics::register(
&*state,
(
"ws_broadcast_latency_seconds",
"Latency from Hermes receive_time to WS send in seconds",
ws_broadcast_latency,
),
)
.await;
});
}

Expand Down Expand Up @@ -401,6 +419,17 @@ where
}
};

let batch_min_received_at = updates
.price_feeds
.iter()
.filter_map(|u| u.received_at)
.min();
let batch_min_publish_time = updates
.price_feeds
.iter()
.map(|u| u.price_feed.get_price_unchecked().publish_time)
.min();

for update in updates.price_feeds {
let config = self
.price_feeds_with_config
Expand Down Expand Up @@ -480,6 +509,21 @@ where
}

self.sender.flush().await?;
let now_secs = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.map(|d| d.as_secs_f64())
.unwrap_or(0.0);
if let Some(min_recv) = batch_min_received_at {
self.ws_state
.metrics
.broadcast_latency
.observe((now_secs - (min_recv as f64)).max(0.0));
} else if let Some(min_pub) = batch_min_publish_time {
self.ws_state
.metrics
.broadcast_latency
.observe((now_secs - (min_pub as f64)).max(0.0));
}
Ok(())
}

Expand All @@ -488,8 +532,7 @@ where
let maybe_client_message = match message {
Message::Close(_) => {
// Closing the connection. We don't remove it from the subscribers
// list, instead when the Subscriber struct is dropped the channel
// to subscribers list will be closed and it will eventually get
// removed.
tracing::trace!(id = self.id, "Subscriber Closed Connection.");
self.ws_state
Expand Down
10 changes: 10 additions & 0 deletions apps/hermes/server/src/state/aggregate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -367,6 +367,16 @@ where
// we can build the message states
let message_states = build_message_states(accumulator_messages, wormhole_merkle_state)?;

{
let mut data = self.into().data.write().await;
for ms in &message_states {
let publish = ms.message.publish_time() as f64;
let receive = ms.received_at as f64;
let latency = receive - publish;
data.metrics.observe_publish_to_receive(latency);
}
}

let message_state_keys = message_states
.iter()
.map(|message_state| message_state.key())
Expand Down
19 changes: 19 additions & 0 deletions apps/hermes/server/src/state/aggregate/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ struct ObservedSlotLabels {
/// Metrics state for the aggregate store: registered metric handles plus the
/// bookkeeping maps used while observing slots.
pub struct Metrics {
// Counter family keyed by `ObservedSlotLabels`.
observed_slot: Family<ObservedSlotLabels, Counter>,
// Histogram family keyed by `ObservedSlotLabels`; its registry help text is
// "Latency of observed slots in seconds".
observed_slot_latency: Family<ObservedSlotLabels, Histogram>,
// Unlabelled histogram registered as `publish_to_receive_latency_seconds`;
// fed through `observe_publish_to_receive`.
// NOTE(review): the caller derives the sample from `publish_time()` and
// `received_at` cast with `as f64`; if both are whole-second timestamps the
// samples have 1 s resolution while the buckets start at 0.1 s — confirm.
publish_to_receive_latency: Histogram,
// Per-slot `Instant`, ordered by slot.
first_observed_time_of_slot: BTreeMap<Slot, Instant>,
// Per-event slot number.
newest_observed_slot: HashMap<Event, Slot>,
}
Expand All @@ -50,6 +51,12 @@ impl Metrics {
.into_iter(),
)
}),
publish_to_receive_latency: Histogram::new(
[
0.1, 0.2, 0.3, 0.4, 0.5, 0.7, 1.0, 1.3, 1.7, 2.0, 3.0, 5.0, 10.0, 20.0,
]
.into_iter(),
),
first_observed_time_of_slot: BTreeMap::new(),
newest_observed_slot: HashMap::new(),
};
Expand All @@ -69,11 +76,23 @@ impl Metrics {
"Latency of observed slots in seconds",
observed_slot_latency,
);

metrics_registry.register(
"publish_to_receive_latency_seconds",
"Latency from message publish_time to Hermes receive_time in seconds",
new.publish_to_receive_latency.clone(),
);
}

new
}

/// Records one publish-to-receive latency sample, expressed in seconds.
///
/// Samples that are NaN, infinite, or negative are dropped without being
/// recorded, so the histogram only ever sees finite, non-negative values.
pub fn observe_publish_to_receive(&mut self, latency_secs: f64) {
    // Guard clause: bail out on anything that is not a usable measurement.
    if !latency_secs.is_finite() || latency_secs < 0.0 {
        return;
    }
    self.publish_to_receive_latency.observe(latency_secs);
}

/// Observe a slot and event. An event at a slot should be observed only once.
pub fn observe(&mut self, slot: Slot, event: Event) {
let order = if self
Expand Down
Loading