Skip to content

Commit 4c8618d

Browse files
committed
fix for resuming parents of canceled child runs
1 parent 9af6018 commit 4c8618d

File tree

3 files changed

+55
-34
lines changed

3 files changed

+55
-34
lines changed

apps/webapp/app/v3/handleSocketIo.server.ts

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -195,7 +195,10 @@ function createCoordinatorNamespace(io: Server) {
195195
const service = new CreateTaskRunAttemptService();
196196
const { attempt } = await service.call(message.runId, environment, false);
197197

198-
const payload = await sharedQueueTasks.getExecutionPayloadFromAttempt(attempt.id, true);
198+
const payload = await sharedQueueTasks.getExecutionPayloadFromAttempt({
199+
id: attempt.id,
200+
setToExecuting: true,
201+
});
199202

200203
if (!payload) {
201204
logger.error("Failed to retrieve payload after attempt creation", message);

apps/webapp/app/v3/marqs/sharedQueueConsumer.server.ts

Lines changed: 47 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,12 @@ import { generateJWTTokenForEnvironment } from "~/services/apiAuth.server";
4343
import { EnvironmentVariable } from "../environmentVariables/repository";
4444
import { machinePresetFromConfig } from "../machinePresets.server";
4545
import { env } from "~/env.server";
46-
import { FINAL_RUN_STATUSES, isFinalAttemptStatus, isFinalRunStatus } from "../taskStatus";
46+
import {
47+
FINAL_ATTEMPT_STATUSES,
48+
FINAL_RUN_STATUSES,
49+
isFinalAttemptStatus,
50+
isFinalRunStatus,
51+
} from "../taskStatus";
4752
import { getMaxDuration } from "../utils/maxDuration";
4853

4954
const WithTraceContext = z.object({
@@ -729,9 +734,9 @@ export class SharedQueueConsumer {
729734

730735
completions.push(completion);
731736

732-
const executionPayload = await this._tasks.getExecutionPayloadFromAttempt(
733-
completedAttempt.id
734-
);
737+
const executionPayload = await this._tasks.getExecutionPayloadFromAttempt({
738+
id: completedAttempt.id,
739+
});
735740

736741
if (!executionPayload) {
737742
await this.#ackAndDoMoreWork(message.messageId);
@@ -968,7 +973,7 @@ class SharedQueueTasks {
968973
where: {
969974
id,
970975
status: {
971-
in: ["COMPLETED", "FAILED"],
976+
in: FINAL_ATTEMPT_STATUSES,
972977
},
973978
},
974979
include: {
@@ -1014,11 +1019,17 @@ class SharedQueueTasks {
10141019
}
10151020
}
10161021

1017-
async getExecutionPayloadFromAttempt(
1018-
id: string,
1019-
setToExecuting?: boolean,
1020-
isRetrying?: boolean
1021-
): Promise<ProdTaskRunExecutionPayload | undefined> {
1022+
async getExecutionPayloadFromAttempt({
1023+
id,
1024+
setToExecuting,
1025+
isRetrying,
1026+
skipStatusChecks,
1027+
}: {
1028+
id: string;
1029+
setToExecuting?: boolean;
1030+
isRetrying?: boolean;
1031+
skipStatusChecks?: boolean;
1032+
}): Promise<ProdTaskRunExecutionPayload | undefined> {
10221033
const attempt = await prisma.taskRunAttempt.findUnique({
10231034
where: {
10241035
id,
@@ -1051,27 +1062,29 @@ class SharedQueueTasks {
10511062
return;
10521063
}
10531064

1054-
switch (attempt.status) {
1055-
case "CANCELED":
1056-
case "EXECUTING": {
1057-
logger.error("Invalid attempt status for execution payload retrieval", {
1058-
attemptId: id,
1059-
status: attempt.status,
1060-
});
1061-
return;
1065+
if (!skipStatusChecks) {
1066+
switch (attempt.status) {
1067+
case "CANCELED":
1068+
case "EXECUTING": {
1069+
logger.error("Invalid attempt status for execution payload retrieval", {
1070+
attemptId: id,
1071+
status: attempt.status,
1072+
});
1073+
return;
1074+
}
10621075
}
1063-
}
10641076

1065-
switch (attempt.taskRun.status) {
1066-
case "CANCELED":
1067-
case "EXECUTING":
1068-
case "INTERRUPTED": {
1069-
logger.error("Invalid run status for execution payload retrieval", {
1070-
attemptId: id,
1071-
runId: attempt.taskRunId,
1072-
status: attempt.taskRun.status,
1073-
});
1074-
return;
1077+
switch (attempt.taskRun.status) {
1078+
case "CANCELED":
1079+
case "EXECUTING":
1080+
case "INTERRUPTED": {
1081+
logger.error("Invalid run status for execution payload retrieval", {
1082+
attemptId: id,
1083+
runId: attempt.taskRunId,
1084+
status: attempt.taskRun.status,
1085+
});
1086+
return;
1087+
}
10751088
}
10761089
}
10771090

@@ -1222,7 +1235,11 @@ class SharedQueueTasks {
12221235
return;
12231236
}
12241237

1225-
return this.getExecutionPayloadFromAttempt(latestAttempt.id, setToExecuting, isRetrying);
1238+
return this.getExecutionPayloadFromAttempt({
1239+
id: latestAttempt.id,
1240+
setToExecuting,
1241+
isRetrying,
1242+
});
12261243
}
12271244

12281245
async getLazyAttemptPayload(

apps/webapp/app/v3/services/resumeAttempt.server.ts

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -207,9 +207,10 @@ export class ResumeAttemptService extends BaseService {
207207

208208
completions.push(completion);
209209

210-
const executionPayload = await sharedQueueTasks.getExecutionPayloadFromAttempt(
211-
completedAttempt.id
212-
);
210+
const executionPayload = await sharedQueueTasks.getExecutionPayloadFromAttempt({
211+
id: completedAttempt.id,
212+
skipStatusChecks: true, // already checked when getting the completion
213+
});
213214

214215
if (!executionPayload) {
215216
logger.error("Failed to get execution payload");

0 commit comments

Comments
 (0)