Commit 1ff1ea5

emmaling27 (Convex, Inc.) authored and committed
Search race condition tests: flusher beats compactor (#41101)
Adds race condition test coverage for when the search index flusher beats the compactor. Previously we only had coverage for the compactor beating the flusher (unless I somehow missed these tests elsewhere). I made a new trait to make it easier to write tests that cover both orderings, though there may be a more elegant way to do this with macros.

GitOrigin-RevId: 30e48ef87226b11c5c719ff35778d50b0d1483d7
1 parent 17e13a5 commit 1ff1ea5
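
The trait mentioned in the commit message isn't part of the two diffs shown below; it presumably lives in the third changed file. As a hedged sketch only, one way such a trait could parameterize a shared test body over the two orderings is to choose which worker gets held at its pause label. The names RaceOrdering, FlusherBeatsCompactor, and CompactorBeatsFlusher are illustrative, not identifiers from this commit.

use crate::search_index_workers::{
    search_compactor::COMPACTION_RUNNING_LABEL,
    search_flusher::FLUSH_RUNNING_LABEL,
};

// Hypothetical sketch: each ordering names the pause label to hold so that the
// other worker finishes first. These types are illustrative, not the commit's.
pub trait RaceOrdering {
    /// Pause label held for the worker that should lose the race.
    const HOLD_LABEL: &'static str;
}

/// Flusher wins: park the compactor at the pause point added in this commit.
pub struct FlusherBeatsCompactor;
impl RaceOrdering for FlusherBeatsCompactor {
    const HOLD_LABEL: &'static str = COMPACTION_RUNNING_LABEL;
}

/// Compactor wins: park the flusher, as the pre-existing tests already did.
pub struct CompactorBeatsFlusher;
impl RaceOrdering for CompactorBeatsFlusher {
    const HOLD_LABEL: &'static str = FLUSH_RUNNING_LABEL;
}

A generic test body written once over T: RaceOrdering could then be instantiated for both structs, which is the duplication the commit message is trying to avoid without reaching for macros.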

File tree: 3 files changed, +440 / -233 lines

crates/database/src/search_index_workers/search_compactor.rs

Lines changed: 5 additions & 0 deletions
@@ -51,6 +51,8 @@ use crate::{
     Token,
 };
 
+pub(crate) const COMPACTION_RUNNING_LABEL: &str = "compaction_running";
+
 pub struct SearchIndexCompactor<RT: Runtime, T: SearchIndex> {
     database: Database<RT>,
     searcher: Arc<dyn Searcher>,
@@ -92,6 +94,9 @@ impl<RT: Runtime, T: SearchIndex> SearchIndexCompactor<RT, T> {
             );
         }
 
+        let pause_client = self.database.runtime().pause_client();
+        pause_client.wait(COMPACTION_RUNNING_LABEL).await;
+
         for job in to_build {
             task::consume_budget().await;
 
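The pause point added above lets a test park the compactor just before it processes its compaction jobs, so a flush can complete first. As a rough sketch under that assumption (this is not the commit's code), a VectorFixtures helper for the flusher-beats-compactor ordering could mirror the run_compaction_during_flush helper removed from vector_test_utils.rs below, with the roles swapped; the name run_flush_during_compaction and its exact shape are guesses.

    // Hypothetical VectorFixtures helper (name is illustrative): hold the
    // compactor at COMPACTION_RUNNING_LABEL, run a full flush while it is
    // parked, then let compaction resume.
    pub async fn run_flush_during_compaction(
        &self,
        pause: PauseController,
        flusher_type: FlusherType,
    ) -> anyhow::Result<()> {
        let compactor = self.new_compactor().await?;
        let hold_guard = pause.hold(COMPACTION_RUNNING_LABEL);
        let compaction = compactor.step();
        let flush_during_compaction = async move {
            if let Some(pause_guard) = hold_guard.wait_for_blocked().await {
                // The compactor is blocked at its label; let the flusher win.
                let flusher = new_vector_flusher_for_tests(
                    self.rt.clone(),
                    self.db.clone(),
                    self.reader.clone(),
                    self.storage.clone(),
                    // Force indexes to always be built.
                    0,
                    *MULTI_SEGMENT_FULL_SCAN_THRESHOLD_KB,
                    8,
                    flusher_type,
                );
                flusher.step().await?;
                pause_guard.unpause();
            };
            Ok::<(), anyhow::Error>(())
        };
        try_join!(compaction, flush_during_compaction)?;
        Ok(())
    }

As in the removed helper, the if let Some(...) guard only runs the other worker once the paused one is actually blocked at its label.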

crates/database/src/tests/vector_test_utils.rs

Lines changed: 13 additions & 37 deletions
@@ -1,4 +1,7 @@
-use std::sync::Arc;
+use std::sync::{
+    Arc,
+    LazyLock,
+};
 
 use anyhow::Context;
 use async_trait::async_trait;
@@ -21,7 +24,6 @@ use common::{
         MULTI_SEGMENT_FULL_SCAN_THRESHOLD_KB,
         VECTOR_INDEX_SIZE_SOFT_LIMIT,
     },
-    pause::PauseController,
     persistence::PersistenceReader,
     runtime::Runtime,
     types::{
@@ -32,7 +34,6 @@ use common::{
     },
 };
 use events::testing::TestUsageEventLogger;
-use futures::try_join;
 use maplit::btreeset;
 use must_let::must_let;
 use pb::searchlight::FragmentedVectorSegmentPaths;
@@ -86,7 +87,6 @@ use crate::{
     bootstrap_model::index_backfills::IndexBackfillModel,
     search_index_workers::{
         search_compactor::CompactionConfig,
-        search_flusher::FLUSH_RUNNING_LABEL,
         FlusherType,
     },
     test_helpers::DbFixturesArgs,
@@ -287,36 +287,6 @@ impl VectorFixtures {
         )
     }
 
-    pub async fn run_compaction_during_flush(
-        &self,
-        pause: PauseController,
-        flusher_type: FlusherType,
-    ) -> anyhow::Result<()> {
-        let flusher = new_vector_flusher_for_tests(
-            self.rt.clone(),
-            self.db.clone(),
-            self.reader.clone(),
-            self.storage.clone(),
-            // Force indexes to always be built.
-            0,
-            *MULTI_SEGMENT_FULL_SCAN_THRESHOLD_KB,
-            8,
-            flusher_type,
-        );
-        let hold_guard = pause.hold(FLUSH_RUNNING_LABEL);
-        let flush = flusher.step();
-        let compactor = self.new_compactor().await?;
-        let compact_during_flush = async move {
-            if let Some(pause_guard) = hold_guard.wait_for_blocked().await {
-                compactor.step().await?;
-                pause_guard.unpause();
-            };
-            Ok::<(), anyhow::Error>(())
-        };
-        try_join!(flush, compact_during_flush)?;
-        Ok(())
-    }
-
     pub fn new_index_flusher_with_full_scan_threshold(
         &self,
         full_scan_threshold_kb: usize,
@@ -484,13 +454,19 @@ pub struct IndexData {
     pub metadata: IndexMetadata<TableName>,
 }
 
+pub(crate) static VECTOR_INDEX_NAME: LazyLock<IndexName> = LazyLock::new(|| {
+    IndexName::new(
+        "table".parse().unwrap(),
+        IndexDescriptor::new("vector_index").unwrap(),
+    )
+    .unwrap()
+});
+
 fn new_backfilling_vector_index() -> anyhow::Result<IndexMetadata<TableName>> {
-    let table_name: TableName = "table".parse()?;
-    let index_name = IndexName::new(table_name, IndexDescriptor::new("vector_index")?)?;
     let vector_field: FieldPath = "vector".parse()?;
     let filter_field: FieldPath = "channel".parse()?;
     let metadata = IndexMetadata::new_backfilling_vector_index(
-        index_name,
+        VECTOR_INDEX_NAME.clone(),
         vector_field,
         (2u32).try_into()?,
         btreeset![filter_field],
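
Hoisting the parsed index name into a LazyLock static lets the fixtures and the race tests refer to the same index without re-parsing the table and descriptor strings. A small hypothetical usage sketch (not from this commit), assuming IndexMetadata exposes a public name field and that this lives in a test module next to the fixtures:

#[cfg(test)]
mod vector_index_name_sketch {
    use super::*;

    // Hypothetical test: the shared static and the backfilling metadata should
    // name the same index. Assumes `IndexMetadata::name` is a public field.
    #[test]
    fn backfilling_metadata_uses_shared_index_name() -> anyhow::Result<()> {
        let metadata = new_backfilling_vector_index()?;
        assert_eq!(metadata.name, *VECTOR_INDEX_NAME);
        Ok(())
    }
}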
