Skip to content

Commit 1605c9a

Browse files
committed
[ENH]: Refactor compactor
1 parent 0605458 commit 1605c9a

24 files changed

+3132
-3737
lines changed

Cargo.lock

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

rust/log/src/in_memory_log.rs

Lines changed: 25 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,24 @@
11
use std::collections::HashMap;
22
use std::fmt::Debug;
33

4-
use chroma_error::ChromaError;
4+
use chroma_error::{ChromaError, ErrorCodes};
55
use chroma_types::{CollectionUuid, LogRecord};
6+
use thiserror::Error;
67

78
use crate::types::CollectionInfo;
89

10+
#[derive(Error, Debug)]
11+
pub enum InMemoryLogError {
12+
#[error("Failed to update log offset (simulated failure)")]
13+
UpdateOffsetFailed,
14+
}
15+
16+
impl ChromaError for InMemoryLogError {
17+
fn code(&self) -> ErrorCodes {
18+
ErrorCodes::Internal
19+
}
20+
}
21+
922
// This is used for testing only, it represents a log record that is stored in memory
1023
// internal to a mock log implementation
1124
#[derive(Clone)]
@@ -32,16 +45,22 @@ impl Debug for InternalLogRecord {
3245
pub struct InMemoryLog {
3346
collection_to_log: HashMap<CollectionUuid, Vec<InternalLogRecord>>,
3447
offsets: HashMap<CollectionUuid, i64>,
48+
fail_update_offset: bool,
3549
}
3650

3751
impl InMemoryLog {
3852
pub fn new() -> InMemoryLog {
3953
InMemoryLog {
4054
collection_to_log: HashMap::new(),
4155
offsets: HashMap::new(),
56+
fail_update_offset: false,
4257
}
4358
}
4459

60+
pub fn set_fail_update_offset(&mut self, fail: bool) {
61+
self.fail_update_offset = fail;
62+
}
63+
4564
pub fn add_log(&mut self, collection_id: CollectionUuid, log: InternalLogRecord) {
4665
let logs = self.collection_to_log.entry(collection_id).or_default();
4766
// Ensure that the log offset is correct. Since we only use the InMemoryLog for testing,
@@ -123,8 +142,12 @@ impl InMemoryLog {
123142
&mut self,
124143
collection_id: CollectionUuid,
125144
new_offset: i64,
126-
) {
145+
) -> Result<(), Box<dyn ChromaError>> {
146+
if self.fail_update_offset {
147+
return Err(Box::new(InMemoryLogError::UpdateOffsetFailed));
148+
}
127149
self.offsets.insert(collection_id, new_offset);
150+
Ok(())
128151
}
129152

130153
pub(super) async fn scout_logs(

rust/log/src/log.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -166,8 +166,7 @@ impl Log {
166166
.map_err(|e| Box::new(e) as Box<dyn ChromaError>),
167167
Log::InMemory(log) => {
168168
log.update_collection_log_offset(collection_id, new_offset)
169-
.await;
170-
Ok(())
169+
.await
171170
}
172171
}
173172
}

rust/log/src/test.rs

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -115,6 +115,33 @@ pub fn add_delete_generator(offset: usize) -> OperationRecord {
115115
}
116116
}
117117

118+
/// Alternates operations: even offsets add record `offset / 2`, odd offsets delete that same record
119+
///
120+
/// # Illustration for head of log
121+
/// [Add 1], [Del 1], [Add 2], [Del 2], [Add 3], [Del 3], [Add 4] ...
122+
pub fn add_delete_net_zero_generator(offset: usize) -> OperationRecord {
123+
if offset % 2 == 1 {
124+
OperationRecord {
125+
id: int_as_id(offset / 2),
126+
embedding: None,
127+
encoding: None,
128+
metadata: None,
129+
document: None,
130+
operation: Operation::Delete,
131+
}
132+
} else {
133+
let int_id = offset / 2;
134+
OperationRecord {
135+
id: int_as_id(int_id),
136+
embedding: Some(random_embedding(TEST_EMBEDDING_DIMENSION)),
137+
encoding: None,
138+
metadata: Some(modulo_metadata(int_id)),
139+
document: Some(modulo_document(int_id)),
140+
operation: Operation::Add,
141+
}
142+
}
143+
}
144+
118145
#[async_trait]
119146
pub trait LoadFromGenerator<L: LogGenerator> {
120147
async fn populate_with_generator(&mut self, log_count: usize, generator: L);

rust/sysdb/src/test_sysdb.rs

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -475,6 +475,17 @@ impl TestSysDb {
475475
return Err(FlushCompactionError::CollectionNotFound);
476476
}
477477
let collection = collection.unwrap();
478+
479+
// Check for stale version (optimistic concurrency control)
480+
if collection.version > collection_version {
481+
return Err(FlushCompactionError::FailedToFlushCompaction(
482+
tonic::Status::failed_precondition(format!(
483+
"Collection version is stale: expected {}, but collection is at version {}",
484+
collection_version, collection.version
485+
)),
486+
));
487+
}
488+
478489
let mut collection = collection.clone();
479490
collection.log_position = log_position;
480491
new_collection_version = collection_version + 1;

0 commit comments

Comments
 (0)