Skip to content

Commit b836e63

Browse files
authored
Merge pull request #15 from roborun-xyz/feat/consensus-health-check
feat: implement consensus-based health check for lagging validators
2 parents 176aa5b + d349b79 commit b836e63

File tree

2 files changed

+93
-22
lines changed

2 files changed

+93
-22
lines changed

src/config.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ pub struct HealthCheckConfig {
3636
pub method: String,
3737
pub consecutive_failures_threshold: u32,
3838
pub consecutive_successes_threshold: u32,
39+
pub max_slot_lag: u64,
3940
}
4041

4142
impl Default for HealthCheckConfig {
@@ -46,6 +47,7 @@ impl Default for HealthCheckConfig {
4647
method: "getSlot".to_string(),
4748
consecutive_failures_threshold: 3,
4849
consecutive_successes_threshold: 2,
50+
max_slot_lag: 50,
4951
}
5052
}
5153
}

src/health.rs

Lines changed: 91 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -80,11 +80,14 @@ impl HealthState {
8080
}
8181
}
8282

83+
/// Performs a health check against a backend.
84+
/// Returns `Ok(Some(slot))` if the method is `getSlot` or `getBlockHeight` and the response
85+
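/// contains an unsigned integer `result`. Returns `Ok(None)` for other methods. Returns `Err` on a non-success HTTP status, an unreadable or unparseable body, a missing numeric `result`, or a failed request.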
8386
async fn perform_health_check(
8487
client: &Client<HttpsConnector<HttpConnector>, Body>,
8588
backend: &Backend,
8689
health_config: &HealthCheckConfig,
87-
) -> Result<(), String> {
90+
) -> Result<Option<u64>, String> {
8891
// Build health check request
8992
let health_request = serde_json::json!({
9093
"jsonrpc": "2.0",
@@ -112,13 +115,34 @@ async fn perform_health_check(
112115

113116
match result {
114117
Ok(Ok(response)) => {
115-
if response.status().is_success() {
116-
Ok(())
117-
} else {
118-
Err(format!(
118+
if !response.status().is_success() {
119+
return Err(format!(
119120
"Health check returned status: {}",
120121
response.status()
121-
))
122+
));
123+
}
124+
125+
// Parse the response body to extract slot/block height
126+
let method = health_config.method.as_str();
127+
if method == "getSlot" || method == "getBlockHeight" {
128+
let body_bytes = http_body_util::BodyExt::collect(response.into_body())
129+
.await
130+
.map_err(|e| format!("Failed to read response body: {}", e))?
131+
.to_bytes();
132+
133+
let json: serde_json::Value = serde_json::from_slice(&body_bytes)
134+
.map_err(|e| format!("Failed to parse response JSON: {}", e))?;
135+
136+
if let Some(slot) = json.get("result").and_then(|v| v.as_u64()) {
137+
Ok(Some(slot))
138+
} else {
139+
Err(format!(
140+
"Health check response missing numeric 'result' field for method {}",
141+
method
142+
))
143+
}
144+
} else {
145+
Ok(None)
122146
}
123147
}
124148
Ok(Err(e)) => Err(format!("Health check request failed: {}", e)),
@@ -158,6 +182,15 @@ pub async fn health_check_loop(
158182

159183
let results = future::join_all(check_futures).await;
160184

185+
// Collect the slots reported by successful checks and take the highest as the consensus tip (`None` when no backend reported a slot)
186+
let max_slot: Option<u64> = results
187+
.iter()
188+
.filter_map(|(_, result)| match result {
189+
Ok(Some(slot)) => Some(*slot),
190+
_ => None,
191+
})
192+
.max();
193+
161194
for (i, (label, check_result)) in results.into_iter().enumerate() {
162195
let backend = &current_state.backends[i];
163196

@@ -169,23 +202,59 @@ pub async fn health_check_loop(
169202
let previous_healthy = current_status.healthy;
170203

171204
match check_result {
172-
Ok(_) => {
173-
current_status.consecutive_successes += 1;
174-
current_status.consecutive_failures = 0;
175-
current_status.last_error = None;
176-
177-
// Mark healthy if threshold reached
178-
if current_status.consecutive_successes
179-
>= health_config.consecutive_successes_threshold
180-
{
181-
current_status.healthy = true;
205+
Ok(slot_opt) => {
206+
// A backend is lagging when it trails the highest reported slot by more than `max_slot_lag`
207+
let lagging = match (slot_opt, max_slot) {
208+
(Some(slot), Some(max)) if max > slot && (max - slot) > health_config.max_slot_lag => {
209+
true
210+
}
211+
_ => false,
212+
};
213+
214+
if lagging {
215+
let slot = slot_opt.unwrap();
216+
let max = max_slot.unwrap();
217+
current_status.consecutive_failures += 1;
218+
current_status.consecutive_successes = 0;
219+
current_status.last_error = Some(format!(
220+
"Backend lagging: slot {} is {} behind max {}",
221+
slot,
222+
max - slot,
223+
max
224+
));
225+
226+
if current_status.consecutive_failures
227+
>= health_config.consecutive_failures_threshold
228+
{
229+
current_status.healthy = false;
230+
}
231+
232+
tracing::warn!(
233+
"Backend {} is lagging: slot {} is {} behind consensus max {} (threshold: {})",
234+
label,
235+
slot,
236+
max - slot,
237+
max,
238+
health_config.max_slot_lag
239+
);
240+
} else {
241+
current_status.consecutive_successes += 1;
242+
current_status.consecutive_failures = 0;
243+
current_status.last_error = None;
244+
245+
// Mark healthy if threshold reached
246+
if current_status.consecutive_successes
247+
>= health_config.consecutive_successes_threshold
248+
{
249+
current_status.healthy = true;
250+
}
251+
252+
tracing::debug!(
253+
"Health check succeeded for backend {} (consecutive successes: {})",
254+
label,
255+
current_status.consecutive_successes
256+
);
182257
}
183-
184-
tracing::debug!(
185-
"Health check succeeded for backend {} (consecutive successes: {})",
186-
label,
187-
current_status.consecutive_successes
188-
);
189258
}
190259
Err(error) => {
191260
current_status.consecutive_failures += 1;

0 commit comments

Comments
 (0)