Skip to content

Commit 34beae7

Browse files
authored
Merge pull request #1111 from openmina/feat/heartbeat-tweaks
feat(heartbeats): Higher delay tolerance and other tweaks
2 parents db62cf0 + 70e0a42 commit 34beae7

File tree

4 files changed

+91
-50
lines changed

4 files changed

+91
-50
lines changed

tools/heartbeats-processor/.sqlx/query-489059c80fd17d3c1687d5e301fc85a4691c66707c673886ae08077a9e80c0e1.json

Lines changed: 0 additions & 12 deletions
This file was deleted.

tools/heartbeats-processor/.sqlx/query-5776d825f55385c0b83c30d311e5b68047cc9ce146b3eaba368a69810afd0203.json

Lines changed: 12 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

tools/heartbeats-processor/src/local_db.rs

Lines changed: 59 additions & 36 deletions
Original file line number | Diff line number | Diff line change
@@ -242,55 +242,58 @@ async fn batch_insert_produced_blocks(pool: &SqlitePool, blocks: &[ProducedBlock
242242
Ok(())
243243
}
244244

245-
/// Marks heartbeat presence entries as outdated (disabled) based on global slot comparisons.
245+
/// Marks heartbeat presence entries as outdated (disabled) based on block height comparisons.
246246
///
247247
/// This function performs the following steps:
248-
/// 1. Finds the maximum global slot for each window (considering only non-disabled entries).
249-
/// 2. Identifies the previous window's maximum global slot for each window.
250-
/// 3. Marks a presence entry as disabled if its global slot is less than:
251-
/// - The maximum global slot of the previous window (if it exists).
248+
/// 1. Finds the maximum block height for each window (considering only non-disabled entries).
249+
/// 2. Identifies the previous window's maximum block height for each window.
250+
/// 3. Marks a presence entry as disabled if its block height is less than:
251+
/// - The maximum block height of the previous window (if one exists) minus a tolerance of `HEIGHT_TOLERANCE` blocks.
252252
///
253-
/// This approach allows for a full window of tolerance in synchronization:
254-
/// - Entries matching or exceeding the previous window's max slot are considered up-to-date.
255-
/// - This allows for slight delays in propagation between windows.
253+
/// This approach allows for a reasonable tolerance in synchronization:
254+
/// - Entries whose block height is at least the previous window's max height minus `HEIGHT_TOLERANCE` are considered up-to-date.
255+
/// - This allows for slight delays in block propagation between windows.
256256
///
257257
/// Note: The first window in the sequence will not have any entries marked as disabled,
258258
/// as there is no previous window to compare against.
259259
///
260260
/// Returns the number of presence entries marked as disabled.
261261
async fn mark_outdated_presence(pool: &SqlitePool) -> Result<usize> {
262+
const HEIGHT_TOLERANCE: i64 = 5;
263+
262264
let affected = sqlx::query!(
263265
r#"
264-
WITH MaxSlots AS (
266+
WITH MaxHeights AS (
265267
SELECT
266268
window_id,
267-
MAX(best_tip_global_slot) as max_slot
269+
MAX(best_tip_height) as max_height
268270
FROM heartbeat_presence
269271
WHERE disabled = FALSE
270272
GROUP BY window_id
271273
),
272-
PrevMaxSlots AS (
273-
-- Get the max slot from the immediate previous window
274+
PrevMaxHeights AS (
275+
-- Get the max height from the immediate previous window
274276
SELECT
275277
tw.id as window_id,
276-
prev.max_slot as prev_max_slot
278+
prev.max_height as prev_max_height
277279
FROM time_windows tw
278280
LEFT JOIN time_windows prev_tw ON prev_tw.id = tw.id - 1
279-
LEFT JOIN MaxSlots prev ON prev.window_id = prev_tw.id
281+
LEFT JOIN MaxHeights prev ON prev.window_id = prev_tw.id
280282
)
281283
UPDATE heartbeat_presence
282284
SET disabled = TRUE
283-
WHERE (window_id, best_tip_global_slot) IN (
285+
WHERE (window_id, best_tip_height) IN (
284286
SELECT
285287
hp.window_id,
286-
hp.best_tip_global_slot
288+
hp.best_tip_height
287289
FROM heartbeat_presence hp
288-
JOIN PrevMaxSlots pms ON pms.window_id = hp.window_id
290+
JOIN PrevMaxHeights pmh ON pmh.window_id = hp.window_id
289291
WHERE hp.disabled = FALSE
290-
AND pms.prev_max_slot IS NOT NULL -- Ensure there is a previous window
291-
AND hp.best_tip_global_slot < pms.prev_max_slot -- Less than previous window max
292+
AND pmh.prev_max_height IS NOT NULL -- Ensure there is a previous window
293+
AND hp.best_tip_height < (pmh.prev_max_height - ?)
292294
)
293-
"#
295+
"#,
296+
HEIGHT_TOLERANCE
294297
)
295298
.execute(pool)
296299
.await?;
@@ -388,9 +391,11 @@ pub async fn process_heartbeats(
388391

389392
let best_tip = entry.best_tip_block();
390393
let public_key_id = *public_key_map.get(&entry.submitter).unwrap();
394+
let has_presence =
395+
(entry.is_synced() || entry.is_catchup()) && best_tip.is_some();
391396

392397
// Record presence only if the node is synced or in catchup, and has a best tip
393-
if entry.is_synced() && best_tip.is_some() {
398+
if has_presence {
394399
presence_batch.push(HeartbeatPresence {
395400
window_id: window.id.unwrap(),
396401
public_key_id,
@@ -426,9 +431,9 @@ pub async fn process_heartbeats(
426431
}
427432

428433
// Verify that the block slot matches the expected one for the current time
429-
// TODO: maybe we can be a bit more lenient here?
434+
// Allow a difference of 1 in either direction
430435
let expected_slot = global_slot_at_time(entry.create_time);
431-
if block_info.global_slot != expected_slot {
436+
if (block_info.global_slot as i64 - expected_slot as i64).abs() > 1 {
432437
println!(
433438
"WARNING: Invalid block slot: {} (height: {}, producer: {}, expected slot: {}, actual slot: {})",
434439
block_info.hash, block_info.height, entry.submitter, expected_slot, block_info.global_slot
@@ -445,15 +450,25 @@ pub async fn process_heartbeats(
445450
continue;
446451
}
447452

448-
seen_blocks.insert(key.clone(), entry.create_time);
449-
produced_blocks_batch.push(ProducedBlock {
450-
window_id: window.id.unwrap(),
451-
public_key_id,
452-
block_hash: block_info.hash,
453-
block_height: block_info.height,
454-
block_global_slot: block_info.global_slot,
455-
block_data: block_info.base64_encoded_header,
456-
});
453+
if has_presence {
454+
seen_blocks.insert(key.clone(), entry.create_time);
455+
produced_blocks_batch.push(ProducedBlock {
456+
window_id: window.id.unwrap(),
457+
public_key_id,
458+
block_hash: block_info.hash,
459+
block_height: block_info.height,
460+
block_global_slot: block_info.global_slot,
461+
block_data: block_info.base64_encoded_header,
462+
});
463+
} else {
464+
println!(
465+
"WARNING: Block produced by unsynced node: {} (height: {}, producer: {})",
466+
block_info.hash, block_info.height, entry.submitter
467+
);
468+
println!("Submitter: {:?}", entry.submitter);
469+
println!("Sync status: {}", entry.sync_phase().unwrap_or_default());
470+
println!("Best tip: {:?}", entry.best_tip_block().map(|b| b.hash));
471+
}
457472
}
458473
Some((_block_info, Err(e))) => {
459474
println!(
@@ -698,21 +713,29 @@ pub async fn view_scores(pool: &SqlitePool, config: &Config) -> Result<()> {
698713

699714
let max_scores = get_max_scores(pool).await?;
700715

716+
println!("\nSubmitter Scores Summary:");
717+
println!("Current maximum score possible: {}", max_scores.current);
718+
println!("Total maximum score possible: {}", max_scores.total);
701719
println!("\nSubmitter Scores:");
702720
println!("--------------------------------------------------------");
703721
println!(
704-
"Public Key | Score | Blocks | Current Max | Total Max | Last Updated | Last Heartbeat"
722+
"Public Key | Score | Score % | Blocks | Last Updated | Last Heartbeat"
705723
);
706724
println!("--------------------------------------------------------");
707725

708726
for row in scores {
727+
let percentage = if max_scores.current > 0 {
728+
(row.score as f64 / max_scores.current as f64) * 100.0
729+
} else {
730+
0.0
731+
};
732+
709733
println!(
710-
"{:<40} | {:>5} | {:>6} | {:>11} | {:>9} | {} | {}",
734+
"{:<40} | {:>5} | {:>6.2}% | {:>6} | {} | {}",
711735
row.public_key,
712736
row.score,
737+
percentage,
713738
row.blocks_produced,
714-
max_scores.current,
715-
max_scores.total,
716739
row.last_updated.unwrap_or_default(),
717740
row.last_heartbeat.unwrap_or_default()
718741
);

tools/heartbeats-processor/src/remote_db.rs

Lines changed: 20 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -113,19 +113,34 @@ impl HeartbeatEntry {
113113
})
114114
}
115115

116+
#[allow(dead_code)]
116117
pub fn sync_status(&self) -> Option<String> {
117118
self.transition_frontier()
118119
.and_then(|tf| tf.get("sync"))
119120
.and_then(|sync| sync.get("status"))
120121
.map(|status| status.as_str().unwrap().to_string())
121122
}
122123

124+
pub fn sync_phase(&self) -> Option<String> {
125+
self.transition_frontier()
126+
.and_then(|tf| tf.get("sync"))
127+
.and_then(|sync| sync.get("phase"))
128+
.map(|phase| phase.as_str().unwrap().to_string())
129+
}
130+
123131
pub fn is_synced(&self) -> bool {
124-
self.sync_status()
132+
self.sync_phase()
125133
.as_ref()
126134
.map(|status| status == "Synced")
127135
.unwrap_or(false)
128136
}
137+
138+
pub fn is_catchup(&self) -> bool {
139+
self.sync_phase()
140+
.as_ref()
141+
.map(|status| status == "Catchup")
142+
.unwrap_or(false)
143+
}
129144
}
130145

131146
#[derive(Debug, Serialize, Deserialize)]
@@ -203,7 +218,10 @@ pub async fn fetch_heartbeat_chunk(
203218

204219
q.for_all(conditions)
205220
})
206-
.order_by([("createTime", FirestoreQueryDirection::Ascending)])
221+
.order_by([
222+
("createTime", FirestoreQueryDirection::Ascending),
223+
("__name__", FirestoreQueryDirection::Ascending),
224+
])
207225
.limit(FIRESTORE_BATCH_SIZE);
208226

209227
let mut batch: Vec<HeartbeatEntry> = query.obj().query().await?;

0 commit comments

Comments
 (0)