Commit ad0dde7

emmaling27 (Convex, Inc.) authored and committed
Make LiveFlush and Backfill variants of text and vector search flushers so that backfill doesn't block flushes (#39341)
Writes start failing if the search index flusher can't keep up, which can happen when the flusher is blocked building backfill segments, a step that can take a long time. This PR splits the vector and text flushers into two flushers each: one for live flushes of the in-memory index and one for backfills. This should make it safer to backfill vector indexes on large tables.

GitOrigin-RevId: 3abf4d4a477d77dbf0c7e2f17dd17ea9817a7277
1 parent 4ad31e1 commit ad0dde7
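
As a rough sketch of the idea (hypothetical loop bodies and the tokio crate assumed; this is not the Convex worker code): the two passes now run as independent tasks, so a long backfill build never sits in front of the latency-sensitive live flush. The diffs below wire up exactly this split for both the vector and text flushers.

// Minimal sketch of the separation. Each pass runs as its own task,
// so a slow backfill build cannot delay a live flush.
use std::time::Duration;

async fn live_flush_loop() {
    loop {
        // Quick pass: persist the in-memory index before it grows too large,
        // since writes start failing once it does.
        println!("live flush of in-memory index");
        tokio::time::sleep(Duration::from_secs(10)).await;
    }
}

async fn backfill_loop() {
    loop {
        // Slow pass: build segments for newly added or version-mismatched indexes.
        println!("building backfill segments");
        tokio::time::sleep(Duration::from_secs(10)).await;
    }
}

#[tokio::main]
async fn main() {
    // Before this change one flusher did both jobs; now each job gets its own task.
    let live = tokio::spawn(live_flush_loop());
    let backfill = tokio::spawn(backfill_loop());
    let _ = tokio::join!(live, backfill);
}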

File tree: 12 files changed (+246, -68 lines)


crates/database/src/index_workers/mod.rs

Lines changed: 22 additions & 0 deletions
@@ -31,6 +31,28 @@ pub(crate) enum BuildReason {
     VersionMismatch,
 }
 
+/// There are two types of search index flushers: live flushers and backfill
+/// flushers. Live flushers are responsible for flushing the in-memory search
+/// index contents when they get too large or old. Backfill flushers are
+/// responsible for backfilling newly added indexes or indexes on the wrong
+/// version. We separate the flushers so that building backfill segments (which
+/// can take a long time) does not block the in-memory index flushes. In-memory
+/// flushes need to happen quickly or else writes fail when the in-memory
+/// index gets too large.
+#[derive(Debug, Clone, Copy, Eq, PartialEq)]
+pub enum FlusherType {
+    LiveFlush,
+    Backfill,
+}
+impl From<BuildReason> for FlusherType {
+    fn from(reason: BuildReason) -> Self {
+        match reason {
+            BuildReason::Backfilling | BuildReason::VersionMismatch => FlusherType::Backfill,
+            BuildReason::TooOld | BuildReason::TooLarge => FlusherType::LiveFlush,
+        }
+    }
+}
+
 impl BuildReason {
     pub fn read_max_pages_per_second(&self) -> NonZeroU32 {
         match self {
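
To spell out the routing that the new From impl encodes, here is the same mapping as a standalone, compilable snippet with assertions (the enums are copied from the hunk above; the main function and its assertions are illustrative only):

// BuildReason -> FlusherType routing, copied standalone for illustration.
#[derive(Debug, Clone, Copy, Eq, PartialEq)]
enum BuildReason {
    Backfilling,
    VersionMismatch,
    TooOld,
    TooLarge,
}

#[derive(Debug, Clone, Copy, Eq, PartialEq)]
enum FlusherType {
    LiveFlush,
    Backfill,
}

impl From<BuildReason> for FlusherType {
    fn from(reason: BuildReason) -> Self {
        match reason {
            BuildReason::Backfilling | BuildReason::VersionMismatch => FlusherType::Backfill,
            BuildReason::TooOld | BuildReason::TooLarge => FlusherType::LiveFlush,
        }
    }
}

fn main() {
    // Size- and age-triggered builds stay with the live flusher...
    assert_eq!(FlusherType::from(BuildReason::TooLarge), FlusherType::LiveFlush);
    assert_eq!(FlusherType::from(BuildReason::TooOld), FlusherType::LiveFlush);
    // ...while new or version-mismatched indexes go to the backfill flusher.
    assert_eq!(FlusherType::from(BuildReason::Backfilling), FlusherType::Backfill);
    assert_eq!(FlusherType::from(BuildReason::VersionMismatch), FlusherType::Backfill);
}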

crates/database/src/index_workers/search_flusher.rs

Lines changed: 12 additions & 0 deletions
@@ -74,6 +74,7 @@ use crate::{
             SearchIndexWriteResult,
         },
         BuildReason,
+        FlusherType,
         MultiSegmentBackfillResult,
     },
     metrics::{
@@ -95,6 +96,7 @@ pub struct SearchFlusher<RT: Runtime, T: SearchIndex> {
     params: Params<RT, T>,
     writer: SearchIndexMetadataWriter<RT, T>,
     _config: PhantomData<T>,
+    flusher_type: FlusherType,
 }
 
 impl<RT: Runtime, T: SearchIndex> Deref for SearchFlusher<RT, T> {
@@ -144,6 +146,7 @@ impl<RT: Runtime, T: SearchIndex + 'static> SearchFlusher<RT, T> {
         limits: SearchIndexLimits,
         writer: SearchIndexMetadataWriter<RT, T>,
         build_args: T::BuildIndexArgs,
+        flusher_type: FlusherType,
     ) -> Self {
         Self {
             params: Params {
@@ -156,6 +159,7 @@ impl<RT: Runtime, T: SearchIndex + 'static> SearchFlusher<RT, T> {
             },
             writer,
             _config: PhantomData,
+            flusher_type,
         }
     }
 
@@ -306,6 +310,14 @@ impl<RT: Runtime, T: SearchIndex + 'static> SearchFlusher<RT, T> {
                 },
             };
             if let Some(build_reason) = needs_backfill {
+                if FlusherType::from(build_reason) != self.flusher_type {
+                    tracing::info!(
+                        "Skipping build for index {name} with id {index_id} and {build_reason:?} \
+                         because it is a {:?} flusher",
+                        self.flusher_type
+                    );
+                    continue;
+                }
                 tracing::info!(
                     "Queueing {} index for rebuild: {name:?} ({build_reason:?})",
                     self.index_type_name()
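
A simplified model of that skip check (the BuildReason lookup is collapsed into a precomputed FlusherType per index, and IndexToBuild is a made-up stand-in for the flusher's per-index state): both flushers scan the same set of indexes that need building, and each queues only the entries that match its own type.

// Sketch of the filtering the skip check produces: each flusher keeps only
// the work items whose required flusher type matches its own and `continue`s
// past the rest.
#[derive(Debug, Clone, Copy, Eq, PartialEq)]
enum FlusherType {
    LiveFlush,
    Backfill,
}

struct IndexToBuild {
    name: &'static str,
    // In the real flusher this comes from FlusherType::from(build_reason).
    needed_by: FlusherType,
}

fn queue_for(flusher_type: FlusherType, candidates: &[IndexToBuild]) -> Vec<&'static str> {
    let mut queued = Vec::new();
    for index in candidates {
        if index.needed_by != flusher_type {
            // Mirrors the `continue` in the diff: leave this index for the other flusher.
            continue;
        }
        queued.push(index.name);
    }
    queued
}

fn main() {
    let candidates = [
        IndexToBuild { name: "new_vector_index", needed_by: FlusherType::Backfill },
        IndexToBuild { name: "hot_text_index", needed_by: FlusherType::LiveFlush },
    ];
    assert_eq!(queue_for(FlusherType::LiveFlush, &candidates), vec!["hot_text_index"]);
    assert_eq!(queue_for(FlusherType::Backfill, &candidates), vec!["new_vector_index"]);
}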

crates/database/src/index_workers/search_worker.rs

Lines changed: 53 additions & 10 deletions
@@ -43,6 +43,7 @@ use crate::{
         search_compactor::CompactionConfig,
         timeout_with_jitter,
         writer::SearchIndexMetadataWriter,
+        FlusherType,
     },
     text_index_worker::{
         compactor::{
@@ -122,8 +123,8 @@ impl SearchIndexWorkers {
                 segment_term_metadata_fetcher: segment_term_metadata_fetcher.clone(),
             },
         );
-        let vector_flush = retry_loop_expect_occs_and_overloaded(
-            "VectorFlusher",
+        let vector_live_flush = retry_loop_expect_occs_and_overloaded(
+            "VectorLiveFlusher",
             runtime.clone(),
             database.clone(),
             // Wait a bit since vector needs time to bootstrap. Makes startup logs a bit cleaner.
@@ -136,6 +137,24 @@
                 reader.clone(),
                 search_storage.clone(),
                 vector_index_metadata_writer.clone(),
+                FlusherType::LiveFlush,
+            )),
+        );
+        let vector_backfill_flush = retry_loop_expect_occs_and_overloaded(
+            "VectorBackfillFlusher",
+            runtime.clone(),
+            database.clone(),
+            // Wait a bit since vector needs time to bootstrap. Makes startup logs a bit cleaner.
+            Duration::from_secs(5),
+            *INDEX_WORKERS_INITIAL_BACKOFF,
+            *SEARCH_INDEX_FLUSHER_MAX_BACKOFF,
+            SearchIndexWorker::VectorFlusher(new_vector_flusher(
+                runtime.clone(),
+                database.clone(),
+                reader.clone(),
+                search_storage.clone(),
+                vector_index_metadata_writer.clone(),
+                FlusherType::Backfill,
             )),
         );
         let vector_compact = retry_loop_expect_occs_and_overloaded(
@@ -153,22 +172,41 @@
                 vector_index_metadata_writer,
             )),
         );
-        let text_flusher = SearchIndexWorker::TextFlusher(new_text_flusher(
+        let text_live_flusher = SearchIndexWorker::TextFlusher(new_text_flusher(
+            runtime.clone(),
+            database.clone(),
+            reader.clone(),
+            search_storage.clone(),
+            segment_term_metadata_fetcher.clone(),
+            text_index_metadata_writer.clone(),
+            FlusherType::LiveFlush,
+        ));
+        let text_live_flush = retry_loop_expect_occs_and_overloaded(
+            "TextLiveFlusher",
+            runtime.clone(),
+            database.clone(),
+            Duration::ZERO,
+            *INDEX_WORKERS_INITIAL_BACKOFF,
+            *SEARCH_INDEX_FLUSHER_MAX_BACKOFF,
+            text_live_flusher,
+        );
+        let text_backfill_flusher = SearchIndexWorker::TextFlusher(new_text_flusher(
             runtime.clone(),
             database.clone(),
             reader,
             search_storage.clone(),
            segment_term_metadata_fetcher,
             text_index_metadata_writer.clone(),
+            FlusherType::Backfill,
         ));
-        let text_flush = retry_loop_expect_occs_and_overloaded(
-            "SearchFlusher",
+        let text_backfill_flush = retry_loop_expect_occs_and_overloaded(
+            "TextBackfillFlusher",
             runtime.clone(),
             database.clone(),
             Duration::ZERO,
             *INDEX_WORKERS_INITIAL_BACKOFF,
             *SEARCH_INDEX_FLUSHER_MAX_BACKOFF,
-            text_flusher,
+            text_backfill_flusher,
         );
 
         let text_compact = retry_loop_expect_occs_and_overloaded(
@@ -187,15 +225,20 @@
             )),
         );
 
-        let vector_flush_handle = runtime.spawn("vector_flush", vector_flush);
+        let vector_backfill_flush_handle =
+            runtime.spawn("vector_backfill_flush", vector_backfill_flush);
+        let vector_live_flush_handle = runtime.spawn("vector_live_flush", vector_live_flush);
         let vector_compact_handle = runtime.spawn("vector_compact", vector_compact);
-        let text_flush_handle = runtime.spawn("text_flush", text_flush);
+        let text_live_flush_handle = runtime.spawn("text_live_flush", text_live_flush);
+        let text_backfill_flush_handle = runtime.spawn("text_backfill_flush", text_backfill_flush);
         let text_compact_handle = runtime.spawn("text_compact", text_compact);
         Self {
             handles: vec![
-                vector_flush_handle,
+                vector_backfill_flush_handle,
+                vector_live_flush_handle,
                 vector_compact_handle,
-                text_flush_handle,
+                text_live_flush_handle,
+                text_backfill_flush_handle,
                 text_compact_handle,
             ],
         }

crates/database/src/tests/randomized_search_tests.rs

Lines changed: 2 additions & 0 deletions
@@ -98,6 +98,7 @@ use crate::{
     index_workers::{
         search_compactor::CompactionConfig,
         writer::SearchIndexMetadataWriter,
+        FlusherType,
     },
     query::{
         PaginationOptions,
@@ -261,6 +262,7 @@ impl Scenario {
             self.search_storage.clone(),
             self.build_index_args.segment_term_metadata_fetcher.clone(),
             writer,
+            FlusherType::Backfill,
         );
         flusher.step().await?;

crates/database/src/tests/text_test_utils.rs

Lines changed: 22 additions & 4 deletions
@@ -56,7 +56,10 @@ use value::{
 };
 
 use crate::{
-    index_workers::search_compactor::CompactionConfig,
+    index_workers::{
+        search_compactor::CompactionConfig,
+        FlusherType,
+    },
     test_helpers::{
         DbFixtures,
         DbFixturesArgs,
@@ -151,16 +154,31 @@ impl TextFixtures {
             self.storage.clone(),
             self.segment_term_metadata_fetcher.clone(),
             self.writer.clone(),
+            FlusherType::Backfill,
         )
     }
 
-    pub fn new_search_flusher(&self) -> TextIndexFlusher<TestRuntime> {
+    pub fn new_backfill_text_flusher(&self) -> TextIndexFlusher<TestRuntime> {
         self.new_search_flusher_builder().set_soft_limit(0).build()
     }
 
-    pub fn new_search_flusher_with_soft_limit(&self) -> TextIndexFlusher<TestRuntime> {
+    pub fn new_live_text_flusher(&self) -> TextIndexFlusher<TestRuntime> {
+        self.new_search_flusher_builder()
+            .set_soft_limit(0)
+            .set_live_flush()
+            .build()
+    }
+
+    pub fn new_backfill_flusher_with_soft_limit(&self) -> TextIndexFlusher<TestRuntime> {
+        self.new_search_flusher_builder()
+            .set_soft_limit(2048)
+            .build()
+    }
+
+    pub fn new_live_flusher_with_soft_limit(&self) -> TextIndexFlusher<TestRuntime> {
         self.new_search_flusher_builder()
             .set_soft_limit(2048)
+            .set_live_flush()
             .build()
     }
 
@@ -354,7 +372,7 @@
         pause: PauseController,
        label: &'static str,
     ) -> anyhow::Result<()> {
-        let flusher = self.new_search_flusher();
+        let flusher = self.new_live_text_flusher();
         let hold_guard = pause.hold(label);
         let flush = flusher.step();
         let compactor = self.new_compactor();

crates/database/src/tests/usage_tracking.rs

Lines changed: 2 additions & 2 deletions
@@ -205,7 +205,7 @@ async fn vectors_in_segment_count_as_usage(rt: TestRuntime) -> anyhow::Result<()
         )
         .await;
 
-    fixtures.new_index_flusher()?.step().await?;
+    fixtures.new_live_index_flusher()?.step().await?;
 
     let storage = fixtures
         .db
@@ -231,7 +231,7 @@ async fn vector_query_counts_bandwidth(rt: TestRuntime) -> anyhow::Result<()> {
         .await?;
     add_document_vec_array(&mut tx, index_name.table(), [3f64, 4f64]).await?;
     fixtures.db.commit(tx).await?;
-    fixtures.new_index_flusher()?.step().await?;
+    fixtures.new_backfill_index_flusher()?.step().await?;
 
     let (_, usage_stats) = fixtures
         .db

crates/database/src/tests/vector_test_utils.rs

Lines changed: 18 additions & 2 deletions
@@ -85,6 +85,7 @@ use crate::{
     index_workers::{
         search_compactor::CompactionConfig,
         search_flusher::FLUSH_RUNNING_LABEL,
+        FlusherType,
     },
     test_helpers::DbFixturesArgs,
     vector_index_worker::{
@@ -258,8 +259,18 @@ impl VectorFixtures {
         ))
     }
 
-    pub fn new_index_flusher(&self) -> anyhow::Result<VectorIndexFlusher<TestRuntime>> {
-        self.new_index_flusher_with_full_scan_threshold(*MULTI_SEGMENT_FULL_SCAN_THRESHOLD_KB)
+    pub fn new_backfill_index_flusher(&self) -> anyhow::Result<VectorIndexFlusher<TestRuntime>> {
+        self.new_index_flusher_with_full_scan_threshold(
+            *MULTI_SEGMENT_FULL_SCAN_THRESHOLD_KB,
+            FlusherType::Backfill,
+        )
+    }
+
+    pub fn new_live_index_flusher(&self) -> anyhow::Result<VectorIndexFlusher<TestRuntime>> {
+        self.new_index_flusher_with_full_scan_threshold(
+            *MULTI_SEGMENT_FULL_SCAN_THRESHOLD_KB,
+            FlusherType::LiveFlush,
+        )
     }
 
     pub async fn run_compaction_during_flush(&self, pause: PauseController) -> anyhow::Result<()> {
@@ -272,6 +283,7 @@
             0,
             *MULTI_SEGMENT_FULL_SCAN_THRESHOLD_KB,
             8,
+            FlusherType::LiveFlush,
         );
         let hold_guard = pause.hold(FLUSH_RUNNING_LABEL);
         let flush = flusher.step();
@@ -290,6 +302,7 @@
     pub fn new_index_flusher_with_full_scan_threshold(
         &self,
         full_scan_threshold_kb: usize,
+        flusher_type: FlusherType,
     ) -> anyhow::Result<VectorIndexFlusher<TestRuntime>> {
         Ok(new_vector_flusher_for_tests(
             self.rt.clone(),
@@ -300,12 +313,14 @@
             0,
             full_scan_threshold_kb,
             *VECTOR_INDEX_SIZE_SOFT_LIMIT,
+            flusher_type,
         ))
     }
 
     pub fn new_index_flusher_with_incremental_part_threshold(
         &self,
         incremental_part_threshold: usize,
+        // flusher_type: FlusherType,
     ) -> anyhow::Result<VectorIndexFlusher<TestRuntime>> {
         Ok(new_vector_flusher_for_tests(
             self.rt.clone(),
@@ -316,6 +331,7 @@
             0,
             *MULTI_SEGMENT_FULL_SCAN_THRESHOLD_KB,
             incremental_part_threshold,
+            FlusherType::Backfill,
         ))
     }
crates/database/src/tests/vector_tests.rs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,7 @@ use vector::{
7070
};
7171

7272
use crate::{
73+
index_workers::FlusherType,
7374
test_helpers::{
7475
vector_utils::{
7576
random_vector,
@@ -776,6 +777,7 @@ async fn test_index_backfill_is_incremental(rt: TestRuntime) -> anyhow::Result<(
776777
*VECTOR_INDEX_SIZE_SOFT_LIMIT,
777778
*MULTI_SEGMENT_FULL_SCAN_THRESHOLD_KB,
778779
incremental_index_size,
780+
FlusherType::Backfill,
779781
);
780782

781783
let mut backfill_ts = None;
@@ -860,6 +862,7 @@ async fn test_incremental_backfill_with_compaction(rt: TestRuntime) -> anyhow::R
860862
*VECTOR_INDEX_SIZE_SOFT_LIMIT,
861863
*MULTI_SEGMENT_FULL_SCAN_THRESHOLD_KB,
862864
incremental_index_size,
865+
FlusherType::Backfill,
863866
);
864867

865868
for _ in 0..num_parts {
@@ -1000,7 +1003,8 @@ async fn test_multi_segment_search_obeys_sorted_order(rt: TestRuntime) -> anyhow
10001003
.add_document_vec_array(index_name.table(), vector)
10011004
.await?;
10021005
ids.push(id);
1003-
let worker = fixtures.new_index_flusher_with_full_scan_threshold(0)?;
1006+
let worker =
1007+
fixtures.new_index_flusher_with_full_scan_threshold(0, FlusherType::LiveFlush)?;
10041008
let (metrics, _) = worker.step().await?;
10051009
assert_eq!(metrics, btreemap! {resolved_index_name.clone() => 1});
10061010
}

crates/database/src/text_index_worker/fast_forward.rs

Lines changed: 1 addition & 1 deletion
@@ -131,7 +131,7 @@
         } = fixtures
             .insert_backfilling_text_index_with_document()
             .await?;
-        let worker = fixtures.new_search_flusher();
+        let worker = fixtures.new_backfill_text_flusher();
 
         // Backfill the index
         let (metrics, _) = worker.step().await?;
