From d5cd1c789b8a419c0323a429874ae274bd1abebb Mon Sep 17 00:00:00 2001 From: Matt Aitken Date: Thu, 6 Mar 2025 11:49:42 +0000 Subject: [PATCH 01/18] Added describe to tests that were missing it --- .../run-engine/src/engine/tests/batchTriggerAndWait.test.ts | 2 +- .../run-engine/src/engine/tests/heartbeats.test.ts | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/internal-packages/run-engine/src/engine/tests/batchTriggerAndWait.test.ts b/internal-packages/run-engine/src/engine/tests/batchTriggerAndWait.test.ts index 6639253654..d2869886d0 100644 --- a/internal-packages/run-engine/src/engine/tests/batchTriggerAndWait.test.ts +++ b/internal-packages/run-engine/src/engine/tests/batchTriggerAndWait.test.ts @@ -5,7 +5,7 @@ import { setupBackgroundWorker, } from "@internal/testcontainers"; import { trace } from "@opentelemetry/api"; -import { expect } from "vitest"; +import { expect, describe } from "vitest"; import { RunEngine } from "../index.js"; import { setTimeout } from "node:timers/promises"; import { generateFriendlyId } from "@trigger.dev/core/v3/apps"; diff --git a/internal-packages/run-engine/src/engine/tests/heartbeats.test.ts b/internal-packages/run-engine/src/engine/tests/heartbeats.test.ts index 1ff229cd7b..b606845490 100644 --- a/internal-packages/run-engine/src/engine/tests/heartbeats.test.ts +++ b/internal-packages/run-engine/src/engine/tests/heartbeats.test.ts @@ -5,7 +5,7 @@ import { assertNonNullable, } from "@internal/testcontainers"; import { trace } from "@opentelemetry/api"; -import { expect } from "vitest"; +import { expect, describe } from "vitest"; import { RunEngine } from "../index.js"; import { setTimeout } from "timers/promises"; From 0ce5e0f1af5d3c4dd9aa02cbd2d595599010c543 Mon Sep 17 00:00:00 2001 From: Matt Aitken Date: Thu, 6 Mar 2025 11:50:01 +0000 Subject: [PATCH 02/18] Added a function to get the maxOldSpaceSize --- packages/core/package.json | 17 ++++++++++++++++- packages/core/src/v3/machines/index.ts | 13 +++++++++++++ 2 files changed, 29 insertions(+), 1 deletion(-) create mode 100644 packages/core/src/v3/machines/index.ts diff --git a/packages/core/package.json b/packages/core/package.json index 1f96164295..438101251d 100644 --- a/packages/core/package.json +++ b/packages/core/package.json @@ -59,7 +59,8 @@ "./v3/workers": "./src/v3/workers/index.ts", "./v3/schemas": "./src/v3/schemas/index.ts", "./v3/runEngineWorker": "./src/v3/runEngineWorker/index.ts", - "./v3/checkpoints": "./src/v3/checkpoints/index.ts" + "./v3/checkpoints": "./src/v3/checkpoints/index.ts", + "./v3/machines": "./src/v3/machines/index.ts" }, "sourceDialects": [ "@triggerdotdev/source" @@ -180,6 +181,9 @@ ], "v3/checkpoints": [ "dist/commonjs/v3/checkpoints/index.d.ts" + ], + "v3/machines": [ + "dist/commonjs/v3/machines/index.d.ts" ] } }, @@ -673,6 +677,17 @@ "types": "./dist/commonjs/v3/checkpoints/index.d.ts", "default": "./dist/commonjs/v3/checkpoints/index.js" } + }, + "./v3/machines": { + "import": { + "@triggerdotdev/source": "./src/v3/machines/index.ts", + "types": "./dist/esm/v3/machines/index.d.ts", + "default": "./dist/esm/v3/machines/index.js" + }, + "require": { + "types": "./dist/commonjs/v3/machines/index.d.ts", + "default": "./dist/commonjs/v3/machines/index.js" + } } }, "type": "module", diff --git a/packages/core/src/v3/machines/index.ts b/packages/core/src/v3/machines/index.ts new file mode 100644 index 0000000000..c388a1f728 --- /dev/null +++ b/packages/core/src/v3/machines/index.ts @@ -0,0 +1,13 @@ +import { MachinePreset } from 
"../schemas/common.js"; + +/** + * Returns a value to be used for `--max-old-space-size`. It is in MiB. + * Setting this correctly means V8 spends more times running Garbage Collection (GC). + * It won't eliminate crashes but it will help avoid them. + * @param {MachinePreset} machine - The machine preset configuration containing memory specifications + * @param {number} [overhead=0.2] - The memory overhead factor (0.2 = 20% reserved for system operations) + * @returns {number} The calculated max old space size in MiB + */ +export function maxOldSpaceSizeForMachine(machine: MachinePreset, overhead = 0.2) { + return machine.memory * 1_024 * (1 - overhead); +} From c560a187cb17955e095e92ae180db9ccb408f084 Mon Sep 17 00:00:00 2001 From: Matt Aitken Date: Thu, 6 Mar 2025 12:32:21 +0000 Subject: [PATCH 03/18] Make it easy to take `NODE_OPTIONS` and set the old space flag --- packages/core/src/v3/machines/index.ts | 48 +++++++++++++++- .../src/v3/machines/max-old-space.test.ts | 57 +++++++++++++++++++ packages/core/vitest.config.ts | 2 +- 3 files changed, 104 insertions(+), 3 deletions(-) create mode 100644 packages/core/src/v3/machines/max-old-space.test.ts diff --git a/packages/core/src/v3/machines/index.ts b/packages/core/src/v3/machines/index.ts index c388a1f728..771f4345a4 100644 --- a/packages/core/src/v3/machines/index.ts +++ b/packages/core/src/v3/machines/index.ts @@ -8,6 +8,50 @@ import { MachinePreset } from "../schemas/common.js"; * @param {number} [overhead=0.2] - The memory overhead factor (0.2 = 20% reserved for system operations) * @returns {number} The calculated max old space size in MiB */ -export function maxOldSpaceSizeForMachine(machine: MachinePreset, overhead = 0.2) { - return machine.memory * 1_024 * (1 - overhead); +export function maxOldSpaceSizeForMachine(machine: MachinePreset, overhead: number = 0.2): number { + return Math.round(machine.memory * 1_024 * (1 - overhead)); +} + +/** + * Returns a flag to be used for `--max-old-space-size`. It is in MiB. + * Setting this correctly means V8 spends more times running Garbage Collection (GC). + * It won't eliminate crashes but it will help avoid them. + * @param {MachinePreset} machine - The machine preset configuration containing memory specifications + * @param {number} [overhead=0.2] - The memory overhead factor (0.2 = 20% reserved for system operations) + * @returns {string} The calculated max old space size flag + */ +export function maxOldSpaceSizeFlag(machine: MachinePreset, overhead: number = 0.2): string { + return `--max-old-space-size=${maxOldSpaceSizeForMachine(machine, overhead)}`; +} + +/** + * Takes the existing NODE_OPTIONS value, removes any existing max-old-space-size flag, and adds a new one. + * @param {string | undefined} existingOptions - The existing NODE_OPTIONS value + * @param {MachinePreset} machine - The machine preset configuration containing memory specifications + * @param {number} [overhead=0.2] - The memory overhead factor (0.2 = 20% reserved for system operations) + * @returns {string} The updated NODE_OPTIONS value with the new max-old-space-size flag + */ +export function nodeOptionsWithMaxOldSpaceSize( + existingOptions: string | undefined, + machine: MachinePreset, + overhead: number = 0.2 +): string { + let options = existingOptions ?? 
""; + + //remove existing max-old-space-size flag + options = options.replace(/--max-old-space-size=\d+/g, "").trim(); + + //get max-old-space-size flag + const flag = maxOldSpaceSizeFlag(machine, overhead); + + return normalizeCommandLineFlags(options ? `${options} ${flag}` : flag); +} + +/** + * Normalizes spaces in a string of command line flags, ensuring single spaces between flags + * @param {string} input - The string to normalize + * @returns {string} The normalized string with single spaces between flags + */ +function normalizeCommandLineFlags(input: string): string { + return input.split(/\s+/).filter(Boolean).join(" "); } diff --git a/packages/core/src/v3/machines/max-old-space.test.ts b/packages/core/src/v3/machines/max-old-space.test.ts new file mode 100644 index 0000000000..425b819762 --- /dev/null +++ b/packages/core/src/v3/machines/max-old-space.test.ts @@ -0,0 +1,57 @@ +import { describe, it, expect } from "vitest"; +import { nodeOptionsWithMaxOldSpaceSize } from "./index.js"; +import { MachinePreset } from "../schemas/common.js"; + +describe("nodeOptionsWithMaxOldSpaceSize", () => { + const testMachine: MachinePreset = { + name: "small-2x", + memory: 1, // 1GB = 1024 MiB + cpu: 1, + centsPerMs: 0, + }; + + // With default 0.2 overhead, max-old-space-size should be 819 (1024 * 0.8) + const expectedFlag = "--max-old-space-size=819"; + + it("handles undefined NODE_OPTIONS", () => { + const result = nodeOptionsWithMaxOldSpaceSize(undefined, testMachine); + expect(result).toBe(expectedFlag); + }); + + it("handles empty string NODE_OPTIONS", () => { + const result = nodeOptionsWithMaxOldSpaceSize("", testMachine); + expect(result).toBe(expectedFlag); + }); + + it("preserves existing flags while adding max-old-space-size", () => { + const result = nodeOptionsWithMaxOldSpaceSize("--inspect --trace-warnings", testMachine); + expect(result).toBe(`--inspect --trace-warnings ${expectedFlag}`); + }); + + it("replaces existing max-old-space-size flag", () => { + const result = nodeOptionsWithMaxOldSpaceSize( + "--max-old-space-size=4096 --inspect", + testMachine + ); + expect(result).toBe(`--inspect ${expectedFlag}`); + }); + + it("handles multiple existing max-old-space-size flags", () => { + const result = nodeOptionsWithMaxOldSpaceSize( + "--max-old-space-size=4096 --inspect --max-old-space-size=8192", + testMachine + ); + expect(result).toBe(`--inspect ${expectedFlag}`); + }); + + it("handles extra spaces between flags", () => { + const result = nodeOptionsWithMaxOldSpaceSize("--inspect --trace-warnings", testMachine); + expect(result).toBe(`--inspect --trace-warnings ${expectedFlag}`); + }); + + it("uses custom overhead value", () => { + const result = nodeOptionsWithMaxOldSpaceSize("--inspect", testMachine, 0.5); + // With 0.5 overhead, max-old-space-size should be 512 (1024 * 0.5) + expect(result).toBe("--inspect --max-old-space-size=512"); + }); +}); diff --git a/packages/core/vitest.config.ts b/packages/core/vitest.config.ts index 758c27056e..8db9c42ae0 100644 --- a/packages/core/vitest.config.ts +++ b/packages/core/vitest.config.ts @@ -2,7 +2,7 @@ import { defineConfig } from "vitest/config"; export default defineConfig({ test: { - include: ["test/**/*.test.ts"], + include: ["test/**/*.test.ts", "src/v3/**/*.test.ts"], globals: true, }, }); From 8b66469236508dacec77abebaad8b77d6eb244da Mon Sep 17 00:00:00 2001 From: Matt Aitken Date: Thu, 6 Mar 2025 14:24:47 +0000 Subject: [PATCH 04/18] Added a zed task to rebuild the packages --- .zed/tasks.json | 45 
+++++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 45 insertions(+)
 create mode 100644 .zed/tasks.json

diff --git a/.zed/tasks.json b/.zed/tasks.json
new file mode 100644
index 0000000000..8612e16bfb
--- /dev/null
+++ b/.zed/tasks.json
@@ -0,0 +1,45 @@
+[
+  {
+    "label": "Build packages",
+    "command": "pnpm run build --filter \"@trigger.dev/*\" --filter trigger.dev",
+    //"args": [],
+    // Env overrides for the command, will be appended to the terminal's environment from the settings.
+    "env": { "foo": "bar" },
+    // Current working directory to spawn the command into, defaults to current project root.
+    //"cwd": "/path/to/working/directory",
+    // Whether to use a new terminal tab or reuse the existing one to spawn the process, defaults to `false`.
+    "use_new_terminal": false,
+    // Whether to allow multiple instances of the same task to be run, or rather wait for the existing ones to finish, defaults to `false`.
+    "allow_concurrent_runs": false,
+    // What to do with the terminal pane and tab, after the command was started:
+    // * `always` — always show the task's pane, and focus the corresponding tab in it (default)
+    // * `no_focus` — always show the task's pane, add the task's tab in it, but don't focus it
+    // * `never` — do not alter focus, but still add/reuse the task's tab in its pane
+    "reveal": "always",
+    // What to do with the terminal pane and tab, after the command has finished:
+    // * `never` — Do nothing when the command finishes (default)
+    // * `always` — always hide the terminal tab, hide the pane also if it was the last tab in it
+    // * `on_success` — hide the terminal tab on task success only, otherwise behaves similar to `always`
+    "hide": "never",
+    // Which shell to use when running a task inside the terminal.
+    // May take 3 values:
+    // 1. (default) Use the system's default terminal configuration in /etc/passwd
+    //      "shell": "system"
+    // 2. A program:
+    //      "shell": {
+    //        "program": "sh"
+    //      }
+    // 3. A program with arguments:
+    //      "shell": {
+    //        "with_arguments": {
+    //          "program": "/bin/bash",
+    //          "args": ["--login"]
+    //        }
+    //      }
+    "shell": "system",
+    // Whether to show the task line in the output of the spawned task, defaults to `true`.
+    "show_summary": true,
+    // Whether to show the command line in the output of the spawned task, defaults to `true`.
+    "show_output": true
+  }
+]

From 87bd15a689e556a95737f11e745573d6897acae5 Mon Sep 17 00:00:00 2001
From: Matt Aitken
Date: Thu, 6 Mar 2025 14:27:36 +0000
Subject: [PATCH 05/18] Moved isOOMRunError and added SIGABRT condition

---
 .../v3/services/alerts/deliverAlert.server.ts |  3 +-
 .../app/v3/services/completeAttempt.server.ts | 42 +-----------
 packages/core/src/v3/errors.ts                | 67 +++++++++++++++++++
 3 files changed, 71 insertions(+), 41 deletions(-)

diff --git a/apps/webapp/app/v3/services/alerts/deliverAlert.server.ts b/apps/webapp/app/v3/services/alerts/deliverAlert.server.ts
index 3c8d95cd5e..9f98d28a94 100644
--- a/apps/webapp/app/v3/services/alerts/deliverAlert.server.ts
+++ b/apps/webapp/app/v3/services/alerts/deliverAlert.server.ts
@@ -13,6 +13,7 @@ import {
   RunFailedWebhook,
   DeploymentFailedWebhook,
   DeploymentSuccessWebhook,
+  isOOMRunError,
 } from "@trigger.dev/core/v3";
 import assertNever from "assert-never";
 import { subtle } from "crypto";
@@ -375,7 +376,7 @@ export class DeliverAlertService extends BaseService {
           idempotencyKey: alert.taskRun.idempotencyKey ?? 
undefined,
           tags: alert.taskRun.runTags,
           error,
-          isOutOfMemoryError: isOOMError(error),
+          isOutOfMemoryError: isOOMRunError(error),
           machine: alert.taskRun.machinePreset ?? "Unknown",
           dashboardUrl: `${env.APP_ORIGIN}${v3RunPath(
             alert.project.organization,
diff --git a/apps/webapp/app/v3/services/completeAttempt.server.ts b/apps/webapp/app/v3/services/completeAttempt.server.ts
index 324750f1a8..73c8fbc88d 100644
--- a/apps/webapp/app/v3/services/completeAttempt.server.ts
+++ b/apps/webapp/app/v3/services/completeAttempt.server.ts
@@ -11,6 +11,7 @@ import {
   TaskRunSuccessfulExecutionResult,
   flattenAttributes,
   isManualOutOfMemoryError,
+  isOOMRunError,
   sanitizeError,
   shouldRetryError,
   taskRunErrorEnhancer,
@@ -255,7 +256,7 @@ export class CompleteAttemptService extends BaseService {
     let retriableError = shouldRetryError(taskRunErrorEnhancer(completion.error));
 
     let isOOMRetry = false;
-    let isOOMAttempt = isOOMError(completion.error);
+    let isOOMAttempt = isOOMRunError(completion.error);
     let isOnMaxOOMMachine = false;
     let oomMachine: MachinePresetName | undefined;
 
@@ -738,45 +739,6 @@ async function findAttempt(prismaClient: PrismaClientOrTransaction, friendlyId:
   });
 }
 
-export function isOOMError(error: TaskRunError) {
-  if (error.type === "INTERNAL_ERROR") {
-    if (
-      error.code === "TASK_PROCESS_OOM_KILLED" ||
-      error.code === "TASK_PROCESS_MAYBE_OOM_KILLED"
-    ) {
-      return true;
-    }
-
-    // For the purposes of retrying on a larger machine, we're going to treat this is an OOM error.
-    // This is what they look like if we're executing using k8s. They then get corrected later, but it's too late.
-    // {"code": "TASK_PROCESS_EXITED_WITH_NON_ZERO_CODE", "type": "INTERNAL_ERROR", "message": "Process exited with code -1 after signal SIGKILL."}
-    if (
-      error.code === "TASK_PROCESS_EXITED_WITH_NON_ZERO_CODE" &&
-      error.message &&
-      error.message.includes("SIGKILL") &&
-      error.message.includes("-1")
-    ) {
-      return true;
-    }
-  }
-
-  if (error.type === "BUILT_IN_ERROR") {
-    // ffmpeg also does weird stuff
-    // { "name": "Error", "type": "BUILT_IN_ERROR", "message": "ffmpeg was killed with signal SIGKILL" }
-    if (error.message && error.message.includes("ffmpeg was killed with signal SIGKILL")) {
-      return true;
-    }
-  }
-
-  // Special `OutOfMemoryError` for doing a manual OOM kill.
-  // Useful if a native library does an OOM but doesn't actually crash the run and you want to manually
-  if (isManualOutOfMemoryError(error)) {
-    return true;
-  }
-
-  return false;
-}
-
 function exitRun(runId: string) {
   socketIo.coordinatorNamespace.emit("REQUEST_RUN_CANCELLATION", {
     version: "v1",
diff --git a/packages/core/src/v3/errors.ts b/packages/core/src/v3/errors.ts
index 88c089025f..a8e789f81d 100644
--- a/packages/core/src/v3/errors.ts
+++ b/packages/core/src/v3/errors.ts
@@ -76,6 +76,58 @@ export function isManualOutOfMemoryError(error: TaskRunError) {
   return false;
 }
 
+export function isOOMRunError(error: TaskRunError) {
+  if (error.type === "INTERNAL_ERROR") {
+    if (
+      error.code === "TASK_PROCESS_OOM_KILLED" ||
+      error.code === "TASK_PROCESS_MAYBE_OOM_KILLED"
+    ) {
+      return true;
+    }
+
+    // For the purposes of retrying on a larger machine, we're going to treat this as an OOM error.
+    // This is what they look like if we're executing using k8s. They then get corrected later, but it's too late. 
+    // {"code": "TASK_PROCESS_EXITED_WITH_NON_ZERO_CODE", "type": "INTERNAL_ERROR", "message": "Process exited with code -1 after signal SIGKILL."}
+    if (
+      error.code === "TASK_PROCESS_EXITED_WITH_NON_ZERO_CODE" &&
+      error.message &&
+      error.message.includes("-1")
+    ) {
+      if (error.message.includes("SIGKILL")) {
+        return true;
+      }
+
+      if (error.message.includes("SIGABRT") && error.stackTrace) {
+        const oomIndicators = [
+          "JavaScript heap out of memory",
+          "Reached heap limit",
+          "FATAL ERROR: Reached heap limit Allocation failed",
+        ];
+
+        if (oomIndicators.some((indicator) => error.stackTrace!.includes(indicator))) {
+          return true;
+        }
+      }
+    }
+  }
+
+  if (error.type === "BUILT_IN_ERROR") {
+    // ffmpeg also does weird stuff
+    // { "name": "Error", "type": "BUILT_IN_ERROR", "message": "ffmpeg was killed with signal SIGKILL" }
+    if (error.message && error.message.includes("ffmpeg was killed with signal SIGKILL")) {
+      return true;
+    }
+  }
+
+  // Special `OutOfMemoryError` for doing a manual OOM kill.
+  // Useful if a native library does an OOM but doesn't actually crash the run and you want to manually flag it as an OOM.
+  if (isManualOutOfMemoryError(error)) {
+    return true;
+  }
+
+  return false;
+}
+
 export class TaskPayloadParsedError extends Error {
   public readonly cause: unknown;
 
@@ -562,6 +614,8 @@ const findSignalInMessage = (message?: string, truncateLength = 100) => {
     return "SIGSEGV";
   } else if (trunc.includes("SIGKILL")) {
     return "SIGKILL";
+  } else if (trunc.includes("SIGABRT")) {
+    return "SIGABRT";
   } else {
     return;
   }
@@ -587,6 +641,10 @@ export function taskRunErrorEnhancer(error: TaskRunError): EnhanceError<TaskRunError> {
Date: Thu, 6 Mar 2025 14:28:10 +0000
Subject: [PATCH 06/18] Flag deduplication function with tests

---
 packages/core/src/v3/build/flags.test.ts | 53 ++++++++++++++++++++++++
 packages/core/src/v3/build/flags.ts      | 47 +++++++++++++++++++++
 2 files changed, 100 insertions(+)
 create mode 100644 packages/core/src/v3/build/flags.test.ts
 create mode 100644 packages/core/src/v3/build/flags.ts

diff --git a/packages/core/src/v3/build/flags.test.ts b/packages/core/src/v3/build/flags.test.ts
new file mode 100644
index 0000000000..aaa3149bea
--- /dev/null
+++ b/packages/core/src/v3/build/flags.test.ts
@@ -0,0 +1,53 @@
+import { describe, it, expect } from "vitest";
+import { dedupFlags } from "./flags.js";
+
+describe("dedupFlags", () => {
+  it("should keep single flags unchanged", () => {
+    expect(dedupFlags("--verbose")).toBe("--verbose");
+    expect(dedupFlags("-v")).toBe("-v");
+    expect(dedupFlags("--debug=true")).toBe("--debug=true");
+  });
+
+  it("should preserve multiple different flags", () => {
+    expect(dedupFlags("--quiet --verbose")).toBe("--quiet --verbose");
+    expect(dedupFlags("-v -q --log=debug")).toBe("-v -q --log=debug");
+  });
+
+  it("should use last value when flags are duplicated", () => {
+    expect(dedupFlags("--debug=false --debug=true")).toBe("--debug=true");
+    expect(dedupFlags("--log=info --log=warn --log=error")).toBe("--log=error");
+  });
+
+  it("should handle mix of flags with and without values", () => {
+    expect(dedupFlags("--debug=false -v --debug=true")).toBe("-v --debug=true");
+    expect(dedupFlags("-v --quiet -v")).toBe("--quiet -v");
+  });
+
+  // Edge cases
+  it("should handle empty string", () => {
+    expect(dedupFlags("")).toBe("");
+  });
+
+  it("should handle multiple spaces between flags", () => {
+    expect(dedupFlags("--debug=false   --verbose   --debug=true")).toBe("--verbose --debug=true");
+  });
+
+  it("should handle flags with empty values", () => {
expect(dedupFlags("--path= --path=foo")).toBe("--path=foo"); + expect(dedupFlags("--path=foo --path=")).toBe("--path="); + }); + + it("should preserve values containing equals signs", () => { + expect(dedupFlags("--url=http://example.com?foo=bar")).toBe("--url=http://example.com?foo=bar"); + }); + + it("should handle flags with special characters", () => { + expect(dedupFlags("--path=/usr/local --path=/home/user")).toBe("--path=/home/user"); + expect(dedupFlags('--name="John Doe" --name="Jane Doe"')).toBe('--name="Jane Doe"'); + }); + + it("should handle multiple flag variants", () => { + const input = "--env=dev -v --env=prod --quiet -v --env=staging"; + expect(dedupFlags(input)).toBe("--quiet -v --env=staging"); + }); +}); diff --git a/packages/core/src/v3/build/flags.ts b/packages/core/src/v3/build/flags.ts new file mode 100644 index 0000000000..88448fa730 --- /dev/null +++ b/packages/core/src/v3/build/flags.ts @@ -0,0 +1,47 @@ +/** + * Deduplicates command line flags by keeping only the last occurrence of each flag. + * Preserves the order of the last occurrence of each flag. + * + * @param flags - A space-separated string of command line flags + * @returns A space-separated string of deduplicated flags + * + * @example + * // Single flags are preserved + * dedupFlags("--quiet --verbose") // returns "--quiet --verbose" + * + * @example + * // For duplicate flags, the last value wins and maintains its position + * dedupFlags("--debug=false --log=info --debug=true") // returns "--log=info --debug=true" + * + * @example + * // Mixing flags with and without values + * dedupFlags("-v --log=debug -v") // returns "--log=debug -v" + */ +export function dedupFlags(flags: string): string { + const seen = new Set(); + const result: [string, string | boolean][] = []; + + const pairs = flags + .split(" ") + .filter(Boolean) // Remove empty strings from multiple spaces + .map((flag): [string, string | boolean] => { + const equalIndex = flag.indexOf("="); + if (equalIndex !== -1) { + const key = flag.slice(0, equalIndex); + const value = flag.slice(equalIndex + 1); + return [key, value]; + } else { + return [flag, true]; + } + }); + + // Process in reverse to keep last occurrence + for (const [key, value] of pairs.reverse()) { + if (!seen.has(key)) { + seen.add(key); + result.unshift([key, value]); + } + } + + return result.map(([key, value]) => (value === true ? 
key : `${key}=${value}`)).join(" "); +} From 408a42d1fc8cd3ad5037d97386ce6308cf44e7e5 Mon Sep 17 00:00:00 2001 From: Matt Aitken Date: Thu, 6 Mar 2025 14:28:16 +0000 Subject: [PATCH 07/18] Export flags file --- packages/core/src/v3/build/index.ts | 1 + 1 file changed, 1 insertion(+) diff --git a/packages/core/src/v3/build/index.ts b/packages/core/src/v3/build/index.ts index 38245d4798..c29a82da2e 100644 --- a/packages/core/src/v3/build/index.ts +++ b/packages/core/src/v3/build/index.ts @@ -2,3 +2,4 @@ export * from "./extensions.js"; export * from "./resolvedConfig.js"; export * from "./runtime.js"; export * from "./externals.js"; +export * from "./flags.js"; From 0a2c0902c4b1f25205f8c94c293aa25c20607dc7 Mon Sep 17 00:00:00 2001 From: Matt Aitken Date: Thu, 6 Mar 2025 14:29:04 +0000 Subject: [PATCH 08/18] On TaskRunProcess, set max old space and deduplicate the flags with priority order --- packages/cli-v3/src/executions/taskRunProcess.ts | 7 +++++-- packages/core/src/v3/build/runtime.ts | 15 ++++++++++++--- packages/core/src/v3/schemas/common.ts | 2 +- 3 files changed, 18 insertions(+), 6 deletions(-) diff --git a/packages/cli-v3/src/executions/taskRunProcess.ts b/packages/cli-v3/src/executions/taskRunProcess.ts index 59643b5db2..052da5bce9 100644 --- a/packages/cli-v3/src/executions/taskRunProcess.ts +++ b/packages/cli-v3/src/executions/taskRunProcess.ts @@ -19,6 +19,7 @@ import { ChildProcess, fork } from "node:child_process"; import { chalkError, chalkGrey, chalkRun, prettyPrintDate } from "../utilities/cliOutput.js"; import { execOptionsForRuntime, execPathForRuntime } from "@trigger.dev/core/v3/build"; +import { nodeOptionsWithMaxOldSpaceSize } from "@trigger.dev/core/v3/machines"; import { InferSocketMessageSchema } from "@trigger.dev/core/v3/zodSocket"; import { logger } from "../utilities/logger.js"; import { @@ -117,14 +118,16 @@ export class TaskRunProcess { } async initialize() { - const { env: $env, workerManifest, cwd, messageId } = this.options; + const { env: $env, workerManifest, cwd, messageId, payload } = this.options; + + const maxOldSpaceSize = nodeOptionsWithMaxOldSpaceSize(undefined, payload.execution.machine); const fullEnv = { ...(this.isTest ? { TRIGGER_LOG_LEVEL: "debug" } : {}), ...$env, OTEL_IMPORT_HOOK_INCLUDES: workerManifest.otelImportHook?.include?.join(","), // TODO: this will probably need to use something different for bun (maybe --preload?) 
-      NODE_OPTIONS: execOptionsForRuntime(workerManifest.runtime, workerManifest),
+      NODE_OPTIONS: execOptionsForRuntime(workerManifest.runtime, workerManifest, maxOldSpaceSize),
       PATH: process.env.PATH,
       TRIGGER_PROCESS_FORK_START_TIME: String(Date.now()),
     };
diff --git a/packages/core/src/v3/build/runtime.ts b/packages/core/src/v3/build/runtime.ts
index 94b3fac08c..dc8ce19c6c 100644
--- a/packages/core/src/v3/build/runtime.ts
+++ b/packages/core/src/v3/build/runtime.ts
@@ -1,6 +1,7 @@
 import { join } from "node:path";
 import { pathToFileURL } from "url";
 import { BuildRuntime } from "../schemas/build.js";
+import { dedupFlags } from "./flags.js";
 
 export const DEFAULT_RUNTIME = "node" satisfies BuildRuntime;
 
@@ -41,7 +42,11 @@ export type ExecOptions = {
   customConditions?: string[];
 };
 
-export function execOptionsForRuntime(runtime: BuildRuntime, options: ExecOptions): string {
+export function execOptionsForRuntime(
+  runtime: BuildRuntime,
+  options: ExecOptions,
+  additionalNodeOptions?: string
+): string {
   switch (runtime) {
     case "node":
     case "node-22": {
@@ -51,15 +56,19 @@ export function execOptionsForRuntime(runtime: BuildRuntime, options: ExecOption
 
       const conditions = options.customConditions?.map((condition) => `--conditions=${condition}`);
 
-      return [
+      //later flags will win (after the dedupe)
+      const flags = [
+        process.env.NODE_OPTIONS,
+        additionalNodeOptions,
         importEntryPoint,
         conditions,
-        process.env.NODE_OPTIONS,
         nodeRuntimeNeedsGlobalWebCryptoFlag() ? "--experimental-global-webcrypto" : undefined,
       ]
         .filter(Boolean)
         .flat()
         .join(" ");
+
+      return dedupFlags(flags);
     }
     case "bun": {
       return "";
diff --git a/packages/core/src/v3/schemas/common.ts b/packages/core/src/v3/schemas/common.ts
index cf82719f76..fbfd4e97e7 100644
--- a/packages/core/src/v3/schemas/common.ts
+++ b/packages/core/src/v3/schemas/common.ts
@@ -308,7 +308,7 @@ export const TaskRunExecution = z.object({
   organization: TaskRunExecutionOrganization,
   project: TaskRunExecutionProject,
   batch: TaskRunExecutionBatch.optional(),
-  machine: MachinePreset.optional(),
+  machine: MachinePreset,
 });
 
 export type TaskRunExecution = z.infer<typeof TaskRunExecution>;

From a06b7d1474b24ee1bd2dadf19b0b07cb0eab0d5c Mon Sep 17 00:00:00 2001
From: Matt Aitken
Date: Thu, 6 Mar 2025 16:56:48 +0000
Subject: [PATCH 09/18] Move retrying logic to a separate function, it was getting very messy

---
 .../run-engine/src/engine/index.ts    | 334 ++++++++----------
 .../run-engine/src/engine/retrying.ts | 165 +++++++++
 2 files changed, 307 insertions(+), 192 deletions(-)
 create mode 100644 internal-packages/run-engine/src/engine/retrying.ts

diff --git a/internal-packages/run-engine/src/engine/index.ts b/internal-packages/run-engine/src/engine/index.ts
index cddf3994d8..5bce47b97d 100644
--- a/internal-packages/run-engine/src/engine/index.ts
+++ b/internal-packages/run-engine/src/engine/index.ts
@@ -71,6 +71,7 @@ import {
   isPendingExecuting,
 } from "./statuses";
 import { HeartbeatTimeouts, RunEngineOptions, TriggerParams } from "./types";
+import { retryOutcomeFromCompletion } from "./retrying";
 
 const workerCatalog = {
   finishWaitpoint: {
@@ -2867,228 +2868,177 @@ export class RunEngine {
 
       const failedAt = new Date();
 
-      if (
-        completion.error.type === "INTERNAL_ERROR" &&
-        completion.error.code === "TASK_RUN_CANCELLED"
-      ) {
-        // We need to cancel the task run instead of fail it
-        const result = await this.cancelRun({
-          runId,
-          completedAt: failedAt,
-          reason: completion.error.message,
-          finalizeRun: true,
-          tx: prisma,
-        });
-        return {
-          attemptStatus: 
result.snapshot.executionStatus === "PENDING_CANCEL" - ? "RUN_PENDING_CANCEL" - : "RUN_FINISHED", - ...result, - }; - } - - const error = sanitizeError(completion.error); - const retriableError = shouldRetryError(taskRunErrorEnhancer(completion.error)); - - const permanentlyFailRun = async (run?: { - status: TaskRunStatus; - spanId: string; - createdAt: Date; - completedAt: Date | null; - taskEventStore: string; - }) => { - // Emit an event so we can complete any spans of stalled executions - if (forceRequeue && run) { - this.eventBus.emit("runAttemptFailed", { - time: failedAt, - run: { - id: runId, - status: run.status, - spanId: run.spanId, - error, - attemptNumber: latestSnapshot.attemptNumber ?? 0, - createdAt: run.createdAt, - completedAt: run.completedAt, - taskEventStore: run.taskEventStore, - }, - }); - } - - return await this.#permanentlyFailRun({ - runId, - snapshotId, - failedAt, - error, - workerId, - runnerId, - }); - }; - - // Error is not retriable, fail the run - if (!retriableError) { - return await permanentlyFailRun(); - } - - // No retry config attached to completion, fail the run - if (completion.retry === undefined) { - return await permanentlyFailRun(); - } - - // Run attempts have reached the global maximum, fail the run - if ( - latestSnapshot.attemptNumber !== null && - latestSnapshot.attemptNumber >= MAX_TASK_RUN_ATTEMPTS - ) { - return await permanentlyFailRun(); - } + const retryResult = await retryOutcomeFromCompletion(prisma, { + runId, + error: completion.error, + retryUsingQueue: forceRequeue ?? false, + retrySettings: completion.retry, + attemptNumber: latestSnapshot.attemptNumber, + }); - const minimalRun = await prisma.taskRun.findFirst({ - where: { - id: runId, - }, - select: { - status: true, - spanId: true, - maxAttempts: true, - runtimeEnvironment: { - select: { - organizationId: true, + // Force requeue means it was crashed so the attempt span needs to be closed + if (forceRequeue) { + const minimalRun = await prisma.taskRun.findFirst({ + where: { + id: runId, + }, + select: { + status: true, + spanId: true, + maxAttempts: true, + runtimeEnvironment: { + select: { + organizationId: true, + }, }, + taskEventStore: true, + createdAt: true, + completedAt: true, }, - taskEventStore: true, - createdAt: true, - completedAt: true, - }, - }); - - if (!minimalRun) { - throw new ServiceValidationError("Run not found", 404); - } - - // Run doesn't have any max attempts set which is required for retrying, fail the run - if (!minimalRun.maxAttempts) { - return await permanentlyFailRun(minimalRun); - } + }); - // Run has reached the maximum configured number of attempts, fail the run - if ( - latestSnapshot.attemptNumber !== null && - latestSnapshot.attemptNumber >= minimalRun.maxAttempts - ) { - return await permanentlyFailRun(minimalRun); - } + if (!minimalRun) { + throw new ServiceValidationError("Run not found", 404); + } - // This error didn't come from user code, so we need to emit an event to complete any spans - if (forceRequeue) { this.eventBus.emit("runAttemptFailed", { time: failedAt, run: { id: runId, status: minimalRun.status, spanId: minimalRun.spanId, - error, + error: completion.error, attemptNumber: latestSnapshot.attemptNumber ?? 
0,
-          taskEventStore: minimalRun.taskEventStore,
           createdAt: minimalRun.createdAt,
           completedAt: minimalRun.completedAt,
+          taskEventStore: minimalRun.taskEventStore,
         },
       });
     }
 
-    const retryAt = new Date(completion.retry.timestamp);
+    switch (retryResult.outcome) {
+      case "cancel_run": {
+        const result = await this.cancelRun({
+          runId,
+          completedAt: failedAt,
+          reason: retryResult.reason,
+          finalizeRun: true,
+          tx: prisma,
+        });
+        return {
+          attemptStatus:
+            result.snapshot.executionStatus === "PENDING_CANCEL"
+              ? "RUN_PENDING_CANCEL"
+              : "RUN_FINISHED",
+          ...result,
+        };
+      }
+      case "fail_run": {
+        return await this.#permanentlyFailRun({
+          runId,
+          snapshotId,
+          failedAt,
+          error: retryResult.sanitizedError,
+          workerId,
+          runnerId,
+        });
+      }
+      case "retry": {
+        const retryAt = new Date(retryResult.settings.timestamp);
 
-    const run = await prisma.taskRun.update({
-      where: {
-        id: runId,
-      },
-      data: {
-        status: "RETRYING_AFTER_FAILURE",
-      },
-      include: {
-        runtimeEnvironment: {
+        const run = await prisma.taskRun.update({
+          where: {
+            id: runId,
+          },
+          data: {
+            status: "RETRYING_AFTER_FAILURE",
+            machinePreset: retryResult.machine,
+          },
           include: {
-          project: true,
-          organization: true,
-          orgMember: true,
+            runtimeEnvironment: {
+              include: {
+                project: true,
+                organization: true,
+                orgMember: true,
+              },
+            },
           },
-      },
-    },
-  });
+        });
 
-    const nextAttemptNumber =
-      latestSnapshot.attemptNumber === null ? 1 : latestSnapshot.attemptNumber + 1;
+        const nextAttemptNumber =
+          latestSnapshot.attemptNumber === null ? 1 : latestSnapshot.attemptNumber + 1;
 
-    this.eventBus.emit("runRetryScheduled", {
-      time: failedAt,
-      run: {
-        id: run.id,
-        friendlyId: run.friendlyId,
-        attemptNumber: nextAttemptNumber,
-        queue: run.queue,
-        taskIdentifier: run.taskIdentifier,
-        traceContext: run.traceContext as Record<string, unknown>,
-        baseCostInCents: run.baseCostInCents,
-        spanId: run.spanId,
-      },
-      organization: {
-        id: run.runtimeEnvironment.organizationId,
-      },
-      environment: run.runtimeEnvironment,
-      retryAt,
-    });
+        this.eventBus.emit("runRetryScheduled", {
+          time: failedAt,
+          run: {
+            id: run.id,
+            friendlyId: run.friendlyId,
+            attemptNumber: nextAttemptNumber,
+            queue: run.queue,
+            taskIdentifier: run.taskIdentifier,
+            traceContext: run.traceContext as Record<string, unknown>,
+            baseCostInCents: run.baseCostInCents,
+            spanId: run.spanId,
+          },
+          organization: {
+            id: run.runtimeEnvironment.organizationId,
+          },
+          environment: run.runtimeEnvironment,
+          retryAt,
+        });
+
+        //if it's a long delay and we support checkpointing, put it back in the queue
+        if (
+          forceRequeue ||
+          retryResult.method === "queue" ||
+          (this.options.retryWarmStartThresholdMs !== undefined &&
+            retryResult.settings.delay >= this.options.retryWarmStartThresholdMs)
+        ) {
+          //we nack the message, requeuing it for later
+          const nackResult = await this.#tryNackAndRequeue({
+            run,
+            environment: run.runtimeEnvironment,
+            orgId: run.runtimeEnvironment.organizationId,
+            timestamp: retryAt.getTime(),
+            error: {
+              type: "INTERNAL_ERROR",
+              code: "TASK_RUN_DEQUEUED_MAX_RETRIES",
+              message: `We tried to dequeue the run the maximum number of times but it wouldn't start executing`,
+            },
+            tx: prisma,
+          });
 
