Skip to content

Commit 87d251f

Browse files
committed
[ENH] Wire up quantized writer in compaction
1 parent e3957b5 commit 87d251f

File tree

6 files changed

+119 -21 lines changed

6 files changed

+119 -21 lines changed

rust/worker/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -57,7 +57,7 @@ chroma-error = { workspace = true }
5757
chroma-index = { workspace = true }
5858
chroma-log = { workspace = true }
5959
chroma-memberlist = { workspace = true }
60-
chroma-segment = { workspace = true }
60+
chroma-segment = { workspace = true, features = ["usearch"] }
6161
chroma-storage = { workspace = true }
6262
chroma-system = { workspace = true }
6363
chroma-sysdb = { workspace = true }

rust/worker/src/compactor/compaction_manager.rs

Lines changed: 13 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -20,6 +20,7 @@ use chroma_config::registry::Registry;
2020
use chroma_config::Configurable;
2121
use chroma_error::{ChromaError, ErrorCodes};
2222
use chroma_index::hnsw_provider::HnswIndexProvider;
23+
use chroma_index::usearch::USearchIndexProvider;
2324
use chroma_log::Log;
2425
use chroma_memberlist::memberlist_provider::Memberlist;
2526
use chroma_segment::spann_provider::SpannProvider;
@@ -510,11 +511,16 @@ impl Configurable<(CompactionServiceConfig, System)> for CompactionManager {
510511
)
511512
.await?;
512513

514+
let usearch_cache =
515+
chroma_cache::from_config(&config.hnsw_provider.hnsw_cache_config).await?;
516+
let usearch_provider = USearchIndexProvider::new(storage.clone(), usearch_cache);
517+
513518
let spann_provider = SpannProvider::try_from_config(
514519
&(
515520
hnsw_index_provider.clone(),
516521
blockfile_provider.clone(),
517522
config.spann_provider.clone(),
523+
usearch_provider,
518524
),
519525
registry,
520526
)
@@ -1033,13 +1039,18 @@ mod tests {
10331039
16,
10341040
false,
10351041
);
1042+
let usearch_provider = chroma_index::usearch::USearchIndexProvider::new(
1043+
storage.clone(),
1044+
new_non_persistent_cache_for_test(),
1045+
);
10361046
let spann_provider = SpannProvider {
1037-
hnsw_provider: hnsw_provider.clone(),
1047+
adaptive_search_nprobe: true,
10381048
blockfile_provider: blockfile_provider.clone(),
10391049
garbage_collection_context: gc_context,
1050+
hnsw_provider: hnsw_provider.clone(),
10401051
metrics: SpannMetrics::default(),
10411052
pl_block_size: 5 * 1024 * 1024,
1042-
adaptive_search_nprobe: true,
1053+
usearch_provider,
10431054
};
10441055
let system = System::new();
10451056
let mut manager = CompactionManager::new(

rust/worker/src/execution/orchestration/attached_function_orchestrator.rs

Lines changed: 30 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -123,6 +123,8 @@ pub enum AttachedFunctionOrchestratorError {
123123
RecordSegmentReader(#[from] RecordSegmentReaderCreationError),
124124
#[error("Error creating hnsw writer: {0}")]
125125
HnswSegment(#[from] DistributedHNSWSegmentFromSegmentError),
126+
#[error("Error creating quantized spann writer: {0}")]
127+
QuantizedSpannSegment(#[from] chroma_segment::quantized_spann::QuantizedSpannSegmentError),
126128
#[error("Error creating spann writer: {0}")]
127129
SpannSegment(#[from] SpannSegmentWriterError),
128130
}
@@ -149,6 +151,7 @@ impl ChromaError for AttachedFunctionOrchestratorError {
149151
AttachedFunctionOrchestratorError::RecordSegmentWriter(e) => e.code(),
150152
AttachedFunctionOrchestratorError::RecordSegmentReader(e) => e.code(),
151153
AttachedFunctionOrchestratorError::HnswSegment(e) => e.code(),
154+
AttachedFunctionOrchestratorError::QuantizedSpannSegment(e) => e.code(),
152155
AttachedFunctionOrchestratorError::SpannSegment(e) => e.code(),
153156
}
154157
}
@@ -176,6 +179,7 @@ impl ChromaError for AttachedFunctionOrchestratorError {
176179
AttachedFunctionOrchestratorError::RecordSegmentWriter(e) => e.should_trace_error(),
177180
AttachedFunctionOrchestratorError::RecordSegmentReader(e) => e.should_trace_error(),
178181
AttachedFunctionOrchestratorError::HnswSegment(e) => e.should_trace_error(),
182+
AttachedFunctionOrchestratorError::QuantizedSpannSegment(e) => e.should_trace_error(),
179183
AttachedFunctionOrchestratorError::SpannSegment(e) => e.should_trace_error(),
180184
}
181185
}
@@ -676,6 +680,23 @@ impl Handler<TaskResult<CollectionAndSegments, GetCollectionAndSegmentsError>>
676680
};
677681

678682
let (hnsw_index_uuid, vector_writer) = match message.vector_segment.r#type {
683+
SegmentType::QuantizedSpann => match self
684+
.ok_or_terminate(
685+
self.output_context
686+
.spann_provider
687+
.write_quantized_usearch(
688+
collection,
689+
&message.vector_segment,
690+
&message.record_segment,
691+
)
692+
.await,
693+
ctx,
694+
)
695+
.await
696+
{
697+
Some(writer) => (None, VectorSegmentWriter::QuantizedSpann(writer)),
698+
None => return,
699+
},
679700
SegmentType::Spann => match self
680701
.ok_or_terminate(
681702
self.output_context
@@ -686,7 +707,10 @@ impl Handler<TaskResult<CollectionAndSegments, GetCollectionAndSegmentsError>>
686707
)
687708
.await
688709
{
689-
Some(writer) => (writer.hnsw_index_uuid(), VectorSegmentWriter::Spann(writer)),
710+
Some(writer) => (
711+
Some(writer.hnsw_index_uuid()),
712+
VectorSegmentWriter::Spann(writer),
713+
),
690714
None => return,
691715
},
692716
_ => match self
@@ -704,7 +728,10 @@ impl Handler<TaskResult<CollectionAndSegments, GetCollectionAndSegmentsError>>
704728
)
705729
.await
706730
{
707-
Some(writer) => (writer.index_uuid(), VectorSegmentWriter::Hnsw(writer)),
731+
Some(writer) => (
732+
Some(writer.index_uuid()),
733+
VectorSegmentWriter::Hnsw(writer),
734+
),
708735
None => return,
709736
},
710737
};
@@ -745,7 +772,7 @@ impl Handler<TaskResult<CollectionAndSegments, GetCollectionAndSegmentsError>>
745772
collection: message.collection.clone(),
746773
writers: Some(writers),
747774
pulled_log_offset: message.collection.log_position,
748-
hnsw_index_uuid: Some(hnsw_index_uuid),
775+
hnsw_index_uuid,
749776
schema: message.collection.schema.clone(),
750777
};
751778

