diff --git a/apps/supervisor/src/env.ts b/apps/supervisor/src/env.ts index 72498075cd..2ebec63833 100644 --- a/apps/supervisor/src/env.ts +++ b/apps/supervisor/src/env.ts @@ -33,6 +33,7 @@ const Env = z.object({ TRIGGER_DEQUEUE_ENABLED: BoolEnv.default("true"), TRIGGER_DEQUEUE_INTERVAL_MS: z.coerce.number().int().default(1000), TRIGGER_DEQUEUE_MAX_RUN_COUNT: z.coerce.number().int().default(10), + TRIGGER_DEQUEUE_MAX_CONSUMER_COUNT: z.coerce.number().int().default(1), // Optional services TRIGGER_WARM_START_URL: z.string().optional(), diff --git a/apps/supervisor/src/index.ts b/apps/supervisor/src/index.ts index 811ee8746d..a7fc480c7f 100644 --- a/apps/supervisor/src/index.ts +++ b/apps/supervisor/src/index.ts @@ -118,6 +118,7 @@ class ManagedSupervisor { dequeueIntervalMs: env.TRIGGER_DEQUEUE_INTERVAL_MS, queueConsumerEnabled: env.TRIGGER_DEQUEUE_ENABLED, maxRunCount: env.TRIGGER_DEQUEUE_MAX_RUN_COUNT, + maxConsumerCount: env.TRIGGER_DEQUEUE_MAX_CONSUMER_COUNT, runNotificationsEnabled: env.TRIGGER_WORKLOAD_API_ENABLED, preDequeue: async () => { if (this.isKubernetes) { diff --git a/packages/core/src/v3/runEngineWorker/supervisor/session.ts b/packages/core/src/v3/runEngineWorker/supervisor/session.ts index 747e1dae5e..6cc2bfc8f1 100644 --- a/packages/core/src/v3/runEngineWorker/supervisor/session.ts +++ b/packages/core/src/v3/runEngineWorker/supervisor/session.ts @@ -18,6 +18,7 @@ type SupervisorSessionOptions = SupervisorClientCommonOptions & { preDequeue?: PreDequeueFn; preSkip?: PreSkipFn; maxRunCount?: number; + maxConsumerCount?: number; }; export class SupervisorSession extends EventEmitter { @@ -27,7 +28,7 @@ export class SupervisorSession extends EventEmitter { private runNotificationsSocket?: Socket; private readonly queueConsumerEnabled: boolean; - private readonly queueConsumer: RunQueueConsumer; + private readonly queueConsumers: RunQueueConsumer[]; private readonly heartbeat: IntervalService; private readonly heartbeatIntervalSeconds: number; @@ -39,13 +40,15 @@ export class SupervisorSession extends EventEmitter { this.queueConsumerEnabled = opts.queueConsumerEnabled ?? true; this.httpClient = new SupervisorHttpClient(opts); - this.queueConsumer = new RunQueueConsumer({ - client: this.httpClient, - preDequeue: opts.preDequeue, - preSkip: opts.preSkip, - onDequeue: this.onDequeue.bind(this), - intervalMs: opts.dequeueIntervalMs, - maxRunCount: opts.maxRunCount, + this.queueConsumers = Array.from({ length: opts.maxConsumerCount ?? 1 }, () => { + return new RunQueueConsumer({ + client: this.httpClient, + preDequeue: opts.preDequeue, + preSkip: opts.preSkip, + onDequeue: this.onDequeue.bind(this), + intervalMs: opts.dequeueIntervalMs, + maxRunCount: opts.maxRunCount, + }); }); // TODO: This should be dynamic and set by (or at least overridden by) the platform @@ -181,7 +184,7 @@ export class SupervisorSession extends EventEmitter { if (this.queueConsumerEnabled) { console.log("[SupervisorSession] Queue consumer enabled"); - this.queueConsumer.start(); + await Promise.allSettled(this.queueConsumers.map(async (q) => q.start())); this.heartbeat.start(); } else { console.warn("[SupervisorSession] Queue consumer disabled"); @@ -196,6 +199,7 @@ export class SupervisorSession extends EventEmitter { } async stop() { + await Promise.allSettled(this.queueConsumers.map(async (q) => q.stop())); this.heartbeat.stop(); this.runNotificationsSocket?.disconnect(); }