Skip to content

Commit 572f5f8

Browse files
committed
batch size 8
1 parent 601e0e8 commit 572f5f8

File tree

1 file changed

+42
-2
lines changed

1 file changed

+42
-2
lines changed

compute/src/workers/workflow.rs

Lines changed: 42 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ impl WorkflowsWorker {
3333
/// Batch size that defines how many tasks can be executed in parallel at once.
3434
/// IMPORTANT NOTE: `run` function is designed to handle the batch size here specifically,
3535
/// if there are more tasks than the batch size, the function will panic.
36-
const BATCH_SIZE: usize = 5;
36+
const BATCH_SIZE: usize = 8;
3737

3838
pub fn new(
3939
worklow_rx: mpsc::Receiver<WorkflowsWorkerInput>,
@@ -61,6 +61,7 @@ impl WorkflowsWorker {
6161
debug_assert_eq!(num_tasks, batch_vec.len());
6262

6363
if num_tasks == 0 {
64+
log::warn!("Closing workflows worker.");
6465
self.worklow_rx.close();
6566
return;
6667
}
@@ -69,7 +70,10 @@ impl WorkflowsWorker {
6970
let mut batch = batch_vec.into_iter();
7071
log::info!("Processing {} workflows in batch", num_tasks);
7172
let results = match num_tasks {
72-
1 => vec![WorkflowsWorker::execute(batch.next().unwrap()).await],
73+
1 => {
74+
let r0 = WorkflowsWorker::execute(batch.next().unwrap()).await;
75+
vec![r0]
76+
}
7377
2 => {
7478
let (r0, r1) = tokio::join!(
7579
WorkflowsWorker::execute(batch.next().unwrap()),
@@ -104,6 +108,42 @@ impl WorkflowsWorker {
104108
);
105109
vec![r0, r1, r2, r3, r4]
106110
}
111+
6 => {
112+
let (r0, r1, r2, r3, r4, r5) = tokio::join!(
113+
WorkflowsWorker::execute(batch.next().unwrap()),
114+
WorkflowsWorker::execute(batch.next().unwrap()),
115+
WorkflowsWorker::execute(batch.next().unwrap()),
116+
WorkflowsWorker::execute(batch.next().unwrap()),
117+
WorkflowsWorker::execute(batch.next().unwrap()),
118+
WorkflowsWorker::execute(batch.next().unwrap())
119+
);
120+
vec![r0, r1, r2, r3, r4, r5]
121+
}
122+
7 => {
123+
let (r0, r1, r2, r3, r4, r5, r6) = tokio::join!(
124+
WorkflowsWorker::execute(batch.next().unwrap()),
125+
WorkflowsWorker::execute(batch.next().unwrap()),
126+
WorkflowsWorker::execute(batch.next().unwrap()),
127+
WorkflowsWorker::execute(batch.next().unwrap()),
128+
WorkflowsWorker::execute(batch.next().unwrap()),
129+
WorkflowsWorker::execute(batch.next().unwrap()),
130+
WorkflowsWorker::execute(batch.next().unwrap())
131+
);
132+
vec![r0, r1, r2, r3, r4, r5, r6]
133+
}
134+
8 => {
135+
let (r0, r1, r2, r3, r4, r5, r6, r7) = tokio::join!(
136+
WorkflowsWorker::execute(batch.next().unwrap()),
137+
WorkflowsWorker::execute(batch.next().unwrap()),
138+
WorkflowsWorker::execute(batch.next().unwrap()),
139+
WorkflowsWorker::execute(batch.next().unwrap()),
140+
WorkflowsWorker::execute(batch.next().unwrap()),
141+
WorkflowsWorker::execute(batch.next().unwrap()),
142+
WorkflowsWorker::execute(batch.next().unwrap()),
143+
WorkflowsWorker::execute(batch.next().unwrap())
144+
);
145+
vec![r0, r1, r2, r3, r4, r5, r6, r7]
146+
}
107147
_ => {
108148
unreachable!("drain cant be larger than batch size");
109149
}

0 commit comments

Comments
 (0)