Skip to content

Commit 6aa3298

Browse files
committed
First draft of worker pausing
1 parent bb0db42 commit 6aa3298

File tree

1 file changed

+31
-1
lines changed

1 file changed

+31
-1
lines changed

packages/deployment/src/queue/BullQueue.ts

Lines changed: 31 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,10 @@ export interface BullQueueConfig {
2121
retryAttempts?: number;
2222
}
2323

24+
interface BullWorker extends Closeable {
25+
get worker(): Worker;
26+
}
27+
2428
/**
2529
* TaskQueue implementation for BullMQ
2630
*/
@@ -30,6 +34,9 @@ export class BullQueue
3034
{
3135
private activePromise?: Promise<void>;
3236

37+
private activeWorkers: Record<string, BullWorker> = {};
38+
private activeJobs = 0;
39+
3340
public createWorker(
3441
name: string,
3542
executor: (data: TaskPayload) => Promise<TaskPayload>,
@@ -42,6 +49,7 @@ export class BullQueue
4249
// This is by far not optimal - since it still picks up 1 task per queue but waits until
4350
// computing them, so that leads to bad performance over multiple workers.
4451
// For that we need to restructure tasks to be flowing through a single queue however
52+
this.activeJobs += 1;
4553

4654
// TODO Use worker.pause()
4755
while (this.activePromise !== undefined) {
@@ -54,10 +62,27 @@ export class BullQueue
5462
});
5563
this.activePromise = promise;
5664

65+
// Pause all other workers
66+
const workersToPause = Object.entries(this.activeWorkers).filter(
67+
([key]) => key !== name
68+
);
69+
await Promise.all(
70+
workersToPause.map(([, workerToPause]) =>
71+
workerToPause.worker.pause(true)
72+
)
73+
);
74+
5775
const result = await executor(job.data);
5876
this.activePromise = undefined;
5977
void resOutside();
6078

79+
this.activeJobs -= 1;
80+
if (this.activeJobs === 0) {
81+
Object.entries(this.activeWorkers).forEach(([, resumingWorker]) =>
82+
resumingWorker.worker.resume()
83+
);
84+
}
85+
6186
return result;
6287
},
6388
{
@@ -76,11 +101,16 @@ export class BullQueue
76101
log.error(error);
77102
});
78103

79-
return {
104+
const instantiatedWorker = {
80105
async close() {
81106
await worker.close();
82107
},
108+
get worker() {
109+
return worker;
110+
},
83111
};
112+
this.activeWorkers[name] = instantiatedWorker;
113+
return instantiatedWorker;
84114
}
85115

86116
public async getQueue(queueName: string): Promise<InstantiatedQueue> {

0 commit comments

Comments
 (0)