-    //todo anything special for DEV? Ideally not. 
+ if (!nackResult.wasRequeued) { + return { + attemptStatus: "RUN_FINISHED", + ...nackResult, + }; + } else { + return { attemptStatus: "RETRY_QUEUED", ...nackResult }; + } + } - //if it's a long delay and we support checkpointing, put it back in the queue - if ( - forceRequeue || - (this.options.retryWarmStartThresholdMs !== undefined && - completion.retry.delay >= this.options.retryWarmStartThresholdMs) - ) { - //we nack the message, requeuing it for later - const nackResult = await this.#tryNackAndRequeue({ - run, - environment: run.runtimeEnvironment, - orgId: run.runtimeEnvironment.organizationId, - timestamp: retryAt.getTime(), - error: { - type: "INTERNAL_ERROR", - code: "TASK_RUN_DEQUEUED_MAX_RETRIES", - message: `We tried to dequeue the run the maximum number of times but it wouldn't start executing`, - }, - tx: prisma, - }); + //it will continue running because the retry delay is short + const newSnapshot = await this.#createExecutionSnapshot(prisma, { + run, + snapshot: { + executionStatus: "PENDING_EXECUTING", + description: "Attempt failed with a short delay, starting a new attempt", + }, + environmentId: latestSnapshot.environmentId, + environmentType: latestSnapshot.environmentType, + workerId, + runnerId, + }); + //the worker can fetch the latest snapshot and should create a new attempt + await this.#sendNotificationToWorker({ runId, snapshot: newSnapshot }); - if (!nackResult.wasRequeued) { return { - attemptStatus: "RUN_FINISHED", - ...nackResult, + attemptStatus: "RETRY_IMMEDIATELY", + ...executionResultFromSnapshot(newSnapshot), }; - } else { - return { attemptStatus: "RETRY_QUEUED", ...nackResult }; } } - - //it will continue running because the retry delay is short - const newSnapshot = await this.#createExecutionSnapshot(prisma, { - run, - snapshot: { - executionStatus: "PENDING_EXECUTING", - description: "Attempt failed with a short delay, starting a new attempt", - }, - environmentId: latestSnapshot.environmentId, - environmentType: latestSnapshot.environmentType, - workerId, - runnerId, - }); - //the worker can fetch the latest snapshot and should create a new attempt - await this.#sendNotificationToWorker({ runId, snapshot: newSnapshot }); - - return { - attemptStatus: "RETRY_IMMEDIATELY", - ...executionResultFromSnapshot(newSnapshot), - }; }); }); } diff --git a/internal-packages/run-engine/src/engine/retrying.ts b/internal-packages/run-engine/src/engine/retrying.ts new file mode 100644 index 0000000000..12e3f23c30 --- /dev/null +++ b/internal-packages/run-engine/src/engine/retrying.ts @@ -0,0 +1,165 @@ +import { + isOOMRunError, + RetryOptions, + shouldRetryError, + TaskRunError, + TaskRunExecutionRetry, + taskRunErrorEnhancer, + sanitizeError, +} from "@trigger.dev/core/v3"; +import { PrismaClientOrTransaction, TaskRunStatus } from "@trigger.dev/database"; +import { MAX_TASK_RUN_ATTEMPTS } from "./consts"; +import { ServiceValidationError } from "."; + +type Params = { + runId: string; + attemptNumber: number | null; + error: TaskRunError; + retryUsingQueue: boolean; + retrySettings: TaskRunExecutionRetry | undefined; +}; + +export type RetryOutcome = + | { + outcome: "cancel_run"; + reason?: string; + } + | { + outcome: "fail_run"; + sanitizedError: TaskRunError; + wasOOMError?: boolean; + } + | { + outcome: "retry"; + method: "queue" | "immediate"; + settings: TaskRunExecutionRetry; + machine?: string; + }; + +export async function retryOutcomeFromCompletion( + prisma: PrismaClientOrTransaction, + { runId, attemptNumber, error, retryUsingQueue, retrySettings 
}: Params
+): Promise<RetryOutcome> {
+  // Canceled
+  if (error.type === "INTERNAL_ERROR" && error.code === "TASK_RUN_CANCELLED") {
+    return { outcome: "cancel_run", reason: error.message };
+  }
+
+  const sanitizedError = sanitizeError(error);
+
+  // No retry settings
+  if (!retrySettings) {
+    return { outcome: "fail_run", sanitizedError };
+  }
+
+  // OOM error (retry on a larger machine or fail)
+  if (isOOMRunError(error)) {
+    const newMachine = await retryOOMOnMachine(prisma, runId);
+    if (!newMachine) {
+      return { outcome: "fail_run", sanitizedError, wasOOMError: true };
+    }
+
+    return {
+      outcome: "retry",
+      method: "queue",
+      settings: retrySettings,
+      machine: newMachine,
+    };
+  }
+
+  // Not a retriable error: fail
+  const retriableError = shouldRetryError(taskRunErrorEnhancer(error));
+  if (!retriableError) {
+    return { outcome: "fail_run", sanitizedError };
+  }
+
+  // Exceeded global max attempts
+  if (attemptNumber !== null && attemptNumber > MAX_TASK_RUN_ATTEMPTS) {
+    return { outcome: "fail_run", sanitizedError };
+  }
+
+  // Get the run settings
+  const run = await prisma.taskRun.findFirst({
+    where: {
+      id: runId,
+    },
+    select: {
+      maxAttempts: true,
+    },
+  });
+
+  if (!run) {
+    throw new ServiceValidationError("Run not found", 404);
+  }
+
+  // No max attempts set
+  if (!run.maxAttempts) {
+    return { outcome: "fail_run", sanitizedError };
+  }
+
+  // No attempts left
+  if (attemptNumber !== null && attemptNumber >= run.maxAttempts) {
+    return { outcome: "fail_run", sanitizedError };
+  }
+
+  return {
+    outcome: "retry",
+    method: retryUsingQueue ? "queue" : "immediate",
+    settings: retrySettings,
+  };
+}
+
+async function retryOOMOnMachine(
+  prisma: PrismaClientOrTransaction,
+  runId: string
+): Promise<string | undefined> {
+  try {
+    const run = await prisma.taskRun.findFirst({
+      where: {
+        id: runId,
+      },
+      select: {
+        machinePreset: true,
+        lockedBy: {
+          select: {
+            retryConfig: true,
+          },
+        },
+      },
+    });
+
+    if (!run || !run.lockedBy || !run.machinePreset) {
+      return;
+    }
+
+    const retryConfig = run.lockedBy?.retryConfig;
+    const parsedRetryConfig = RetryOptions.nullish().safeParse(retryConfig);
+
+    if (!parsedRetryConfig.success) {
+      return;
+    }
+
+    if (!parsedRetryConfig.data) {
+      return;
+    }
+
+    const retryMachine = parsedRetryConfig.data.outOfMemory?.machine;
+
+    if (!retryMachine) {
+      return;
+    }
+
+    if (run.machinePreset === retryMachine) {
+      return;
+    }
+
+    return retryMachine;
+  } catch (error) {
+    console.error("[FailedTaskRunRetryHelper] Failed to get execution retry", {
+      runId,
+      error,
+    });
+
+    return;
+  }
+}

