Skip to content

Commit 9350770

Browse files
committed
failed run service now respects retries
1 parent f92d82f commit 9350770

File tree

4 files changed

+173
-39
lines changed

4 files changed

+173
-39
lines changed

apps/webapp/app/v3/failedTaskRun.server.ts

Lines changed: 136 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,22 +1,47 @@
1-
import { sanitizeError, TaskRunFailedExecutionResult } from "@trigger.dev/core/v3";
1+
import {
2+
calculateNextRetryDelay,
3+
RetryOptions,
4+
sanitizeError,
5+
TaskRunExecution,
6+
TaskRunExecutionRetry,
7+
TaskRunFailedExecutionResult,
8+
} from "@trigger.dev/core/v3";
29
import { logger } from "~/services/logger.server";
310
import { createExceptionPropertiesFromError, eventRepository } from "./eventRepository.server";
411
import { BaseService } from "./services/baseService.server";
512
import { FinalizeTaskRunService } from "./services/finalizeTaskRun.server";
6-
import { FAILABLE_RUN_STATUSES } from "./taskStatus";
13+
import { isFailableRunStatus, isFinalAttemptStatus } from "./taskStatus";
14+
import { Prisma } from "@trigger.dev/database";
15+
import { CompleteAttemptService } from "./services/completeAttempt.server";
16+
import { CreateTaskRunAttemptService } from "./services/createTaskRunAttempt.server";
17+
import { sharedQueueTasks } from "./marqs/sharedQueueConsumer.server";
18+
19+
/**
 * Relations loaded together with a task run by this service:
 * - `lockedBy`: the related record whose `retryConfig` is read when computing a retry
 * - `attempts`: only the single most recently created attempt
 */
const includeAttempts = {
  lockedBy: true,
  attempts: {
    take: 1,
    orderBy: { createdAt: "desc" },
  },
} satisfies Prisma.TaskRunInclude;

/** A task run payload shaped by {@link includeAttempts}. */
type TaskRunWithAttempts = Prisma.TaskRunGetPayload<{ include: typeof includeAttempts }>;
732

