Skip to content

Commit 8ec5fcb

Browse files
committed
Revert "First draft of worker pausing"
This reverts commit 6aa3298
1 parent 8baeee9 commit 8ec5fcb

File tree

1 file changed

+1
-32
lines changed

1 file changed

+1
-32
lines changed

packages/deployment/src/queue/BullQueue.ts

Lines changed: 1 addition & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -22,10 +22,6 @@ export interface BullQueueConfig {
2222
retryAttempts?: number;
2323
}
2424

25-
interface BullWorker extends Closeable {
26-
get worker(): Worker;
27-
}
28-
2925
/**
3026
* TaskQueue implementation for BullMQ
3127
*/
@@ -36,10 +32,6 @@ export class BullQueue
3632
{
3733
private activePromise?: Promise<void>;
3834

39-
private activeWorkers: Record<string, BullWorker> = {};
40-
41-
private activeJobs = 0;
42-
4335
public createWorker(
4436
name: string,
4537
executor: (data: TaskPayload) => Promise<TaskPayload>,
@@ -52,7 +44,6 @@ export class BullQueue
5244
// This is by far not optimal - since it still picks up 1 task per queue but waits until
5345
// computing them, so that leads to bad performance over multiple workers.
5446
// For that we need to restructure tasks to be flowing through a single queue however
55-
this.activeJobs += 1;
5647

5748
// TODO Use worker.pause()
5849
while (this.activePromise !== undefined) {
@@ -65,27 +56,10 @@ export class BullQueue
6556
});
6657
this.activePromise = promise;
6758

68-
// Pause all other workers
69-
const workersToPause = Object.entries(this.activeWorkers).filter(
70-
([key]) => key !== name
71-
);
72-
await Promise.all(
73-
workersToPause.map(([, workerToPause]) =>
74-
workerToPause.worker.pause(true)
75-
)
76-
);
77-
7859
const result = await executor(job.data);
7960
this.activePromise = undefined;
8061
void resOutside();
8162

82-
this.activeJobs -= 1;
83-
if (this.activeJobs === 0) {
84-
Object.entries(this.activeWorkers).forEach(([, resumingWorker]) =>
85-
resumingWorker.worker.resume()
86-
);
87-
}
88-
8963
return result;
9064
},
9165
{
@@ -104,16 +78,11 @@ export class BullQueue
10478
log.error(error);
10579
});
10680

107-
const instantiatedWorker = {
81+
return {
10882
async close() {
10983
await worker.close();
11084
},
111-
get worker() {
112-
return worker;
113-
},
11485
};
115-
this.activeWorkers[name] = instantiatedWorker;
116-
return instantiatedWorker;
11786
}
11887

11988
public async getQueue(queueName: string): Promise<InstantiatedQueue> {

0 commit comments

Comments
 (0)