-
Notifications
You must be signed in to change notification settings - Fork 1.9k
[ENH] Transform within compactions. #5507
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,60 @@ | ||
use async_trait::async_trait; | ||
use chroma_error::{ChromaError, ErrorCodes}; | ||
use chroma_system::Operator; | ||
use chroma_types::{Chunk, LogRecord}; | ||
use thiserror::Error; | ||
|
||
/// Operator that produces a transformed copy of a chunk of log records. | ||
/// Currently a placeholder: `transform` returns a clone of its input unchanged. | ||
#[derive(Debug)] | ||
pub struct TransformOperator {} | ||
|
||
/// Input to `TransformOperator`: the chunk of log records to transform. | ||
#[derive(Debug)] | ||
pub struct TransformInput { | ||
// Records handed to the operator; `run` reads them as `input.records`. | ||
pub(crate) records: Chunk<LogRecord>, | ||
} | ||
|
||
impl TransformInput { | ||
/// Wraps `records` in a `TransformInput`, taking ownership of the chunk. | ||
pub fn new(records: Chunk<LogRecord>) -> Self { | ||
TransformInput { records } | ||
} | ||
} | ||
|
||
/// Output of `TransformOperator`: the records returned by `transform`. | ||
#[derive(Debug)] | ||
pub struct TransformOutput { | ||
// Transformed records; crate-visible so the caller can take them out of the output. | ||
pub(crate) records: Chunk<LogRecord>, | ||
} | ||
|
||
/// Error type of `TransformOperator`. | ||
/// A unit struct with a fixed display message; it carries no further detail. | ||
#[derive(Debug, Error)] | ||
#[error("Failed to transform records.")] | ||
pub struct TransformError; | ||
|
||
/// Reports every `TransformError` as an internal error. | ||
impl ChromaError for TransformError { | ||
// Always `ErrorCodes::Internal`; the error has no variants to distinguish. | ||
fn code(&self) -> ErrorCodes { | ||
ErrorCodes::Internal | ||
} | ||
} | ||
|
||
impl TransformOperator { | ||
/// Creates a new operator. Note that it is returned already boxed (`Box<Self>`). | ||
pub fn new() -> Box<Self> { | ||
Box::new(TransformOperator {}) | ||
} | ||
|
||
/// Produces the transformed records for `records`. | ||
/// Currently an identity transform: it returns a clone of the input chunk. | ||
pub fn transform(&self, records: &Chunk<LogRecord>) -> Chunk<LogRecord> { | ||
records.clone() | ||
Comment on lines
+41
to
+42
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. [BestPractice] The Context for Agents
|
||
} | ||
} | ||
|
||
/// `Operator` implementation mapping a `TransformInput` to a `TransformOutput`. | ||
#[async_trait] | ||
impl Operator<TransformInput, TransformOutput> for TransformOperator { | ||
type Error = TransformError; | ||
|
||
// Static name reported for this operator. | ||
fn get_name(&self) -> &'static str { | ||
"TransformOperator" | ||
} | ||
|
||
// Runs `transform` on `input.records` and wraps the result in `TransformOutput`. | ||
// As written this always returns `Ok`; no path here constructs `TransformError`. | ||
async fn run(&self, input: &TransformInput) -> Result<TransformOutput, TransformError> { | ||
let transformed_records = self.transform(&input.records); | ||
Ok(TransformOutput { | ||
records: transformed_records, | ||
}) | ||
} | ||
} |
Original file line number | Diff line number | Diff line change | ||||||||
---|---|---|---|---|---|---|---|---|---|---|
|
@@ -68,6 +68,7 @@ use crate::execution::operators::{ | |||||||||
SourceRecordSegmentError, SourceRecordSegmentInput, SourceRecordSegmentOperator, | ||||||||||
SourceRecordSegmentOutput, | ||||||||||
}, | ||||||||||
transform_log::{TransformError, TransformInput, TransformOperator, TransformOutput}, | ||||||||||
}; | ||||||||||
|
||||||||||
/** The state of the orchestrator. | ||||||||||
|
@@ -111,6 +112,7 @@ impl Default for CompactOrchestratorMetrics { | |||||||||
#[derive(Debug)] | ||||||||||
enum ExecutionState { | ||||||||||
Pending, | ||||||||||
Transform, | ||||||||||
Partition, | ||||||||||
MaterializeApplyCommitFlush, | ||||||||||
Register, | ||||||||||
|
@@ -141,6 +143,7 @@ pub struct CompactOrchestrator { | |||||||||
hnsw_provider: HnswIndexProvider, | ||||||||||
spann_provider: SpannProvider, | ||||||||||
|
||||||||||
// TODO(jobs): Split this into source and dest collections. | ||||||||||
collection: OnceCell<Collection>, | ||||||||||
writers: OnceCell<CompactWriters>, | ||||||||||
flush_results: Vec<SegmentFlushInfo>, | ||||||||||
|
@@ -162,6 +165,9 @@ pub struct CompactOrchestrator { | |||||||||
segment_spans: HashMap<SegmentUuid, Span>, | ||||||||||
|
||||||||||
metrics: CompactOrchestratorMetrics, | ||||||||||
|
||||||||||
// Functions | ||||||||||
function: Option<()>, | ||||||||||
Comment on lines
+169
to
+170
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. [BestPractice] The
Suggested change
⚡ Committable suggestion Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Context for Agents
|
||||||||||
} | ||||||||||
|
||||||||||
#[derive(Error, Debug)] | ||||||||||
|
@@ -192,6 +198,8 @@ pub enum CompactionError { | |||||||||
Panic(#[from] PanicError), | ||||||||||
#[error("Error partitioning logs: {0}")] | ||||||||||
Partition(#[from] PartitionError), | ||||||||||
#[error("Error transforming logs: {0}")] | ||||||||||
Transform(#[from] TransformError), | ||||||||||
#[error("Error prefetching segment: {0}")] | ||||||||||
PrefetchSegment(#[from] PrefetchSegmentError), | ||||||||||
#[error("Error creating record segment reader: {0}")] | ||||||||||
|
@@ -249,6 +257,7 @@ impl ChromaError for CompactionError { | |||||||||
Self::MetadataSegment(e) => e.should_trace_error(), | ||||||||||
Self::Panic(e) => e.should_trace_error(), | ||||||||||
Self::Partition(e) => e.should_trace_error(), | ||||||||||
Self::Transform(e) => e.should_trace_error(), | ||||||||||
Self::PrefetchSegment(e) => e.should_trace_error(), | ||||||||||
Self::RecordSegmentReader(e) => e.should_trace_error(), | ||||||||||
Self::RecordSegmentWriter(e) => e.should_trace_error(), | ||||||||||
|
@@ -276,6 +285,7 @@ pub enum CompactionResponse { | |||||||||
impl CompactOrchestrator { | ||||||||||
#[allow(clippy::too_many_arguments)] | ||||||||||
pub fn new( | ||||||||||
// TODO(jobs): Split this into source and dest collection IDs. | ||||||||||
collection_id: CollectionUuid, | ||||||||||
rebuild: bool, | ||||||||||
fetch_log_batch_size: u32, | ||||||||||
|
@@ -316,6 +326,7 @@ impl CompactOrchestrator { | |||||||||
num_materialized_logs: 0, | ||||||||||
segment_spans: HashMap::new(), | ||||||||||
metrics: CompactOrchestratorMetrics::default(), | ||||||||||
function: Some(()), | ||||||||||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. [BestPractice] Hardcoded
Suggested change
⚡ Committable suggestion Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Context for Agents
|
||||||||||
} | ||||||||||
} | ||||||||||
|
||||||||||
|
@@ -325,6 +336,36 @@ impl CompactOrchestrator { | |||||||||
} | ||||||||||
} | ||||||||||
|
||||||||||
/// Routes fetched records to the transform step when `self.function` is set, | ||||||||||
/// and straight to partitioning otherwise. | ||||||||||
async fn transform_or_partition( | ||||||||||
&mut self, | ||||||||||
records: Chunk<LogRecord>, | ||||||||||
ctx: &ComponentContext<CompactOrchestrator>, | ||||||||||
) { | ||||||||||
// `_fn` is bound but never used: only the presence of `self.function` matters here. | ||||||||||
if let Some(_fn) = self.function { | ||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. [BestPractice] Variable name
Suggested change
⚡ Committable suggestion Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Context for Agents
|
||||||||||
self.transform(records, ctx).await; | ||||||||||
} else { | ||||||||||
self.partition(records, ctx).await; | ||||||||||
} | ||||||||||
} | ||||||||||
|
||||||||||
/// Moves the orchestrator into `ExecutionState::Transform` and sends a | ||||||||||
/// `TransformOperator` task over `records`. | ||||||||||
async fn transform( | ||||||||||
&mut self, | ||||||||||
records: Chunk<LogRecord>, | ||||||||||
ctx: &ComponentContext<CompactOrchestrator>, | ||||||||||
) { | ||||||||||
self.state = ExecutionState::Transform; | ||||||||||
let operator = TransformOperator::new(); | ||||||||||
// Log the record count before `records` is moved into the input. | ||||||||||
tracing::info!("Transforming {} records", records.len()); | ||||||||||
let input = TransformInput::new(records); | ||||||||||
// Build the task from the operator, its input, this component's receiver | ||||||||||
// and the orchestrator's task cancellation token. | ||||||||||
let task = wrap( | ||||||||||
operator, | ||||||||||
input, | ||||||||||
ctx.receiver(), | ||||||||||
self.context.task_cancellation_token.clone(), | ||||||||||
); | ||||||||||
// Send the task, passing the current tracing span along with it. | ||||||||||
self.send(task, ctx, Some(Span::current())).await; | ||||||||||
} | ||||||||||
|
||||||||||
async fn partition( | ||||||||||
&mut self, | ||||||||||
records: Chunk<LogRecord>, | ||||||||||
|
@@ -974,7 +1015,7 @@ impl Handler<TaskResult<FetchLogOutput, FetchLogError>> for CompactOrchestrator | |||||||||
return; | ||||||||||
} | ||||||||||
} | ||||||||||
self.partition(output, ctx).await; | ||||||||||
self.transform_or_partition(output, ctx).await; | ||||||||||
} | ||||||||||
} | ||||||||||
|
||||||||||
|
@@ -1012,11 +1053,28 @@ impl Handler<TaskResult<SourceRecordSegmentOutput, SourceRecordSegmentError>> | |||||||||
) | ||||||||||
.await; | ||||||||||
} else { | ||||||||||
self.partition(output, ctx).await; | ||||||||||
self.transform_or_partition(output, ctx).await; | ||||||||||
} | ||||||||||
} | ||||||||||
} | ||||||||||
|
||||||||||
/// Handles the result of a transform task by forwarding its records to partitioning. | ||||||||||
#[async_trait] | ||||||||||
impl Handler<TaskResult<TransformOutput, TransformError>> for CompactOrchestrator { | ||||||||||
type Result = (); | ||||||||||
|
||||||||||
async fn handle( | ||||||||||
&mut self, | ||||||||||
message: TaskResult<TransformOutput, TransformError>, | ||||||||||
ctx: &ComponentContext<CompactOrchestrator>, | ||||||||||
) { | ||||||||||
// Stop here when `ok_or_terminate` yields `None`. | ||||||||||
// NOTE(review): presumably that is the error path and it terminates the orchestrator; confirm. | ||||||||||
let output = match self.ok_or_terminate(message.into_inner(), ctx).await { | ||||||||||
Some(recs) => recs.records, | ||||||||||
None => return, | ||||||||||
}; | ||||||||||
// Transformed records continue into the partition step. | ||||||||||
self.partition(output, ctx).await; | ||||||||||
} | ||||||||||
} | ||||||||||
|
||||||||||
#[async_trait] | ||||||||||
impl Handler<TaskResult<PartitionOutput, PartitionError>> for CompactOrchestrator { | ||||||||||
type Result = (); | ||||||||||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
[Documentation]
A doc comment explaining the purpose of this operator, especially that it's currently a placeholder, would be helpful for future maintainers.
For example:
⚡ Committable suggestion
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation.
Context for Agents