Skip to content

Commit caaea81

Browse files
committed
[ENH]: Refactor compactor
1 parent 0605458 commit caaea81

21 files changed

+2026
-3757
lines changed

Cargo.lock

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

rust/segment/src/types.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -462,7 +462,7 @@ impl<'log_data, 'segment_data: 'log_data> HydratedMaterializedLogRecord<'log_dat
462462
| MaterializedLogOperation::OverwriteExisting
463463
| MaterializedLogOperation::UpdateExisting => {
464464
(self.get_user_id().len()
465-
+ size_of_val(self.merged_embeddings_ref())
465+
+ self.merged_embeddings_ref().len() * size_of::<f32>()
466466
+ logical_size_of_metadata(&merged_metadata)
467467
+ self
468468
.merged_document_ref()

rust/worker/src/compactor/compaction_manager.rs

Lines changed: 48 additions & 142 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@ use super::scheduler::Scheduler;
22
use super::scheduler_policy::LasCompactionTimeSchedulerPolicy;
33
use super::OneOffCompactMessage;
44
use super::RebuildMessage;
5-
use crate::compactor::tasks::SchedulableFunction;
65
use crate::compactor::types::{ListDeadJobsMessage, ScheduledCompactMessage};
76
use crate::config::CompactionServiceConfig;
87
use crate::execution::operators::purge_dirty_log::PurgeDirtyLog;
@@ -13,8 +12,7 @@ use crate::execution::operators::repair_log_offsets::RepairLogOffsets;
1312
use crate::execution::operators::repair_log_offsets::RepairLogOffsetsError;
1413
use crate::execution::operators::repair_log_offsets::RepairLogOffsetsInput;
1514
use crate::execution::operators::repair_log_offsets::RepairLogOffsetsOutput;
16-
use crate::execution::orchestration::CompactOrchestrator;
17-
use crate::execution::orchestration::CompactionResponse;
15+
use crate::execution::orchestration::compact::{compact, CompactionResponse};
1816
use async_trait::async_trait;
1917
use chroma_blockstore::provider::BlockfileProvider;
2018
use chroma_config::assignment::assignment_policy::AssignmentPolicy;
@@ -27,11 +25,9 @@ use chroma_memberlist::memberlist_provider::Memberlist;
2725
use chroma_segment::spann_provider::SpannProvider;
2826
use chroma_storage::Storage;
2927
use chroma_sysdb::SysDb;
30-
use chroma_system::wrap;
31-
use chroma_system::Dispatcher;
32-
use chroma_system::Orchestrator;
33-
use chroma_system::TaskResult;
34-
use chroma_system::{Component, ComponentContext, ComponentHandle, Handler, System};
28+
use chroma_system::{
29+
wrap, Component, ComponentContext, ComponentHandle, Dispatcher, Handler, System, TaskResult,
30+
};
3531
use chroma_types::{CollectionUuid, JobId};
3632
use futures::stream::FuturesUnordered;
3733
use futures::FutureExt;
@@ -118,8 +114,6 @@ pub(crate) struct CompactionManager {
118114
pub(crate) enum CompactionError {
119115
#[error("Failed to compact")]
120116
FailedToCompact,
121-
#[error("Failed to execute task")]
122-
FailedToExecuteTask,
123117
#[error("Heap service is not initialized for task based compaction")]
124118
HeapServiceNotInitialized,
125119
}
@@ -128,7 +122,6 @@ impl ChromaError for CompactionError {
128122
fn code(&self) -> ErrorCodes {
129123
match self {
130124
CompactionError::FailedToCompact => ErrorCodes::Internal,
131-
CompactionError::FailedToExecuteTask => ErrorCodes::Internal,
132125
CompactionError::HeapServiceNotInitialized => ErrorCodes::InvalidArgument,
133126
}
134127
}
@@ -211,70 +204,33 @@ impl CompactionManager {
211204
let compact_awaiter_channel = &self.compact_awaiter_channel;
212205
self.scheduler.schedule().await;
213206

214-
match self.mode {
215-
ExecutionMode::Compaction => {
216-
let jobs: Vec<_> = self.scheduler.get_jobs().cloned().collect();
217-
for job in jobs {
218-
let instrumented_span = span!(
219-
parent: None,
220-
tracing::Level::INFO,
221-
"Compacting job",
222-
collection_id = ?job.collection_id
223-
);
224-
Span::current()
225-
.add_link(instrumented_span.context().span().span_context().clone());
226-
227-
let future = self
228-
.context
229-
.clone()
230-
.compact(job.collection_id, false)
231-
.instrument(instrumented_span);
232-
if let Err(e) = compact_awaiter_channel
233-
.send(CompactionTask {
234-
job_id: job.collection_id.into(),
235-
future: Box::pin(future),
236-
})
237-
.await
238-
{
239-
tracing::error!(
240-
collection_id = ?job.collection_id,
241-
error = ?e,
242-
"Failed to send start scheduled compaction task"
243-
);
244-
}
245-
}
246-
}
247-
ExecutionMode::AttachedFunction => {
248-
let tasks_iter = self.scheduler.get_tasks_scheduled_for_execution().clone();
249-
for task in tasks_iter {
250-
let instrumented_span = span!(
251-
parent: None,
252-
tracing::Level::INFO,
253-
"Compacting task",
254-
collection_id = ?task.collection_id
255-
);
256-
Span::current()
257-
.add_link(instrumented_span.context().span().span_context().clone());
258-
259-
let future = self
260-
.context
261-
.clone()
262-
.execute_task(task.clone())
263-
.instrument(instrumented_span);
264-
if let Err(e) = compact_awaiter_channel
265-
.send(CompactionTask {
266-
job_id: task.task_id.into(),
267-
future: Box::pin(future),
268-
})
269-
.await
270-
{
271-
tracing::error!(
272-
task_id = ?task.task_id,
273-
error = ?e,
274-
"Failed to start scheduled task run"
275-
);
276-
}
277-
}
207+
let jobs: Vec<_> = self.scheduler.get_jobs().cloned().collect();
208+
for job in jobs {
209+
let instrumented_span = span!(
210+
parent: None,
211+
tracing::Level::INFO,
212+
"Compacting job",
213+
collection_id = ?job.collection_id
214+
);
215+
Span::current().add_link(instrumented_span.context().span().span_context().clone());
216+
217+
let future = self
218+
.context
219+
.clone()
220+
.compact(job.collection_id, false)
221+
.instrument(instrumented_span);
222+
if let Err(e) = compact_awaiter_channel
223+
.send(CompactionTask {
224+
job_id: job.collection_id.into(),
225+
future: Box::pin(future),
226+
})
227+
.await
228+
{
229+
tracing::error!(
230+
collection_id = ?job.collection_id,
231+
error = ?e,
232+
"Failed to send start scheduled compaction task"
233+
);
278234
}
279235
}
280236
}
@@ -380,17 +336,17 @@ impl CompactionManager {
380336
while let Ok(resp) = compact_awaiter_completion_channel.try_recv() {
381337
match resp.result {
382338
Ok(ref compaction_response) => match compaction_response {
383-
CompactionResponse::Success { job_id } => {
384-
if job_id != &resp.job_id.0 {
339+
CompactionResponse::Success { job_id, .. } => {
340+
if job_id != &resp.job_id {
385341
tracing::event!(Level::ERROR, name = "mismatched job ids in result", lhs =? *job_id, rhs =? resp.job_id);
386342
}
387343
self.scheduler.succeed_job(resp.job_id);
388344
}
389345
CompactionResponse::RequireCompactionOffsetRepair {
390-
collection_id,
346+
job_id: collection_id,
391347
witnessed_offset_in_sysdb,
392348
} => {
393-
if collection_id.0 != resp.job_id.0 {
349+
if *collection_id != resp.job_id {
394350
tracing::event!(Level::ERROR, name = "mismatched job ids in result", lhs =? *collection_id, rhs =? resp.job_id);
395351
self.scheduler.fail_job(resp.job_id);
396352
} else {
@@ -434,8 +390,11 @@ impl CompactionManagerContext {
434390
}
435391
};
436392

437-
let orchestrator = CompactOrchestrator::new(
438-
collection_id, // input_collection_id
393+
// fetch data to compact -> execute_task/compact -> register
394+
// Use the compact function to handle the entire orchestration process
395+
let compact_result = compact(
396+
self.system.clone(),
397+
collection_id,
439398
rebuild,
440399
self.fetch_log_batch_size,
441400
self.max_compaction_size,
@@ -445,67 +404,17 @@ impl CompactionManagerContext {
445404
self.blockfile_provider.clone(),
446405
self.hnsw_index_provider.clone(),
447406
self.spann_provider.clone(),
448-
dispatcher,
449-
None,
450-
);
451-
452-
match orchestrator.run(self.system.clone()).await {
453-
Ok(result) => {
454-
tracing::info!("Compaction Job completed: {:?}", result);
455-
return Ok(result);
456-
}
457-
Err(e) => {
458-
if e.should_trace_error() {
459-
tracing::error!("Compaction Job failed: {:?}", e);
460-
}
461-
return Err(Box::new(e));
462-
}
463-
}
464-
}
465-
466-
async fn execute_task(self, task: SchedulableFunction) -> CompactionOutput {
467-
tracing::info!("Executing task {}", task.task_id);
468-
let dispatcher = match self.dispatcher {
469-
Some(ref dispatcher) => dispatcher.clone(),
470-
None => {
471-
tracing::error!("No dispatcher found");
472-
return Err(Box::new(CompactionError::FailedToExecuteTask));
473-
}
474-
};
407+
dispatcher.clone(),
408+
)
409+
.await;
475410

476-
let orchestrator = CompactOrchestrator::new_for_attached_function(
477-
task.collection_id,
478-
false,
479-
self.fetch_log_batch_size,
480-
self.max_compaction_size,
481-
self.max_partition_size,
482-
self.log.clone(),
483-
self.sysdb.clone(),
484-
self.heap_service.ok_or_else(|| {
485-
Box::new(CompactionError::HeapServiceNotInitialized) as Box<dyn ChromaError>
486-
})?,
487-
self.blockfile_provider.clone(),
488-
self.hnsw_index_provider.clone(),
489-
self.spann_provider.clone(),
490-
dispatcher,
491-
None,
492-
task.task_id,
493-
task.nonce,
494-
);
495-
match orchestrator.run(self.system.clone()).await {
496-
Ok(result) => {
497-
tracing::info!(
498-
" Attached Function {} completed: {:?}",
499-
task.task_id,
500-
result
501-
);
502-
Ok(result)
503-
}
411+
match compact_result {
412+
Ok(response) => Ok(response),
504413
Err(e) => {
505414
if e.should_trace_error() {
506-
tracing::error!(" Attached Function {} failed: {:?}", task.task_id, e);
415+
tracing::error!("Compaction failed: {:?}", e);
507416
}
508-
Err(Box::new(e))
417+
Err(e)
509418
}
510419
}
511420
}
@@ -566,11 +475,9 @@ impl Configurable<(CompactionServiceConfig, System)> for CompactionManager {
566475
let job_expiry_seconds = config.compactor.job_expiry_seconds;
567476
let max_failure_count = config.compactor.max_failure_count;
568477
let scheduler = Scheduler::new(
569-
ExecutionMode::Compaction, // Default to Compaction mode
570478
my_ip,
571479
log.clone(),
572480
sysdb.clone(),
573-
storage.clone(),
574481
policy,
575482
max_concurrent_jobs,
576483
min_compaction_size,
@@ -678,11 +585,9 @@ pub(crate) async fn attach_functionrunner_manager(
678585
};
679586

680587
let scheduler = Scheduler::new(
681-
ExecutionMode::AttachedFunction, // Taskrunner mode
682588
my_ip,
683589
log.clone(),
684590
sysdb.clone(),
685-
storage.clone(),
686591
policy,
687592
task_config.max_concurrent_jobs,
688593
0, // min_compaction_size not used for tasks
@@ -1265,6 +1170,7 @@ mod tests {
12651170
while let Some(entry) = entries.next_entry().await.expect("Failed to read next dir") {
12661171
let path = entry.path();
12671172
let metadata = entry.metadata().await.expect("Failed to read metadata");
1173+
println!("{}", path.display());
12681174

12691175
if metadata.is_dir() {
12701176
assert!(path.ends_with("tenant"));

rust/worker/src/compactor/mod.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@ mod compaction_manager;
22
pub(crate) mod config;
33
mod scheduler;
44
mod scheduler_policy;
5-
mod tasks;
65
mod types;
76

87
pub(crate) use compaction_manager::*;

0 commit comments

Comments
 (0)