Commit 70b16ee

feat(engine): Improve execution stalls troubleshooting, align dev and prod behavior, adding heartbeats.yield utility
1 parent: 71060d9 · commit: 70b16ee

21 files changed: +446, -109 lines


apps/webapp/app/env.server.ts

Lines changed: 2 additions & 2 deletions
@@ -519,8 +519,8 @@ const EnvironmentSchema = z
   RUN_ENGINE_WORKER_IMMEDIATE_POLL_INTERVAL: z.coerce.number().int().default(100),
   RUN_ENGINE_TIMEOUT_PENDING_EXECUTING: z.coerce.number().int().default(60_000),
   RUN_ENGINE_TIMEOUT_PENDING_CANCEL: z.coerce.number().int().default(60_000),
-  RUN_ENGINE_TIMEOUT_EXECUTING: z.coerce.number().int().default(60_000),
-  RUN_ENGINE_TIMEOUT_EXECUTING_WITH_WAITPOINTS: z.coerce.number().int().default(60_000),
+  RUN_ENGINE_TIMEOUT_EXECUTING: z.coerce.number().int().default(600_000), // 10 minutes
+  RUN_ENGINE_TIMEOUT_EXECUTING_WITH_WAITPOINTS: z.coerce.number().int().default(600_000), // 10 minutes
   RUN_ENGINE_TIMEOUT_SUSPENDED: z.coerce
     .number()
     .int()
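
The two stall-timeout defaults move from one minute to 10 minutes. As a quick illustration of how this kind of schema treats the variables (a minimal standalone sketch, not the webapp's actual `EnvironmentSchema`), `z.coerce` turns string values from `process.env` into integers and `.default()` fills in the 10-minute value when a variable is unset:

```ts
import { z } from "zod";

// Minimal sketch of the coerce + default pattern used above
const EnvSketch = z.object({
  RUN_ENGINE_TIMEOUT_EXECUTING: z.coerce.number().int().default(600_000), // 10 minutes
  RUN_ENGINE_TIMEOUT_EXECUTING_WITH_WAITPOINTS: z.coerce.number().int().default(600_000),
});

// Unset variables fall back to the new default...
console.log(EnvSketch.parse({}).RUN_ENGINE_TIMEOUT_EXECUTING); // 600000

// ...and explicit values arrive as strings and are coerced to integers
console.log(
  EnvSketch.parse({ RUN_ENGINE_TIMEOUT_EXECUTING: "120000" }).RUN_ENGINE_TIMEOUT_EXECUTING
); // 120000
```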

docs/troubleshooting.mdx

Lines changed: 32 additions & 0 deletions
@@ -181,6 +181,38 @@ View the [rate limits](/limits) page for more information.
 
 This can happen in different situations, for example when using plain strings as idempotency keys. Support for `Crypto` without a special flag was added in Node `v19.0.0`. You will have to upgrade Node - we recommend even-numbered major releases, e.g. `v20` or `v22`. Alternatively, you can switch from plain strings to the `idempotencyKeys.create` SDK function. [Read the guide](/idempotency).
 
+### Task run stalled executing
+
+If you see a `TASK_RUN_STALLED_EXECUTING` error it means that we didn't receive a heartbeat from your task before the stall timeout. We automatically heartbeat runs every 30 seconds, and the heartbeat timeout is 10 minutes.
+
+<Note>
+
+If this was a dev run, then most likely the `trigger.dev dev` CLI was stopped, and it wasn't an issue with your code.
+
+</Note>
+
+These errors can happen when code inside your task is blocking the event loop for too long. The most likely cause is an accidental infinite loop. It could also be a CPU-heavy operation that's blocking the event loop, like nested loops over very large arrays. We recommend reading the [Don't Block the Event Loop](https://nodejs.org/en/learn/asynchronous-work/dont-block-the-event-loop) guide from Node.js for common patterns that can cause this.
+
+If you are doing a continuous CPU-heavy task, then we recommend you try using our `heartbeats.yield` function to automatically yield to the event loop periodically:
+
+```ts
+import { heartbeats } from "@trigger.dev/sdk";
+
+// code inside your task
+for (const row of bigDataset) {
+  await heartbeats.yield(); // safe to call every iteration, we will only actually yield when we need to
+  process(row); // this is a synchronous operation
+}
+```
+
+<Note>
+
+You could also offload the CPU-heavy work to a Node.js worker thread, but this is currently more complex to set up. We are planning on adding support for this in the future.
+
+</Note>
+
+If the above doesn't work, then we recommend you try increasing the machine size of your task. See our [machines guide](/machines) for more information.
+
 ## Framework specific issues
 
 ### NestJS swallows all errors/exceptions
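
The snippet added above calls `heartbeats.yield()` on every iteration while noting that it only actually yields when needed. For intuition, here is a rough sketch of what such a throttled yield can look like in plain Node.js; the `createThrottledYield` helper is hypothetical and is not necessarily how the SDK implements `heartbeats.yield`:

```ts
// Hypothetical helper, for illustration only: yield to the event loop at most
// once per interval so timers, I/O, and heartbeats get a chance to run.
function createThrottledYield(intervalMs = 500) {
  let lastYield = Date.now();
  return async function maybeYield(): Promise<void> {
    if (Date.now() - lastYield < intervalMs) return; // cheap no-op most of the time
    lastYield = Date.now();
    // setImmediate lets the event loop drain before the CPU-heavy loop resumes
    await new Promise<void>((resolve) => setImmediate(resolve));
  };
}

// Usage, mirroring the heartbeats.yield example in the docs snippet above
const maybeYield = createThrottledYield();
const bigDataset = Array.from({ length: 1_000_000 }, (_, i) => i);

async function main() {
  for (const row of bigDataset) {
    await maybeYield();
    Math.sqrt(row); // stand-in for synchronous, CPU-heavy per-row work
  }
}

main();
```

The cheap early return keeps the loop fast, while the occasional `setImmediate` gap gives the heartbeat timer and its network I/O a chance to run.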

internal-packages/run-engine/src/engine/errors.ts

Lines changed: 14 additions & 4 deletions
@@ -1,8 +1,11 @@
 import { assertExhaustive } from "@trigger.dev/core";
 import { TaskRunError } from "@trigger.dev/core/v3";
-import { TaskRunStatus } from "@trigger.dev/database";
+import { RuntimeEnvironmentType, TaskRunStatus } from "@trigger.dev/database";
 
-export function runStatusFromError(error: TaskRunError): TaskRunStatus {
+export function runStatusFromError(
+  error: TaskRunError,
+  environmentType: RuntimeEnvironmentType
+): TaskRunStatus {
   if (error.type !== "INTERNAL_ERROR") {
     return "COMPLETED_WITH_ERRORS";
   }
@@ -21,6 +24,15 @@ export function runStatusFromError(error: TaskRunError): TaskRunStatus {
       return "CANCELED";
     case "MAX_DURATION_EXCEEDED":
       return "TIMED_OUT";
+    case "TASK_RUN_STALLED_EXECUTING":
+    case "TASK_RUN_STALLED_EXECUTING_WITH_WAITPOINTS": {
+      if (environmentType === "DEVELOPMENT") {
+        return "CANCELED";
+      }
+
+      return "COMPLETED_WITH_ERRORS";
+    }
+
     case "TASK_PROCESS_OOM_KILLED":
     case "TASK_PROCESS_MAYBE_OOM_KILLED":
     case "TASK_PROCESS_SIGSEGV":
@@ -40,8 +52,6 @@ export function runStatusFromError(error: TaskRunError): TaskRunStatus {
     case "TASK_DEQUEUED_INVALID_STATE":
     case "TASK_DEQUEUED_QUEUE_NOT_FOUND":
     case "TASK_RUN_DEQUEUED_MAX_RETRIES":
-    case "TASK_RUN_STALLED_EXECUTING":
-    case "TASK_RUN_STALLED_EXECUTING_WITH_WAITPOINTS":
     case "TASK_HAS_N0_EXECUTION_SNAPSHOT":
     case "GRACEFUL_EXIT_TIMEOUT":
     case "POD_EVICTED":

internal-packages/run-engine/src/engine/index.ts

Lines changed: 24 additions & 24 deletions
@@ -8,6 +8,7 @@ import {
   CreateCheckpointResult,
   DequeuedMessage,
   ExecutionResult,
+  formatDurationMilliseconds,
   RunExecutionData,
   StartRunAttemptResult,
   TaskRunContext,
@@ -212,6 +213,8 @@ export class RunEngine {
     });
 
     if (!options.worker.disabled) {
+      console.log("✅ Starting run engine worker");
+
       this.worker.start();
     }
 
@@ -1190,23 +1193,6 @@
       snapshot: latestSnapshot,
     });
 
-    // For dev, we just cancel runs that are stuck
-    if (latestSnapshot.environmentType === "DEVELOPMENT") {
-      this.logger.log("RunEngine.#handleStalledSnapshot() cancelling DEV run", {
-        runId,
-        snapshot: latestSnapshot,
-      });
-
-      await this.cancelRun({
-        runId: latestSnapshot.runId,
-        finalizeRun: true,
-        reason:
-          "Run was disconnected, check you're running the CLI dev command and your network connection is healthy.",
-        tx,
-      });
-      return;
-    }
-
     switch (latestSnapshot.executionStatus) {
       case "RUN_CREATED": {
         throw new NotImplementedError("There shouldn't be a heartbeat for RUN_CREATED");
@@ -1264,7 +1250,19 @@
       case "EXECUTING_WITH_WAITPOINTS": {
         const retryDelay = 250;
 
-        //todo call attemptFailed and force requeuing
+        const timeoutDuration =
+          latestSnapshot.executionStatus === "EXECUTING"
+            ? formatDurationMilliseconds(this.heartbeatTimeouts.EXECUTING)
+            : formatDurationMilliseconds(this.heartbeatTimeouts.EXECUTING_WITH_WAITPOINTS);
+
+        // Dev runs don't retry, because the vast majority of the time these snapshots stall because
+        // they have quit the CLI
+        const shouldRetry = latestSnapshot.environmentType !== "DEVELOPMENT";
+        const errorMessage =
+          latestSnapshot.environmentType === "DEVELOPMENT"
+            ? `Run timed out after ${timeoutDuration} due to missing heartbeats (sent every 30s). Check if your \`trigger.dev dev\` CLI is still running, or if CPU-heavy work is blocking the main thread.`
+            : `Run timed out after ${timeoutDuration} due to missing heartbeats (sent every 30s). This typically happens when CPU-heavy work blocks the main thread.`;
+
         await this.runAttemptSystem.attemptFailed({
           runId,
           snapshotId: latestSnapshot.id,
@@ -1277,13 +1275,15 @@
               latestSnapshot.executionStatus === "EXECUTING"
                 ? "TASK_RUN_STALLED_EXECUTING"
                 : "TASK_RUN_STALLED_EXECUTING_WITH_WAITPOINTS",
-              message: `Run stalled while executing. This can happen when the run becomes unresponsive, for example because the CPU is overloaded.`,
-            },
-            retry: {
-              //250ms in the future
-              timestamp: Date.now() + retryDelay,
-              delay: retryDelay,
+              message: errorMessage,
             },
+            retry: shouldRetry
+              ? {
+                  //250ms in the future
+                  timestamp: Date.now() + retryDelay,
+                  delay: retryDelay,
+                }
+              : undefined,
           },
           forceRequeue: true,
           tx: prisma,

internal-packages/run-engine/src/engine/systems/runAttemptSystem.ts

Lines changed: 14 additions & 5 deletions
@@ -51,6 +51,7 @@ import { RunEngineOptions } from "../types.js";
 import { BatchSystem } from "./batchSystem.js";
 import { DelayedRunSystem } from "./delayedRunSystem.js";
 import {
+  EnhancedExecutionSnapshot,
   executionResultFromSnapshot,
   ExecutionSnapshotSystem,
   getLatestExecutionSnapshot,
@@ -690,6 +691,10 @@
       throw new ServiceValidationError("Snapshot ID doesn't match the latest snapshot", 400);
     }
 
+    if (latestSnapshot.executionStatus === "FINISHED") {
+      throw new ServiceValidationError("Run is already finished", 400);
+    }
+
     span.setAttribute("completionStatus", completion.ok);
 
     const completedAt = new Date();
@@ -843,6 +848,10 @@
       throw new ServiceValidationError("Snapshot ID doesn't match the latest snapshot", 400);
     }
 
+    if (latestSnapshot.executionStatus === "FINISHED") {
+      throw new ServiceValidationError("Run is already finished", 400);
+    }
+
     span.setAttribute("completionStatus", completion.ok);
 
     //remove waitpoints blocking the run
@@ -923,7 +932,7 @@
       case "fail_run": {
         return await this.#permanentlyFailRun({
           runId,
-          snapshotId,
+          latestSnapshot,
           failedAt,
           error: retryResult.sanitizedError,
           workerId,
@@ -1440,14 +1449,14 @@
 
   async #permanentlyFailRun({
     runId,
-    snapshotId,
+    latestSnapshot,
     failedAt,
     error,
     workerId,
    runnerId,
  }: {
    runId: string;
-    snapshotId?: string;
+    latestSnapshot: EnhancedExecutionSnapshot;
    failedAt: Date;
    error: TaskRunError;
    workerId?: string;
@@ -1456,7 +1465,7 @@
     const prisma = this.$.prisma;
 
     return startSpan(this.$.tracer, "permanentlyFailRun", async (span) => {
-      const status = runStatusFromError(error);
+      const status = runStatusFromError(error, latestSnapshot.environmentType);
 
       //run permanently failed
       const run = await prisma.taskRun.update({
@@ -1509,7 +1518,7 @@
           executionStatus: "FINISHED",
           description: "Run failed",
         },
-        previousSnapshotId: snapshotId,
+        previousSnapshotId: latestSnapshot.id,
         environmentId: run.runtimeEnvironment.id,
         environmentType: run.runtimeEnvironment.type,
         projectId: run.runtimeEnvironment.project.id,
