Skip to content

Commit ff1afdf

Browse files
committed
Update provider
1 parent 87d251f commit ff1afdf

File tree

4 files changed

+98
-21
lines changed

4 files changed

+98
-21
lines changed

rust/segment/src/spann_provider.rs

Lines changed: 74 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -2,6 +2,8 @@ use async_trait::async_trait;
22
use chroma_blockstore::provider::BlockfileProvider;
33
use chroma_config::{registry::Registry, Configurable};
44
use chroma_error::ChromaError;
5+
#[cfg(feature = "usearch")]
6+
use chroma_index::usearch::USearchIndexProvider;
57
use chroma_index::{
68
config::SpannProviderConfig,
79
hnsw_provider::HnswIndexProvider,
@@ -10,17 +12,28 @@ use chroma_index::{
1012
use chroma_types::{Cmek, Collection, Segment};
1113

1214
use crate::distributed_spann::{SpannSegmentWriter, SpannSegmentWriterError};
15+
#[cfg(feature = "usearch")]
16+
use crate::quantized_spann::{QuantizedSpannSegmentError, QuantizedSpannSegmentWriter};
1317

14-
#[derive(Debug, Clone)]
18+
#[derive(Clone)]
1519
pub struct SpannProvider {
16-
pub hnsw_provider: HnswIndexProvider,
20+
pub adaptive_search_nprobe: bool,
1721
pub blockfile_provider: BlockfileProvider,
1822
pub garbage_collection_context: GarbageCollectionContext,
23+
pub hnsw_provider: HnswIndexProvider,
1924
pub metrics: SpannMetrics,
2025
pub pl_block_size: usize,
21-
pub adaptive_search_nprobe: bool,
26+
#[cfg(feature = "usearch")]
27+
pub usearch_provider: USearchIndexProvider,
2228
}
2329

30+
impl std::fmt::Debug for SpannProvider {
31+
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
32+
f.debug_struct("SpannProvider").finish()
33+
}
34+
}
35+
36+
#[cfg(not(feature = "usearch"))]
2437
#[async_trait]
2538
impl Configurable<(HnswIndexProvider, BlockfileProvider, SpannProviderConfig)> for SpannProvider {
2639
async fn try_from_config(
@@ -36,12 +49,51 @@ impl Configurable<(HnswIndexProvider, BlockfileProvider, SpannProviderConfig)> f
3649
)
3750
.await?;
3851
Ok(SpannProvider {
39-
hnsw_provider: config.0.clone(),
52+
adaptive_search_nprobe: config.2.adaptive_search_nprobe,
4053
blockfile_provider: config.1.clone(),
4154
garbage_collection_context,
55+
hnsw_provider: config.0.clone(),
4256
metrics: SpannMetrics::default(),
4357
pl_block_size: config.2.pl_block_size,
58+
})
59+
}
60+
}
61+
62+
#[cfg(feature = "usearch")]
63+
#[async_trait]
64+
impl
65+
Configurable<(
66+
HnswIndexProvider,
67+
BlockfileProvider,
68+
SpannProviderConfig,
69+
USearchIndexProvider,
70+
)> for SpannProvider
71+
{
72+
async fn try_from_config(
73+
config: &(
74+
HnswIndexProvider,
75+
BlockfileProvider,
76+
SpannProviderConfig,
77+
USearchIndexProvider,
78+
),
79+
registry: &Registry,
80+
) -> Result<Self, Box<dyn ChromaError>> {
81+
let garbage_collection_context = GarbageCollectionContext::try_from_config(
82+
&(
83+
config.2.pl_garbage_collection.clone(),
84+
config.2.hnsw_garbage_collection.clone(),
85+
),
86+
registry,
87+
)
88+
.await?;
89+
Ok(SpannProvider {
4490
adaptive_search_nprobe: config.2.adaptive_search_nprobe,
91+
blockfile_provider: config.1.clone(),
92+
garbage_collection_context,
93+
hnsw_provider: config.0.clone(),
94+
metrics: SpannMetrics::default(),
95+
pl_block_size: config.2.pl_block_size,
96+
usearch_provider: config.3.clone(),
4597
})
4698
}
4799
}
@@ -67,4 +119,22 @@ impl SpannProvider {
67119
)
68120
.await
69121
}
122+
123+
#[cfg(feature = "usearch")]
124+
pub async fn write_quantized_usearch(
125+
&self,
126+
collection: &Collection,
127+
vector_segment: &Segment,
128+
record_segment: &Segment,
129+
) -> Result<QuantizedSpannSegmentWriter, QuantizedSpannSegmentError> {
130+
QuantizedSpannSegmentWriter::from_segment(
131+
self.pl_block_size,
132+
collection,
133+
vector_segment,
134+
record_segment,
135+
&self.blockfile_provider,
136+
&self.usearch_provider,
137+
)
138+
.await
139+
}
70140
}

