Skip to content

Commit 56cdc46

Browse files
authored
Merge pull request #1062 from openmina/fix/heartbeats-scores
fix(heartbeats): Correct timestamp filtering in the computation of heartbeats scores
2 parents aab9c5d + db647cb commit 56cdc46

File tree

4 files changed

+69
-32
lines changed

4 files changed

+69
-32
lines changed

tools/heartbeats-processor/.sqlx/query-14d5d1973bb6a28e4be770e15dbb0293366513edb3945b91c2ae62ca2827ecc5.json

Lines changed: 12 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

tools/heartbeats-processor/.sqlx/query-bc586a064ad3094fe93bf09715e00c3638e403705c437816d56de4af3fcbdb17.json

Lines changed: 0 additions & 12 deletions
This file was deleted.

tools/heartbeats-processor/src/local_db.rs

Lines changed: 48 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -483,27 +483,60 @@ pub async fn toggle_windows(
483483
Ok(())
484484
}
485485

486-
// TODO: multiple blocks for the same slot should be counted as one
487486
// TODO: take into account the validated flag to count blocks
488-
pub async fn update_scores(pool: &SqlitePool) -> Result<()> {
487+
pub async fn update_scores(pool: &SqlitePool, config: &Config) -> Result<()> {
488+
let window_start = to_unix_timestamp(config.window_range_start);
489+
let current_time = chrono::Utc::now().timestamp();
490+
489491
sqlx::query!(
490492
r#"
493+
WITH ValidWindows AS (
494+
SELECT id, start_time, end_time
495+
FROM time_windows
496+
WHERE disabled = FALSE
497+
AND end_time <= ?2
498+
AND start_time >= ?1
499+
),
500+
BlockCounts AS (
501+
-- Count one block per global slot per producer
502+
SELECT
503+
public_key_id,
504+
COUNT(DISTINCT block_global_slot) as blocks
505+
FROM (
506+
-- Deduplicate blocks per global slot
507+
SELECT
508+
pb.public_key_id,
509+
pb.block_global_slot
510+
FROM produced_blocks pb
511+
JOIN ValidWindows vw ON vw.id = pb.window_id
512+
-- TODO: enable once block proof validation has been implemented
513+
-- WHERE pb.validated = TRUE
514+
GROUP BY pb.public_key_id, pb.block_global_slot
515+
) unique_blocks
516+
GROUP BY public_key_id
517+
),
518+
HeartbeatCounts AS (
519+
SELECT hp.public_key_id, COUNT(DISTINCT hp.window_id) as heartbeats
520+
FROM heartbeat_presence hp
521+
JOIN ValidWindows vw ON vw.id = hp.window_id
522+
GROUP BY hp.public_key_id
523+
)
491524
INSERT INTO submitter_scores (public_key_id, score, blocks_produced)
492525
SELECT
493526
pk.id,
494-
COUNT(DISTINCT hp.window_id) as score,
495-
COUNT(DISTINCT pb.id) as blocks_produced
527+
COALESCE(hc.heartbeats, 0) as score,
528+
COALESCE(bc.blocks, 0) as blocks_produced
496529
FROM public_keys pk
497-
LEFT JOIN heartbeat_presence hp ON pk.id = hp.public_key_id
498-
LEFT JOIN time_windows tw ON hp.window_id = tw.id
499-
LEFT JOIN produced_blocks pb ON pk.id = pb.public_key_id
500-
WHERE tw.disabled = FALSE
501-
GROUP BY pk.id
530+
LEFT JOIN HeartbeatCounts hc ON hc.public_key_id = pk.id
531+
LEFT JOIN BlockCounts bc ON bc.public_key_id = pk.id
532+
WHERE hc.heartbeats > 0 OR bc.blocks > 0
502533
ON CONFLICT(public_key_id) DO UPDATE SET
503534
score = excluded.score,
504535
blocks_produced = excluded.blocks_produced,
505536
last_updated = strftime('%s', 'now')
506-
"#
537+
"#,
538+
window_start,
539+
current_time
507540
)
508541
.execute(pool)
509542
.await?;
@@ -537,9 +570,9 @@ pub async fn get_max_scores(pool: &SqlitePool) -> Result<MaxScores> {
537570
Ok(MaxScores { total, current })
538571
}
539572

540-
pub async fn view_scores(pool: &SqlitePool) -> Result<()> {
573+
pub async fn view_scores(pool: &SqlitePool, config: &Config) -> Result<()> {
541574
// Make sure scores are up to date
542-
update_scores(pool).await?;
575+
update_scores(pool, config).await?;
543576

544577
let scores = sqlx::query!(
545578
r#"
@@ -559,11 +592,11 @@ pub async fn view_scores(pool: &SqlitePool) -> Result<()> {
559592
let max_scores = get_max_scores(pool).await?;
560593

561594
println!("\nSubmitter Scores:");
562-
println!("----------------------------------------");
595+
println!("--------------------------------------------------------");
563596
println!(
564-
"Public Key | Score | Blocks | Current Max | Total Max | Last Updated"
597+
"Public Key | Score | Blocks | Current Max | Total Max | Last Updated"
565598
);
566-
println!("----------------------------------------");
599+
println!("--------------------------------------------------------");
567600

568601
for row in scores {
569602
println!(

tools/heartbeats-processor/src/main.rs

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -61,9 +61,13 @@ enum Commands {
6161
},
6262
}
6363

64-
async fn post_scores_to_firestore(pool: &SqlitePool, db: &FirestoreDb) -> Result<()> {
64+
async fn post_scores_to_firestore(
65+
pool: &SqlitePool,
66+
db: &FirestoreDb,
67+
config: &Config,
68+
) -> Result<()> {
6569
// Make sure scores are up to date
66-
local_db::update_scores(pool).await?;
70+
local_db::update_scores(pool, config).await?;
6771

6872
let scores = sqlx::query!(
6973
r#"
@@ -109,7 +113,7 @@ async fn run_process_loop(
109113
local_db::process_heartbeats(db, pool, config).await?;
110114

111115
println!("Posting scores...");
112-
post_scores_to_firestore(pool, db).await?;
116+
post_scores_to_firestore(pool, db, config).await?;
113117

114118
println!("Sleeping for {} seconds...", interval_seconds);
115119
tokio::time::sleep(interval).await;
@@ -149,12 +153,12 @@ async fn main() -> Result<()> {
149153
local_db::toggle_windows(&pool, start, end, disabled).await?;
150154
}
151155
Commands::ViewScores => {
152-
local_db::view_scores(&pool).await?;
156+
local_db::view_scores(&pool, &config).await?;
153157
}
154158
Commands::PostScores => {
155159
println!("Initializing firestore connection...");
156160
let db = remote_db::get_db(&config).await?;
157-
post_scores_to_firestore(&pool, &db).await?;
161+
post_scores_to_firestore(&pool, &db, &config).await?;
158162
}
159163
Commands::SetLastProcessed { time } => {
160164
local_db::set_last_processed_time(&pool, &time).await?;

0 commit comments

Comments
 (0)