From d0a0aec4a222847ebf9e7fd6820395b98733e19a Mon Sep 17 00:00:00 2001 From: Will Yang Date: Tue, 2 Dec 2025 13:04:42 -0800 Subject: [PATCH 1/6] the default ingestion concurrency of 200 means that if the indexer is unlucky enough to not be able to process the first 11 checkpoints before saturation, we'll stall forever --- crates/sui-indexer-alt-framework/src/lib.rs | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/crates/sui-indexer-alt-framework/src/lib.rs b/crates/sui-indexer-alt-framework/src/lib.rs index 10fe320be54f3..be9bc0ff63a89 100644 --- a/crates/sui-indexer-alt-framework/src/lib.rs +++ b/crates/sui-indexer-alt-framework/src/lib.rs @@ -1992,7 +1992,10 @@ mod tests { ..Default::default() }; - let ingestion_config = IngestionConfig::default(); + let ingestion_config = IngestionConfig { + ingest_concurrency: 501, + ..Default::default() + }; let mut tasked_indexer = Indexer::new( store.clone(), From dab94f3e90eaceb1b03770a6a7cd51c9a3c4f127 Mon Sep 17 00:00:00 2001 From: Will Yang Date: Tue, 2 Dec 2025 15:12:34 -0800 Subject: [PATCH 2/6] something about adding a print to wait_for_watermark seems to have fixed the flakiness.. why is that? --- crates/sui-indexer-alt-framework/src/lib.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/crates/sui-indexer-alt-framework/src/lib.rs b/crates/sui-indexer-alt-framework/src/lib.rs index be9bc0ff63a89..6af6e86c6028f 100644 --- a/crates/sui-indexer-alt-framework/src/lib.rs +++ b/crates/sui-indexer-alt-framework/src/lib.rs @@ -498,6 +498,7 @@ mod tests { .await .ok(); + println!("Processed checkpoint {}", cp_num); Ok(vec![MockValue(cp_num)]) } } From 99de94dcf31f1be84a3d8c54b9b57ee86e3d9aa7 Mon Sep 17 00:00:00 2001 From: Will Yang Date: Tue, 2 Dec 2025 15:47:10 -0800 Subject: [PATCH 3/6] maybe fanout is an issue --- crates/sui-indexer-alt-framework/src/lib.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/crates/sui-indexer-alt-framework/src/lib.rs b/crates/sui-indexer-alt-framework/src/lib.rs index 6af6e86c6028f..e0b0611215286 100644 --- a/crates/sui-indexer-alt-framework/src/lib.rs +++ b/crates/sui-indexer-alt-framework/src/lib.rs @@ -483,6 +483,7 @@ mod tests { #[async_trait] impl Processor for ControllableHandler { const NAME: &'static str = "controllable"; + const FANOUT: usize = 501; type Value = MockValue; async fn process( From 6cafd689f9eb2f9cc65afa213a9db7c9b6b61056 Mon Sep 17 00:00:00 2001 From: Will Yang Date: Tue, 2 Dec 2025 16:27:53 -0800 Subject: [PATCH 4/6] this should do. we can explore using start_paused = true later to further decrease the duration of this test. --- crates/sui-indexer-alt-framework/src/lib.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/crates/sui-indexer-alt-framework/src/lib.rs b/crates/sui-indexer-alt-framework/src/lib.rs index e0b0611215286..85d10c36edd9b 100644 --- a/crates/sui-indexer-alt-framework/src/lib.rs +++ b/crates/sui-indexer-alt-framework/src/lib.rs @@ -499,7 +499,6 @@ mod tests { .await .ok(); - println!("Processed checkpoint {}", cp_num); Ok(vec![MockValue(cp_num)]) } } From 76757575ad522c7beb45fac0118f569e73e7904d Mon Sep 17 00:00:00 2001 From: Will Yang Date: Tue, 2 Dec 2025 16:35:53 -0800 Subject: [PATCH 5/6] now that we've figured out the source of stalls, start_paused should work and speed up this test --- crates/sui-indexer-alt-framework/src/lib.rs | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/crates/sui-indexer-alt-framework/src/lib.rs b/crates/sui-indexer-alt-framework/src/lib.rs index 85d10c36edd9b..2949e94eee97d 100644 --- a/crates/sui-indexer-alt-framework/src/lib.rs +++ b/crates/sui-indexer-alt-framework/src/lib.rs @@ -483,6 +483,10 @@ mod tests { #[async_trait] impl Processor for ControllableHandler { const NAME: &'static str = "controllable"; + /// The checkpoints to process come out of order. To account for potential flakiness in test + /// `test_tasked_pipelines_skip_checkpoints_trailing_main_reader_lo`, we set the FANOUT to + /// the total number of checkpoints to process, so we can ensure the correct checkpoints are + /// processed. const FANOUT: usize = 501; type Value = MockValue; @@ -1961,7 +1965,7 @@ mod tests { /// During a run, the tasked pipeline will stop sending checkpoints below the main pipeline's /// reader watermark to the committer. Committer watermark should still advance. - #[tokio::test] + #[tokio::test(start_paused = true)] async fn test_tasked_pipelines_skip_checkpoints_trailing_main_reader_lo() { let cancel = CancellationToken::new(); let registry = Registry::new(); @@ -1993,10 +1997,7 @@ mod tests { ..Default::default() }; - let ingestion_config = IngestionConfig { - ingest_concurrency: 501, - ..Default::default() - }; + let ingestion_config = IngestionConfig::default(); let mut tasked_indexer = Indexer::new( store.clone(), From aa2403393ff61b4d8df3dc7aac2b9b1b4a029be2 Mon Sep 17 00:00:00 2001 From: Will Yang Date: Wed, 3 Dec 2025 08:49:09 -0800 Subject: [PATCH 6/6] don't need lenient check anymore --- crates/sui-indexer-alt-framework/src/lib.rs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/crates/sui-indexer-alt-framework/src/lib.rs b/crates/sui-indexer-alt-framework/src/lib.rs index 2949e94eee97d..d56ee81b32baa 100644 --- a/crates/sui-indexer-alt-framework/src/lib.rs +++ b/crates/sui-indexer-alt-framework/src/lib.rs @@ -2082,8 +2082,7 @@ mod tests { let lt_250 = data.iter().filter(|e| *e.key() < 250).count(); // Checkpoints 250 to 500 inclusive must have been committed. assert_eq!(ge_250, 251); - // Lenient check that not all checkpoints < 250 were committed. - assert!(lt_250 < 250); + assert_eq!(lt_250, 11); assert_eq!( conn.committer_watermark( &pipeline_task::(ControllableHandler::NAME, Some("task")).unwrap()