diff --git a/internal-packages/run-engine/src/engine/index.ts b/internal-packages/run-engine/src/engine/index.ts index e1d1e0419b..e8eebe50fb 100644 --- a/internal-packages/run-engine/src/engine/index.ts +++ b/internal-packages/run-engine/src/engine/index.ts @@ -50,6 +50,7 @@ import { TtlSystem } from "./systems/ttlSystem.js"; import { WaitpointSystem } from "./systems/waitpointSystem.js"; import { EngineWorker, HeartbeatTimeouts, RunEngineOptions, TriggerParams } from "./types.js"; import { workerCatalog } from "./workerCatalog.js"; +import { RaceSimulationSystem } from "./systems/raceSimulationSystem.js"; export class RunEngine { private runLockRedis: Redis; @@ -73,6 +74,7 @@ export class RunEngine { ttlSystem: TtlSystem; pendingVersionSystem: PendingVersionSystem; releaseConcurrencySystem: ReleaseConcurrencySystem; + raceSimulationSystem: RaceSimulationSystem = new RaceSimulationSystem(); constructor(private readonly options: RunEngineOptions) { this.prisma = options.prisma; @@ -194,6 +196,7 @@ export class RunEngine { tracer: this.tracer, runLock: this.runLock, runQueue: this.runQueue, + raceSimulationSystem: this.raceSimulationSystem, }; this.releaseConcurrencySystem = new ReleaseConcurrencySystem({ @@ -522,6 +525,7 @@ export class RunEngine { runId: parentTaskRunId, waitpoints: associatedWaitpoint.id, projectId: associatedWaitpoint.projectId, + organizationId: environment.organization.id, batch, workerId, runnerId, @@ -966,6 +970,7 @@ export class RunEngine { runId, waitpoints, projectId, + organizationId, releaseConcurrency, timeout, spanIdToComplete, @@ -990,6 +995,7 @@ export class RunEngine { runId, waitpoints, projectId, + organizationId, releaseConcurrency, timeout, spanIdToComplete, @@ -1140,6 +1146,10 @@ export class RunEngine { } } + async registerRacepointForRun({ runId, waitInterval }: { runId: string; waitInterval: number }) { + return this.raceSimulationSystem.registerRacepointForRun({ runId, waitInterval }); + } + async quit() { try { //stop the run queue diff --git a/internal-packages/run-engine/src/engine/systems/raceSimulationSystem.ts b/internal-packages/run-engine/src/engine/systems/raceSimulationSystem.ts new file mode 100644 index 0000000000..cf72a4e086 --- /dev/null +++ b/internal-packages/run-engine/src/engine/systems/raceSimulationSystem.ts @@ -0,0 +1,33 @@ +import { promiseWithResolvers } from "@trigger.dev/core"; + +export class RaceSimulationSystem { + private racepoints: Record | undefined> = {}; + + constructor() {} + + async waitForRacepoint({ runId }: { runId: string }): Promise { + if (this.racepoints[runId]) { + return this.racepoints[runId]; + } + + return Promise.resolve(); + } + + registerRacepointForRun({ runId, waitInterval }: { runId: string; waitInterval: number }) { + if (this.racepoints[runId]) { + return; + } + + const { promise, resolve } = promiseWithResolvers(); + + this.racepoints[runId] = promise; + + setTimeout(() => { + resolve(); + }, waitInterval); + + promise.then(() => { + delete this.racepoints[runId]; + }); + } +} diff --git a/internal-packages/run-engine/src/engine/systems/systems.ts b/internal-packages/run-engine/src/engine/systems/systems.ts index 85ccb014ee..e20f347b93 100644 --- a/internal-packages/run-engine/src/engine/systems/systems.ts +++ b/internal-packages/run-engine/src/engine/systems/systems.ts @@ -5,6 +5,7 @@ import { RunQueue } from "../../run-queue/index.js"; import { EventBus } from "../eventBus.js"; import { RunLocker } from "../locking.js"; import { EngineWorker } from "../types.js"; +import { RaceSimulationSystem } from "./raceSimulationSystem.js"; export type SystemResources = { prisma: PrismaClient; @@ -14,4 +15,5 @@ export type SystemResources = { tracer: Tracer; runLock: RunLocker; runQueue: RunQueue; + raceSimulationSystem: RaceSimulationSystem; }; diff --git a/internal-packages/run-engine/src/engine/systems/waitpointSystem.ts b/internal-packages/run-engine/src/engine/systems/waitpointSystem.ts index f78c2c3a0c..cfbc2caaa4 100644 --- a/internal-packages/run-engine/src/engine/systems/waitpointSystem.ts +++ b/internal-packages/run-engine/src/engine/systems/waitpointSystem.ts @@ -353,6 +353,7 @@ export class WaitpointSystem { runId, waitpoints, projectId, + organizationId, releaseConcurrency, timeout, spanIdToComplete, @@ -364,6 +365,7 @@ export class WaitpointSystem { runId: string; waitpoints: string | string[]; projectId: string; + organizationId: string; releaseConcurrency?: boolean; timeout?: Date; spanIdToComplete?: string; @@ -374,6 +376,8 @@ export class WaitpointSystem { }): Promise { const prisma = tx ?? this.$.prisma; + await this.$.raceSimulationSystem.waitForRacepoint({ runId }); + let $waitpoints = typeof waitpoints === "string" ? [waitpoints] : waitpoints; return await this.$.runLock.lock("blockRunWithWaitpoint", [runId], 5000, async () => { @@ -439,7 +443,7 @@ export class WaitpointSystem { environmentId: snapshot.environmentId, environmentType: snapshot.environmentType, projectId: snapshot.projectId, - organizationId: snapshot.organizationId, + organizationId, // Do NOT carry over the batchId from the previous snapshot batchId: batch?.id, workerId, @@ -495,6 +499,7 @@ export class WaitpointSystem { const blockingWaitpoints = await this.$.prisma.taskRunWaitpoint.findMany({ where: { taskRunId: runId }, select: { + id: true, batchId: true, batchIndex: true, waitpoint: { @@ -503,6 +508,8 @@ export class WaitpointSystem { }, }); + await this.$.raceSimulationSystem.waitForRacepoint({ runId }); + // 2. There are blockers still, so do nothing if (blockingWaitpoints.some((w) => w.waitpoint.status !== "COMPLETED")) { this.$.logger.debug(`continueRunIfUnblocked: blocking waitpoints still exist`, { @@ -657,16 +664,19 @@ export class WaitpointSystem { } }); - //5. Remove the blocking waitpoints - await this.$.prisma.taskRunWaitpoint.deleteMany({ - where: { - taskRunId: runId, - }, - }); + if (blockingWaitpoints.length > 0) { + //5. Remove the blocking waitpoints + await this.$.prisma.taskRunWaitpoint.deleteMany({ + where: { + taskRunId: runId, + id: { in: blockingWaitpoints.map((b) => b.id) }, + }, + }); - this.$.logger.debug(`continueRunIfUnblocked: removed blocking waitpoints`, { - runId, - }); + this.$.logger.debug(`continueRunIfUnblocked: removed blocking waitpoints`, { + runId, + }); + } } public async createRunAssociatedWaitpoint( diff --git a/internal-packages/run-engine/src/engine/tests/waitpointRace.test.ts b/internal-packages/run-engine/src/engine/tests/waitpointRace.test.ts new file mode 100644 index 0000000000..17df3f724a --- /dev/null +++ b/internal-packages/run-engine/src/engine/tests/waitpointRace.test.ts @@ -0,0 +1,111 @@ +import { containerTest } from "@internal/testcontainers"; +import { trace } from "@internal/tracing"; +import { expect } from "vitest"; +import { RunEngine } from "../index.js"; +import { setupAuthenticatedEnvironment, setupBackgroundWorker } from "./setup.js"; +import { setTimeout } from "timers/promises"; + +vi.setConfig({ testTimeout: 60_000 }); + +describe("RunEngine Waitpoints – race condition", () => { + containerTest( + "join-row removed before run continues (failing race)", + async ({ prisma, redisOptions }) => { + const env = await setupAuthenticatedEnvironment(prisma, "PRODUCTION"); + const engine = new RunEngine({ + prisma, + worker: { redis: redisOptions, workers: 1, tasksPerWorker: 10, pollIntervalMs: 100 }, + queue: { redis: redisOptions }, + runLock: { redis: redisOptions }, + machines: { + defaultMachine: "small-1x", + machines: { + "small-1x": { name: "small-1x" as const, cpu: 0.5, memory: 0.5, centsPerMs: 0.0001 }, + }, + baseCostInCents: 0.0001, + }, + tracer: trace.getTracer("test", "0.0.0"), + }); + + try { + const taskIdentifier = "test-task"; + await setupBackgroundWorker(engine, env, taskIdentifier); + + const run = await engine.trigger( + { + number: 1, + friendlyId: "run_race", + environment: env, + taskIdentifier, + payload: "{}", + payloadType: "application/json", + context: {}, + traceContext: {}, + traceId: "race-trace", + spanId: "race-span", + masterQueue: "main", + queue: "task/test-task", + isTest: false, + tags: [], + }, + prisma + ); + + const dequeued = await engine.dequeueFromMasterQueue({ + consumerId: "test", + masterQueue: run.masterQueue, + maxRunCount: 10, + }); + await engine.startRunAttempt({ + runId: dequeued[0].run.id, + snapshotId: dequeued[0].snapshot.id, + }); + + // create manual waitpoint + const { waitpoint } = await engine.createManualWaitpoint({ + environmentId: env.id, + projectId: env.projectId, + }); + + // block the run + await engine.blockRunWithWaitpoint({ + runId: run.id, + waitpoints: waitpoint.id, + projectId: env.projectId, + organizationId: env.organizationId, + }); + + // Now we need to block the run again right after the continueRunIfUnblocked function + // is called as a result of the above completeWaitpoint call + const { waitpoint: waitpoint2 } = await engine.createManualWaitpoint({ + environmentId: env.id, + projectId: env.projectId, + }); + + engine.registerRacepointForRun({ runId: run.id, waitInterval: 500 }); + + // complete the waitpoint (this will schedule a continueRunIfUnblocked job normally) + await engine.completeWaitpoint({ id: waitpoint.id }); + + await engine.waitpointSystem.blockRunWithWaitpoint({ + runId: run.id, + waitpoints: waitpoint2.id, + projectId: env.projectId, + organizationId: env.organizationId, + }); + + await setTimeout(1000); + + // The join row SHOULD still exist until the run progresses naturally. + const joinRow = await prisma.taskRunWaitpoint.findFirst({ + where: { taskRunId: run.id, waitpointId: waitpoint2.id }, + }); + + // Intentionally expect it to still be there – current implementation erroneously deletes it so test fails. + expect(joinRow).not.toBeNull(); + } finally { + await engine.quit(); + } + } + ); +}); diff --git a/packages/cli-v3/e2e/fixtures.ts b/packages/cli-v3/e2e/fixtures.ts index 44409230f0..95011a36f8 100644 --- a/packages/cli-v3/e2e/fixtures.ts +++ b/packages/cli-v3/e2e/fixtures.ts @@ -66,7 +66,7 @@ export const fixturesConfig: TestCase[] = [ { task: { id: "helloWorld", filePath: "src/trigger/helloWorld.ts", exportName: "helloWorld" }, payload: "{}", - result: { ok: true, durationMs: 1000 }, + result: { ok: true, durationMs: 500 }, }, ], tsconfig: "tsconfig.json",