From 676672006b6a68086468dedf4b720e1f7484e0c0 Mon Sep 17 00:00:00 2001
From: Matt Aitken
Date: Thu, 6 Mar 2025 16:59:31 +0000
Subject: [PATCH 10/18] Created new test file for attempt failures

---
 .../src/engine/tests/attemptFailures.test.ts | 169 ++++++++++++++++++
 .../src/engine/tests/trigger.test.ts         | 157 ----------------
 2 files changed, 169 insertions(+), 157 deletions(-)
 create mode 100644 internal-packages/run-engine/src/engine/tests/attemptFailures.test.ts

diff --git a/internal-packages/run-engine/src/engine/tests/attemptFailures.test.ts b/internal-packages/run-engine/src/engine/tests/attemptFailures.test.ts
new file mode 100644
index 0000000000..08b9d99827
--- /dev/null
+++ b/internal-packages/run-engine/src/engine/tests/attemptFailures.test.ts
@@ -0,0 +1,169 @@
+import {
+  assertNonNullable,
+  containerTest,
+  setupAuthenticatedEnvironment,
+  setupBackgroundWorker,
+} from "@internal/testcontainers";
+import { trace } from "@opentelemetry/api";
+import { expect, describe } from "vitest";
+import { EventBusEventArgs } from "../eventBus.js";
+import { RunEngine } from "../index.js";
+
+describe("RunEngine attempt failures", () => {
+  containerTest(
+    "Single run (retry attempt, then succeed)",
+    { timeout: 15_000 },
+    async ({ prisma, redisOptions }) => {
+      //create environment
+      const authenticatedEnvironment = await setupAuthenticatedEnvironment(prisma, "PRODUCTION");
+
+      const engine = new RunEngine({
+        prisma,
+        worker: {
+          redis: redisOptions,
+          workers: 1,
+          tasksPerWorker: 10,
+          pollIntervalMs: 100,
+        },
+        queue: {
+          redis: redisOptions,
+        },
+        runLock: {
+          redis: redisOptions,
+        },
+        machines: {
+          defaultMachine: "small-1x",
+          machines: {
+            "small-1x": {
+              name: "small-1x" as const,
+              cpu: 0.5,
+              memory: 0.5,
+              centsPerMs: 0.0001,
+            },
+          },
+          baseCostInCents: 0.0001,
+        },
+        tracer: trace.getTracer("test", "0.0.0"),
+      });
+
+      try {
+        const taskIdentifier = "test-task";
+
+        //create background worker
+        const backgroundWorker = await setupBackgroundWorker(
+          prisma,
+          authenticatedEnvironment,
+          taskIdentifier
+        );
+
+        //trigger the run
+        const run = await engine.trigger(
+          {
+            number: 1,
+            friendlyId: "run_1234",
+            environment: authenticatedEnvironment,
+            taskIdentifier,
+            payload: "{}",
+            payloadType: "application/json",
+            context: {},
+            traceContext: {},
+            traceId: "t12345",
+            spanId: "s12345",
+            masterQueue: "main",
+            queueName: "task/test-task",
+            isTest: false,
+            tags: [],
+          },
+          prisma
+        );
+
+        //dequeue the run
+        const dequeued = await engine.dequeueFromMasterQueue({
+          consumerId: "test_12345",
+          masterQueue: run.masterQueue,
+          maxRunCount: 10,
+        });
+
+        //create an attempt
+        const attemptResult = await engine.startRunAttempt({
+          runId: dequeued[0].run.id,
+          snapshotId: dequeued[0].snapshot.id,
+        });
+
+        //fail the attempt
+        const error = {
+          type: "BUILT_IN_ERROR" as const,
+          name: "UserError",
+          message: "This is a user error",
+          stackTrace: "Error: This is a user error\n at <anonymous>:1:1",
+        };
+        const result = await engine.completeRunAttempt({
+          runId: dequeued[0].run.id,
+          snapshotId: attemptResult.snapshot.id,
+          completion: {
+            ok: false,
+            id: dequeued[0].run.id,
+            error,
+            retry: {
+              timestamp: Date.now(),
+              delay: 0,
+            },
+          },
+        });
+        expect(result.attemptStatus).toBe("RETRY_IMMEDIATELY");
+        expect(result.snapshot.executionStatus).toBe("PENDING_EXECUTING");
+        expect(result.run.status).toBe("RETRYING_AFTER_FAILURE");
+
+        //state should be completed
+        const executionData3 = await engine.getRunExecutionData({ runId: run.id });
+        assertNonNullable(executionData3);
+        expect(executionData3.snapshot.executionStatus).toBe("PENDING_EXECUTING");
+        //only when the new attempt is created, should the attempt be increased
+        expect(executionData3.run.attemptNumber).toBe(1);
+        expect(executionData3.run.status).toBe("RETRYING_AFTER_FAILURE");
+
+        //create a second attempt
+        const attemptResult2 = await engine.startRunAttempt({
+          runId: dequeued[0].run.id,
+          snapshotId: executionData3.snapshot.id,
+        });
+        expect(attemptResult2.run.attemptNumber).toBe(2);
+
+        //now complete it successfully
+        const result2 = await engine.completeRunAttempt({
+          runId: dequeued[0].run.id,
+          snapshotId: attemptResult2.snapshot.id,
+          completion: {
+            ok: true,
+            id: dequeued[0].run.id,
+            output: `{"foo":"bar"}`,
+            outputType: "application/json",
+          },
+        });
+        expect(result2.snapshot.executionStatus).toBe("FINISHED");
+        expect(result2.run.attemptNumber).toBe(2);
+        expect(result2.run.status).toBe("COMPLETED_SUCCESSFULLY");
+
+        //waitpoint should have been completed, with the output
+        const 
runWaitpointAfter = await prisma.waitpoint.findMany({
+          where: {
+            completedByTaskRunId: run.id,
+          },
+        });
+        expect(runWaitpointAfter.length).toBe(1);
+        expect(runWaitpointAfter[0].type).toBe("RUN");
+        expect(runWaitpointAfter[0].output).toBe(`{"foo":"bar"}`);
+        expect(runWaitpointAfter[0].outputIsError).toBe(false);
+
+        //state should be completed
+        const executionData4 = await engine.getRunExecutionData({ runId: run.id });
+        assertNonNullable(executionData4);
+        expect(executionData4.snapshot.executionStatus).toBe("FINISHED");
+        expect(executionData4.run.attemptNumber).toBe(2);
+        expect(executionData4.run.status).toBe("COMPLETED_SUCCESSFULLY");
+      } finally {
+        engine.quit();
+      }
+    }
+  );
+});
diff --git a/internal-packages/run-engine/src/engine/tests/trigger.test.ts b/internal-packages/run-engine/src/engine/tests/trigger.test.ts
index 69dbb66b08..1eef5dd838 100644
--- a/internal-packages/run-engine/src/engine/tests/trigger.test.ts
+++ b/internal-packages/run-engine/src/engine/tests/trigger.test.ts
@@ -330,161 +330,4 @@ describe("RunEngine trigger()", () => {
       engine.quit();
     }
   });
