Skip to content

Commit 7f2b8f0

Browse files
committed
feat(cardano-chain-follower): add thread and mmap file stats
Signed-off-by: bkioshn <[email protected]>
1 parent e182213 commit 7f2b8f0

File tree

13 files changed

+697
-61
lines changed

13 files changed

+697
-61
lines changed

rust/cardano-chain-follower/Cargo.toml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ tokio = { version = "1.42.0", features = [
3131
] }
3232
tracing = "0.1.41"
3333
tracing-log = "0.2.0"
34-
dashmap = "6.1.0"
34+
dashmap = { version = "6.1.0", features = ["serde"] }
3535
url = "2.5.4"
3636
anyhow = "1.0.95"
3737
chrono = "0.4.39"
@@ -56,6 +56,7 @@ ureq = { version = "2.12.1", features = ["native-certs"] }
5656
http = "1.2.0"
5757
hickory-resolver = { version = "0.24.2", features = ["dns-over-rustls"] }
5858
moka = { version = "0.12.9", features = ["sync"] }
59+
cpu-time = "1.0.0"
5960

6061
[dev-dependencies]
6162
tracing-subscriber = { version = "0.3.19", features = ["env-filter"] }

rust/cardano-chain-follower/src/chain_sync.rs

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -416,9 +416,14 @@ async fn live_sync_backfill_and_purge(
416416
cfg: ChainSyncConfig, mut rx: mpsc::Receiver<MithrilUpdateMessage>,
417417
mut sync_ready: SyncReadyWaiter,
418418
) {
419+
/// Thread name for stats.
420+
const THREAD_NAME: &str = "LiveSyncBackfillAndPurge";
421+
422+
stats::start_thread(cfg.chain, THREAD_NAME, true);
419423
// Wait for first Mithril Update advice, which triggers a BACKFILL of the Live Data.
420424
let Some(update) = rx.recv().await else {
421425
error!("Mithril Sync Failed, can not continue chain sync either.");
426+
stats::stop_thread(cfg.chain, THREAD_NAME);
422427
return;
423428
};
424429

@@ -430,12 +435,15 @@ async fn live_sync_backfill_and_purge(
430435
let live_chain_head: Point;
431436

432437
loop {
438+
stats::resume_thread(cfg.chain, THREAD_NAME);
433439
// We will re-attempt backfill, until its successful.
434440
// Backfill is atomic, it either fully works, or none of the live-chain is changed.
435441
debug!("Mithril Tip has advanced to: {update:?} : BACKFILL");
436442
while let Err(error) = live_sync_backfill(&cfg, &update).await {
437443
error!("Mithril Backfill Sync Failed: {}", error);
444+
stats::pause_thread(cfg.chain, THREAD_NAME);
438445
sleep(Duration::from_secs(10)).await;
446+
stats::resume_thread(cfg.chain, THREAD_NAME);
439447
}
440448

441449
if let Some(head_point) = get_live_head_point(cfg.chain) {
@@ -462,8 +470,11 @@ async fn live_sync_backfill_and_purge(
462470
let mut update_sender = get_chain_update_tx_queue(cfg.chain).await;
463471

464472
loop {
473+
stats::resume_thread(cfg.chain, THREAD_NAME);
474+
465475
let Some(update) = rx.recv().await else {
466476
error!("Mithril Sync Failed, can not continue chain sync either.");
477+
stats::stop_thread(cfg.chain, THREAD_NAME);
467478
return;
468479
};
469480

@@ -516,6 +527,10 @@ async fn live_sync_backfill_and_purge(
516527
///
517528
/// This does not return, it is a background task.
518529
pub(crate) async fn chain_sync(cfg: ChainSyncConfig, rx: mpsc::Receiver<MithrilUpdateMessage>) {
530+
/// Thread name for stats.
531+
const THREAD_NAME: &str = "ChainSync";
532+
533+
stats::start_thread(cfg.chain, THREAD_NAME, true);
519534
debug!(
520535
"Chain Sync for: {} from {} : Starting",
521536
cfg.chain, cfg.relay_address,
@@ -537,9 +552,9 @@ pub(crate) async fn chain_sync(cfg: ChainSyncConfig, rx: mpsc::Receiver<MithrilU
537552
let mut fork_count: Fork = Fork::FIRST_LIVE;
538553

539554
loop {
555+
stats::resume_thread(cfg.chain, THREAD_NAME);
540556
// We never have a connection if we end up around the loop, so make a new one.
541557
let mut peer = persistent_reconnect(&cfg.relay_address, cfg.chain).await;
542-
543558
match resync_live_tip(&mut peer, cfg.chain).await {
544559
Ok(tip) => debug!("Tip Resynchronized to {tip}"),
545560
Err(error) => {

rust/cardano-chain-follower/src/chain_sync_ready.rs

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ use tokio::{
1212
};
1313
use tracing::error;
1414

15-
use crate::chain_update;
15+
use crate::{chain_update, stats};
1616

1717
/// Data we hold related to sync being ready or not.
1818
struct SyncReady {
@@ -85,9 +85,13 @@ static SYNC_READY: LazyLock<DashMap<Network, RwLock<SyncReady>>> = LazyLock::new
8585
/// Write Lock the `SYNC_READY` lock for a network.
8686
/// When we are signaled to be ready, set it to true and release the lock.
8787
pub(crate) fn wait_for_sync_ready(chain: Network) -> SyncReadyWaiter {
88+
/// Thread name for stats.
89+
const THREAD_NAME: &str = "WaitForSyncReady";
90+
8891
let (tx, rx) = oneshot::channel::<()>();
8992

9093
tokio::spawn(async move {
94+
stats::start_thread(chain, THREAD_NAME, true);
9195
// We are safe to use `expect` here because the SYNC_READY list is exhaustively
9296
// initialized. Its a Serious BUG if that not True, so panic is OK.
9397
#[allow(clippy::expect_used)]
@@ -101,7 +105,7 @@ pub(crate) fn wait_for_sync_ready(chain: Network) -> SyncReadyWaiter {
101105
if let Ok(()) = rx.await {
102106
status.ready = true;
103107
}
104-
108+
stats::stop_thread(chain, THREAD_NAME);
105109
// If the channel closes early, we can NEVER use the Blockchain data.
106110
});
107111

rust/cardano-chain-follower/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ mod mithril_snapshot_data;
1414
mod mithril_snapshot_iterator;
1515
mod mithril_snapshot_sync;
1616
mod mithril_turbo_downloader;
17+
mod mmap_file;
1718
mod snapshot_id;
1819
mod stats;
1920
pub mod turbo_downloader;

rust/cardano-chain-follower/src/mithril_query.rs

Lines changed: 15 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2,25 +2,35 @@
22
33
use std::path::Path;
44

5-
use cardano_blockchain_types::Point;
5+
use cardano_blockchain_types::{Network, Point};
66
use pallas_hardano::storage::immutable::FallibleBlock;
77
use tokio::task;
88

9-
use crate::error::{Error, Result};
9+
use crate::{
10+
error::{Error, Result},
11+
stats,
12+
};
1013

1114
/// Synchronous Immutable block iterator.
1215
pub(crate) type ImmutableBlockIterator = Box<dyn Iterator<Item = FallibleBlock> + Send + Sync>;
1316

1417
/// Get a mithril snapshot iterator.
1518
pub(crate) async fn make_mithril_iterator(
16-
path: &Path, start: &Point,
19+
path: &Path, start: &Point, chain: Network,
1720
) -> Result<ImmutableBlockIterator> {
21+
/// Thread name for stats.
22+
const THREAD_NAME: &str = "MithrilIterator";
23+
1824
let path = path.to_path_buf();
1925
let start = start.clone();
2026
// Initial input
2127
let res = task::spawn_blocking(move || {
22-
pallas_hardano::storage::immutable::read_blocks_from_point(&path, start.clone().into())
23-
.map_err(|error| Error::MithrilSnapshot(Some(error)))
28+
stats::start_thread(chain, THREAD_NAME, false);
29+
let result =
30+
pallas_hardano::storage::immutable::read_blocks_from_point(&path, start.clone().into())
31+
.map_err(|error| Error::MithrilSnapshot(Some(error)));
32+
stats::stop_thread(chain, THREAD_NAME);
33+
result
2434
})
2535
.await;
2636

rust/cardano-chain-follower/src/mithril_snapshot_iterator.rs

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ use tracing_log::log;
1515
use crate::{
1616
error::{Error, Result},
1717
mithril_query::{make_mithril_iterator, ImmutableBlockIterator},
18+
stats,
1819
};
1920

2021
/// Search backwards by 60 slots (seconds) looking for a previous block.
@@ -73,7 +74,7 @@ impl MithrilSnapshotIterator {
7374
chain: Network, path: &Path, from: &Point, search_interval: u64,
7475
) -> Option<MithrilSnapshotIterator> {
7576
let point = probe_point(from, search_interval);
76-
let Ok(mut iterator) = make_mithril_iterator(path, &point).await else {
77+
let Ok(mut iterator) = make_mithril_iterator(path, &point, chain).await else {
7778
return None;
7879
};
7980

@@ -116,7 +117,7 @@ impl MithrilSnapshotIterator {
116117
let this = this?;
117118

118119
// Remake the iterator, based on the new known point.
119-
let Ok(iterator) = make_mithril_iterator(path, &this).await else {
120+
let Ok(iterator) = make_mithril_iterator(path, &this, chain).await else {
120121
return None;
121122
};
122123

@@ -176,7 +177,7 @@ impl MithrilSnapshotIterator {
176177

177178
debug!("Actual Mithril Iterator Start: {}", from);
178179

179-
let iterator = make_mithril_iterator(path, from).await?;
180+
let iterator = make_mithril_iterator(path, from, chain).await?;
180181

181182
Ok(MithrilSnapshotIterator {
182183
inner: Arc::new(Mutex::new(MithrilSnapshotIteratorInner {
@@ -191,12 +192,18 @@ impl MithrilSnapshotIterator {
191192
/// Get the next block, in a way that is Async friendly.
192193
/// Returns the next block, or None if there are no more blocks.
193194
pub(crate) async fn next(&self) -> Option<MultiEraBlock> {
195+
/// Thread name for stats.
196+
const THREAD_NAME: &str = "MithrilSnapshotIterator::Next";
197+
194198
let inner = self.inner.clone();
195199

196200
let res = task::spawn_blocking(move || {
197201
#[allow(clippy::unwrap_used)] // Unwrap is safe here because the lock can't be poisoned.
198202
let mut inner_iterator = inner.lock().unwrap();
199-
inner_iterator.next()
203+
stats::start_thread(inner_iterator.chain, THREAD_NAME, false);
204+
let next = inner_iterator.next();
205+
stats::stop_thread(inner_iterator.chain, THREAD_NAME);
206+
next
200207
})
201208
.await;
202209

rust/cardano-chain-follower/src/mithril_snapshot_sync.rs

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -295,14 +295,20 @@ async fn get_mithril_snapshot_and_certificate(
295295
async fn validate_mithril_snapshot(
296296
chain: Network, certificate: &MithrilCertificate, path: &Path,
297297
) -> bool {
298+
/// Thread name for stats.
299+
const THREAD_NAME: &str = "ValidateMithrilSnapshot";
300+
298301
let cert = certificate.clone();
299302
let mithril_path = path.to_path_buf();
300303
match tokio::spawn(async move {
301304
// This can be long running and CPU Intensive.
302305
// So we spawn it off to a background task.
303-
MessageBuilder::new()
306+
stats::start_thread(chain, THREAD_NAME, true);
307+
let result = MessageBuilder::new()
304308
.compute_snapshot_message(&cert, &mithril_path)
305-
.await
309+
.await;
310+
stats::stop_thread(chain, THREAD_NAME);
311+
result
306312
})
307313
.await
308314
{
@@ -683,6 +689,10 @@ macro_rules! next_iteration {
683689
pub(crate) async fn background_mithril_update(
684690
cfg: MithrilSnapshotConfig, tx: Sender<MithrilUpdateMessage>,
685691
) {
692+
/// Thread name for stats.
693+
const THREAD_NAME: &str = "MithrilSnapshotUpdater";
694+
695+
stats::start_thread(cfg.chain, THREAD_NAME, true);
686696
debug!(
687697
"Mithril Snapshot background updater for: {} from {} to {} : Starting",
688698
cfg.chain,
@@ -694,6 +704,7 @@ pub(crate) async fn background_mithril_update(
694704
let mut current_snapshot = recover_existing_snapshot(&cfg, &tx).await;
695705

696706
loop {
707+
stats::resume_thread(cfg.chain, THREAD_NAME);
697708
debug!("Background Mithril Updater - New Loop");
698709

699710
cleanup(&cfg).await;

0 commit comments

Comments
 (0)