5 changes: 5 additions & 0 deletions .changeset/forty-windows-shop.md
@@ -0,0 +1,5 @@
---
"@trigger.dev/core": patch
---

Added the ability to retry runs on a larger machine when they fail with an Out Of Memory (OOM) error.
67 changes: 46 additions & 21 deletions apps/webapp/app/v3/failedTaskRun.server.ts
@@ -5,14 +5,14 @@ import {
TaskRunExecutionRetry,
TaskRunFailedExecutionResult,
} from "@trigger.dev/core/v3";
import type { Prisma, TaskRun } from "@trigger.dev/database";
import * as semver from "semver";
import { logger } from "~/services/logger.server";
import { sharedQueueTasks } from "./marqs/sharedQueueConsumer.server";
import { BaseService } from "./services/baseService.server";
import { isFailableRunStatus, isFinalAttemptStatus } from "./taskStatus";
import type { Prisma, TaskRun } from "@trigger.dev/database";
import { CompleteAttemptService } from "./services/completeAttempt.server";
import { CreateTaskRunAttemptService } from "./services/createTaskRunAttempt.server";
import { sharedQueueTasks } from "./marqs/sharedQueueConsumer.server";
import * as semver from "semver";
import { isFailableRunStatus, isFinalAttemptStatus } from "./taskStatus";

const FailedTaskRunRetryGetPayload = {
select: {
@@ -180,13 +180,52 @@ export class FailedTaskRunRetryHelper extends BaseService {
}
}

static async getExecutionRetry({
static getExecutionRetry({
run,
execution,
}: {
run: TaskRunWithWorker;
execution: TaskRunExecution;
}): Promise<TaskRunExecutionRetry | undefined> {
}): TaskRunExecutionRetry | undefined {
try {
const retryConfig = FailedTaskRunRetryHelper.getRetryConfig({ run, execution });
if (!retryConfig) {
return;
}

const delay = calculateNextRetryDelay(retryConfig, execution.attempt.number);

if (!delay) {
logger.debug("[FailedTaskRunRetryHelper] No more retries", {
run,
execution,
});

return;
}

return {
timestamp: Date.now() + delay,
delay,
};
} catch (error) {
logger.error("[FailedTaskRunRetryHelper] Failed to get execution retry", {
run,
execution,
error,
});

return;
}
}

static getRetryConfig({
run,
execution,
}: {
run: TaskRunWithWorker;
execution: TaskRunExecution;
}): RetryOptions | undefined {
try {
const retryConfig = run.lockedBy?.retryConfig;

@@ -247,21 +286,7 @@ export class FailedTaskRunRetryHelper extends BaseService {
return;
}

const delay = calculateNextRetryDelay(parsedRetryConfig.data, execution.attempt.number);

if (!delay) {
logger.debug("[FailedTaskRunRetryHelper] No more retries", {
run,
execution,
});

return;
}

return {
timestamp: Date.now() + delay,
delay,
};
return parsedRetryConfig.data;
} catch (error) {
logger.error("[FailedTaskRunRetryHelper] Failed to get execution retry", {
run,
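
For context, `getExecutionRetry` is now synchronous: it turns the task's retry config plus the current attempt number into an absolute retry time, with the config lookup split out into the new `getRetryConfig`. A minimal sketch of the shape it returns, assuming `calculateNextRetryDelay` is exported from `@trigger.dev/core/v3` and using made-up config values:

```ts
import { calculateNextRetryDelay } from "@trigger.dev/core/v3";

// Hypothetical retry config; the real values come from the task's retry options.
const retryConfig = { maxAttempts: 3, minTimeoutInMs: 1_000, maxTimeoutInMs: 10_000, factor: 2 };

// Returns undefined once attempts are exhausted, otherwise the next delay in milliseconds.
const delay = calculateNextRetryDelay(retryConfig, 1);

if (delay) {
  // Same shape as TaskRunExecutionRetry: an absolute timestamp plus the relative delay.
  const executionRetry = { timestamp: Date.now() + delay, delay };
}
```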
63 changes: 61 additions & 2 deletions apps/webapp/app/v3/services/completeAttempt.server.ts
@@ -1,13 +1,16 @@
import { Attributes } from "@opentelemetry/api";
import {
TaskRunContext,
TaskRunError,
TaskRunErrorCodes,
TaskRunExecution,
TaskRunExecutionResult,
TaskRunExecutionRetry,
TaskRunFailedExecutionResult,
TaskRunSuccessfulExecutionResult,
exceptionEventEnhancer,
flattenAttributes,
internalErrorFromUnexpectedExit,
sanitizeError,
shouldRetryError,
taskRunErrorEnhancer,
@@ -233,7 +236,7 @@ export class CompleteAttemptService extends BaseService {

if (!executionRetry && shouldInfer) {
executionRetryInferred = true;
executionRetry = await FailedTaskRunRetryHelper.getExecutionRetry({
executionRetry = FailedTaskRunRetryHelper.getExecutionRetry({
run: {
...taskRunAttempt.taskRun,
lockedBy: taskRunAttempt.backgroundWorkerTask,
@@ -243,7 +246,43 @@
});
}

const retriableError = shouldRetryError(taskRunErrorEnhancer(completion.error));
let retriableError = shouldRetryError(taskRunErrorEnhancer(completion.error));
let isOOMRetry = false;

// OOM errors should be retried, but only if an OOM retry machine is specified
if (isOOMError(completion.error)) {
const retryConfig = FailedTaskRunRetryHelper.getRetryConfig({
run: {
...taskRunAttempt.taskRun,
lockedBy: taskRunAttempt.backgroundWorkerTask,
lockedToVersion: taskRunAttempt.backgroundWorker,
},
execution,
});

if (retryConfig?.outOfMemory?.machine) {
isOOMRetry = true;
retriableError = true;
executionRetry = FailedTaskRunRetryHelper.getExecutionRetry({
run: {
...taskRunAttempt.taskRun,
lockedBy: taskRunAttempt.backgroundWorkerTask,
lockedToVersion: taskRunAttempt.backgroundWorker,
},
execution,
});

// Update the machine on the run so the retry starts on the larger preset
await this._prisma.taskRun.update({
where: {
id: taskRunAttempt.taskRunId,
},
data: {
machinePreset: retryConfig.outOfMemory.machine,
},
});
}
}

if (
retriableError &&
@@ -257,6 +296,7 @@
taskRunAttempt,
environment,
checkpoint,
forceRequeue: isOOMRetry,
});
}

@@ -378,12 +418,14 @@
executionRetryInferred,
checkpointEventId,
supportsLazyAttempts,
forceRequeue = false,
}: {
run: TaskRun;
executionRetry: TaskRunExecutionRetry;
executionRetryInferred: boolean;
checkpointEventId?: string;
supportsLazyAttempts: boolean;
forceRequeue?: boolean;
}) {
const retryViaQueue = () => {
logger.debug("[CompleteAttemptService] Enqueuing retry attempt", { runId: run.id });
@@ -434,6 +476,11 @@
return;
}

if (forceRequeue) {
logger.debug("[CompleteAttemptService] Forcing retry via queue", { runId: run.id });
await retryViaQueue();
return;
}

// Workers that never checkpoint between attempts will exit after completing their current attempt if the retry delay exceeds the threshold
if (
!this.opts.supportsRetryCheckpoints &&
@@ -466,13 +513,15 @@
taskRunAttempt,
environment,
checkpoint,
forceRequeue = false,
}: {
execution: TaskRunExecution;
executionRetry: TaskRunExecutionRetry;
executionRetryInferred: boolean;
taskRunAttempt: NonNullable<FoundAttempt>;
environment: AuthenticatedEnvironment;
checkpoint?: CheckpointData;
forceRequeue?: boolean;
}) {
const retryAt = new Date(executionRetry.timestamp);

@@ -533,6 +582,7 @@
executionRetry,
supportsLazyAttempts: taskRunAttempt.backgroundWorker.supportsLazyAttempts,
executionRetryInferred,
forceRequeue,
});

return "RETRIED";
@@ -634,3 +684,12 @@ async function findAttempt(prismaClient: PrismaClientOrTransaction, friendlyId:
},
});
}

function isOOMError(error: TaskRunError) {
if (error.type !== "INTERNAL_ERROR") return false;
if (error.code === "TASK_PROCESS_OOM_KILLED" || error.code === "TASK_PROCESS_MAYBE_OOM_KILLED") {
return true;
}

return false;
}
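
The `isOOMError` guard above keys off two internal error codes. A quick sketch of what it does and doesn't match, assuming the usual `TaskRunError` union shapes from `@trigger.dev/core`:

```ts
isOOMError({ type: "INTERNAL_ERROR", code: "TASK_PROCESS_OOM_KILLED" }); // true
isOOMError({ type: "INTERNAL_ERROR", code: "TASK_PROCESS_MAYBE_OOM_KILLED" }); // true
isOOMError({ type: "INTERNAL_ERROR", code: "TASK_PROCESS_EXITED_WITH_NON_ZERO_CODE" }); // false
isOOMError({ type: "STRING_ERROR", raw: "JavaScript heap out of memory" }); // false, only internal errors qualify
```

When it matches and the task's retry config specifies an `outOfMemory.machine`, the attempt is treated as retriable, the run's `machinePreset` is rewritten, and `forceRequeue` routes the retry through the queue so the next attempt actually starts on the larger machine.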
35 changes: 32 additions & 3 deletions docs/machines.mdx
@@ -8,9 +8,7 @@ The `machine` configuration is optional. Using higher spec machines will increas
```ts /trigger/heavy-task.ts
export const heavyTask = task({
id: "heavy-task",
machine: {
preset: "large-1x",
},
machine: "large-1x",
run: async ({ payload, ctx }) => {
//...
},
@@ -28,6 +26,37 @@ export const config: TriggerConfig = {
};
```

## Out Of Memory errors

Sometimes you might see one of your runs fail with an "Out Of Memory" error.

> TASK_PROCESS_OOM_KILLED. Your task ran out of memory. Try increasing the machine specs. If this doesn't fix it there might be a memory leak.

If this happens regularly, you need to either optimize your code's memory usage or increase the spec of the machine.

### Retrying with a larger machine

If you only see occasional OOM errors, you can configure your task to retry on a larger machine when an OOM error occurs:

```ts /trigger/heavy-task.ts
export const yourTask = task({
id: "your-task",
machine: "medium-1x",
retry: {
outOfMemory: {
machine: "large-1x",
},
},
run: async (payload: any, { ctx }) => {
//...
},
});
```

<Note>
This only retries the task when it fails with an OOM error. It doesn't permanently change the machine that new runs start on, so if you consistently see OOM errors you should increase the `machine` property instead.
</Note>
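
You can check which machine preset the current attempt is running on via `ctx.machine`. This is useful for confirming that an OOM retry actually moved to the larger machine:

```ts /trigger/heavy-task.ts
export const yourTask = task({
  id: "your-task",
  machine: "medium-1x",
  retry: {
    outOfMemory: {
      machine: "large-1x",
    },
  },
  run: async (payload: any, { ctx }) => {
    // "medium-1x" on the first attempt, "large-1x" after an OOM retry
    logger.info(`Running on ${ctx.machine?.name}`);
  },
});
```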

## Machine configurations

| Preset | vCPU | Memory | Disk space |
16 changes: 13 additions & 3 deletions packages/core/src/v3/schemas/schemas.ts
@@ -1,6 +1,6 @@
import { z } from "zod";
import { RequireKeys } from "../types/index.js";
import { MachineConfig, MachinePreset, TaskRunExecution } from "./common.js";
import { MachineConfig, MachinePreset, MachinePresetName, TaskRunExecution } from "./common.js";

/*
WARNING: Never import anything from ./messages here. If it's needed in both, put it here instead.
@@ -95,15 +95,25 @@ export const RetryOptions = z.object({
* This can be useful to prevent the thundering herd problem where all retries happen at the same time.
*/
randomize: z.boolean().optional(),

/** If a run fails with an Out Of Memory (OOM) error and this is set, it will be retried on the machine you specify.
* Note: new runs will not default to this [machine](https://trigger.dev/docs/machines); it only applies to retries after OOM failures.
* If your attempts frequently fail with OOM errors, you should set the [default machine](https://trigger.dev/docs/machines) to a higher spec instead.
*/
outOfMemory: z
.object({
machine: MachinePresetName.optional(),
})
.optional(),
});

export type RetryOptions = z.infer<typeof RetryOptions>;

export const QueueOptions = z.object({
/** You can define a shared queue and then pass the name in to your task.
*
*
* @example
*
*
* ```ts
* const myQueue = queue({
name: "my-queue",
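
Since `RetryOptions` is a zod schema, the new field validates like any other option. A quick hedged sketch, assuming the schema is exported from `@trigger.dev/core/v3`:

```ts
import { RetryOptions } from "@trigger.dev/core/v3";

// Parses: outOfMemory.machine accepts a known machine preset name.
RetryOptions.safeParse({ maxAttempts: 3, outOfMemory: { machine: "large-1x" } }).success; // true

// Rejects: "huge-16x" is not a valid MachinePresetName.
RetryOptions.safeParse({ outOfMemory: { machine: "huge-16x" } }).success; // false
```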
7 changes: 2 additions & 5 deletions packages/core/src/v3/types/tasks.ts
@@ -202,17 +202,14 @@ type CommonTaskOptions<
* ```
*/
queue?: QueueOptions;
/** Configure the spec of the machine you want your task to run on.
/** Configure the spec of the [machine](https://trigger.dev/docs/machines) you want your task to run on.
*
* @example
*
* ```ts
* export const heavyTask = task({
id: "heavy-task",
machine: {
cpu: 2,
memory: 4,
},
machine: "medium-1x",
run: async ({ payload, ctx }) => {
//...
},
41 changes: 41 additions & 0 deletions references/hello-world/src/trigger/oom.ts
@@ -0,0 +1,41 @@
import { logger, task } from "@trigger.dev/sdk/v3";
import { setTimeout } from "timers/promises";

export const oomTask = task({
id: "oom-task",
machine: "micro",
retry: {
outOfMemory: {
machine: "small-1x",
},
},
run: async ({ succeedOnLargerMachine }: { succeedOnLargerMachine: boolean }, { ctx }) => {
logger.info("running out of memory below this line");

logger.info(`Running on ${ctx.machine?.name}`);

await setTimeout(2000);

if (ctx.machine?.name !== "micro" && succeedOnLargerMachine) {
logger.info("Going to succeed now");
return {
success: true,
};
}

// Exponentially grow a string until memory runs out (or the engine's max string length throws)
let a = "a";

try {
while (true) {
a += a;
}
} catch (error) {
logger.error(error instanceof Error ? error.message : "Unknown error", { error });

// If the string growth threw a catchable error instead of killing the process,
// keep allocating copies until the process is actually OOM-killed
let b = [];
while (true) {
b.push(a.replace(/a/g, "b"));
}
}
},
});
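
To exercise the reference task, you could trigger it from backend code; a hypothetical snippet:

```ts
import { oomTask } from "./oom.js";

// The first attempt OOMs on "micro"; the OOM retry runs on "small-1x" and succeeds.
const handle = await oomTask.trigger({ succeedOnLargerMachine: true });
```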