@@ -2,7 +2,6 @@ use super::scheduler::Scheduler;
22use super :: scheduler_policy:: LasCompactionTimeSchedulerPolicy ;
33use super :: OneOffCompactMessage ;
44use super :: RebuildMessage ;
5- use crate :: compactor:: tasks:: SchedulableFunction ;
65use crate :: compactor:: types:: { ListDeadJobsMessage , ScheduledCompactMessage } ;
76use crate :: config:: CompactionServiceConfig ;
87use crate :: execution:: operators:: purge_dirty_log:: PurgeDirtyLog ;
@@ -13,8 +12,7 @@ use crate::execution::operators::repair_log_offsets::RepairLogOffsets;
1312use crate :: execution:: operators:: repair_log_offsets:: RepairLogOffsetsError ;
1413use crate :: execution:: operators:: repair_log_offsets:: RepairLogOffsetsInput ;
1514use crate :: execution:: operators:: repair_log_offsets:: RepairLogOffsetsOutput ;
16- use crate :: execution:: orchestration:: CompactOrchestrator ;
17- use crate :: execution:: orchestration:: CompactionResponse ;
15+ use crate :: execution:: orchestration:: compact:: { compact, CompactionResponse } ;
1816use async_trait:: async_trait;
1917use chroma_blockstore:: provider:: BlockfileProvider ;
2018use chroma_config:: assignment:: assignment_policy:: AssignmentPolicy ;
@@ -27,11 +25,9 @@ use chroma_memberlist::memberlist_provider::Memberlist;
2725use chroma_segment:: spann_provider:: SpannProvider ;
2826use chroma_storage:: Storage ;
2927use chroma_sysdb:: SysDb ;
30- use chroma_system:: wrap;
31- use chroma_system:: Dispatcher ;
32- use chroma_system:: Orchestrator ;
33- use chroma_system:: TaskResult ;
34- use chroma_system:: { Component , ComponentContext , ComponentHandle , Handler , System } ;
28+ use chroma_system:: {
29+ wrap, Component , ComponentContext , ComponentHandle , Dispatcher , Handler , System , TaskResult ,
30+ } ;
3531use chroma_types:: { CollectionUuid , JobId } ;
3632use futures:: stream:: FuturesUnordered ;
3733use futures:: FutureExt ;
@@ -118,8 +114,6 @@ pub(crate) struct CompactionManager {
118114pub ( crate ) enum CompactionError {
119115 #[ error( "Failed to compact" ) ]
120116 FailedToCompact ,
121- #[ error( "Failed to execute task" ) ]
122- FailedToExecuteTask ,
123117 #[ error( "Heap service is not initialized for task based compaction" ) ]
124118 HeapServiceNotInitialized ,
125119}
@@ -128,7 +122,6 @@ impl ChromaError for CompactionError {
128122 fn code ( & self ) -> ErrorCodes {
129123 match self {
130124 CompactionError :: FailedToCompact => ErrorCodes :: Internal ,
131- CompactionError :: FailedToExecuteTask => ErrorCodes :: Internal ,
132125 CompactionError :: HeapServiceNotInitialized => ErrorCodes :: InvalidArgument ,
133126 }
134127 }
@@ -211,70 +204,33 @@ impl CompactionManager {
211204 let compact_awaiter_channel = & self . compact_awaiter_channel ;
212205 self . scheduler . schedule ( ) . await ;
213206
214- match self . mode {
215- ExecutionMode :: Compaction => {
216- let jobs: Vec < _ > = self . scheduler . get_jobs ( ) . cloned ( ) . collect ( ) ;
217- for job in jobs {
218- let instrumented_span = span ! (
219- parent: None ,
220- tracing:: Level :: INFO ,
221- "Compacting job" ,
222- collection_id = ?job. collection_id
223- ) ;
224- Span :: current ( )
225- . add_link ( instrumented_span. context ( ) . span ( ) . span_context ( ) . clone ( ) ) ;
226-
227- let future = self
228- . context
229- . clone ( )
230- . compact ( job. collection_id , false )
231- . instrument ( instrumented_span) ;
232- if let Err ( e) = compact_awaiter_channel
233- . send ( CompactionTask {
234- job_id : job. collection_id . into ( ) ,
235- future : Box :: pin ( future) ,
236- } )
237- . await
238- {
239- tracing:: error!(
240- collection_id = ?job. collection_id,
241- error = ?e,
242- "Failed to send start scheduled compaction task"
243- ) ;
244- }
245- }
246- }
247- ExecutionMode :: AttachedFunction => {
248- let tasks_iter = self . scheduler . get_tasks_scheduled_for_execution ( ) . clone ( ) ;
249- for task in tasks_iter {
250- let instrumented_span = span ! (
251- parent: None ,
252- tracing:: Level :: INFO ,
253- "Compacting task" ,
254- collection_id = ?task. collection_id
255- ) ;
256- Span :: current ( )
257- . add_link ( instrumented_span. context ( ) . span ( ) . span_context ( ) . clone ( ) ) ;
258-
259- let future = self
260- . context
261- . clone ( )
262- . execute_task ( task. clone ( ) )
263- . instrument ( instrumented_span) ;
264- if let Err ( e) = compact_awaiter_channel
265- . send ( CompactionTask {
266- job_id : task. task_id . into ( ) ,
267- future : Box :: pin ( future) ,
268- } )
269- . await
270- {
271- tracing:: error!(
272- task_id = ?task. task_id,
273- error = ?e,
274- "Failed to start scheduled task run"
275- ) ;
276- }
277- }
207+ let jobs: Vec < _ > = self . scheduler . get_jobs ( ) . cloned ( ) . collect ( ) ;
208+ for job in jobs {
209+ let instrumented_span = span ! (
210+ parent: None ,
211+ tracing:: Level :: INFO ,
212+ "Compacting job" ,
213+ collection_id = ?job. collection_id
214+ ) ;
215+ Span :: current ( ) . add_link ( instrumented_span. context ( ) . span ( ) . span_context ( ) . clone ( ) ) ;
216+
217+ let future = self
218+ . context
219+ . clone ( )
220+ . compact ( job. collection_id , false )
221+ . instrument ( instrumented_span) ;
222+ if let Err ( e) = compact_awaiter_channel
223+ . send ( CompactionTask {
224+ job_id : job. collection_id . into ( ) ,
225+ future : Box :: pin ( future) ,
226+ } )
227+ . await
228+ {
229+ tracing:: error!(
230+ collection_id = ?job. collection_id,
231+ error = ?e,
232+ "Failed to send start scheduled compaction task"
233+ ) ;
278234 }
279235 }
280236 }
@@ -380,17 +336,17 @@ impl CompactionManager {
380336 while let Ok ( resp) = compact_awaiter_completion_channel. try_recv ( ) {
381337 match resp. result {
382338 Ok ( ref compaction_response) => match compaction_response {
383- CompactionResponse :: Success { job_id } => {
384- if job_id != & resp. job_id . 0 {
339+ CompactionResponse :: Success { job_id, .. } => {
340+ if job_id != & resp. job_id {
385341 tracing:: event!( Level :: ERROR , name = "mismatched job ids in result" , lhs =? * job_id, rhs =? resp. job_id) ;
386342 }
387343 self . scheduler . succeed_job ( resp. job_id ) ;
388344 }
389345 CompactionResponse :: RequireCompactionOffsetRepair {
390- collection_id,
346+ job_id : collection_id,
391347 witnessed_offset_in_sysdb,
392348 } => {
393- if collection_id. 0 != resp. job_id . 0 {
349+ if * collection_id != resp. job_id {
394350 tracing:: event!( Level :: ERROR , name = "mismatched job ids in result" , lhs =? * collection_id, rhs =? resp. job_id) ;
395351 self . scheduler . fail_job ( resp. job_id ) ;
396352 } else {
@@ -434,8 +390,11 @@ impl CompactionManagerContext {
434390 }
435391 } ;
436392
437- let orchestrator = CompactOrchestrator :: new (
438- collection_id, // input_collection_id
393+ // fetch data to compact -> execute_task/compact -> register
394+ // Use the compact function to handle the entire orchestration process
395+ let compact_result = compact (
396+ self . system . clone ( ) ,
397+ collection_id,
439398 rebuild,
440399 self . fetch_log_batch_size ,
441400 self . max_compaction_size ,
@@ -445,67 +404,17 @@ impl CompactionManagerContext {
445404 self . blockfile_provider . clone ( ) ,
446405 self . hnsw_index_provider . clone ( ) ,
447406 self . spann_provider . clone ( ) ,
448- dispatcher,
449- None ,
450- ) ;
451-
452- match orchestrator. run ( self . system . clone ( ) ) . await {
453- Ok ( result) => {
454- tracing:: info!( "Compaction Job completed: {:?}" , result) ;
455- return Ok ( result) ;
456- }
457- Err ( e) => {
458- if e. should_trace_error ( ) {
459- tracing:: error!( "Compaction Job failed: {:?}" , e) ;
460- }
461- return Err ( Box :: new ( e) ) ;
462- }
463- }
464- }
465-
466- async fn execute_task ( self , task : SchedulableFunction ) -> CompactionOutput {
467- tracing:: info!( "Executing task {}" , task. task_id) ;
468- let dispatcher = match self . dispatcher {
469- Some ( ref dispatcher) => dispatcher. clone ( ) ,
470- None => {
471- tracing:: error!( "No dispatcher found" ) ;
472- return Err ( Box :: new ( CompactionError :: FailedToExecuteTask ) ) ;
473- }
474- } ;
407+ dispatcher. clone ( ) ,
408+ )
409+ . await ;
475410
476- let orchestrator = CompactOrchestrator :: new_for_attached_function (
477- task. collection_id ,
478- false ,
479- self . fetch_log_batch_size ,
480- self . max_compaction_size ,
481- self . max_partition_size ,
482- self . log . clone ( ) ,
483- self . sysdb . clone ( ) ,
484- self . heap_service . ok_or_else ( || {
485- Box :: new ( CompactionError :: HeapServiceNotInitialized ) as Box < dyn ChromaError >
486- } ) ?,
487- self . blockfile_provider . clone ( ) ,
488- self . hnsw_index_provider . clone ( ) ,
489- self . spann_provider . clone ( ) ,
490- dispatcher,
491- None ,
492- task. task_id ,
493- task. nonce ,
494- ) ;
495- match orchestrator. run ( self . system . clone ( ) ) . await {
496- Ok ( result) => {
497- tracing:: info!(
498- " Attached Function {} completed: {:?}" ,
499- task. task_id,
500- result
501- ) ;
502- Ok ( result)
503- }
411+ match compact_result {
412+ Ok ( response) => Ok ( response) ,
504413 Err ( e) => {
505414 if e. should_trace_error ( ) {
506- tracing:: error!( " Attached Function {} failed: {:?}" , task . task_id , e) ;
415+ tracing:: error!( "Compaction failed: {:?}" , e) ;
507416 }
508- Err ( Box :: new ( e ) )
417+ Err ( e )
509418 }
510419 }
511420 }
@@ -566,11 +475,9 @@ impl Configurable<(CompactionServiceConfig, System)> for CompactionManager {
566475 let job_expiry_seconds = config. compactor . job_expiry_seconds ;
567476 let max_failure_count = config. compactor . max_failure_count ;
568477 let scheduler = Scheduler :: new (
569- ExecutionMode :: Compaction , // Default to Compaction mode
570478 my_ip,
571479 log. clone ( ) ,
572480 sysdb. clone ( ) ,
573- storage. clone ( ) ,
574481 policy,
575482 max_concurrent_jobs,
576483 min_compaction_size,
@@ -678,11 +585,9 @@ pub(crate) async fn attach_functionrunner_manager(
678585 } ;
679586
680587 let scheduler = Scheduler :: new (
681- ExecutionMode :: AttachedFunction , // Taskrunner mode
682588 my_ip,
683589 log. clone ( ) ,
684590 sysdb. clone ( ) ,
685- storage. clone ( ) ,
686591 policy,
687592 task_config. max_concurrent_jobs ,
688593 0 , // min_compaction_size not used for tasks
0 commit comments