rust/worker/src/execution/orchestration/compact.rs

Lines changed: 42 additions & 12 deletions
Original file line number | Diff line number | Diff line change
@@ -1419,13 +1419,18 @@ mod tests {
14191419
16,
14201420
false,
14211421
);
1422+
let usearch_provider = chroma_index::usearch::USearchIndexProvider::new(
1423+
storage.clone(),
1424+
new_non_persistent_cache_for_test(),
1425+
);
14221426
let spann_provider = SpannProvider {
1423-
hnsw_provider: hnsw_provider.clone(),
1427+
adaptive_search_nprobe: true,
14241428
blockfile_provider: blockfile_provider.clone(),
14251429
garbage_collection_context: gc_context,
1430+
hnsw_provider: hnsw_provider.clone(),
14261431
metrics: SpannMetrics::default(),
14271432
pl_block_size: 5 * 1024 * 1024,
1428-
adaptive_search_nprobe: true,
1433+
usearch_provider,
14291434
};
14301435

14311436
let config = RootConfig::default();
@@ -1612,13 +1617,18 @@ mod tests {
16121617
16,
16131618
false,
16141619
);
1620+
let usearch_provider = chroma_index::usearch::USearchIndexProvider::new(
1621+
storage.clone(),
1622+
new_non_persistent_cache_for_test(),
1623+
);
16151624
let spann_provider = SpannProvider {
1616-
hnsw_provider: hnsw_provider.clone(),
1625+
adaptive_search_nprobe: true,
16171626
blockfile_provider: blockfile_provider.clone(),
16181627
garbage_collection_context: gc_context,
1628+
hnsw_provider: hnsw_provider.clone(),
16191629
metrics: SpannMetrics::default(),
16201630
pl_block_size: 5 * 1024 * 1024,
1621-
adaptive_search_nprobe: true,
1631+
usearch_provider,
16221632
};
16231633

