Skip to content

Commit 8e112ff

Browse files
committed
draining queues in sequencer.start
1 parent e80d0fb commit 8e112ff

File tree

7 files changed

+10431
-0
lines changed

7 files changed

+10431
-0
lines changed

packages/deployment/src/queue/InstantiatedBullQueue.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,10 @@ export class InstantiatedBullQueue implements InstantiatedQueue {
6262
this.listeners.removeListener(listenerId);
6363
}
6464

65+
async drain() {
66+
await this.queue.drain();
67+
}
68+
6569
async close(): Promise<void> {
6670
await this.events.close();
6771
await this.queue.drain();

packages/indexer/test/IndexerNotifier.test.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -166,6 +166,7 @@ describe.skip("IndexerNotifier", () => {
166166
addTask: addTaskSpy,
167167
onCompleted: jest.fn(async () => 5),
168168
close: jest.fn(async () => {}),
169+
drain: jest.fn(async () => {}),
169170
};
170171
});
171172

packages/sequencer/src/sequencer/executor/Sequencer.ts

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ import { SequencerModule } from "../builder/SequencerModule";
2323
import { Closeable } from "../builder/Closeable";
2424
import { ConsoleTracingFactory } from "../../logging/ConsoleTracingFactory";
2525
import { StartableModule } from "../builder/StartableModule";
26+
import { TaskQueue } from "../../worker/queue/TaskQueue";
2627

2728
import { Sequenceable } from "./Sequenceable";
2829

@@ -89,6 +90,12 @@ export class Sequencer<Modules extends SequencerModulesRecord>
8990

9091
this.useDependencyFactory(MethodIdFactory);
9192

93+
// Drain all task queues to clear stale tasks from previous sequencer instances
94+
if (this.container.isRegistered("TaskQueue")) {
95+
const taskQueue = this.container.resolve<TaskQueue>("TaskQueue");
96+
await taskQueue.drainAllQueues();
97+
}
98+
9299
// Log startup info
93100
const moduleClassNames = Object.values(this.definition).map(
94101
(clazz) => clazz.name

packages/sequencer/src/worker/queue/AbstractTaskQueue.ts

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,4 +22,10 @@ export abstract class AbstractTaskQueue<
2222
Object.values(this.queues).map(async (queue) => await queue.close())
2323
);
2424
}
25+
26+
public async drainAllQueues(): Promise<void> {
27+
await Promise.all(
28+
Object.values(this.queues).map(async (queue) => await queue.drain())
29+
);
30+
}
2531
}

packages/sequencer/src/worker/queue/LocalTaskQueue.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,10 @@ class InMemoryInstantiatedQueue implements InstantiatedQueue {
6868
async close() {
6969
noop();
7070
}
71+
72+
async drain() {
73+
this.taskQueue.queuedTasks[this.name] = [];
74+
}
7175
}
7276

7377
@sequencerModule()

packages/sequencer/src/worker/queue/TaskQueue.ts

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,8 @@ export interface TaskQueue {
1313
executor: (data: TaskPayload) => Promise<TaskPayload>,
1414
options?: { concurrency?: number }
1515
) => Closeable;
16+
17+
drainAllQueues: () => Promise<void>;
1618
}
1719
/**
1820
* Object that abstracts a concrete connection to a queue instance.
@@ -36,4 +38,9 @@ export interface InstantiatedQueue extends Closeable {
3638
) => Promise<number>;
3739

3840
offCompleted: (listenerId: number) => void;
41+
42+
/**
43+
* Drains the queue to clear stale tasks
44+
*/
45+
drain: () => Promise<void>;
3946
}

0 commit comments

Comments
 (0)