rust/segment/src/test.rs

Lines changed: 21 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -57,17 +57,35 @@ impl TestDistributedSegment {
5757
.await
5858
.expect("Expected to construct gc context for spann");
5959

60+
#[allow(unused_mut)]
61+
let mut temp_dirs = vec![blockfile_dir, hnsw_dir];
62+
63+
#[cfg(feature = "usearch")]
64+
let (usearch_dir, usearch_provider) = {
65+
let (dir, storage) = chroma_storage::test_storage();
66+
let provider = chroma_index::usearch::USearchIndexProvider::new(
67+
storage,
68+
chroma_cache::new_non_persistent_cache_for_test(),
69+
);
70+
(dir, provider)
71+
};
72+
73+
#[cfg(feature = "usearch")]
74+
temp_dirs.push(usearch_dir);
75+
6076
Self {
61-
temp_dirs: vec![blockfile_dir, hnsw_dir],
77+
temp_dirs,
6278
blockfile_provider: blockfile_provider.clone(),
6379
hnsw_provider: hnsw_provider.clone(),
6480
spann_provider: SpannProvider {
65-
hnsw_provider,
81+
adaptive_search_nprobe: true,
6682
blockfile_provider,
6783
garbage_collection_context,
84+
hnsw_provider,
6885
metrics: SpannMetrics::default(),
6986
pl_block_size: 5 * 1024 * 1024,
70-
adaptive_search_nprobe: true,
87+
#[cfg(feature = "usearch")]
88+
usearch_provider,
7189
},
7290
collection,
7391
metadata_segment: test_segment(collection_uuid, SegmentScope::METADATA),

rust/worker/src/execution/orchestration/attached_function_orchestrator.rs

Lines changed: 1 addition & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -728,10 +728,7 @@ impl Handler<TaskResult<CollectionAndSegments, GetCollectionAndSegmentsError>>
728728
)
729729
.await
730730
{
731-
Some(writer) => (
732-
Some(writer.index_uuid()),
733-
VectorSegmentWriter::Hnsw(writer),
734-
),
731+
Some(writer) => (Some(writer.index_uuid()), VectorSegmentWriter::Hnsw(writer)),
735732
None => return,
736733
},
737734
};

rust/worker/src/execution/orchestration/log_fetch_orchestrator.rs

Lines changed: 2 additions & 10 deletions
Original file line number | Diff line number | Diff line change
@@ -578,21 +578,13 @@ impl Handler<TaskResult<GetCollectionAndSegmentsOutput, GetCollectionAndSegments
578578
.ok_or_terminate(
579579
self.context
580580
.spann_provider
581-
.write_quantized_usearch(
582-
&collection,
583-
&vector_segment,
584-
&record_segment,
585-
)
581+
.write_quantized_usearch(&collection, &vector_segment, &record_segment)
586582
.await,
587583
ctx,
588584
)
589585
.await
590586
{
591-
Some(writer) => (
592-
None,
593-
VectorSegmentWriter::QuantizedSpann(writer),
594-
true,
595-
),
587+
Some(writer) => (None, VectorSegmentWriter::QuantizedSpann(writer), true),
596588
None => return,
597589
},
598590
SegmentType::Spann => match self

0 commit comments

Comments
 (0)