-
Notifications
You must be signed in to change notification settings - Fork 1.9k
[ENH]: Refactor compactor into three chained orchestrators #5831
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
Reviewer ChecklistPlease leverage this checklist to ensure your code review is thorough before approving Testing, Bugs, Errors, Logs, Documentation
System Compatibility
Quality
|
This stack of pull requests is managed by Graphite. Learn more about stacking. |
|
Stage-based Compactor Refactor: Split CompactOrchestrator into DataFetch, ApplyData & Register orchestrators This PR replaces the legacy, monolithic CompactOrchestrator with three narrowly-scoped orchestrators that are executed in a fixed chain (DataFetch → ApplyData → Register). The change removes the function-driven scheduler, reshapes operator boundaries, and rewires the worker/compaction manager to use the new flow. The refactor is purely internal—no catalogue, API or on-disk format changes—but it dramatically improves modularity, testability and paves the way for smarter scheduling and parallelism. Key Changes• Introduced DataFetchOrchestrator, ApplyDataOrchestrator and RegisterOrchestrator, each owning a single phase of compaction Affected Areas• worker/execution/orchestration This summary was automatically generated by @propel-code-bot |
2cbf88b to
caaea81
Compare
caaea81 to
1f6723b
Compare
| num_materialized_logs: 0, | ||
| segment_spans: HashMap::new(), | ||
| materialized_log_data, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
[BestPractice]
Resource leak risk: HNSW index cleanup only happens in success case:
async fn try_purge_hnsw(path: &Path, hnsw_index_uuid: Option<IndexUuid>) {
if let Some(hnsw_index_uuid) = hnsw_index_uuid {
let _ = HnswIndexProvider::purge_one_id(path, hnsw_index_uuid).await;
}
}This cleanup method ignores all errors (let _ =). If purging fails due to file system errors or permissions, temporary HNSW indexes will accumulate on disk. Add error logging and potentially retry logic for cleanup failures.
Context for Agents
[**BestPractice**]
Resource leak risk: HNSW index cleanup only happens in success case:
```rust
async fn try_purge_hnsw(path: &Path, hnsw_index_uuid: Option<IndexUuid>) {
if let Some(hnsw_index_uuid) = hnsw_index_uuid {
let _ = HnswIndexProvider::purge_one_id(path, hnsw_index_uuid).await;
}
}
```
This cleanup method ignores all errors (`let _ =`). If purging fails due to file system errors or permissions, temporary HNSW indexes will accumulate on disk. Add error logging and potentially retry logic for cleanup failures.
File: rust/worker/src/execution/orchestration/apply_data_orchestrator.rs
Line: 1901f6723b to
2c77401
Compare
| self.context | ||
| .orchestrator_context | ||
| .task_cancellation_token | ||
| .clone(), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
[BestPractice]
Resource leak: The try_purge_hnsw function is called in cleanup methods but errors are silently ignored:
let _ = HnswIndexProvider::purge_one_id(path, hnsw_index_uuid).await;If the HNSW index cleanup fails, it could leave dangling resources on disk. Consider logging errors:
if let Err(e) = HnswIndexProvider::purge_one_id(path, hnsw_index_uuid).await {
tracing::warn!("Failed to purge HNSW index {}: {}", hnsw_index_uuid, e);
}Context for Agents
[**BestPractice**]
Resource leak: The `try_purge_hnsw` function is called in cleanup methods but errors are silently ignored:
```rust
let _ = HnswIndexProvider::purge_one_id(path, hnsw_index_uuid).await;
```
If the HNSW index cleanup fails, it could leave dangling resources on disk. Consider logging errors:
```rust
if let Err(e) = HnswIndexProvider::purge_one_id(path, hnsw_index_uuid).await {
tracing::warn!("Failed to purge HNSW index {}: {}", hnsw_index_uuid, e);
}
```
File: rust/worker/src/execution/orchestration/data_fetch_orchestrator.rs
Line: 207| self.terminate_with_result(Err(e), ctx).await; | ||
| return; | ||
| #[allow(clippy::too_many_arguments)] | ||
| pub async fn compact( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: consider making this an Orchestrator with different stages for better code organization
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
or maybe consider CollectionCompactionContext::data_fetch,apply_data,...
rust/worker/src/execution/orchestration/apply_data_orchestrator.rs
Outdated
Show resolved
Hide resolved
2c77401 to
d4c2383
Compare
This comment has been minimized.
This comment has been minimized.
| pub fn get_segment_writer_by_id( | ||
| &self, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
[BestPractice]
Schema field mutation without proper validation: collection_info.schema = apply_data_response.schema; directly assigns the schema without checking if the assignment conflicts with existing collection constraints. If apply_data_response.schema is None when the collection requires a schema, this could create an invalid state.
// Add validation:
let updated_schema = apply_data_response.schema;
if collection_info.collection.dimension.is_some() && updated_schema.is_none() {
return Err(CompactionError::InvariantViolation(
"Collection with dimension must have a schema"
));
}
collection_info.schema = updated_schema;Context for Agents
[**BestPractice**]
Schema field mutation without proper validation: `collection_info.schema = apply_data_response.schema;` directly assigns the schema without checking if the assignment conflicts with existing collection constraints. If `apply_data_response.schema` is `None` when the collection requires a schema, this could create an invalid state.
```rust
// Add validation:
let updated_schema = apply_data_response.schema;
if collection_info.collection.dimension.is_some() && updated_schema.is_none() {
return Err(CompactionError::InvariantViolation(
"Collection with dimension must have a schema"
));
}
collection_info.schema = updated_schema;
```
File: rust/worker/src/execution/orchestration/compact.rs
Line: 204| None => return, | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
[BestPractice]
Error handling gap: self.ok_or_terminate(segment_writer, ctx).await pattern doesn't handle the case where get_segment_writer_by_id() returns Ok(writer) but the writer is in an invalid state (e.g., already consumed/moved). This could lead to runtime panics when trying to use the writer.
// Add state validation:
let segment_writer = match self.context.get_segment_writer_by_id(message.segment_id) {
Ok(writer) => {
// Validate writer is still usable
if !writer.is_valid() {
return self.terminate_with_result(Err(...), ctx).await;
}
writer
},
Err(e) => {
return self.terminate_with_result(Err(e.into()), ctx).await;
}
};Context for Agents
[**BestPractice**]
Error handling gap: `self.ok_or_terminate(segment_writer, ctx).await` pattern doesn't handle the case where `get_segment_writer_by_id()` returns `Ok(writer)` but the writer is in an invalid state (e.g., already consumed/moved). This could lead to runtime panics when trying to use the writer.
```rust
// Add state validation:
let segment_writer = match self.context.get_segment_writer_by_id(message.segment_id) {
Ok(writer) => {
// Validate writer is still usable
if !writer.is_valid() {
return self.terminate_with_result(Err(...), ctx).await;
}
writer
},
Err(e) => {
return self.terminate_with_result(Err(e.into()), ctx).await;
}
};
```
File: rust/worker/src/execution/orchestration/apply_data_orchestrator.rs
Line: 568d4c2383 to
daee3e2
Compare
daee3e2 to
67e5b87
Compare
rust/worker/src/execution/orchestration/apply_data_orchestrator.rs
Outdated
Show resolved
Hide resolved
| impl ExecuteAttachedFunctionOperator { | ||
| /// Create a new ExecuteAttachedFunctionOperator from an AttachedFunction. | ||
| /// The executor is selected based on the function_id in the attached function. | ||
| #[allow(dead_code)] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
[BestPractice]
The PR description mentions removing all function-related code. Since this function is now unused, it seems it should be removed completely instead of being marked with #[allow(dead_code)]. This would make the codebase cleaner and more aligned with the PR's goal.
Context for Agents
[**BestPractice**]
The PR description mentions removing all function-related code. Since this function is now unused, it seems it should be removed completely instead of being marked with `#[allow(dead_code)]`. This would make the codebase cleaner and more aligned with the PR's goal.
File: rust/worker/src/execution/operators/execute_task.rs
Line: 871605c9a to
906531e
Compare
| ) | ||
| .await; | ||
| return Vec::new(); | ||
| } | ||
| }; | ||
|
|
||
| for materialized_output in materialized_outputs { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
[CriticalError]
Resource Leak: Span Not Dropped on Early Termination
When tasks fail and terminate_with_result is called, spans stored in self.segment_spans are never removed:
let result = self.create_apply_log_to_segment_writer_tasks(/*...*/).await;
let mut new_tasks = match result {
Ok(tasks) => tasks,
Err(err) => {
self.terminate_with_result(Err(err.into()), ctx).await; // Early return
return Vec::new(); // Spans in self.segment_spans never dropped
}
};Spans remain in memory until the orchestrator is dropped, causing:
- Memory leak for span data
- Incorrect trace timing (spans appear active when work stopped)
- Open telemetry connections held longer than necessary
Fix:
Err(err) => {
self.segment_spans.clear(); // Drop all spans before terminating
self.terminate_with_result(Err(err.into()), ctx).await;
return Vec::new();
}Context for Agents
[**CriticalError**]
**Resource Leak: Span Not Dropped on Early Termination**
When tasks fail and `terminate_with_result` is called, spans stored in `self.segment_spans` are never removed:
```rust
let result = self.create_apply_log_to_segment_writer_tasks(/*...*/).await;
let mut new_tasks = match result {
Ok(tasks) => tasks,
Err(err) => {
self.terminate_with_result(Err(err.into()), ctx).await; // Early return
return Vec::new(); // Spans in self.segment_spans never dropped
}
};
```
Spans remain in memory until the orchestrator is dropped, causing:
1. Memory leak for span data
2. Incorrect trace timing (spans appear active when work stopped)
3. Open telemetry connections held longer than necessary
**Fix:**
```rust
Err(err) => {
self.segment_spans.clear(); // Drop all spans before terminating
self.terminate_with_result(Err(err.into()), ctx).await;
return Vec::new();
}
```
File: rust/worker/src/execution/orchestration/apply_logs_orchestrator.rs
Line: 464906531e to
4104603
Compare
| let result = sysdb | ||
| .flush_compaction( | ||
| input.tenant.clone(), | ||
| input.collection_id, | ||
| input.log_position, | ||
| input.collection_version, | ||
| input.segment_flush_info.clone(), | ||
| input.total_records_post_compaction, | ||
| input.collection_logical_size_bytes, | ||
| input.schema.clone(), | ||
| ) | ||
| .await; | ||
|
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
[BestPractice]
Idempotency violation: The flush_compaction call has no unique transaction/request ID to prevent duplicate execution:
let result = sysdb.flush_compaction(
input.tenant.clone(),
input.collection_id,
input.log_position,
input.collection_version,
input.segment_flush_info.clone(),
// ... no idempotency key
).await;If the operator crashes after flush_compaction succeeds but before update_collection_log_offset, a retry will re-execute flush_compaction with the same parameters, potentially:
- Incrementing counters twice
- Creating duplicate segment records
- Corrupting collection state
Fix: Add an idempotency key (e.g., job_id or request UUID) to flush_compaction to detect retries:
struct FlushCompactionRequest {
idempotency_key: Uuid, // Deduplicate retries
// ... existing fields
}Context for Agents
[**BestPractice**]
**Idempotency violation**: The `flush_compaction` call has no unique transaction/request ID to prevent duplicate execution:
```rust
let result = sysdb.flush_compaction(
input.tenant.clone(),
input.collection_id,
input.log_position,
input.collection_version,
input.segment_flush_info.clone(),
// ... no idempotency key
).await;
```
If the operator crashes after `flush_compaction` succeeds but before `update_collection_log_offset`, a retry will re-execute `flush_compaction` with the same parameters, potentially:
- Incrementing counters twice
- Creating duplicate segment records
- Corrupting collection state
**Fix**: Add an idempotency key (e.g., `job_id` or request UUID) to `flush_compaction` to detect retries:
```rust
struct FlushCompactionRequest {
idempotency_key: Uuid, // Deduplicate retries
// ... existing fields
}
```
File: rust/worker/src/execution/operators/register.rs
Line: 1384104603 to
2e336c3
Compare
| collection_info.collection.total_records_post_compaction = count; | ||
| } | ||
|
|
||
| self.num_uncompleted_materialization_tasks = partitions.len(); | ||
| for partition in partitions.iter() { | ||
| let operator = MaterializeLogOperator::new(); | ||
| let input = MaterializeLogInput::new( | ||
| partition.clone(), | ||
| record_reader.clone(), | ||
| next_max_offset_id.clone(), | ||
| ); | ||
| let task = wrap( | ||
| operator, | ||
| input, | ||
| ctx.receiver(), | ||
| self.context | ||
| .orchestrator_context | ||
| .task_cancellation_token | ||
| .clone(), | ||
| ); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
[CriticalError]
Partition size check missing
self.num_uncompleted_materialization_tasks = partitions.len();
for partition in partitions.iter() {
let operator = MaterializeLogOperator::new();
// ...
}Issue: If partitions.len() is 0, num_uncompleted_materialization_tasks becomes 0, but no work is dispatched. The orchestrator will never decrement the counter, causing it to hang waiting for tasks that never run. Add early-return when partitions.is_empty():
if partitions.is_empty() {
// terminate with empty success
return;
}Context for Agents
[**CriticalError**]
**Partition size check missing**
```rust
self.num_uncompleted_materialization_tasks = partitions.len();
for partition in partitions.iter() {
let operator = MaterializeLogOperator::new();
// ...
}
```
**Issue**: If `partitions.len()` is 0, `num_uncompleted_materialization_tasks` becomes 0, but no work is dispatched. The orchestrator will never decrement the counter, causing it to hang waiting for tasks that never run. Add early-return when `partitions.is_empty()`:
```rust
if partitions.is_empty() {
// terminate with empty success
return;
}
```
File: rust/worker/src/execution/orchestration/log_fetch_orchestrator.rs
Line: 362| impl Ord for RecordMeasure { | ||
| fn cmp(&self, other: &Self) -> Ordering { | ||
| self.measure |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
[CriticalError]
The Ord implementation for RecordMeasure has been changed to no longer use offset_id as a tie-breaker. This makes the sort unstable for records with the same measure. This could break assumptions in consumers of this sorted data that rely on a deterministic order, like KnnMerge.
If a stable sort order is required, consider reverting this change to include the tie-breaker.
| impl Ord for RecordMeasure { | |
| fn cmp(&self, other: &Self) -> Ordering { | |
| self.measure | |
| self.measure | |
| .total_cmp(&other.measure) | |
| .then_with(|| self.offset_id.cmp(&other.offset_id)) |
⚡ Committable suggestion
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation.
Context for Agents
[**CriticalError**]
The `Ord` implementation for `RecordMeasure` has been changed to no longer use `offset_id` as a tie-breaker. This makes the sort unstable for records with the same measure. This could break assumptions in consumers of this sorted data that rely on a deterministic order, like `KnnMerge`.
If a stable sort order is required, consider reverting this change to include the tie-breaker.
```suggestion
self.measure
.total_cmp(&other.measure)
.then_with(|| self.offset_id.cmp(&other.offset_id))
```
⚡ **Committable suggestion**
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation.
File: rust/types/src/execution/operator.rs
Line: 415| self.get_collection_info()?.writers.clone().ok_or( | ||
| CompactionContextError::InvariantViolation("Segment writers should have been set"), | ||
| ) | ||
| } | ||
|
|
||
| async fn dispatch_segment_writer_commit( | ||
| &mut self, | ||
| segment_writer: ChromaSegmentWriter<'static>, | ||
| ctx: &ComponentContext<CompactOrchestrator>, | ||
| ) { | ||
| let span = self.get_segment_writer_span(&segment_writer); | ||
| let operator = CommitSegmentWriterOperator::new(); | ||
| let input = CommitSegmentWriterInput::new(segment_writer); | ||
| let task = wrap( | ||
| operator, | ||
| input, | ||
| ctx.receiver(), | ||
| self.context.task_cancellation_token.clone(), | ||
| ); | ||
| let res = self.dispatcher().send(task, Some(span)).await; | ||
| self.ok_or_terminate(res, ctx).await; | ||
| pub fn get_collection_info(&self) -> Result<&CollectionCompactInfo, CompactionContextError> { | ||
| self.collection_info | ||
| .get() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
[BestPractice]
HNSW cleanup error silently ignored
let _ = HnswIndexProvider::purge_one_id(
self.hnsw_provider.temporary_storage_path.as_path(),
hnsw_index_uuid,
)
.await;Issue: If HNSW purge fails (e.g., disk full, permission denied), the temporary index file leaks. Under heavy load or repeated failures, this causes disk space exhaustion. Log the error and consider surfacing it as a warning metric:
if let Err(e) = HnswIndexProvider::purge_one_id(...).await {
tracing::warn!("Failed to purge HNSW index {}: {}", hnsw_index_uuid, e);
}Context for Agents
[**BestPractice**]
**HNSW cleanup error silently ignored**
```rust
let _ = HnswIndexProvider::purge_one_id(
self.hnsw_provider.temporary_storage_path.as_path(),
hnsw_index_uuid,
)
.await;
```
**Issue**: If HNSW purge fails (e.g., disk full, permission denied), the temporary index file leaks. Under heavy load or repeated failures, this causes disk space exhaustion. Log the error and consider surfacing it as a warning metric:
```rust
if let Err(e) = HnswIndexProvider::purge_one_id(...).await {
tracing::warn!("Failed to purge HNSW index {}: {}", hnsw_index_uuid, e);
}
```
File: rust/worker/src/execution/orchestration/compact.rs
Line: 258|
|
||
| if message.segment_type == "MetadataSegmentWriter" { | ||
| if let Some(update) = message.schema_update { | ||
| let collection_info_cell = self.context.collection_info.get_mut(); | ||
| let collection_info = match collection_info_cell { | ||
| Some(collection_info) => collection_info, | ||
| None => { | ||
| let err = ApplyLogsOrchestratorError::InvariantViolation( | ||
| "Collection info should have been set", | ||
| ); | ||
| self.terminate_with_result(Err(err), ctx).await; | ||
| return; | ||
| } | ||
| }; | ||
|
|
||
| match collection_info.schema.take() { | ||
| Some(existing) => match existing.merge(&update) { | ||
| Ok(merged) => { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
[BestPractice]
Schema merge error loses context
match collection_info.schema.take() {
Some(existing) => match existing.merge(&update) {
Ok(merged) => {
collection_info.schema = Some(merged);
}
Err(err) => {
let err = ApplyLogsOrchestratorError::ApplyLog(
ApplyLogToSegmentWriterOperatorError::ApplyMaterializedLogsError(
ApplyMaterializedLogError::Schema(err),
),
);
self.terminate_with_result(Err(err), ctx).await;
return;
}
},Issue: When schema merge fails, the error doesn't include which collection or what schema update was attempted. In production, this makes debugging impossible. Wrap the error with context:
Err(err) => {
tracing::error!("Schema merge failed for collection {}: {:?}", collection_info.collection_id, err);
// then terminate
}Context for Agents
[**BestPractice**]
**Schema merge error loses context**
```rust
match collection_info.schema.take() {
Some(existing) => match existing.merge(&update) {
Ok(merged) => {
collection_info.schema = Some(merged);
}
Err(err) => {
let err = ApplyLogsOrchestratorError::ApplyLog(
ApplyLogToSegmentWriterOperatorError::ApplyMaterializedLogsError(
ApplyMaterializedLogError::Schema(err),
),
);
self.terminate_with_result(Err(err), ctx).await;
return;
}
},
```
**Issue**: When schema merge fails, the error doesn't include which collection or what schema update was attempted. In production, this makes debugging impossible. Wrap the error with context:
```rust
Err(err) => {
tracing::error!("Schema merge failed for collection {}: {:?}", collection_info.collection_id, err);
// then terminate
}
```
File: rust/worker/src/execution/orchestration/apply_logs_orchestrator.rs
Line: 542| let apply_logs_response = match apply_logs_orchestrator.run(system).await { | ||
| Ok(response) => response, | ||
| Err(e) => { | ||
| if e.should_trace_error() { | ||
| tracing::error!("Apply data phase failed: {e}"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
[BestPractice]
The result of HnswIndexProvider::purge_one_id is ignored here. If purging the temporary HNSW index files fails, the error will be silently swallowed, potentially leading to disk space leaks. The error should be logged at a minimum.
Suggested Change
| let apply_logs_response = match apply_logs_orchestrator.run(system).await { | |
| Ok(response) => response, | |
| Err(e) => { | |
| if e.should_trace_error() { | |
| tracing::error!("Apply data phase failed: {e}"); | |
| if let Err(e) = HnswIndexProvider::purge_one_id( | |
| self.hnsw_provider.temporary_storage_path.as_path(), | |
| hnsw_index_uuid, | |
| ) | |
| .await | |
| { | |
| tracing::error!("Failed to purge HNSW index files for uuid {}: {}", hnsw_index_uuid, e); | |
| } |
⚡ Committable suggestion
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation.
Context for Agents
[**BestPractice**]
The result of `HnswIndexProvider::purge_one_id` is ignored here. If purging the temporary HNSW index files fails, the error will be silently swallowed, potentially leading to disk space leaks. The error should be logged at a minimum.
<details>
<summary>Suggested Change</summary>
```suggestion
if let Err(e) = HnswIndexProvider::purge_one_id(
self.hnsw_provider.temporary_storage_path.as_path(),
hnsw_index_uuid,
)
.await
{
tracing::error!("Failed to purge HNSW index files for uuid {}: {}", hnsw_index_uuid, e);
}
```
⚡ **Committable suggestion**
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation.
</details>
File: rust/worker/src/execution/orchestration/compact.rs
Line: 376| const result = await collection.search(search.rank(Knn({ query: queryEmbedding }))); | ||
|
|
||
| // Option 2: Pass text query (embedding created using collection's schema configuration) | ||
| const queryText = "What are the latest advances in quantum computing?"; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
[Documentation]
The variable declaration was changed from "const result2" to "result"—removing both the distinct name and the "const" keyword. This introduces an undeclared assignment that may confuse readers or cause runtime errors. Consider keeping the original "const result2" (or adding "const result") for clarity.
Context for Agents
[**Documentation**]
The variable declaration was changed from "const result2" to "result"—removing both the distinct name and the "const" keyword. This introduces an undeclared assignment that may confuse readers or cause runtime errors. Consider keeping the original "const result2" (or adding "const result") for clarity.
File: docs/docs.trychroma.com/markdoc/content/cloud/search-api/overview.md
Line: 67| test_segments.collection.dimension, | ||
| false, | ||
| ) | ||
| first_compaction_result.expect("Should succeed"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Check through the record segment to validate data.
| .unwrap() | ||
| .collection; | ||
| assert_eq!(collection.log_position, -1); | ||
| assert_eq!(collection.version, 0); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Check that filepaths are empty
| "Expected RequireCompactionOffsetRepair but got error: {:?}", | ||
| e | ||
| ); | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can repair and then check another compaction.
|
|
||
| // Add logs that represent inserts and deletes that net out to 0 | ||
| // Use the add_delete_generator to create 250 records (125 pairs of insert+delete) | ||
| use chroma_log::test::{add_delete_net_zero_generator, LogGenerator}; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Move this.
| } | ||
|
|
||
| #[tokio::test] | ||
| async fn test_compaction_with_empty_logs_from_inserts_and_deletes() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Make another one where this is not the first compaction.
HammadB
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Reviewed offline - would be good to audit all the Propel comments.
2e336c3 to
85e37d5
Compare
| collection_id: collection.collection_id, | ||
| collection: collection.clone(), | ||
| writers: None, | ||
| pulled_log_offset: collection.log_position, | ||
| hnsw_index_uuid: None, | ||
| schema: collection.schema.clone(), | ||
| }; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
[CriticalError]
Missing error handling for get_collection_info() and get_segment_writers() calls
Multiple locations in log_fetch_orchestrator.rs and apply_logs_orchestrator.rs call self.context.get_collection_info() or self.context.get_segment_writers() and immediately call terminate_with_result(Err(...)) on error. However, if these methods return Err, the orchestrator terminates but may leave resources in an inconsistent state (e.g., spawned tasks, locks, channels).
Example from log_fetch_orchestrator.rs:442-448:
let collection_info = match self.context.get_collection_info_mut() {
Ok(info) => info,
Err(err) => {
self.terminate_with_result(Err(err.into()), ctx).await;
return;
}
};Issue: The terminate_with_result pattern doesn't guarantee cleanup of in-flight operations. If materialization tasks are running when this error occurs, they may continue executing and attempt to access the now-invalid context.
Fix: Ensure that terminate_with_result cancels all in-flight tasks via the task_cancellation_token before terminating, or add explicit cleanup logic:
let collection_info = match self.context.get_collection_info_mut() {
Ok(info) => info,
Err(err) => {
// Cancel all in-flight tasks before terminating
self.context.orchestrator_context.task_cancellation_token.cancel();
self.terminate_with_result(Err(err.into()), ctx).await;
return;
}
};Context for Agents
[**CriticalError**]
**Missing error handling for `get_collection_info()` and `get_segment_writers()` calls**
Multiple locations in `log_fetch_orchestrator.rs` and `apply_logs_orchestrator.rs` call `self.context.get_collection_info()` or `self.context.get_segment_writers()` and immediately call `terminate_with_result(Err(...))` on error. However, if these methods return `Err`, the orchestrator terminates but may leave resources in an inconsistent state (e.g., spawned tasks, locks, channels).
**Example from log_fetch_orchestrator.rs:442-448:**
```rust
let collection_info = match self.context.get_collection_info_mut() {
Ok(info) => info,
Err(err) => {
self.terminate_with_result(Err(err.into()), ctx).await;
return;
}
};
```
**Issue:** The `terminate_with_result` pattern doesn't guarantee cleanup of in-flight operations. If materialization tasks are running when this error occurs, they may continue executing and attempt to access the now-invalid context.
**Fix:** Ensure that `terminate_with_result` cancels all in-flight tasks via the `task_cancellation_token` before terminating, or add explicit cleanup logic:
```rust
let collection_info = match self.context.get_collection_info_mut() {
Ok(info) => info,
Err(err) => {
// Cancel all in-flight tasks before terminating
self.context.orchestrator_context.task_cancellation_token.cancel();
self.terminate_with_result(Err(err.into()), ctx).await;
return;
}
};
```
File: rust/worker/src/execution/orchestration/log_fetch_orchestrator.rs
Line: 448| Err(LogFetchOrchestratorError::InvariantViolation( | ||
| "self.collection_info not set", | ||
| )), | ||
| ctx, | ||
| ) | ||
| .await; | ||
| return; | ||
| } | ||
| }; | ||
| let materialized = std::mem::take(&mut self.materialized_outputs); | ||
| self.terminate_with_result( | ||
| Ok(Success::new(materialized, collection_info.clone()).into()), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
[CriticalError]
Race condition: num_uncompleted_materialization_tasks decrement without synchronization
In log_fetch_orchestrator.rs:774-785, num_uncompleted_materialization_tasks is decremented without atomic operations or locks:
self.num_uncompleted_materialization_tasks -= 1;
if self.num_uncompleted_materialization_tasks == 0 {
// ... finish logic
}Issue: If multiple MaterializeLogOutput handlers execute concurrently on different threads (which is possible in an async executor), the decrement operation is not atomic. This can cause:
- Lost decrements (two threads read same value, both decrement, only one write persists)
- Multiple threads passing the
== 0check simultaneously finish_materialized_output()being called multiple times
Fix: Use atomic operations:
use std::sync::atomic::{AtomicUsize, Ordering};
// In struct:
num_uncompleted_materialization_tasks: Arc<AtomicUsize>,
// In handler:
let remaining = self.num_uncompleted_materialization_tasks.fetch_sub(1, Ordering::SeqCst) - 1;
if remaining == 0 {
self.finish_materialized_output(ctx).await;
}Context for Agents
[**CriticalError**]
**Race condition: `num_uncompleted_materialization_tasks` decrement without synchronization**
In `log_fetch_orchestrator.rs:774-785`, `num_uncompleted_materialization_tasks` is decremented without atomic operations or locks:
```rust
self.num_uncompleted_materialization_tasks -= 1;
if self.num_uncompleted_materialization_tasks == 0 {
// ... finish logic
}
```
**Issue:** If multiple `MaterializeLogOutput` handlers execute concurrently on different threads (which is possible in an async executor), the decrement operation is not atomic. This can cause:
1. Lost decrements (two threads read same value, both decrement, only one write persists)
2. Multiple threads passing the `== 0` check simultaneously
3. `finish_materialized_output()` being called multiple times
**Fix:** Use atomic operations:
```rust
use std::sync::atomic::{AtomicUsize, Ordering};
// In struct:
num_uncompleted_materialization_tasks: Arc<AtomicUsize>,
// In handler:
let remaining = self.num_uncompleted_materialization_tasks.fetch_sub(1, Ordering::SeqCst) - 1;
if remaining == 0 {
self.finish_materialized_output(ctx).await;
}
```
File: rust/worker/src/execution/orchestration/log_fetch_orchestrator.rs
Line: 785| self.send(prefetch_task, ctx, Some(prefetch_span)).await; | ||
| } | ||
|
|
||
| self.send(log_task, ctx, Some(Span::current())).await; | ||
| } | ||
| } | ||
|
|
||
| #[async_trait] | ||
| impl Handler<TaskResult<PrefetchSegmentOutput, PrefetchSegmentError>> for LogFetchOrchestrator { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
[CriticalError]
Off-by-one error: witnessed_offset_in_sysdb uses pulled_log_offset instead of collection.log_position
In log_fetch_orchestrator.rs:608-616, when logs are not pulled, the code returns:
RequireCompactionOffsetRepair::new(
collection_info.collection_id.into(),
collection_info.pulled_log_offset, // ← This is the LAST compacted offset
)Issue: pulled_log_offset represents the last offset that was successfully compacted. However, RequireCompactionOffsetRepair should contain the offset witnessed in sysdb (i.e., collection.log_position from the collection record), not the pull offset from the orchestrator.
If collection.log_position = 100 and we pull up to offset 105, pulled_log_offset = 105. If no new logs exist, we should report that sysdb shows offset 100 (not 105) for the repair operation.
Fix:
self.terminate_with_result(
Ok(RequireCompactionOffsetRepair::new(
collection_info.collection_id.into(),
collection_info.collection.log_position, // Use sysdb's witnessed position
)
.into()),
ctx,
)
.await;Context for Agents
[**CriticalError**]
**Off-by-one error: `witnessed_offset_in_sysdb` uses `pulled_log_offset` instead of `collection.log_position`**
In `log_fetch_orchestrator.rs:608-616`, when logs are not pulled, the code returns:
```rust
RequireCompactionOffsetRepair::new(
collection_info.collection_id.into(),
collection_info.pulled_log_offset, // ← This is the LAST compacted offset
)
```
**Issue:** `pulled_log_offset` represents the last offset that was successfully compacted. However, `RequireCompactionOffsetRepair` should contain the offset witnessed in sysdb (i.e., `collection.log_position` from the collection record), not the pull offset from the orchestrator.
If `collection.log_position = 100` and we pull up to offset `105`, `pulled_log_offset = 105`. If no new logs exist, we should report that sysdb shows offset `100` (not `105`) for the repair operation.
**Fix:**
```rust
self.terminate_with_result(
Ok(RequireCompactionOffsetRepair::new(
collection_info.collection_id.into(),
collection_info.collection.log_position, // Use sysdb's witnessed position
)
.into()),
ctx,
)
.await;
```
File: rust/worker/src/execution/orchestration/log_fetch_orchestrator.rs
Line: 616| collection_info.schema = Some(merged); | ||
| } | ||
| Err(err) => { | ||
| let err = ApplyLogsOrchestratorError::ApplyLog( | ||
| ApplyLogToSegmentWriterOperatorError::ApplyMaterializedLogsError( | ||
| ApplyMaterializedLogError::Schema(err), | ||
| ), | ||
| ); | ||
| self.terminate_with_result(Err(err), ctx).await; | ||
| return; | ||
| } | ||
| }, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
[BestPractice]
Resource leak: Segment writers not cleaned up on early termination
In apply_logs_orchestrator.rs:543-554, when an error occurs during create_apply_log_to_segment_writer_tasks:
let mut new_tasks = match result {
Ok(tasks) => tasks,
Err(err) => {
self.terminate_with_result(Err(err.into()), ctx).await;
return Vec::new();
}
};Issue: The get_segment_writers() call (line 518) may have created segment writers that hold:
- Open file handles
- Memory-mapped regions
- Database connections
- Locks on blockfiles
If create_apply_log_to_segment_writer_tasks fails after writers are created, these resources are not explicitly released. While Rust's Drop trait will eventually clean them up, if the orchestrator state is kept around (e.g., for debugging/logging), resources remain held.
Fix: Add explicit cleanup in error paths:
let mut new_tasks = match result {
Ok(tasks) => tasks,
Err(err) => {
// Explicitly drop writers to release resources
if let Ok(writers) = self.context.get_segment_writers() {
drop(writers);
}
self.terminate_with_result(Err(err.into()), ctx).await;
return Vec::new();
}
};Or ensure CompactionContext implements proper cleanup in its Drop implementation.
Context for Agents
[**BestPractice**]
**Resource leak: Segment writers not cleaned up on early termination**
In `apply_logs_orchestrator.rs:543-554`, when an error occurs during `create_apply_log_to_segment_writer_tasks`:
```rust
let mut new_tasks = match result {
Ok(tasks) => tasks,
Err(err) => {
self.terminate_with_result(Err(err.into()), ctx).await;
return Vec::new();
}
};
```
**Issue:** The `get_segment_writers()` call (line 518) may have created segment writers that hold:
- Open file handles
- Memory-mapped regions
- Database connections
- Locks on blockfiles
If `create_apply_log_to_segment_writer_tasks` fails after writers are created, these resources are not explicitly released. While Rust's `Drop` trait will eventually clean them up, if the orchestrator state is kept around (e.g., for debugging/logging), resources remain held.
**Fix:** Add explicit cleanup in error paths:
```rust
let mut new_tasks = match result {
Ok(tasks) => tasks,
Err(err) => {
// Explicitly drop writers to release resources
if let Ok(writers) = self.context.get_segment_writers() {
drop(writers);
}
self.terminate_with_result(Err(err.into()), ctx).await;
return Vec::new();
}
};
```
Or ensure `CompactionContext` implements proper cleanup in its `Drop` implementation.
File: rust/worker/src/execution/orchestration/apply_logs_orchestrator.rs
Line: 554
Description of changes
Summarize the changes made by this PR.
This change removes all function related code from the compaction path including the scheduler and the compaction orchestrator. This is done to make way for a refactor on the preexisting compaction orchestrator.
This refactor entails breaking the CompactOrchestrator into three chained orchestrators:
The DataFetchOrchestrator that does GetCollectionAndSegments -> FetchLog/SourceRecordSegments -> Partition -> Materialized Logs. Its main task is to source data.
The ApplyDataOrchestrator that takes in materialized log records from the previous orchestrator and applies them to segments via ApplyOperators, CommitOperators and Flush operators.
The RegisterOrchestrator that takes in flushed segment path from the previous step and invokes the Register operator.
Any common code across these three orchestrators has remained in compact.rs.
Test plan
How are these changes tested?
pytestfor python,yarn testfor js,cargo testfor rustMigration plan
Are there any migrations, or any forwards/backwards compatibility changes needed in order to make sure this change deploys reliably?
Observability plan
What is the plan to instrument and monitor this change?
Documentation Changes
Are all docstrings for user-facing APIs updated if required? Do we need to make documentation changes in the _docs section?_