Skip to content

Commit 7e58931

Browse files
authored
Merge pull request #1075 from openmina/feat/heartbeats-last-hb2
feat(heartbeats): Tweaks
2 parents eddafcd + 438dfac commit 7e58931

File tree

5 files changed

+40
-22
lines changed

5 files changed

+40
-22
lines changed

tools/heartbeats-processor/.sqlx/query-94669ea962673f9d266e0cbeeaf854c789c92aa5ccc2673293022c7b505243e8.json

Lines changed: 12 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

tools/heartbeats-processor/.sqlx/query-c69ce193a25e2b2b5c1208df04d22d54114d61c2a2fc0f026b58183e8a8232ca.json

Lines changed: 0 additions & 12 deletions
This file was deleted.

tools/heartbeats-processor/src/local_db.rs

Lines changed: 18 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -245,9 +245,11 @@ pub async fn process_heartbeats(
245245
db: &FirestoreDb,
246246
pool: &SqlitePool,
247247
config: &Config,
248-
) -> Result<()> {
248+
) -> Result<usize> {
249249
let last_processed_time = get_last_processed_time(pool, Some(config)).await?;
250250
let now = Utc::now();
251+
// Don't fetch heartbeats beyond window range end
252+
let end_time = config.window_range_end.min(now);
251253

252254
let mut total_heartbeats = 0;
253255
let mut latest_time = last_processed_time;
@@ -266,7 +268,8 @@ pub async fn process_heartbeats(
266268
};
267269

268270
loop {
269-
let heartbeats = crate::remote_db::fetch_heartbeat_chunk(db, &mut chunk_state, now).await?;
271+
let heartbeats =
272+
crate::remote_db::fetch_heartbeat_chunk(db, &mut chunk_state, end_time).await?;
270273
if heartbeats.is_empty() {
271274
break;
272275
}
@@ -430,7 +433,7 @@ pub async fn process_heartbeats(
430433
update_last_processed_time(pool, latest_time).await?;
431434
}
432435

433-
Ok(())
436+
Ok(total_heartbeats)
434437
}
435438

436439
pub async fn create_tables_from_file(pool: &SqlitePool) -> Result<()> {
@@ -516,13 +519,21 @@ pub async fn update_scores(pool: &SqlitePool, config: &Config) -> Result<()> {
516519
GROUP BY public_key_id
517520
),
518521
HeartbeatCounts AS (
522+
-- Count heartbeats only within valid windows
519523
SELECT
520524
hp.public_key_id,
521-
COUNT(DISTINCT hp.window_id) as heartbeats,
522-
MAX(hp.heartbeat_time) as last_heartbeat
525+
COUNT(DISTINCT hp.window_id) as heartbeats
523526
FROM heartbeat_presence hp
524527
JOIN ValidWindows vw ON vw.id = hp.window_id
525528
GROUP BY hp.public_key_id
529+
),
530+
LastHeartbeats AS (
531+
-- Get last heartbeat time across all windows
532+
SELECT
533+
public_key_id,
534+
MAX(heartbeat_time) as last_heartbeat
535+
FROM heartbeat_presence
536+
GROUP BY public_key_id
526537
)
527538
INSERT INTO submitter_scores (
528539
public_key_id,
@@ -534,10 +545,11 @@ pub async fn update_scores(pool: &SqlitePool, config: &Config) -> Result<()> {
534545
pk.id,
535546
COALESCE(hc.heartbeats, 0) as score,
536547
COALESCE(bc.blocks, 0) as blocks_produced,
537-
COALESCE(hc.last_heartbeat, 0) as last_heartbeat
548+
COALESCE(lh.last_heartbeat, 0) as last_heartbeat
538549
FROM public_keys pk
539550
LEFT JOIN HeartbeatCounts hc ON hc.public_key_id = pk.id
540551
LEFT JOIN BlockCounts bc ON bc.public_key_id = pk.id
552+
LEFT JOIN LastHeartbeats lh ON lh.public_key_id = pk.id
541553
WHERE hc.heartbeats > 0 OR bc.blocks > 0
542554
ON CONFLICT(public_key_id) DO UPDATE SET
543555
score = excluded.score,

tools/heartbeats-processor/src/main.rs

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -112,10 +112,12 @@ async fn run_process_loop(
112112

113113
loop {
114114
println!("Processing heartbeats...");
115-
local_db::process_heartbeats(db, pool, config).await?;
115+
let count = local_db::process_heartbeats(db, pool, config).await?;
116116

117-
println!("Posting scores...");
118-
post_scores_to_firestore(pool, db, config).await?;
117+
if count > 0 {
118+
println!("Posting scores...");
119+
post_scores_to_firestore(pool, db, config).await?;
120+
}
119121

120122
println!("Sleeping for {} seconds...", interval_seconds);
121123
tokio::time::sleep(interval).await;

tools/heartbeats-processor/src/remote_db.rs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -171,10 +171,14 @@ pub async fn fetch_heartbeat_chunk(
171171
let chunk_end = (state.chunk_start + chunk_duration).min(end_time);
172172

173173
if state.chunk_start >= end_time {
174+
println!("Reached end of testing window: {}", end_time);
174175
return Ok(Vec::new());
175176
}
176177

177-
println!("Fetching heartbeat chunk... {}", state.chunk_start);
178+
println!(
179+
"Fetching heartbeat chunk... {} to {}",
180+
state.chunk_start, chunk_end
181+
);
178182

179183
let query = db
180184
.fluent()

0 commit comments

Comments
 (0)