Skip to content

Commit 683c83b

Browse files
authored
[indexer-alt-framework] remove skip_watermark and standardize first_checkpoint behavior (#24192)
## Description This PR standardizes behavior around `--first-checkpoint` and its effects on A. The checkpoint to start ingesting from B. The checkpoint for a pipeline to resume processing from - When an indexer starts, it will always start ingesting from the next checkpoint after the smallest committed checkpoint across all pipelines added to it. - Pipelines without a watermark are by default to be backfilled from genesis. - Providing `--first-checkpoint` overrides this behavior, and tells the indexer that these pipelines are to process checkpoints from `--first-checkpoint` instead. ## Test plan New tests --- ## Release notes Check each box that your changes affect. If none of the boxes relate to your changes, release notes aren't required. For each box you select, include information after the relevant heading that describes the impact of your changes that a user might notice and any actions they must take to implement updates. - [ ] Protocol: - [ ] Nodes (Validators and Full nodes): - [ ] gRPC: - [ ] JSON-RPC: - [ ] GraphQL: - [ ] CLI: - [ ] Rust SDK:
1 parent 91dcdb9 commit 683c83b

File tree

8 files changed

+860
-450
lines changed

8 files changed

+860
-450
lines changed

crates/sui-indexer-alt-framework/src/lib.rs

Lines changed: 846 additions & 366 deletions
Large diffs are not rendered by default.

crates/sui-indexer-alt-framework/src/pipeline/concurrent/commit_watermark.rs

Lines changed: 1 addition & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -41,23 +41,16 @@ use super::Handler;
4141
/// [LOUD_WATERMARK_UPDATE_INTERVAL]-many checkpoints.
4242
///
4343
/// The task will shutdown if the `cancel` token is signalled, or if the `rx` channel closes and
44-
/// the watermark cannot be progressed. If `skip_watermark` is set, the task will shutdown
45-
/// immediately.
44+
/// the watermark cannot be progressed.
4645
pub(super) fn commit_watermark<H: Handler + 'static>(
4746
mut next_checkpoint: u64,
4847
config: CommitterConfig,
49-
skip_watermark: bool,
5048
mut rx: mpsc::Receiver<Vec<WatermarkPart>>,
5149
store: H::Store,
5250
metrics: Arc<IndexerMetrics>,
5351
cancel: CancellationToken,
5452
) -> JoinHandle<()> {
5553
tokio::spawn(async move {
56-
if skip_watermark {
57-
info!(pipeline = H::NAME, "Skipping commit watermark task");
58-
return;
59-
}
60-
6154
let mut poll = interval(config.watermark_interval());
6255
poll.set_missed_tick_behavior(MissedTickBehavior::Delay);
6356

@@ -341,7 +334,6 @@ mod tests {
341334
let commit_watermark_handle = commit_watermark::<H>(
342335
next_checkpoint,
343336
config,
344-
false,
345337
watermark_rx,
346338
store_clone,
347339
metrics,

crates/sui-indexer-alt-framework/src/pipeline/concurrent/committer.rs

Lines changed: 8 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -30,14 +30,13 @@ const MAX_RETRY_INTERVAL: Duration = Duration::from_secs(1);
3030
///
3131
/// The writing of each batch will be repeatedly retried on an exponential back-off until it
3232
/// succeeds. Once the write succeeds, the [WatermarkPart]s for that batch are sent on `tx` to the
33-
/// watermark task, as long as `skip_watermark` is not true.
33+
/// watermark task.
3434
///
3535
/// This task will shutdown via its `cancel`lation token, or if its receiver or sender channels are
3636
/// closed.
3737
pub(super) fn committer<H: Handler + 'static>(
3838
handler: Arc<H>,
3939
config: CommitterConfig,
40-
skip_watermark: bool,
4140
rx: mpsc::Receiver<BatchedRows<H>>,
4241
tx: mpsc::Sender<Vec<WatermarkPart>>,
4342
db: H::Store,
@@ -194,7 +193,7 @@ pub(super) fn committer<H: Handler + 'static>(
194193
}
195194
};
196195

197-
if !skip_watermark && tx.send(watermark).await.is_err() {
196+
if tx.send(watermark).await.is_err() {
198197
info!(pipeline = H::NAME, "Watermark closed channel");
199198
return Err(Break::Cancel);
200199
}
@@ -333,8 +332,7 @@ mod tests {
333332
///
334333
/// # Arguments
335334
/// * `store` - The mock store to use for testing
336-
/// * `skip_watermark` - Whether to skip sending watermarks to the watermark channel
337-
async fn setup_test(store: MockStore, skip_watermark: bool) -> TestSetup {
335+
async fn setup_test(store: MockStore) -> TestSetup {
338336
let config = CommitterConfig::default();
339337
let metrics = IndexerMetrics::new(None, &Default::default());
340338
let cancel = CancellationToken::new();
@@ -348,7 +346,6 @@ mod tests {
348346
let _ = committer(
349347
handler,
350348
config,
351-
skip_watermark,
352349
batch_rx,
353350
watermark_tx,
354351
store_clone,
@@ -368,7 +365,7 @@ mod tests {
368365

369366
#[tokio::test]
370367
async fn test_concurrent_batch_processing() {
371-
let mut setup = setup_test(MockStore::default(), false).await;
368+
let mut setup = setup_test(MockStore::default()).await;
372369

373370
// Send batches
374371
let batch1 = BatchedRows::from_vec(
@@ -451,7 +448,7 @@ mod tests {
451448

452449
#[tokio::test]
453450
async fn test_commit_with_retries_for_commit_failure() {
454-
let mut setup = setup_test(MockStore::default(), false).await;
451+
let mut setup = setup_test(MockStore::default()).await;
455452

456453
// Create a batch with a single item that will fail once before succeeding
457454
let batch = BatchedRows::from_vec(
@@ -520,7 +517,7 @@ mod tests {
520517
})),
521518
..Default::default()
522519
};
523-
let mut setup = setup_test(store, false).await;
520+
let mut setup = setup_test(store).await;
524521

525522
let batch = BatchedRows::from_vec(
526523
vec![StoredData {
@@ -577,7 +574,7 @@ mod tests {
577574

578575
#[tokio::test]
579576
async fn test_empty_batch_handling() {
580-
let mut setup = setup_test(MockStore::default(), false).await;
577+
let mut setup = setup_test(MockStore::default()).await;
581578

582579
let empty_batch = BatchedRows::from_vec(
583580
vec![], // Empty batch
@@ -616,54 +613,9 @@ mod tests {
616613
let _ = setup.committer_handle.await;
617614
}
618615

619-
#[tokio::test]
620-
async fn test_skip_watermark_mode() {
621-
let mut setup = setup_test(MockStore::default(), true).await;
622-
623-
let batch = BatchedRows::from_vec(
624-
vec![StoredData {
625-
cp_sequence_number: 1,
626-
tx_sequence_numbers: vec![1, 2, 3],
627-
..Default::default()
628-
}],
629-
vec![WatermarkPart {
630-
watermark: CommitterWatermark {
631-
epoch_hi_inclusive: 0,
632-
checkpoint_hi_inclusive: 1,
633-
tx_hi: 3,
634-
timestamp_ms_hi_inclusive: 1000,
635-
},
636-
batch_rows: 1,
637-
total_rows: 1,
638-
}],
639-
);
640-
641-
// Send the batch
642-
setup.batch_tx.send(batch).await.unwrap();
643-
644-
// Wait for processing
645-
tokio::time::sleep(Duration::from_millis(200)).await;
646-
647-
// Verify data was committed
648-
{
649-
let data = setup.store.data.get(DataPipeline::NAME).unwrap();
650-
assert_eq!(data.get(&1).unwrap().value(), &vec![1, 2, 3]);
651-
}
652-
653-
// Verify no watermark was sent (skip_watermark mode)
654-
assert!(
655-
setup.watermark_rx.try_recv().is_err(),
656-
"No watermark should be sent in skip_watermark mode"
657-
);
658-
659-
// Clean up
660-
drop(setup.batch_tx);
661-
let _ = setup.committer_handle.await;
662-
}
663-
664616
#[tokio::test]
665617
async fn test_watermark_channel_closed() {
666-
let setup = setup_test(MockStore::default(), false).await;
618+
let setup = setup_test(MockStore::default()).await;
667619

668620
let batch = BatchedRows::from_vec(
669621
vec![StoredData {

crates/sui-indexer-alt-framework/src/pipeline/concurrent/mod.rs

Lines changed: 1 addition & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -194,8 +194,7 @@ impl Default for PrunerConfig {
194194
/// time.
195195
///
196196
/// The pipeline also maintains a row in the `watermarks` table for the pipeline which tracks the
197-
/// watermark below which all data has been committed (modulo pruning), as long as `skip_watermark`
198-
/// is not true.
197+
/// watermark below which all data has been committed (modulo pruning).
199198
///
200199
/// Checkpoint data is fed into the pipeline through the `checkpoint_rx` channel, and internal
201200
/// channels are created to communicate between its various components. The pipeline can be
@@ -205,7 +204,6 @@ pub(crate) fn pipeline<H: Handler + Send + Sync + 'static>(
205204
handler: H,
206205
next_checkpoint: u64,
207206
config: ConcurrentConfig,
208-
skip_watermark: bool,
209207
store: H::Store,
210208
checkpoint_rx: mpsc::Receiver<Arc<Checkpoint>>,
211209
metrics: Arc<IndexerMetrics>,
@@ -255,7 +253,6 @@ pub(crate) fn pipeline<H: Handler + Send + Sync + 'static>(
255253
let committer = committer::<H>(
256254
handler.clone(),
257255
committer_config.clone(),
258-
skip_watermark,
259256
committer_rx,
260257
committer_tx,
261258
store.clone(),
@@ -266,7 +263,6 @@ pub(crate) fn pipeline<H: Handler + Send + Sync + 'static>(
266263
let commit_watermark = commit_watermark::<H>(
267264
next_checkpoint,
268265
committer_config,
269-
skip_watermark,
270266
watermark_rx,
271267
store.clone(),
272268
metrics.clone(),
@@ -412,12 +408,10 @@ mod tests {
412408
let metrics = IndexerMetrics::new(None, &Registry::default());
413409
let cancel = CancellationToken::new();
414410

415-
let skip_watermark = false;
416411
let pipeline_handle = pipeline(
417412
DataPipeline,
418413
next_checkpoint,
419414
config,
420-
skip_watermark,
421415
store.clone(),
422416
checkpoint_rx,
423417
metrics,

crates/sui-indexer-alt-framework/src/pipeline/mod.rs

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,9 +21,6 @@ const PIPELINE_BUFFER: usize = 5;
2121
/// happen if the pipeline was started with its initial checkpoint overridden to be strictly
2222
/// greater than its current watermark -- in that case, the pipeline will never be able to update
2323
/// its watermarks.
24-
///
25-
/// This may be a legitimate thing to do when backfilling a table, but in that case
26-
/// `--skip-watermarks` should be used.
2724
const WARN_PENDING_WATERMARKS: usize = 10000;
2825

2926
#[derive(Serialize, Deserialize, Debug, Clone)]

crates/sui-indexer-alt-framework/src/postgres/mod.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -160,7 +160,7 @@ pub mod tests {
160160
.concurrent_pipeline(ConcurrentPipeline1, ConcurrentConfig::default())
161161
.await
162162
.unwrap();
163-
assert_eq!(indexer.first_checkpoint_from_watermark, 0);
163+
assert_eq!(indexer.first_ingestion_checkpoint, 0);
164164
}
165165

166166
#[tokio::test]
@@ -179,7 +179,7 @@ pub mod tests {
179179
.concurrent_pipeline(ConcurrentPipeline1, ConcurrentConfig::default())
180180
.await
181181
.unwrap();
182-
assert_eq!(indexer.first_checkpoint_from_watermark, 11);
182+
assert_eq!(indexer.first_ingestion_checkpoint, 11);
183183
}
184184

185185
#[tokio::test]
@@ -205,11 +205,11 @@ pub mod tests {
205205
.concurrent_pipeline(ConcurrentPipeline2, ConcurrentConfig::default())
206206
.await
207207
.unwrap();
208-
assert_eq!(indexer.first_checkpoint_from_watermark, 21);
208+
assert_eq!(indexer.first_ingestion_checkpoint, 21);
209209
indexer
210210
.concurrent_pipeline(ConcurrentPipeline1, ConcurrentConfig::default())
211211
.await
212212
.unwrap();
213-
assert_eq!(indexer.first_checkpoint_from_watermark, 11);
213+
assert_eq!(indexer.first_ingestion_checkpoint, 11);
214214
}
215215
}

crates/sui-indexer-alt/src/benchmark.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,6 @@ pub async fn run_benchmark(
5151
first_checkpoint: Some(first_checkpoint),
5252
last_checkpoint: Some(last_checkpoint),
5353
pipeline,
54-
..Default::default()
5554
};
5655

5756
let client_args = ClientArgs {

docs/content/guides/developer/advanced/custom-indexer/indexer-runtime-perf.mdx

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -75,10 +75,6 @@ Include the following command-line arguments to help focus processing. These val
7575
--pipeline "tx_counts" # Run specific pipeline only
7676
--pipeline "events" # Can specify multiple pipelines
7777

78-
# Watermark behavior
79-
--skip-watermark
80-
```
81-
8278
**Use cases:**
8379

8480
- **Checkpoint range:** Essential for backfills and historical data processing.

0 commit comments

Comments
 (0)