Commit 75e461e

[indexer-alt-framework] watermark tasks (#23737)
## Description

Building on #24192, this PR adds two new flags, `--task` and `--reader-interval-ms`, which when set create an indexer whose pipelines commit checkpoint data as long as it is not less than each main pipeline's `pruner_hi`. Startup behavior remains as defined in the previous PR: every pipeline, tasked or not, resumes committing from the checkpoint after its committer watermark (or from 0 if none exists), unless overridden by `--first-checkpoint`. The indexer determines where to start ingestion as the smallest next checkpoint across pipelines (as calculated above).

To backfill, operators either:

- run a new indexer with `--task` and `--first-checkpoint` (preferred), or
- wipe the watermark entries for the main pipelines (without a task) and restart the main indexer with `--first-checkpoint`.

Multiple tasked indexers can run in parallel over different ranges to accelerate a backfill.

## Test plan

New tests in `lib.rs` and `collector.rs`, with additional tests in `collector`.

---

## Release notes

Check each box that your changes affect. If none of the boxes relate to your changes, release notes aren't required. For each box you select, include information after the relevant heading that describes the impact of your changes that a user might notice and any actions they must take to implement updates.

- [ ] Protocol:
- [ ] Nodes (Validators and Full nodes):
- [ ] gRPC:
- [x] JSON-RPC: This PR, in tandem with https://github.com/MystenLabs/sui/pull/24192, unifies how indexers determine their ingestion starting point and introduces watermark-gated backfill tasks.
  1. `--skip-watermark` is removed; the previous ability to bypass watermark safety checks for concurrent pipelines is no longer supported.
  2. `--first-checkpoint` no longer forces the indexer to start ingesting from the configured checkpoint; the indexer now always determines its starting ingestion point as the minimum next checkpoint across all pipelines. From this release, `--first-checkpoint` only applies to pipelines that do not yet have a committer watermark, and those pipelines resume processing from the configured value. Pipelines with existing watermarks always resume from their own next checkpoint.
  3. A new mechanism, watermark tasks, allows operators to run the same pipelines on multiple indexer instances for historical backfilling. Two new flags, `--task` and `--reader-interval-ms`, enable it. They create a tasked indexer whose pipelines commit checkpoint data as long as the checkpoint is not below the `reader_lo` watermark of their corresponding main pipelines; `--reader-interval-ms` controls how frequently the tasked pipelines poll the main pipelines' watermarks.

  Migration guidance:
  1. If you use `--first-checkpoint` only for _initial_ ingestion of a fresh pipeline, no further action is needed.
  2. If you previously used `--first-checkpoint` (optionally with `--skip-watermark`) to backfill existing tables, you can achieve the same workflow by starting a new indexer instance with `--task`, `--reader-interval-ms`, and `--first-checkpoint` configured.
  3. Like `--skip-watermark`, `--task` cannot be used to run sequential pipelines.
- [x] GraphQL: See above.
- [ ] CLI:
- [ ] Rust SDK:
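To make the gating concrete, here is a minimal, self-contained sketch of the check a tasked pipeline applies before committing. Every name in it (`commit_checkpoint`, `main_pipeline_watermark`, the `cp_blobs`-style range) is a hypothetical stand-in for illustration, not the framework's actual API:

```rust
/// Hypothetical stand-in for the framework's commit step.
async fn commit_checkpoint(cp: u64) -> anyhow::Result<()> {
    println!("committed checkpoint {cp}");
    Ok(())
}

/// Hypothetical read of the main (untasked) pipeline's watermark. In the real
/// indexer this is refreshed on the cadence set by `--reader-interval-ms`.
async fn main_pipeline_watermark() -> anyhow::Result<u64> {
    Ok(105)
}

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    // Backfill range, as a tasked indexer would derive from `--first-checkpoint`.
    for cp in 100u64..110 {
        let watermark = main_pipeline_watermark().await?;
        if cp >= watermark {
            // Not below the main pipeline's watermark: safe to commit.
            commit_checkpoint(cp).await?;
        }
        // Checkpoints below the watermark are withheld, since the main
        // pipeline has already advanced past them.
    }
    Ok(())
}
```

Because each tasked pipeline applies this gate independently, several tasked indexers can run the same gate over disjoint ranges in parallel, which is what makes the accelerated backfill described above safe.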
1 parent 838a94e commit 75e461e

16 files changed: +1048, -115 lines changed

16 files changed

+1048
-115
lines changed

crates/sui-checkpoint-blob-indexer/src/lib.rs

Lines changed: 3 additions & 3 deletions
```diff
@@ -18,7 +18,7 @@ mod tests {

     #[tokio::test]
     async fn test_checkpoint_blob_handler_uncompressed() {
-        let store = ObjectStore::new(Arc::new(InMemory::new()), None);
+        let store = ObjectStore::new(Arc::new(InMemory::new()));
         let mut conn = store.connect().await.unwrap();

         let blob = CheckpointBlob {
@@ -40,7 +40,7 @@ mod tests {

     #[tokio::test]
     async fn test_checkpoint_blob_handler_compressed() {
-        let store = ObjectStore::new(Arc::new(InMemory::new()), None);
+        let store = ObjectStore::new(Arc::new(InMemory::new()));
         let mut conn = store.connect().await.unwrap();

         let test_data = vec![0u8; 1000];
@@ -66,7 +66,7 @@ mod tests {

     #[tokio::test]
     async fn test_epochs_handler() {
-        let store = ObjectStore::new(Arc::new(InMemory::new()), None);
+        let store = ObjectStore::new(Arc::new(InMemory::new()));
         let path = ObjectPath::from("epochs.json");

         // Test 1: Create new file with first epoch
```

crates/sui-checkpoint-blob-indexer/src/main.rs

Lines changed: 1 addition & 1 deletion
```diff
@@ -126,7 +126,7 @@ async fn main() -> anyhow::Result<()> {
         unreachable!("clap ensures exactly one storage backend is provided");
     };

-    let store = ObjectStore::new(object_store, args.watermark_task);
+    let store = ObjectStore::new(object_store);

    let cancel = tokio_util::sync::CancellationToken::new();
```

crates/sui-indexer-alt-consistent-store/src/store/mod.rs

Lines changed: 11 additions & 6 deletions
```diff
@@ -38,7 +38,7 @@ pub(crate) struct Store<S>(Arc<Inner<S>>);
 pub(crate) struct Connection<'s, S> {
     pub store: &'s Store<S>,
     pub batch: rocksdb::WriteBatch,
-    watermark: Option<(&'static str, Watermark)>,
+    watermark: Option<(String, Watermark)>,
 }

 /// The contents of the store.
@@ -147,7 +147,7 @@ impl<S: Send + Sync + 'static> store::TransactionalStore for Store<S> {
             .queue
             .get()
             .context("Synchronizer not running for store")?
-            .get(pipeline)
+            .get(pipeline.as_str())
             .with_context(|| format!("No {pipeline:?} synchronizer queue"))?
             .send((watermark, conn.batch))
             .await
@@ -161,17 +161,22 @@
 impl<S: Send + Sync> store::Connection for Connection<'_, S> {
     async fn committer_watermark(
         &mut self,
-        pipeline: &'static str,
+        pipeline_task: &str,
     ) -> anyhow::Result<Option<CommitterWatermark>> {
-        Ok(self.store.0.db.commit_watermark(pipeline)?.map(Into::into))
+        Ok(self
+            .store
+            .0
+            .db
+            .commit_watermark(pipeline_task)?
+            .map(Into::into))
     }

     async fn set_committer_watermark(
         &mut self,
-        pipeline: &'static str,
+        pipeline_task: &str,
         watermark: CommitterWatermark,
     ) -> anyhow::Result<bool> {
-        self.watermark = Some((pipeline, watermark.into()));
+        self.watermark = Some((pipeline_task.to_string(), watermark.into()));
         Ok(true)
     }
```
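Switching the watermark slot from `&'static str` to an owned `String` is what lets one pipeline carry several committer watermarks at once, one per task. A minimal sketch of the resulting keyspace, using a plain `HashMap` as a stand-in for the store (the pipeline and task names are made up):

```rust
use std::collections::HashMap;

fn main() {
    // Stand-in for the watermark storage: owned `String` keys mean the main
    // pipeline and its tasked backfill track watermarks independently.
    let mut watermarks: HashMap<String, u64> = HashMap::new();
    watermarks.insert("cp_blobs".to_string(), 5_000_000); // main pipeline
    watermarks.insert("cp_blobs@backfill".to_string(), 1_200_000); // tasked run

    assert_eq!(watermarks["cp_blobs"], 5_000_000);
    assert_eq!(watermarks["cp_blobs@backfill"], 1_200_000);
}
```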

crates/sui-indexer-alt-framework-store-traits/src/lib.rs

Lines changed: 37 additions & 7 deletions
```diff
@@ -1,6 +1,7 @@
 // Copyright (c) Mysten Labs, Inc.
 // SPDX-License-Identifier: Apache-2.0

+use anyhow::Result;
 use async_trait::async_trait;
 use chrono::{DateTime, Utc};
 use scoped_futures::ScopedBoxFuture;
@@ -10,11 +11,13 @@ use std::time::Duration;
 /// operations, agnostic of the underlying store implementation.
 #[async_trait]
 pub trait Connection: Send {
-    /// Given a pipeline, return the committer watermark from the `Store`. This is used by the
-    /// indexer on startup to determine which checkpoint to resume processing from.
+    /// Given a `pipeline_task` representing either a pipeline name or a pipeline with an associated
+    /// task (formatted as `{pipeline}{Store::DELIMITER}{task}`), return the committer watermark
+    /// from the `Store`. The indexer fetches this value for each pipeline added to determine which
+    /// checkpoint to resume processing from.
     async fn committer_watermark(
         &mut self,
-        pipeline: &'static str,
+        pipeline_task: &str,
     ) -> anyhow::Result<Option<CommitterWatermark>>;

     /// Given a pipeline, return the reader watermark from the database. This is used by the indexer
@@ -36,11 +39,13 @@ pub trait Connection: Send {
         delay: Duration,
     ) -> anyhow::Result<Option<PrunerWatermark>>;

-    /// Upsert the high watermark as long as it raises the watermark stored in the database. Returns
-    /// a boolean indicating whether the watermark was actually updated or not.
+    /// Upsert the high watermark for the `pipeline_task` - representing either a pipeline name or a
+    /// pipeline with an associated task (formatted as `{pipeline}{Store::DELIMITER}{task}`) - as
+    /// long as it raises the watermark stored in the database. Returns a boolean indicating whether
+    /// the watermark was actually updated or not.
     async fn set_committer_watermark(
         &mut self,
-        pipeline: &'static str,
+        pipeline_task: &str,
         watermark: CommitterWatermark,
     ) -> anyhow::Result<bool>;

@@ -63,7 +68,7 @@ pub trait Connection: Send {
         reader_lo: u64,
     ) -> anyhow::Result<bool>;

-    /// Update the pruner watermark, returns true if the watermark was actually updated
+    /// Update the pruner watermark, returns true if the watermark was actually updated.
     async fn set_pruner_watermark(
         &mut self,
         pipeline: &'static str,
@@ -81,6 +86,10 @@ pub trait Store: Send + Sync + 'static + Clone {
     where
         Self: 'c;

+    /// Delimiter used to separate pipeline names from task identifiers when reading or writing the
+    /// committer watermark.
+    const DELIMITER: &'static str = "@";
+
     async fn connect<'c>(&'c self) -> Result<Self::Connection<'c>, anyhow::Error>;
 }

@@ -179,6 +188,27 @@ impl PrunerWatermark {
     }
 }

+/// Check that the pipeline name does not contain the store's delimiter, and construct the string
+/// used for tracking a pipeline's watermarks in the store. This is either the pipeline name itself,
+/// or `{pipeline}{Store::DELIMITER}{task}` if a task name is provided.
+pub fn pipeline_task<S: Store>(
+    pipeline_name: &'static str,
+    task_name: Option<&str>,
+) -> Result<String> {
+    if pipeline_name.contains(S::DELIMITER) {
+        anyhow::bail!(
+            "Pipeline name '{}' contains invalid delimiter '{}'",
+            pipeline_name,
+            S::DELIMITER
+        );
+    }
+
+    Ok(match task_name {
+        Some(task_name) => format!("{}{}{}", pipeline_name, S::DELIMITER, task_name),
+        None => pipeline_name.to_string(),
+    })
+}
+
 #[cfg(test)]
 mod tests {
     use super::*;
```
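The new `pipeline_task` helper is what composes the watermark key. A quick, self-contained mirror of its behavior with the default `@` delimiter (a local closure stands in for the real `Store` trait machinery, and the pipeline/task names are invented):

```rust
fn main() -> anyhow::Result<()> {
    // Matches `Store::DELIMITER`'s default value.
    const DELIMITER: &str = "@";

    // Local mirror of `pipeline_task` for a store using the default delimiter.
    let compose = |pipeline: &str, task: Option<&str>| -> anyhow::Result<String> {
        anyhow::ensure!(
            !pipeline.contains(DELIMITER),
            "Pipeline name '{pipeline}' contains invalid delimiter '{DELIMITER}'"
        );
        Ok(match task {
            Some(task) => format!("{pipeline}{DELIMITER}{task}"),
            None => pipeline.to_string(),
        })
    };

    // With a task, the key is `{pipeline}@{task}`; without one, just the name.
    assert_eq!(compose("cp_blobs", Some("backfill"))?, "cp_blobs@backfill");
    assert_eq!(compose("cp_blobs", None)?, "cp_blobs");
    // Pipeline names containing the delimiter are rejected.
    assert!(compose("bad@name", None).is_err());
    Ok(())
}
```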

crates/sui-indexer-alt-framework/src/ingestion/broadcaster.rs

Lines changed: 3 additions & 3 deletions
```diff
@@ -217,7 +217,7 @@ async fn ingest_and_broadcast_range(
         // we treat an error returned here as cancellation too.
         if tokio::select! {
             result = ingest_hi_rx.wait_for(|hi| hi.is_none_or(|h| cp < h)) => result.is_err(),
-            _ = cancel.cancelled() => true,
+            _ = cancel.cancelled() => true
         } {
             return Err(Error::Cancelled);
         }
@@ -238,8 +238,8 @@ async fn ingest_and_broadcast_range(
         debug!(checkpoint = cp, "Broadcasted checkpoint");
         Ok(())
     } else {
-        // An error is returned meaning some subscriber channel has closed, which we consider
-        // a cancellation signal for the entire ingestion.
+        // An error is returned meaning some subscriber channel has closed,
+        // which we consider a cancellation signal for the entire ingestion.
         cancel.cancel();
         Err(Error::Cancelled)
     }
```
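The rewrapped comment describes a real invariant: a failed send means the receiving side of a subscriber channel is gone, and the broadcaster treats that as a shutdown signal for the whole ingestion. A tiny sketch of the same pattern, independent of the framework's actual types:

```rust
use tokio_util::sync::CancellationToken;

#[tokio::main]
async fn main() {
    let cancel = CancellationToken::new();
    let (tx, rx) = tokio::sync::mpsc::channel::<u64>(8);
    drop(rx); // the subscriber goes away

    // `send` fails once the receiving half is closed; treat that as a
    // cancellation signal for everything else still running.
    if tx.send(42).await.is_err() {
        cancel.cancel();
    }
    assert!(cancel.is_cancelled());
}
```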
