Skip to content

Commit db87295

Browse files
authored
fix(runner): reduce restore recovery time and deprecated runner false positives (#2523)
* fix(runner): improve restore detection * chore(supervisor): skip schema parsing when debug logs disabled * fix(runner): deprecation race condition * add changeset
1 parent a3ef6ea commit db87295

File tree

7 files changed

+275
-66
lines changed

7 files changed

+275
-66
lines changed

.changeset/six-cougars-play.md

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
---
2+
"trigger.dev": patch
3+
"@trigger.dev/core": patch
4+
---
5+
6+
Reduce restore recovery time and fix deprecated runner false positives

apps/supervisor/src/workloadServer/index.ts

Lines changed: 26 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -125,7 +125,7 @@ export class WorkloadServer extends EventEmitter<WorkloadServerEvents> {
125125
}
126126

127127
private createHttpServer({ host, port }: { host: string; port: number }) {
128-
return new HttpServer({
128+
const httpServer = new HttpServer({
129129
port,
130130
host,
131131
metrics: {
@@ -346,23 +346,6 @@ export class WorkloadServer extends EventEmitter<WorkloadServerEvents> {
346346
},
347347
}
348348
)
349-
.route("/api/v1/workload-actions/runs/:runFriendlyId/logs/debug", "POST", {
350-
paramsSchema: WorkloadActionParams.pick({ runFriendlyId: true }),
351-
bodySchema: WorkloadDebugLogRequestBody,
352-
handler: async ({ req, reply, params, body }) => {
353-
reply.empty(204);
354-
355-
if (!env.SEND_RUN_DEBUG_LOGS) {
356-
return;
357-
}
358-
359-
await this.workerClient.sendDebugLog(
360-
params.runFriendlyId,
361-
body,
362-
this.runnerIdFromRequest(req)
363-
);
364-
},
365-
})
366349
.route("/api/v1/workload-actions/deployments/:deploymentId/dequeue", "GET", {
367350
paramsSchema: z.object({
368351
deploymentId: z.string(),
@@ -387,6 +370,31 @@ export class WorkloadServer extends EventEmitter<WorkloadServerEvents> {
387370
reply.json(dequeueResponse.data satisfies WorkloadDequeueFromVersionResponseBody);
388371
},
389372
});
373+
374+
if (env.SEND_RUN_DEBUG_LOGS) {
375+
httpServer.route("/api/v1/workload-actions/runs/:runFriendlyId/logs/debug", "POST", {
376+
paramsSchema: WorkloadActionParams.pick({ runFriendlyId: true }),
377+
bodySchema: WorkloadDebugLogRequestBody,
378+
handler: async ({ req, reply, params, body }) => {
379+
reply.empty(204);
380+
381+
await this.workerClient.sendDebugLog(
382+
params.runFriendlyId,
383+
body,
384+
this.runnerIdFromRequest(req)
385+
);
386+
},
387+
});
388+
} else {
389+
// Lightweight mock route without schemas
390+
httpServer.route("/api/v1/workload-actions/runs/:runFriendlyId/logs/debug", "POST", {
391+
handler: async ({ reply }) => {
392+
reply.empty(204);
393+
},
394+
});
395+
}
396+
397+
return httpServer;
390398
}
391399

392400
private createWebsocketServer() {

packages/cli-v3/src/entryPoints/managed/controller.ts

Lines changed: 38 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -461,19 +461,6 @@ export class ManagedRunController {
461461
runId: this.runFriendlyId,
462462
message: "Socket connected to supervisor",
463463
});
464-
465-
// This should handle the case where we reconnect after being restored
466-
if (
467-
this.runFriendlyId &&
468-
this.snapshotFriendlyId &&
469-
this.runFriendlyId !== this.env.TRIGGER_RUN_ID
470-
) {
471-
this.sendDebugLog({
472-
runId: this.runFriendlyId,
473-
message: "Subscribing to notifications for in-progress run",
474-
});
475-
this.subscribeToRunNotifications(this.runFriendlyId, this.snapshotFriendlyId);
476-
}
477464
});
478465

479466
socket.on("connect_error", (error) => {
@@ -514,7 +501,7 @@ export class ManagedRunController {
514501
supervisorApiUrl: this.env.TRIGGER_SUPERVISOR_API_URL,
515502
};
516503

517-
await this.currentExecution.processEnvOverrides("socket disconnected", true);
504+
const result = await this.currentExecution.processEnvOverrides("socket disconnected", true);
518505

519506
const newEnv = {
520507
workerInstanceName: this.env.TRIGGER_WORKER_INSTANCE_NAME,
@@ -528,6 +515,43 @@ export class ManagedRunController {
528515
properties: { reason, ...parseDescription(), currentEnv, newEnv },
529516
});
530517

518+
if (!result) {
519+
return;
520+
}
521+
522+
// If runner ID changed, we detected a restore
523+
if (result.runnerIdChanged) {
524+
this.sendDebugLog({
525+
runId: this.runFriendlyId,
526+
message: "Runner ID changed - restore detected",
527+
properties: {
528+
supervisorChanged: result.supervisorChanged,
529+
},
530+
});
531+
532+
if (!result.supervisorChanged) {
533+
return;
534+
}
535+
536+
// Only reconnect WebSocket if supervisor URL actually changed
537+
this.sendDebugLog({
538+
runId: this.runFriendlyId,
539+
message: "Supervisor URL changed - creating new socket connection",
540+
});
541+
542+
// First disconnect the old socket to avoid conflicts
543+
socket.removeAllListeners();
544+
socket.disconnect();
545+
546+
// Create a new socket with the updated URL and headers
547+
this.socket = this.createSupervisorSocket();
548+
549+
// Re-subscribe to notifications if we have an active execution
550+
if (this.runFriendlyId && this.snapshotFriendlyId) {
551+
this.subscribeToRunNotifications(this.runFriendlyId, this.snapshotFriendlyId);
552+
}
553+
}
554+
531555
return;
532556
}
533557

packages/cli-v3/src/entryPoints/managed/execution.ts

Lines changed: 49 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -873,7 +873,23 @@ export class RunExecution {
873873
);
874874

875875
if (!continuationResult.success) {
876-
throw new Error(continuationResult.error);
876+
// Check if we need to refresh metadata due to connection error
877+
if (continuationResult.isConnectionError) {
878+
this.sendDebugLog("restore: connection error detected, refreshing metadata");
879+
await this.processEnvOverrides("restore connection error");
880+
881+
// Retry the continuation after refreshing metadata
882+
const retryResult = await this.httpClient.continueRunExecution(
883+
this.runFriendlyId,
884+
this.snapshotManager.snapshotId
885+
);
886+
887+
if (!retryResult.success) {
888+
throw new Error(retryResult.error);
889+
}
890+
} else {
891+
throw new Error(continuationResult.error);
892+
}
877893
}
878894

879895
// Track restore count
@@ -899,11 +915,18 @@ export class RunExecution {
899915
public async processEnvOverrides(
900916
reason?: string,
901917
shouldPollForSnapshotChanges?: boolean
902-
): Promise<{ overrides: Metadata } | null> {
918+
): Promise<{
919+
overrides: Metadata;
920+
runnerIdChanged?: boolean;
921+
supervisorChanged?: boolean;
922+
} | null> {
903923
if (!this.metadataClient) {
904924
return null;
905925
}
906926

927+
const previousRunnerId = this.env.TRIGGER_RUNNER_ID;
928+
const previousSupervisorUrl = this.env.TRIGGER_SUPERVISOR_API_URL;
929+
907930
const [error, overrides] = await this.metadataClient.getEnvOverrides();
908931

909932
if (error) {
@@ -931,6 +954,14 @@ export class RunExecution {
931954
// Override the env with the new values
932955
this.env.override(overrides);
933956

957+
// Check if runner ID changed
958+
const newRunnerId = this.env.TRIGGER_RUNNER_ID;
959+
const runnerIdChanged = previousRunnerId !== newRunnerId;
960+
961+
// Check if supervisor URL changed
962+
const newSupervisorUrl = this.env.TRIGGER_SUPERVISOR_API_URL;
963+
const supervisorChanged = previousSupervisorUrl !== newSupervisorUrl;
964+
934965
// Update services with new values
935966
if (overrides.TRIGGER_SNAPSHOT_POLL_INTERVAL_SECONDS) {
936967
this.snapshotPoller?.updateInterval(this.env.TRIGGER_SNAPSHOT_POLL_INTERVAL_SECONDS * 1000);
@@ -954,6 +985,8 @@ export class RunExecution {
954985

955986
return {
956987
overrides,
988+
runnerIdChanged,
989+
supervisorChanged,
957990
};
958991
}
959992

@@ -977,6 +1010,12 @@ export class RunExecution {
9771010

9781011
if (!response.success) {
9791012
this.sendDebugLog("heartbeat: failed", { error: response.error });
1013+
1014+
// Check if we need to refresh metadata due to connection error
1015+
if (response.isConnectionError) {
1016+
this.sendDebugLog("heartbeat: connection error detected, refreshing metadata");
1017+
await this.processEnvOverrides("heartbeat connection error");
1018+
}
9801019
}
9811020

9821021
this.lastHeartbeat = new Date();
@@ -1192,6 +1231,14 @@ export class RunExecution {
11921231
error: response.error,
11931232
});
11941233

1234+
if (response.isConnectionError) {
1235+
// Log this separately to make it more visible
1236+
this.sendDebugLog(
1237+
"fetchAndProcessSnapshotChanges: connection error detected, refreshing metadata"
1238+
);
1239+
}
1240+
1241+
// Always trigger metadata refresh on snapshot fetch errors
11951242
await this.processEnvOverrides("snapshots since error");
11961243
return;
11971244
}

packages/cli-v3/src/entryPoints/managed/snapshot.test.ts

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -697,6 +697,65 @@ describe("SnapshotManager", () => {
697697
true
698698
);
699699
});
700+
701+
it("should handle deprecated snapshot race condition - avoid false positives from stale polls", async () => {
702+
const onSnapshotChange = vi.fn();
703+
704+
// Mock MetadataClient to simulate runner ID change (restore detected) on first call
705+
let isFirstCall = true;
706+
const mockMetadataClient = {
707+
getEnvOverrides: vi.fn().mockImplementation(() => {
708+
if (isFirstCall) {
709+
isFirstCall = false;
710+
return Promise.resolve([null, { TRIGGER_RUNNER_ID: "test-runner-2" }]); // Different runner ID = restore
711+
}
712+
return Promise.resolve([null, { TRIGGER_RUNNER_ID: "test-runner-2" }]); // Same runner ID afterward
713+
}),
714+
};
715+
716+
const manager = new SnapshotManager({
717+
runnerId: "test-runner-1",
718+
runFriendlyId: "test-run-1",
719+
initialSnapshotId: "snapshot-1",
720+
initialStatus: "EXECUTING_WITH_WAITPOINTS",
721+
logger: mockLogger,
722+
metadataClient: mockMetadataClient as any,
723+
onSnapshotChange,
724+
onSuspendable: mockSuspendableHandler,
725+
});
726+
727+
// First update: Process restore transition with deprecated statuses (normal case)
728+
// This simulates: EXECUTING_WITH_WAITPOINTS -> [SUSPENDED, QUEUED] -> PENDING_EXECUTING
729+
await manager.handleSnapshotChanges([
730+
createRunExecutionData({ snapshotId: "snapshot-suspended", executionStatus: "SUSPENDED" }),
731+
createRunExecutionData({ snapshotId: "snapshot-queued", executionStatus: "QUEUED" }),
732+
createRunExecutionData({ snapshotId: "snapshot-2", executionStatus: "PENDING_EXECUTING" }),
733+
]);
734+
735+
// First call should be deprecated=false (restore detected)
736+
expect(onSnapshotChange).toHaveBeenCalledWith(
737+
expect.objectContaining({ snapshot: expect.objectContaining({ friendlyId: "snapshot-2" }) }),
738+
false
739+
);
740+
741+
onSnapshotChange.mockClear();
742+
743+
// Second update: Should only get new snapshot (race condition case)
744+
// This simulates a stale poll that returns: getSnapshotsSince(snapshot-1) -> [SUSPENDED, QUEUED, snapshot-2, snapshot-3]
745+
// The SUSPENDED/QUEUED should be ignored as already seen
746+
await manager.handleSnapshotChanges([
747+
createRunExecutionData({ snapshotId: "snapshot-suspended", executionStatus: "SUSPENDED" }), // Already seen
748+
createRunExecutionData({ snapshotId: "snapshot-queued", executionStatus: "QUEUED" }), // Already seen
749+
createRunExecutionData({ snapshotId: "snapshot-2", executionStatus: "PENDING_EXECUTING" }), // Already processed
750+
createRunExecutionData({ snapshotId: "snapshot-3", executionStatus: "EXECUTING" }), // New
751+
]);
752+
753+
// Should call onSnapshotChange with deprecated = false (no new deprecated snapshots)
754+
expect(onSnapshotChange).toHaveBeenCalledWith(
755+
expect.objectContaining({ snapshot: expect.objectContaining({ friendlyId: "snapshot-3" }) }),
756+
false
757+
);
758+
});
700759
});
701760

702761
// Helper to generate RunExecutionData with sensible defaults

packages/cli-v3/src/entryPoints/managed/snapshot.ts

Lines changed: 23 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,10 @@ export class SnapshotManager {
4949
private changeQueue: QueuedChangeItem[] = [];
5050
private isProcessingQueue = false;
5151

52+
// Track seen deprecated snapshots to prevent false positives
53+
private seenDeprecatedSnapshotIds: string[] = [];
54+
private readonly maxSeenDeprecatedSnapshotIds = 50;
55+
5256
constructor(opts: SnapshotManagerOptions) {
5357
this.runFriendlyId = opts.runFriendlyId;
5458
this.runnerId = opts.runnerId;
@@ -284,9 +288,13 @@ export class SnapshotManager {
284288

285289
// Check if any previous snapshot is QUEUED or SUSPENDED
286290
const deprecatedStatus: TaskRunExecutionStatus[] = ["QUEUED", "SUSPENDED"];
287-
const deprecatedSnapshots = previousSnapshots.filter((snap) =>
288-
deprecatedStatus.includes(snap.snapshot.executionStatus)
289-
);
291+
const deprecatedSnapshots = previousSnapshots.filter((snap) => {
292+
const isDeprecated = deprecatedStatus.includes(snap.snapshot.executionStatus);
293+
const previouslySeen = this.seenDeprecatedSnapshotIds.some(
294+
(s) => s === snap.snapshot.friendlyId
295+
);
296+
return isDeprecated && !previouslySeen;
297+
});
290298

291299
let deprecated = false;
292300
if (deprecatedSnapshots.length > 0) {
@@ -298,6 +306,18 @@ export class SnapshotManager {
298306
} else {
299307
deprecated = true;
300308
}
309+
310+
// Add the deprecated snapshot IDs to the seen list
311+
this.seenDeprecatedSnapshotIds.push(
312+
...deprecatedSnapshots.map((s) => s.snapshot.friendlyId)
313+
);
314+
315+
if (this.seenDeprecatedSnapshotIds.length > this.maxSeenDeprecatedSnapshotIds) {
316+
// Only keep the latest maxSeenDeprecatedSnapshotIds
317+
this.seenDeprecatedSnapshotIds = this.seenDeprecatedSnapshotIds.slice(
318+
-this.maxSeenDeprecatedSnapshotIds
319+
);
320+
}
301321
}
302322

303323
const { snapshot } = latestSnapshot;

0 commit comments

Comments
 (0)