@@ -9,6 +9,7 @@ use std::fs;
 
 use crate::config::Config;
 use crate::remote_db::BlockInfo;
+use crate::remote_db::HeartbeatChunkState;
 use crate::time::*;
 
 #[derive(Debug)]
@@ -29,16 +30,24 @@ pub struct ProducedBlock {
     pub block_data: String,
 }
 
-pub async fn get_last_processed_time(pool: &SqlitePool) -> Result<DateTime<Utc>> {
+pub async fn get_last_processed_time(
+    pool: &SqlitePool,
+    config: Option<&Config>,
+) -> Result<DateTime<Utc>> {
     let record = sqlx::query!("SELECT last_processed_time FROM processing_state WHERE id = 1")
         .fetch_one(pool)
         .await?;
 
-    Ok(from_unix_timestamp(record.last_processed_time))
+    let db_time = from_unix_timestamp(record.last_processed_time);
+
+    Ok(match config {
+        Some(cfg) => db_time.max(cfg.window_range_start),
+        None => db_time,
+    })
 }
 
 pub async fn update_last_processed_time(pool: &SqlitePool, time: DateTime<Utc>) -> Result<()> {
-    let current = get_last_processed_time(pool).await?;
+    let current = get_last_processed_time(pool, None).await?;
     let ts = to_unix_timestamp(time);
 
     println!("Updating last processed time: {} -> {}", current, time);
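
`get_last_processed_time` now clamps the stored resume point to the configured start of the window range, so a stale `processing_state` row can never drag processing back before the range being scored. A minimal sketch of that clamping rule, assuming `Config::window_range_start` is a `DateTime<Utc>` as the diff suggests (the helper name below is hypothetical, not part of the crate):

```rust
use chrono::{DateTime, TimeZone, Utc};

// Hypothetical helper mirroring the `match config { ... }` above:
// resume from the later of the stored timestamp and the configured range start.
fn effective_resume_point(
    db_time: DateTime<Utc>,
    window_range_start: Option<DateTime<Utc>>,
) -> DateTime<Utc> {
    match window_range_start {
        Some(start) => db_time.max(start),
        None => db_time,
    }
}

fn main() {
    let stored = Utc.with_ymd_and_hms(2024, 1, 1, 0, 0, 0).unwrap();
    let range_start = Utc.with_ymd_and_hms(2024, 6, 1, 0, 0, 0).unwrap();
    // The stored time predates the window range, so processing resumes at the range start.
    assert_eq!(effective_resume_point(stored, Some(range_start)), range_start);
    // With no config (the `update_last_processed_time` path), the stored time is used as-is.
    assert_eq!(effective_resume_point(stored, None), stored);
}
```

Note that `update_last_processed_time` passes `None`, so its "Updating last processed time" log reflects the raw database value rather than the clamped one.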
@@ -232,132 +241,191 @@ async fn batch_insert_produced_blocks(pool: &SqlitePool, blocks: &[ProducedBlock
     Ok(())
 }
 
-pub async fn process_heartbeats(db: &FirestoreDb, pool: &SqlitePool) -> Result<()> {
-    let last_processed_time = get_last_processed_time(pool).await?;
+pub async fn process_heartbeats(
+    db: &FirestoreDb,
+    pool: &SqlitePool,
+    config: &Config,
+) -> Result<()> {
+    let last_processed_time = get_last_processed_time(pool, Some(config)).await?;
     let now = Utc::now();
 
-    let heartbeats =
-        crate::remote_db::fetch_heartbeats_in_chunks(db, last_processed_time, now).await?;
-    println!("Fetched {} heartbeats", heartbeats.len());
-    println!("heartbeat {:?}", heartbeats.first().map(|x| x.create_time));
+    let mut total_heartbeats = 0;
+    let mut latest_time = last_processed_time;
+    let mut seen_blocks: HashMap<(i64, String), DateTime<Utc>> = HashMap::new();
+
+    // Statistics
+    let mut total_presence_count = 0;
+    let mut total_skipped_count = 0;
+    let mut total_blocks_recorded = 0;
+    let mut total_blocks_duplicate = 0;
+    let mut total_outside_windows = 0;
+
+    let mut chunk_state = HeartbeatChunkState {
+        chunk_start: last_processed_time,
+        last_timestamp: None,
+    };
 
-    if heartbeats.is_empty() {
-        return Ok(());
-    }
+    loop {
+        let heartbeats = crate::remote_db::fetch_heartbeat_chunk(db, &mut chunk_state, now).await?;
+        if heartbeats.is_empty() {
+            break;
+        }
 
-    let mut latest_time = last_processed_time;
-    latest_time = heartbeats
-        .iter()
-        .map(|h| h.create_time)
-        .max()
-        .unwrap_or(latest_time);
+        total_heartbeats += heartbeats.len();
+        println!("Processing batch of {} heartbeats...", heartbeats.len());
 
-    let start_ts = to_unix_timestamp(last_processed_time);
-    let end_ts = to_unix_timestamp(latest_time);
+        latest_time = latest_time.max(
+            heartbeats
+                .iter()
+                .map(|h| h.create_time)
+                .max()
+                .unwrap_or(latest_time),
+        );
 
-    let existing_windows = sqlx::query!(
-        r#"
-        SELECT id, start_time, end_time
-        FROM time_windows
-        WHERE start_time <= ?2 AND end_time >= ?1 AND disabled = FALSE
-        ORDER BY start_time ASC
-        "#,
-        start_ts,
-        end_ts
-    )
-    .fetch_all(pool)
-    .await?;
+        let start_ts = to_unix_timestamp(last_processed_time);
+        let end_ts = to_unix_timestamp(latest_time);
 
-    let unique_submitters: HashSet<&str> = heartbeats
-        .iter()
-        .map(|entry| entry.submitter.as_str())
-        .collect();
-
-    let public_key_map =
-        ensure_public_keys(pool, &unique_submitters.into_iter().collect::<Vec<_>>()).await?;
-
-    let mut presence_count = 0;
-    let mut skipped_count = 0;
-    let mut blocks_recorded = 0;
-    let mut blocks_duplicate = 0;
-    let mut processed_heartbeats = HashSet::new();
-    let mut produced_blocks_batch = Vec::new();
-    let mut seen_blocks = HashSet::new();
-
-    for window in existing_windows {
-        let window_start = from_unix_timestamp(window.start_time);
-        let window_end = from_unix_timestamp(window.end_time);
-        let mut presence_batch = Vec::new();
-
-        for (idx, entry) in heartbeats.iter().enumerate() {
-            if entry.create_time >= window_start && entry.create_time < window_end {
-                processed_heartbeats.insert(idx);
-
-                let best_tip = entry.best_tip_block();
-
-                if entry.is_synced() && best_tip.is_some() {
-                    if let Some(&public_key_id) = public_key_map.get(&entry.submitter) {
-                        presence_batch.push(HeartbeatPresence {
-                            window_id: window.id.unwrap(),
-                            public_key_id,
-                            best_tip: best_tip.unwrap(), // Cannot fail due to the above check
-                            heartbeat_time: to_unix_timestamp(entry.create_time),
-                        });
-                        presence_count += 1;
-
-                        // Add produced block if it exists
-                        if let Some(block) = entry.last_produced_block_decoded() {
-                            let block_data = entry.last_produced_block_raw().unwrap(); // Cannot fail, we have the block
-                            let key = (public_key_id, block.hash().to_string());
-
-                            if !seen_blocks.insert(key.clone()) {
-                                blocks_duplicate += 1;
-                                println!(
-                                    "Duplicate block detected: {} (producer: {})",
-                                    key.1, entry.submitter
-                                );
-                                continue;
-                            }
+        let existing_windows = sqlx::query!(
+            r#"
+            SELECT id, start_time, end_time
+            FROM time_windows
+            WHERE start_time <= ?2 AND end_time >= ?1 AND disabled = FALSE
+            ORDER BY start_time ASC
+            "#,
+            start_ts,
+            end_ts
+        )
+        .fetch_all(pool)
+        .await?;
+
+        let unique_submitters: HashSet<&str> = heartbeats
+            .iter()
+            .map(|entry| entry.submitter.as_str())
+            .collect();
+
+        let public_key_map =
+            ensure_public_keys(pool, &unique_submitters.into_iter().collect::<Vec<_>>()).await?;
+
+        let mut presence_count = 0;
+        let mut skipped_count = 0;
+        let mut blocks_recorded = 0;
+        let mut blocks_duplicate = 0;
+        let mut processed_heartbeats = HashSet::new();
+        let mut produced_blocks_batch = Vec::new();
+
+        for window in existing_windows {
+            let window_start = from_unix_timestamp(window.start_time);
+            let window_end = from_unix_timestamp(window.end_time);
+            let mut presence_batch = Vec::new();
+
+            for (idx, entry) in heartbeats.iter().enumerate() {
+                if entry.create_time >= window_start && entry.create_time < window_end {
+                    processed_heartbeats.insert(idx);
+
+                    let best_tip = entry.best_tip_block();
 
-                            produced_blocks_batch.push(ProducedBlock {
+                    if entry.is_synced() && best_tip.is_some() {
+                        if let Some(&public_key_id) = public_key_map.get(&entry.submitter) {
+                            presence_batch.push(HeartbeatPresence {
                                 window_id: window.id.unwrap(),
                                 public_key_id,
-                                block_hash: block.hash().to_string(),
-                                block_height: block.height(),
-                                block_global_slot: block.global_slot(),
-                                block_data,
+                                best_tip: best_tip.unwrap(), // Cannot fail due to the above check
+                                heartbeat_time: to_unix_timestamp(entry.create_time),
                             });
+                            presence_count += 1;
+
+                            // Add produced block if it exists
+                            match entry.last_produced_block_decoded() {
+                                Ok(Some(block)) => {
+                                    let block_data = entry.last_produced_block_raw().unwrap(); // Cannot fail, we have the block
+                                    let key = (public_key_id, block.hash().to_string());
+
+                                    if let Some(first_seen) = seen_blocks.get(&key) {
+                                        blocks_duplicate += 1;
+                                        println!(
+                                            "Duplicate block detected: {} (height: {}, producer: {}, peer_id: {}) [first seen at {}, now at {}]",
+                                            key.1,
+                                            block.height(),
+                                            entry.submitter,
+                                            entry.peer_id().unwrap_or_else(|| "unknown".to_string()),
+                                            first_seen,
+                                            entry.create_time
+                                        );
+                                        continue;
+                                    }
+
+                                    seen_blocks.insert(key.clone(), entry.create_time);
+                                    produced_blocks_batch.push(ProducedBlock {
+                                        window_id: window.id.unwrap(),
+                                        public_key_id,
+                                        block_hash: block.hash().to_string(),
+                                        block_height: block.height(),
+                                        block_global_slot: block.global_slot(),
+                                        block_data,
+                                    });
+                                }
+                                Ok(None) => (), // No block to process
+                                Err(e) => {
+                                    println!(
+                                        "WARNING: Failed to decode block from {}: {}",
+                                        entry.submitter, e
+                                    )
+                                }
+                            }
+                        }
+                    } else {
+                        if let Ok(Some(block)) = entry.last_produced_block_decoded() {
+                            println!(
+                                "Skipping unsynced block: {} (height: {}, producer: {}, peer_id: {})",
+                                block.hash(),
+                                block.height(),
+                                entry.submitter,
+                                entry.peer_id().unwrap_or_else(|| "unknown".to_string())
+                            );
                         }
+                        skipped_count += 1;
                     }
-                } else {
-                    skipped_count += 1;
                 }
             }
+
+            if !presence_batch.is_empty() {
+                batch_insert_presence(pool, &presence_batch).await?;
+            }
         }
 
-    if !presence_batch.is_empty() {
-        batch_insert_presence(pool, &presence_batch).await?;
+        if !produced_blocks_batch.is_empty() {
+            blocks_recorded = produced_blocks_batch.len();
+            batch_insert_produced_blocks(pool, &produced_blocks_batch).await?;
         }
-    }
 
-    if !produced_blocks_batch.is_empty() {
-        blocks_recorded = produced_blocks_batch.len();
-        batch_insert_produced_blocks(pool, &produced_blocks_batch).await?;
-    }
+        let outside_windows = heartbeats.len() - processed_heartbeats.len();
 
-    let outside_windows = heartbeats.len() - processed_heartbeats.len();
+        println!(
+            "Batch complete: {} presences, {} blocks ({} duplicates), {} skipped, {} outside windows",
+            presence_count,
+            blocks_recorded,
+            blocks_duplicate,
+            skipped_count,
+            outside_windows
+        );
+
+        total_presence_count += presence_count;
+        total_skipped_count += skipped_count;
+        total_blocks_recorded += blocks_recorded;
+        total_blocks_duplicate += blocks_duplicate;
+        total_outside_windows += outside_windows;
+    }
 
     println!(
-        "Processed {} heartbeats ({} synced presences recorded, {} unique blocks recorded ({} duplicates skipped), {} unsynced skipped), {} outside of defined windows",
-        processed_heartbeats.len(),
-        presence_count,
-        blocks_recorded,
-        blocks_duplicate,
-        skipped_count,
-        outside_windows
+        "Processed {} total heartbeats ({} synced presences recorded, {} unique blocks recorded ({} duplicates skipped), {} unsynced skipped), {} outside of defined windows",
+        total_heartbeats,
+        total_presence_count,
+        total_blocks_recorded,
+        total_blocks_duplicate,
+        total_skipped_count,
+        total_outside_windows,
     );
 
-    // Update the last processed time
     if latest_time > last_processed_time {
         update_last_processed_time(pool, latest_time).await?;
     }
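
The one-shot `fetch_heartbeats_in_chunks` call is replaced by a cursor-driven loop: `HeartbeatChunkState` carries the range start and the last timestamp seen, and `fetch_heartbeat_chunk` is called until it returns an empty batch, with per-batch counters rolled into running totals. A self-contained sketch of that cursor contract, assuming a sorted in-memory source in place of Firestore (the `Heartbeat` struct and `chunk_size` parameter here are illustrative, not the crate's real definitions):

```rust
use chrono::{DateTime, Duration, TimeZone, Utc};

// Illustrative stand-ins: the real `HeartbeatChunkState` and
// `fetch_heartbeat_chunk` live in `crate::remote_db` and query Firestore.
struct Heartbeat {
    create_time: DateTime<Utc>,
}

struct HeartbeatChunkState {
    chunk_start: DateTime<Utc>,            // lower bound of the range still to scan
    last_timestamp: Option<DateTime<Utc>>, // cursor: newest create_time already returned
}

// Sketch of the cursor contract the loop above relies on: each call returns
// the next batch strictly after the cursor (up to `chunk_size` entries), and
// an empty Vec signals that [chunk_start, now) is exhausted.
fn fetch_heartbeat_chunk<'a>(
    source: &'a [Heartbeat], // assumed sorted by create_time
    state: &mut HeartbeatChunkState,
    now: DateTime<Utc>,
    chunk_size: usize,
) -> Vec<&'a Heartbeat> {
    let cursor = state.last_timestamp.unwrap_or(state.chunk_start);
    let batch: Vec<&Heartbeat> = source
        .iter()
        .filter(|h| h.create_time > cursor && h.create_time < now)
        .take(chunk_size)
        .collect();
    if let Some(last) = batch.last() {
        state.last_timestamp = Some(last.create_time); // advance the cursor
    }
    batch
}

fn main() {
    let t0 = Utc.with_ymd_and_hms(2024, 6, 1, 0, 0, 0).unwrap();
    let source: Vec<Heartbeat> = (1..=5)
        .map(|i| Heartbeat { create_time: t0 + Duration::minutes(i) })
        .collect();
    let mut state = HeartbeatChunkState { chunk_start: t0, last_timestamp: None };

    // Mirrors the `loop { fetch; if empty, break; ... }` shape of process_heartbeats.
    loop {
        let batch = fetch_heartbeat_chunk(&source, &mut state, Utc::now(), 2);
        if batch.is_empty() {
            break;
        }
        println!("Processing batch of {} heartbeats...", batch.len());
    }
}
```

Note that `seen_blocks` (now a `HashMap` keyed by `(public_key_id, block_hash)` with first-seen timestamps) is declared outside the loop, which is what lets duplicate blocks be detected across chunk boundaries rather than only within a single batch.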