diff --git a/internal-packages/run-engine/src/engine/tests/batchTrigger.test.ts b/internal-packages/run-engine/src/engine/tests/batchTrigger.test.ts index e3c9f2585e..bcaa3a59f6 100644 --- a/internal-packages/run-engine/src/engine/tests/batchTrigger.test.ts +++ b/internal-packages/run-engine/src/engine/tests/batchTrigger.test.ts @@ -5,6 +5,7 @@ import { expect } from "vitest"; import { RunEngine } from "../index.js"; import { setTimeout } from "node:timers/promises"; import { setupAuthenticatedEnvironment, setupBackgroundWorker } from "./setup.js"; +import { DequeuedMessage } from "@trigger.dev/core/v3"; vi.setConfig({ testTimeout: 60_000 }); @@ -115,12 +116,17 @@ describe("RunEngine batchTrigger", () => { expect(queueLength).toBe(2); //dequeue - const [d1, d2] = await engine.dequeueFromMasterQueue({ - consumerId: "test_12345", - masterQueue: run1.masterQueue, - maxRunCount: 10, - }); - + const dequeued: DequeuedMessage[] = []; + for (let i = 0; i < 2; i++) { + dequeued.push( + ...(await engine.dequeueFromMasterQueue({ + consumerId: "test_12345", + masterQueue: "main", + maxRunCount: 1, + })) + ); + } + const [d1, d2] = dequeued; //attempts const attempt1 = await engine.startRunAttempt({ runId: d1.run.id, diff --git a/internal-packages/run-engine/src/engine/tests/dequeuing.test.ts b/internal-packages/run-engine/src/engine/tests/dequeuing.test.ts index ec5a25a754..6d2f79053f 100644 --- a/internal-packages/run-engine/src/engine/tests/dequeuing.test.ts +++ b/internal-packages/run-engine/src/engine/tests/dequeuing.test.ts @@ -6,6 +6,7 @@ import { expect } from "vitest"; import { MinimalAuthenticatedEnvironment } from "../../shared/index.js"; import { RunEngine } from "../index.js"; import { setupAuthenticatedEnvironment, setupBackgroundWorker } from "./setup.js"; +import { DequeuedMessage } from "@trigger.dev/core/v3"; vi.setConfig({ testTimeout: 60_000 }); @@ -63,11 +64,16 @@ describe("RunEngine dequeuing", () => { expect(queueLength).toBe(10); //dequeue - const dequeued = await engine.dequeueFromMasterQueue({ - consumerId: "test_12345", - masterQueue: "main", - maxRunCount: 5, - }); + const dequeued: DequeuedMessage[] = []; + for (let i = 0; i < 5; i++) { + dequeued.push( + ...(await engine.dequeueFromMasterQueue({ + consumerId: "test_12345", + masterQueue: "main", + maxRunCount: 1, + })) + ); + } expect(dequeued.length).toBe(5); } finally { @@ -75,94 +81,98 @@ describe("RunEngine dequeuing", () => { } }); - containerTest("Dequeues runs within machine constraints", async ({ prisma, redisOptions }) => { - const authenticatedEnvironment = await setupAuthenticatedEnvironment(prisma, "PRODUCTION"); + //This will fail until we support dequeuing multiple runs from a single environment + containerTest.fails( + "Dequeues runs within machine constraints", + async ({ prisma, redisOptions }) => { + const authenticatedEnvironment = await setupAuthenticatedEnvironment(prisma, "PRODUCTION"); - const engine = new RunEngine({ - prisma, - worker: { - redis: redisOptions, - workers: 1, - tasksPerWorker: 10, - pollIntervalMs: 100, - }, - queue: { - redis: redisOptions, - }, - runLock: { - redis: redisOptions, - }, - machines: { - defaultMachine: "small-1x", - machines: { - "small-1x": { - name: "small-1x" as const, - cpu: 0.5, - memory: 0.5, - centsPerMs: 0.0001, - }, - }, - baseCostInCents: 0.0005, - }, - tracer: trace.getTracer("test", "0.0.0"), - }); - - try { - const taskIdentifier = "test-task"; - - //create background worker - await setupBackgroundWorker(engine, authenticatedEnvironment, taskIdentifier, { - preset: "small-1x", - }); - - //trigger the runs - const runs = await triggerRuns({ - engine, - environment: authenticatedEnvironment, - taskIdentifier, + const engine = new RunEngine({ prisma, - count: 20, - }); - expect(runs.length).toBe(20); - - //check the queue length - const queueLength = await engine.runQueue.lengthOfEnvQueue(authenticatedEnvironment); - expect(queueLength).toBe(20); - - //dequeue - const dequeued = await engine.dequeueFromMasterQueue({ - consumerId: "test_12345", - masterQueue: "main", - maxRunCount: 5, - maxResources: { - cpu: 1.1, - memory: 3.8, + worker: { + redis: redisOptions, + workers: 1, + tasksPerWorker: 10, + pollIntervalMs: 100, }, - }); - expect(dequeued.length).toBe(2); - - //check the queue length - const queueLength2 = await engine.runQueue.lengthOfEnvQueue(authenticatedEnvironment); - expect(queueLength2).toBe(18); - - const dequeued2 = await engine.dequeueFromMasterQueue({ - consumerId: "test_12345", - masterQueue: "main", - maxRunCount: 10, - maxResources: { - cpu: 4.7, - memory: 3.0, + queue: { + redis: redisOptions, + }, + runLock: { + redis: redisOptions, + }, + machines: { + defaultMachine: "small-1x", + machines: { + "small-1x": { + name: "small-1x" as const, + cpu: 0.5, + memory: 0.5, + centsPerMs: 0.0001, + }, + }, + baseCostInCents: 0.0005, }, + tracer: trace.getTracer("test", "0.0.0"), }); - expect(dequeued2.length).toBe(6); - //check the queue length - const queueLength3 = await engine.runQueue.lengthOfEnvQueue(authenticatedEnvironment); - expect(queueLength3).toBe(12); - } finally { - engine.quit(); + try { + const taskIdentifier = "test-task"; + + //create background worker + await setupBackgroundWorker(engine, authenticatedEnvironment, taskIdentifier, { + preset: "small-1x", + }); + + //trigger the runs + const runs = await triggerRuns({ + engine, + environment: authenticatedEnvironment, + taskIdentifier, + prisma, + count: 20, + }); + expect(runs.length).toBe(20); + + //check the queue length + const queueLength = await engine.runQueue.lengthOfEnvQueue(authenticatedEnvironment); + expect(queueLength).toBe(20); + + //dequeue + const dequeued = await engine.dequeueFromMasterQueue({ + consumerId: "test_12345", + masterQueue: "main", + maxRunCount: 5, + maxResources: { + cpu: 1.1, + memory: 3.8, + }, + }); + expect(dequeued.length).toBe(2); + + //check the queue length + const queueLength2 = await engine.runQueue.lengthOfEnvQueue(authenticatedEnvironment); + expect(queueLength2).toBe(18); + + const dequeued2 = await engine.dequeueFromMasterQueue({ + consumerId: "test_12345", + masterQueue: "main", + maxRunCount: 10, + maxResources: { + cpu: 4.7, + memory: 3.0, + }, + }); + expect(dequeued2.length).toBe(6); + + //check the queue length + const queueLength3 = await engine.runQueue.lengthOfEnvQueue(authenticatedEnvironment); + expect(queueLength3).toBe(12); + } finally { + engine.quit(); + } } - }); + ); }); async function triggerRuns({ diff --git a/internal-packages/run-engine/src/engine/tests/pendingVersion.test.ts b/internal-packages/run-engine/src/engine/tests/pendingVersion.test.ts index c2c1ae3382..2164920cfc 100644 --- a/internal-packages/run-engine/src/engine/tests/pendingVersion.test.ts +++ b/internal-packages/run-engine/src/engine/tests/pendingVersion.test.ts @@ -3,6 +3,7 @@ import { trace } from "@internal/tracing"; import { RunEngine } from "../index.js"; import { setTimeout } from "timers/promises"; import { setupAuthenticatedEnvironment, setupBackgroundWorker } from "./setup.js"; +import { DequeuedMessage } from "@trigger.dev/core/v3"; vi.setConfig({ testTimeout: 60_000 }); @@ -100,11 +101,17 @@ describe("RunEngine pending version", () => { await setupBackgroundWorker(engine, authenticatedEnvironment, ["test-task-other"]); //dequeuing should fail - const dequeued = await engine.dequeueFromMasterQueue({ - consumerId: "test_12345", - masterQueue: run.masterQueue, - maxRunCount: 10, - }); + + const dequeued: DequeuedMessage[] = []; + for (let i = 0; i < 2; i++) { + dequeued.push( + ...(await engine.dequeueFromMasterQueue({ + consumerId: "test_12345", + masterQueue: "main", + maxRunCount: 1, + })) + ); + } expect(dequeued.length).toBe(0); //queue should be empty diff --git a/internal-packages/run-engine/src/engine/tests/priority.test.ts b/internal-packages/run-engine/src/engine/tests/priority.test.ts index 2467449585..c5bb40788e 100644 --- a/internal-packages/run-engine/src/engine/tests/priority.test.ts +++ b/internal-packages/run-engine/src/engine/tests/priority.test.ts @@ -6,6 +6,7 @@ import { PrismaClientOrTransaction } from "@trigger.dev/database"; import { MinimalAuthenticatedEnvironment } from "../../shared/index.js"; import { setTimeout } from "timers/promises"; import { setupAuthenticatedEnvironment, setupBackgroundWorker } from "./setup.js"; +import { DequeuedMessage } from "@trigger.dev/core/v3"; vi.setConfig({ testTimeout: 60_000 }); @@ -76,12 +77,16 @@ describe("RunEngine priority", () => { const queueLength = await engine.runQueue.lengthOfEnvQueue(authenticatedEnvironment); expect(queueLength).toBe(priorities.length); - //dequeue (expect 4 items because of the negative priority) - const dequeue = await engine.dequeueFromMasterQueue({ - consumerId: "test_12345", - masterQueue: "main", - maxRunCount: 20, - }); + //dequeue 4 times, in order + const dequeue: DequeuedMessage[] = []; + for (let i = 0; i < 4; i++) { + const items = await engine.dequeueFromMasterQueue({ + consumerId: "test_12345", + masterQueue: "main", + maxRunCount: 1, + }); + dequeue.push(...items); + } expect(dequeue.length).toBe(4); expect(dequeue[0].run.friendlyId).toBe(runs[4].friendlyId); expect(dequeue[1].run.friendlyId).toBe(runs[3].friendlyId); @@ -175,11 +180,16 @@ describe("RunEngine priority", () => { expect(queueLength).toBe(queueTimestamps.length); //dequeue (expect 4 items because of the negative priority) - const dequeue = await engine.dequeueFromMasterQueue({ - consumerId: "test_12345", - masterQueue: "main", - maxRunCount: 20, - }); + const dequeue: DequeuedMessage[] = []; + for (let i = 0; i < 5; i++) { + dequeue.push( + ...(await engine.dequeueFromMasterQueue({ + consumerId: "test_12345", + masterQueue: "main", + maxRunCount: 1, + })) + ); + } expect(dequeue.length).toBe(queueTimestamps.length); expect(dequeue[0].run.friendlyId).toBe(runs[2].friendlyId); expect(dequeue[1].run.friendlyId).toBe(runs[3].friendlyId); diff --git a/internal-packages/run-engine/src/run-queue/index.test.ts b/internal-packages/run-engine/src/run-queue/index.test.ts index dbbb574bfc..302e7b6c68 100644 --- a/internal-packages/run-engine/src/run-queue/index.test.ts +++ b/internal-packages/run-engine/src/run-queue/index.test.ts @@ -355,7 +355,8 @@ describe("RunQueue", () => { } ); - redisTest( + // This test fails now because we only return a single run per env. We will change this in the future. + redisTest.fails( "Dequeue multiple messages from the queue", { timeout: 5_000 }, async ({ redisContainer }) => {