From 8f1cfb74e07d2e8843b85d027595a5d99cd225c9 Mon Sep 17 00:00:00 2001 From: Matt Aitken Date: Mon, 10 Feb 2025 17:22:11 +0000 Subject: [PATCH 1/4] OOM retrying on larger machines --- apps/webapp/app/v3/failedTaskRun.server.ts | 67 +++++++++++++------ .../app/v3/services/completeAttempt.server.ts | 63 ++++++++++++++++- docs/machines.mdx | 35 +++++++++- packages/core/src/v3/schemas/schemas.ts | 16 ++++- packages/core/src/v3/types/tasks.ts | 7 +- references/hello-world/src/trigger/oom.ts | 41 ++++++++++++ 6 files changed, 195 insertions(+), 34 deletions(-) create mode 100644 references/hello-world/src/trigger/oom.ts diff --git a/apps/webapp/app/v3/failedTaskRun.server.ts b/apps/webapp/app/v3/failedTaskRun.server.ts index f26a343f0a..3935c21ddd 100644 --- a/apps/webapp/app/v3/failedTaskRun.server.ts +++ b/apps/webapp/app/v3/failedTaskRun.server.ts @@ -5,14 +5,14 @@ import { TaskRunExecutionRetry, TaskRunFailedExecutionResult, } from "@trigger.dev/core/v3"; +import type { Prisma, TaskRun } from "@trigger.dev/database"; +import * as semver from "semver"; import { logger } from "~/services/logger.server"; +import { sharedQueueTasks } from "./marqs/sharedQueueConsumer.server"; import { BaseService } from "./services/baseService.server"; -import { isFailableRunStatus, isFinalAttemptStatus } from "./taskStatus"; -import type { Prisma, TaskRun } from "@trigger.dev/database"; import { CompleteAttemptService } from "./services/completeAttempt.server"; import { CreateTaskRunAttemptService } from "./services/createTaskRunAttempt.server"; -import { sharedQueueTasks } from "./marqs/sharedQueueConsumer.server"; -import * as semver from "semver"; +import { isFailableRunStatus, isFinalAttemptStatus } from "./taskStatus"; const FailedTaskRunRetryGetPayload = { select: { @@ -180,13 +180,52 @@ export class FailedTaskRunRetryHelper extends BaseService { } } - static async getExecutionRetry({ + static getExecutionRetry({ run, execution, }: { run: TaskRunWithWorker; execution: TaskRunExecution; - }): Promise { + }): TaskRunExecutionRetry | undefined { + try { + const retryConfig = FailedTaskRunRetryHelper.getRetryConfig({ run, execution }); + if (!retryConfig) { + return; + } + + const delay = calculateNextRetryDelay(retryConfig, execution.attempt.number); + + if (!delay) { + logger.debug("[FailedTaskRunRetryHelper] No more retries", { + run, + execution, + }); + + return; + } + + return { + timestamp: Date.now() + delay, + delay, + }; + } catch (error) { + logger.error("[FailedTaskRunRetryHelper] Failed to get execution retry", { + run, + execution, + error, + }); + + return; + } + } + + static getRetryConfig({ + run, + execution, + }: { + run: TaskRunWithWorker; + execution: TaskRunExecution; + }): RetryOptions | undefined { try { const retryConfig = run.lockedBy?.retryConfig; @@ -247,21 +286,7 @@ export class FailedTaskRunRetryHelper extends BaseService { return; } - const delay = calculateNextRetryDelay(parsedRetryConfig.data, execution.attempt.number); - - if (!delay) { - logger.debug("[FailedTaskRunRetryHelper] No more retries", { - run, - execution, - }); - - return; - } - - return { - timestamp: Date.now() + delay, - delay, - }; + return parsedRetryConfig.data; } catch (error) { logger.error("[FailedTaskRunRetryHelper] Failed to get execution retry", { run, diff --git a/apps/webapp/app/v3/services/completeAttempt.server.ts b/apps/webapp/app/v3/services/completeAttempt.server.ts index bae62a37a8..bca72ad64f 100644 --- a/apps/webapp/app/v3/services/completeAttempt.server.ts +++ 
b/apps/webapp/app/v3/services/completeAttempt.server.ts @@ -1,13 +1,16 @@ import { Attributes } from "@opentelemetry/api"; import { TaskRunContext, + TaskRunError, TaskRunErrorCodes, TaskRunExecution, TaskRunExecutionResult, TaskRunExecutionRetry, TaskRunFailedExecutionResult, TaskRunSuccessfulExecutionResult, + exceptionEventEnhancer, flattenAttributes, + internalErrorFromUnexpectedExit, sanitizeError, shouldRetryError, taskRunErrorEnhancer, @@ -233,7 +236,7 @@ export class CompleteAttemptService extends BaseService { if (!executionRetry && shouldInfer) { executionRetryInferred = true; - executionRetry = await FailedTaskRunRetryHelper.getExecutionRetry({ + executionRetry = FailedTaskRunRetryHelper.getExecutionRetry({ run: { ...taskRunAttempt.taskRun, lockedBy: taskRunAttempt.backgroundWorkerTask, @@ -243,7 +246,43 @@ export class CompleteAttemptService extends BaseService { }); } - const retriableError = shouldRetryError(taskRunErrorEnhancer(completion.error)); + let retriableError = shouldRetryError(taskRunErrorEnhancer(completion.error)); + let isOOMRetry = false; + + //OOM errors should retry (if an OOM machine is specified) + if (isOOMError(completion.error)) { + const retryConfig = FailedTaskRunRetryHelper.getRetryConfig({ + run: { + ...taskRunAttempt.taskRun, + lockedBy: taskRunAttempt.backgroundWorkerTask, + lockedToVersion: taskRunAttempt.backgroundWorker, + }, + execution, + }); + + if (retryConfig?.outOfMemory?.machine) { + isOOMRetry = true; + retriableError = true; + executionRetry = FailedTaskRunRetryHelper.getExecutionRetry({ + run: { + ...taskRunAttempt.taskRun, + lockedBy: taskRunAttempt.backgroundWorkerTask, + lockedToVersion: taskRunAttempt.backgroundWorker, + }, + execution, + }); + + //update the machine on the run + await this._prisma.taskRun.update({ + where: { + id: taskRunAttempt.taskRunId, + }, + data: { + machinePreset: retryConfig.outOfMemory.machine, + }, + }); + } + } if ( retriableError && @@ -257,6 +296,7 @@ export class CompleteAttemptService extends BaseService { taskRunAttempt, environment, checkpoint, + forceRequeue: isOOMRetry, }); } @@ -378,12 +418,14 @@ export class CompleteAttemptService extends BaseService { executionRetryInferred, checkpointEventId, supportsLazyAttempts, + forceRequeue = false, }: { run: TaskRun; executionRetry: TaskRunExecutionRetry; executionRetryInferred: boolean; checkpointEventId?: string; supportsLazyAttempts: boolean; + forceRequeue?: boolean; }) { const retryViaQueue = () => { logger.debug("[CompleteAttemptService] Enqueuing retry attempt", { runId: run.id }); @@ -434,6 +476,11 @@ export class CompleteAttemptService extends BaseService { return; } + if (forceRequeue) { + logger.debug("[CompleteAttemptService] Forcing retry via queue", { runId: run.id }); + await retryViaQueue(); + } + // Workers that never checkpoint between attempts will exit after completing their current attempt if the retry delay exceeds the threshold if ( !this.opts.supportsRetryCheckpoints && @@ -466,6 +513,7 @@ export class CompleteAttemptService extends BaseService { taskRunAttempt, environment, checkpoint, + forceRequeue = false, }: { execution: TaskRunExecution; executionRetry: TaskRunExecutionRetry; @@ -473,6 +521,7 @@ export class CompleteAttemptService extends BaseService { taskRunAttempt: NonNullable; environment: AuthenticatedEnvironment; checkpoint?: CheckpointData; + forceRequeue?: boolean; }) { const retryAt = new Date(executionRetry.timestamp); @@ -533,6 +582,7 @@ export class CompleteAttemptService extends BaseService { executionRetry, 
supportsLazyAttempts: taskRunAttempt.backgroundWorker.supportsLazyAttempts, executionRetryInferred, + forceRequeue, }); return "RETRIED"; @@ -634,3 +684,12 @@ async function findAttempt(prismaClient: PrismaClientOrTransaction, friendlyId: }, }); } + +function isOOMError(error: TaskRunError) { + if (error.type !== "INTERNAL_ERROR") return false; + if (error.code === "TASK_PROCESS_OOM_KILLED" || error.code === "TASK_PROCESS_MAYBE_OOM_KILLED") { + return true; + } + + return false; +} diff --git a/docs/machines.mdx b/docs/machines.mdx index bc980384b5..5e53290fef 100644 --- a/docs/machines.mdx +++ b/docs/machines.mdx @@ -8,9 +8,7 @@ The `machine` configuration is optional. Using higher spec machines will increas ```ts /trigger/heavy-task.ts export const heavyTask = task({ id: "heavy-task", - machine: { - preset: "large-1x", - }, + machine: "large-1x", run: async ({ payload, ctx }) => { //... }, @@ -28,6 +26,37 @@ export const config: TriggerConfig = { }; ``` +## Out Of Memory errors + +Sometimes you might see one of your runs fail with an "Out Of Memory" error. + +> TASK_PROCESS_OOM_KILLED. Your task ran out of memory. Try increasing the machine specs. If this doesn't fix it there might be a memory leak. + +If this happens regularly you need to either optimize the memory-efficiency of your code, or increase the machine. + +### Retrying with a larger machine + +If you are seeing rare OOM errors, you can add a setting to your task to retry with a large machine if you get an OOM error: + +```ts /trigger/heavy-task.ts +export const yourTask = task({ + id: "your-task", + machine: "medium-1x", + retry: { + outOfMemory: { + machine: "large-1x", + }, + }, + run: async (payload: any, { ctx }) => { + //... + }, +}); +``` + + + This will only retry the task if you get an OOM error. It won't permanently change the machine that a new run starts on, so if you consistently see OOM errors you should change the machine in the `machine` property. + + ## Machine configurations | Preset | vCPU | Memory | Disk space | diff --git a/packages/core/src/v3/schemas/schemas.ts b/packages/core/src/v3/schemas/schemas.ts index 418db11c5a..a72112783b 100644 --- a/packages/core/src/v3/schemas/schemas.ts +++ b/packages/core/src/v3/schemas/schemas.ts @@ -1,6 +1,6 @@ import { z } from "zod"; import { RequireKeys } from "../types/index.js"; -import { MachineConfig, MachinePreset, TaskRunExecution } from "./common.js"; +import { MachineConfig, MachinePreset, MachinePresetName, TaskRunExecution } from "./common.js"; /* WARNING: Never import anything from ./messages here. If it's needed in both, put it here instead. @@ -95,15 +95,25 @@ export const RetryOptions = z.object({ * This can be useful to prevent the thundering herd problem where all retries happen at the same time. */ randomize: z.boolean().optional(), + + /** If a run fails with an Out Of Memory (OOM) error and you have this set, it will retry with the machine you specify. + * Note: it will not default to this [machine](https://trigger.dev/docs/machines) for new runs, only for failures caused by OOM errors. + * So if you frequently have attempts failing with OOM errors, you should set the [default machine](https://trigger.dev/docs/machines) to be higher. + */ + outOfMemory: z + .object({ + machine: MachinePresetName.optional(), + }) + .optional(), }); export type RetryOptions = z.infer; export const QueueOptions = z.object({ /** You can define a shared queue and then pass the name in to your task. 
- * + * * @example - * + * * ```ts * const myQueue = queue({ name: "my-queue", diff --git a/packages/core/src/v3/types/tasks.ts b/packages/core/src/v3/types/tasks.ts index c8b82e60e3..d238c26dfb 100644 --- a/packages/core/src/v3/types/tasks.ts +++ b/packages/core/src/v3/types/tasks.ts @@ -202,17 +202,14 @@ type CommonTaskOptions< * ``` */ queue?: QueueOptions; - /** Configure the spec of the machine you want your task to run on. + /** Configure the spec of the [machine](https://trigger.dev/docs/machines) you want your task to run on. * * @example * * ```ts * export const heavyTask = task({ id: "heavy-task", - machine: { - cpu: 2, - memory: 4, - }, + machine: "medium-1x", run: async ({ payload, ctx }) => { //... }, diff --git a/references/hello-world/src/trigger/oom.ts b/references/hello-world/src/trigger/oom.ts new file mode 100644 index 0000000000..9d5ff8389a --- /dev/null +++ b/references/hello-world/src/trigger/oom.ts @@ -0,0 +1,41 @@ +import { logger, task } from "@trigger.dev/sdk/v3"; +import { setTimeout } from "timers/promises"; + +export const oomTask = task({ + id: "oom-task", + machine: "micro", + retry: { + outOfMemory: { + machine: "small-1x", + }, + }, + run: async ({ succeedOnLargerMachine }: { succeedOnLargerMachine: boolean }, { ctx }) => { + logger.info("running out of memory below this line"); + + logger.info(`Running on ${ctx.machine?.name}`); + + await setTimeout(2000); + + if (ctx.machine?.name !== "micro" && succeedOnLargerMachine) { + logger.info("Going to succeed now"); + return { + success: true, + }; + } + + let a = "a"; + + try { + while (true) { + a += a; + } + } catch (error) { + logger.error(error instanceof Error ? error.message : "Unknown error", { error }); + + let b = []; + while (true) { + b.push(a.replace(/a/g, "b")); + } + } + }, +}); From 5f929be58d6cf992ac5b6b15f07b9e89031160de Mon Sep 17 00:00:00 2001 From: Matt Aitken Date: Mon, 10 Feb 2025 17:24:39 +0000 Subject: [PATCH 2/4] Create forty-windows-shop.md --- .changeset/forty-windows-shop.md | 5 +++++ 1 file changed, 5 insertions(+) create mode 100644 .changeset/forty-windows-shop.md diff --git a/.changeset/forty-windows-shop.md b/.changeset/forty-windows-shop.md new file mode 100644 index 0000000000..3f0fd36a28 --- /dev/null +++ b/.changeset/forty-windows-shop.md @@ -0,0 +1,5 @@ +--- +"@trigger.dev/core": patch +--- + +Added the ability to retry runs that fail with an Out Of Memory (OOM) error on a larger machine. From 21ac5ce8669a4c1a6c2d2e4ebea48877213cf271 Mon Sep 17 00:00:00 2001 From: Matt Aitken Date: Mon, 10 Feb 2025 17:27:48 +0000 Subject: [PATCH 3/4] Update forty-windows-shop.md --- .changeset/forty-windows-shop.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.changeset/forty-windows-shop.md b/.changeset/forty-windows-shop.md index 3f0fd36a28..faf533cdd1 100644 --- a/.changeset/forty-windows-shop.md +++ b/.changeset/forty-windows-shop.md @@ -1,5 +1,5 @@ --- -"@trigger.dev/core": patch +"@trigger.dev/sdk": patch --- Added the ability to retry runs that fail with an Out Of Memory (OOM) error on a larger machine. 
From 2de9e8d74bafd7aa10cce22b7f280380d8e50c9f Mon Sep 17 00:00:00 2001 From: Matt Aitken Date: Mon, 10 Feb 2025 17:42:30 +0000 Subject: [PATCH 4/4] Only retry again if the machine is different from the original --- apps/webapp/app/v3/services/completeAttempt.server.ts | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/apps/webapp/app/v3/services/completeAttempt.server.ts b/apps/webapp/app/v3/services/completeAttempt.server.ts index bca72ad64f..1acc534f74 100644 --- a/apps/webapp/app/v3/services/completeAttempt.server.ts +++ b/apps/webapp/app/v3/services/completeAttempt.server.ts @@ -260,7 +260,11 @@ export class CompleteAttemptService extends BaseService { execution, }); - if (retryConfig?.outOfMemory?.machine) { + if ( + retryConfig?.outOfMemory?.machine && + retryConfig.outOfMemory.machine !== taskRunAttempt.taskRun.machinePreset + ) { + //we will retry isOOMRetry = true; retriableError = true; executionRetry = FailedTaskRunRetryHelper.getExecutionRetry({ @@ -479,6 +483,7 @@ export class CompleteAttemptService extends BaseService { if (forceRequeue) { logger.debug("[CompleteAttemptService] Forcing retry via queue", { runId: run.id }); await retryViaQueue(); + return; } // Workers that never checkpoint between attempts will exit after completing their current attempt if the retry delay exceeds the threshold
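
Example usage of the new option (an illustrative sketch, not part of the patch series above): based on the `RetryOptions.outOfMemory` schema added in PATCH 1/4 and the machine-difference check added in PATCH 4/4, a task could opt into OOM escalation roughly as shown below. The task id, machine presets, and payload shape here are assumptions chosen for illustration; only the `retry.outOfMemory.machine` field and the `ctx.machine?.name` access are taken from the patches themselves.

```ts
import { task } from "@trigger.dev/sdk/v3";

// Hypothetical task id, presets, and payload, shown only to illustrate the
// `retry.outOfMemory.machine` option introduced in this series.
export const videoEncode = task({
  id: "video-encode",
  machine: "small-2x",
  retry: {
    maxAttempts: 3,
    // If an attempt fails with TASK_PROCESS_OOM_KILLED (or the "maybe OOM"
    // variant), the run is requeued on the preset below and its machinePreset
    // is updated. Per PATCH 4/4, this only happens when the configured preset
    // differs from the one the attempt just failed on, so a run that OOMs
    // again on the larger preset is not escalated a second time.
    outOfMemory: {
      machine: "large-1x",
    },
  },
  run: async (payload: { url: string }, { ctx }) => {
    // ctx.machine?.name reports the preset the current attempt runs on,
    // as used by references/hello-world/src/trigger/oom.ts in PATCH 1/4.
    return { machine: ctx.machine?.name };
  },
});
```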