Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ pub struct HealthCheckConfig {
pub method: String,
pub consecutive_failures_threshold: u32,
pub consecutive_successes_threshold: u32,
pub max_slot_lag: u64,
}

impl Default for HealthCheckConfig {
Expand All @@ -46,6 +47,7 @@ impl Default for HealthCheckConfig {
method: "getSlot".to_string(),
consecutive_failures_threshold: 3,
consecutive_successes_threshold: 2,
max_slot_lag: 50,
}
}
}
Expand Down
113 changes: 91 additions & 22 deletions src/health.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,11 +80,14 @@ impl HealthState {
}
}

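/// Performs a health check against a backend.
///
/// Returns `Ok(Some(slot))` when the method is `getSlot` or `getBlockHeight` and the response's
/// `result` field is an unsigned integer. Returns `Ok(None)` for any other method once the HTTP
/// status is a success. Returns `Err` with a description when the request fails, the status is
/// not a success, the body cannot be read or parsed as JSON, or (for the two methods above) the
/// `result` field is missing or not numeric.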
async fn perform_health_check(
client: &Client<HttpsConnector<HttpConnector>, Body>,
backend: &Backend,
health_config: &HealthCheckConfig,
) -> Result<(), String> {
) -> Result<Option<u64>, String> {
// Build health check request
let health_request = serde_json::json!({
"jsonrpc": "2.0",
Expand Down Expand Up @@ -112,13 +115,34 @@ async fn perform_health_check(

match result {
Ok(Ok(response)) => {
if response.status().is_success() {
Ok(())
} else {
Err(format!(
if !response.status().is_success() {
return Err(format!(
"Health check returned status: {}",
response.status()
))
));
}

// Parse the response body to extract slot/block height
let method = health_config.method.as_str();
if method == "getSlot" || method == "getBlockHeight" {
let body_bytes = http_body_util::BodyExt::collect(response.into_body())
.await
.map_err(|e| format!("Failed to read response body: {}", e))?
.to_bytes();

let json: serde_json::Value = serde_json::from_slice(&body_bytes)
.map_err(|e| format!("Failed to parse response JSON: {}", e))?;

if let Some(slot) = json.get("result").and_then(|v| v.as_u64()) {
Ok(Some(slot))
} else {
Err(format!(
"Health check response missing numeric 'result' field for method {}",
method
))
}
} else {
Ok(None)
}
}
Ok(Err(e)) => Err(format!("Health check request failed: {}", e)),
Expand Down Expand Up @@ -158,6 +182,15 @@ pub async fn health_check_loop(

let results = future::join_all(check_futures).await;

// Collect slot numbers from successful checks to determine the max (consensus tip)
let max_slot: Option<u64> = results
.iter()
.filter_map(|(_, result)| match result {
Ok(Some(slot)) => Some(*slot),
_ => None,
})
.max();

for (i, (label, check_result)) in results.into_iter().enumerate() {
let backend = &current_state.backends[i];

Expand All @@ -169,23 +202,59 @@ pub async fn health_check_loop(
let previous_healthy = current_status.healthy;

match check_result {
Ok(_) => {
current_status.consecutive_successes += 1;
current_status.consecutive_failures = 0;
current_status.last_error = None;

// Mark healthy if threshold reached
if current_status.consecutive_successes
>= health_config.consecutive_successes_threshold
{
current_status.healthy = true;
Ok(slot_opt) => {
// Check for slot lag against consensus
let lagging = match (slot_opt, max_slot) {
(Some(slot), Some(max)) if max > slot && (max - slot) > health_config.max_slot_lag => {
true
}
_ => false,
};

if lagging {
let slot = slot_opt.unwrap();
let max = max_slot.unwrap();
current_status.consecutive_failures += 1;
current_status.consecutive_successes = 0;
current_status.last_error = Some(format!(
"Backend lagging: slot {} is {} behind max {}",
slot,
max - slot,
max
));

if current_status.consecutive_failures
>= health_config.consecutive_failures_threshold
{
current_status.healthy = false;
}

tracing::warn!(
"Backend {} is lagging: slot {} is {} behind consensus max {} (threshold: {})",
label,
slot,
max - slot,
max,
health_config.max_slot_lag
);
} else {
current_status.consecutive_successes += 1;
current_status.consecutive_failures = 0;
current_status.last_error = None;

// Mark healthy if threshold reached
if current_status.consecutive_successes
>= health_config.consecutive_successes_threshold
{
current_status.healthy = true;
}

tracing::debug!(
"Health check succeeded for backend {} (consecutive successes: {})",
label,
current_status.consecutive_successes
);
}

tracing::debug!(
"Health check succeeded for backend {} (consecutive successes: {})",
label,
current_status.consecutive_successes
);
}
Err(error) => {
current_status.consecutive_failures += 1;
Expand Down