diff --git a/internal-packages/run-engine/src/run-queue/index.ts b/internal-packages/run-engine/src/run-queue/index.ts index ec12c19f1d..81cbb0379c 100644 --- a/internal-packages/run-engine/src/run-queue/index.ts +++ b/internal-packages/run-engine/src/run-queue/index.ts @@ -42,6 +42,7 @@ import { RunQueueKeyProducer, RunQueueSelectionStrategy, } from "./types.js"; +import { WorkerQueueResolver } from "./workerQueueResolver.js"; const SemanticAttributes = { QUEUE: "runqueue.queue", @@ -169,6 +170,7 @@ export class RunQueue { private shardCount: number; private abortController: AbortController; private worker: Worker; + private workerQueueResolver: WorkerQueueResolver; private _observableWorkerQueues: Set = new Set(); private _meter: Meter; private _queueCooloffStates: Map = new Map(); @@ -185,6 +187,8 @@ export class RunQueue { }, }); this.logger = options.logger ?? new Logger("RunQueue", options.logLevel ?? "info"); + + this.workerQueueResolver = new WorkerQueueResolver({ logger: this.logger }); this._meter = options.meter ?? getMeter("run-queue"); const workerQueueObservableGauge = this._meter.createObservableGauge( @@ -1845,19 +1849,8 @@ export class RunQueue { ); } - #getWorkerQueueFromMessage(message: OutputPayload) { - if (message.version === "2") { - return message.workerQueue; - } - - // In v2, if the environment is development, the worker queue is the environment id. - if (message.environmentType === "DEVELOPMENT") { - return message.environmentId; - } - - // In v1, the master queue is something like us-nyc-3, - // which in v2 is the worker queue. - return message.masterQueues[0]; + #getWorkerQueueFromMessage(message: OutputPayload): string { + return this.workerQueueResolver.getWorkerQueueFromMessage(message); } #createBlockingDequeueClient() { diff --git a/internal-packages/run-engine/src/run-queue/tests/workerQueueResolver.test.ts b/internal-packages/run-engine/src/run-queue/tests/workerQueueResolver.test.ts new file mode 100644 index 0000000000..3ea8f0ce71 --- /dev/null +++ b/internal-packages/run-engine/src/run-queue/tests/workerQueueResolver.test.ts @@ -0,0 +1,484 @@ +import { describe, it, expect, vi, beforeEach, afterEach } from "vitest"; +import { Logger } from "@trigger.dev/core/logger"; +import { WorkerQueueResolver, type WorkerQueueOverrides } from "../workerQueueResolver.js"; +import { OutputPayload, OutputPayloadV1, OutputPayloadV2 } from "../types.js"; +import { RuntimeEnvironmentType } from "@trigger.dev/core/v3"; + +vi.setConfig({ testTimeout: 5_000 }); + +describe("WorkerQueueOverrideResolver", () => { + const createTestMessage = (overrides?: Partial): OutputPayloadV2 => ({ + version: "2", + runId: "run_123", + taskIdentifier: "task_123", + orgId: "org_123", + projectId: "proj_123", + environmentId: "env_123", + environmentType: RuntimeEnvironmentType.PRODUCTION, + queue: "test-queue", + timestamp: Date.now(), + attempt: 0, + workerQueue: "default-queue", + ...overrides, + }); + + describe("No overrides", () => { + it("should return original workerQueue when no overrides are set", () => { + const logger = new Logger("test", "error"); + const resolver = new WorkerQueueResolver({ logger }); + const message = createTestMessage({ workerQueue: "original-queue" }); + + const result = resolver.getWorkerQueueFromMessage(message); + + expect(result).toBe("original-queue"); + }); + }); + + describe("Environment ID overrides", () => { + it("should override based on environmentId", () => { + const overrideConfig = JSON.stringify({ + environmentId: { + env_special: "special-env-queue", + }, + } satisfies WorkerQueueOverrides); + + const logger = new Logger("test", "error"); + const resolver = new WorkerQueueResolver({ logger, overrideConfig }); + const message = createTestMessage({ + environmentId: "env_special", + workerQueue: "original-queue", + }); + + const result = resolver.getWorkerQueueFromMessage(message); + + expect(result).toBe("special-env-queue"); + }); + + it("should not override when environmentId doesn't match", () => { + const overrideConfig = JSON.stringify({ + environmentId: { + env_other: "other-queue", + }, + } satisfies WorkerQueueOverrides); + + const logger = new Logger("test", "error"); + const resolver = new WorkerQueueResolver({ logger, overrideConfig }); + const message = createTestMessage({ + environmentId: "env_123", + workerQueue: "original-queue", + }); + + const result = resolver.getWorkerQueueFromMessage(message); + + expect(result).toBe("original-queue"); + }); + }); + + describe("Project ID overrides", () => { + it("should override based on projectId", () => { + const overrideConfig = JSON.stringify({ + projectId: { + proj_special: "special-project-queue", + }, + } satisfies WorkerQueueOverrides); + + const logger = new Logger("test", "error"); + const resolver = new WorkerQueueResolver({ logger, overrideConfig }); + const message = createTestMessage({ + projectId: "proj_special", + workerQueue: "original-queue", + }); + + const result = resolver.getWorkerQueueFromMessage(message); + + expect(result).toBe("special-project-queue"); + }); + }); + + describe("Organization ID overrides", () => { + it("should override based on orgId", () => { + const overrideConfig = JSON.stringify({ + orgId: { + org_special: "special-org-queue", + }, + } satisfies WorkerQueueOverrides); + + const logger = new Logger("test", "error"); + const resolver = new WorkerQueueResolver({ logger, overrideConfig }); + const message = createTestMessage({ + orgId: "org_special", + workerQueue: "original-queue", + }); + + const result = resolver.getWorkerQueueFromMessage(message); + + expect(result).toBe("special-org-queue"); + }); + }); + + describe("Worker Queue overrides", () => { + it("should override based on workerQueue", () => { + const overrideConfig = JSON.stringify({ + workerQueue: { + "us-east-1": "us-west-1", + }, + } satisfies WorkerQueueOverrides); + + const logger = new Logger("test", "error"); + const resolver = new WorkerQueueResolver({ logger, overrideConfig }); + const message = createTestMessage({ + workerQueue: "us-east-1", + }); + + const result = resolver.getWorkerQueueFromMessage(message); + + expect(result).toBe("us-west-1"); + }); + }); + + describe("Priority order", () => { + it("should prioritize environmentId over projectId", () => { + const overrideConfig = JSON.stringify({ + environmentId: { + env_123: "env-queue", + }, + projectId: { + proj_123: "project-queue", + }, + } satisfies WorkerQueueOverrides); + + const logger = new Logger("test", "error"); + const resolver = new WorkerQueueResolver({ logger, overrideConfig }); + const message = createTestMessage(); + + const result = resolver.getWorkerQueueFromMessage(message); + + expect(result).toBe("env-queue"); + }); + + it("should prioritize projectId over orgId", () => { + const overrideConfig = JSON.stringify({ + projectId: { + proj_123: "project-queue", + }, + orgId: { + org_123: "org-queue", + }, + } satisfies WorkerQueueOverrides); + + const logger = new Logger("test", "error"); + const resolver = new WorkerQueueResolver({ logger, overrideConfig }); + const message = createTestMessage(); + + const result = resolver.getWorkerQueueFromMessage(message); + + expect(result).toBe("project-queue"); + }); + + it("should prioritize orgId over workerQueue", () => { + const overrideConfig = JSON.stringify({ + orgId: { + org_123: "org-queue", + }, + workerQueue: { + "default-queue": "worker-override-queue", + }, + } satisfies WorkerQueueOverrides); + + const logger = new Logger("test", "error"); + const resolver = new WorkerQueueResolver({ logger, overrideConfig }); + const message = createTestMessage(); + + const result = resolver.getWorkerQueueFromMessage(message); + + expect(result).toBe("org-queue"); + }); + }); + + describe("Configuration parsing", () => { + it("should handle invalid JSON gracefully", () => { + const loggerSpy = vi.spyOn(Logger.prototype, "error"); + + const overrideConfig = "invalid json {"; + + const logger = new Logger("test", "error"); + const resolver = new WorkerQueueResolver({ logger, overrideConfig }); + const message = createTestMessage(); + + const result = resolver.getWorkerQueueFromMessage(message); + + expect(result).toBe("default-queue"); + expect(loggerSpy).toHaveBeenCalledWith( + "Failed to parse worker queue overrides json", + expect.any(Object) + ); + + loggerSpy.mockRestore(); + }); + + it("should handle non-object JSON gracefully", () => { + const loggerSpy = vi.spyOn(Logger.prototype, "error"); + + const overrideConfig = JSON.stringify("not an object"); + + const logger = new Logger("test", "error"); + const resolver = new WorkerQueueResolver({ logger, overrideConfig }); + const message = createTestMessage(); + + const result = resolver.getWorkerQueueFromMessage(message); + + expect(result).toBe("default-queue"); + expect(loggerSpy).toHaveBeenCalledWith( + "Invalid worker queue overrides format", + expect.any(Object) + ); + + loggerSpy.mockRestore(); + }); + + it("should handle null JSON gracefully", () => { + const loggerSpy = vi.spyOn(Logger.prototype, "error"); + + const overrideConfig = JSON.stringify(null); + + const logger = new Logger("test", "error"); + const resolver = new WorkerQueueResolver({ logger, overrideConfig }); + const message = createTestMessage(); + + const result = resolver.getWorkerQueueFromMessage(message); + + expect(result).toBe("default-queue"); + expect(loggerSpy).toHaveBeenCalledWith( + "Invalid worker queue overrides format", + expect.any(Object) + ); + + loggerSpy.mockRestore(); + }); + + it("should log when overrides are enabled", () => { + const loggerSpy = vi.spyOn(Logger.prototype, "info"); + + const overrides: WorkerQueueOverrides = { + orgId: { org_123: "dedicated-queue" }, + }; + + const overrideConfig = JSON.stringify(overrides); + + const logger = new Logger("test", "info"); + new WorkerQueueResolver({ logger, overrideConfig }); + + expect(loggerSpy).toHaveBeenCalledWith("🎯 Worker queue overrides enabled", { overrides }); + + loggerSpy.mockRestore(); + }); + + it("should validate schema and reject invalid structure", () => { + const loggerSpy = vi.spyOn(Logger.prototype, "error"); + + // Invalid structure - numbers instead of strings in the record + const overrideConfig = JSON.stringify({ + orgId: { + org_123: 12345, // Should be string, not number + }, + }); + + const logger = new Logger("test", "error"); + const resolver = new WorkerQueueResolver({ logger, overrideConfig }); + const message = createTestMessage(); + + const result = resolver.getWorkerQueueFromMessage(message); + + expect(result).toBe("default-queue"); + expect(loggerSpy).toHaveBeenCalledWith( + "Invalid worker queue overrides format", + expect.any(Object) + ); + + loggerSpy.mockRestore(); + }); + }); + + describe("Complex scenarios", () => { + it("should handle multiple override types simultaneously", () => { + const overrideConfig = JSON.stringify({ + environmentId: { + env_special: "special-env-queue", + }, + projectId: { + proj_other: "other-project-queue", + }, + orgId: { + org_123: "org-queue", + }, + workerQueue: { + "fallback-queue": "redirected-queue", + }, + }); + + const logger = new Logger("test", "error"); + const resolver = new WorkerQueueResolver({ logger, overrideConfig }); + + // Should use orgId override since env and project don't match + const message1 = createTestMessage({ + environmentId: "env_123", + projectId: "proj_123", + orgId: "org_123", + workerQueue: "original-queue", + }); + + const result1 = resolver.getWorkerQueueFromMessage(message1); + expect(result1).toBe("org-queue"); + + // Should use environmentId override since it matches + const message2 = createTestMessage({ + environmentId: "env_special", + projectId: "proj_123", + orgId: "org_456", + workerQueue: "original-queue", + }); + + const result2 = resolver.getWorkerQueueFromMessage(message2); + expect(result2).toBe("special-env-queue"); + + // Should use workerQueue override as fallback + const message3 = createTestMessage({ + environmentId: "env_unknown", + projectId: "proj_unknown", + orgId: "org_unknown", + workerQueue: "fallback-queue", + }); + + const result3 = resolver.getWorkerQueueFromMessage(message3); + expect(result3).toBe("redirected-queue"); + }); + + it("should handle empty override sections", () => { + const overrideConfig = JSON.stringify({ + environmentId: {}, + projectId: {}, + orgId: { + org_123: "org-queue", + }, + workerQueue: {}, + }); + + const logger = new Logger("test", "error"); + const resolver = new WorkerQueueResolver({ logger, overrideConfig }); + const message = createTestMessage(); + + const result = resolver.getWorkerQueueFromMessage(message); + + expect(result).toBe("org-queue"); + }); + }); + + describe("V1 message handling", () => { + it("should handle v1 development messages", () => { + const logger = new Logger("test", "error"); + const resolver = new WorkerQueueResolver({ logger }); + + const v1DevMessage: OutputPayloadV1 = { + version: "1", + runId: "run_123", + taskIdentifier: "task_123", + orgId: "org_123", + projectId: "proj_123", + environmentId: "env_dev", + environmentType: RuntimeEnvironmentType.DEVELOPMENT, + queue: "test-queue", + timestamp: Date.now(), + attempt: 0, + masterQueues: ["us-east-1", "us-west-1"], + }; + + const result = resolver.getWorkerQueueFromMessage(v1DevMessage); + + expect(result).toBe("env_dev"); + }); + + it("should handle v1 production messages", () => { + const logger = new Logger("test", "error"); + const resolver = new WorkerQueueResolver({ logger }); + + const v1ProdMessage: OutputPayloadV1 = { + version: "1", + runId: "run_123", + taskIdentifier: "task_123", + orgId: "org_123", + projectId: "proj_123", + environmentId: "env_prod", + environmentType: RuntimeEnvironmentType.PRODUCTION, + queue: "test-queue", + timestamp: Date.now(), + attempt: 0, + masterQueues: ["us-east-1", "us-west-1"], + }; + + const result = resolver.getWorkerQueueFromMessage(v1ProdMessage); + + expect(result).toBe("us-east-1"); + }); + }); + + describe("Environment variable fallback", () => { + let originalEnv: string | undefined; + + beforeEach(() => { + originalEnv = process.env.RUN_ENGINE_WORKER_QUEUE_OVERRIDES; + }); + + afterEach(() => { + if (originalEnv === undefined) { + delete process.env.RUN_ENGINE_WORKER_QUEUE_OVERRIDES; + } else { + process.env.RUN_ENGINE_WORKER_QUEUE_OVERRIDES = originalEnv; + } + }); + + it("should fall back to environment variable when no overrideConfig provided", () => { + // Set environment variable + process.env.RUN_ENGINE_WORKER_QUEUE_OVERRIDES = JSON.stringify({ + orgId: { + org_from_env: "env-based-queue", + }, + }); + + const logger = new Logger("test", "error"); + const resolver = new WorkerQueueResolver({ logger }); // No overrideConfig + const message = createTestMessage({ + orgId: "org_from_env", + }); + + const result = resolver.getWorkerQueueFromMessage(message); + + expect(result).toBe("env-based-queue"); + }); + + it("should prioritize overrideConfig over environment variable", () => { + // Set environment variable + process.env.RUN_ENGINE_WORKER_QUEUE_OVERRIDES = JSON.stringify({ + orgId: { + org_123: "env-queue", + }, + }); + + // Pass config directly (should take precedence) + const overrideConfig = JSON.stringify({ + orgId: { + org_123: "config-queue", + }, + }); + + const logger = new Logger("test", "error"); + const resolver = new WorkerQueueResolver({ logger, overrideConfig }); + const message = createTestMessage({ + orgId: "org_123", + }); + + const result = resolver.getWorkerQueueFromMessage(message); + + expect(result).toBe("config-queue"); + }); + }); +}); diff --git a/internal-packages/run-engine/src/run-queue/workerQueueResolver.ts b/internal-packages/run-engine/src/run-queue/workerQueueResolver.ts new file mode 100644 index 0000000000..5c83d3de6f --- /dev/null +++ b/internal-packages/run-engine/src/run-queue/workerQueueResolver.ts @@ -0,0 +1,100 @@ +import type { Logger } from "@trigger.dev/core/logger"; +import type { OutputPayload, OutputPayloadV2 } from "./types.js"; +import { z } from "zod"; + +const WorkerQueueOverrides = z.object({ + environmentId: z.record(z.string(), z.string()).optional(), + projectId: z.record(z.string(), z.string()).optional(), + orgId: z.record(z.string(), z.string()).optional(), + workerQueue: z.record(z.string(), z.string()).optional(), +}); + +export type WorkerQueueOverrides = z.infer; + +export type WorkerQueueResolverOptions = { + logger: Logger; + overrideConfig?: string; +}; + +export class WorkerQueueResolver { + private overrides: WorkerQueueOverrides | null; + private logger: Logger; + + constructor(opts: WorkerQueueResolverOptions) { + this.logger = opts.logger; + this.overrides = this.parseOverrides(opts.overrideConfig); + } + + private parseOverrides(overrideConfig?: string): WorkerQueueOverrides | null { + const overridesJson = overrideConfig ?? process.env.RUN_ENGINE_WORKER_QUEUE_OVERRIDES; + + if (!overridesJson) { + return null; + } + + try { + const parsed = JSON.parse(overridesJson); + const result = WorkerQueueOverrides.safeParse(parsed); + + if (!result.success) { + this.logger.error("Invalid worker queue overrides format", { + error: result.error.format(), + }); + return null; + } + + this.logger.info("🎯 Worker queue overrides enabled", { overrides: result.data }); + + return result.data; + } catch (error) { + this.logger.error("Failed to parse worker queue overrides json", { + error, + }); + return null; + } + } + + public getWorkerQueueFromMessage(message: OutputPayload): string { + if (message.version === "2") { + // Check overrides in priority order + const override = this.#getOverride(message); + if (override) return override; + + return message.workerQueue; + } + + // In v2, if the environment is development, the worker queue is the environment id. + if (message.environmentType === "DEVELOPMENT") { + return message.environmentId; + } + + // In v1, the master queue is something like us-nyc-3, + // which in v2 is the worker queue. + return message.masterQueues[0]; + } + + #getOverride(message: OutputPayloadV2): string | null { + if (!this.overrides) { + return null; + } + + // Priority: environmentId > projectId > orgId > workerQueue + if (this.overrides.environmentId?.[message.environmentId]) { + return this.overrides.environmentId[message.environmentId]; + } + + if (this.overrides.projectId?.[message.projectId]) { + return this.overrides.projectId[message.projectId]; + } + + if (this.overrides.orgId?.[message.orgId]) { + return this.overrides.orgId[message.orgId]; + } + + if (this.overrides.workerQueue?.[message.workerQueue]) { + return this.overrides.workerQueue[message.workerQueue]; + } + + return null; + } +}