Skip to content

Commit 045455f

Browse files
committed
Draining all task queues in BullQueue.start()
1 parent 19c7716 commit 045455f

File tree

2 files changed

+3
-9
lines changed

2 files changed

+3
-9
lines changed

packages/deployment/src/queue/BullQueue.ts

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
import { MetricsTime, Queue, QueueEvents, Worker } from "bullmq";
2-
import { log, noop } from "@proto-kit/common";
2+
import { log } from "@proto-kit/common";
33
import {
44
TaskPayload,
55
Closeable,
@@ -102,7 +102,8 @@ export class BullQueue
102102
}
103103

104104
public async start() {
105-
noop();
105+
// Drain all queues to clear stale tasks from previous sequencer instances
106+
await this.drainAllQueues();
106107
}
107108

108109
public async close() {

packages/sequencer/src/sequencer/executor/Sequencer.ts

Lines changed: 0 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@ import { SequencerModule } from "../builder/SequencerModule";
2222
import { Closeable } from "../builder/Closeable";
2323
import { ConsoleTracingFactory } from "../../logging/ConsoleTracingFactory";
2424
import { StartableModule } from "../builder/StartableModule";
25-
import { TaskQueue } from "../../worker/queue/TaskQueue";
2625

2726
import { Sequenceable } from "./Sequenceable";
2827

@@ -86,12 +85,6 @@ export class Sequencer<Modules extends SequencerModulesRecord>
8685

8786
this.useDependencyFactory(MethodIdFactory);
8887

89-
// Drain all task queues to clear stale tasks from previous sequencer instances
90-
if (this.container.isRegistered("TaskQueue")) {
91-
const taskQueue = this.container.resolve<TaskQueue>("TaskQueue");
92-
await taskQueue.drainAllQueues();
93-
}
94-
9588
// Log startup info
9689
const moduleClassNames = Object.values(this.definition).map(
9790
(clazz) => clazz.name

0 commit comments

Comments
 (0)