Skip to content

Commit eb136c9

Browse files
authored
fix(cat-gateway): Log relevant chain indexer info and update live tip metric (#2752)
* fix(cat-gateway): log relevant chain indexer info and update live tip metric * fix(cat-gateway): fix update sync task count
1 parent 8752b5e commit eb136c9

File tree

1 file changed

+61
-35
lines changed
  • catalyst-gateway/bin/src/cardano

1 file changed

+61
-35
lines changed

catalyst-gateway/bin/src/cardano/mod.rs

Lines changed: 61 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -39,10 +39,10 @@ async fn start_sync_for(cfg: &chain_follower::EnvVars) -> anyhow::Result<()> {
3939

4040
let mut cfg = ChainSyncConfig::default_for(chain);
4141
cfg.mithril_cfg = cfg.mithril_cfg.with_dl_config(dl_config);
42-
info!(chain = %chain, "Starting Blockchain Sync");
42+
info!(chain = %chain, "Starting Chain Sync Task");
4343

4444
if let Err(error) = cfg.run().await {
45-
error!(chain=%chain, error=%error, "Failed to start chain sync task");
45+
error!(chain=%chain, error=%error, "Failed to start Chain Sync Task");
4646
Err(error)?;
4747
}
4848

@@ -237,14 +237,12 @@ fn sync_subchain(
237237
params: SyncParams, event_sender: broadcast::Sender<event::ChainIndexerEvent>,
238238
) -> tokio::task::JoinHandle<SyncParams> {
239239
tokio::spawn(async move {
240-
info!(chain = %params.chain, params=%params, "Indexing Blockchain");
241-
242240
// Backoff hitting the database if we need to.
243241
params.backoff().await;
244242

245243
// Wait for indexing DB to be ready before continuing.
246244
drop(CassandraSession::wait_until_ready(INDEXING_DB_READY_WAIT_INTERVAL, true).await);
247-
info!(chain=%params.chain, params=%params,"Indexing DB is ready");
245+
info!(chain=%params.chain, params=%params,"Starting Chain Indexing");
248246

249247
let mut first_indexed_block = params.first_indexed_block.clone();
250248
let mut first_immutable = params.first_is_immutable;
@@ -255,6 +253,18 @@ fn sync_subchain(
255253
let mut follower =
256254
ChainFollower::new(params.chain, params.actual_start(), params.end.clone()).await;
257255
while let Some(chain_update) = follower.next().await {
256+
let tips = ChainFollower::get_tips(params.chain).await;
257+
let immutable_slot = tips.0.slot_or_default();
258+
let live_slot = tips.1.slot_or_default();
259+
let event = event::ChainIndexerEvent::LiveTipSlotChanged {
260+
immutable_slot,
261+
live_slot,
262+
};
263+
if let Err(err) = event_sender.send(event) {
264+
error!(error=%err, "Unable to send event.");
265+
} else {
266+
debug!(live_tip_slot=?live_slot, "Chain Indexer update");
267+
}
258268
match chain_update.kind {
259269
cardano_chain_follower::Kind::ImmutableBlockRollForward => {
260270
// We only process these on the follower tracking the TIP.
@@ -437,6 +447,30 @@ impl SyncTask {
437447
}
438448
}
439449

450+
/// Add a new `SyncTask` to the queue.
451+
fn add_sync_task(
452+
&mut self, params: SyncParams, event_sender: broadcast::Sender<event::ChainIndexerEvent>,
453+
) {
454+
self.sync_tasks.push(sync_subchain(params, event_sender));
455+
self.current_sync_tasks = self.current_sync_tasks.saturating_add(1);
456+
debug!(current_sync_tasks=%self.current_sync_tasks, "Added new Sync Task");
457+
self.dispatch_event(event::ChainIndexerEvent::SyncTasksChanged {
458+
current_sync_tasks: self.current_sync_tasks,
459+
});
460+
}
461+
462+
/// Update `SyncTask` count.
463+
fn sync_task_finished(&mut self) {
464+
self.current_sync_tasks = self.current_sync_tasks.checked_sub(1).unwrap_or_else(|| {
465+
error!("current_sync_tasks -= 1 overflow");
466+
0
467+
});
468+
debug!(current_sync_tasks=%self.current_sync_tasks, "Finished Sync Task");
469+
self.dispatch_event(event::ChainIndexerEvent::SyncTasksChanged {
470+
current_sync_tasks: self.current_sync_tasks,
471+
});
472+
}
473+
440474
/// Primary Chain Follower task.
441475
///
442476
/// This continuously runs in the background, and never terminates.
@@ -452,7 +486,7 @@ impl SyncTask {
452486
let tips = ChainFollower::get_tips(self.cfg.chain).await;
453487
self.immutable_tip_slot = tips.0.slot_or_default();
454488
self.live_tip_slot = tips.1.slot_or_default();
455-
info!(chain=%self.cfg.chain, immutable_tip=?self.immutable_tip_slot, live_tip=?self.live_tip_slot, "Blockchain ready to sync from.");
489+
info!(chain=%self.cfg.chain, immutable_tip=?self.immutable_tip_slot, live_tip=?self.live_tip_slot, "Running the primary blockchain follower task.");
456490

457491
self.dispatch_event(event::ChainIndexerEvent::ImmutableTipSlotChanged {
458492
immutable_slot: self.immutable_tip_slot,
@@ -470,20 +504,20 @@ impl SyncTask {
470504
// After waiting, we set the liveness flag to true if it is not already set.
471505
drop(CassandraSession::wait_until_ready(INDEXING_DB_READY_WAIT_INTERVAL, true).await);
472506

473-
info!(chain=%self.cfg.chain, "Indexing DB is ready - Getting recovery state");
507+
info!(chain=%self.cfg.chain, "Indexing DB is ready - Getting recovery state for indexing");
474508
self.sync_status = get_sync_status().await;
475509
debug!(chain=%self.cfg.chain, "Sync Status: {:?}", self.sync_status);
476510

477511
// Start the Live Chain sync task - This can never end because it is syncing to TIP.
478512
// So, if it fails, it will automatically be restarted.
479-
self.sync_tasks.push(sync_subchain(
513+
self.add_sync_task(
480514
SyncParams::new(
481515
self.cfg.chain,
482516
Point::fuzzy(self.immutable_tip_slot),
483517
Point::TIP,
484518
),
485519
self.event_channel.0.clone(),
486-
));
520+
);
487521

488522
self.start_immutable_followers();
489523

@@ -495,8 +529,15 @@ impl SyncTask {
495529
// They will return from this iterator in the order they complete.
496530
// This iterator actually never ends, because the live sync task is always restarted.
497531
while let Some(completed) = self.sync_tasks.next().await {
532+
// update sync task count
533+
self.sync_task_finished();
534+
498535
match completed {
499536
Ok(finished) => {
537+
let tips = ChainFollower::get_tips(self.cfg.chain).await;
538+
let immutable_tip_slot = tips.0.slot_or_default();
539+
let live_tip_slot = tips.1.slot_or_default();
540+
info!(immutable_tip_slot=?immutable_tip_slot, live_tip_slot=?live_tip_slot, "Chain Indexer task finished");
500541
// Sync task finished. Check if it completed OK or had an error.
501542
// If it failed, we need to reschedule it.
502543

@@ -516,26 +557,20 @@ impl SyncTask {
516557
live_slot: self.live_tip_slot,
517558
},
518559
);
560+
info!(chain=%self.cfg.chain, report=%finished,
561+
"Chain Indexer finished reaching TIP.");
519562

520563
self.start_immutable_followers();
521564
} else {
522565
error!(chain=%self.cfg.chain, report=%finished,
523-
"The TIP follower failed, restarting it.");
566+
"Chain Indexer finished without to reach TIP.");
524567
}
525568

526569
// Start the Live Chain sync task again from where it left off.
527-
self.sync_tasks.push(sync_subchain(
528-
finished.retry(),
529-
self.event_channel.0.clone(),
530-
));
570+
self.add_sync_task(finished.retry(), self.event_channel.0.clone());
531571
} else if let Some(result) = finished.result.as_ref() {
532572
match result {
533573
Ok(()) => {
534-
self.current_sync_tasks =
535-
self.current_sync_tasks.checked_sub(1).unwrap_or_else(|| {
536-
error!("current_sync_tasks -= 1 overflow");
537-
0
538-
});
539574
info!(chain=%self.cfg.chain, report=%finished,
540575
"The Immutable follower completed successfully.");
541576

@@ -546,9 +581,6 @@ impl SyncTask {
546581
},
547582
);
548583
});
549-
self.dispatch_event(event::ChainIndexerEvent::SyncTasksChanged {
550-
current_sync_tasks: self.current_sync_tasks,
551-
});
552584

553585
// If we need more immutable chain followers to sync the block
554586
// chain, we can now start them.
@@ -559,10 +591,7 @@ impl SyncTask {
559591
"An Immutable follower failed, restarting it.");
560592
// Restart the Immutable Chain sync task again from where it left
561593
// off.
562-
self.sync_tasks.push(sync_subchain(
563-
finished.retry(),
564-
self.event_channel.0.clone(),
565-
));
594+
self.add_sync_task(finished.retry(), self.event_channel.0.clone());
566595
},
567596
}
568597
} else {
@@ -575,6 +604,8 @@ impl SyncTask {
575604
},
576605
}
577606

607+
let sync_task_count = self.sync_tasks.len();
608+
578609
// IF there is only 1 chain follower left in sync_tasks, then all
579610
// immutable followers have finished.
580611
// When this happens we need to purge the live index of any records that exist
@@ -583,7 +614,7 @@ impl SyncTask {
583614
// want to put a gap in this, so that there are X slots of overlap
584615
// between the live chain and immutable chain. This gap should be
585616
// a parameter.
586-
if self.sync_tasks.len() == 1 {
617+
if sync_task_count == 1 {
587618
self.dispatch_event(event::ChainIndexerEvent::SyncCompleted);
588619

589620
// Purge data up to this slot
@@ -622,15 +653,10 @@ impl SyncTask {
622653
if let Some((first_point, last_point)) =
623654
self.get_syncable_range(self.start_slot, end_slot)
624655
{
625-
self.sync_tasks.push(sync_subchain(
656+
self.add_sync_task(
626657
SyncParams::new(self.cfg.chain, first_point, last_point.clone()),
627658
self.event_channel.0.clone(),
628-
));
629-
self.current_sync_tasks = self.current_sync_tasks.saturating_add(1);
630-
631-
self.dispatch_event(event::ChainIndexerEvent::SyncTasksChanged {
632-
current_sync_tasks: self.current_sync_tasks,
633-
});
659+
);
634660
}
635661

636662
// The one slot overlap is deliberate, it doesn't hurt anything and prevents all off
@@ -691,6 +717,7 @@ impl event::EventTarget<event::ChainIndexerEvent> for SyncTask {
691717
}
692718

693719
fn dispatch_event(&self, message: event::ChainIndexerEvent) {
720+
debug!(event = ?message, "Chain Indexer Event");
694721
let _ = self.event_channel.0.send(message);
695722
}
696723
}
@@ -704,7 +731,6 @@ pub(crate) async fn start_followers() -> anyhow::Result<()> {
704731

705732
// Start Syncing the blockchain, so we can consume its data as required.
706733
start_sync_for(&cfg).await?;
707-
info!(chain=%cfg.chain,"Chain Sync is started.");
708734

709735
tokio::spawn(async move {
710736
use self::event::ChainIndexerEvent as Event;

0 commit comments

Comments
 (0)