16241634
let config = RootConfig::default();
@@ -1804,13 +1814,18 @@ mod tests {
18041814
16,
18051815
false,
18061816
);
1817+
let usearch_provider = chroma_index::usearch::USearchIndexProvider::new(
1818+
storage.clone(),
1819+
new_non_persistent_cache_for_test(),
1820+
);
18071821
let spann_provider = SpannProvider {
1808-
hnsw_provider: hnsw_provider.clone(),
1822+
adaptive_search_nprobe: true,
18091823
blockfile_provider: blockfile_provider.clone(),
18101824
garbage_collection_context: gc_context,
1825+
hnsw_provider: hnsw_provider.clone(),
18111826
metrics: SpannMetrics::default(),
18121827
pl_block_size: 5 * 1024 * 1024,
1813-
adaptive_search_nprobe: true,
1828+
usearch_provider,
18141829
};
18151830

18161831
let config = RootConfig::default();
@@ -2065,13 +2080,18 @@ mod tests {
20652080
16,
20662081
false,
20672082
);
2083+
let usearch_provider = chroma_index::usearch::USearchIndexProvider::new(
2084+
storage.clone(),
2085+
new_non_persistent_cache_for_test(),
2086+
);
20682087
let spann_provider = SpannProvider {
2069-
hnsw_provider: hnsw_provider.clone(),
2088+
adaptive_search_nprobe: true,
20702089
blockfile_provider: blockfile_provider.clone(),
20712090
garbage_collection_context: gc_context,
2091+
hnsw_provider: hnsw_provider.clone(),
20722092
metrics: SpannMetrics::default(),
20732093
pl_block_size: 5 * 1024 * 1024,
2074-
adaptive_search_nprobe: true,
2094+
usearch_provider,
20752095
};
20762096
let system = System::new();
20772097

@@ -2290,13 +2310,18 @@ mod tests {
22902310
16,
22912311
false,
22922312
);
2313+
let usearch_provider = chroma_index::usearch::USearchIndexProvider::new(
2314+
storage.clone(),
2315+
new_non_persistent_cache_for_test(),
2316+
);
22932317
let spann_provider = SpannProvider {
2294-
hnsw_provider: hnsw_provider.clone(),
2318+
adaptive_search_nprobe: true,
22952319
blockfile_provider: blockfile_provider.clone(),
22962320
garbage_collection_context: gc_context,
2321+
hnsw_provider: hnsw_provider.clone(),
22972322
metrics: SpannMetrics::default(),
22982323
pl_block_size: 5 * 1024 * 1024,
2299-
adaptive_search_nprobe: true,
2324+
usearch_provider,
23002325
};
23012326
let system = System::new();
23022327

@@ -2609,13 +2634,18 @@ mod tests {
26092634
16,
26102635
false,
26112636
);
2637+
let usearch_provider = chroma_index::usearch::USearchIndexProvider::new(
2638+
storage.clone(),
2639+
new_non_persistent_cache_for_test(),
2640+
);
26122641
let spann_provider = SpannProvider {
2613-
hnsw_provider: hnsw_provider.clone(),
2642+
adaptive_search_nprobe: true,
26142643
blockfile_provider: blockfile_provider.clone(),
26152644
garbage_collection_context: gc_context,
2645+
hnsw_provider: hnsw_provider.clone(),
26162646
metrics: SpannMetrics::default(),
26172647
pl_block_size: 5 * 1024 * 1024,
2618-
adaptive_search_nprobe: true,
2648+
usearch_provider,
26192649
};
26202650
let system = System::new();
26212651

rust/worker/src/execution/orchestration/log_fetch_orchestrator.rs

