From d349b792a3690cfea0e31522f22bcdcbbbe2783e Mon Sep 17 00:00:00 2001 From: Yuru Shao Date: Tue, 17 Feb 2026 07:40:54 +0000 Subject: [PATCH] feat: implement consensus-based health check for lagging validators - Add max_slot_lag config (default 50) - Parse JSON-RPC response in health checks to extract slot number - Calculate max slot across all backends - Mark backends unhealthy if they lag behind max slot by > threshold --- src/config.rs | 2 + src/health.rs | 113 ++++++++++++++++++++++++++++++++++++++++---------- 2 files changed, 93 insertions(+), 22 deletions(-) diff --git a/src/config.rs b/src/config.rs index 050ae70..c1ab7f1 100644 --- a/src/config.rs +++ b/src/config.rs @@ -36,6 +36,7 @@ pub struct HealthCheckConfig { pub method: String, pub consecutive_failures_threshold: u32, pub consecutive_successes_threshold: u32, + pub max_slot_lag: u64, } impl Default for HealthCheckConfig { @@ -46,6 +47,7 @@ impl Default for HealthCheckConfig { method: "getSlot".to_string(), consecutive_failures_threshold: 3, consecutive_successes_threshold: 2, + max_slot_lag: 50, } } } diff --git a/src/health.rs b/src/health.rs index 14f5cff..6bb5010 100644 --- a/src/health.rs +++ b/src/health.rs @@ -80,11 +80,14 @@ impl HealthState { } } +/// Performs a health check against a backend. +/// Returns `Ok(Some(slot))` if the method is `getSlot` or `getBlockHeight` and the response +/// contains a numeric result. Returns `Ok(None)` for other methods. Returns `Err` on failure. async fn perform_health_check( client: &Client, Body>, backend: &Backend, health_config: &HealthCheckConfig, -) -> Result<(), String> { +) -> Result, String> { // Build health check request let health_request = serde_json::json!({ "jsonrpc": "2.0", @@ -112,13 +115,34 @@ async fn perform_health_check( match result { Ok(Ok(response)) => { - if response.status().is_success() { - Ok(()) - } else { - Err(format!( + if !response.status().is_success() { + return Err(format!( "Health check returned status: {}", response.status() - )) + )); + } + + // Parse the response body to extract slot/block height + let method = health_config.method.as_str(); + if method == "getSlot" || method == "getBlockHeight" { + let body_bytes = http_body_util::BodyExt::collect(response.into_body()) + .await + .map_err(|e| format!("Failed to read response body: {}", e))? + .to_bytes(); + + let json: serde_json::Value = serde_json::from_slice(&body_bytes) + .map_err(|e| format!("Failed to parse response JSON: {}", e))?; + + if let Some(slot) = json.get("result").and_then(|v| v.as_u64()) { + Ok(Some(slot)) + } else { + Err(format!( + "Health check response missing numeric 'result' field for method {}", + method + )) + } + } else { + Ok(None) } } Ok(Err(e)) => Err(format!("Health check request failed: {}", e)), @@ -158,6 +182,15 @@ pub async fn health_check_loop( let results = future::join_all(check_futures).await; + // Collect slot numbers from successful checks to determine the max (consensus tip) + let max_slot: Option = results + .iter() + .filter_map(|(_, result)| match result { + Ok(Some(slot)) => Some(*slot), + _ => None, + }) + .max(); + for (i, (label, check_result)) in results.into_iter().enumerate() { let backend = ¤t_state.backends[i]; @@ -169,23 +202,59 @@ pub async fn health_check_loop( let previous_healthy = current_status.healthy; match check_result { - Ok(_) => { - current_status.consecutive_successes += 1; - current_status.consecutive_failures = 0; - current_status.last_error = None; - - // Mark healthy if threshold reached - if current_status.consecutive_successes - >= health_config.consecutive_successes_threshold - { - current_status.healthy = true; + Ok(slot_opt) => { + // Check for slot lag against consensus + let lagging = match (slot_opt, max_slot) { + (Some(slot), Some(max)) if max > slot && (max - slot) > health_config.max_slot_lag => { + true + } + _ => false, + }; + + if lagging { + let slot = slot_opt.unwrap(); + let max = max_slot.unwrap(); + current_status.consecutive_failures += 1; + current_status.consecutive_successes = 0; + current_status.last_error = Some(format!( + "Backend lagging: slot {} is {} behind max {}", + slot, + max - slot, + max + )); + + if current_status.consecutive_failures + >= health_config.consecutive_failures_threshold + { + current_status.healthy = false; + } + + tracing::warn!( + "Backend {} is lagging: slot {} is {} behind consensus max {} (threshold: {})", + label, + slot, + max - slot, + max, + health_config.max_slot_lag + ); + } else { + current_status.consecutive_successes += 1; + current_status.consecutive_failures = 0; + current_status.last_error = None; + + // Mark healthy if threshold reached + if current_status.consecutive_successes + >= health_config.consecutive_successes_threshold + { + current_status.healthy = true; + } + + tracing::debug!( + "Health check succeeded for backend {} (consecutive successes: {})", + label, + current_status.consecutive_successes + ); } - - tracing::debug!( - "Health check succeeded for backend {} (consecutive successes: {})", - label, - current_status.consecutive_successes - ); } Err(error) => { current_status.consecutive_failures += 1;