Skip to content

Commit 8a53443

Browse files
sui-indexer-alt-framework: separate trailing_init_watermark from init_watermark
1 parent 11c0c00 commit 8a53443

File tree

10 files changed

+296
-172
lines changed

10 files changed

+296
-172
lines changed

crates/sui-analytics-indexer/src/store/mod.rs

Lines changed: 2 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@ use std::sync::RwLock;
2323
use std::time::Duration;
2424
use std::time::Instant;
2525

26-
use anyhow::Result;
2726
use async_trait::async_trait;
2827
use object_store::PutPayload;
2928
use object_store::path::Path as ObjectPath;
@@ -33,8 +32,6 @@ use sui_indexer_alt_framework::store::Connection;
3332
use sui_indexer_alt_framework::store::Store;
3433
use sui_indexer_alt_framework::store::TransactionalStore;
3534
use sui_indexer_alt_framework_store_traits::CommitterWatermark;
36-
use sui_indexer_alt_framework_store_traits::InitWatermark;
37-
use sui_indexer_alt_framework_store_traits::init_with_committer_watermark;
3835
use sui_types::base_types::EpochId;
3936
use tokio::sync::mpsc;
4037
use tracing::debug;
@@ -287,7 +284,7 @@ impl AnalyticsStore {
287284
&self,
288285
first_checkpoint: Option<u64>,
289286
last_checkpoint: Option<u64>,
290-
) -> Result<(Option<u64>, Option<u64>)> {
287+
) -> anyhow::Result<(Option<u64>, Option<u64>)> {
291288
match &self.mode {
292289
StoreMode::Live(_) => Ok((first_checkpoint, last_checkpoint)),
293290
StoreMode::Migration(store) => {
@@ -533,7 +530,7 @@ impl<'a> AnalyticsConnection<'a> {
533530
pub async fn commit_batch<P: Processor>(
534531
&mut self,
535532
batch_from_framework: &[CheckpointRows],
536-
) -> Result<usize> {
533+
) -> anyhow::Result<usize> {
537534
let pipeline = P::NAME;
538535
let pipeline_config = self.pipeline_config(pipeline);
539536

@@ -623,18 +620,6 @@ impl TransactionalStore for AnalyticsStore {
623620

624621
#[async_trait]
625622
impl Connection for AnalyticsConnection<'_> {
626-
/// Initialize watermark.
627-
///
628-
/// In live mode: Watermarks are derived from file names, so just delegates to `committer_watermark`.
629-
/// In migration mode: Delegates to `MigrationStore::init_watermark`.
630-
async fn init_watermark(
631-
&mut self,
632-
pipeline_task: &str,
633-
init_watermark: InitWatermark,
634-
) -> anyhow::Result<InitWatermark> {
635-
init_with_committer_watermark(self, pipeline_task, init_watermark).await
636-
}
637-
638623
/// Determine the watermark.
639624
///
640625
/// In live mode: scans file names in the object store.

crates/sui-indexer-alt-consistent-store/src/store/mod.rs

Lines changed: 0 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -13,9 +13,7 @@ use prometheus::Registry;
1313
use scoped_futures::ScopedBoxFuture;
1414
use sui_indexer_alt_framework::service::Service;
1515
use sui_indexer_alt_framework::store::CommitterWatermark;
16-
use sui_indexer_alt_framework::store::InitWatermark;
1716
use sui_indexer_alt_framework::store::Store as _;
18-
use sui_indexer_alt_framework::store::init_with_committer_watermark;
1917
use sui_indexer_alt_framework::store::{self};
2018

2119
use crate::db::Db;
@@ -167,14 +165,6 @@ impl<S: Send + Sync + 'static> store::TransactionalStore for Store<S> {
167165

168166
#[async_trait]
169167
impl<S: Send + Sync> store::Connection for Connection<'_, S> {
170-
async fn init_watermark(
171-
&mut self,
172-
pipeline_task: &str,
173-
init_watermark: InitWatermark,
174-
) -> anyhow::Result<InitWatermark> {
175-
init_with_committer_watermark(self, pipeline_task, init_watermark).await
176-
}
177-
178168
async fn committer_watermark(
179169
&mut self,
180170
pipeline_task: &str,

crates/sui-indexer-alt-framework-store-traits/src/lib.rs

Lines changed: 28 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -15,11 +15,21 @@ pub trait Connection: Send {
1515
/// Returns the `InitWatermark` based on the existing watermark if it exists.
1616
/// Otherwise, initializes a new watermark record with `InitWatermark` and returns
1717
/// the value passed in.
18+
///
19+
/// The default implementation does not perform initialization and delegates to `committer_watermark`.
1820
async fn init_watermark(
1921
&mut self,
2022
pipeline_task: &str,
21-
init_watermark: InitWatermark,
22-
) -> anyhow::Result<InitWatermark>;
23+
_init_watermark: InitWatermark,
24+
) -> anyhow::Result<InitWatermark> {
25+
let checkpoint_hi_inclusive = self
26+
.committer_watermark(pipeline_task)
27+
.await?
28+
.map(|w| w.checkpoint_hi_inclusive);
29+
Ok(InitWatermark {
30+
checkpoint_hi_inclusive,
31+
})
32+
}
2333

2434
/// Given a `pipeline_task` representing either a pipeline name or a pipeline with an associated
2535
/// task (formatted as `{pipeline}{Store::DELIMITER}{task}`), return the committer watermark
@@ -45,6 +55,15 @@ pub trait Connection: Send {
4555
/// the pruning subsystem.
4656
#[async_trait]
4757
pub trait TrailingConnection: Connection {
58+
/// Returns the `InitWatermark` based on the existing watermark if it exists.
59+
/// Otherwise, initializes a new watermark record with `InitWatermark` and returns
60+
/// the value passed in.
61+
async fn trailing_init_watermark(
62+
&mut self,
63+
pipeline_task: &str,
64+
trailing_init_watermark: TrailingInitWatermark,
65+
) -> anyhow::Result<TrailingInitWatermark>;
66+
4867
/// Given a pipeline, return the reader watermark from the database. This is used by the indexer
4968
/// to determine the new `reader_lo` or inclusive lower bound of available data.
5069
async fn reader_watermark(
@@ -120,12 +139,18 @@ pub trait TransactionalStore: Store {
120139
&'r mut Self::Connection<'_>,
121140
) -> ScopedBoxFuture<'a, 'r, anyhow::Result<R>>;
122141
}
123-
124142
/// Used during watermark initialization to set and return state.
125143
#[derive(Default, Debug, Clone, Copy, PartialEq)]
126144
pub struct InitWatermark {
127145
/// Calculated by the framework as `default_next_checkpoint.checked_sub(1)`.
128146
pub checkpoint_hi_inclusive: Option<u64>,
147+
}
148+
149+
/// Used during watermark initialization to set and return state.
150+
#[derive(Default, Debug, Clone, Copy, PartialEq)]
151+
pub struct TrailingInitWatermark {
152+
/// Calculated by the framework as `default_next_checkpoint.checked_sub(1)`.
153+
pub checkpoint_hi_inclusive: Option<u64>,
129154
/// Calculated by the framework as `default_next_checkpoint`.
130155
pub reader_lo: u64,
131156
}
@@ -212,23 +237,6 @@ impl PrunerWatermark {
212237
}
213238
}
214239

215-
/// A utility function for connections that do not have special initialization logic. These
216-
/// connections delegate initialization to `Connection::committer_watermark`.
217-
pub async fn init_with_committer_watermark(
218-
connection: &mut impl Connection,
219-
pipeline_task: &str,
220-
init_watermark: InitWatermark,
221-
) -> anyhow::Result<InitWatermark> {
222-
let checkpoint_hi_inclusive = connection
223-
.committer_watermark(pipeline_task)
224-
.await?
225-
.map(|w| w.checkpoint_hi_inclusive);
226-
Ok(InitWatermark {
227-
checkpoint_hi_inclusive,
228-
..init_watermark
229-
})
230-
}
231-
232240
/// Check that the pipeline name does not contain the store's delimiter, and construct the string
233241
/// used for tracking a pipeline's watermarks in the store. This is either the pipeline name itself,
234242
/// or `{pipeline}{Store::DELIMITER}{task}` if a task name is provided.

0 commit comments

Comments
 (0)