
Commit db3d8ea

emmaling27 (Convex, Inc.) authored and committed
Skip backfilling to retention cutoff in IndexWorker (#41756)
Splits `perform_backfill` into two functions: `backfill_from_ts` and `backfill_from_retention_cutoff`. The index worker should never need to backfill all the way to the retention cutoff, because new indexes don't get queried until after they are committed. I'll follow up to modify db cluster migration so we can get rid of `backfill_from_retention_cutoff` completely.

GitOrigin-RevId: 789db6ecbb4186a383b18fc95f3ee99d4dc9db3c
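Seen from the call sites, the split looks roughly like this. This is a sketch assembled from the hunks below, not a compilable excerpt; the arguments after `index_selector` are elided because the diff context cuts off there.

```rust
// IndexWorker path: a new index isn't queried until its backfill has
// committed, so a snapshot at the current timestamp is enough.
index_writer
    .backfill_from_ts(ts, &index_registry, index_selector, /* concurrency, database */)
    .await?;

// Callers that genuinely need index entries all the way back to the retention
// window (e.g. the db cluster migration mentioned above) keep the old
// behavior under the new name.
index_writer
    .backfill_from_retention_cutoff(ts, &index_registry, index_selector, /* ... */)
    .await?;
```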
1 parent cf40c80 commit db3d8ea

3 files changed: +49 −3 lines

crates/database/src/database_index_workers/index_writer.rs

Lines changed: 47 additions & 1 deletion
@@ -207,7 +207,7 @@ impl<RT: Runtime> IndexWriter<RT> {
     /// with a timestamp <= `snapshot_ts`.
     ///
     /// Takes an optional database to update progress on the index backfill
-    pub async fn perform_backfill(
+    pub async fn backfill_from_retention_cutoff(
         &self,
         snapshot_ts: RepeatableTimestamp,
         index_metadata: &IndexRegistry,
@@ -265,6 +265,52 @@ impl<RT: Runtime> IndexWriter<RT> {
         Ok(())
     }
 
+    /// Backfill indexes based on a snapshot at the current time. After the
+    /// current snapshot is backfilled, index snapshot reads at >= ts are
+    /// valid.
+    ///
+    /// The goal of this backfill is to make snapshot reads of `index_name`
+    /// valid at or after `snapshot_ts`.
+    /// To support:
+    /// 1. Latest documents as of `snapshot_ts`.
+    /// 2. Document changes after `snapshot_ts`. These are handled by active
+    ///    writes, assuming `snapshot_ts` is after the index was created. If
+    ///    there are no active writes, then `backfill_forwards` must be called
+    ///    with a timestamp <= `snapshot_ts`.
+    ///
+    /// Takes an optional database to update progress on the index backfill
+    pub async fn backfill_from_ts(
+        &self,
+        snapshot_ts: RepeatableTimestamp,
+        index_metadata: &IndexRegistry,
+        index_selector: IndexSelector,
+        concurrency: usize,
+        database: Option<Database<RT>>,
+    ) -> anyhow::Result<()> {
+        let pause_client = self.runtime.pause_client();
+        pause_client.wait(PERFORM_BACKFILL_LABEL).await;
+        stream::iter(index_selector.iterate_tables().map(Ok))
+            .try_for_each_concurrent(concurrency, |table_id| {
+                let index_metadata = index_metadata.clone();
+                let index_selector = index_selector.clone();
+                let database = database.clone();
+                let self_ = (*self).clone();
+                try_join("index_backfill_table_snapshot", async move {
+                    self_
+                        .backfill_exact_snapshot_of_table(
+                            snapshot_ts,
+                            &index_selector,
+                            &index_metadata,
+                            table_id,
+                            database,
+                        )
+                        .await
+                })
+            })
+            .await?;
+        Ok(())
+    }
+
     /// Backfills exactly the index entries necessary to represent documents
     /// which were latest at `snapshot`. In particular it does not create any
     /// tombstone index entries. And it only does snapshot reads (of `by_id`) at
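The body of `backfill_from_ts` fans the per-table snapshot work out with the bounded-concurrency combinator from the `futures` crate. Below is a minimal, self-contained sketch of that pattern; the table names and the `backfill_table` body are hypothetical stand-ins, and the Convex-internal `try_join` wrapper is omitted.

```rust
// Cargo deps assumed: futures, anyhow, tokio (features = ["macros", "rt-multi-thread"]).
use futures::{stream, TryStreamExt};

// Stand-in for the per-table work (`backfill_exact_snapshot_of_table` above).
async fn backfill_table(table: &str) -> anyhow::Result<()> {
    println!("backfilling {table}");
    Ok(())
}

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let tables = ["users", "messages", "sessions"];
    let concurrency = 2;
    // Wrap each item in Ok so the stream is a TryStream, then process up to
    // `concurrency` tables at once, stopping at the first error.
    stream::iter(tables.into_iter().map(anyhow::Ok))
        .try_for_each_concurrent(concurrency, |table| async move {
            backfill_table(table).await
        })
        .await?;
    Ok(())
}
```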

crates/database/src/database_index_workers/mod.rs

Lines changed: 1 addition & 1 deletion
@@ -334,7 +334,7 @@ impl<RT: Runtime> IndexWorker<RT> {
         };
         let index_registry = snapshot.index_registry;
         index_writer
-            .perform_backfill(
+            .backfill_from_ts(
                 ts,
                 &index_registry,
                 index_selector,

crates/database/src/tests/mod.rs

Lines changed: 1 addition & 1 deletion
@@ -1916,7 +1916,7 @@ async fn test_index_write(rt: TestRuntime) -> anyhow::Result<()> {
         .await?;
     let index_metadata = database_snapshot.index_registry().clone();
     index_writer
-        .perform_backfill(
+        .backfill_from_ts(
             unchecked_repeatable_ts(ts),
             &index_metadata,
             IndexSelector::All(index_metadata.clone()),
