Skip to content

Commit 2ae2a7c

Browse files
committed
add sdk version check and complete event while completing attempt
1 parent f3629af commit 2ae2a7c

File tree

5 files changed

+183
-65
lines changed

5 files changed

+183
-65
lines changed

apps/webapp/app/v3/failedTaskRun.server.ts

Lines changed: 94 additions & 59 deletions
Original file line numberDiff line numberDiff line change
@@ -1,20 +1,18 @@
11
import {
22
calculateNextRetryDelay,
33
RetryOptions,
4-
sanitizeError,
54
TaskRunExecution,
65
TaskRunExecutionRetry,
76
TaskRunFailedExecutionResult,
87
} from "@trigger.dev/core/v3";
98
import { logger } from "~/services/logger.server";
10-
import { createExceptionPropertiesFromError, eventRepository } from "./eventRepository.server";
119
import { BaseService } from "./services/baseService.server";
12-
import { FinalizeTaskRunService } from "./services/finalizeTaskRun.server";
1310
import { isFailableRunStatus, isFinalAttemptStatus } from "./taskStatus";
14-
import { Prisma } from "@trigger.dev/database";
11+
import type { Prisma, TaskRun } from "@trigger.dev/database";
1512
import { CompleteAttemptService } from "./services/completeAttempt.server";
1613
import { CreateTaskRunAttemptService } from "./services/createTaskRunAttempt.server";
1714
import { sharedQueueTasks } from "./marqs/sharedQueueConsumer.server";
15+
import * as semver from "semver";
1816

1917
const includeAttempts = {
2018
attempts: {
@@ -23,7 +21,8 @@ const includeAttempts = {
2321
},
2422
take: 1,
2523
},
26-
lockedBy: true,
24+
lockedBy: true, // task
25+
lockedToVersion: true, // worker
2726
} satisfies Prisma.TaskRunInclude;
2827

2928
type TaskRunWithAttempts = Prisma.TaskRunGetPayload<{
@@ -67,41 +66,18 @@ export class FailedTaskRunService extends BaseService {
6766
completion,
6867
});
6968

70-
if (retryResult !== undefined) {
71-
return;
72-
}
73-
74-
// No retriable execution, so we need to fail the task run
75-
logger.debug("[FailedTaskRunService] Failing task run", { taskRun, completion });
76-
77-
const finalizeService = new FinalizeTaskRunService();
78-
await finalizeService.call({
79-
id: taskRun.id,
80-
status: "SYSTEM_FAILURE",
81-
completedAt: new Date(),
82-
attemptStatus: "FAILED",
83-
error: sanitizeError(completion.error),
84-
});
85-
86-
// Now we need to "complete" the task run event/span
87-
await eventRepository.completeEvent(taskRun.spanId, {
88-
endTime: new Date(),
89-
attributes: {
90-
isError: true,
91-
},
92-
events: [
93-
{
94-
name: "exception",
95-
time: new Date(),
96-
properties: {
97-
exception: createExceptionPropertiesFromError(completion.error),
98-
},
99-
},
100-
],
69+
logger.debug("[FailedTaskRunService] Completion result", {
70+
runId: taskRun.id,
71+
result: retryResult,
10172
});
10273
}
10374
}
10475

