Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/tiny-buckets-teach.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"trigger.dev": patch
---

Fix stalled run detection
2 changes: 1 addition & 1 deletion packages/cli-v3/src/entryPoints/managed-run-worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -576,7 +576,7 @@ const heartbeatInterval = parseInt(heartbeatIntervalMs ?? "30000", 10);
for await (const _ of setInterval(heartbeatInterval)) {
if (_isRunning && _execution) {
try {
await zodIpc.send("TASK_HEARTBEAT", { id: _execution.attempt.id });
await zodIpc.send("TASK_HEARTBEAT", { id: _execution.run.id });
} catch (err) {
console.error("Failed to send HEARTBEAT message", err);
}
Expand Down
65 changes: 49 additions & 16 deletions packages/cli-v3/src/entryPoints/managed/execution.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ import { RunLogger, SendDebugLogOptions } from "./logger.js";
import { RunnerEnv } from "./env.js";
import { WorkloadHttpClient } from "@trigger.dev/core/v3/workers";
import { setTimeout as sleep } from "timers/promises";
import { RunExecutionHeartbeat } from "./heartbeat.js";
import { RunExecutionSnapshotPoller } from "./poller.js";
import { assertExhaustive, tryCatch } from "@trigger.dev/core/utils";
import { MetadataClient } from "./overrides.js";
Expand Down Expand Up @@ -63,9 +62,10 @@ export class RunExecution {
private restoreCount: number;

private taskRunProcess?: TaskRunProcess;
private runHeartbeat?: RunExecutionHeartbeat;
private snapshotPoller?: RunExecutionSnapshotPoller;

private lastHeartbeat?: Date;

constructor(opts: RunExecutionOptions) {
this.id = randomBytes(4).toString("hex");
this.workerManifest = opts.workerManifest;
Expand Down Expand Up @@ -105,7 +105,7 @@ export class RunExecution {
envVars: Record<string, string>;
isWarmStart?: boolean;
}) {
return new TaskRunProcess({
const taskRunProcess = new TaskRunProcess({
workerManifest: this.workerManifest,
env: {
...envVars,
Expand All @@ -123,6 +123,29 @@ export class RunExecution {
},
isWarmStart,
}).initialize();

taskRunProcess.onTaskRunHeartbeat.attach(async (runId) => {
if (!this.runFriendlyId) {
this.sendDebugLog("onTaskRunHeartbeat: missing run ID", { heartbeatRunId: runId });
return;
}

if (runId !== this.runFriendlyId) {
this.sendDebugLog("onTaskRunHeartbeat: mismatched run ID", {
heartbeatRunId: runId,
expectedRunId: this.runFriendlyId,
});
return;
}

const [error] = await tryCatch(this.onHeartbeat());

if (error) {
this.sendDebugLog("onTaskRunHeartbeat: failed", { error: error.message });
}
});

return taskRunProcess;
}

/**
Expand Down Expand Up @@ -229,7 +252,6 @@ export class RunExecution {
this.currentSnapshotId = snapshot.friendlyId;

// Update services
this.runHeartbeat?.updateSnapshotId(snapshot.friendlyId);
this.snapshotPoller?.updateSnapshotId(snapshot.friendlyId);

switch (snapshot.executionStatus) {
Expand Down Expand Up @@ -450,13 +472,6 @@ export class RunExecution {
this.podScheduledAt = runOpts.podScheduledAt;

// Create and start services
this.runHeartbeat = new RunExecutionHeartbeat({
runFriendlyId: this.runFriendlyId,
snapshotFriendlyId: this.currentSnapshotId,
httpClient: this.httpClient,
logger: this.logger,
heartbeatIntervalSeconds: this.env.TRIGGER_HEARTBEAT_INTERVAL_SECONDS,
});
this.snapshotPoller = new RunExecutionSnapshotPoller({
runFriendlyId: this.runFriendlyId,
snapshotFriendlyId: this.currentSnapshotId,
Expand All @@ -466,7 +481,6 @@ export class RunExecution {
handleSnapshotChange: this.handleSnapshotChange.bind(this),
});

this.runHeartbeat.start();
this.snapshotPoller.start();

const [startError, start] = await tryCatch(
Expand Down Expand Up @@ -839,9 +853,6 @@ export class RunExecution {
this.env.override(overrides);

// Update services with new values
if (overrides.TRIGGER_HEARTBEAT_INTERVAL_SECONDS) {
this.runHeartbeat?.updateInterval(this.env.TRIGGER_HEARTBEAT_INTERVAL_SECONDS * 1000);
}
if (overrides.TRIGGER_SNAPSHOT_POLL_INTERVAL_SECONDS) {
this.snapshotPoller?.updateInterval(this.env.TRIGGER_SNAPSHOT_POLL_INTERVAL_SECONDS * 1000);
}
Expand All @@ -857,6 +868,28 @@ export class RunExecution {
}
}

/**
 * Sends one heartbeat for the current run/snapshot pair through the HTTP client.
 *
 * Bails out with a debug log when either the run ID or the snapshot ID is unset.
 * A non-successful response is only logged, never thrown. `lastHeartbeat` is
 * stamped once the request has completed, whether or not it reported success.
 */
private async onHeartbeat() {
  const { runFriendlyId, currentSnapshotId } = this;

  // The run ID is checked first so the log message names the first missing piece.
  if (!runFriendlyId || !currentSnapshotId) {
    this.sendDebugLog(
      runFriendlyId ? "Heartbeat: missing snapshot ID" : "Heartbeat: missing run ID"
    );
    return;
  }

  this.sendDebugLog("Heartbeat: started");

  const result = await this.httpClient.heartbeatRun(runFriendlyId, currentSnapshotId);
  if (!result.success) {
    this.sendDebugLog("Heartbeat: failed", { error: result.error });
  }

  // Recorded after every completed request; surfaced in later debug logs.
  this.lastHeartbeat = new Date();
}

sendDebugLog(
message: string,
properties?: SendDebugLogOptions["properties"],
Expand All @@ -871,6 +904,7 @@ export class RunExecution {
snapshotId: this.currentSnapshotId,
executionId: this.id,
executionRestoreCount: this.restoreCount,
lastHeartbeat: this.lastHeartbeat?.toISOString(),
},
});
}
Expand Down Expand Up @@ -917,7 +951,6 @@ export class RunExecution {
}

/**
 * Stops the services held by this execution.
 *
 * Each call uses optional chaining, so a service that is currently undefined is skipped.
 */
private stopServices() {
this.runHeartbeat?.stop();
this.snapshotPoller?.stop();
}
}