Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .changeset/six-cougars-play.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
---
"trigger.dev": patch
"@trigger.dev/core": patch
---

Reduce restore recovery time and fix deprecated runner false positives
44 changes: 26 additions & 18 deletions apps/supervisor/src/workloadServer/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ export class WorkloadServer extends EventEmitter<WorkloadServerEvents> {
}

private createHttpServer({ host, port }: { host: string; port: number }) {
return new HttpServer({
const httpServer = new HttpServer({
port,
host,
metrics: {
Expand Down Expand Up @@ -346,23 +346,6 @@ export class WorkloadServer extends EventEmitter<WorkloadServerEvents> {
},
}
)
.route("/api/v1/workload-actions/runs/:runFriendlyId/logs/debug", "POST", {
paramsSchema: WorkloadActionParams.pick({ runFriendlyId: true }),
bodySchema: WorkloadDebugLogRequestBody,
handler: async ({ req, reply, params, body }) => {
reply.empty(204);

if (!env.SEND_RUN_DEBUG_LOGS) {
return;
}

await this.workerClient.sendDebugLog(
params.runFriendlyId,
body,
this.runnerIdFromRequest(req)
);
},
})
.route("/api/v1/workload-actions/deployments/:deploymentId/dequeue", "GET", {
paramsSchema: z.object({
deploymentId: z.string(),
Expand All @@ -387,6 +370,31 @@ export class WorkloadServer extends EventEmitter<WorkloadServerEvents> {
reply.json(dequeueResponse.data satisfies WorkloadDequeueFromVersionResponseBody);
},
});

if (env.SEND_RUN_DEBUG_LOGS) {
httpServer.route("/api/v1/workload-actions/runs/:runFriendlyId/logs/debug", "POST", {
paramsSchema: WorkloadActionParams.pick({ runFriendlyId: true }),
bodySchema: WorkloadDebugLogRequestBody,
handler: async ({ req, reply, params, body }) => {
reply.empty(204);

await this.workerClient.sendDebugLog(
params.runFriendlyId,
body,
this.runnerIdFromRequest(req)
);
},
});
} else {
// Lightweight mock route without schemas
httpServer.route("/api/v1/workload-actions/runs/:runFriendlyId/logs/debug", "POST", {
handler: async ({ reply }) => {
reply.empty(204);
},
});
}

return httpServer;
}

private createWebsocketServer() {
Expand Down
52 changes: 38 additions & 14 deletions packages/cli-v3/src/entryPoints/managed/controller.ts
Original file line number Diff line number Diff line change
Expand Up @@ -461,19 +461,6 @@ export class ManagedRunController {
runId: this.runFriendlyId,
message: "Socket connected to supervisor",
});

// This should handle the case where we reconnect after being restored
if (
this.runFriendlyId &&
this.snapshotFriendlyId &&
this.runFriendlyId !== this.env.TRIGGER_RUN_ID
) {
this.sendDebugLog({
runId: this.runFriendlyId,
message: "Subscribing to notifications for in-progress run",
});
this.subscribeToRunNotifications(this.runFriendlyId, this.snapshotFriendlyId);
}
});

socket.on("connect_error", (error) => {
Expand Down Expand Up @@ -514,7 +501,7 @@ export class ManagedRunController {
supervisorApiUrl: this.env.TRIGGER_SUPERVISOR_API_URL,
};

await this.currentExecution.processEnvOverrides("socket disconnected", true);
const result = await this.currentExecution.processEnvOverrides("socket disconnected", true);

const newEnv = {
workerInstanceName: this.env.TRIGGER_WORKER_INSTANCE_NAME,
Expand All @@ -528,6 +515,43 @@ export class ManagedRunController {
properties: { reason, ...parseDescription(), currentEnv, newEnv },
});

if (!result) {
return;
}

// If runner ID changed, we detected a restore
if (result.runnerIdChanged) {
this.sendDebugLog({
runId: this.runFriendlyId,
message: "Runner ID changed - restore detected",
properties: {
supervisorChanged: result.supervisorChanged,
},
});

if (!result.supervisorChanged) {
return;
}

// Only reconnect WebSocket if supervisor URL actually changed
this.sendDebugLog({
runId: this.runFriendlyId,
message: "Supervisor URL changed - creating new socket connection",
});

// First disconnect the old socket to avoid conflicts
socket.removeAllListeners();
socket.disconnect();

// Create a new socket with the updated URL and headers
this.socket = this.createSupervisorSocket();

// Re-subscribe to notifications if we have an active execution
if (this.runFriendlyId && this.snapshotFriendlyId) {
this.subscribeToRunNotifications(this.runFriendlyId, this.snapshotFriendlyId);
}
}

return;
}

Expand Down
51 changes: 49 additions & 2 deletions packages/cli-v3/src/entryPoints/managed/execution.ts
Original file line number Diff line number Diff line change
Expand Up @@ -873,7 +873,23 @@ export class RunExecution {
);

if (!continuationResult.success) {
throw new Error(continuationResult.error);
// Check if we need to refresh metadata due to connection error
if (continuationResult.isConnectionError) {
this.sendDebugLog("restore: connection error detected, refreshing metadata");
await this.processEnvOverrides("restore connection error");

// Retry the continuation after refreshing metadata
const retryResult = await this.httpClient.continueRunExecution(
this.runFriendlyId,
this.snapshotManager.snapshotId
);

if (!retryResult.success) {
throw new Error(retryResult.error);
}
} else {
throw new Error(continuationResult.error);
}
}

// Track restore count
Expand All @@ -899,11 +915,18 @@ export class RunExecution {
public async processEnvOverrides(
reason?: string,
shouldPollForSnapshotChanges?: boolean
): Promise<{ overrides: Metadata } | null> {
): Promise<{
overrides: Metadata;
runnerIdChanged?: boolean;
supervisorChanged?: boolean;
} | null> {
if (!this.metadataClient) {
return null;
}

const previousRunnerId = this.env.TRIGGER_RUNNER_ID;
const previousSupervisorUrl = this.env.TRIGGER_SUPERVISOR_API_URL;

const [error, overrides] = await this.metadataClient.getEnvOverrides();

if (error) {
Expand Down Expand Up @@ -931,6 +954,14 @@ export class RunExecution {
// Override the env with the new values
this.env.override(overrides);

// Check if runner ID changed
const newRunnerId = this.env.TRIGGER_RUNNER_ID;
const runnerIdChanged = previousRunnerId !== newRunnerId;

// Check if supervisor URL changed
const newSupervisorUrl = this.env.TRIGGER_SUPERVISOR_API_URL;
const supervisorChanged = previousSupervisorUrl !== newSupervisorUrl;

// Update services with new values
if (overrides.TRIGGER_SNAPSHOT_POLL_INTERVAL_SECONDS) {
this.snapshotPoller?.updateInterval(this.env.TRIGGER_SNAPSHOT_POLL_INTERVAL_SECONDS * 1000);
Expand All @@ -954,6 +985,8 @@ export class RunExecution {

return {
overrides,
runnerIdChanged,
supervisorChanged,
};
}

Expand All @@ -977,6 +1010,12 @@ export class RunExecution {

if (!response.success) {
this.sendDebugLog("heartbeat: failed", { error: response.error });

// Check if we need to refresh metadata due to connection error
if (response.isConnectionError) {
this.sendDebugLog("heartbeat: connection error detected, refreshing metadata");
await this.processEnvOverrides("heartbeat connection error");
}
}

this.lastHeartbeat = new Date();
Expand Down Expand Up @@ -1192,6 +1231,14 @@ export class RunExecution {
error: response.error,
});

if (response.isConnectionError) {
// Log this separately to make it more visible
this.sendDebugLog(
"fetchAndProcessSnapshotChanges: connection error detected, refreshing metadata"
);
}

// Always trigger metadata refresh on snapshot fetch errors
await this.processEnvOverrides("snapshots since error");
return;
}
Expand Down
59 changes: 59 additions & 0 deletions packages/cli-v3/src/entryPoints/managed/snapshot.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -697,6 +697,65 @@ describe("SnapshotManager", () => {
true
);
});

it("should handle deprecated snapshot race condition - avoid false positives from stale polls", async () => {
const onSnapshotChange = vi.fn();

// Mock MetadataClient to simulate runner ID change (restore detected) on first call
let isFirstCall = true;
const mockMetadataClient = {
getEnvOverrides: vi.fn().mockImplementation(() => {
if (isFirstCall) {
isFirstCall = false;
return Promise.resolve([null, { TRIGGER_RUNNER_ID: "test-runner-2" }]); // Different runner ID = restore
}
return Promise.resolve([null, { TRIGGER_RUNNER_ID: "test-runner-2" }]); // Same runner ID afterward
}),
};

const manager = new SnapshotManager({
runnerId: "test-runner-1",
runFriendlyId: "test-run-1",
initialSnapshotId: "snapshot-1",
initialStatus: "EXECUTING_WITH_WAITPOINTS",
logger: mockLogger,
metadataClient: mockMetadataClient as any,
onSnapshotChange,
onSuspendable: mockSuspendableHandler,
});

// First update: Process restore transition with deprecated statuses (normal case)
// This simulates: EXECUTING_WITH_WAITPOINTS -> [SUSPENDED, QUEUED] -> PENDING_EXECUTING
await manager.handleSnapshotChanges([
createRunExecutionData({ snapshotId: "snapshot-suspended", executionStatus: "SUSPENDED" }),
createRunExecutionData({ snapshotId: "snapshot-queued", executionStatus: "QUEUED" }),
createRunExecutionData({ snapshotId: "snapshot-2", executionStatus: "PENDING_EXECUTING" }),
]);

// First call should be deprecated=false (restore detected)
expect(onSnapshotChange).toHaveBeenCalledWith(
expect.objectContaining({ snapshot: expect.objectContaining({ friendlyId: "snapshot-2" }) }),
false
);

onSnapshotChange.mockClear();

// Second update: Should only get new snapshot (race condition case)
// This simulates a stale poll that returns: getSnapshotsSince(snapshot-1) -> [SUSPENDED, QUEUED, snapshot-2, snapshot-3]
// The SUSPENDED/QUEUED should be ignored as already seen
await manager.handleSnapshotChanges([
createRunExecutionData({ snapshotId: "snapshot-suspended", executionStatus: "SUSPENDED" }), // Already seen
createRunExecutionData({ snapshotId: "snapshot-queued", executionStatus: "QUEUED" }), // Already seen
createRunExecutionData({ snapshotId: "snapshot-2", executionStatus: "PENDING_EXECUTING" }), // Already processed
createRunExecutionData({ snapshotId: "snapshot-3", executionStatus: "EXECUTING" }), // New
]);

// Should call onSnapshotChange with deprecated = false (no new deprecated snapshots)
expect(onSnapshotChange).toHaveBeenCalledWith(
expect.objectContaining({ snapshot: expect.objectContaining({ friendlyId: "snapshot-3" }) }),
false
);
});
});

// Helper to generate RunExecutionData with sensible defaults
Expand Down
26 changes: 23 additions & 3 deletions packages/cli-v3/src/entryPoints/managed/snapshot.ts
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,10 @@ export class SnapshotManager {
private changeQueue: QueuedChangeItem[] = [];
private isProcessingQueue = false;

// Track seen deprecated snapshots to prevent false positives
private seenDeprecatedSnapshotIds: string[] = [];
private readonly maxSeenDeprecatedSnapshotIds = 50;

constructor(opts: SnapshotManagerOptions) {
this.runFriendlyId = opts.runFriendlyId;
this.runnerId = opts.runnerId;
Expand Down Expand Up @@ -284,9 +288,13 @@ export class SnapshotManager {

// Check if any previous snapshot is QUEUED or SUSPENDED
const deprecatedStatus: TaskRunExecutionStatus[] = ["QUEUED", "SUSPENDED"];
const deprecatedSnapshots = previousSnapshots.filter((snap) =>
deprecatedStatus.includes(snap.snapshot.executionStatus)
);
const deprecatedSnapshots = previousSnapshots.filter((snap) => {
const isDeprecated = deprecatedStatus.includes(snap.snapshot.executionStatus);
const previouslySeen = this.seenDeprecatedSnapshotIds.some(
(s) => s === snap.snapshot.friendlyId
);
return isDeprecated && !previouslySeen;
});

let deprecated = false;
if (deprecatedSnapshots.length > 0) {
Expand All @@ -298,6 +306,18 @@ export class SnapshotManager {
} else {
deprecated = true;
}

// Add the deprecated snapshot IDs to the seen list
this.seenDeprecatedSnapshotIds.push(
...deprecatedSnapshots.map((s) => s.snapshot.friendlyId)
);

if (this.seenDeprecatedSnapshotIds.length > this.maxSeenDeprecatedSnapshotIds) {
// Only keep the latest maxSeenDeprecatedSnapshotIds
this.seenDeprecatedSnapshotIds = this.seenDeprecatedSnapshotIds.slice(
-this.maxSeenDeprecatedSnapshotIds
);
}
}

const { snapshot } = latestSnapshot;
Expand Down
Loading
Loading