Skip to content

Commit 2d8c492

Browse files
emmaling27Convex, Inc.
authored andcommitted
DB index backfill checkpointing (#42012)
Adds checkpointing to database index backfills so they are resumable (restarting the backend won't erase backfill progress). We store the `BackfillCursor` in the index backfills table to avoid OCCs in the `_index` table, which only allows one write at a time. This is not consistent with the way search index checkpoints are stored, but we should migrate those out of the `_index` table eventually for the same reason. GitOrigin-RevId: 1a0b21ac46b3edd1dd52da8a37e1ef173538d235
1 parent 2901722 commit 2d8c492

File tree

10 files changed

+402
-171
lines changed

10 files changed

+402
-171
lines changed

crates/database/src/bootstrap_model/index_backfills/mod.rs

Lines changed: 68 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ use common::{
1111
runtime::Runtime,
1212
types::IndexId,
1313
};
14+
use sync_types::Timestamp;
1415
use value::{
1516
DeveloperDocumentId,
1617
FieldPath,
@@ -21,7 +22,10 @@ use value::{
2122
};
2223

2324
use crate::{
24-
bootstrap_model::index_backfills::types::IndexBackfillMetadata,
25+
bootstrap_model::index_backfills::types::{
26+
BackfillCursor,
27+
IndexBackfillMetadata,
28+
},
2529
system_tables::{
2630
SystemIndex,
2731
SystemTable,
@@ -88,14 +92,33 @@ impl<'a, RT: Runtime> IndexBackfillModel<'a, RT> {
8892
.await
8993
}
9094

95+
pub async fn initialize_search_index_backfill(
96+
&mut self,
97+
index_id: IndexId,
98+
total_docs: Option<u64>,
99+
) -> anyhow::Result<ResolvedDocumentId> {
100+
self.initialize_backfill(index_id, total_docs, None).await
101+
}
102+
103+
pub async fn initialize_database_index_backfill(
104+
&mut self,
105+
index_id: IndexId,
106+
total_docs: Option<u64>,
107+
snapshot_ts: Timestamp,
108+
) -> anyhow::Result<ResolvedDocumentId> {
109+
self.initialize_backfill(index_id, total_docs, Some(snapshot_ts))
110+
.await
111+
}
112+
91113
/// Creates a new index backfill entry or reset existing index backfill
92114
/// entry with 0 progress and the total number of documents, if available.
93115
/// total_docs may not be available if table summaries have not yet
94116
/// bootstrapped. We're ok to update it later (which will be approximate).
95-
pub async fn initialize_backfill(
117+
async fn initialize_backfill(
96118
&mut self,
97119
index_id: IndexId,
98120
total_docs: Option<u64>,
121+
snapshot_ts: Option<Timestamp>,
99122
) -> anyhow::Result<ResolvedDocumentId> {
100123
let index_id = self.index_id_as_developer_id(index_id);
101124
tracing::info!(
@@ -108,6 +131,10 @@ impl<'a, RT: Runtime> IndexBackfillModel<'a, RT> {
108131
index_id,
109132
num_docs_indexed: 0,
110133
total_docs,
134+
cursor: snapshot_ts.map(|ts| BackfillCursor {
135+
snapshot_ts: ts,
136+
cursor: None,
137+
}),
111138
};
112139
if let Some(existing_backfill_metadata) = maybe_existing_backfill_metadata {
113140
system_model
@@ -124,28 +151,63 @@ impl<'a, RT: Runtime> IndexBackfillModel<'a, RT> {
124151
}
125152
}
126153

154+
pub async fn update_search_index_backfill_progress(
155+
&mut self,
156+
index_id: IndexId,
157+
tablet_id: TabletId,
158+
num_docs_indexed: u64,
159+
) -> anyhow::Result<()> {
160+
self.update_index_backfill_progress(index_id, tablet_id, num_docs_indexed, None)
161+
.await
162+
}
163+
164+
pub async fn update_database_index_backfill_progress(
165+
&mut self,
166+
index_id: IndexId,
167+
tablet_id: TabletId,
168+
num_docs_indexed: u64,
169+
cursor: ResolvedDocumentId,
170+
) -> anyhow::Result<()> {
171+
self.update_index_backfill_progress(
172+
index_id,
173+
tablet_id,
174+
num_docs_indexed,
175+
Some(cursor.developer_id),
176+
)
177+
.await
178+
}
179+
127180
/// Upserts progress on index backfills. Only call this during the phase of
128181
/// the backfill where we walk a snapshot of a table, not the catching up
129182
/// phase where we walk the revision stream. These metrics don't make sense
130183
/// in the context of the revision stream.
131184
/// num_docs_indexed is the number of additional documents indexed since the
132185
/// last call.
133-
pub async fn update_index_backfill_progress(
186+
async fn update_index_backfill_progress(
134187
&mut self,
135188
index_id: IndexId,
136189
tablet_id: TabletId,
137190
num_docs_indexed: u64,
191+
cursor: Option<DeveloperDocumentId>,
138192
) -> anyhow::Result<()> {
139193
let index_id = self.index_id_as_developer_id(index_id);
140194
let maybe_existing_backfill_metadata = self.existing_backfill_metadata(index_id).await?;
141195
let Some(existing_backfill_metadata) = maybe_existing_backfill_metadata else {
142196
anyhow::bail!("Index backfill not found for index {}", index_id);
143197
};
198+
let cursor = existing_backfill_metadata
199+
.cursor
200+
.as_ref()
201+
.map(|c| BackfillCursor {
202+
snapshot_ts: c.snapshot_ts,
203+
cursor,
204+
});
144205
if let Some(total_docs) = existing_backfill_metadata.total_docs {
145206
let new_backfill_metadata = IndexBackfillMetadata {
146207
index_id,
147208
num_docs_indexed: existing_backfill_metadata.num_docs_indexed + num_docs_indexed,
148209
total_docs: Some(total_docs),
210+
cursor,
149211
};
150212
SystemMetadataModel::new_global(self.tx)
151213
.replace(
@@ -162,8 +224,10 @@ impl<'a, RT: Runtime> IndexBackfillModel<'a, RT> {
162224
// still bootstrapping when the backfill began.
163225
let new_backfill_metadata = IndexBackfillMetadata {
164226
index_id,
165-
num_docs_indexed,
227+
num_docs_indexed: existing_backfill_metadata.num_docs_indexed
228+
+ num_docs_indexed,
166229
total_docs: Some(count),
230+
cursor,
167231
};
168232
SystemMetadataModel::new_global(self.tx)
169233
.replace(

crates/database/src/bootstrap_model/index_backfills/tests.rs

Lines changed: 20 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ async fn test_initialize_backfill_creates_new_entry(rt: TestRuntime) -> anyhow::
4242
let total_docs = Some(1000u64);
4343

4444
let backfill_id = IndexBackfillModel::new(&mut tx)
45-
.initialize_backfill(index_id, total_docs)
45+
.initialize_backfill(index_id, total_docs, None)
4646
.await?;
4747

4848
// Verify the backfill was created
@@ -65,7 +65,7 @@ async fn test_initialize_backfill_with_none_total_docs(rt: TestRuntime) -> anyho
6565
let total_docs = None;
6666

6767
let backfill_id = IndexBackfillModel::new(&mut tx)
68-
.initialize_backfill(index_id, total_docs)
68+
.initialize_backfill(index_id, total_docs, None)
6969
.await?;
7070

7171
// Verify the backfill was created with None total_docs
@@ -88,12 +88,12 @@ async fn test_initialize_backfill_resets_existing_entry(rt: TestRuntime) -> anyh
8888

8989
// Create initial backfill
9090
let first_backfill_id = IndexBackfillModel::new(&mut tx)
91-
.initialize_backfill(index_id, Some(500))
91+
.initialize_backfill(index_id, Some(500), None)
9292
.await?;
9393

9494
// Update progress
9595
IndexBackfillModel::new(&mut tx)
96-
.update_index_backfill_progress(index_id, create_test_tablet_id(), 100)
96+
.update_index_backfill_progress(index_id, create_test_tablet_id(), 100, None)
9797
.await?;
9898

9999
// Verify progress was updated
@@ -104,7 +104,7 @@ async fn test_initialize_backfill_resets_existing_entry(rt: TestRuntime) -> anyh
104104

105105
// Initialize again with different total_docs - should reset progress
106106
let second_backfill_id = IndexBackfillModel::new(&mut tx)
107-
.initialize_backfill(index_id, Some(1000))
107+
.initialize_backfill(index_id, Some(1000), None)
108108
.await?;
109109

110110
// Should return the same ID
@@ -132,12 +132,12 @@ async fn test_update_index_backfill_progress_with_total_docs(
132132

133133
// Initialize backfill
134134
let backfill_id = IndexBackfillModel::new(&mut tx)
135-
.initialize_backfill(index_id, total_docs)
135+
.initialize_backfill(index_id, total_docs, None)
136136
.await?;
137137

138138
// Update progress
139139
IndexBackfillModel::new(&mut tx)
140-
.update_index_backfill_progress(index_id, tablet_id, 250)
140+
.update_index_backfill_progress(index_id, tablet_id, 250, None)
141141
.await?;
142142

143143
// Verify progress was updated
@@ -149,7 +149,7 @@ async fn test_update_index_backfill_progress_with_total_docs(
149149

150150
// Update progress again
151151
IndexBackfillModel::new(&mut tx)
152-
.update_index_backfill_progress(index_id, tablet_id, 150)
152+
.update_index_backfill_progress(index_id, tablet_id, 150, None)
153153
.await?;
154154

155155
// Verify progress was accumulated
@@ -173,7 +173,7 @@ async fn test_update_index_backfill_progress_nonexistent_backfill(
173173

174174
// Try to update progress for non-existent backfill
175175
let result = IndexBackfillModel::new(&mut tx)
176-
.update_index_backfill_progress(index_id, tablet_id, 100)
176+
.update_index_backfill_progress(index_id, tablet_id, 100, None)
177177
.await;
178178

179179
// Should return an error
@@ -200,13 +200,13 @@ async fn test_update_index_backfill_progress_with_none_total_docs(
200200
.tablet_id;
201201
// Initialize backfill without total_docs
202202
let backfill_id = IndexBackfillModel::new(&mut tx)
203-
.initialize_backfill(index_id, None)
203+
.initialize_backfill(index_id, None, None)
204204
.await?;
205205

206206
// In a test environment, table mapping for a non-existent tablet will fail,
207207
// so this tests that the method handles the error gracefully
208208
IndexBackfillModel::new(&mut tx)
209-
.update_index_backfill_progress(index_id, tablet_id, 100)
209+
.update_index_backfill_progress(index_id, tablet_id, 100, None)
210210
.await?;
211211

212212
let backfill_doc = tx.get(backfill_id).await?;
@@ -226,7 +226,7 @@ async fn test_delete_index_backfill_existing(rt: TestRuntime) -> anyhow::Result<
226226

227227
// Initialize backfill
228228
let backfill_id = IndexBackfillModel::new(&mut tx)
229-
.initialize_backfill(index_id, total_docs)
229+
.initialize_backfill(index_id, total_docs, None)
230230
.await?;
231231

232232
// Verify it exists
@@ -279,15 +279,19 @@ async fn test_multiple_backfills_different_indexes(rt: TestRuntime) -> anyhow::R
279279

280280
// Initialize backfills for different indexes
281281
let mut model = IndexBackfillModel::new(&mut tx);
282-
let backfill_id1 = model.initialize_backfill(index_id1, Some(1000)).await?;
283-
let backfill_id2 = model.initialize_backfill(index_id2, Some(2000)).await?;
282+
let backfill_id1 = model
283+
.initialize_backfill(index_id1, Some(1000), None)
284+
.await?;
285+
let backfill_id2 = model
286+
.initialize_backfill(index_id2, Some(2000), None)
287+
.await?;
284288

285289
// Update progress for both
286290
model
287-
.update_index_backfill_progress(index_id1, tablet_id, 100)
291+
.update_index_backfill_progress(index_id1, tablet_id, 100, None)
288292
.await?;
289293
model
290-
.update_index_backfill_progress(index_id2, tablet_id, 200)
294+
.update_index_backfill_progress(index_id2, tablet_id, 200, None)
291295
.await?;
292296

293297
// Verify both backfills exist with correct progress

crates/database/src/bootstrap_model/index_backfills/types.rs

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,11 +2,20 @@ use serde::{
22
Deserialize,
33
Serialize,
44
};
5+
use sync_types::Timestamp;
56
use value::{
67
codegen_convex_serialization,
78
DeveloperDocumentId,
89
};
910

11+
/// Cursor for a database index that has an in-progress backfill
12+
#[derive(Clone, Debug, Eq, PartialEq, Hash)]
13+
#[cfg_attr(any(test, feature = "testing"), derive(proptest_derive::Arbitrary))]
14+
pub struct BackfillCursor {
15+
pub snapshot_ts: Timestamp,
16+
pub cursor: Option<DeveloperDocumentId>,
17+
}
18+
1019
/// Metadata for tracking index backfill progress.
1120
///
1221
/// This structure stores the progress of an index backfill operation,
@@ -29,6 +38,36 @@ pub struct IndexBackfillMetadata {
2938
/// (does not include documents written since the backfill began)
3039
/// This field is None if there is no table summary available.
3140
pub total_docs: Option<u64>,
41+
/// We only track the backfill cursor for database indexes because search
42+
/// index backfill state is stored in the _index table.
43+
pub cursor: Option<BackfillCursor>,
44+
}
45+
46+
#[derive(Serialize, Deserialize)]
47+
#[serde(rename_all = "camelCase")]
48+
pub struct SerializedBackfillCursor {
49+
snapshot_ts: i64,
50+
cursor: Option<String>,
51+
}
52+
53+
impl From<BackfillCursor> for SerializedBackfillCursor {
54+
fn from(cursor: BackfillCursor) -> Self {
55+
SerializedBackfillCursor {
56+
snapshot_ts: cursor.snapshot_ts.into(),
57+
cursor: cursor.cursor.map(|id| id.to_string()),
58+
}
59+
}
60+
}
61+
62+
impl TryFrom<SerializedBackfillCursor> for BackfillCursor {
63+
type Error = anyhow::Error;
64+
65+
fn try_from(cursor: SerializedBackfillCursor) -> Result<Self, Self::Error> {
66+
Ok(BackfillCursor {
67+
snapshot_ts: cursor.snapshot_ts.try_into()?,
68+
cursor: cursor.cursor.map(|id| id.parse()).transpose()?,
69+
})
70+
}
3271
}
3372

3473
#[derive(Serialize, Deserialize)]
@@ -37,6 +76,7 @@ pub struct SerializedIndexBackfillMetadata {
3776
index_id: String,
3877
num_docs_indexed: i64,
3978
total_docs: Option<i64>,
79+
cursor: Option<SerializedBackfillCursor>,
4080
}
4181

4282
impl From<IndexBackfillMetadata> for SerializedIndexBackfillMetadata {
@@ -45,6 +85,7 @@ impl From<IndexBackfillMetadata> for SerializedIndexBackfillMetadata {
4585
index_id: metadata.index_id.to_string(),
4686
num_docs_indexed: metadata.num_docs_indexed as i64,
4787
total_docs: metadata.total_docs.map(|v| v as i64),
88+
cursor: metadata.cursor.map(|cursor| cursor.into()),
4889
}
4990
}
5091
}
@@ -57,6 +98,7 @@ impl TryFrom<SerializedIndexBackfillMetadata> for IndexBackfillMetadata {
5798
index_id: serialized.index_id.parse()?,
5899
num_docs_indexed: serialized.num_docs_indexed as u64,
59100
total_docs: serialized.total_docs.map(|v| v as u64),
101+
cursor: serialized.cursor.map(|c| c.try_into()).transpose()?,
60102
})
61103
}
62104
}

crates/database/src/database.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1707,6 +1707,10 @@ impl<RT: Runtime> Database<RT> {
17071707
Ok(snapshot)
17081708
}
17091709

1710+
pub fn latest_ts_and_snapshot(&self) -> anyhow::Result<(RepeatableTimestamp, Snapshot)> {
1711+
Ok(self.snapshot_manager.lock().latest())
1712+
}
1713+
17101714
pub fn latest_database_snapshot(&self) -> anyhow::Result<DatabaseSnapshot<RT>> {
17111715
let (ts, snapshot) = self.snapshot_manager.lock().latest();
17121716
let repeatable_persistence =

0 commit comments

Comments
 (0)