Skip to content

Commit 309ab94

Browse files
authored
add additional status update loop metrics (#575)
1 parent 422a7a6 commit 309ab94

File tree

3 files changed

+46
-3
lines changed

3 files changed

+46
-3
lines changed

crates/orchestrator/src/main.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -381,6 +381,7 @@ async fn main() -> Result<()> {
381381

382382
let status_update_store_context = store_context.clone();
383383
let status_update_heartbeats = heartbeats.clone();
384+
let status_update_metrics = metrics_context.clone();
384385
tasks.spawn({
385386
let contracts = contracts.clone();
386387
async move {
@@ -393,6 +394,7 @@ async fn main() -> Result<()> {
393394
args.disable_ejection,
394395
status_update_heartbeats.clone(),
395396
status_updater_plugins,
397+
status_update_metrics,
396398
);
397399
status_updater.run().await
398400
}

crates/orchestrator/src/metrics/mod.rs

Lines changed: 23 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
use prometheus::{CounterVec, GaugeVec, Opts, Registry, TextEncoder};
1+
use prometheus::{CounterVec, GaugeVec, HistogramOpts, HistogramVec, Opts, Registry, TextEncoder};
22
pub mod sync_service;
33
pub mod webhook_sender;
44

@@ -14,6 +14,7 @@ pub struct MetricsContext {
1414
pub heartbeat_requests_total: CounterVec,
1515
pub nodes_per_task: GaugeVec,
1616
pub task_state: GaugeVec,
17+
pub status_update_execution_time: HistogramVec,
1718
}
1819

1920
impl MetricsContext {
@@ -90,7 +91,6 @@ impl MetricsContext {
9091
&["task_id", "task_name", "pool_id"],
9192
)
9293
.unwrap();
93-
9494
let task_state = GaugeVec::new(
9595
Opts::new(
9696
"orchestrator_task_state",
@@ -100,6 +100,19 @@ impl MetricsContext {
100100
)
101101
.unwrap();
102102

103+
let status_update_execution_time = HistogramVec::new(
104+
HistogramOpts::new(
105+
"orchestrator_status_update_execution_time_seconds",
106+
"Duration of status update execution",
107+
)
108+
.buckets(vec![
109+
0.01, 0.05, 0.1, 0.25, 0.5, 1.0, 2.0, 5.0, 10.0, 15.0, 30.0, 45.0, 60.0, 90.0,
110+
120.0,
111+
]),
112+
&["node_address", "pool_id"],
113+
)
114+
.unwrap();
115+
103116
let registry = Registry::new();
104117
let _ = registry.register(Box::new(compute_task_gauges.clone()));
105118
let _ = registry.register(Box::new(task_info.clone()));
@@ -110,6 +123,7 @@ impl MetricsContext {
110123
let _ = registry.register(Box::new(heartbeat_requests_total.clone()));
111124
let _ = registry.register(Box::new(nodes_per_task.clone()));
112125
let _ = registry.register(Box::new(task_state.clone()));
126+
let _ = registry.register(Box::new(status_update_execution_time.clone()));
113127

114128
Self {
115129
compute_task_gauges,
@@ -123,6 +137,7 @@ impl MetricsContext {
123137
heartbeat_requests_total,
124138
nodes_per_task,
125139
task_state,
140+
status_update_execution_time,
126141
}
127142
}
128143

@@ -218,6 +233,12 @@ impl MetricsContext {
218233
self.compute_task_gauges.reset();
219234
}
220235

236+
/// Records one observation in the `status_update_execution_time` histogram
/// (`orchestrator_status_update_execution_time_seconds`).
///
/// * `node_address` - value for the `node_address` label of the sample.
/// * `duration` - the observed value; the metric name ends in `_seconds` and the
///   visible caller passes `Duration::as_secs_f64()`, so this is expected to be seconds.
///
/// The `pool_id` label is filled from this context's own `pool_id` field.
///
/// NOTE(review): `node_address` is used as a label, so the histogram gets one series
/// set per distinct node address; confirm that this cardinality is acceptable.
pub fn record_status_update_execution_time(&self, node_address: &str, duration: f64) {
237+
self.status_update_execution_time
238+
.with_label_values(&[node_address, &self.pool_id])
239+
.observe(duration);
240+
}
241+
221242
/// Clear all orchestrator statistics metrics
222243
pub fn clear_orchestrator_statistics(&self) {
223244
self.nodes_total.reset();

crates/orchestrator/src/status_update/mod.rs

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
use crate::metrics::MetricsContext;
12
use crate::models::node::{NodeStatus, OrchestratorNode};
23
use crate::plugins::StatusUpdatePlugin;
34
use crate::store::core::StoreContext;
@@ -7,7 +8,7 @@ use shared::web3::contracts::core::builder::Contracts;
78
use shared::web3::wallet::WalletProvider;
89
use std::result::Result;
910
use std::sync::Arc;
10-
use std::time::Duration;
11+
use std::time::{Duration, Instant};
1112
use tokio::time::interval;
1213

1314
pub struct NodeStatusUpdater {
@@ -19,6 +20,7 @@ pub struct NodeStatusUpdater {
1920
disable_ejection: bool,
2021
heartbeats: Arc<LoopHeartbeats>,
2122
plugins: Vec<Box<dyn StatusUpdatePlugin>>,
23+
metrics: Arc<MetricsContext>,
2224
}
2325

2426
impl NodeStatusUpdater {
@@ -32,6 +34,7 @@ impl NodeStatusUpdater {
3234
disable_ejection: bool,
3335
heartbeats: Arc<LoopHeartbeats>,
3436
plugins: Vec<Box<dyn StatusUpdatePlugin>>,
37+
metrics: Arc<MetricsContext>,
3538
) -> Self {
3639
Self {
3740
store_context,
@@ -42,6 +45,7 @@ impl NodeStatusUpdater {
4245
disable_ejection,
4346
heartbeats,
4447
plugins,
48+
metrics,
4549
}
4650
}
4751

@@ -133,6 +137,7 @@ impl NodeStatusUpdater {
133137
pub async fn process_nodes(&self) -> Result<(), anyhow::Error> {
134138
let nodes = self.store_context.node_store.get_nodes().await?;
135139
for node in nodes {
140+
let start_time = Instant::now();
136141
let node = node.clone();
137142
let old_status = node.status.clone();
138143
let heartbeat = self
@@ -313,6 +318,12 @@ impl NodeStatusUpdater {
313318
}
314319
}
315320
}
321+
// Record how long this node's iteration took (wall-clock seconds), labelled by the node's address
322+
let duration = start_time.elapsed();
323+
self.metrics.record_status_update_execution_time(
324+
&node.address.to_string(),
325+
duration.as_secs_f64(),
326+
);
316327
}
317328
Ok(())
318329
}
@@ -346,6 +357,7 @@ mod tests {
346357
false,
347358
Arc::new(LoopHeartbeats::new(&mode)),
348359
vec![],
360+
app_state.metrics.clone(),
349361
);
350362
let node = OrchestratorNode {
351363
address: Address::from_str("0x0000000000000000000000000000000000000000").unwrap(),
@@ -450,6 +462,7 @@ mod tests {
450462
false,
451463
Arc::new(LoopHeartbeats::new(&mode)),
452464
vec![],
465+
app_state.metrics.clone(),
453466
);
454467
tokio::spawn(async move {
455468
updater
@@ -502,6 +515,7 @@ mod tests {
502515
false,
503516
Arc::new(LoopHeartbeats::new(&mode)),
504517
vec![],
518+
app_state.metrics.clone(),
505519
);
506520
tokio::spawn(async move {
507521
updater
@@ -570,6 +584,7 @@ mod tests {
570584
false,
571585
Arc::new(LoopHeartbeats::new(&mode)),
572586
vec![],
587+
app_state.metrics.clone(),
573588
);
574589
tokio::spawn(async move {
575590
updater
@@ -648,6 +663,7 @@ mod tests {
648663
false,
649664
Arc::new(LoopHeartbeats::new(&mode)),
650665
vec![],
666+
app_state.metrics.clone(),
651667
);
652668
tokio::spawn(async move {
653669
updater
@@ -734,6 +750,7 @@ mod tests {
734750
false,
735751
Arc::new(LoopHeartbeats::new(&mode)),
736752
vec![],
753+
app_state.metrics.clone(),
737754
);
738755
tokio::spawn(async move {
739756
updater
@@ -817,6 +834,7 @@ mod tests {
817834
false,
818835
Arc::new(LoopHeartbeats::new(&mode)),
819836
vec![],
837+
app_state.metrics.clone(),
820838
);
821839
tokio::spawn(async move {
822840
updater
@@ -894,6 +912,7 @@ mod tests {
894912
false,
895913
Arc::new(LoopHeartbeats::new(&mode)),
896914
vec![],
915+
app_state.metrics.clone(),
897916
);
898917
tokio::spawn(async move {
899918
updater
@@ -962,6 +981,7 @@ mod tests {
962981
false,
963982
Arc::new(LoopHeartbeats::new(&mode)),
964983
vec![],
984+
app_state.metrics.clone(),
965985
);
966986
tokio::spawn(async move {
967987
updater

0 commit comments

Comments
 (0)