Skip to content

Commit ae19d0f

Browse files
authored
Only start an attempt if not finished. Send message to worker if pending executing (#2377)
* Only start an attempt if not finished. Send message to worker if pending executing * Fix the exit process reason tet * Fixed cancelling test since bug fix The old behaviour was wrong for pending executing in the test
1 parent c58b78f commit ae19d0f

File tree

3 files changed

+14
-13
lines changed

3 files changed

+14
-13
lines changed

internal-packages/run-engine/src/engine/systems/runAttemptSystem.ts

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,12 @@ import { runStatusFromError, ServiceValidationError } from "../errors.js";
4141
import { sendNotificationToWorker } from "../eventBus.js";
4242
import { getMachinePreset, machinePresetFromName } from "../machinePresets.js";
4343
import { retryOutcomeFromCompletion } from "../retrying.js";
44-
import { isExecuting, isInitialState } from "../statuses.js";
44+
import {
45+
isExecuting,
46+
isFinishedOrPendingFinished,
47+
isInitialState,
48+
isPendingExecuting,
49+
} from "../statuses.js";
4550
import { RunEngineOptions } from "../types.js";
4651
import { BatchSystem } from "./batchSystem.js";
4752
import { DelayedRunSystem } from "./delayedRunSystem.js";
@@ -345,8 +350,8 @@ export class RunAttemptSystem {
345350
span.setAttribute("taskRunId", taskRun.id);
346351
span.setAttribute("taskRunFriendlyId", taskRun.friendlyId);
347352

348-
if (taskRun.status === "CANCELED") {
349-
throw new ServiceValidationError("Task run is cancelled", 400);
353+
if (isFinishedOrPendingFinished(latestSnapshot.executionStatus)) {
354+
throw new ServiceValidationError("Task run is already finished", 400);
350355
}
351356

352357
if (!taskRun.lockedById) {
@@ -1321,7 +1326,10 @@ export class RunAttemptSystem {
13211326
});
13221327

13231328
//if executing, we need to message the worker to cancel the run and put it into `PENDING_CANCEL` status
1324-
if (isExecuting(latestSnapshot.executionStatus)) {
1329+
if (
1330+
isExecuting(latestSnapshot.executionStatus) ||
1331+
isPendingExecuting(latestSnapshot.executionStatus)
1332+
) {
13251333
const newSnapshot = await this.executionSnapshotSystem.createExecutionSnapshot(prisma, {
13261334
run,
13271335
snapshot: {

internal-packages/run-engine/src/engine/tests/cancelling.test.ts

Lines changed: 1 addition & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -150,7 +150,7 @@ describe("RunEngine cancelling", () => {
150150
cancelledEventData.push(result);
151151
});
152152

153-
//todo call completeAttempt (this will happen from the worker)
153+
// call completeAttempt manually (this will happen from the worker)
154154
const completeResult = await engine.completeRunAttempt({
155155
runId: parentRun.id,
156156
snapshotId: executionData!.snapshot.id,
@@ -289,13 +289,6 @@ describe("RunEngine cancelling", () => {
289289
prisma
290290
);
291291

292-
//dequeue the run
293-
await setTimeout(500);
294-
const dequeued = await engine.dequeueFromWorkerQueue({
295-
consumerId: "test_12345",
296-
workerQueue: "main",
297-
});
298-
299292
let cancelledEventData: EventBusEventArgs<"runCancelled">[0][] = [];
300293
engine.eventBus.on("runCancelled", (result) => {
301294
cancelledEventData.push(result);

packages/cli-v3/src/entryPoints/managed/execution.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -300,7 +300,7 @@ export class RunExecution {
300300
return;
301301
}
302302

303-
await this.exitTaskRunProcessWithoutFailingRun({ flush: true, reason: "re-queued" });
303+
await this.exitTaskRunProcessWithoutFailingRun({ flush: true, reason: "already-finished" });
304304
return;
305305
}
306306
case "QUEUED_EXECUTING":

0 commit comments

Comments
 (0)