Skip to content

Commit 5307cf0

Browse files
committed
feat(heartbeats): Add some extra validation to synced heartbeats
1 parent f4144ed commit 5307cf0

File tree

2 files changed

+80
-2
lines changed

2 files changed

+80
-2
lines changed

tools/heartbeats-processor/schema.sql

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,18 @@ ON heartbeat_presence(public_key_id);
8484
CREATE INDEX IF NOT EXISTS idx_heartbeat_presence_global_slot
8585
ON heartbeat_presence(best_tip_global_slot);
8686

87+
-- Index for presence queries by best tip height
88+
CREATE INDEX IF NOT EXISTS idx_heartbeat_presence_height
89+
ON heartbeat_presence(best_tip_height);
90+
91+
-- Combined index for height queries with disabled flag
92+
CREATE INDEX IF NOT EXISTS idx_heartbeat_presence_window_disabled_height
93+
ON heartbeat_presence(window_id, disabled, best_tip_height);
94+
95+
-- Combined index for disabled flag, window and global slot lookups
96+
CREATE INDEX IF NOT EXISTS idx_heartbeat_presence_window_disabled_global_slot
97+
ON heartbeat_presence(window_id, disabled, best_tip_global_slot);
98+
8799
-- Index for heartbeat time queries
88100
CREATE INDEX IF NOT EXISTS idx_heartbeat_presence_time
89101
ON heartbeat_presence(heartbeat_time);

tools/heartbeats-processor/src/local_db.rs

Lines changed: 68 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -242,6 +242,62 @@ async fn batch_insert_produced_blocks(pool: &SqlitePool, blocks: &[ProducedBlock
242242
Ok(())
243243
}
244244

245+
/// Marks heartbeat presence entries as outdated (disabled) based on global slot comparisons.
246+
///
247+
/// This function performs the following steps:
248+
/// 1. Finds the maximum global slot for each window (considering only non-disabled entries).
249+
/// 2. Identifies the previous window's maximum global slot for each window.
250+
/// 3. Marks a presence entry as disabled if its global slot is less than:
251+
/// - The maximum global slot of the previous window (if it exists).
252+
///
253+
/// This approach allows for a full window of tolerance in synchronization:
254+
/// - Entries matching or exceeding the previous window's max slot are considered up-to-date.
255+
/// - This allows for slight delays in propagation between windows.
256+
///
257+
/// Note: The first window in the sequence will not have any entries marked as disabled,
258+
/// as there is no previous window to compare against.
259+
///
260+
/// Returns the number of presence entries marked as disabled.
261+
async fn mark_outdated_presence(pool: &SqlitePool) -> Result<usize> {
262+
let affected = sqlx::query!(
263+
r#"
264+
WITH MaxSlots AS (
265+
SELECT
266+
window_id,
267+
MAX(best_tip_global_slot) as max_slot
268+
FROM heartbeat_presence
269+
WHERE disabled = FALSE
270+
GROUP BY window_id
271+
),
272+
PrevMaxSlots AS (
273+
-- Get the max slot from the immediate previous window
274+
SELECT
275+
tw.id as window_id,
276+
prev.max_slot as prev_max_slot
277+
FROM time_windows tw
278+
LEFT JOIN time_windows prev_tw ON prev_tw.id = tw.id - 1
279+
LEFT JOIN MaxSlots prev ON prev.window_id = prev_tw.id
280+
)
281+
UPDATE heartbeat_presence
282+
SET disabled = TRUE
283+
WHERE (window_id, best_tip_global_slot) IN (
284+
SELECT
285+
hp.window_id,
286+
hp.best_tip_global_slot
287+
FROM heartbeat_presence hp
288+
JOIN PrevMaxSlots pms ON pms.window_id = hp.window_id
289+
WHERE hp.disabled = FALSE
290+
AND pms.prev_max_slot IS NOT NULL -- Ensure there is a previous window
291+
AND hp.best_tip_global_slot < pms.prev_max_slot -- Less than previous window max
292+
)
293+
"#
294+
)
295+
.execute(pool)
296+
.await?;
297+
298+
Ok(affected.rows_affected() as usize)
299+
}
300+
245301
pub async fn process_heartbeats(
246302
db: &FirestoreDb,
247303
pool: &SqlitePool,
@@ -449,6 +505,15 @@ pub async fn process_heartbeats(
449505

450506
if latest_time > last_processed_time {
451507
update_last_processed_time(pool, latest_time).await?;
508+
509+
// Mark outdated presence entries as disabled
510+
let disabled_count = mark_outdated_presence(pool).await?;
511+
if disabled_count > 0 {
512+
println!(
513+
"Marked {} outdated presence entries as disabled",
514+
disabled_count
515+
);
516+
}
452517
}
453518

454519
Ok(total_heartbeats)
@@ -537,16 +602,17 @@ pub async fn update_scores(pool: &SqlitePool, config: &Config) -> Result<()> {
537602
GROUP BY public_key_id
538603
),
539604
HeartbeatCounts AS (
540-
-- Count heartbeats only within valid windows
605+
-- Count heartbeats only within valid windows and not disabled
541606
SELECT
542607
hp.public_key_id,
543608
COUNT(DISTINCT hp.window_id) as heartbeats
544609
FROM heartbeat_presence hp
545610
JOIN ValidWindows vw ON vw.id = hp.window_id
611+
WHERE hp.disabled = FALSE
546612
GROUP BY hp.public_key_id
547613
),
548614
LastHeartbeats AS (
549-
-- Get last heartbeat time across all windows
615+
-- Get last heartbeat time across all windows, including disabled entries
550616
SELECT
551617
public_key_id,
552618
MAX(heartbeat_time) as last_heartbeat

0 commit comments

Comments
 (0)