diff --git a/packages/deployment/src/queue/BullQueue.ts b/packages/deployment/src/queue/BullQueue.ts index bd884670a..7b8188aa5 100644 --- a/packages/deployment/src/queue/BullQueue.ts +++ b/packages/deployment/src/queue/BullQueue.ts @@ -5,9 +5,11 @@ import { Closeable, InstantiatedQueue, TaskQueue, - SequencerModule, + AbstractTaskQueue, } from "@proto-kit/sequencer"; +import { InstantiatedBullQueue } from "./InstantiatedBullQueue"; + export interface BullQueueConfig { redis: { host: string; @@ -23,7 +25,7 @@ export interface BullQueueConfig { * TaskQueue implementation for BullMQ */ export class BullQueue - extends SequencerModule + extends AbstractTaskQueue implements TaskQueue { private activePromise?: Promise; @@ -40,6 +42,8 @@ export class BullQueue // This is by far not optimal - since it still picks up 1 task per queue but waits until // computing them, so that leads to bad performance over multiple workers. // For that we need to restructure tasks to be flowing through a single queue however + + // TODO Use worker.pause() while (this.activePromise !== undefined) { // eslint-disable-next-line no-await-in-loop await this.activePromise; @@ -80,50 +84,18 @@ export class BullQueue } public async getQueue(queueName: string): Promise { - const { retryAttempts, redis } = this.config; - - const queue = new Queue(queueName, { - connection: redis, - }); - const events = new QueueEvents(queueName, { connection: redis }); + return this.createOrGetQueue(queueName, (name) => { + log.debug(`Creating bull queue ${queueName}`); - await queue.drain(); + const { redis } = this.config; - return { - name: queueName, - - async addTask(payload: TaskPayload): Promise<{ taskId: string }> { - log.debug("Adding task: ", payload); - const job = await queue.add(queueName, payload, { - attempts: retryAttempts ?? 2, - }); - return { taskId: job.id! }; - }, + const queue = new Queue(queueName, { + connection: redis, + }); + const events = new QueueEvents(queueName, { connection: redis }); - async onCompleted(listener: (payload: TaskPayload) => Promise) { - events.on("completed", async (result) => { - log.debug("Completed task: ", result); - try { - // eslint-disable-next-line @typescript-eslint/consistent-type-assertions - await listener(result.returnvalue as unknown as TaskPayload); - } catch (e) { - // Catch error explicitly since this promise is dangling, - // therefore any error will be voided as well - log.error(e); - } - }); - events.on("error", async (error) => { - log.error("Error in worker", error); - }); - await events.waitUntilReady(); - }, - - async close(): Promise { - await events.close(); - await queue.drain(); - await queue.close(); - }, - }; + return new InstantiatedBullQueue(name, queue, events, this.config); + }); } public async start() { diff --git a/packages/deployment/src/queue/InstantiatedBullQueue.ts b/packages/deployment/src/queue/InstantiatedBullQueue.ts new file mode 100644 index 000000000..65a31c73d --- /dev/null +++ b/packages/deployment/src/queue/InstantiatedBullQueue.ts @@ -0,0 +1,70 @@ +import { + InstantiatedQueue, + ListenerList, + TaskPayload, +} from "@proto-kit/sequencer"; +import { log } from "@proto-kit/common"; +import { Queue, QueueEvents } from "bullmq"; + +export class InstantiatedBullQueue implements InstantiatedQueue { + public constructor( + public readonly name: string, + private readonly queue: Queue, + private readonly events: QueueEvents, + private readonly options: { + retryAttempts?: number; + } + ) {} + + initialized = false; + + listeners = new ListenerList(); + + public async initialize() { + await this.queue.drain(); + } + + public async addTask(payload: TaskPayload): Promise<{ taskId: string }> { + log.debug("Adding task: ", payload); + const job = await this.queue.add(this.name, payload, { + attempts: this.options.retryAttempts ?? 2, + }); + return { taskId: job.id! }; + } + + async onCompleted(listener: (payload: TaskPayload) => Promise) { + if (!this.initialized) { + await this.events.waitUntilReady(); + + this.events.on("completed", async (result) => { + log.debug("Completed task: ", result); + try { + await this.listeners.executeListeners( + // eslint-disable-next-line @typescript-eslint/consistent-type-assertions + result.returnvalue as unknown as TaskPayload + ); + } catch (e) { + // Catch error explicitly since this promise is dangling, + // therefore any error will be voided as well + log.error(e); + } + }); + this.events.on("error", async (error) => { + log.error("Error in worker", error); + }); + this.initialized = true; + } + + return this.listeners.pushListener(listener); + } + + async offCompleted(listenerId: number) { + this.listeners.removeListener(listenerId); + } + + async close(): Promise { + await this.events.close(); + await this.queue.drain(); + await this.queue.close(); + } +} diff --git a/packages/sequencer/src/index.ts b/packages/sequencer/src/index.ts index 99d76db2a..3e03702ab 100644 --- a/packages/sequencer/src/index.ts +++ b/packages/sequencer/src/index.ts @@ -12,6 +12,8 @@ export * from "./worker/flow/JSONTaskSerializer"; // export * from "./worker/queue/BullQueue"; export * from "./worker/queue/TaskQueue"; export * from "./worker/queue/LocalTaskQueue"; +export * from "./worker/queue/ListenerList"; +export * from "./worker/queue/AbstractTaskQueue"; export * from "./worker/worker/FlowTaskWorker"; export * from "./worker/worker/LocalTaskWorkerModule"; export * from "./worker/worker/TaskWorkerModule"; diff --git a/packages/sequencer/src/protocol/production/flow/ReductionTaskFlow.ts b/packages/sequencer/src/protocol/production/flow/ReductionTaskFlow.ts index 63a5b720b..95f28ba15 100644 --- a/packages/sequencer/src/protocol/production/flow/ReductionTaskFlow.ts +++ b/packages/sequencer/src/protocol/production/flow/ReductionTaskFlow.ts @@ -42,7 +42,7 @@ export class ReductionTaskFlow { reductionTask: Task, Output>; mergableFunction: (a: Output, b: Output) => boolean; }, - private readonly flowCreator: FlowCreator + flowCreator: FlowCreator ) { this.flow = flowCreator.createFlow>(options.name, { numMergesCompleted: 0, @@ -121,8 +121,10 @@ export class ReductionTaskFlow { const { availableReductions, touchedIndizes } = this.resolveReducibleTasks(flow.state.queue, options.mergableFunction); - // I don't know exactly what this rule wants from me, I suspect - // it complains bcs the function is called forEach + flow.state.queue = flow.state.queue.filter( + (ignored, index) => !touchedIndizes.includes(index) + ); + await flow.forEach(availableReductions, async (reduction) => { const taskParameters: PairTuple = [reduction.r1, reduction.r2]; await flow.pushTask( @@ -135,10 +137,6 @@ export class ReductionTaskFlow { } ); }); - - flow.state.queue = flow.state.queue.filter( - (ignored, index) => !touchedIndizes.includes(index) - ); } } diff --git a/packages/sequencer/src/worker/flow/Flow.ts b/packages/sequencer/src/worker/flow/Flow.ts index f299c0ef7..985fc2df9 100644 --- a/packages/sequencer/src/worker/flow/Flow.ts +++ b/packages/sequencer/src/worker/flow/Flow.ts @@ -1,5 +1,5 @@ -import { inject, injectable, Lifecycle, scoped } from "tsyringe"; -import { log } from "@proto-kit/common"; +import { inject, injectable } from "tsyringe"; +import { log, mapSequential } from "@proto-kit/common"; import { Closeable, InstantiatedQueue, TaskQueue } from "../queue/TaskQueue"; @@ -12,68 +12,6 @@ const errors = { ), }; -@injectable() -// ResolutionScoped => We want a new instance every time we resolve it -@scoped(Lifecycle.ResolutionScoped) -export class ConnectionHolder implements Closeable { - private queues: { - [key: string]: InstantiatedQueue; - } = {}; - - private listeners: { - [key: string]: { - [key: string]: (payload: TaskPayload) => Promise; - }; - } = {}; - - public constructor( - @inject("TaskQueue") private readonly queueImpl: TaskQueue - ) {} - - public registerListener( - flowId: string, - queue: string, - listener: (payload: TaskPayload) => Promise - ) { - if (this.listeners[queue] === undefined) { - this.listeners[queue] = {}; - } - this.listeners[queue][flowId] = listener; - } - - public unregisterListener(flowId: string, queue: string) { - delete this.listeners[queue][flowId]; - } - - private async openQueue(name: string): Promise { - const queue = await this.queueImpl.getQueue(name); - await queue.onCompleted(async (payload) => { - await this.onCompleted(name, payload); - }); - return queue; - } - - private async onCompleted(name: string, payload: TaskPayload) { - const listener = this.listeners[name]?.[payload.flowId]; - if (listener !== undefined) { - await listener(payload); - } - } - - public async getQueue(name: string) { - if (this.queues[name] !== undefined) { - return this.queues[name]; - } - const queue = await this.openQueue(name); - this.queues[name] = queue; - return queue; - } - - async close() { - // TODO - } -} - interface CompletedCallback { (result: Result, originalInput: Input): Promise; } @@ -83,7 +21,10 @@ export class Flow implements Closeable { // therefore cancelled private erroredOut = false; - private readonly registeredListeners: string[] = []; + private readonly registeredListeners: { + queueName: string; + listenerId: number; + }[] = []; private resultsPending: { [key: string]: (payload: TaskPayload) => Promise; @@ -98,28 +39,28 @@ export class Flow implements Closeable { public tasksInProgress = 0; public constructor( - private readonly connectionHolder: ConnectionHolder, + private readonly queueImpl: TaskQueue, public readonly flowId: string, public state: State ) {} - private waitForResult( - queue: string, + private async waitForResult( + queue: InstantiatedQueue, taskId: string, callback: (payload: TaskPayload) => Promise ) { this.resultsPending[taskId] = callback; - if (!this.registeredListeners.includes(queue)) { - // Open Listener onto Connectionhandler - this.connectionHolder.registerListener( - this.flowId, - queue, - async (payload) => { + if (!this.registeredListeners.find((l) => l.queueName === queue.name)) { + const listenerId = await queue.onCompleted(async (payload) => { + if (payload.flowId === this.flowId) { await this.resolveResponse(payload); } - ); - this.registeredListeners.push(queue); + }); + this.registeredListeners.push({ + queueName: queue.name, + listenerId, + }); } } @@ -167,7 +108,7 @@ export class Flow implements Closeable { ): Promise { const queueName = task.name; const taskName = overrides?.taskName ?? task.name; - const queue = await this.connectionHolder.getQueue(queueName); + const queue = await this.queueImpl.getQueue(queueName); const payload = await task.inputSerializer().toJSON(input); @@ -197,7 +138,7 @@ export class Flow implements Closeable { this.tasksInProgress -= 1; return await completed?.(decoded, input); }; - this.waitForResult(queueName, taskId, callback); + await this.waitForResult(queue, taskId, callback); } public async forEach( @@ -222,17 +163,23 @@ export class Flow implements Closeable { } public async close() { - this.registeredListeners.forEach((queue) => { - this.connectionHolder.unregisterListener(this.flowId, queue); - }); + await mapSequential( + this.registeredListeners, + async ({ queueName, listenerId }) => { + const queue = await this.queueImpl.getQueue(queueName); + queue.offCompleted(listenerId); + } + ); } } @injectable() export class FlowCreator { - public constructor(private readonly connectionHolder: ConnectionHolder) {} + public constructor( + @inject("TaskQueue") private readonly queueImpl: TaskQueue + ) {} public createFlow(flowId: string, state: State): Flow { - return new Flow(this.connectionHolder, flowId, state); + return new Flow(this.queueImpl, flowId, state); } } diff --git a/packages/sequencer/src/worker/queue/AbstractTaskQueue.ts b/packages/sequencer/src/worker/queue/AbstractTaskQueue.ts new file mode 100644 index 000000000..d2816998d --- /dev/null +++ b/packages/sequencer/src/worker/queue/AbstractTaskQueue.ts @@ -0,0 +1,19 @@ +import { SequencerModule } from "../../sequencer/builder/SequencerModule"; + +import type { InstantiatedQueue } from "./TaskQueue"; + +export abstract class AbstractTaskQueue< + Config, +> extends SequencerModule { + protected queues: Record = {}; + + protected createOrGetQueue( + name: string, + creator: (name: string) => InstantiatedQueue + ): InstantiatedQueue { + if (this.queues[name] === undefined) { + this.queues[name] = creator(name); + } + return this.queues[name]; + } +} diff --git a/packages/sequencer/src/worker/queue/ListenerList.ts b/packages/sequencer/src/worker/queue/ListenerList.ts new file mode 100644 index 000000000..3b497a7e2 --- /dev/null +++ b/packages/sequencer/src/worker/queue/ListenerList.ts @@ -0,0 +1,37 @@ +import { mapSequential } from "@proto-kit/common"; + +export class ListenerList { + private listenerId: number = 0; + + private listeners: { + listener: (payload: T) => Promise; + id: number; + }[] = []; + + public getListeners() { + return this.listeners.slice(); + } + + public async executeListeners(payload: T) { + await mapSequential( + this.getListeners(), + async (listener) => await listener.listener(payload) + ); + } + + public pushListener(listener: (payload: T) => Promise) { + // eslint-disable-next-line no-plusplus + const id = this.listenerId++; + + this.listeners.push({ + listener, + id, + }); + + return id; + } + + public removeListener(listenerId: number) { + this.listeners = this.listeners.filter(({ id }) => id !== listenerId); + } +} diff --git a/packages/sequencer/src/worker/queue/LocalTaskQueue.ts b/packages/sequencer/src/worker/queue/LocalTaskQueue.ts index 57725b4f8..85e11db88 100644 --- a/packages/sequencer/src/worker/queue/LocalTaskQueue.ts +++ b/packages/sequencer/src/worker/queue/LocalTaskQueue.ts @@ -1,9 +1,11 @@ import { log, mapSequential, noop } from "@proto-kit/common"; -import { SequencerModule } from "../../sequencer/builder/SequencerModule"; +import { sequencerModule } from "../../sequencer/builder/SequencerModule"; import { TaskPayload } from "../flow/Task"; import { Closeable, InstantiatedQueue, TaskQueue } from "./TaskQueue"; +import { ListenerList } from "./ListenerList"; +import { AbstractTaskQueue } from "./AbstractTaskQueue"; async function sleep(ms: number) { await new Promise((resolve) => { @@ -20,11 +22,59 @@ export interface LocalTaskQueueConfig { simulatedDuration?: number; } +class InMemoryInstantiatedQueue implements InstantiatedQueue { + public constructor( + public readonly name: string, + public taskQueue: LocalTaskQueue + ) {} + + private id = 0; + + private instantiated = false; + + private listeners = new ListenerList(); + + async addTask( + payload: TaskPayload, + taskId?: string + ): Promise<{ taskId: string }> { + this.id += 1; + const nextId = taskId ?? String(this.id).toString(); + this.taskQueue.queuedTasks[this.name].push({ payload, taskId: nextId }); + + void this.taskQueue.workNextTasks(); + + return { taskId: nextId }; + } + + async onCompleted( + listener: (payload: TaskPayload) => Promise + ): Promise { + if (!this.instantiated) { + (this.taskQueue.listeners[this.name] ??= []).push(async (result) => { + await this.listeners.executeListeners(result); + }); + + this.instantiated = false; + } + return this.listeners.pushListener(listener); + } + + async offCompleted(listenerId: number) { + this.listeners.removeListener(listenerId); + } + + async close() { + noop(); + } +} + +@sequencerModule() export class LocalTaskQueue - extends SequencerModule + extends AbstractTaskQueue implements TaskQueue { - public queues: { + public queuedTasks: { [key: string]: { payload: TaskPayload; taskId: string }[]; } = {}; @@ -51,7 +101,7 @@ export class LocalTaskQueue this.taskInProgress = true; // Collect all tasks - const tasksToExecute = Object.entries(this.queues).flatMap( + const tasksToExecute = Object.entries(this.queuedTasks).flatMap( ([queueName, tasks]) => { if (tasks.length > 0 && this.workers[queueName]) { const functions = tasks.map((task) => async () => { @@ -73,7 +123,7 @@ export class LocalTaskQueue ); void Promise.all(listenerPromises || []); }); - this.queues[queueName] = []; + this.queuedTasks[queueName] = []; return functions; } @@ -133,36 +183,10 @@ export class LocalTaskQueue } public async getQueue(queueName: string): Promise { - this.queues[queueName] = []; - - let id = 0; - - return { - name: queueName, - - addTask: async ( - payload: TaskPayload, - taskId?: string - ): Promise<{ taskId: string }> => { - id += 1; - const nextId = taskId ?? String(id).toString(); - this.queues[queueName].push({ payload, taskId: nextId }); - - void this.workNextTasks(); - - return { taskId: nextId }; - }, - - onCompleted: async ( - listener: (payload: TaskPayload) => Promise - ): Promise => { - (this.listeners[queueName] ??= []).push(listener); - }, - - close: async () => { - noop(); - }, - }; + return this.createOrGetQueue(queueName, (name) => { + this.queuedTasks[name] = []; + return new InMemoryInstantiatedQueue(name, this); + }); } public async start(): Promise { diff --git a/packages/sequencer/src/worker/queue/TaskQueue.ts b/packages/sequencer/src/worker/queue/TaskQueue.ts index af2cf289d..c3040b3a1 100644 --- a/packages/sequencer/src/worker/queue/TaskQueue.ts +++ b/packages/sequencer/src/worker/queue/TaskQueue.ts @@ -37,5 +37,7 @@ export interface InstantiatedQueue extends Closeable { */ onCompleted: ( listener: (payload: TaskPayload) => Promise - ) => Promise; + ) => Promise; + + offCompleted: (listenerId: number) => void; } diff --git a/packages/sequencer/test-integration/workers/workers-proven.test.ts b/packages/sequencer/test-integration/workers/workers-proven.test.ts index e9368bc16..16f60586f 100644 --- a/packages/sequencer/test-integration/workers/workers-proven.test.ts +++ b/packages/sequencer/test-integration/workers/workers-proven.test.ts @@ -26,6 +26,8 @@ import { ChildProcessWorker } from "./ChildProcessWorker"; const timeout = 300000; +const proofsEnabled = false; + describe("worker-proven", () => { describe("sequencer", () => { let test: BlockTestService; @@ -87,7 +89,7 @@ describe("worker-proven", () => { try { // Start AppChain const childContainer = container.createChildContainer(); - await app.start(false, childContainer); + await app.start(proofsEnabled, childContainer); test = app.sequencer.dependencyContainer.resolve(BlockTestService); @@ -124,10 +126,33 @@ describe("worker-proven", () => { console.log(batch.proof); - expect(batch.proof.proof.length).toBeGreaterThan(50); + expect(batch.proof.proof.length).toBeGreaterThan( + proofsEnabled ? 50 : 0 + ); expect(batch.blockHashes).toHaveLength(1); }, timeout ); + + it.each([5, 14, 20])( + "should produce a batch of a %s of blocks", + async (numBlocks) => { + for (let i = 0; i < numBlocks; i++) { + await test.produceBlock(); + } + + const batch = await test.produceBatch(); + + expectDefined(batch); + + console.log(batch.proof); + + expect(batch.proof.proof.length).toBeGreaterThan( + proofsEnabled ? 50 : 0 + ); + expect(batch.blockHashes).toHaveLength(numBlocks); + }, + timeout + ); }); }); diff --git a/packages/sequencer/test/protocol/production/flow/ReductionTaskFlow.test.ts b/packages/sequencer/test/protocol/production/flow/ReductionTaskFlow.test.ts new file mode 100644 index 000000000..28da2fc44 --- /dev/null +++ b/packages/sequencer/test/protocol/production/flow/ReductionTaskFlow.test.ts @@ -0,0 +1,140 @@ +import "reflect-metadata"; +import { container, DependencyContainer } from "tsyringe"; +import { noop, sleep } from "@proto-kit/common"; + +import { + FlowCreator, + FlowTaskWorker, + JSONTaskSerializer, + LocalTaskQueue, + PairTuple, + ReductionTaskFlow, + Task, + TaskSerializer, + TaskWorkerModule, +} from "../../../../src"; + +type IndexNumber = { + index: number; + value: number; +}; + +type RangeSum = { + from: number; + to: number; + value: number; +}; + +class PairedMulTask + extends TaskWorkerModule + implements Task, RangeSum> +{ + public name = "sum"; + + public inputSerializer(): TaskSerializer> { + return JSONTaskSerializer.fromType>(); + } + + public resultSerializer(): TaskSerializer { + return JSONTaskSerializer.fromType(); + } + + public async compute([a, b]: PairTuple): Promise { + return { + from: a.from, + to: b.to, + value: a.value + b.value, + }; + } + + public async prepare(): Promise { + noop(); + } +} + +class NumberIdentityTask + extends TaskWorkerModule + implements Task +{ + public name = "numberIdentity"; + + public inputSerializer(): TaskSerializer { + return JSONTaskSerializer.fromType(); + } + + public resultSerializer(): TaskSerializer { + return JSONTaskSerializer.fromType(); + } + + public async compute(input: IndexNumber): Promise { + return { + from: input.index, + to: input.index + 1, + value: input.value, + }; + } + + public async prepare(): Promise { + noop(); + } +} + +describe("ReductionTaskFlow", () => { + let di: DependencyContainer; + beforeAll(async () => { + di = container.createChildContainer(); + + const queue = new LocalTaskQueue(); + queue.config = {}; + + di.register("TaskQueue", { + useValue: queue, + }); + + const worker = new FlowTaskWorker(di.resolve("TaskQueue"), [ + di.resolve(NumberIdentityTask), + di.resolve(PairedMulTask), + ]); + await worker.start(); + }); + + it("regressions - should work for parallel result stream", async () => { + expect.assertions(1); + + const creator = di.resolve(FlowCreator); + const flow = new ReductionTaskFlow( + { + inputLength: 5, + mappingTask: di.resolve(NumberIdentityTask), + reductionTask: di.resolve(PairedMulTask), + name: "test", + mergableFunction: (a, b) => { + return a.to === b.from; + }, + }, + creator + ); + + // eslint-disable-next-line no-async-promise-executor + const result = await new Promise(async (res) => { + flow.onCompletion(async (output) => res(output)); + + await flow.pushInput({ index: 0, value: 1 }); + await flow.pushInput({ index: 1, value: 2 }); + await flow.pushInput({ index: 2, value: 3 }); + + await sleep(100); + + await flow.pushInput({ index: 3, value: 4 }); + await flow.pushInput({ index: 4, value: 0 }); + }); + + const expected: RangeSum = { + from: 0, + to: 5, + value: 1 + 2 + 3 + 4, + }; + + expect(result).toStrictEqual(expected); + }, 1000000); +});