Skip to content

Commit 9160e0a

Browse files
emmaling27Convex, Inc.
authored andcommitted
Refactor search and vector bootstrap to be more readable (#23842)
This PR moves the `IndexesToBootstrap` creation fn and bootstrap fn to be on `IndexesToBootstrap` instead of in `SearchAndVectorBootstrapWorker`. I think it's a minor improvement to separate the code so the worker is really just the async loop and sending the message to the committer. `IndexesToBootstrap` is the where most of the logic of transforming index documents into index managers lives. This change could make it easier to unit test in the future. Right now all the tests are at the worker level. GitOrigin-RevId: 8c61b038f324e8ca28cf402fd750609f2b6bb002
1 parent 605ea1d commit 9160e0a

File tree

1 file changed

+170
-171
lines changed

1 file changed

+170
-171
lines changed

crates/database/src/search_and_vector_bootstrap.rs

Lines changed: 170 additions & 171 deletions
Original file line numberDiff line numberDiff line change
@@ -112,6 +112,125 @@ pub struct BootstrappedSearchAndVectorIndexes {
112112
}
113113

114114
impl IndexesToBootstrap {
115+
fn create(
116+
upper_bound: RepeatableTimestamp,
117+
indexes_with_fast_forward_ts: Vec<(ParsedDocument<TabletIndexMetadata>, Option<Timestamp>)>,
118+
) -> anyhow::Result<Self> {
119+
let mut table_to_vector_indexes: BTreeMap<_, Vec<_>> = BTreeMap::new();
120+
let mut table_to_search_indexes: BTreeMap<_, Vec<_>> = BTreeMap::new();
121+
// We keep track of latest ts we can bootstrap from for each vector index.
122+
let mut oldest_index_ts = *upper_bound;
123+
124+
for (index_doc, fast_forward_ts) in indexes_with_fast_forward_ts {
125+
let is_enabled = index_doc.config.is_enabled();
126+
let (index_id, index_metadata) = index_doc.into_id_and_value();
127+
match index_metadata.config {
128+
IndexConfig::Vector {
129+
on_disk_state,
130+
ref developer_config,
131+
..
132+
} => {
133+
let qdrant_schema = QdrantSchema::new(developer_config);
134+
let ts = match on_disk_state {
135+
VectorIndexState::Backfilled(ref snapshot_info)
136+
| VectorIndexState::SnapshottedAt(ref snapshot_info) => {
137+
// Use fast forward ts instead of snapshot ts.
138+
let current_index_ts =
139+
max(fast_forward_ts.unwrap_or_default(), snapshot_info.ts);
140+
oldest_index_ts = min(oldest_index_ts, current_index_ts);
141+
snapshot_info.ts
142+
},
143+
VectorIndexState::Backfilling(_) => upper_bound.succ()?,
144+
};
145+
let vector_index_bootstrap_data = VectorIndexBootstrapData {
146+
index_id: index_id.internal_id(),
147+
on_disk_state,
148+
memory_index: MemoryVectorIndex::new(WriteTimestamp::Committed(ts.succ()?)),
149+
qdrant_schema,
150+
};
151+
if let Some(vector_indexes) =
152+
table_to_vector_indexes.get_mut(index_metadata.name.table())
153+
{
154+
vector_indexes.push(vector_index_bootstrap_data);
155+
} else {
156+
table_to_vector_indexes.insert(
157+
*index_metadata.name.table(),
158+
vec![vector_index_bootstrap_data],
159+
);
160+
}
161+
},
162+
IndexConfig::Search {
163+
ref developer_config,
164+
on_disk_state,
165+
} => {
166+
let search_index = match on_disk_state {
167+
SearchIndexState::Backfilling => {
168+
// We'll start a new memory search index starting at the next commit
169+
// after our persistence upper bound. After
170+
// bootstrapping, all commits after
171+
// `persistence.upper_bound()` will flow through `Self::update`, so our
172+
// memory index contains all revisions `>=
173+
// persistence.upper_bound().succ()?`.
174+
let memory_index = MemorySearchIndex::new(WriteTimestamp::Committed(
175+
upper_bound.succ()?,
176+
));
177+
SearchIndex::Backfilling { memory_index }
178+
},
179+
SearchIndexState::Backfilled(SearchIndexSnapshot {
180+
index,
181+
ts: disk_ts,
182+
version,
183+
})
184+
| SearchIndexState::SnapshottedAt(SearchIndexSnapshot {
185+
index,
186+
ts: disk_ts,
187+
version,
188+
}) => {
189+
let current_index_ts =
190+
max(disk_ts, fast_forward_ts.unwrap_or_default());
191+
oldest_index_ts = min(oldest_index_ts, current_index_ts);
192+
let memory_index =
193+
MemorySearchIndex::new(WriteTimestamp::Committed(disk_ts.succ()?));
194+
let snapshot = SnapshotInfo {
195+
disk_index: index,
196+
disk_index_ts: current_index_ts,
197+
disk_index_version: version,
198+
memory_index,
199+
};
200+
if is_enabled {
201+
SearchIndex::Ready(snapshot)
202+
} else {
203+
SearchIndex::Backfilled(snapshot)
204+
}
205+
},
206+
};
207+
let tantivy_schema = TantivySearchIndexSchema::new(developer_config);
208+
let search_index_bootstrap_data = SearchIndexBootstrapData {
209+
index_id: index_id.internal_id(),
210+
search_index,
211+
tantivy_schema,
212+
};
213+
if let Some(search_indexes) =
214+
table_to_search_indexes.get_mut(index_metadata.name.table())
215+
{
216+
search_indexes.push(search_index_bootstrap_data);
217+
} else {
218+
table_to_search_indexes.insert(
219+
*index_metadata.name.table(),
220+
vec![search_index_bootstrap_data],
221+
);
222+
}
223+
},
224+
_ => continue,
225+
};
226+
}
227+
Ok(Self {
228+
table_to_search_indexes,
229+
table_to_vector_indexes,
230+
oldest_index_ts,
231+
})
232+
}
233+
115234
fn tables_with_indexes(&self) -> BTreeSet<TableId> {
116235
self.table_to_search_indexes
117236
.keys()
@@ -120,6 +239,55 @@ impl IndexesToBootstrap {
120239
.collect()
121240
}
122241

242+
async fn bootstrap(
243+
mut self,
244+
persistence: &RepeatablePersistence,
245+
) -> anyhow::Result<BootstrappedSearchAndVectorIndexes> {
246+
let _status = log_worker_starting("SearchAndVectorBootstrap");
247+
let timer = crate::metrics::bootstrap_timer();
248+
let upper_bound = persistence.upper_bound();
249+
let mut num_revisions = 0;
250+
let mut total_size = 0;
251+
252+
let range = TimestampRange::new((
253+
Bound::Excluded(self.oldest_index_ts),
254+
Bound::Included(*upper_bound),
255+
))?;
256+
let tables_with_indexes = self.tables_with_indexes();
257+
let revision_stream =
258+
stream_revision_pairs_for_indexes(&tables_with_indexes, persistence, range);
259+
futures::pin_mut!(revision_stream);
260+
261+
while let Some(revision_pair) = revision_stream.try_next().await? {
262+
num_revisions += 1;
263+
total_size += revision_pair.document().map(|d| d.size()).unwrap_or(0);
264+
if let Some(vector_indexes_to_update) = self
265+
.table_to_vector_indexes
266+
.get_mut(revision_pair.id.table())
267+
{
268+
for vector_index in vector_indexes_to_update {
269+
vector_index.update(&revision_pair)?;
270+
}
271+
}
272+
if let Some(search_indexes_to_update) = self
273+
.table_to_search_indexes
274+
.get_mut(revision_pair.id.table())
275+
{
276+
for search_index in search_indexes_to_update {
277+
search_index.update(&revision_pair)?;
278+
}
279+
}
280+
}
281+
282+
tracing::info!(
283+
"Loaded {num_revisions} revisions ({total_size} bytes) in {:?}.",
284+
timer.elapsed()
285+
);
286+
crate::metrics::finish_bootstrap(num_revisions, total_size, timer);
287+
288+
Ok(self.finish(persistence.version()))
289+
}
290+
123291
fn finish(self, persistence_version: PersistenceVersion) -> BootstrappedSearchAndVectorIndexes {
124292
let tables_with_indexes = self.tables_with_indexes();
125293
let search_index_manager = SearchIndexManager::new(
@@ -364,180 +532,11 @@ impl<RT: Runtime> SearchAndVectorIndexBootstrapWorker<RT> {
364532
.buffer_unordered(20)
365533
.try_collect::<Vec<_>>()
366534
.await?;
367-
let indexes_to_bootstrap = Self::indexes_to_bootstrap(
535+
let indexes_to_bootstrap = IndexesToBootstrap::create(
368536
self.persistence.upper_bound(),
369537
indexes_with_fast_forward_ts,
370538
)?;
371-
372-
Self::bootstrap_inner(&self.persistence, indexes_to_bootstrap).await
373-
}
374-
375-
fn indexes_to_bootstrap(
376-
upper_bound: RepeatableTimestamp,
377-
indexes_with_fast_forward_ts: Vec<(ParsedDocument<TabletIndexMetadata>, Option<Timestamp>)>,
378-
) -> anyhow::Result<IndexesToBootstrap> {
379-
let mut table_to_vector_indexes: BTreeMap<_, Vec<_>> = BTreeMap::new();
380-
let mut table_to_search_indexes: BTreeMap<_, Vec<_>> = BTreeMap::new();
381-
// We keep track of latest ts we can bootstrap from for each vector index.
382-
let mut oldest_index_ts = *upper_bound;
383-
384-
for (index_doc, fast_forward_ts) in indexes_with_fast_forward_ts {
385-
let is_enabled = index_doc.config.is_enabled();
386-
let (index_id, index_metadata) = index_doc.into_id_and_value();
387-
match index_metadata.config {
388-
IndexConfig::Vector {
389-
on_disk_state,
390-
ref developer_config,
391-
..
392-
} => {
393-
let qdrant_schema = QdrantSchema::new(developer_config);
394-
let ts = match on_disk_state {
395-
VectorIndexState::Backfilled(ref snapshot_info)
396-
| VectorIndexState::SnapshottedAt(ref snapshot_info) => {
397-
// Use fast forward ts instead of snapshot ts.
398-
let current_index_ts =
399-
max(fast_forward_ts.unwrap_or_default(), snapshot_info.ts);
400-
oldest_index_ts = min(oldest_index_ts, current_index_ts);
401-
snapshot_info.ts
402-
},
403-
VectorIndexState::Backfilling(_) => upper_bound.succ()?,
404-
};
405-
let vector_index_bootstrap_data = VectorIndexBootstrapData {
406-
index_id: index_id.internal_id(),
407-
on_disk_state,
408-
memory_index: MemoryVectorIndex::new(WriteTimestamp::Committed(ts.succ()?)),
409-
qdrant_schema,
410-
};
411-
if let Some(vector_indexes) =
412-
table_to_vector_indexes.get_mut(index_metadata.name.table())
413-
{
414-
vector_indexes.push(vector_index_bootstrap_data);
415-
} else {
416-
table_to_vector_indexes.insert(
417-
*index_metadata.name.table(),
418-
vec![vector_index_bootstrap_data],
419-
);
420-
}
421-
},
422-
IndexConfig::Search {
423-
ref developer_config,
424-
on_disk_state,
425-
} => {
426-
let search_index = match on_disk_state {
427-
SearchIndexState::Backfilling => {
428-
// We'll start a new memory search index starting at the next commit
429-
// after our persistence upper bound. After
430-
// bootstrapping, all commits after
431-
// `persistence.upper_bound()` will flow through `Self::update`, so our
432-
// memory index contains all revisions `>=
433-
// persistence.upper_bound().succ()?`.
434-
let memory_index = MemorySearchIndex::new(WriteTimestamp::Committed(
435-
upper_bound.succ()?,
436-
));
437-
SearchIndex::Backfilling { memory_index }
438-
},
439-
SearchIndexState::Backfilled(SearchIndexSnapshot {
440-
index,
441-
ts: disk_ts,
442-
version,
443-
})
444-
| SearchIndexState::SnapshottedAt(SearchIndexSnapshot {
445-
index,
446-
ts: disk_ts,
447-
version,
448-
}) => {
449-
let current_index_ts =
450-
max(disk_ts, fast_forward_ts.unwrap_or_default());
451-
oldest_index_ts = min(oldest_index_ts, current_index_ts);
452-
let memory_index =
453-
MemorySearchIndex::new(WriteTimestamp::Committed(disk_ts.succ()?));
454-
let snapshot = SnapshotInfo {
455-
disk_index: index,
456-
disk_index_ts: current_index_ts,
457-
disk_index_version: version,
458-
memory_index,
459-
};
460-
if is_enabled {
461-
SearchIndex::Ready(snapshot)
462-
} else {
463-
SearchIndex::Backfilled(snapshot)
464-
}
465-
},
466-
};
467-
let tantivy_schema = TantivySearchIndexSchema::new(developer_config);
468-
let search_index_bootstrap_data = SearchIndexBootstrapData {
469-
index_id: index_id.internal_id(),
470-
search_index,
471-
tantivy_schema,
472-
};
473-
if let Some(search_indexes) =
474-
table_to_search_indexes.get_mut(index_metadata.name.table())
475-
{
476-
search_indexes.push(search_index_bootstrap_data);
477-
} else {
478-
table_to_search_indexes.insert(
479-
*index_metadata.name.table(),
480-
vec![search_index_bootstrap_data],
481-
);
482-
}
483-
},
484-
_ => continue,
485-
};
486-
}
487-
Ok(IndexesToBootstrap {
488-
table_to_search_indexes,
489-
table_to_vector_indexes,
490-
oldest_index_ts,
491-
})
492-
}
493-
494-
async fn bootstrap_inner(
495-
persistence: &RepeatablePersistence,
496-
mut indexes_to_bootstrap: IndexesToBootstrap,
497-
) -> anyhow::Result<BootstrappedSearchAndVectorIndexes> {
498-
let _status = log_worker_starting("SearchAndVectorBootstrap");
499-
let timer = crate::metrics::bootstrap_timer();
500-
let upper_bound = persistence.upper_bound();
501-
let mut num_revisions = 0;
502-
let mut total_size = 0;
503-
504-
let range = TimestampRange::new((
505-
Bound::Excluded(indexes_to_bootstrap.oldest_index_ts),
506-
Bound::Included(*upper_bound),
507-
))?;
508-
let tables_with_indexes = indexes_to_bootstrap.tables_with_indexes();
509-
let revision_stream =
510-
stream_revision_pairs_for_indexes(&tables_with_indexes, persistence, range);
511-
futures::pin_mut!(revision_stream);
512-
513-
while let Some(revision_pair) = revision_stream.try_next().await? {
514-
num_revisions += 1;
515-
total_size += revision_pair.document().map(|d| d.size()).unwrap_or(0);
516-
if let Some(vector_indexes_to_update) = indexes_to_bootstrap
517-
.table_to_vector_indexes
518-
.get_mut(revision_pair.id.table())
519-
{
520-
for vector_index in vector_indexes_to_update {
521-
vector_index.update(&revision_pair)?;
522-
}
523-
}
524-
if let Some(search_indexes_to_update) = indexes_to_bootstrap
525-
.table_to_search_indexes
526-
.get_mut(revision_pair.id.table())
527-
{
528-
for search_index in search_indexes_to_update {
529-
search_index.update(&revision_pair)?;
530-
}
531-
}
532-
}
533-
534-
tracing::info!(
535-
"Loaded {num_revisions} revisions ({total_size} bytes) in {:?}.",
536-
timer.elapsed()
537-
);
538-
crate::metrics::finish_bootstrap(num_revisions, total_size, timer);
539-
540-
Ok(indexes_to_bootstrap.finish(persistence.version()))
539+
indexes_to_bootstrap.bootstrap(&self.persistence).await
541540
}
542541
}
543542

0 commit comments

Comments
 (0)