Skip to content

Commit 8678c49

Browse files
sjuddConvex, Inc.
authored andcommitted
Generalize VectorIndexCompactor (#26367)
We'll use this same basic pattern for text search to start with. Still more to extract which will be done in follow-up PRs GitOrigin-RevId: 030f97251a6e064f51f4d8516994f9fca90fc84a
1 parent 575b6cb commit 8678c49

File tree

10 files changed

+230
-134
lines changed

10 files changed

+230
-134
lines changed

crates/database/src/index_workers/index_meta.rs

Lines changed: 17 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,10 @@ use common::{
2626
ObjectKey,
2727
},
2828
};
29-
use search::metrics::SearchType;
29+
use search::{
30+
metrics::SearchType,
31+
Searcher,
32+
};
3033
use storage::Storage;
3134
use sync_types::Timestamp;
3235
use value::{
@@ -50,9 +53,9 @@ pub trait PreviousSegmentsType: Send {
5053
pub trait SegmentType<T: SearchIndex> {
5154
fn id(&self) -> &str;
5255

53-
fn num_deleted(&self) -> u64;
54-
5556
fn statistics(&self) -> anyhow::Result<T::Statistics>;
57+
58+
fn total_size_bytes(&self, config: &T::DeveloperConfig) -> anyhow::Result<u64>;
5659
}
5760

5861
#[async_trait]
@@ -128,6 +131,13 @@ pub trait SearchIndex: Clone + Debug {
128131
storage: Arc<dyn Storage>,
129132
segments: Self::PreviousSegments,
130133
) -> anyhow::Result<Vec<Self::Segment>>;
134+
135+
async fn execute_compaction(
136+
searcher: Arc<dyn Searcher>,
137+
search_storage: Arc<dyn Storage>,
138+
config: &Self::DeveloperConfig,
139+
segments: &Vec<&Self::Segment>,
140+
) -> anyhow::Result<Self::Segment>;
131141
}
132142

133143
pub trait SegmentStatistics: Default + Debug {
@@ -136,6 +146,10 @@ pub trait SegmentStatistics: Default + Debug {
136146
fn num_documents(&self) -> u64;
137147

138148
fn num_non_deleted_documents(&self) -> u64;
149+
150+
fn num_deleted_documents(&self) -> u64 {
151+
self.num_documents() - self.num_non_deleted_documents()
152+
}
139153
}
140154
pub struct SearchIndexConfig<T: SearchIndex> {
141155
pub developer_config: T::DeveloperConfig,

crates/database/src/index_workers/search_worker.rs

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -40,19 +40,21 @@ use crate::{
4040
TextIndexFlusher2,
4141
},
4242
vector_index_worker::{
43-
compactor::CompactionConfig,
43+
compactor::{
44+
CompactionConfig,
45+
VectorIndexCompactor2,
46+
},
4447
flusher::new_vector_flusher,
4548
},
4649
Database,
4750
TextIndexFlusher,
48-
VectorIndexCompactor,
4951
VectorIndexFlusher,
5052
};
5153

5254
/// Builds and compacts text/vector search indexes.
5355
pub enum SearchIndexWorker<RT: Runtime> {
5456
VectorFlusher(VectorIndexFlusher<RT>),
55-
VectorCompactor(VectorIndexCompactor<RT>),
57+
VectorCompactor(VectorIndexCompactor2<RT>),
5658
TextFlusher(TextIndexFlusher<RT>),
5759
TextFlusher2(TextIndexFlusher2<RT>),
5860
}
@@ -103,7 +105,7 @@ impl<RT: Runtime> SearchIndexWorker<RT> {
103105
runtime.clone(),
104106
database.clone(),
105107
Duration::ZERO,
106-
SearchIndexWorker::VectorCompactor(VectorIndexCompactor::new(
108+
SearchIndexWorker::VectorCompactor(VectorIndexCompactor2::new(
107109
database.clone(),
108110
searcher,
109111
search_storage.clone(),

crates/database/src/index_workers/writer.rs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@ use crate::{
4444
SearchIndex,
4545
SearchOnDiskState,
4646
SearchSnapshot,
47+
SegmentStatistics,
4748
SegmentType,
4849
SnapshotData,
4950
},
@@ -244,7 +245,9 @@ impl<RT: Runtime, T: SearchIndex> Inner<RT, T> {
244245
// which creates a new segment with a new id. So if the number of deletes has
245246
// changed, it's due to an increase from a conflicting write by the
246247
// flusher.
247-
if current_version.num_deleted() != original_segment.num_deleted() {
248+
if current_version.statistics()?.num_deleted_documents()
249+
!= original_segment.statistics()?.num_deleted_documents()
250+
{
248251
return Ok(true);
249252
}
250253
}

crates/database/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
#![feature(trait_upcasting)]
1717
#![feature(impl_trait_in_assoc_type)]
1818
#![feature(cow_is_borrowed)]
19+
#![feature(try_find)]
1920

2021
mod bootstrap_model;
2122
mod committer;

crates/database/src/metrics.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1048,7 +1048,7 @@ pub mod vector {
10481048
VECTOR_COMPACTION_COMPACTED_SEGMENT_NUM_VECTORS_TOTAL,
10491049
"Size of the newly generated compacted segment",
10501050
);
1051-
pub fn log_vector_compaction_compacted_segment_num_vectors_total(total_vectors: u32) {
1051+
pub fn log_vector_compaction_compacted_segment_num_vectors_total(total_vectors: u64) {
10521052
log_distribution(
10531053
&VECTOR_COMPACTION_COMPACTED_SEGMENT_NUM_VECTORS_TOTAL,
10541054
total_vectors as f64,

crates/database/src/tests/vector_test_utils.rs

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -90,7 +90,10 @@ use crate::{
9090
index_workers::search_flusher::FLUSH_RUNNING_LABEL,
9191
test_helpers::DbFixturesArgs,
9292
vector_index_worker::{
93-
compactor::CompactionConfig,
93+
compactor::{
94+
CompactionConfig,
95+
VectorIndexCompactor2,
96+
},
9497
flusher::{
9598
backfill_vector_indexes,
9699
new_vector_flusher_for_tests,
@@ -102,7 +105,6 @@ use crate::{
102105
TestFacingModel,
103106
Transaction,
104107
UserFacingModel,
105-
VectorIndexCompactor,
106108
};
107109

108110
pub struct VectorFixtures {
@@ -219,15 +221,15 @@ impl VectorFixtures {
219221
Ok(result)
220222
}
221223

222-
pub async fn new_compactor(&self) -> anyhow::Result<VectorIndexCompactor<TestRuntime>> {
224+
pub async fn new_compactor(&self) -> anyhow::Result<VectorIndexCompactor2<TestRuntime>> {
223225
self.new_compactor_with_searchlight(self.searcher.clone())
224226
.await
225227
}
226228

227229
pub async fn new_compactor_delete_on_compact(
228230
&self,
229231
id_to_delete: ResolvedDocumentId,
230-
) -> anyhow::Result<VectorIndexCompactor<TestRuntime>> {
232+
) -> anyhow::Result<VectorIndexCompactor2<TestRuntime>> {
231233
let searcher = DeleteOnCompactSearchlight {
232234
rt: self.rt.clone(),
233235
db: self.db.clone(),
@@ -244,8 +246,8 @@ impl VectorFixtures {
244246
async fn new_compactor_with_searchlight(
245247
&self,
246248
searcher: Arc<dyn Searcher>,
247-
) -> anyhow::Result<VectorIndexCompactor<TestRuntime>> {
248-
Ok(VectorIndexCompactor::new_for_tests(
249+
) -> anyhow::Result<VectorIndexCompactor2<TestRuntime>> {
250+
Ok(VectorIndexCompactor2::new_for_tests(
249251
self.rt.clone(),
250252
self.db.clone(),
251253
self.storage.clone(),

crates/database/src/tests/vector_tests.rs

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -86,15 +86,17 @@ use crate::{
8686
IndexData,
8787
VectorFixtures,
8888
},
89-
vector_index_worker::flusher::{
90-
backfill_vector_indexes,
91-
new_vector_flusher_for_tests,
89+
vector_index_worker::{
90+
compactor::VectorIndexCompactor2,
91+
flusher::{
92+
backfill_vector_indexes,
93+
new_vector_flusher_for_tests,
94+
},
9295
},
9396
Database,
9497
IndexModel,
9598
TableModel,
9699
UserFacingModel,
97-
VectorIndexCompactor,
98100
};
99101

100102
const TABLE_NAME: &str = "test";
@@ -206,7 +208,7 @@ impl<RT: Runtime> Scenario<RT> {
206208
}
207209

208210
async fn compact(&mut self) -> anyhow::Result<()> {
209-
VectorIndexCompactor::process_all_in_test(
211+
VectorIndexCompactor2::process_all_in_test(
210212
self.rt.clone(),
211213
self.database.clone(),
212214
self.search_storage.clone(),

crates/database/src/text_index_worker/text_meta.rs

Lines changed: 17 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ use search::{
4343
searcher::SegmentTermMetadataFetcher,
4444
NewTextSegment,
4545
PreviousTextSegments,
46+
Searcher,
4647
TantivySearchIndexSchema,
4748
UpdatableTextSegment,
4849
};
@@ -110,16 +111,19 @@ impl SegmentType<TextSearchIndex> for FragmentedTextSegment {
110111
&self.id
111112
}
112113

113-
fn num_deleted(&self) -> u64 {
114-
self.num_deleted_documents
115-
}
116-
117114
fn statistics(&self) -> anyhow::Result<TextStatistics> {
118115
Ok(TextStatistics {
119116
num_indexed_documents: self.num_indexed_documents,
120117
num_deleted_documents: self.num_deleted_documents,
121118
})
122119
}
120+
121+
fn total_size_bytes(
122+
&self,
123+
_config: &<TextSearchIndex as SearchIndex>::DeveloperConfig,
124+
) -> anyhow::Result<u64> {
125+
Ok(self.size_bytes_total)
126+
}
123127
}
124128
#[derive(Clone)]
125129
pub struct BuildTextIndexArgs {
@@ -262,6 +266,15 @@ impl SearchIndex for TextSearchIndex {
262266
fn search_type() -> SearchType {
263267
SearchType::Text
264268
}
269+
270+
async fn execute_compaction(
271+
_searcher: Arc<dyn Searcher>,
272+
_search_storage: Arc<dyn Storage>,
273+
_config: &Self::DeveloperConfig,
274+
_segments: &Vec<&Self::Segment>,
275+
) -> anyhow::Result<Self::Segment> {
276+
unimplemented!()
277+
}
265278
}
266279

267280
#[derive(Debug, Default)]

0 commit comments

Comments
 (0)