-
-  containerTest(
-    "Single run (retry attempt, then succeed)",
-    { timeout: 15_000 },
-    async ({ prisma, redisOptions }) => {
-      //create environment
-      const authenticatedEnvironment = await setupAuthenticatedEnvironment(prisma, "PRODUCTION");
-
-      const engine = new RunEngine({
-        prisma,
-        worker: {
-          redis: redisOptions,
-          workers: 1,
-          tasksPerWorker: 10,
-          pollIntervalMs: 100,
-        },
-        queue: {
-          redis: redisOptions,
-        },
-        runLock: {
-          redis: redisOptions,
-        },
-        machines: {
-          defaultMachine: "small-1x",
-          machines: {
-            "small-1x": {
-              name: "small-1x" as const,
-              cpu: 0.5,
-              memory: 0.5,
-              centsPerMs: 0.0001,
-            },
-          },
-          baseCostInCents: 0.0001,
-        },
-        tracer: trace.getTracer("test", "0.0.0"),
-      });
-
-      try {
-        const taskIdentifier = "test-task";
-
-        //create background worker
-        const backgroundWorker = await setupBackgroundWorker(
-          prisma,
-          authenticatedEnvironment,
-          taskIdentifier
-        );
-
-        //trigger the run
-        const run = await engine.trigger(
-          {
-            number: 1,
-            friendlyId: "run_1234",
-            environment: authenticatedEnvironment,
-            taskIdentifier,
-            payload: "{}",
-            payloadType: "application/json",
-            context: {},
-            traceContext: {},
-            traceId: "t12345",
-            spanId: "s12345",
-            masterQueue: "main",
-            queueName: "task/test-task",
-            isTest: false,
-            tags: [],
-          },
-          prisma
-        );
-
-        //dequeue the run
-        const dequeued = await engine.dequeueFromMasterQueue({
-          consumerId: "test_12345",
-          masterQueue: run.masterQueue,
-          maxRunCount: 10,
-        });
-
-        //create an attempt
-        const attemptResult = await engine.startRunAttempt({
-          runId: dequeued[0].run.id,
-          snapshotId: dequeued[0].snapshot.id,
-        });
-
-        //fail the attempt
-        const error = {
-          type: "BUILT_IN_ERROR" as const,
-          name: "UserError",
-          message: "This is a user error",
-          stackTrace: "Error: This is a user error\n at <anonymous>:1:1",
-        };
-        const result = await engine.completeRunAttempt({
-          runId: dequeued[0].run.id,
-          snapshotId: attemptResult.snapshot.id,
-          completion: {
-            ok: false,
-            id: dequeued[0].run.id,
-            error,
-            retry: {
-              timestamp: Date.now(),
-              delay: 0,
-            },
-          },
-        });
-        expect(result.attemptStatus).toBe("RETRY_IMMEDIATELY");
-        expect(result.snapshot.executionStatus).toBe("PENDING_EXECUTING");
-        expect(result.run.status).toBe("RETRYING_AFTER_FAILURE");
-
-        //state should be completed
-        const executionData3 = await engine.getRunExecutionData({ runId: run.id });
-        assertNonNullable(executionData3); 
expect(executionData3.snapshot.executionStatus).toBe("PENDING_EXECUTING"); - //only when the new attempt is created, should the attempt be increased - expect(executionData3.run.attemptNumber).toBe(1); - expect(executionData3.run.status).toBe("RETRYING_AFTER_FAILURE"); - - //create a second attempt - const attemptResult2 = await engine.startRunAttempt({ - runId: dequeued[0].run.id, - snapshotId: executionData3.snapshot.id, - }); - expect(attemptResult2.run.attemptNumber).toBe(2); - - //now complete it successfully - const result2 = await engine.completeRunAttempt({ - runId: dequeued[0].run.id, - snapshotId: attemptResult2.snapshot.id, - completion: { - ok: true, - id: dequeued[0].run.id, - output: `{"foo":"bar"}`, - outputType: "application/json", - }, - }); - expect(result2.snapshot.executionStatus).toBe("FINISHED"); - expect(result2.run.attemptNumber).toBe(2); - expect(result2.run.status).toBe("COMPLETED_SUCCESSFULLY"); - - //waitpoint should have been completed, with the output - const runWaitpointAfter = await prisma.waitpoint.findMany({ - where: { - completedByTaskRunId: run.id, - }, - }); - expect(runWaitpointAfter.length).toBe(1); - expect(runWaitpointAfter[0].type).toBe("RUN"); - expect(runWaitpointAfter[0].output).toBe(`{"foo":"bar"}`); - expect(runWaitpointAfter[0].outputIsError).toBe(false); - - //state should be completed - const executionData4 = await engine.getRunExecutionData({ runId: run.id }); - assertNonNullable(executionData4); - expect(executionData4.snapshot.executionStatus).toBe("FINISHED"); - expect(executionData4.run.attemptNumber).toBe(2); - expect(executionData4.run.status).toBe("COMPLETED_SUCCESSFULLY"); - } finally { - engine.quit(); - } - } - ); }); From 8f1086c3eaa41ebcedd610a6cbd7057f47050df2 Mon Sep 17 00:00:00 2001 From: Matt Aitken Date: Thu, 6 Mar 2025 17:52:17 +0000 Subject: [PATCH 11/18] Allow setting retry settings for tests --- internal-packages/testcontainers/src/setup.ts | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/internal-packages/testcontainers/src/setup.ts b/internal-packages/testcontainers/src/setup.ts index f79f2f1ef7..ffd9d86e1b 100644 --- a/internal-packages/testcontainers/src/setup.ts +++ b/internal-packages/testcontainers/src/setup.ts @@ -68,7 +68,8 @@ export async function setupBackgroundWorker( prisma: PrismaClient, environment: AuthenticatedEnvironment, taskIdentifier: string | string[], - machineConfig?: MachineConfig + machineConfig?: MachineConfig, + retryOptions?: RetryOptions ) { const worker = await prisma.backgroundWorker.create({ data: { @@ -86,7 +87,7 @@ export async function setupBackgroundWorker( const tasks: BackgroundWorkerTask[] = []; for (const identifier of taskIdentifiers) { - const retryConfig: RetryOptions = { + const retryConfig: RetryOptions = retryOptions ?? 
{
       maxAttempts: 3,
       factor: 1,
       minTimeoutInMs: 100,

From 7aa4598bf8217b08b5dc20b16b50b6475e826f8a Mon Sep 17 00:00:00 2001
From: Matt Aitken
Date: Thu, 6 Mar 2025 18:11:16 +0000
Subject: [PATCH 12/18] Some retrying tests, including OOM

---
 .../src/engine/tests/attemptFailures.test.ts | 391 +++++++++++++++++-
 1 file changed, 389 insertions(+), 2 deletions(-)

diff --git a/internal-packages/run-engine/src/engine/tests/attemptFailures.test.ts b/internal-packages/run-engine/src/engine/tests/attemptFailures.test.ts
index 08b9d99827..dc4312b759 100644
--- a/internal-packages/run-engine/src/engine/tests/attemptFailures.test.ts
+++ b/internal-packages/run-engine/src/engine/tests/attemptFailures.test.ts
@@ -11,7 +11,7 @@ import { RunEngine } from "../index.js";
 
 describe("RunEngine attempt failures", () => {
   containerTest(
-    "Single run (retry attempt, then succeed)",
+    "Retry user error and succeed",
     { timeout: 15_000 },
     async ({ prisma, redisOptions }) => {
       //create environment
@@ -114,7 +114,7 @@ describe("RunEngine attempt failures", () => {
       expect(result.snapshot.executionStatus).toBe("PENDING_EXECUTING");
       expect(result.run.status).toBe("RETRYING_AFTER_FAILURE");
 
-      //state should be completed
+      //state should be pending
       const executionData3 = await engine.getRunExecutionData({ runId: run.id });
       assertNonNullable(executionData3);
       expect(executionData3.snapshot.executionStatus).toBe("PENDING_EXECUTING");
@@ -166,4 +166,391 @@ describe("RunEngine attempt failures", () => {
       } finally {
         engine.quit();
       }
     }
   );

  containerTest("Fail (no more retries)", { timeout: 15_000 }, async ({ prisma, redisOptions }) => {
    //create environment
    const authenticatedEnvironment = await setupAuthenticatedEnvironment(prisma, "PRODUCTION");

    const engine = new RunEngine({
      prisma,
      worker: {
        redis: redisOptions,
        workers: 1,
        tasksPerWorker: 10,
        pollIntervalMs: 100,
      },
      queue: {
        redis: redisOptions,
      },
      runLock: {
        redis: redisOptions,
      },
      machines: {
        defaultMachine: "small-1x",
        machines: {
          "small-1x": {
            name: "small-1x" as const,
            cpu: 0.5,
            memory: 0.5,
            centsPerMs: 0.0001,
          },
        },
        baseCostInCents: 0.0001,
      },
      tracer: trace.getTracer("test", "0.0.0"),
    });

    try {
      const taskIdentifier = "test-task";

      //create background worker
      await setupBackgroundWorker(prisma, authenticatedEnvironment, taskIdentifier, undefined, {
        maxAttempts: 1,
      });

      //trigger the run
      const run = await engine.trigger(
        {
          number: 1,
          friendlyId: "run_1234",
          environment: authenticatedEnvironment,
          taskIdentifier,
          payload: "{}",
          payloadType: "application/json",
          context: {},
          traceContext: {},
          traceId: "t12345",
          spanId: "s12345",
          masterQueue: "main",
          queueName: "task/test-task",
          isTest: false,
          tags: [],
        },
        prisma
      );

      //dequeue the run
      const dequeued = await engine.dequeueFromMasterQueue({
        consumerId: "test_12345",
        masterQueue: run.masterQueue,
        maxRunCount: 10,
      });

      //create an attempt
      const attemptResult = await engine.startRunAttempt({
        runId: dequeued[0].run.id,
        snapshotId: dequeued[0].snapshot.id,
      });

      //fail the attempt
      const error = {
        type: "BUILT_IN_ERROR" as const,
        name: "UserError",
        message: "This is a user error",
        stackTrace: "Error: This is a user error\n at <anonymous>:1:1",
      };
      const result = await engine.completeRunAttempt({
        runId: dequeued[0].run.id,
        snapshotId: attemptResult.snapshot.id,
        completion: {
          ok: false,
          id: dequeued[0].run.id,
          error,
          retry: {
            timestamp: Date.now(),
            delay: 0,
          },
        },
      });
expect(result.attemptStatus).toBe("RUN_FINISHED");
      expect(result.snapshot.executionStatus).toBe("FINISHED");
      expect(result.run.status).toBe("COMPLETED_WITH_ERRORS");

      //state should be failed
      const executionData3 = await engine.getRunExecutionData({ runId: run.id });
      assertNonNullable(executionData3);
      expect(executionData3.snapshot.executionStatus).toBe("FINISHED");
      //the run failed on the first attempt, so the attempt number stays at 1
      expect(executionData3.run.attemptNumber).toBe(1);
      expect(executionData3.run.status).toBe("COMPLETED_WITH_ERRORS");
    } finally {
      engine.quit();
    }
  });

  containerTest("OOM fail", { timeout: 15_000 }, async ({ prisma, redisOptions }) => {
    //create environment
    const authenticatedEnvironment = await setupAuthenticatedEnvironment(prisma, "PRODUCTION");

    const engine = new RunEngine({
      prisma,
      worker: {
        redis: redisOptions,
        workers: 1,
        tasksPerWorker: 10,
        pollIntervalMs: 100,
      },
      queue: {
        redis: redisOptions,
      },
      runLock: {
        redis: redisOptions,
      },
      machines: {
        defaultMachine: "small-1x",
        machines: {
          "small-1x": {
            name: "small-1x" as const,
            cpu: 0.5,
            memory: 0.5,
            centsPerMs: 0.0001,
          },
        },
        baseCostInCents: 0.0001,
      },
      tracer: trace.getTracer("test", "0.0.0"),
    });

    try {
      const taskIdentifier = "test-task";

      //create background worker
      await setupBackgroundWorker(prisma, authenticatedEnvironment, taskIdentifier);

      //trigger the run
      const run = await engine.trigger(
        {
          number: 1,
          friendlyId: "run_1234",
          environment: authenticatedEnvironment,
          taskIdentifier,
          payload: "{}",
          payloadType: "application/json",
          context: {},
          traceContext: {},
          traceId: "t12345",
          spanId: "s12345",
          masterQueue: "main",
          queueName: "task/test-task",
          isTest: false,
          tags: [],
        },
        prisma
      );

      //dequeue the run
      const dequeued = await engine.dequeueFromMasterQueue({
        consumerId: "test_12345",
        masterQueue: run.masterQueue,
        maxRunCount: 10,
      });

      //create an attempt
      const attemptResult = await engine.startRunAttempt({
        runId: dequeued[0].run.id,
        snapshotId: dequeued[0].snapshot.id,
      });

      //fail the attempt with an OOM error
      const error = {
        type: "INTERNAL_ERROR" as const,
        code: "TASK_PROCESS_EXITED_WITH_NON_ZERO_CODE" as const,
        message: "Process exited with code -1 after signal SIGKILL.",
        stackTrace: "JavaScript heap out of memory",
      };

      const result = await engine.completeRunAttempt({
        runId: dequeued[0].run.id,
        snapshotId: attemptResult.snapshot.id,
        completion: {
          ok: false,
          id: dequeued[0].run.id,
          error,
          retry: {
            timestamp: Date.now(),
            delay: 0,
          },
        },
      });

      // No OOM retry machine is configured, so the run should permanently fail
      expect(result.attemptStatus).toBe("RUN_FINISHED");
      expect(result.snapshot.executionStatus).toBe("FINISHED");
      expect(result.run.status).toBe("CRASHED");

      //state should be crashed
      const executionData = await engine.getRunExecutionData({ runId: run.id });
      assertNonNullable(executionData);
      expect(executionData.snapshot.executionStatus).toBe("FINISHED");
      expect(executionData.run.attemptNumber).toBe(1);
      expect(executionData.run.status).toBe("CRASHED");
    } finally {
      engine.quit();
    }
  });

  containerTest(
    "OOM retry on larger machine",
    { timeout: 15_000 },
    async ({ prisma, redisOptions }) => {
      //create environment
      const authenticatedEnvironment = await setupAuthenticatedEnvironment(prisma, "PRODUCTION");

      const engine = new RunEngine({
        prisma,
        worker: {
          redis: 
redisOptions, + workers: 1, + tasksPerWorker: 10, + pollIntervalMs: 100, + }, + queue: { + redis: redisOptions, + }, + runLock: { + redis: redisOptions, + }, + machines: { + defaultMachine: "small-1x", + machines: { + "small-1x": { + name: "small-1x" as const, + cpu: 0.5, + memory: 0.5, + centsPerMs: 0.0001, + }, + "small-2x": { + name: "small-2x" as const, + cpu: 1, + memory: 1, + centsPerMs: 0.0002, + }, + }, + baseCostInCents: 0.0001, + }, + tracer: trace.getTracer("test", "0.0.0"), + }); + + try { + const taskIdentifier = "test-task"; + + //create background worker + await setupBackgroundWorker(prisma, authenticatedEnvironment, taskIdentifier, undefined, { + outOfMemory: { + machine: "small-2x", + }, + }); + + //trigger the run + const run = await engine.trigger( + { + number: 1, + friendlyId: "run_1234", + environment: authenticatedEnvironment, + taskIdentifier, + payload: "{}", + payloadType: "application/json", + context: {}, + traceContext: {}, + traceId: "t12345", + spanId: "s12345", + masterQueue: "main", + queueName: "task/test-task", + isTest: false, + tags: [], + }, + prisma + ); + + //dequeue the run + const dequeued = await engine.dequeueFromMasterQueue({ + consumerId: "test_12345", + masterQueue: run.masterQueue, + maxRunCount: 10, + }); + + //create an attempt + const attemptResult = await engine.startRunAttempt({ + runId: dequeued[0].run.id, + snapshotId: dequeued[0].snapshot.id, + }); + + //fail the attempt with an OOM error + const error = { + type: "INTERNAL_ERROR" as const, + code: "TASK_PROCESS_EXITED_WITH_NON_ZERO_CODE" as const, + message: "Process exited with code -1 after signal SIGKILL.", + stackTrace: "JavaScript heap out of memory", + }; + + const result = await engine.completeRunAttempt({ + runId: dequeued[0].run.id, + snapshotId: attemptResult.snapshot.id, + completion: { + ok: false, + id: dequeued[0].run.id, + error, + retry: { + timestamp: Date.now(), + delay: 0, + }, + }, + }); + + // The run should be retried with a larger machine + expect(result.attemptStatus).toBe("RETRY_QUEUED"); + expect(result.snapshot.executionStatus).toBe("QUEUED"); + expect(result.run.status).toBe("RETRYING_AFTER_FAILURE"); + + //state should be pending + const executionData = await engine.getRunExecutionData({ runId: run.id }); + assertNonNullable(executionData); + expect(executionData.snapshot.executionStatus).toBe("QUEUED"); + expect(executionData.run.attemptNumber).toBe(1); + expect(executionData.run.status).toBe("RETRYING_AFTER_FAILURE"); + + //create a second attempt + const attemptResult2 = await engine.startRunAttempt({ + runId: dequeued[0].run.id, + snapshotId: executionData.snapshot.id, + }); + expect(attemptResult2.run.attemptNumber).toBe(2); + + //now complete it successfully + const result2 = await engine.completeRunAttempt({ + runId: dequeued[0].run.id, + snapshotId: attemptResult2.snapshot.id, + completion: { + ok: true, + id: dequeued[0].run.id, + output: `{"foo":"bar"}`, + outputType: "application/json", + }, + }); + expect(result2.snapshot.executionStatus).toBe("FINISHED"); + expect(result2.run.attemptNumber).toBe(2); + expect(result2.run.status).toBe("COMPLETED_SUCCESSFULLY"); + + //waitpoint should have been completed, with the output + const runWaitpointAfter = await prisma.waitpoint.findMany({ + where: { + completedByTaskRunId: run.id, + }, + }); + expect(runWaitpointAfter.length).toBe(1); + expect(runWaitpointAfter[0].type).toBe("RUN"); + expect(runWaitpointAfter[0].output).toBe(`{"foo":"bar"}`); + expect(runWaitpointAfter[0].outputIsError).toBe(false); + 
+        //state should be completed
+        const executionData4 = await engine.getRunExecutionData({ runId: run.id });
+        assertNonNullable(executionData4);
+        expect(executionData4.snapshot.executionStatus).toBe("FINISHED");
+        expect(executionData4.run.attemptNumber).toBe(2);
+        expect(executionData4.run.status).toBe("COMPLETED_SUCCESSFULLY");
+      } finally {
+        engine.quit();
+      }
+    }
+  );
 });

From c46d56059cce66ea82f5d0e46b30dc50614d7cfe Mon Sep 17 00:00:00 2001
From: Matt Aitken
Date: Thu, 6 Mar 2025 18:37:41 +0000
Subject: [PATCH 13/18] More failure condition tests

---
 .../src/engine/tests/attemptFailures.test.ts | 279 ++++++++++++++++++
 1 file changed, 279 insertions(+)

diff --git a/internal-packages/run-engine/src/engine/tests/attemptFailures.test.ts b/internal-packages/run-engine/src/engine/tests/attemptFailures.test.ts
index dc4312b759..152bf88b76 100644
--- a/internal-packages/run-engine/src/engine/tests/attemptFailures.test.ts
+++ b/internal-packages/run-engine/src/engine/tests/attemptFailures.test.ts
@@ -278,6 +278,119 @@ describe("RunEngine attempt failures", () => {
     }
   });
 
+  containerTest(
+    "Fail (not a retriable error)",
+    { timeout: 15_000 },
+    async ({ prisma, redisOptions }) => {
+      //create environment
+      const authenticatedEnvironment = await setupAuthenticatedEnvironment(prisma, "PRODUCTION");
+
+      const engine = new RunEngine({
+        prisma,
+        worker: {
+          redis: redisOptions,
+          workers: 1,
+          tasksPerWorker: 10,
+          pollIntervalMs: 100,
+        },
+        queue: {
+          redis: redisOptions,
+        },
+        runLock: {
+          redis: redisOptions,
+        },
+        machines: {
+          defaultMachine: "small-1x",
+          machines: {
+            "small-1x": {
+              name: "small-1x" as const,
+              cpu: 0.5,
+              memory: 0.5,
+              centsPerMs: 0.0001,
+            },
+          },
+          baseCostInCents: 0.0001,
+        },
+        tracer: trace.getTracer("test", "0.0.0"),
+      });
+
+      try {
+        const taskIdentifier = "test-task";
+
+        //create background worker
+        await setupBackgroundWorker(prisma, authenticatedEnvironment, taskIdentifier, undefined, {
+          maxAttempts: 1,
+        });
+
+        //trigger the run
+        const run = await engine.trigger(
+          {
+            number: 1,
+            friendlyId: "run_1234",
+            environment: authenticatedEnvironment,
+            taskIdentifier,
+            payload: "{}",
+            payloadType: "application/json",
+            context: {},
+            traceContext: {},
+            traceId: "t12345",
+            spanId: "s12345",
+            masterQueue: "main",
+            queueName: "task/test-task",
+            isTest: false,
+            tags: [],
+          },
+          prisma
+        );
+
+        //dequeue the run
+        const dequeued = await engine.dequeueFromMasterQueue({
+          consumerId: "test_12345",
+          masterQueue: run.masterQueue,
+          maxRunCount: 10,
+        });
+
+        //create an attempt
+        const attemptResult = await engine.startRunAttempt({
+          runId: dequeued[0].run.id,
+          snapshotId: dequeued[0].snapshot.id,
+        });
+
+        //fail the attempt with an unretriable error
+        const error = {
+          type: "INTERNAL_ERROR" as const,
+          code: "DISK_SPACE_EXCEEDED" as const,
+        };
+        const result = await engine.completeRunAttempt({
+          runId: dequeued[0].run.id,
+          snapshotId: attemptResult.snapshot.id,
+          completion: {
+            ok: false,
+            id: dequeued[0].run.id,
+            error,
+            retry: {
+              timestamp: Date.now(),
+              delay: 0,
+            },
+          },
+        });
+        expect(result.attemptStatus).toBe("RUN_FINISHED");
+        expect(result.snapshot.executionStatus).toBe("FINISHED");
+        expect(result.run.status).toBe("CRASHED");
+
+        //state should be crashed
+        const executionData3 = await engine.getRunExecutionData({ runId: run.id });
+        assertNonNullable(executionData3);
+        expect(executionData3.snapshot.executionStatus).toBe("FINISHED");
+        //only when the new attempt is created, should the attempt be increased
+
expect(executionData3.run.attemptNumber).toBe(1); + expect(executionData3.run.status).toBe("CRASHED"); + } finally { + engine.quit(); + } + } + ); + containerTest("OOM fail", { timeout: 15_000 }, async ({ prisma, redisOptions }) => { //create environment const authenticatedEnvironment = await setupAuthenticatedEnvironment(prisma, "PRODUCTION"); @@ -553,4 +666,170 @@ describe("RunEngine attempt failures", () => { } } ); + + containerTest( + "OOM fails after retrying on larger machine", + { timeout: 15_000 }, + async ({ prisma, redisOptions }) => { + //create environment + const authenticatedEnvironment = await setupAuthenticatedEnvironment(prisma, "PRODUCTION"); + + const engine = new RunEngine({ + prisma, + worker: { + redis: redisOptions, + workers: 1, + tasksPerWorker: 10, + pollIntervalMs: 100, + }, + queue: { + redis: redisOptions, + }, + runLock: { + redis: redisOptions, + }, + machines: { + defaultMachine: "small-1x", + machines: { + "small-1x": { + name: "small-1x" as const, + cpu: 0.5, + memory: 0.5, + centsPerMs: 0.0001, + }, + "small-2x": { + name: "small-2x" as const, + cpu: 1, + memory: 1, + centsPerMs: 0.0002, + }, + }, + baseCostInCents: 0.0001, + }, + tracer: trace.getTracer("test", "0.0.0"), + }); + + try { + const taskIdentifier = "test-task"; + + //create background worker + await setupBackgroundWorker(prisma, authenticatedEnvironment, taskIdentifier, undefined, { + outOfMemory: { + machine: "small-2x", + }, + }); + + //trigger the run + const run = await engine.trigger( + { + number: 1, + friendlyId: "run_1234", + environment: authenticatedEnvironment, + taskIdentifier, + payload: "{}", + payloadType: "application/json", + context: {}, + traceContext: {}, + traceId: "t12345", + spanId: "s12345", + masterQueue: "main", + queueName: "task/test-task", + isTest: false, + tags: [], + }, + prisma + ); + + //dequeue the run + const dequeued = await engine.dequeueFromMasterQueue({ + consumerId: "test_12345", + masterQueue: run.masterQueue, + maxRunCount: 10, + }); + + //create first attempt + const attemptResult = await engine.startRunAttempt({ + runId: dequeued[0].run.id, + snapshotId: dequeued[0].snapshot.id, + }); + + //fail the first attempt with an OOM error + const error = { + type: "INTERNAL_ERROR" as const, + code: "TASK_PROCESS_EXITED_WITH_NON_ZERO_CODE" as const, + message: "Process exited with code -1 after signal SIGKILL.", + stackTrace: "JavaScript heap out of memory", + }; + + const result = await engine.completeRunAttempt({ + runId: dequeued[0].run.id, + snapshotId: attemptResult.snapshot.id, + completion: { + ok: false, + id: dequeued[0].run.id, + error, + retry: { + timestamp: Date.now(), + delay: 0, + }, + }, + }); + + // The run should be retried with a larger machine + expect(result.attemptStatus).toBe("RETRY_QUEUED"); + expect(result.snapshot.executionStatus).toBe("QUEUED"); + expect(result.run.status).toBe("RETRYING_AFTER_FAILURE"); + + //state should be queued + const executionData = await engine.getRunExecutionData({ runId: run.id }); + assertNonNullable(executionData); + expect(executionData.snapshot.executionStatus).toBe("QUEUED"); + expect(executionData.run.attemptNumber).toBe(1); + expect(executionData.run.status).toBe("RETRYING_AFTER_FAILURE"); + + //dequeue again + const dequeued2 = await engine.dequeueFromMasterQueue({ + consumerId: "test_12345", + masterQueue: run.masterQueue, + maxRunCount: 10, + }); + + //create second attempt + const attemptResult2 = await engine.startRunAttempt({ + runId: dequeued2[0].run.id, + snapshotId: 
dequeued2[0].snapshot.id, + }); + expect(attemptResult2.run.attemptNumber).toBe(2); + + //fail the second attempt with the same OOM error + const result2 = await engine.completeRunAttempt({ + runId: dequeued2[0].run.id, + snapshotId: attemptResult2.snapshot.id, + completion: { + ok: false, + id: dequeued2[0].run.id, + error, + retry: { + timestamp: Date.now(), + delay: 0, + }, + }, + }); + + // The run should fail after the second OOM + expect(result2.attemptStatus).toBe("RUN_FINISHED"); + expect(result2.snapshot.executionStatus).toBe("FINISHED"); + expect(result2.run.status).toBe("CRASHED"); + + //final state should be crashed + const finalExecutionData = await engine.getRunExecutionData({ runId: run.id }); + assertNonNullable(finalExecutionData); + expect(finalExecutionData.snapshot.executionStatus).toBe("FINISHED"); + expect(finalExecutionData.run.attemptNumber).toBe(2); + expect(finalExecutionData.run.status).toBe("CRASHED"); + } finally { + engine.quit(); + } + } + ); }); From c8dec7472f7b06d1d7aa6f2de84898f28ea234cc Mon Sep 17 00:00:00 2001 From: Matt Aitken Date: Thu, 6 Mar 2025 19:13:23 +0000 Subject: [PATCH 14/18] Fix for OOM retrying --- .../run-engine/src/engine/retrying.ts | 30 ++++++++++++------- .../src/engine/tests/attemptFailures.test.ts | 19 +++++------- 2 files changed, 26 insertions(+), 23 deletions(-) diff --git a/internal-packages/run-engine/src/engine/retrying.ts b/internal-packages/run-engine/src/engine/retrying.ts index 12e3f23c30..5d862e5552 100644 --- a/internal-packages/run-engine/src/engine/retrying.ts +++ b/internal-packages/run-engine/src/engine/retrying.ts @@ -6,6 +6,7 @@ import { TaskRunExecutionRetry, taskRunErrorEnhancer, sanitizeError, + calculateNextRetryDelay, } from "@trigger.dev/core/v3"; import { PrismaClientOrTransaction, TaskRunStatus } from "@trigger.dev/database"; import { MAX_TASK_RUN_ATTEMPTS } from "./consts"; @@ -47,26 +48,33 @@ export async function retryOutcomeFromCompletion( const sanitizedError = sanitizeError(error); - // No retry settings - if (!retrySettings) { - return { outcome: "fail_run", sanitizedError }; - } - // OOM error (retry on a larger machine or fail) if (isOOMRunError(error)) { - const newMachine = await retryOOMOnMachine(prisma, runId); - if (!newMachine) { + const oomResult = await retryOOMOnMachine(prisma, runId); + if (!oomResult) { + return { outcome: "fail_run", sanitizedError, wasOOMError: true }; + } + + const delay = calculateNextRetryDelay(oomResult.retrySettings, attemptNumber ?? 
1); + + if (!delay) { + //no more retries left return { outcome: "fail_run", sanitizedError, wasOOMError: true }; } return { outcome: "retry", method: "queue", - settings: retrySettings, - machine: newMachine, + machine: oomResult.machine, + settings: { timestamp: Date.now() + delay, delay }, }; } + // No retry settings + if (!retrySettings) { + return { outcome: "fail_run", sanitizedError }; + } + // Not a retriable error: fail const retriableError = shouldRetryError(taskRunErrorEnhancer(error)); if (!retriableError) { @@ -112,7 +120,7 @@ export async function retryOutcomeFromCompletion( async function retryOOMOnMachine( prisma: PrismaClientOrTransaction, runId: string -): Promise { +): Promise<{ machine: string; retrySettings: RetryOptions } | undefined> { try { const run = await prisma.taskRun.findFirst({ where: { @@ -153,7 +161,7 @@ async function retryOOMOnMachine( return; } - return retryMachine; + return { machine: retryMachine, retrySettings: parsedRetryConfig.data }; } catch (error) { console.error("[FailedTaskRunRetryHelper] Failed to get execution retry", { runId, diff --git a/internal-packages/run-engine/src/engine/tests/attemptFailures.test.ts b/internal-packages/run-engine/src/engine/tests/attemptFailures.test.ts index 152bf88b76..232d090a73 100644 --- a/internal-packages/run-engine/src/engine/tests/attemptFailures.test.ts +++ b/internal-packages/run-engine/src/engine/tests/attemptFailures.test.ts @@ -8,6 +8,7 @@ import { trace } from "@opentelemetry/api"; import { expect } from "vitest"; import { EventBusEventArgs } from "../eventBus.js"; import { RunEngine } from "../index.js"; +import { setTimeout } from "node:timers/promises"; describe("RunEngine attempt failures", () => { containerTest( @@ -479,10 +480,6 @@ describe("RunEngine attempt failures", () => { ok: false, id: dequeued[0].run.id, error, - retry: { - timestamp: Date.now(), - delay: 0, - }, }, }); @@ -603,10 +600,6 @@ describe("RunEngine attempt failures", () => { ok: false, id: dequeued[0].run.id, error, - retry: { - timestamp: Date.now(), - delay: 0, - }, }, }); @@ -714,6 +707,8 @@ describe("RunEngine attempt failures", () => { //create background worker await setupBackgroundWorker(prisma, authenticatedEnvironment, taskIdentifier, undefined, { + maxTimeoutInMs: 10, + maxAttempts: 10, outOfMemory: { machine: "small-2x", }, @@ -768,10 +763,6 @@ describe("RunEngine attempt failures", () => { ok: false, id: dequeued[0].run.id, error, - retry: { - timestamp: Date.now(), - delay: 0, - }, }, }); @@ -787,12 +778,16 @@ describe("RunEngine attempt failures", () => { expect(executionData.run.attemptNumber).toBe(1); expect(executionData.run.status).toBe("RETRYING_AFTER_FAILURE"); + //wait for 1s + await setTimeout(1_000); + //dequeue again const dequeued2 = await engine.dequeueFromMasterQueue({ consumerId: "test_12345", masterQueue: run.masterQueue, maxRunCount: 10, }); + expect(dequeued2.length).toBe(1); //create second attempt const attemptResult2 = await engine.startRunAttempt({ From 206b9bc795f266dd9279763f31113cc23b8a5871 Mon Sep 17 00:00:00 2001 From: Matt Aitken Date: Thu, 6 Mar 2025 19:24:07 +0000 Subject: [PATCH 15/18] Complete the attempt span if it was an OOM error --- internal-packages/run-engine/src/engine/index.ts | 16 ++++++++++++++++ .../run-engine/src/engine/retrying.ts | 2 ++ 2 files changed, 18 insertions(+) diff --git a/internal-packages/run-engine/src/engine/index.ts b/internal-packages/run-engine/src/engine/index.ts index 5bce47b97d..fb14a2f9a7 100644 --- 
a/internal-packages/run-engine/src/engine/index.ts +++ b/internal-packages/run-engine/src/engine/index.ts @@ -2968,6 +2968,22 @@ export class RunEngine { const nextAttemptNumber = latestSnapshot.attemptNumber === null ? 1 : latestSnapshot.attemptNumber + 1; + if (retryResult.wasOOMError) { + this.eventBus.emit("runAttemptFailed", { + time: failedAt, + run: { + id: runId, + status: run.status, + spanId: run.spanId, + error: completion.error, + attemptNumber: latestSnapshot.attemptNumber ?? 0, + createdAt: run.createdAt, + completedAt: run.completedAt, + taskEventStore: run.taskEventStore, + }, + }); + } + this.eventBus.emit("runRetryScheduled", { time: failedAt, run: { diff --git a/internal-packages/run-engine/src/engine/retrying.ts b/internal-packages/run-engine/src/engine/retrying.ts index 5d862e5552..974bcd1151 100644 --- a/internal-packages/run-engine/src/engine/retrying.ts +++ b/internal-packages/run-engine/src/engine/retrying.ts @@ -35,6 +35,7 @@ export type RetryOutcome = method: "queue" | "immediate"; settings: TaskRunExecutionRetry; machine?: string; + wasOOMError?: boolean; }; export async function retryOutcomeFromCompletion( @@ -67,6 +68,7 @@ export async function retryOutcomeFromCompletion( method: "queue", machine: oomResult.machine, settings: { timestamp: Date.now() + delay, delay }, + wasOOMError: true, }; } From a4fadc3f1eb256c754eb2045ec23a2a72667ee1d Mon Sep 17 00:00:00 2001 From: Matt Aitken Date: Thu, 6 Mar 2025 19:39:49 +0000 Subject: [PATCH 16/18] Remove old broken import --- apps/webapp/app/v3/services/alerts/deliverAlert.server.ts | 1 - 1 file changed, 1 deletion(-) diff --git a/apps/webapp/app/v3/services/alerts/deliverAlert.server.ts b/apps/webapp/app/v3/services/alerts/deliverAlert.server.ts index 9f98d28a94..c373b11f8a 100644 --- a/apps/webapp/app/v3/services/alerts/deliverAlert.server.ts +++ b/apps/webapp/app/v3/services/alerts/deliverAlert.server.ts @@ -40,7 +40,6 @@ import { generateFriendlyId } from "~/v3/friendlyIdentifiers"; import { ProjectAlertChannelType, ProjectAlertType } from "@trigger.dev/database"; import { alertsRateLimiter } from "~/v3/alertsRateLimiter.server"; import { v3RunPath } from "~/utils/pathBuilder"; -import { isOOMError } from "../completeAttempt.server"; import { ApiRetrieveRunPresenter } from "~/presenters/v3/ApiRetrieveRunPresenter.server"; type FoundAlert = Prisma.Result< From 6f5cfac73bc062fed6fa75ef9ba2f6674581448e Mon Sep 17 00:00:00 2001 From: Matt Aitken Date: Thu, 6 Mar 2025 19:40:01 +0000 Subject: [PATCH 17/18] Fixed order of exports --- packages/core/package.json | 22 +++++++++++----------- 1 file changed, 11 insertions(+), 11 deletions(-) diff --git a/packages/core/package.json b/packages/core/package.json index 338e486ec3..d3f2d38f2a 100644 --- a/packages/core/package.json +++ b/packages/core/package.json @@ -667,17 +667,6 @@ "default": "./dist/commonjs/v3/runEngineWorker/index.js" } }, - "./v3/serverOnly": { - "import": { - "@triggerdotdev/source": "./src/v3/serverOnly/index.ts", - "types": "./dist/esm/v3/serverOnly/index.d.ts", - "default": "./dist/esm/v3/serverOnly/index.js" - }, - "require": { - "types": "./dist/commonjs/v3/serverOnly/index.d.ts", - "default": "./dist/commonjs/v3/serverOnly/index.js" - } - }, "./v3/machines": { "import": { "@triggerdotdev/source": "./src/v3/machines/index.ts", @@ -688,6 +677,17 @@ "types": "./dist/commonjs/v3/machines/index.d.ts", "default": "./dist/commonjs/v3/machines/index.js" } + }, + "./v3/serverOnly": { + "import": { + "@triggerdotdev/source": "./src/v3/serverOnly/index.ts", + 
"types": "./dist/esm/v3/serverOnly/index.d.ts", + "default": "./dist/esm/v3/serverOnly/index.js" + }, + "require": { + "types": "./dist/commonjs/v3/serverOnly/index.d.ts", + "default": "./dist/commonjs/v3/serverOnly/index.js" + } } }, "type": "module", From 271eaa8e7010dc11b3052de8e391dfeb50c51632 Mon Sep 17 00:00:00 2001 From: Matt Aitken Date: Thu, 6 Mar 2025 19:52:24 +0000 Subject: [PATCH 18/18] Fix for docker-provider checkpoints import --- apps/docker-provider/src/index.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/apps/docker-provider/src/index.ts b/apps/docker-provider/src/index.ts index 3ca5184c75..a0b0554fb2 100644 --- a/apps/docker-provider/src/index.ts +++ b/apps/docker-provider/src/index.ts @@ -8,7 +8,7 @@ import { } from "@trigger.dev/core/v3/apps"; import { SimpleLogger } from "@trigger.dev/core/v3/apps"; import { isExecaChildProcess } from "@trigger.dev/core/v3/apps"; -import { testDockerCheckpoint } from "@trigger.dev/core/v3/checkpoints"; +import { testDockerCheckpoint } from "@trigger.dev/core/v3/serverOnly"; import { setTimeout } from "node:timers/promises"; import { PostStartCauses, PreStopCauses } from "@trigger.dev/core/v3";