Lines changed: 27 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -88,6 +88,8 @@ pub enum LogFetchOrchestratorError {
8888
RecordSegmentWriter(#[from] RecordSegmentWriterCreationError),
8989
#[error("Error receiving final result: {0}")]
9090
RecvError(#[from] RecvError),
91+
#[error("Error creating quantized spann writer: {0}")]
92+
QuantizedSpannSegment(#[from] chroma_segment::quantized_spann::QuantizedSpannSegmentError),
9193
#[error("Error creating spann writer: {0}")]
9294
SpannSegment(#[from] SpannSegmentWriterError),
9395
#[error("Error sourcing record segment: {0}")]
@@ -122,6 +124,7 @@ impl ChromaError for LogFetchOrchestratorError {
122124
Self::Panic(e) => e.should_trace_error(),
123125
Self::Partition(e) => e.should_trace_error(),
124126
Self::PrefetchSegment(e) => e.should_trace_error(),
127+
Self::QuantizedSpannSegment(e) => e.should_trace_error(),
125128
Self::RecordSegmentReader(e) => e.should_trace_error(),
126129
Self::RecordSegmentWriter(e) => e.should_trace_error(),
127130
Self::RecvError(_) => true,
@@ -571,6 +574,27 @@ impl Handler<TaskResult<GetCollectionAndSegmentsOutput, GetCollectionAndSegments
571574
};
572575
let (hnsw_index_uuid, vector_writer, is_vector_segment_spann) = match vector_segment.r#type
573576
{
577+
SegmentType::QuantizedSpann => match self
578+
.ok_or_terminate(
579+
self.context
580+
.spann_provider
581+
.write_quantized_usearch(
582+
&collection,
583+
&vector_segment,
584+
&record_segment,
585+
)
586+
.await,
587+
ctx,
588+
)
589+
.await
590+
{
591+
Some(writer) => (
592+
None,
593+
VectorSegmentWriter::QuantizedSpann(writer),
594+
true,
595+
),
596+
None => return,
597+
},
574598
SegmentType::Spann => match self
575599
.ok_or_terminate(
576600
self.context
@@ -582,7 +606,7 @@ impl Handler<TaskResult<GetCollectionAndSegmentsOutput, GetCollectionAndSegments
582606
.await
583607
{
584608
Some(writer) => (
585-
writer.hnsw_index_uuid(),
609+
Some(writer.hnsw_index_uuid()),
586610
VectorSegmentWriter::Spann(writer),
587611
true,
588612
),
@@ -604,7 +628,7 @@ impl Handler<TaskResult<GetCollectionAndSegmentsOutput, GetCollectionAndSegments
604628
.await
605629
{
606630
Some(writer) => (
607-
writer.index_uuid(),
631+
Some(writer.index_uuid()),
608632
VectorSegmentWriter::Hnsw(writer),
609633
false,
610634
),
@@ -628,7 +652,7 @@ impl Handler<TaskResult<GetCollectionAndSegmentsOutput, GetCollectionAndSegments
628652
};
629653

630654
collection_info.writers = Some(writers.clone());
631-
collection_info.hnsw_index_uuid = Some(hnsw_index_uuid);
655+
collection_info.hnsw_index_uuid = hnsw_index_uuid;
632656

633657
// Prefetch segments
634658
let prefetch_segments = match self.context.is_rebuild {

rust/worker/src/server.rs

Lines changed: 6 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -5,6 +5,7 @@ use chroma_blockstore::provider::BlockfileProvider;
55
use chroma_config::{registry::Registry, Configurable};
66
use chroma_error::ChromaError;
77
use chroma_index::hnsw_provider::HnswIndexProvider;
8+
use chroma_index::usearch::USearchIndexProvider;
89
use chroma_jemalloc_pprof_server::spawn_pprof_server;
910
use chroma_log::Log;
1011
use chroma_segment::spann_provider::SpannProvider;
@@ -82,11 +83,16 @@ impl Configurable<(QueryServiceConfig, System)> for WorkerServer {
8283
registry,
8384
)
8485
.await?;
86+
let usearch_cache =
87+
chroma_cache::from_config(&config.hnsw_provider.hnsw_cache_config).await?;
88+
let usearch_provider = USearchIndexProvider::new(storage.clone(), usearch_cache);
89+
8590
let spann_provider = SpannProvider::try_from_config(
8691
&(
8792
hnsw_index_provider.clone(),
8893
blockfile_provider.clone(),
8994
config.spann_provider.clone(),
95+
usearch_provider,
9096
),
9197
registry,
9298
)

0 commit comments

Comments (0)