diff --git a/internal-packages/run-engine/src/engine/systems/runAttemptSystem.ts b/internal-packages/run-engine/src/engine/systems/runAttemptSystem.ts index d2ef1b7592..e8c78b2174 100644 --- a/internal-packages/run-engine/src/engine/systems/runAttemptSystem.ts +++ b/internal-packages/run-engine/src/engine/systems/runAttemptSystem.ts @@ -41,7 +41,12 @@ import { runStatusFromError, ServiceValidationError } from "../errors.js"; import { sendNotificationToWorker } from "../eventBus.js"; import { getMachinePreset, machinePresetFromName } from "../machinePresets.js"; import { retryOutcomeFromCompletion } from "../retrying.js"; -import { isExecuting, isInitialState } from "../statuses.js"; +import { + isExecuting, + isFinishedOrPendingFinished, + isInitialState, + isPendingExecuting, +} from "../statuses.js"; import { RunEngineOptions } from "../types.js"; import { BatchSystem } from "./batchSystem.js"; import { DelayedRunSystem } from "./delayedRunSystem.js"; @@ -345,8 +350,8 @@ export class RunAttemptSystem { span.setAttribute("taskRunId", taskRun.id); span.setAttribute("taskRunFriendlyId", taskRun.friendlyId); - if (taskRun.status === "CANCELED") { - throw new ServiceValidationError("Task run is cancelled", 400); + if (isFinishedOrPendingFinished(latestSnapshot.executionStatus)) { + throw new ServiceValidationError("Task run is already finished", 400); } if (!taskRun.lockedById) { @@ -1321,7 +1326,10 @@ export class RunAttemptSystem { }); //if executing, we need to message the worker to cancel the run and put it into `PENDING_CANCEL` status - if (isExecuting(latestSnapshot.executionStatus)) { + if ( + isExecuting(latestSnapshot.executionStatus) || + isPendingExecuting(latestSnapshot.executionStatus) + ) { const newSnapshot = await this.executionSnapshotSystem.createExecutionSnapshot(prisma, { run, snapshot: { diff --git a/internal-packages/run-engine/src/engine/tests/cancelling.test.ts b/internal-packages/run-engine/src/engine/tests/cancelling.test.ts index 5fb4fc345e..1e5947cfbc 100644 --- a/internal-packages/run-engine/src/engine/tests/cancelling.test.ts +++ b/internal-packages/run-engine/src/engine/tests/cancelling.test.ts @@ -150,7 +150,7 @@ describe("RunEngine cancelling", () => { cancelledEventData.push(result); }); - //todo call completeAttempt (this will happen from the worker) + // call completeAttempt manually (this will happen from the worker) const completeResult = await engine.completeRunAttempt({ runId: parentRun.id, snapshotId: executionData!.snapshot.id, @@ -289,13 +289,6 @@ describe("RunEngine cancelling", () => { prisma ); - //dequeue the run - await setTimeout(500); - const dequeued = await engine.dequeueFromWorkerQueue({ - consumerId: "test_12345", - workerQueue: "main", - }); - let cancelledEventData: EventBusEventArgs<"runCancelled">[0][] = []; engine.eventBus.on("runCancelled", (result) => { cancelledEventData.push(result); diff --git a/packages/cli-v3/src/entryPoints/managed/execution.ts b/packages/cli-v3/src/entryPoints/managed/execution.ts index be118b6979..dbf9448807 100644 --- a/packages/cli-v3/src/entryPoints/managed/execution.ts +++ b/packages/cli-v3/src/entryPoints/managed/execution.ts @@ -300,7 +300,7 @@ export class RunExecution { return; } - await this.exitTaskRunProcessWithoutFailingRun({ flush: true, reason: "re-queued" }); + await this.exitTaskRunProcessWithoutFailingRun({ flush: true, reason: "already-finished" }); return; } case "QUEUED_EXECUTING":