8-
/**
9-
*
10-
*/
1133
export class FailedTaskRunService extends BaseService {
1234
public async call(anyRunId: string, completion: TaskRunFailedExecutionResult) {
35+
logger.debug("[FailedTaskRunService] Handling failed task run", { anyRunId, completion });
36+
1337
const isFriendlyId = anyRunId.startsWith("run_");
1438

1539
const taskRun = await this._prisma.taskRun.findUnique({
1640
where: {
1741
friendlyId: isFriendlyId ? anyRunId : undefined,
1842
id: !isFriendlyId ? anyRunId : undefined,
1943
},
44+
include: includeAttempts,
2045
});
2146

2247
if (!taskRun) {
@@ -28,7 +53,7 @@ export class FailedTaskRunService extends BaseService {
2853
return;
2954
}
3055

31-
if (!FAILABLE_RUN_STATUSES.includes(taskRun.status)) {
56+
if (!isFailableRunStatus(taskRun.status)) {
3257
logger.error("[FailedTaskRunService] Task run is not in a failable state", {
3358
taskRun,
3459
completion,
@@ -37,7 +62,28 @@ export class FailedTaskRunService extends BaseService {
3762
return;
3863
}
3964

40-
// No more retries, we need to fail the task run
65+
const retriableExecution = await this.#getRetriableAttemptExecution(taskRun, completion);
66+
67+
if (retriableExecution) {
68+
logger.debug("[FailedTaskRunService] Completing attempt", { taskRun, completion });
69+
70+
const executionRetry =
71+
completion.retry ?? (await this.#getExecutionRetry(taskRun, retriableExecution));
72+
73+
const completeAttempt = new CompleteAttemptService(this._prisma);
74+
await completeAttempt.call({
75+
completion: {
76+
...completion,
77+
retry: executionRetry,
78+
},
79+
execution: retriableExecution,
80+
isSystemFailure: true,
81+
});
82+
83+
return;
84+
}
85+
86+
// No retriable execution, so we need to fail the task run
4187
logger.debug("[FailedTaskRunService] Failing task run", { taskRun, completion });
4288

4389
const finalizeService = new FinalizeTaskRunService();
@@ -66,4 +112,87 @@ export class FailedTaskRunService extends BaseService {
66112
],
67113
});
68114
}
115+
116+
/**
 * Resolves the execution that a failed run should be completed (and possibly retried) against.
 *
 * `run.attempts` holds at most the most recently created attempt (see `includeAttempts`).
 * - If there is no attempt, or the latest attempt already has a final status (e.g. the run
 *   failed between attempts), a new attempt is created and its execution is returned.
 * - Otherwise the execution payload of the existing, non-final attempt is loaded.
 *
 * @param run - The task run, loaded with its latest attempt and `lockedBy` relation.
 * @param completion - The failed execution result; only used as logging context here.
 * @returns The execution to complete, or `undefined` when it could not be created or loaded
 *          (the failure is logged rather than rethrown).
 */
async #getRetriableAttemptExecution(
  run: TaskRunWithAttempts,
  completion: TaskRunFailedExecutionResult
): Promise<TaskRunExecution | undefined> {
  const attempt = run.attempts[0];

  // We need to create an attempt if:
  // - None exists yet
  // - The last attempt has a final status, e.g. we failed between attempts
  if (!attempt || isFinalAttemptStatus(attempt.status)) {
    // Log the actual reason: this branch is also taken when an attempt exists but is final,
    // in which case "No attempts found" would be misleading.
    if (attempt) {
      logger.error("[FailedTaskRunService] Latest attempt has a final status", {
        run,
        completion,
        attemptId: attempt.id,
        attemptStatus: attempt.status,
      });
    } else {
      logger.error("[FailedTaskRunService] No attempts found", {
        run,
        completion,
      });
    }

    const createAttempt = new CreateTaskRunAttemptService(this._prisma);

    try {
      const { execution } = await createAttempt.call(run.id);
      return execution;
    } catch (error) {
      logger.error("[FailedTaskRunService] Failed to create attempt", {
        run,
        completion,
        error,
      });

      return;
    }
  }

  // We already have an attempt with non-final status, let's use it
  try {
    // Positional arguments: setToExecuting and isRetrying are left unset; the trailing
    // `true` is skipStatusChecks, which bypasses the attempt/run status guards.
    const executionPayload = await sharedQueueTasks.getExecutionPayloadFromAttempt(
      attempt.id,
      undefined,
      undefined,
      true
    );
    return executionPayload?.execution;
  } catch (error) {
    logger.error("[FailedTaskRunService] Failed to get execution payload", {
      run,
      completion,
      error,
    });

    return;
  }
}
166+
167+
/**
 * Derives the next retry for a failed execution from the retry config stored on
 * `run.lockedBy`.
 *
 * @param run - The task run; `run.lockedBy?.retryConfig` is validated with `RetryOptions`.
 * @param execution - The execution whose `attempt.number` is passed to the delay calculation.
 * @returns `{ timestamp, delay }` with `timestamp = Date.now() + delay`, or `undefined` when
 *          the retry config does not parse or no delay is produced.
 */
async #getExecutionRetry(
  run: TaskRunWithAttempts,
  execution: TaskRunExecution
): Promise<TaskRunExecutionRetry | undefined> {
  const retryConfig = RetryOptions.safeParse(run.lockedBy?.retryConfig);

  if (retryConfig.success) {
    const delay = calculateNextRetryDelay(retryConfig.data, execution.attempt.number);

    // NOTE(review): a falsy check means a computed delay of 0 is also treated as
    // "no more retries" — confirm that is intended.
    if (delay) {
      return {
        timestamp: Date.now() + delay,
        delay,
      };
    }

    logger.debug("[FailedTaskRunService] No more retries", {
      run,
      execution,
    });

    return undefined;
  }

  logger.error("[FailedTaskRunService] Invalid retry config", {
    run,
    execution,
  });

  return undefined;
}
69198
}

apps/webapp/app/v3/marqs/sharedQueueConsumer.server.ts

Lines changed: 23 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -945,7 +945,8 @@ class SharedQueueTasks {
945945
async getExecutionPayloadFromAttempt(
946946
id: string,
947947
setToExecuting?: boolean,
948-
isRetrying?: boolean
948+
isRetrying?: boolean,
949+
skipStatusChecks?: boolean
949950
): Promise<ProdTaskRunExecutionPayload | undefined> {
950951
const attempt = await prisma.taskRunAttempt.findUnique({
951952
where: {
@@ -979,27 +980,29 @@ class SharedQueueTasks {
979980
return;
980981
}
981982

982-
switch (attempt.status) {
983-
case "CANCELED":
984-
case "EXECUTING": {
985-
logger.error("Invalid attempt status for execution payload retrieval", {
986-
attemptId: id,
987-
status: attempt.status,
988-
});
989-
return;
983+
if (!skipStatusChecks) {
984+
switch (attempt.status) {
985+
case "CANCELED":
986+
case "EXECUTING": {
987+
logger.error("Invalid attempt status for execution payload retrieval", {
988+
attemptId: id,
989+
status: attempt.status,
990+
});
991+
return;
992+
}
990993
}
991-
}
992994

993-
switch (attempt.taskRun.status) {
994-
case "CANCELED":
995-
case "EXECUTING":
996-
case "INTERRUPTED": {
997-
logger.error("Invalid run status for execution payload retrieval", {
998-
attemptId: id,
999-
runId: attempt.taskRunId,
1000-
status: attempt.taskRun.status,
1001-
});
1002-
return;
995+
switch (attempt.taskRun.status) {
996+
case "CANCELED":
997+
case "EXECUTING":
998+
case "INTERRUPTED": {
999+
logger.error("Invalid run status for execution payload retrieval", {
1000+
attemptId: id,
1001+
runId: attempt.taskRunId,
1002+
status: attempt.taskRun.status,
1003+
});
1004+
return;
1005+
}
10031006
}
10041007
}
10051008

apps/webapp/app/v3/services/completeAttempt.server.ts

Lines changed: 10 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -39,12 +39,14 @@ export class CompleteAttemptService extends BaseService {
3939
env,
4040
checkpoint,
4141
supportsRetryCheckpoints,
42+
isSystemFailure,
4243
}: {
4344
completion: TaskRunExecutionResult;
4445
execution: TaskRunExecution;
4546
env?: AuthenticatedEnvironment;
4647
checkpoint?: CheckpointData;
4748
supportsRetryCheckpoints?: boolean;
49+
isSystemFailure?: boolean;
4850
}): Promise<"COMPLETED" | "RETRIED"> {
4951
const taskRunAttempt = await findAttempt(this._prisma, execution.attempt.id);
5052

@@ -110,6 +112,7 @@ export class CompleteAttemptService extends BaseService {
110112
env,
111113
checkpoint,
112114
supportsRetryCheckpoints,
115+
isSystemFailure,
113116
});
114117
}
115118
}
@@ -170,13 +173,15 @@ export class CompleteAttemptService extends BaseService {
170173
env,
171174
checkpoint,
172175
supportsRetryCheckpoints,
176+
isSystemFailure,
173177
}: {
174178
completion: TaskRunFailedExecutionResult;
175179
execution: TaskRunExecution;
176180
taskRunAttempt: NonNullable<FoundAttempt>;
177181
env?: AuthenticatedEnvironment;
178182
checkpoint?: CheckpointData;
179183
supportsRetryCheckpoints?: boolean;
184+
isSystemFailure?: boolean;
180185
}): Promise<"COMPLETED" | "RETRIED"> {
181186
if (
182187
completion.error.type === "INTERNAL_ERROR" &&
@@ -199,14 +204,6 @@ export class CompleteAttemptService extends BaseService {
199204

200205
const sanitizedError = sanitizeError(completion.error);
201206

202-
// TODO: make this handle the case where the current attempt is unknown, with only a run id
203-
204-
// 1. Get the task run
205-
206-
// 2. Get the most recent attempt
207-
208-
// 3. Get the retry config
209-
210207
await this._prisma.taskRunAttempt.update({
211208
where: { id: taskRunAttempt.id },
212209
data: {
@@ -258,10 +255,11 @@ export class CompleteAttemptService extends BaseService {
258255
},
259256
});
260257

261-
const status =
262-
sanitizedError.type === "INTERNAL_ERROR" && sanitizedError.code === "MAX_DURATION_EXCEEDED"
263-
? "TIMED_OUT"
264-
: "COMPLETED_WITH_ERRORS";
258+
const status = isSystemFailure
259+
? "SYSTEM_FAILURE"
260+
: sanitizedError.type === "INTERNAL_ERROR" && sanitizedError.code === "MAX_DURATION_EXCEEDED"
261+
? "TIMED_OUT"
262+
: "COMPLETED_WITH_ERRORS";
265263

266264
const finalizeService = new FinalizeTaskRunService();
267265
await finalizeService.call({

apps/webapp/app/v3/taskStatus.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,10 @@ export function isCrashableAttemptStatus(status: TaskRunAttemptStatus): boolean
8888
return CRASHABLE_ATTEMPT_STATUSES.includes(status);
8989
}
9090

91+
/**
 * Reports whether the given run status is listed in `FAILABLE_RUN_STATUSES`.
 *
 * @param status - The task run status to check.
 * @returns `true` when the status is in `FAILABLE_RUN_STATUSES`, otherwise `false`.
 */
export function isFailableRunStatus(status: TaskRunStatus): boolean {
  const failable = FAILABLE_RUN_STATUSES.includes(status);
  return failable;
}
94+
9195
/**
 * Reports whether the given run status is listed in `FREEZABLE_RUN_STATUSES`.
 *
 * @param status - The task run status to check.
 * @returns `true` when the status is in `FREEZABLE_RUN_STATUSES`, otherwise `false`.
 */
export function isFreezableRunStatus(status: TaskRunStatus): boolean {
  const freezable = FREEZABLE_RUN_STATUSES.includes(status);
  return freezable;
}

0 commit comments

Comments
 (0)