Skip to content

Commit cdcfef1

Browse files
authored
improve stale metrics handling (#393)
* improve stale metrics handling * add metrics testing for stale metrics
1 parent f45a07c commit cdcfef1

File tree

7 files changed

+231
-14
lines changed

7 files changed

+231
-14
lines changed

crates/orchestrator/src/api/routes/heartbeat.rs

Lines changed: 90 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ use shared::models::{
99
api::ApiResponse,
1010
heartbeat::{HeartbeatRequest, HeartbeatResponse},
1111
};
12+
use std::collections::HashSet;
1213
use std::str::FromStr;
1314

1415
async fn heartbeat(
@@ -42,6 +43,40 @@ async fn heartbeat(
4243

4344
app_state.store_context.heartbeat_store.beat(&heartbeat);
4445
if let Some(metrics) = heartbeat.metrics.clone() {
46+
// Get all previously reported metrics for this node
47+
let previous_metrics = app_state
48+
.store_context
49+
.metrics_store
50+
.get_metrics_for_node(node_address);
51+
52+
// Create a HashSet of new metrics for efficient lookup
53+
let new_metrics_set: HashSet<_> = metrics
54+
.iter()
55+
.map(|metric| (&metric.key.task_id, &metric.key.label))
56+
.collect();
57+
58+
// Clean up stale metrics
59+
for (task_id, task_metrics) in previous_metrics {
60+
for (label, _value) in task_metrics {
61+
let prev_key = (&task_id, &label);
62+
if !new_metrics_set.contains(&prev_key) {
63+
// Remove from Prometheus metrics
64+
app_state.metrics.remove_compute_task_gauge(
65+
&node_address.to_string(),
66+
&task_id,
67+
&label,
68+
);
69+
// Remove from Redis metrics store
70+
app_state.store_context.metrics_store.delete_metric(
71+
&task_id,
72+
&label,
73+
&node_address.to_string(),
74+
);
75+
}
76+
}
77+
}
78+
79+
// Store new metrics and update Prometheus
4580
app_state
4681
.store_context
4782
.metrics_store
@@ -82,6 +117,8 @@ pub fn heartbeat_routes() -> Scope {
82117

83118
#[cfg(test)]
84119
mod tests {
120+
use std::collections::HashMap;
121+
85122
use super::*;
86123
use crate::api::tests::helper::create_test_app_state;
87124

@@ -158,6 +195,59 @@ mod tests {
158195
let metrics = app_state.metrics.export_metrics().unwrap();
159196
assert!(metrics.contains("performance/batch_avg_seq_length"));
160197
assert!(metrics.contains("performance/batch_min_seq_length"));
198+
assert!(metrics.contains("long-task-1234"));
199+
200+
let heartbeat_two = json!({"address": address, "metrics": [
201+
{"key": {"task_id": "long-task-1235", "label": "performance/batch_len"}, "value": 10.0},
202+
{"key": {"task_id": "long-task-1235", "label": "performance/batch_min_len"}, "value": 50.0}
203+
]});
204+
205+
let req = test::TestRequest::post()
206+
.uri("/heartbeat")
207+
.set_json(&heartbeat_two)
208+
.to_request();
209+
210+
let resp = test::call_service(&app, req).await;
211+
assert_eq!(resp.status(), StatusCode::OK);
212+
213+
let metrics = app_state.metrics.export_metrics().unwrap();
214+
assert!(metrics.contains("long-task-1235"));
215+
assert!(metrics.contains("performance/batch_len"));
216+
assert!(metrics.contains("performance/batch_min_len"));
217+
assert!(!metrics.contains("long-task-1234"));
218+
let aggregated_metrics = app_state
219+
.store_context
220+
.metrics_store
221+
.get_aggregate_metrics_for_all_tasks();
222+
assert_eq!(aggregated_metrics.len(), 2);
223+
assert_eq!(aggregated_metrics.get("performance/batch_len"), Some(&10.0));
224+
assert_eq!(
225+
aggregated_metrics.get("performance/batch_min_len"),
226+
Some(&50.0)
227+
);
228+
assert_eq!(
229+
aggregated_metrics.get("performance/batch_avg_seq_length"),
230+
None
231+
);
232+
233+
let heartbeat_three = json!({"address": address, "metrics": [
234+
]});
235+
236+
let req = test::TestRequest::post()
237+
.uri("/heartbeat")
238+
.set_json(&heartbeat_three)
239+
.to_request();
240+
241+
let resp = test::call_service(&app, req).await;
242+
assert_eq!(resp.status(), StatusCode::OK);
243+
244+
let metrics = app_state.metrics.export_metrics().unwrap();
245+
let aggregated_metrics = app_state
246+
.store_context
247+
.metrics_store
248+
.get_aggregate_metrics_for_all_tasks();
249+
assert_eq!(aggregated_metrics, HashMap::new());
250+
assert_eq!(metrics, "");
161251
}
162252

163253
#[actix_web::test]

crates/orchestrator/src/api/tests/helper.rs

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -150,3 +150,54 @@ pub fn setup_contract() -> Contracts {
150150
.build()
151151
.unwrap()
152152
}
153+
154+
#[cfg(test)]
155+
pub async fn create_test_app_state_with_metrics() -> Data<AppState> {
156+
use shared::utils::MockStorageProvider;
157+
158+
use crate::{
159+
metrics::MetricsContext, scheduler::Scheduler, utils::loop_heartbeats::LoopHeartbeats,
160+
ServerMode,
161+
};
162+
163+
let store = Arc::new(RedisStore::new_test());
164+
let mut con = store
165+
.client
166+
.get_connection()
167+
.expect("Should connect to test Redis instance");
168+
169+
redis::cmd("PING")
170+
.query::<String>(&mut con)
171+
.expect("Redis should be responsive");
172+
redis::cmd("FLUSHALL")
173+
.query::<String>(&mut con)
174+
.expect("Redis should be flushed");
175+
176+
let store_context = Arc::new(StoreContext::new(store.clone()));
177+
let mode = ServerMode::Full;
178+
let scheduler = Scheduler::new(store_context.clone(), vec![]);
179+
180+
let mock_storage = MockStorageProvider::new();
181+
let storage_provider = Arc::new(mock_storage);
182+
let metrics = Arc::new(MetricsContext::new("0".to_string()));
183+
184+
Data::new(AppState {
185+
store_context: store_context.clone(),
186+
contracts: None,
187+
pool_id: 1,
188+
wallet: Arc::new(
189+
Wallet::new(
190+
"0xdbda1821b80551c9d65939329250298aa3472ba22feea921c0cf5d620ea67b97",
191+
Url::parse("http://localhost:8545").unwrap(),
192+
)
193+
.unwrap(),
194+
),
195+
storage_provider,
196+
heartbeats: Arc::new(LoopHeartbeats::new(&mode)),
197+
hourly_upload_limit: 12,
198+
redis_store: store.clone(),
199+
scheduler,
200+
node_groups_plugin: None,
201+
metrics,
202+
})
203+
}

crates/orchestrator/src/main.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -315,6 +315,7 @@ async fn main() -> Result<()> {
315315
let status_update_store_context = store_context.clone();
316316
let status_update_heartbeats = heartbeats.clone();
317317
let status_update_contracts = contracts.clone();
318+
let status_update_metrics = metrics_context.clone();
318319

319320
tasks.spawn(async move {
320321
let status_updater = NodeStatusUpdater::new(
@@ -326,6 +327,7 @@ async fn main() -> Result<()> {
326327
args.disable_ejection,
327328
status_update_heartbeats.clone(),
328329
status_update_plugins,
330+
status_update_metrics,
329331
);
330332
status_updater.run().await
331333
});

crates/orchestrator/src/metrics/mod.rs

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,17 @@ impl MetricsContext {
3737
.set(value);
3838
}
3939

40+
pub fn remove_compute_task_gauge(&self, node_address: &str, task_id: &str, label: &str) {
41+
if let Err(e) = self.compute_task_gauges.remove_label_values(&[
42+
node_address,
43+
task_id,
44+
label,
45+
&self.pool_id,
46+
]) {
47+
println!("Error removing compute task gauge: {}", e);
48+
}
49+
}
50+
4051
pub fn export_metrics(&self) -> Result<String, prometheus::Error> {
4152
let encoder = TextEncoder::new();
4253
let metric_families = self.registry.gather();

0 commit comments

Comments
 (0)