From 8cdc9733d1f6f86c1b564e172f67c75d9c14fe3a Mon Sep 17 00:00:00 2001 From: nicktrn <55853254+nicktrn@users.noreply.github.com> Date: Thu, 17 Apr 2025 23:10:19 +0100 Subject: [PATCH 1/3] multiple queue consumer in the same supervisor instance --- apps/supervisor/src/env.ts | 1 + apps/supervisor/src/index.ts | 1 + .../v3/runEngineWorker/supervisor/session.ts | 22 +++++++++++-------- 3 files changed, 15 insertions(+), 9 deletions(-) diff --git a/apps/supervisor/src/env.ts b/apps/supervisor/src/env.ts index 72498075cd..2ebec63833 100644 --- a/apps/supervisor/src/env.ts +++ b/apps/supervisor/src/env.ts @@ -33,6 +33,7 @@ const Env = z.object({ TRIGGER_DEQUEUE_ENABLED: BoolEnv.default("true"), TRIGGER_DEQUEUE_INTERVAL_MS: z.coerce.number().int().default(1000), TRIGGER_DEQUEUE_MAX_RUN_COUNT: z.coerce.number().int().default(10), + TRIGGER_DEQUEUE_MAX_CONSUMER_COUNT: z.coerce.number().int().default(1), // Optional services TRIGGER_WARM_START_URL: z.string().optional(), diff --git a/apps/supervisor/src/index.ts b/apps/supervisor/src/index.ts index 811ee8746d..a7fc480c7f 100644 --- a/apps/supervisor/src/index.ts +++ b/apps/supervisor/src/index.ts @@ -118,6 +118,7 @@ class ManagedSupervisor { dequeueIntervalMs: env.TRIGGER_DEQUEUE_INTERVAL_MS, queueConsumerEnabled: env.TRIGGER_DEQUEUE_ENABLED, maxRunCount: env.TRIGGER_DEQUEUE_MAX_RUN_COUNT, + maxConsumerCount: env.TRIGGER_DEQUEUE_MAX_CONSUMER_COUNT, runNotificationsEnabled: env.TRIGGER_WORKLOAD_API_ENABLED, preDequeue: async () => { if (this.isKubernetes) { diff --git a/packages/core/src/v3/runEngineWorker/supervisor/session.ts b/packages/core/src/v3/runEngineWorker/supervisor/session.ts index 747e1dae5e..6cc2bfc8f1 100644 --- a/packages/core/src/v3/runEngineWorker/supervisor/session.ts +++ b/packages/core/src/v3/runEngineWorker/supervisor/session.ts @@ -18,6 +18,7 @@ type SupervisorSessionOptions = SupervisorClientCommonOptions & { preDequeue?: PreDequeueFn; preSkip?: PreSkipFn; maxRunCount?: number; + maxConsumerCount?: number; }; export class SupervisorSession extends EventEmitter { @@ -27,7 +28,7 @@ export class SupervisorSession extends EventEmitter { private runNotificationsSocket?: Socket; private readonly queueConsumerEnabled: boolean; - private readonly queueConsumer: RunQueueConsumer; + private readonly queueConsumers: RunQueueConsumer[]; private readonly heartbeat: IntervalService; private readonly heartbeatIntervalSeconds: number; @@ -39,13 +40,15 @@ export class SupervisorSession extends EventEmitter { this.queueConsumerEnabled = opts.queueConsumerEnabled ?? true; this.httpClient = new SupervisorHttpClient(opts); - this.queueConsumer = new RunQueueConsumer({ - client: this.httpClient, - preDequeue: opts.preDequeue, - preSkip: opts.preSkip, - onDequeue: this.onDequeue.bind(this), - intervalMs: opts.dequeueIntervalMs, - maxRunCount: opts.maxRunCount, + this.queueConsumers = Array.from({ length: opts.maxConsumerCount ?? 1 }, () => { + return new RunQueueConsumer({ + client: this.httpClient, + preDequeue: opts.preDequeue, + preSkip: opts.preSkip, + onDequeue: this.onDequeue.bind(this), + intervalMs: opts.dequeueIntervalMs, + maxRunCount: opts.maxRunCount, + }); }); // TODO: This should be dynamic and set by (or at least overridden by) the platform @@ -181,7 +184,7 @@ export class SupervisorSession extends EventEmitter { if (this.queueConsumerEnabled) { console.log("[SupervisorSession] Queue consumer enabled"); - this.queueConsumer.start(); + await Promise.allSettled(this.queueConsumers.map(async (q) => q.start())); this.heartbeat.start(); } else { console.warn("[SupervisorSession] Queue consumer disabled"); @@ -196,6 +199,7 @@ export class SupervisorSession extends EventEmitter { } async stop() { + await Promise.allSettled(this.queueConsumers.map(async (q) => q.stop())); this.heartbeat.stop(); this.runNotificationsSocket?.disconnect(); } From 4b154398982758d09667531e3a64761da83664eb Mon Sep 17 00:00:00 2001 From: nicktrn <55853254+nicktrn@users.noreply.github.com> Date: Thu, 17 Apr 2025 23:11:47 +0100 Subject: [PATCH 2/3] disable pre --- .changeset/pre.json | 37 ------------------------------------- 1 file changed, 37 deletions(-) delete mode 100644 .changeset/pre.json diff --git a/.changeset/pre.json b/.changeset/pre.json deleted file mode 100644 index 3d489a956d..0000000000 --- a/.changeset/pre.json +++ /dev/null @@ -1,37 +0,0 @@ -{ - "mode": "pre", - "tag": "v4-beta", - "initialVersions": { - "coordinator": "0.0.1", - "docker-provider": "0.0.1", - "kubernetes-provider": "0.0.1", - "supervisor": "0.0.1", - "webapp": "1.0.0", - "@trigger.dev/build": "3.3.17", - "trigger.dev": "3.3.17", - "@trigger.dev/core": "3.3.17", - "@trigger.dev/python": "3.3.17", - "@trigger.dev/react-hooks": "3.3.17", - "@trigger.dev/redis-worker": "3.3.17", - "@trigger.dev/rsc": "3.3.17", - "@trigger.dev/sdk": "3.3.17" - }, - "changesets": [ - "breezy-turtles-talk", - "four-needles-add", - "green-lions-relate", - "honest-files-decide", - "late-chairs-ring", - "moody-squids-count", - "nice-colts-boil", - "polite-lies-fix", - "red-wasps-cover", - "shiny-kiwis-beam", - "smart-coins-hammer", - "sour-mirrors-accept", - "tiny-buckets-teach", - "tricky-houses-invite", - "two-tigers-dream", - "weak-jobs-hide" - ] -} From 935a9591ef372d6a98f0d21673e15909fa88c91f Mon Sep 17 00:00:00 2001 From: nicktrn <55853254+nicktrn@users.noreply.github.com> Date: Thu, 17 Apr 2025 23:25:07 +0100 Subject: [PATCH 3/3] Revert "disable pre" This reverts commit 4b154398982758d09667531e3a64761da83664eb. --- .changeset/pre.json | 37 +++++++++++++++++++++++++++++++++++++ 1 file changed, 37 insertions(+) create mode 100644 .changeset/pre.json diff --git a/.changeset/pre.json b/.changeset/pre.json new file mode 100644 index 0000000000..3d489a956d --- /dev/null +++ b/.changeset/pre.json @@ -0,0 +1,37 @@ +{ + "mode": "pre", + "tag": "v4-beta", + "initialVersions": { + "coordinator": "0.0.1", + "docker-provider": "0.0.1", + "kubernetes-provider": "0.0.1", + "supervisor": "0.0.1", + "webapp": "1.0.0", + "@trigger.dev/build": "3.3.17", + "trigger.dev": "3.3.17", + "@trigger.dev/core": "3.3.17", + "@trigger.dev/python": "3.3.17", + "@trigger.dev/react-hooks": "3.3.17", + "@trigger.dev/redis-worker": "3.3.17", + "@trigger.dev/rsc": "3.3.17", + "@trigger.dev/sdk": "3.3.17" + }, + "changesets": [ + "breezy-turtles-talk", + "four-needles-add", + "green-lions-relate", + "honest-files-decide", + "late-chairs-ring", + "moody-squids-count", + "nice-colts-boil", + "polite-lies-fix", + "red-wasps-cover", + "shiny-kiwis-beam", + "smart-coins-hammer", + "sour-mirrors-accept", + "tiny-buckets-teach", + "tricky-houses-invite", + "two-tigers-dream", + "weak-jobs-hide" + ] +}