76+
interface TaskRunWithWorker extends TaskRun {
77+
lockedBy: { retryConfig: Prisma.JsonValue } | null;
78+
lockedToVersion: { sdkVersion: string } | null;
79+
}
80+
10581
export class FailedTaskRunRetryHelper extends BaseService {
10682
async call({
10783
runId,
@@ -125,19 +101,23 @@ export class FailedTaskRunRetryHelper extends BaseService {
125101
completion,
126102
});
127103

128-
return;
104+
return "NO_TASK_RUN";
129105
}
130106

131107
const retriableExecution = await this.#getRetriableAttemptExecution(taskRun, completion);
132108

133109
if (!retriableExecution) {
134-
return;
110+
return "NO_EXECUTION";
135111
}
136112

137113
logger.debug("[FailedTaskRunRetryHelper] Completing attempt", { taskRun, completion });
138114

139115
const executionRetry =
140-
completion.retry ?? (await this.#getExecutionRetry(taskRun, retriableExecution));
116+
completion.retry ??
117+
(await FailedTaskRunRetryHelper.getExecutionRetry({
118+
run: taskRun,
119+
execution: retriableExecution,
120+
}));
141121

142122
const completeAttempt = new CompleteAttemptService(this._prisma);
143123
const completeResult = await completeAttempt.call({
@@ -207,35 +187,90 @@ export class FailedTaskRunRetryHelper extends BaseService {
207187
}
208188
}
209189

210-
async #getExecutionRetry(
211-
run: TaskRunWithAttempts,
212-
execution: TaskRunExecution
213-
): Promise<TaskRunExecutionRetry | undefined> {
214-
const parsedRetryConfig = RetryOptions.safeParse(run.lockedBy?.retryConfig);
190+
static async getExecutionRetry({
191+
run,
192+
execution,
193+
}: {
194+
run: TaskRunWithWorker;
195+
execution: TaskRunExecution;
196+
}): Promise<TaskRunExecutionRetry | undefined> {
197+
try {
198+
const retryConfig = run.lockedBy?.retryConfig;
199+
200+
if (!retryConfig) {
201+
if (!run.lockedToVersion) {
202+
logger.error("[FailedTaskRunRetryHelper] Run not locked to version", {
203+
run,
204+
execution,
205+
});
206+
207+
return;
208+
}
209+
210+
const sdkVersion = run.lockedToVersion.sdkVersion ?? "0.0.0";
211+
const isValid = semver.valid(sdkVersion);
212+
213+
if (!isValid) {
214+
logger.error("[FailedTaskRunRetryHelper] Invalid SDK version", {
215+
run,
216+
execution,
217+
});
218+
219+
return;
220+
}
221+
222+
// With older SDK versions, tasks only have a retry config stored in the DB if it's explicitly defined on the task itself
223+
// It won't get populated with retry.default in trigger.config.ts
224+
if (semver.lt(sdkVersion, FailedTaskRunRetryHelper.DEFAULT_RETRY_CONFIG_SINCE_VERSION)) {
225+
logger.warn(
226+
"[FailedTaskRunRetryHelper] SDK version not recent enough to determine retry config",
227+
{
228+
run,
229+
execution,
230+
}
231+
);
232+
233+
return;
234+
}
235+
}
215236

216-
if (!parsedRetryConfig.success) {
217-
logger.error("[FailedTaskRunRetryHelper] Invalid retry config", {
218-
run,
219-
execution,
220-
});
237+
const parsedRetryConfig = RetryOptions.safeParse(retryConfig);
221238

222-
return;
223-
}
239+
if (!parsedRetryConfig.success) {
240+
logger.error("[FailedTaskRunRetryHelper] Invalid retry config", {
241+
run,
242+
execution,
243+
});
244+
245+
return;
246+
}
247+
248+
const delay = calculateNextRetryDelay(parsedRetryConfig.data, execution.attempt.number);
224249

225-
const delay = calculateNextRetryDelay(parsedRetryConfig.data, execution.attempt.number);
250+
if (!delay) {
251+
logger.debug("[FailedTaskRunRetryHelper] No more retries", {
252+
run,
253+
execution,
254+
});
255+
256+
return;
257+
}
226258

227-
if (!delay) {
228-
logger.debug("[FailedTaskRunRetryHelper] No more retries", {
259+
return {
260+
timestamp: Date.now() + delay,
261+
delay,
262+
};
263+
} catch (error) {
264+
logger.error("[FailedTaskRunRetryHelper] Failed to get execution retry", {
229265
run,
230266
execution,
267+
error,
231268
});
232269

233270
return;
234271
}
235-
236-
return {
237-
timestamp: Date.now() + delay,
238-
delay,
239-
};
240272
}
273+
274+
// TODO: update this to the correct version
275+
static DEFAULT_RETRY_CONFIG_SINCE_VERSION = "3.0.12";
241276
}

apps/webapp/app/v3/services/completeAttempt.server.ts

Lines changed: 75 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ import { RetryAttemptService } from "./retryAttempt.server";
2525
import { FAILED_RUN_STATUSES, isFinalAttemptStatus, isFinalRunStatus } from "../taskStatus";
2626
import { FinalizeTaskRunService } from "./finalizeTaskRun.server";
2727
import { env } from "~/env.server";
28+
import { FailedTaskRunRetryHelper } from "../failedTaskRun.server";
2829

2930
type FoundAttempt = Awaited<ReturnType<typeof findAttempt>>;
3031

@@ -208,28 +209,40 @@ export class CompleteAttemptService extends BaseService {
208209
return "COMPLETED";
209210
}
210211

212+
const failedAt = new Date();
211213
const sanitizedError = sanitizeError(completion.error);
212214

213215
await this._prisma.taskRunAttempt.update({
214216
where: { id: taskRunAttempt.id },
215217
data: {
216218
status: "FAILED",
217-
completedAt: new Date(),
219+
completedAt: failedAt,
218220
error: sanitizedError,
219221
usageDurationMs: completion.usage?.durationMs,
220222
},
221223
});
222224

223225
const environment = env ?? (await this.#getEnvironment(execution.environment.id));
224226

227+
const executionRetry =
228+
completion.retry ??
229+
(await FailedTaskRunRetryHelper.getExecutionRetry({
230+
run: {
231+
...taskRunAttempt.taskRun,
232+
lockedBy: taskRunAttempt.backgroundWorkerTask,
233+
lockedToVersion: taskRunAttempt.backgroundWorker,
234+
},
235+
execution,
236+
}));
237+
225238
if (
226239
shouldRetryError(completion.error) &&
227-
completion.retry !== undefined &&
240+
executionRetry !== undefined &&
228241
taskRunAttempt.number < MAX_TASK_RUN_ATTEMPTS
229242
) {
230243
return await this.#retryAttempt({
231244
execution,
232-
executionRetry: completion.retry,
245+
executionRetry,
233246
taskRunAttempt,
234247
environment,
235248
checkpoint,
@@ -241,14 +254,14 @@ export class CompleteAttemptService extends BaseService {
241254

242255
// Now we need to "complete" the task run event/span
243256
await eventRepository.completeEvent(taskRunAttempt.taskRun.spanId, {
244-
endTime: new Date(),
257+
endTime: failedAt,
245258
attributes: {
246259
isError: true,
247260
},
248261
events: [
249262
{
250263
name: "exception",
251-
time: new Date(),
264+
time: failedAt,
252265
properties: {
253266
exception: createExceptionPropertiesFromError(sanitizedError),
254267
},
@@ -267,6 +280,7 @@ export class CompleteAttemptService extends BaseService {
267280

268281
let status: FAILED_RUN_STATUSES;
269282

283+
// Set the correct task run status
270284
if (isSystemFailure) {
271285
status = "SYSTEM_FAILURE";
272286
} else if (isCrash) {
@@ -284,9 +298,63 @@ export class CompleteAttemptService extends BaseService {
284298
await finalizeService.call({
285299
id: taskRunAttempt.taskRunId,
286300
status,
287-
completedAt: new Date(),
301+
completedAt: failedAt,
288302
});
289303

304+
if (status !== "CRASHED" && status !== "SYSTEM_FAILURE") {
305+
return "COMPLETED";
306+
}
307+
308+
const inProgressEvents = await eventRepository.queryIncompleteEvents({
309+
runId: taskRunAttempt.taskRun.friendlyId,
310+
});
311+
312+
// Handle in-progress events
313+
switch (status) {
314+
case "CRASHED": {
315+
logger.debug("[CompleteAttemptService] Crashing in-progress events", {
316+
inProgressEvents: inProgressEvents.map((event) => event.id),
317+
});
318+
319+
await Promise.all(
320+
inProgressEvents.map((event) => {
321+
return eventRepository.crashEvent({
322+
event,
323+
crashedAt: failedAt,
324+
exception: createExceptionPropertiesFromError(sanitizedError),
325+
});
326+
})
327+
);
328+
329+
break;
330+
}
331+
case "SYSTEM_FAILURE": {
332+
logger.debug("[CompleteAttemptService] Failing in-progress events", {
333+
inProgressEvents: inProgressEvents.map((event) => event.id),
334+
});
335+
336+
await Promise.all(
337+
inProgressEvents.map((event) => {
338+
return eventRepository.completeEvent(event.spanId, {
339+
endTime: failedAt,
340+
attributes: {
341+
isError: true,
342+
},
343+
events: [
344+
{
345+
name: "exception",
346+
time: failedAt,
347+
properties: {
348+
exception: createExceptionPropertiesFromError(sanitizedError),
349+
},
350+
},
351+
],
352+
});
353+
})
354+
);
355+
}
356+
}
357+
290358
return "COMPLETED";
291359
}
292360

@@ -519,6 +587,7 @@ async function findAttempt(prismaClient: PrismaClientOrTransaction, friendlyId:
519587
select: {
520588
id: true,
521589
supportsLazyAttempts: true,
590+
sdkVersion: true,
522591
},
523592
},
524593
},

apps/webapp/app/v3/services/crashTaskRun.server.ts

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,11 +66,17 @@ export class CrashTaskRunService extends BaseService {
6666
},
6767
});
6868

69+
logger.debug("[CrashTaskRunService] Completion result", { runId, retryResult });
70+
6971
if (retryResult === "RETRIED") {
7072
logger.debug("[CrashTaskRunService] Retried task run", { runId });
7173
return;
7274
}
7375

76+
if (!opts.overrideCompletion) {
77+
return;
78+
}
79+
7480
logger.debug("[CrashTaskRunService] Overriding completion", { runId, options });
7581

7682
const finalizeService = new FinalizeTaskRunService();

0 commit comments

Comments
 (0)