Skip to content

Commit 79eb785

Browse files
authored
[ENH] clean up component channels (#2429)
1 parent 9f79843 commit 79eb785

20 files changed

+400
-364
lines changed

rust/worker/Dockerfile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
FROM rust:1.77.2 as builder
1+
FROM rust:1.79.0 as builder
22
ARG CHROMA_KUBERNETES_INTEGRATION=0
33
ENV CHROMA_KUBERNETES_INTEGRATION $CHROMA_KUBERNETES_INTEGRATION
44

rust/worker/src/compactor/compaction_manager.rs

Lines changed: 11 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ use crate::config::CompactionServiceConfig;
77
use crate::config::Configurable;
88
use crate::errors::ChromaError;
99
use crate::errors::ErrorCodes;
10+
use crate::execution::dispatcher::Dispatcher;
1011
use crate::execution::operator::TaskMessage;
1112
use crate::execution::orchestration::CompactOrchestrator;
1213
use crate::execution::orchestration::CompactionResponse;
@@ -18,8 +19,8 @@ use crate::sysdb;
1819
use crate::sysdb::sysdb::SysDb;
1920
use crate::system::Component;
2021
use crate::system::ComponentContext;
22+
use crate::system::ComponentHandle;
2123
use crate::system::Handler;
22-
use crate::system::Receiver;
2324
use crate::system::System;
2425
use async_trait::async_trait;
2526
use futures::stream::FuturesUnordered;
@@ -42,7 +43,7 @@ pub(crate) struct CompactionManager {
4243
blockfile_provider: BlockfileProvider,
4344
hnsw_index_provider: HnswIndexProvider,
4445
// Dispatcher
45-
dispatcher: Option<Box<dyn Receiver<TaskMessage>>>,
46+
dispatcher: Option<ComponentHandle<Dispatcher>>,
4647
// Config
4748
compaction_manager_queue_size: usize,
4849
compaction_interval: Duration,
@@ -95,7 +96,7 @@ impl CompactionManager {
9596
compaction_job: &CompactionJob,
9697
) -> Result<CompactionResponse, Box<dyn ChromaError>> {
9798
let dispatcher = match self.dispatcher {
98-
Some(ref dispatcher) => dispatcher,
99+
Some(ref dispatcher) => dispatcher.clone(),
99100
None => {
100101
println!("No dispatcher found");
101102
return Err(Box::new(CompactionError::FailedToCompact));
@@ -112,7 +113,7 @@ impl CompactionManager {
112113
self.sysdb.clone(),
113114
self.blockfile_provider.clone(),
114115
self.hnsw_index_provider.clone(),
115-
dispatcher.clone(),
116+
dispatcher,
116117
None,
117118
None,
118119
Arc::new(AtomicU32::new(0)),
@@ -161,7 +162,7 @@ impl CompactionManager {
161162
(num_completed_jobs, num_failed_jobs)
162163
}
163164

164-
pub(crate) fn set_dispatcher(&mut self, dispatcher: Box<dyn Receiver<TaskMessage>>) {
165+
pub(crate) fn set_dispatcher(&mut self, dispatcher: ComponentHandle<Dispatcher>) {
165166
self.dispatcher = Some(dispatcher);
166167
}
167168

@@ -253,12 +254,8 @@ impl Component for CompactionManager {
253254

254255
async fn on_start(&mut self, ctx: &crate::system::ComponentContext<Self>) -> () {
255256
println!("Starting CompactionManager");
256-
ctx.scheduler.schedule(
257-
ctx.sender.clone(),
258-
ScheduleMessage {},
259-
self.compaction_interval,
260-
ctx,
261-
);
257+
ctx.scheduler
258+
.schedule(ScheduleMessage {}, self.compaction_interval, ctx);
262259
}
263260
}
264261

@@ -279,12 +276,8 @@ impl Handler<ScheduleMessage> for CompactionManager {
279276
println!("CompactionManager: Performing compaction");
280277
self.compact_batch().await;
281278
// Compaction is done, schedule the next compaction
282-
ctx.scheduler.schedule(
283-
ctx.sender.clone(),
284-
ScheduleMessage {},
285-
self.compaction_interval,
286-
ctx,
287-
);
279+
ctx.scheduler
280+
.schedule(ScheduleMessage {}, self.compaction_interval, ctx);
288281
}
289282
}
290283

@@ -508,7 +501,7 @@ mod tests {
508501

509502
let dispatcher = Dispatcher::new(10, 10, 10);
510503
let dispatcher_handle = system.start_component(dispatcher);
511-
manager.set_dispatcher(dispatcher_handle.receiver());
504+
manager.set_dispatcher(dispatcher_handle);
512505
manager.set_system(system);
513506
let (num_completed, number_failed) = manager.compact_batch().await;
514507
assert_eq!(num_completed, 2);

rust/worker/src/execution/dispatcher.rs

Lines changed: 11 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ use crate::execution::config::DispatcherConfig;
33
use crate::{
44
config::Configurable,
55
errors::ChromaError,
6-
system::{Component, ComponentContext, Handler, Receiver, System},
6+
system::{Component, ComponentContext, Handler, ReceiverForMessage, System},
77
};
88
use async_trait::async_trait;
99
use std::fmt::Debug;
@@ -83,7 +83,7 @@ impl Dispatcher {
8383
fn spawn_workers(
8484
&self,
8585
system: &mut System,
86-
self_receiver: Box<dyn Receiver<TaskRequestMessage>>,
86+
self_receiver: Box<dyn ReceiverForMessage<TaskRequestMessage>>,
8787
) {
8888
for _ in 0..self.n_worker_threads {
8989
let worker = WorkerThread::new(self_receiver.clone(), self.worker_queue_size);
@@ -154,15 +154,15 @@ impl Configurable<DispatcherConfig> for Dispatcher {
154154
/// - reply_to: The receiver to send the task to, this is the worker thread
155155
#[derive(Debug)]
156156
pub(super) struct TaskRequestMessage {
157-
reply_to: Box<dyn Receiver<TaskMessage>>,
157+
reply_to: Box<dyn ReceiverForMessage<TaskMessage>>,
158158
}
159159

160160
impl TaskRequestMessage {
161161
/// Create a new TaskRequestMessage
162162
/// # Parameters
163163
/// - reply_to: The receiver to send the task to, this is the worker thread
164164
/// that is requesting the task
165-
pub(super) fn new(reply_to: Box<dyn Receiver<TaskMessage>>) -> Self {
165+
pub(super) fn new(reply_to: Box<dyn ReceiverForMessage<TaskMessage>>) -> Self {
166166
TaskRequestMessage { reply_to }
167167
}
168168
}
@@ -180,7 +180,7 @@ impl Component for Dispatcher {
180180
}
181181

182182
async fn on_start(&mut self, ctx: &ComponentContext<Self>) {
183-
self.spawn_workers(&mut ctx.system.clone(), ctx.sender.as_receiver());
183+
self.spawn_workers(&mut ctx.system.clone(), ctx.receiver());
184184
}
185185
}
186186

@@ -207,7 +207,7 @@ mod tests {
207207
use super::*;
208208
use crate::{
209209
execution::operator::{wrap, Operator, TaskResult},
210-
system::System,
210+
system::{ComponentHandle, System},
211211
};
212212
use std::{
213213
collections::HashSet,
@@ -243,7 +243,7 @@ mod tests {
243243

244244
#[derive(Debug)]
245245
struct MockDispatchUser {
246-
pub dispatcher: Box<dyn Receiver<TaskMessage>>,
246+
pub dispatcher: ComponentHandle<Dispatcher>,
247247
counter: Arc<AtomicUsize>, // We expect to receive DISPATCH_COUNT messages
248248
sent_tasks: Arc<Mutex<HashSet<Uuid>>>,
249249
received_tasks: Arc<Mutex<HashSet<Uuid>>>,
@@ -261,13 +261,8 @@ mod tests {
261261
async fn on_start(&mut self, ctx: &ComponentContext<Self>) {
262262
// dispatch a new task every DISPATCH_FREQUENCY_MS for DISPATCH_COUNT times
263263
let duration = std::time::Duration::from_millis(DISPATCH_FREQUENCY_MS);
264-
ctx.scheduler.schedule_interval(
265-
ctx.sender.clone(),
266-
(),
267-
duration,
268-
Some(DISPATCH_COUNT),
269-
ctx,
270-
);
264+
ctx.scheduler
265+
.schedule_interval((), duration, Some(DISPATCH_COUNT), ctx);
271266
}
272267
}
273268
#[async_trait]
@@ -290,7 +285,7 @@ mod tests {
290285
#[async_trait]
291286
impl Handler<()> for MockDispatchUser {
292287
async fn handle(&mut self, _message: (), ctx: &ComponentContext<MockDispatchUser>) {
293-
let task = wrap(Box::new(MockOperator {}), 42.0, ctx.sender.as_receiver());
288+
let task = wrap(Box::new(MockOperator {}), 42.0, ctx.receiver());
294289
let task_id = task.id();
295290
self.sent_tasks.lock().insert(task_id);
296291
let res = self.dispatcher.send(task, None).await;
@@ -306,7 +301,7 @@ mod tests {
306301
let sent_tasks = Arc::new(Mutex::new(HashSet::new()));
307302
let received_tasks = Arc::new(Mutex::new(HashSet::new()));
308303
let dispatch_user = MockDispatchUser {
309-
dispatcher: dispatcher_handle.receiver(),
304+
dispatcher: dispatcher_handle,
310305
counter: counter.clone(),
311306
sent_tasks: sent_tasks.clone(),
312307
received_tasks: received_tasks.clone(),

rust/worker/src/execution/operator.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
use crate::system::Receiver;
1+
use crate::system::ReceiverForMessage;
22
use async_trait::async_trait;
33
use std::fmt::Debug;
44
use uuid::Uuid;
@@ -45,7 +45,7 @@ where
4545
{
4646
operator: Box<dyn Operator<Input, Output, Error = Error>>,
4747
input: Input,
48-
reply_channel: Box<dyn Receiver<TaskResult<Output, Error>>>,
48+
reply_channel: Box<dyn ReceiverForMessage<TaskResult<Output, Error>>>,
4949
task_id: Uuid,
5050
}
5151

@@ -89,7 +89,7 @@ where
8989
pub(super) fn wrap<Input, Output, Error>(
9090
operator: Box<dyn Operator<Input, Output, Error = Error>>,
9191
input: Input,
92-
reply_channel: Box<dyn Receiver<TaskResult<Output, Error>>>,
92+
reply_channel: Box<dyn ReceiverForMessage<TaskResult<Output, Error>>>,
9393
) -> TaskMessage
9494
where
9595
Error: Debug + 'static,

rust/worker/src/execution/orchestration/compact.rs

Lines changed: 15 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ use crate::blockstore::provider::BlockfileProvider;
33
use crate::compactor::CompactionJob;
44
use crate::errors::ChromaError;
55
use crate::execution::data::data_chunk::Chunk;
6+
use crate::execution::dispatcher::Dispatcher;
67
use crate::execution::operator::TaskResult;
78
use crate::execution::operators::flush_s3::FlushS3Input;
89
use crate::execution::operators::flush_s3::FlushS3Operator;
@@ -33,8 +34,9 @@ use crate::sysdb::sysdb::GetCollectionsError;
3334
use crate::sysdb::sysdb::GetSegmentsError;
3435
use crate::sysdb::sysdb::SysDb;
3536
use crate::system::Component;
37+
use crate::system::ComponentHandle;
3638
use crate::system::Handler;
37-
use crate::system::Receiver;
39+
use crate::system::ReceiverForMessage;
3840
use crate::system::System;
3941
use crate::types::LogRecord;
4042
use crate::types::Segment;
@@ -92,7 +94,7 @@ pub struct CompactOrchestrator {
9294
pulled_log_offset: Option<i64>,
9395
record_segment: Option<Segment>,
9496
// Dispatcher
95-
dispatcher: Box<dyn Receiver<TaskMessage>>,
97+
dispatcher: ComponentHandle<Dispatcher>,
9698
// number of write segments tasks
9799
num_write_tasks: i32,
98100
// Result Channel
@@ -149,7 +151,7 @@ impl CompactOrchestrator {
149151
sysdb: Box<SysDb>,
150152
blockfile_provider: BlockfileProvider,
151153
hnsw_index_provider: HnswIndexProvider,
152-
dispatcher: Box<dyn Receiver<TaskMessage>>,
154+
dispatcher: ComponentHandle<Dispatcher>,
153155
result_channel: Option<
154156
tokio::sync::oneshot::Sender<Result<CompactionResponse, Box<dyn ChromaError>>>,
155157
>,
@@ -180,7 +182,7 @@ impl CompactOrchestrator {
180182
// of the segment, and not fully respect the offset_id from the compaction job
181183
async fn pull_logs(
182184
&mut self,
183-
self_address: Box<dyn Receiver<TaskResult<PullLogsOutput, PullLogsError>>>,
185+
self_address: Box<dyn ReceiverForMessage<TaskResult<PullLogsOutput, PullLogsError>>>,
184186
) {
185187
self.state = ExecutionState::PullLogs;
186188
let operator = PullLogsOperator::new(self.log.clone());
@@ -215,7 +217,7 @@ impl CompactOrchestrator {
215217
async fn partition(
216218
&mut self,
217219
records: Chunk<LogRecord>,
218-
self_address: Box<dyn Receiver<TaskResult<PartitionOutput, PartitionError>>>,
220+
self_address: Box<dyn ReceiverForMessage<TaskResult<PartitionOutput, PartitionError>>>,
219221
) {
220222
self.state = ExecutionState::Partition;
221223
// TODO: make this configurable
@@ -236,7 +238,7 @@ impl CompactOrchestrator {
236238
&mut self,
237239
partitions: Vec<Chunk<LogRecord>>,
238240
self_address: Box<
239-
dyn Receiver<TaskResult<WriteSegmentsOutput, WriteSegmentsOperatorError>>,
241+
dyn ReceiverForMessage<TaskResult<WriteSegmentsOutput, WriteSegmentsOperatorError>>,
240242
>,
241243
) {
242244
self.state = ExecutionState::Write;
@@ -282,7 +284,7 @@ impl CompactOrchestrator {
282284
record_segment_writer: RecordSegmentWriter,
283285
hnsw_segment_writer: Box<DistributedHNSWSegmentWriter>,
284286
metadata_segment_writer: MetadataSegmentWriter<'static>,
285-
self_address: Box<dyn Receiver<TaskResult<FlushS3Output, Box<dyn ChromaError>>>>,
287+
self_address: Box<dyn ReceiverForMessage<TaskResult<FlushS3Output, Box<dyn ChromaError>>>>,
286288
) {
287289
self.state = ExecutionState::Flush;
288290

@@ -306,7 +308,7 @@ impl CompactOrchestrator {
306308
&mut self,
307309
log_position: i64,
308310
segment_flush_info: Arc<[SegmentFlushInfo]>,
309-
self_address: Box<dyn Receiver<TaskResult<RegisterOutput, RegisterError>>>,
311+
self_address: Box<dyn ReceiverForMessage<TaskResult<RegisterOutput, RegisterError>>>,
310312
) {
311313
self.state = ExecutionState::Register;
312314
let operator = RegisterOperator::new();
@@ -494,7 +496,7 @@ impl Component for CompactOrchestrator {
494496
}
495497

496498
async fn on_start(&mut self, ctx: &crate::system::ComponentContext<Self>) -> () {
497-
self.pull_logs(ctx.sender.as_receiver()).await;
499+
self.pull_logs(ctx.receiver()).await;
498500
}
499501
}
500502

@@ -528,7 +530,7 @@ impl Handler<TaskResult<PullLogsOutput, PullLogsError>> for CompactOrchestrator
528530
Some(record) => {
529531
self.pulled_log_offset = Some(record.log_offset);
530532
println!("Pulled Logs Up To Offset: {:?}", self.pulled_log_offset);
531-
self.partition(records, ctx.sender.as_receiver()).await;
533+
self.partition(records, ctx.receiver()).await;
532534
}
533535
None => {
534536
// Log an error and return
@@ -561,7 +563,7 @@ impl Handler<TaskResult<PartitionOutput, PartitionError>> for CompactOrchestrato
561563
return;
562564
}
563565
};
564-
self.write(records, _ctx.sender.as_receiver()).await;
566+
self.write(records, _ctx.receiver()).await;
565567
}
566568
}
567569

@@ -602,7 +604,7 @@ impl Handler<TaskResult<WriteSegmentsOutput, WriteSegmentsOperatorError>> for Co
602604
output.record_segment_writer,
603605
output.hnsw_segment_writer,
604606
output.metadata_segment_writer,
605-
_ctx.sender.as_receiver(),
607+
_ctx.receiver(),
606608
)
607609
.await;
608610
}
@@ -623,7 +625,7 @@ impl Handler<TaskResult<FlushS3Output, Box<dyn ChromaError>>> for CompactOrchest
623625
self.register(
624626
self.pulled_log_offset.unwrap(),
625627
msg.segment_flush_info,
626-
_ctx.sender.as_receiver(),
628+
_ctx.receiver(),
627629
)
628630
.await;
629631
}

0 commit comments

Comments
 (0)