Skip to content

Commit 6b68b3c

Browse files
committed
fix(cardano-chain-follower): thread stat calling
Signed-off-by: bkioshn <[email protected]>
1 parent 7a06a97 commit 6b68b3c

File tree

6 files changed

+32
-80
lines changed

6 files changed

+32
-80
lines changed

rust/cardano-chain-follower/src/chain_sync.rs

Lines changed: 7 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -411,31 +411,14 @@ async fn live_sync_backfill(
411411
Ok(())
412412
}
413413

414-
/// Call the live sync backfill.
415-
/// This is a helper function to pause and resume the stats thread.
416-
async fn call_live_sync_backfill(
417-
cfg: &ChainSyncConfig, update: &MithrilUpdateMessage,
418-
) -> anyhow::Result<()> {
419-
stats::pause_thread(cfg.chain, stats::thread::name::LIVE_SYNC_BACKFILL_AND_PURGE);
420-
let result = live_sync_backfill(cfg, update).await;
421-
stats::resume_thread(cfg.chain, stats::thread::name::LIVE_SYNC_BACKFILL_AND_PURGE);
422-
result
423-
}
424-
425414
/// Backfill and Purge the live chain, based on the Mithril Sync updates.
426415
async fn live_sync_backfill_and_purge(
427416
cfg: ChainSyncConfig, mut rx: mpsc::Receiver<MithrilUpdateMessage>,
428417
mut sync_ready: SyncReadyWaiter,
429418
) {
430-
stats::start_thread(
431-
cfg.chain,
432-
stats::thread::name::LIVE_SYNC_BACKFILL_AND_PURGE,
433-
true,
434-
);
435419
// Wait for first Mithril Update advice, which triggers a BACKFILL of the Live Data.
436420
let Some(update) = rx.recv().await else {
437421
error!("Mithril Sync Failed, can not continue chain sync either.");
438-
stats::stop_thread(cfg.chain, stats::thread::name::LIVE_SYNC_BACKFILL_AND_PURGE);
439422
return;
440423
};
441424

@@ -450,7 +433,7 @@ async fn live_sync_backfill_and_purge(
450433
// We will re-attempt backfill, until its successful.
451434
// Backfill is atomic, it either fully works, or none of the live-chain is changed.
452435
debug!("Mithril Tip has advanced to: {update:?} : BACKFILL");
453-
while let Err(error) = call_live_sync_backfill(&cfg, &update).await {
436+
while let Err(error) = live_sync_backfill(&cfg, &update).await {
454437
error!("Mithril Backfill Sync Failed: {}", error);
455438
sleep(Duration::from_secs(10)).await;
456439
}
@@ -481,7 +464,6 @@ async fn live_sync_backfill_and_purge(
481464
loop {
482465
let Some(update) = rx.recv().await else {
483466
error!("Mithril Sync Failed, can not continue chain sync either.");
484-
stats::stop_thread(cfg.chain, stats::thread::name::LIVE_SYNC_BACKFILL_AND_PURGE);
485467
return;
486468
};
487469

@@ -546,7 +528,13 @@ pub(crate) async fn chain_sync(cfg: ChainSyncConfig, rx: mpsc::Receiver<MithrilU
546528

547529
// Start the Live chain backfill task.
548530
let _backfill_join_handle = spawn(async move {
531+
stats::start_thread(
532+
cfg.chain,
533+
stats::thread::name::LIVE_SYNC_BACKFILL_AND_PURGE,
534+
true,
535+
);
549536
live_sync_backfill_and_purge(backfill_cfg.clone(), rx, sync_waiter).await;
537+
stats::stop_thread(cfg.chain, stats::thread::name::LIVE_SYNC_BACKFILL_AND_PURGE);
550538
});
551539

552540
// Live Fill data starts at fork 1.

rust/cardano-chain-follower/src/chain_sync_config.rs

Lines changed: 8 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
//! each network. Chain Followers use the data supplied by the Chain-Sync.
55
//! This module configures the chain sync processes.
66
7-
use std::{panic, sync::LazyLock};
7+
use std::sync::LazyLock;
88

99
use cardano_blockchain_types::Network;
1010
use dashmap::DashMap;
@@ -150,23 +150,13 @@ impl ChainSyncConfig {
150150
// Start the Mithril Snapshot Follower
151151
let rx = self.mithril_cfg.run().await?;
152152

153-
// Wrap inside a panic catcher to detect if the task panics.
154-
let result = panic::catch_unwind(|| {
155-
stats::start_thread(self.chain, stats::thread::name::CHAIN_SYNC, true);
156-
// Start Chain Sync
157-
tokio::spawn(chain_sync(self.clone(), rx))
158-
});
159-
160-
if let Ok(handle) = result {
161-
*locked_handle = Some(handle);
162-
} else {
163-
// Chain sync panic, stop the thread and log.
164-
error!(
165-
chain = self.chain.to_string(),
166-
"Chain Sync for {} : PANICKED", self.chain
167-
);
168-
stats::stop_thread(self.chain, stats::thread::name::CHAIN_SYNC);
169-
}
153+
let config = self.clone();
154+
// Start Chain Sync
155+
*locked_handle = Some(tokio::spawn(async move {
156+
stats::start_thread(config.chain, stats::thread::name::CHAIN_SYNC, true);
157+
chain_sync(config.clone(), rx).await;
158+
stats::stop_thread(config.chain, stats::thread::name::CHAIN_SYNC);
159+
}));
170160

171161
// sync_map.insert(chain, handle);
172162
debug!("Chain Sync for {} : Started", self.chain);

rust/cardano-chain-follower/src/mithril_snapshot_config.rs

Lines changed: 6 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
//! Configuration for the Mithril Snapshot used by the follower.
22
33
use std::{
4-
panic,
54
path::{Path, PathBuf},
65
str::FromStr,
76
sync::LazyLock,
@@ -415,27 +414,16 @@ impl MithrilSnapshotConfig {
415414
let (tx, rx) = mpsc::channel::<MithrilUpdateMessage>(2);
416415

417416
// let handle = tokio::spawn(background_mithril_update(chain, self.clone(), tx));
418-
419-
// Wrap inside a panic catcher to detect if the task panics.
420-
let result = panic::catch_unwind(|| {
417+
let config = self.clone();
418+
*locked_handle = Some(tokio::spawn(async move {
421419
stats::start_thread(
422-
self.chain,
420+
config.chain,
423421
stats::thread::name::MITHRIL_SNAPSHOT_UPDATER,
424422
true,
425423
);
426-
tokio::spawn(background_mithril_update(self.clone(), tx))
427-
});
428-
429-
if let Ok(handle) = result {
430-
*locked_handle = Some(handle);
431-
} else {
432-
// Mithril update panic, stop the thread and log.
433-
error!(
434-
chain = self.chain.to_string(),
435-
"Background Mithril Update for {} : PANICKED", self.chain
436-
);
437-
stats::stop_thread(self.chain, stats::thread::name::MITHRIL_SNAPSHOT_UPDATER);
438-
}
424+
background_mithril_update(config.clone(), tx).await;
425+
stats::stop_thread(config.chain, stats::thread::name::MITHRIL_SNAPSHOT_UPDATER);
426+
}));
439427

440428
// sync_map.insert(chain, handle);
441429
debug!(

rust/cardano-chain-follower/src/mithril_snapshot_iterator.rs

Lines changed: 1 addition & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,6 @@ use tracing_log::log;
1515
use crate::{
1616
error::{Error, Result},
1717
mithril_query::{make_mithril_iterator, ImmutableBlockIterator},
18-
stats,
1918
};
2019

2120
/// Search backwards by 60 slots (seconds) looking for a previous block.
@@ -197,17 +196,7 @@ impl MithrilSnapshotIterator {
197196
let res = task::spawn_blocking(move || {
198197
#[allow(clippy::unwrap_used)] // Unwrap is safe here because the lock can't be poisoned.
199198
let mut inner_iterator = inner.lock().unwrap();
200-
stats::start_thread(
201-
inner_iterator.chain,
202-
stats::thread::name::MITHRIL_ITERATOR_NEXT,
203-
false,
204-
);
205-
let next = inner_iterator.next();
206-
stats::stop_thread(
207-
inner_iterator.chain,
208-
stats::thread::name::MITHRIL_ITERATOR_NEXT,
209-
);
210-
next
199+
inner_iterator.next()
211200
})
212201
.await;
213202

rust/cardano-chain-follower/src/stats/thread/name.rs

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,8 +10,6 @@ pub(crate) const LIVE_SYNC_BACKFILL_AND_PURGE: &str = "LiveSyncBackfillAndPurge"
1010
pub(crate) const MITHRIL_ITERATOR: &str = "MithrilIterator";
1111
/// Background Mithril Snapshot Updater.
1212
pub(crate) const MITHRIL_SNAPSHOT_UPDATER: &str = "MithrilSnapshotUpdater";
13-
/// Mithril Snapshot Iterator Next.
14-
pub(crate) const MITHRIL_ITERATOR_NEXT: &str = "MithrilIteratorNext";
1513
/// Mithril compute snapshot.
1614
pub(crate) const COMPUTE_SNAPSHOT_MSG: &str = "ComputeSnapshotMsg";
1715
/// Background Mithril Snapshot Validator.

rust/cardano-chain-follower/src/turbo_downloader/mod.rs

Lines changed: 10 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -421,7 +421,11 @@ impl ParallelDownloadProcessor {
421421
let (work_queue_tx, work_queue_rx) = crossbeam_channel::unbounded::<DlWorkOrder>();
422422
let params = self.0.clone();
423423
thread::spawn(move || {
424+
let thread_name = format!("{}::{worker}", stats::thread::name::PARALLEL_DL_WORKER);
425+
426+
stats::start_thread(chain, thread_name.as_str(), false);
424427
Self::worker(&params, worker, &work_queue_rx, chain);
428+
stats::stop_thread(chain, thread_name.as_str());
425429
});
426430

427431
let _unused = self.0.work_queue.insert(worker, work_queue_tx);
@@ -433,11 +437,13 @@ impl ParallelDownloadProcessor {
433437
/// Call the work queue receiver.
434438
/// This is a helper function to pause and resume the stats thread.
435439
fn call_work_queue_receiver(
436-
chain: Network, name: &str, work_queue: &Receiver<usize>,
440+
chain: Network, worker_id: usize, work_queue: &Receiver<usize>,
437441
) -> Result<usize, RecvError> {
438-
stats::pause_thread(chain, name);
442+
let thread_name = format!("{}::{worker_id}", stats::thread::name::PARALLEL_DL_WORKER);
443+
444+
stats::pause_thread(chain, &thread_name);
439445
let recv = work_queue.recv();
440-
stats::resume_thread(chain, name);
446+
stats::resume_thread(chain, &thread_name);
441447
recv
442448
}
443449

@@ -447,10 +453,6 @@ impl ParallelDownloadProcessor {
447453
params: &Arc<ParallelDownloadProcessorInner>, worker_id: usize,
448454
work_queue: &crossbeam_channel::Receiver<DlWorkOrder>, chain: Network,
449455
) {
450-
let thread_name = format!("{}::{worker_id}", stats::thread::name::PARALLEL_DL_WORKER);
451-
452-
stats::start_thread(chain, thread_name.as_str(), false);
453-
454456
debug!("Worker {worker_id} started");
455457

456458
// Each worker has its own http_client, so there is no cross worker pathology
@@ -463,9 +465,7 @@ impl ParallelDownloadProcessor {
463465
}
464466
let http_agent = params.cfg.make_http_agent(worker_id);
465467

466-
while let Ok(next_chunk) =
467-
Self::call_work_queue_receiver(chain, thread_name.as_str(), work_queue)
468-
{
468+
while let Ok(next_chunk) = Self::call_work_queue_receiver(chain, worker_id, work_queue) {
469469
// Add a small delay to the first chunks for each worker.
470470
// So that the leading chunks are more likely to finish downloading first.
471471
if next_chunk > 0 && next_chunk < params.cfg.workers {
@@ -516,7 +516,6 @@ impl ParallelDownloadProcessor {
516516
// debug!("Worker {worker_id} DL chunk queued {next_chunk}");
517517
}
518518
debug!("Worker {worker_id} ended");
519-
stats::stop_thread(chain, thread_name.as_str());
520519
}
521520

522521
/// Send a work order to a worker.

0 commit comments

Comments
 (0)