Skip to content

Commit 13f9bed

Browse files
authored
Merge pull request #265 from proto-kit/fix/reduction-flow-halt
Fix/reduction flow halt
2 parents 58d69b9 + dcf6721 commit 13f9bed

File tree

11 files changed

+407
-171
lines changed

11 files changed

+407
-171
lines changed

packages/deployment/src/queue/BullQueue.ts

Lines changed: 15 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -5,9 +5,11 @@ import {
55
Closeable,
66
InstantiatedQueue,
77
TaskQueue,
8-
SequencerModule,
8+
AbstractTaskQueue,
99
} from "@proto-kit/sequencer";
1010

11+
import { InstantiatedBullQueue } from "./InstantiatedBullQueue";
12+
1113
export interface BullQueueConfig {
1214
redis: {
1315
host: string;
@@ -23,7 +25,7 @@ export interface BullQueueConfig {
2325
* TaskQueue implementation for BullMQ
2426
*/
2527
export class BullQueue
26-
extends SequencerModule<BullQueueConfig>
28+
extends AbstractTaskQueue<BullQueueConfig>
2729
implements TaskQueue
2830
{
2931
private activePromise?: Promise<void>;
@@ -40,6 +42,8 @@ export class BullQueue
4042
// This is by far not optimal - since it still picks up 1 task per queue but waits until
4143
// computing them, so that leads to bad performance over multiple workers.
4244
// For that we need to restructure tasks to be flowing through a single queue however
45+
46+
// TODO Use worker.pause()
4347
while (this.activePromise !== undefined) {
4448
// eslint-disable-next-line no-await-in-loop
4549
await this.activePromise;
@@ -80,50 +84,18 @@ export class BullQueue
8084
}
8185

8286
public async getQueue(queueName: string): Promise<InstantiatedQueue> {
83-
const { retryAttempts, redis } = this.config;
84-
85-
const queue = new Queue<TaskPayload, TaskPayload>(queueName, {
86-
connection: redis,
87-
});
88-
const events = new QueueEvents(queueName, { connection: redis });
87+
return this.createOrGetQueue(queueName, (name) => {
88+
log.debug(`Creating bull queue ${queueName}`);
8989

90-
await queue.drain();
90+
const { redis } = this.config;
9191

92-
return {
93-
name: queueName,
94-
95-
async addTask(payload: TaskPayload): Promise<{ taskId: string }> {
96-
log.debug("Adding task: ", payload);
97-
const job = await queue.add(queueName, payload, {
98-
attempts: retryAttempts ?? 2,
99-
});
100-
return { taskId: job.id! };
101-
},
92+
const queue = new Queue<TaskPayload, TaskPayload>(queueName, {
93+
connection: redis,
94+
});
95+
const events = new QueueEvents(queueName, { connection: redis });
10296

103-
async onCompleted(listener: (payload: TaskPayload) => Promise<void>) {
104-
events.on("completed", async (result) => {
105-
log.debug("Completed task: ", result);
106-
try {
107-
// eslint-disable-next-line @typescript-eslint/consistent-type-assertions
108-
await listener(result.returnvalue as unknown as TaskPayload);
109-
} catch (e) {
110-
// Catch error explicitly since this promise is dangling,
111-
// therefore any error will be voided as well
112-
log.error(e);
113-
}
114-
});
115-
events.on("error", async (error) => {
116-
log.error("Error in worker", error);
117-
});
118-
await events.waitUntilReady();
119-
},
120-
121-
async close(): Promise<void> {
122-
await events.close();
123-
await queue.drain();
124-
await queue.close();
125-
},
126-
};
97+
return new InstantiatedBullQueue(name, queue, events, this.config);
98+
});
12799
}
128100

129101
public async start() {
Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
import {
2+
InstantiatedQueue,
3+
ListenerList,
4+
TaskPayload,
5+
} from "@proto-kit/sequencer";
6+
import { log } from "@proto-kit/common";
7+
import { Queue, QueueEvents } from "bullmq";
8+
9+
export class InstantiatedBullQueue implements InstantiatedQueue {
10+
public constructor(
11+
public readonly name: string,
12+
private readonly queue: Queue,
13+
private readonly events: QueueEvents,
14+
private readonly options: {
15+
retryAttempts?: number;
16+
}
17+
) {}
18+
19+
initialized = false;
20+
21+
listeners = new ListenerList<TaskPayload>();
22+
23+
public async initialize() {
24+
await this.queue.drain();
25+
}
26+
27+
public async addTask(payload: TaskPayload): Promise<{ taskId: string }> {
28+
log.debug("Adding task: ", payload);
29+
const job = await this.queue.add(this.name, payload, {
30+
attempts: this.options.retryAttempts ?? 2,
31+
});
32+
return { taskId: job.id! };
33+
}
34+
35+
async onCompleted(listener: (payload: TaskPayload) => Promise<void>) {
36+
if (!this.initialized) {
37+
await this.events.waitUntilReady();
38+
39+
this.events.on("completed", async (result) => {
40+
log.debug("Completed task: ", result);
41+
try {
42+
await this.listeners.executeListeners(
43+
// eslint-disable-next-line @typescript-eslint/consistent-type-assertions
44+
result.returnvalue as unknown as TaskPayload
45+
);
46+
} catch (e) {
47+
// Catch error explicitly since this promise is dangling,
48+
// therefore any error will be voided as well
49+
log.error(e);
50+
}
51+
});
52+
this.events.on("error", async (error) => {
53+
log.error("Error in worker", error);
54+
});
55+
this.initialized = true;
56+
}
57+
58+
return this.listeners.pushListener(listener);
59+
}
60+
61+
async offCompleted(listenerId: number) {
62+
this.listeners.removeListener(listenerId);
63+
}
64+
65+
async close(): Promise<void> {
66+
await this.events.close();
67+
await this.queue.drain();
68+
await this.queue.close();
69+
}
70+
}

packages/sequencer/src/index.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,8 @@ export * from "./worker/flow/JSONTaskSerializer";
1212
// export * from "./worker/queue/BullQueue";
1313
export * from "./worker/queue/TaskQueue";
1414
export * from "./worker/queue/LocalTaskQueue";
15+
export * from "./worker/queue/ListenerList";
16+
export * from "./worker/queue/AbstractTaskQueue";
1517
export * from "./worker/worker/FlowTaskWorker";
1618
export * from "./worker/worker/LocalTaskWorkerModule";
1719
export * from "./worker/worker/TaskWorkerModule";

packages/sequencer/src/protocol/production/flow/ReductionTaskFlow.ts

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ export class ReductionTaskFlow<Input, Output> {
4242
reductionTask: Task<PairTuple<Output>, Output>;
4343
mergableFunction: (a: Output, b: Output) => boolean;
4444
},
45-
private readonly flowCreator: FlowCreator
45+
flowCreator: FlowCreator
4646
) {
4747
this.flow = flowCreator.createFlow<ReductionState<Output>>(options.name, {
4848
numMergesCompleted: 0,
@@ -121,8 +121,10 @@ export class ReductionTaskFlow<Input, Output> {
121121
const { availableReductions, touchedIndizes } =
122122
this.resolveReducibleTasks(flow.state.queue, options.mergableFunction);
123123

124-
// I don't know exactly what this rule wants from me, I suspect
125-
// it complains bcs the function is called forEach
124+
flow.state.queue = flow.state.queue.filter(
125+
(ignored, index) => !touchedIndizes.includes(index)
126+
);
127+
126128
await flow.forEach(availableReductions, async (reduction) => {
127129
const taskParameters: PairTuple<Output> = [reduction.r1, reduction.r2];
128130
await flow.pushTask(
@@ -135,10 +137,6 @@ export class ReductionTaskFlow<Input, Output> {
135137
}
136138
);
137139
});
138-
139-
flow.state.queue = flow.state.queue.filter(
140-
(ignored, index) => !touchedIndizes.includes(index)
141-
);
142140
}
143141
}
144142

packages/sequencer/src/worker/flow/Flow.ts

Lines changed: 30 additions & 83 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
1-
import { inject, injectable, Lifecycle, scoped } from "tsyringe";
2-
import { log } from "@proto-kit/common";
1+
import { inject, injectable } from "tsyringe";
2+
import { log, mapSequential } from "@proto-kit/common";
33

44
import { Closeable, InstantiatedQueue, TaskQueue } from "../queue/TaskQueue";
55

@@ -12,68 +12,6 @@ const errors = {
1212
),
1313
};
1414

15-
@injectable()
16-
// ResolutionScoped => We want a new instance every time we resolve it
17-
@scoped(Lifecycle.ResolutionScoped)
18-
export class ConnectionHolder implements Closeable {
19-
private queues: {
20-
[key: string]: InstantiatedQueue;
21-
} = {};
22-
23-
private listeners: {
24-
[key: string]: {
25-
[key: string]: (payload: TaskPayload) => Promise<void>;
26-
};
27-
} = {};
28-
29-
public constructor(
30-
@inject("TaskQueue") private readonly queueImpl: TaskQueue
31-
) {}
32-
33-
public registerListener(
34-
flowId: string,
35-
queue: string,
36-
listener: (payload: TaskPayload) => Promise<void>
37-
) {
38-
if (this.listeners[queue] === undefined) {
39-
this.listeners[queue] = {};
40-
}
41-
this.listeners[queue][flowId] = listener;
42-
}
43-
44-
public unregisterListener(flowId: string, queue: string) {
45-
delete this.listeners[queue][flowId];
46-
}
47-
48-
private async openQueue(name: string): Promise<InstantiatedQueue> {
49-
const queue = await this.queueImpl.getQueue(name);
50-
await queue.onCompleted(async (payload) => {
51-
await this.onCompleted(name, payload);
52-
});
53-
return queue;
54-
}
55-
56-
private async onCompleted(name: string, payload: TaskPayload) {
57-
const listener = this.listeners[name]?.[payload.flowId];
58-
if (listener !== undefined) {
59-
await listener(payload);
60-
}
61-
}
62-
63-
public async getQueue(name: string) {
64-
if (this.queues[name] !== undefined) {
65-
return this.queues[name];
66-
}
67-
const queue = await this.openQueue(name);
68-
this.queues[name] = queue;
69-
return queue;
70-
}
71-
72-
async close() {
73-
// TODO
74-
}
75-
}
76-
7715
interface CompletedCallback<Input, Result> {
7816
(result: Result, originalInput: Input): Promise<any>;
7917
}
@@ -83,7 +21,10 @@ export class Flow<State> implements Closeable {
8321
// therefore cancelled
8422
private erroredOut = false;
8523

86-
private readonly registeredListeners: string[] = [];
24+
private readonly registeredListeners: {
25+
queueName: string;
26+
listenerId: number;
27+
}[] = [];
8728

8829
private resultsPending: {
8930
[key: string]: (payload: TaskPayload) => Promise<void>;
@@ -98,28 +39,28 @@ export class Flow<State> implements Closeable {
9839
public tasksInProgress = 0;
9940

10041
public constructor(
101-
private readonly connectionHolder: ConnectionHolder,
42+
private readonly queueImpl: TaskQueue,
10243
public readonly flowId: string,
10344
public state: State
10445
) {}
10546

106-
private waitForResult(
107-
queue: string,
47+
private async waitForResult(
48+
queue: InstantiatedQueue,
10849
taskId: string,
10950
callback: (payload: TaskPayload) => Promise<void>
11051
) {
11152
this.resultsPending[taskId] = callback;
11253

113-
if (!this.registeredListeners.includes(queue)) {
114-
// Open Listener onto Connectionhandler
115-
this.connectionHolder.registerListener(
116-
this.flowId,
117-
queue,
118-
async (payload) => {
54+
if (!this.registeredListeners.find((l) => l.queueName === queue.name)) {
55+
const listenerId = await queue.onCompleted(async (payload) => {
56+
if (payload.flowId === this.flowId) {
11957
await this.resolveResponse(payload);
12058
}
121-
);
122-
this.registeredListeners.push(queue);
59+
});
60+
this.registeredListeners.push({
61+
queueName: queue.name,
62+
listenerId,
63+
});
12364
}
12465
}
12566

@@ -167,7 +108,7 @@ export class Flow<State> implements Closeable {
167108
): Promise<void> {
168109
const queueName = task.name;
169110
const taskName = overrides?.taskName ?? task.name;
170-
const queue = await this.connectionHolder.getQueue(queueName);
111+
const queue = await this.queueImpl.getQueue(queueName);
171112

172113
const payload = await task.inputSerializer().toJSON(input);
173114

@@ -197,7 +138,7 @@ export class Flow<State> implements Closeable {
197138
this.tasksInProgress -= 1;
198139
return await completed?.(decoded, input);
199140
};
200-
this.waitForResult(queueName, taskId, callback);
141+
await this.waitForResult(queue, taskId, callback);
201142
}
202143

203144
public async forEach<Type>(
@@ -222,17 +163,23 @@ export class Flow<State> implements Closeable {
222163
}
223164

224165
public async close() {
225-
this.registeredListeners.forEach((queue) => {
226-
this.connectionHolder.unregisterListener(this.flowId, queue);
227-
});
166+
await mapSequential(
167+
this.registeredListeners,
168+
async ({ queueName, listenerId }) => {
169+
const queue = await this.queueImpl.getQueue(queueName);
170+
queue.offCompleted(listenerId);
171+
}
172+
);
228173
}
229174
}
230175

231176
@injectable()
232177
export class FlowCreator {
233-
public constructor(private readonly connectionHolder: ConnectionHolder) {}
178+
public constructor(
179+
@inject("TaskQueue") private readonly queueImpl: TaskQueue
180+
) {}
234181

235182
public createFlow<State>(flowId: string, state: State): Flow<State> {
236-
return new Flow(this.connectionHolder, flowId, state);
183+
return new Flow(this.queueImpl, flowId, state);
237184
}
238185
}
Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
import { SequencerModule } from "../../sequencer/builder/SequencerModule";
2+
3+
import type { InstantiatedQueue } from "./TaskQueue";
4+
5+
export abstract class AbstractTaskQueue<
6+
Config,
7+
> extends SequencerModule<Config> {
8+
protected queues: Record<string, InstantiatedQueue> = {};
9+
10+
protected createOrGetQueue(
11+
name: string,
12+
creator: (name: string) => InstantiatedQueue
13+
): InstantiatedQueue {
14+
if (this.queues[name] === undefined) {
15+
this.queues[name] = creator(name);
16+
}
17+
return this.queues[name];
18+
}
19+
}

0 commit comments

Comments
 (0)