diff --git a/.changeset/six-cougars-play.md b/.changeset/six-cougars-play.md new file mode 100644 index 0000000000..75d128dddd --- /dev/null +++ b/.changeset/six-cougars-play.md @@ -0,0 +1,6 @@ +--- +"trigger.dev": patch +"@trigger.dev/core": patch +--- + +Reduce restore recovery time and fix deprecated runner false positives \ No newline at end of file diff --git a/apps/supervisor/src/workloadServer/index.ts b/apps/supervisor/src/workloadServer/index.ts index 6b24ffcf33..35d53d3609 100644 --- a/apps/supervisor/src/workloadServer/index.ts +++ b/apps/supervisor/src/workloadServer/index.ts @@ -125,7 +125,7 @@ export class WorkloadServer extends EventEmitter { } private createHttpServer({ host, port }: { host: string; port: number }) { - return new HttpServer({ + const httpServer = new HttpServer({ port, host, metrics: { @@ -346,23 +346,6 @@ export class WorkloadServer extends EventEmitter { }, } ) - .route("/api/v1/workload-actions/runs/:runFriendlyId/logs/debug", "POST", { - paramsSchema: WorkloadActionParams.pick({ runFriendlyId: true }), - bodySchema: WorkloadDebugLogRequestBody, - handler: async ({ req, reply, params, body }) => { - reply.empty(204); - - if (!env.SEND_RUN_DEBUG_LOGS) { - return; - } - - await this.workerClient.sendDebugLog( - params.runFriendlyId, - body, - this.runnerIdFromRequest(req) - ); - }, - }) .route("/api/v1/workload-actions/deployments/:deploymentId/dequeue", "GET", { paramsSchema: z.object({ deploymentId: z.string(), @@ -387,6 +370,31 @@ export class WorkloadServer extends EventEmitter { reply.json(dequeueResponse.data satisfies WorkloadDequeueFromVersionResponseBody); }, }); + + if (env.SEND_RUN_DEBUG_LOGS) { + httpServer.route("/api/v1/workload-actions/runs/:runFriendlyId/logs/debug", "POST", { + paramsSchema: WorkloadActionParams.pick({ runFriendlyId: true }), + bodySchema: WorkloadDebugLogRequestBody, + handler: async ({ req, reply, params, body }) => { + reply.empty(204); + + await this.workerClient.sendDebugLog( + params.runFriendlyId, + body, + this.runnerIdFromRequest(req) + ); + }, + }); + } else { + // Lightweight mock route without schemas + httpServer.route("/api/v1/workload-actions/runs/:runFriendlyId/logs/debug", "POST", { + handler: async ({ reply }) => { + reply.empty(204); + }, + }); + } + + return httpServer; } private createWebsocketServer() { diff --git a/packages/cli-v3/src/entryPoints/managed/controller.ts b/packages/cli-v3/src/entryPoints/managed/controller.ts index 80bd744dfa..c721cefc56 100644 --- a/packages/cli-v3/src/entryPoints/managed/controller.ts +++ b/packages/cli-v3/src/entryPoints/managed/controller.ts @@ -461,19 +461,6 @@ export class ManagedRunController { runId: this.runFriendlyId, message: "Socket connected to supervisor", }); - - // This should handle the case where we reconnect after being restored - if ( - this.runFriendlyId && - this.snapshotFriendlyId && - this.runFriendlyId !== this.env.TRIGGER_RUN_ID - ) { - this.sendDebugLog({ - runId: this.runFriendlyId, - message: "Subscribing to notifications for in-progress run", - }); - this.subscribeToRunNotifications(this.runFriendlyId, this.snapshotFriendlyId); - } }); socket.on("connect_error", (error) => { @@ -514,7 +501,7 @@ export class ManagedRunController { supervisorApiUrl: this.env.TRIGGER_SUPERVISOR_API_URL, }; - await this.currentExecution.processEnvOverrides("socket disconnected", true); + const result = await this.currentExecution.processEnvOverrides("socket disconnected", true); const newEnv = { workerInstanceName: this.env.TRIGGER_WORKER_INSTANCE_NAME, @@ -528,6 +515,43 @@ export class ManagedRunController { properties: { reason, ...parseDescription(), currentEnv, newEnv }, }); + if (!result) { + return; + } + + // If runner ID changed, we detected a restore + if (result.runnerIdChanged) { + this.sendDebugLog({ + runId: this.runFriendlyId, + message: "Runner ID changed - restore detected", + properties: { + supervisorChanged: result.supervisorChanged, + }, + }); + + if (!result.supervisorChanged) { + return; + } + + // Only reconnect WebSocket if supervisor URL actually changed + this.sendDebugLog({ + runId: this.runFriendlyId, + message: "Supervisor URL changed - creating new socket connection", + }); + + // First disconnect the old socket to avoid conflicts + socket.removeAllListeners(); + socket.disconnect(); + + // Create a new socket with the updated URL and headers + this.socket = this.createSupervisorSocket(); + + // Re-subscribe to notifications if we have an active execution + if (this.runFriendlyId && this.snapshotFriendlyId) { + this.subscribeToRunNotifications(this.runFriendlyId, this.snapshotFriendlyId); + } + } + return; } diff --git a/packages/cli-v3/src/entryPoints/managed/execution.ts b/packages/cli-v3/src/entryPoints/managed/execution.ts index 2b9e0c9c08..2dd3e6838e 100644 --- a/packages/cli-v3/src/entryPoints/managed/execution.ts +++ b/packages/cli-v3/src/entryPoints/managed/execution.ts @@ -873,7 +873,23 @@ export class RunExecution { ); if (!continuationResult.success) { - throw new Error(continuationResult.error); + // Check if we need to refresh metadata due to connection error + if (continuationResult.isConnectionError) { + this.sendDebugLog("restore: connection error detected, refreshing metadata"); + await this.processEnvOverrides("restore connection error"); + + // Retry the continuation after refreshing metadata + const retryResult = await this.httpClient.continueRunExecution( + this.runFriendlyId, + this.snapshotManager.snapshotId + ); + + if (!retryResult.success) { + throw new Error(retryResult.error); + } + } else { + throw new Error(continuationResult.error); + } } // Track restore count @@ -899,11 +915,18 @@ export class RunExecution { public async processEnvOverrides( reason?: string, shouldPollForSnapshotChanges?: boolean - ): Promise<{ overrides: Metadata } | null> { + ): Promise<{ + overrides: Metadata; + runnerIdChanged?: boolean; + supervisorChanged?: boolean; + } | null> { if (!this.metadataClient) { return null; } + const previousRunnerId = this.env.TRIGGER_RUNNER_ID; + const previousSupervisorUrl = this.env.TRIGGER_SUPERVISOR_API_URL; + const [error, overrides] = await this.metadataClient.getEnvOverrides(); if (error) { @@ -931,6 +954,14 @@ export class RunExecution { // Override the env with the new values this.env.override(overrides); + // Check if runner ID changed + const newRunnerId = this.env.TRIGGER_RUNNER_ID; + const runnerIdChanged = previousRunnerId !== newRunnerId; + + // Check if supervisor URL changed + const newSupervisorUrl = this.env.TRIGGER_SUPERVISOR_API_URL; + const supervisorChanged = previousSupervisorUrl !== newSupervisorUrl; + // Update services with new values if (overrides.TRIGGER_SNAPSHOT_POLL_INTERVAL_SECONDS) { this.snapshotPoller?.updateInterval(this.env.TRIGGER_SNAPSHOT_POLL_INTERVAL_SECONDS * 1000); @@ -954,6 +985,8 @@ export class RunExecution { return { overrides, + runnerIdChanged, + supervisorChanged, }; } @@ -977,6 +1010,12 @@ export class RunExecution { if (!response.success) { this.sendDebugLog("heartbeat: failed", { error: response.error }); + + // Check if we need to refresh metadata due to connection error + if (response.isConnectionError) { + this.sendDebugLog("heartbeat: connection error detected, refreshing metadata"); + await this.processEnvOverrides("heartbeat connection error"); + } } this.lastHeartbeat = new Date(); @@ -1192,6 +1231,14 @@ export class RunExecution { error: response.error, }); + if (response.isConnectionError) { + // Log this separately to make it more visible + this.sendDebugLog( + "fetchAndProcessSnapshotChanges: connection error detected, refreshing metadata" + ); + } + + // Always trigger metadata refresh on snapshot fetch errors await this.processEnvOverrides("snapshots since error"); return; } diff --git a/packages/cli-v3/src/entryPoints/managed/snapshot.test.ts b/packages/cli-v3/src/entryPoints/managed/snapshot.test.ts index 05cba11f38..a3dbab3883 100644 --- a/packages/cli-v3/src/entryPoints/managed/snapshot.test.ts +++ b/packages/cli-v3/src/entryPoints/managed/snapshot.test.ts @@ -697,6 +697,65 @@ describe("SnapshotManager", () => { true ); }); + + it("should handle deprecated snapshot race condition - avoid false positives from stale polls", async () => { + const onSnapshotChange = vi.fn(); + + // Mock MetadataClient to simulate runner ID change (restore detected) on first call + let isFirstCall = true; + const mockMetadataClient = { + getEnvOverrides: vi.fn().mockImplementation(() => { + if (isFirstCall) { + isFirstCall = false; + return Promise.resolve([null, { TRIGGER_RUNNER_ID: "test-runner-2" }]); // Different runner ID = restore + } + return Promise.resolve([null, { TRIGGER_RUNNER_ID: "test-runner-2" }]); // Same runner ID afterward + }), + }; + + const manager = new SnapshotManager({ + runnerId: "test-runner-1", + runFriendlyId: "test-run-1", + initialSnapshotId: "snapshot-1", + initialStatus: "EXECUTING_WITH_WAITPOINTS", + logger: mockLogger, + metadataClient: mockMetadataClient as any, + onSnapshotChange, + onSuspendable: mockSuspendableHandler, + }); + + // First update: Process restore transition with deprecated statuses (normal case) + // This simulates: EXECUTING_WITH_WAITPOINTS -> [SUSPENDED, QUEUED] -> PENDING_EXECUTING + await manager.handleSnapshotChanges([ + createRunExecutionData({ snapshotId: "snapshot-suspended", executionStatus: "SUSPENDED" }), + createRunExecutionData({ snapshotId: "snapshot-queued", executionStatus: "QUEUED" }), + createRunExecutionData({ snapshotId: "snapshot-2", executionStatus: "PENDING_EXECUTING" }), + ]); + + // First call should be deprecated=false (restore detected) + expect(onSnapshotChange).toHaveBeenCalledWith( + expect.objectContaining({ snapshot: expect.objectContaining({ friendlyId: "snapshot-2" }) }), + false + ); + + onSnapshotChange.mockClear(); + + // Second update: Should only get new snapshot (race condition case) + // This simulates a stale poll that returns: getSnapshotsSince(snapshot-1) -> [SUSPENDED, QUEUED, snapshot-2, snapshot-3] + // The SUSPENDED/QUEUED should be ignored as already seen + await manager.handleSnapshotChanges([ + createRunExecutionData({ snapshotId: "snapshot-suspended", executionStatus: "SUSPENDED" }), // Already seen + createRunExecutionData({ snapshotId: "snapshot-queued", executionStatus: "QUEUED" }), // Already seen + createRunExecutionData({ snapshotId: "snapshot-2", executionStatus: "PENDING_EXECUTING" }), // Already processed + createRunExecutionData({ snapshotId: "snapshot-3", executionStatus: "EXECUTING" }), // New + ]); + + // Should call onSnapshotChange with deprecated = false (no new deprecated snapshots) + expect(onSnapshotChange).toHaveBeenCalledWith( + expect.objectContaining({ snapshot: expect.objectContaining({ friendlyId: "snapshot-3" }) }), + false + ); + }); }); // Helper to generate RunExecutionData with sensible defaults diff --git a/packages/cli-v3/src/entryPoints/managed/snapshot.ts b/packages/cli-v3/src/entryPoints/managed/snapshot.ts index 75d3d4b036..9703ea8f87 100644 --- a/packages/cli-v3/src/entryPoints/managed/snapshot.ts +++ b/packages/cli-v3/src/entryPoints/managed/snapshot.ts @@ -49,6 +49,10 @@ export class SnapshotManager { private changeQueue: QueuedChangeItem[] = []; private isProcessingQueue = false; + // Track seen deprecated snapshots to prevent false positives + private seenDeprecatedSnapshotIds: string[] = []; + private readonly maxSeenDeprecatedSnapshotIds = 50; + constructor(opts: SnapshotManagerOptions) { this.runFriendlyId = opts.runFriendlyId; this.runnerId = opts.runnerId; @@ -284,9 +288,13 @@ export class SnapshotManager { // Check if any previous snapshot is QUEUED or SUSPENDED const deprecatedStatus: TaskRunExecutionStatus[] = ["QUEUED", "SUSPENDED"]; - const deprecatedSnapshots = previousSnapshots.filter((snap) => - deprecatedStatus.includes(snap.snapshot.executionStatus) - ); + const deprecatedSnapshots = previousSnapshots.filter((snap) => { + const isDeprecated = deprecatedStatus.includes(snap.snapshot.executionStatus); + const previouslySeen = this.seenDeprecatedSnapshotIds.some( + (s) => s === snap.snapshot.friendlyId + ); + return isDeprecated && !previouslySeen; + }); let deprecated = false; if (deprecatedSnapshots.length > 0) { @@ -298,6 +306,18 @@ export class SnapshotManager { } else { deprecated = true; } + + // Add the deprecated snapshot IDs to the seen list + this.seenDeprecatedSnapshotIds.push( + ...deprecatedSnapshots.map((s) => s.snapshot.friendlyId) + ); + + if (this.seenDeprecatedSnapshotIds.length > this.maxSeenDeprecatedSnapshotIds) { + // Only keep the latest maxSeenDeprecatedSnapshotIds + this.seenDeprecatedSnapshotIds = this.seenDeprecatedSnapshotIds.slice( + -this.maxSeenDeprecatedSnapshotIds + ); + } } const { snapshot } = latestSnapshot; diff --git a/packages/core/src/v3/runEngineWorker/workload/http.ts b/packages/core/src/v3/runEngineWorker/workload/http.ts index 57c7f06e35..93fa7bf03c 100644 --- a/packages/core/src/v3/runEngineWorker/workload/http.ts +++ b/packages/core/src/v3/runEngineWorker/workload/http.ts @@ -52,18 +52,58 @@ export class WorkloadHttpClient { }); } + private isConnectionError(error: string): boolean { + const connectionErrors = [ + "Connection error", + "ECONNREFUSED", + "ETIMEDOUT", + "ENOTFOUND", + "ECONNRESET", + "EHOSTUNREACH", + "ENETUNREACH", + "EPIPE", + "ECONNABORTED", + ]; + return connectionErrors.some((errType) => error.includes(errType)); + } + + private async withConnectionErrorDetection( + operation: () => Promise<{ success: true; data: T } | { success: false; error: string }> + ): Promise< + { success: true; data: T } | { success: false; error: string; isConnectionError?: boolean } + > { + const result = await operation(); + + if (result.success) { + return result; + } + + // Check if this is a connection error + if (this.isConnectionError(result.error)) { + return { + ...result, + isConnectionError: true, + }; + } + + return result; + } + async heartbeatRun(runId: string, snapshotId: string, body?: WorkloadHeartbeatRequestBody) { - return wrapZodFetch( - WorkloadHeartbeatResponseBody, - `${this.apiUrl}/api/v1/workload-actions/runs/${runId}/snapshots/${snapshotId}/heartbeat`, - { - method: "POST", - headers: { - ...this.defaultHeaders(), - "Content-Type": "application/json", - }, - body: JSON.stringify(body ?? {}), - } + return this.withConnectionErrorDetection(() => + wrapZodFetch( + WorkloadHeartbeatResponseBody, + `${this.apiUrl}/api/v1/workload-actions/runs/${runId}/snapshots/${snapshotId}/heartbeat`, + { + method: "POST", + headers: { + ...this.defaultHeaders(), + "Content-Type": "application/json", + }, + body: JSON.stringify(body ?? {}), + signal: AbortSignal.timeout(10_000), // 10 second timeout + } + ) ); } @@ -81,15 +121,17 @@ export class WorkloadHttpClient { } async continueRunExecution(runId: string, snapshotId: string) { - return wrapZodFetch( - WorkloadContinueRunExecutionResponseBody, - `${this.apiUrl}/api/v1/workload-actions/runs/${runId}/snapshots/${snapshotId}/continue`, - { - method: "GET", - headers: { - ...this.defaultHeaders(), - }, - } + return this.withConnectionErrorDetection(() => + wrapZodFetch( + WorkloadContinueRunExecutionResponseBody, + `${this.apiUrl}/api/v1/workload-actions/runs/${runId}/snapshots/${snapshotId}/continue`, + { + method: "GET", + headers: { + ...this.defaultHeaders(), + }, + } + ) ); } @@ -130,15 +172,18 @@ export class WorkloadHttpClient { } async getSnapshotsSince(runId: string, snapshotId: string) { - return wrapZodFetch( - WorkloadRunSnapshotsSinceResponseBody, - `${this.apiUrl}/api/v1/workload-actions/runs/${runId}/snapshots/since/${snapshotId}`, - { - method: "GET", - headers: { - ...this.defaultHeaders(), - }, - } + return this.withConnectionErrorDetection(() => + wrapZodFetch( + WorkloadRunSnapshotsSinceResponseBody, + `${this.apiUrl}/api/v1/workload-actions/runs/${runId}/snapshots/since/${snapshotId}`, + { + method: "GET", + headers: { + ...this.defaultHeaders(), + }, + signal: AbortSignal.timeout(10_000), // 10 second timeout + } + ) ); }