diff --git a/internal-packages/run-engine/src/engine/index.ts b/internal-packages/run-engine/src/engine/index.ts index ef8b8f6af3..f5480520ba 100644 --- a/internal-packages/run-engine/src/engine/index.ts +++ b/internal-packages/run-engine/src/engine/index.ts @@ -303,6 +303,7 @@ export class RunEngine { executionSnapshotSystem: this.executionSnapshotSystem, batchSystem: this.batchSystem, waitpointSystem: this.waitpointSystem, + delayedRunSystem: this.delayedRunSystem, machines: this.options.machines, }); diff --git a/internal-packages/run-engine/src/engine/statuses.ts b/internal-packages/run-engine/src/engine/statuses.ts index 93a4428cac..f8a66240d3 100644 --- a/internal-packages/run-engine/src/engine/statuses.ts +++ b/internal-packages/run-engine/src/engine/statuses.ts @@ -36,6 +36,11 @@ export function isFinishedOrPendingFinished(status: TaskRunExecutionStatus): boo return finishedStatuses.includes(status); } +export function isInitialState(status: TaskRunExecutionStatus): boolean { + const startedStatuses: TaskRunExecutionStatus[] = ["RUN_CREATED"]; + return startedStatuses.includes(status); +} + export function isFinalRunStatus(status: TaskRunStatus): boolean { const finalStatuses: TaskRunStatus[] = [ "CANCELED", diff --git a/internal-packages/run-engine/src/engine/systems/delayedRunSystem.ts b/internal-packages/run-engine/src/engine/systems/delayedRunSystem.ts index 2511e86f26..b43ff46221 100644 --- a/internal-packages/run-engine/src/engine/systems/delayedRunSystem.ts +++ b/internal-packages/run-engine/src/engine/systems/delayedRunSystem.ts @@ -131,4 +131,8 @@ export class DelayedRunSystem { availableAt: delayUntil, }); } + + async preventDelayedRunFromBeingEnqueued({ runId }: { runId: string }) { + await this.$.worker.ack(`enqueueDelayedRun:${runId}`); + } } diff --git a/internal-packages/run-engine/src/engine/systems/runAttemptSystem.ts b/internal-packages/run-engine/src/engine/systems/runAttemptSystem.ts index dcb40d995d..0ee1af5576 100644 --- a/internal-packages/run-engine/src/engine/systems/runAttemptSystem.ts +++ b/internal-packages/run-engine/src/engine/systems/runAttemptSystem.ts @@ -22,7 +22,7 @@ import { runStatusFromError, ServiceValidationError } from "../errors.js"; import { sendNotificationToWorker } from "../eventBus.js"; import { getMachinePreset } from "../machinePresets.js"; import { retryOutcomeFromCompletion } from "../retrying.js"; -import { isExecuting } from "../statuses.js"; +import { isExecuting, isInitialState } from "../statuses.js"; import { RunEngineOptions } from "../types.js"; import { BatchSystem } from "./batchSystem.js"; import { @@ -32,12 +32,14 @@ import { } from "./executionSnapshotSystem.js"; import { SystemResources } from "./systems.js"; import { WaitpointSystem } from "./waitpointSystem.js"; +import { DelayedRunSystem } from "./delayedRunSystem.js"; export type RunAttemptSystemOptions = { resources: SystemResources; executionSnapshotSystem: ExecutionSnapshotSystem; batchSystem: BatchSystem; waitpointSystem: WaitpointSystem; + delayedRunSystem: DelayedRunSystem; retryWarmStartThresholdMs?: number; machines: RunEngineOptions["machines"]; }; @@ -47,12 +49,14 @@ export class RunAttemptSystem { private readonly executionSnapshotSystem: ExecutionSnapshotSystem; private readonly batchSystem: BatchSystem; private readonly waitpointSystem: WaitpointSystem; + private readonly delayedRunSystem: DelayedRunSystem; constructor(private readonly options: RunAttemptSystemOptions) { this.$ = options.resources; this.executionSnapshotSystem = options.executionSnapshotSystem; this.batchSystem = options.batchSystem; this.waitpointSystem = options.waitpointSystem; + this.delayedRunSystem = options.delayedRunSystem; } public async startRunAttempt({ @@ -968,6 +972,7 @@ export class RunAttemptSystem { completedAt: true, taskEventStore: true, parentTaskRunId: true, + delayUntil: true, runtimeEnvironment: { select: { organizationId: true, @@ -986,6 +991,11 @@ export class RunAttemptSystem { }, }); + //if the run is delayed and hasn't started yet, we need to prevent it being added to the queue in future + if (isInitialState(latestSnapshot.executionStatus) && run.delayUntil) { + await this.delayedRunSystem.preventDelayedRunFromBeingEnqueued({ runId }); + } + //remove it from the queue and release concurrency await this.$.runQueue.acknowledgeMessage(run.runtimeEnvironment.organizationId, runId); diff --git a/internal-packages/run-engine/src/engine/tests/delays.test.ts b/internal-packages/run-engine/src/engine/tests/delays.test.ts index b46c6d34dd..7b48859b55 100644 --- a/internal-packages/run-engine/src/engine/tests/delays.test.ts +++ b/internal-packages/run-engine/src/engine/tests/delays.test.ts @@ -290,4 +290,115 @@ describe("RunEngine delays", () => { engine.quit(); } }); + + containerTest("Cancelling a delayed run", async ({ prisma, redisOptions }) => { + //create environment + const authenticatedEnvironment = await setupAuthenticatedEnvironment(prisma, "PRODUCTION"); + + const engine = new RunEngine({ + prisma, + worker: { + redis: redisOptions, + workers: 1, + tasksPerWorker: 10, + pollIntervalMs: 100, + }, + queue: { + redis: redisOptions, + }, + runLock: { + redis: redisOptions, + }, + machines: { + defaultMachine: "small-1x", + machines: { + "small-1x": { + name: "small-1x" as const, + cpu: 0.5, + memory: 0.5, + centsPerMs: 0.0001, + }, + }, + baseCostInCents: 0.0001, + }, + tracer: trace.getTracer("test", "0.0.0"), + }); + + try { + const taskIdentifier = "test-task"; + + //create background worker + const backgroundWorker = await setupBackgroundWorker( + engine, + authenticatedEnvironment, + taskIdentifier + ); + + //trigger the run with a 1 second delay + const run = await engine.trigger( + { + number: 1, + friendlyId: "run_1234", + environment: authenticatedEnvironment, + taskIdentifier, + payload: "{}", + payloadType: "application/json", + context: {}, + traceContext: {}, + traceId: "t12345", + spanId: "s12345", + masterQueue: "main", + queue: "task/test-task", + isTest: false, + tags: [], + delayUntil: new Date(Date.now() + 1000), + }, + prisma + ); + + //verify it's created but not queued + const executionData = await engine.getRunExecutionData({ runId: run.id }); + assertNonNullable(executionData); + expect(executionData.snapshot.executionStatus).toBe("RUN_CREATED"); + expect(run.status).toBe("DELAYED"); + + //cancel the run + await engine.cancelRun({ + runId: run.id, + reason: "Cancelled by test", + }); + + //verify it's cancelled + const executionData2 = await engine.getRunExecutionData({ runId: run.id }); + assertNonNullable(executionData2); + expect(executionData2.snapshot.executionStatus).toBe("FINISHED"); + expect(executionData2.run.status).toBe("CANCELED"); + + //wait past the original delay time + await setTimeout(1500); + + //verify the run is still cancelled + const executionData3 = await engine.getRunExecutionData({ runId: run.id }); + assertNonNullable(executionData3); + expect(executionData3.snapshot.executionStatus).toBe("FINISHED"); + expect(executionData3.run.status).toBe("CANCELED"); + + //attempt to dequeue - should get nothing + const dequeued = await engine.dequeueFromMasterQueue({ + consumerId: "test_12345", + masterQueue: run.masterQueue, + maxRunCount: 10, + }); + + expect(dequeued.length).toBe(0); + + //verify final state is still cancelled + const executionData4 = await engine.getRunExecutionData({ runId: run.id }); + assertNonNullable(executionData4); + expect(executionData4.snapshot.executionStatus).toBe("FINISHED"); + expect(executionData4.run.status).toBe("CANCELED"); + } finally { + engine.quit(); + } + }); });