Skip to content

Commit 78ef834

Browse files
committed
unify failed and crashed run retries
1 parent ccda672 commit 78ef834

File tree

4 files changed

+140
-52
lines changed

4 files changed

+140
-52
lines changed

apps/webapp/app/v3/failedTaskRun.server.ts

Lines changed: 64 additions & 24 deletions
Original file line number | Diff line number | Diff line change
@@ -36,12 +36,11 @@ export class FailedTaskRunService extends BaseService {
3636

3737
const isFriendlyId = anyRunId.startsWith("run_");
3838

39-
const taskRun = await this._prisma.taskRun.findUnique({
39+
const taskRun = await this._prisma.taskRun.findFirst({
4040
where: {
4141
friendlyId: isFriendlyId ? anyRunId : undefined,
4242
id: !isFriendlyId ? anyRunId : undefined,
4343
},
44-
include: includeAttempts,
4544
});
4645

4746
if (!taskRun) {
@@ -62,24 +61,13 @@ export class FailedTaskRunService extends BaseService {
6261
return;
6362
}
6463

65-
const retriableExecution = await this.#getRetriableAttemptExecution(taskRun, completion);
66-
67-
if (retriableExecution) {
68-
logger.debug("[FailedTaskRunService] Completing attempt", { taskRun, completion });
69-
70-
const executionRetry =
71-
completion.retry ?? (await this.#getExecutionRetry(taskRun, retriableExecution));
72-
73-
const completeAttempt = new CompleteAttemptService(this._prisma);
74-
await completeAttempt.call({
75-
completion: {
76-
...completion,
77-
retry: executionRetry,
78-
},
79-
execution: retriableExecution,
80-
isSystemFailure: true,
81-
});
64+
const retryHelper = new FailedTaskRunRetryHelper(this._prisma);
65+
const retryResult = await retryHelper.call({
66+
runId: taskRun.id,
67+
completion,
68+
});
8269

70+
if (retryResult !== undefined) {
8371
return;
8472
}
8573

@@ -112,6 +100,58 @@ export class FailedTaskRunService extends BaseService {
112100
],
113101
});
114102
}
103+
}
104+
105+
export class FailedTaskRunRetryHelper extends BaseService {
106+
async call({
107+
runId,
108+
completion,
109+
isCrash,
110+
}: {
111+
runId: string;
112+
completion: TaskRunFailedExecutionResult;
113+
isCrash?: boolean;
114+
}) {
115+
const taskRun = await this._prisma.taskRun.findFirst({
116+
where: {
117+
id: runId,
118+
},
119+
include: includeAttempts,
120+
});
121+
122+
if (!taskRun) {
123+
logger.error("[FailedTaskRunRetryHelper] Task run not found", {
124+
runId,
125+
completion,
126+
});
127+
128+
return;
129+
}
130+
131+
const retriableExecution = await this.#getRetriableAttemptExecution(taskRun, completion);
132+
133+
if (!retriableExecution) {
134+
return;
135+
}
136+
137+
logger.debug("[FailedTaskRunRetryHelper] Completing attempt", { taskRun, completion });
138+
139+
const executionRetry =
140+
completion.retry ?? (await this.#getExecutionRetry(taskRun, retriableExecution));
141+
142+
const completeAttempt = new CompleteAttemptService(this._prisma);
143+
const completeResult = await completeAttempt.call({
144+
completion: {
145+
...completion,
146+
retry: executionRetry,
147+
},
148+
execution: retriableExecution,
149+
isSystemFailure: !isCrash,
150+
isCrash,
151+
});
152+
153+
return completeResult;
154+
}
115155

116156
async #getRetriableAttemptExecution(
117157
run: TaskRunWithAttempts,
@@ -123,7 +163,7 @@ export class FailedTaskRunService extends BaseService {
123163
// - None exists yet
124164
// - The last attempt has a final status, e.g. we failed between attempts
125165
if (!attempt || isFinalAttemptStatus(attempt.status)) {
126-
logger.error("[FailedTaskRunService] No attempts found", {
166+
logger.error("[FailedTaskRunRetryHelper] No attempts found", {
127167
run,
128168
completion,
129169
});
@@ -134,7 +174,7 @@ export class FailedTaskRunService extends BaseService {
134174
const { execution } = await createAttempt.call(run.id);
135175
return execution;
136176
} catch (error) {
137-
logger.error("[FailedTaskRunService] Failed to create attempt", {
177+
logger.error("[FailedTaskRunRetryHelper] Failed to create attempt", {
138178
run,
139179
completion,
140180
error,
@@ -153,7 +193,7 @@ export class FailedTaskRunService extends BaseService {
153193

154194
return executionPayload?.execution;
155195
} catch (error) {
156-
logger.error("[FailedTaskRunService] Failed to get execution payload", {
196+
logger.error("[FailedTaskRunRetryHelper] Failed to get execution payload", {
157197
run,
158198
completion,
159199
error,
@@ -170,7 +210,7 @@ export class FailedTaskRunService extends BaseService {
170210
const parsedRetryConfig = RetryOptions.safeParse(run.lockedBy?.retryConfig);
171211

172212
if (!parsedRetryConfig.success) {
173-
logger.error("[FailedTaskRunService] Invalid retry config", {
213+
logger.error("[FailedTaskRunRetryHelper] Invalid retry config", {
174214
run,
175215
execution,
176216
});
@@ -181,7 +221,7 @@ export class FailedTaskRunService extends BaseService {
181221
const delay = calculateNextRetryDelay(parsedRetryConfig.data, execution.attempt.number);
182222

183223
if (!delay) {
184-
logger.debug("[FailedTaskRunService] No more retries", {
224+
logger.debug("[FailedTaskRunRetryHelper] No more retries", {
185225
run,
186226
execution,
187227
});

apps/webapp/app/v3/services/completeAttempt.server.ts

Lines changed: 20 additions & 6 deletions
Original file line number | Diff line number | Diff line change
@@ -22,7 +22,7 @@ import { MAX_TASK_RUN_ATTEMPTS } from "~/consts";
2222
import { CreateCheckpointService } from "./createCheckpoint.server";
2323
import { TaskRun } from "@trigger.dev/database";
2424
import { RetryAttemptService } from "./retryAttempt.server";
25-
import { isFinalAttemptStatus, isFinalRunStatus } from "../taskStatus";
25+
import { FAILED_RUN_STATUSES, isFinalAttemptStatus, isFinalRunStatus } from "../taskStatus";
2626
import { FinalizeTaskRunService } from "./finalizeTaskRun.server";
2727
import { env } from "~/env.server";
2828

@@ -41,13 +41,15 @@ export class CompleteAttemptService extends BaseService {
4141
checkpoint,
4242
supportsRetryCheckpoints,
4343
isSystemFailure,
44+
isCrash,
4445
}: {
4546
completion: TaskRunExecutionResult;
4647
execution: TaskRunExecution;
4748
env?: AuthenticatedEnvironment;
4849
checkpoint?: CheckpointData;
4950
supportsRetryCheckpoints?: boolean;
5051
isSystemFailure?: boolean;
52+
isCrash?: boolean;
5153
}): Promise<"COMPLETED" | "RETRIED"> {
5254
const taskRunAttempt = await findAttempt(this._prisma, execution.attempt.id);
5355

@@ -114,6 +116,7 @@ export class CompleteAttemptService extends BaseService {
114116
checkpoint,
115117
supportsRetryCheckpoints,
116118
isSystemFailure,
119+
isCrash,
117120
});
118121
}
119122
}
@@ -175,6 +178,7 @@ export class CompleteAttemptService extends BaseService {
175178
checkpoint,
176179
supportsRetryCheckpoints,
177180
isSystemFailure,
181+
isCrash,
178182
}: {
179183
completion: TaskRunFailedExecutionResult;
180184
execution: TaskRunExecution;
@@ -183,6 +187,7 @@ export class CompleteAttemptService extends BaseService {
183187
checkpoint?: CheckpointData;
184188
supportsRetryCheckpoints?: boolean;
185189
isSystemFailure?: boolean;
190+
isCrash?: boolean;
186191
}): Promise<"COMPLETED" | "RETRIED"> {
187192
if (
188193
completion.error.type === "INTERNAL_ERROR" &&
@@ -260,11 +265,20 @@ export class CompleteAttemptService extends BaseService {
260265
},
261266
});
262267

263-
const status = isSystemFailure
264-
? "SYSTEM_FAILURE"
265-
: sanitizedError.type === "INTERNAL_ERROR" && sanitizedError.code === "MAX_DURATION_EXCEEDED"
266-
? "TIMED_OUT"
267-
: "COMPLETED_WITH_ERRORS";
268+
let status: FAILED_RUN_STATUSES;
269+
270+
if (isSystemFailure) {
271+
status = "SYSTEM_FAILURE";
272+
} else if (isCrash) {
273+
status = "CRASHED";
274+
} else if (
275+
sanitizedError.type === "INTERNAL_ERROR" &&
276+
sanitizedError.code === "MAX_DURATION_EXCEEDED"
277+
) {
278+
status = "TIMED_OUT";
279+
} else {
280+
status = "COMPLETED_WITH_ERRORS";
281+
}
268282

269283
const finalizeService = new FinalizeTaskRunService();
270284
await finalizeService.call({

apps/webapp/app/v3/services/crashTaskRun.server.ts

Lines changed: 54 additions & 22 deletions
Original file line number | Diff line number | Diff line change
@@ -6,6 +6,7 @@ import { AuthenticatedEnvironment } from "~/services/apiAuth.server";
66
import { CRASHABLE_ATTEMPT_STATUSES, isCrashableRunStatus } from "../taskStatus";
77
import { sanitizeError, TaskRunInternalError } from "@trigger.dev/core/v3";
88
import { FinalizeTaskRunService } from "./finalizeTaskRun.server";
9+
import { FailedTaskRunRetryHelper } from "../failedTaskRun.server";
910

1011
export type CrashTaskRunServiceOptions = {
1112
reason?: string;
@@ -35,16 +36,43 @@ export class CrashTaskRunService extends BaseService {
3536
});
3637

3738
if (!taskRun) {
38-
logger.error("Task run not found", { runId });
39+
logger.error("[CrashTaskRunService] Task run not found", { runId });
3940
return;
4041
}
4142

4243
// Make sure the task run is in a crashable state
4344
if (!opts.overrideCompletion && !isCrashableRunStatus(taskRun.status)) {
44-
logger.error("Task run is not in a crashable state", { runId, status: taskRun.status });
45+
logger.error("[CrashTaskRunService] Task run is not in a crashable state", {
46+
runId,
47+
status: taskRun.status,
48+
});
49+
return;
50+
}
51+
52+
logger.debug("[CrashTaskRunService] Completing attempt", { runId, options });
53+
54+
const retryHelper = new FailedTaskRunRetryHelper(this._prisma);
55+
const retryResult = await retryHelper.call({
56+
runId,
57+
completion: {
58+
ok: false,
59+
id: runId,
60+
error: {
61+
type: "INTERNAL_ERROR",
62+
code: opts.errorCode ?? "TASK_RUN_CRASHED",
63+
message: opts.reason,
64+
stackTrace: opts.logs,
65+
},
66+
},
67+
});
68+
69+
if (retryResult === "RETRIED") {
70+
logger.debug("[CrashTaskRunService] Retried task run", { runId });
4571
return;
4672
}
4773

74+
logger.debug("[CrashTaskRunService] Overriding completion", { runId, options });
75+
4876
const finalizeService = new FinalizeTaskRunService();
4977
const crashedTaskRun = await finalizeService.call({
5078
id: taskRun.id,
@@ -86,7 +114,7 @@ export class CrashTaskRunService extends BaseService {
86114
options?.overrideCompletion
87115
);
88116

89-
logger.debug("Crashing in-progress events", {
117+
logger.debug("[CrashTaskRunService] Crashing in-progress events", {
90118
inProgressEvents: inProgressEvents.map((event) => event.id),
91119
});
92120

@@ -135,25 +163,29 @@ export class CrashTaskRunService extends BaseService {
135163
code?: TaskRunInternalError["code"];
136164
}
137165
) {
138-
return await this.traceWithEnv("failAttempt()", environment, async (span) => {
139-
span.setAttribute("taskRunId", run.id);
140-
span.setAttribute("attemptId", attempt.id);
166+
return await this.traceWithEnv(
167+
"[CrashTaskRunService] failAttempt()",
168+
environment,
169+
async (span) => {
170+
span.setAttribute("taskRunId", run.id);
171+
span.setAttribute("attemptId", attempt.id);
141172

142-
await this._prisma.taskRunAttempt.update({
143-
where: {
144-
id: attempt.id,
145-
},
146-
data: {
147-
status: "FAILED",
148-
completedAt: failedAt,
149-
error: sanitizeError({
150-
type: "INTERNAL_ERROR",
151-
code: error.code ?? "TASK_RUN_CRASHED",
152-
message: error.reason,
153-
stackTrace: error.logs,
154-
}),
155-
},
156-
});
157-
});
173+
await this._prisma.taskRunAttempt.update({
174+
where: {
175+
id: attempt.id,
176+
},
177+
data: {
178+
status: "FAILED",
179+
completedAt: failedAt,
180+
error: sanitizeError({
181+
type: "INTERNAL_ERROR",
182+
code: error.code ?? "TASK_RUN_CRASHED",
183+
message: error.reason,
184+
stackTrace: error.logs,
185+
}),
186+
},
187+
});
188+
}
189+
);
158190
}
159191
}

apps/webapp/app/v3/taskStatus.ts

Lines changed: 2 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -49,6 +49,8 @@ export const FAILED_RUN_STATUSES = [
4949
"TIMED_OUT",
5050
] satisfies TaskRunStatus[];
5151

52+
export type FAILED_RUN_STATUSES = (typeof FAILED_RUN_STATUSES)[number];
53+
5254
export const CANCELLABLE_RUN_STATUSES = NON_FINAL_RUN_STATUSES;
5355
export const CANCELLABLE_ATTEMPT_STATUSES = NON_FINAL_ATTEMPT_STATUSES;
5456

0 commit comments

Comments
 (0)