@@ -17,7 +17,7 @@ import { WorkloadHttpClient } from "@trigger.dev/core/v3/workers";
1717import { setTimeout as sleep } from "timers/promises" ;
1818import { RunExecutionSnapshotPoller } from "./poller.js" ;
1919import { assertExhaustive , tryCatch } from "@trigger.dev/core/utils" ;
20- import { MetadataClient } from "./overrides.js" ;
20+ import { Metadata , MetadataClient } from "./overrides.js" ;
2121import { randomBytes } from "node:crypto" ;
2222import { SnapshotManager , SnapshotState } from "./snapshot.js" ;
2323import type { SupervisorSocket } from "./controller.js" ;
@@ -76,6 +76,7 @@ export class RunExecution {
7676
7777 private supervisorSocket : SupervisorSocket ;
7878 private notifier ?: RunNotifier ;
79+ private metadataClient ?: MetadataClient ;
7980
8081 constructor ( opts : RunExecutionOptions ) {
8182 this . id = randomBytes ( 4 ) . toString ( "hex" ) ;
@@ -87,6 +88,10 @@ export class RunExecution {
8788
8889 this . restoreCount = 0 ;
8990 this . executionAbortController = new AbortController ( ) ;
91+
92+ if ( this . env . TRIGGER_METADATA_URL ) {
93+ this . metadataClient = new MetadataClient ( this . env . TRIGGER_METADATA_URL ) ;
94+ }
9095 }
9196
9297 /**
@@ -820,30 +825,42 @@ export class RunExecution {
820825 /**
821826 * Processes env overrides from the metadata service. Generally called when we're resuming from a suspended state.
822827 */
823- public async processEnvOverrides ( reason ?: string ) {
824- if ( ! this . env . TRIGGER_METADATA_URL ) {
825- this . sendDebugLog ( "no metadata url, skipping env overrides" , { reason } ) ;
826- return ;
828+ public async processEnvOverrides (
829+ reason ?: string
830+ ) : Promise < { executionWasRestored : boolean ; overrides : Metadata } | null > {
831+ if ( ! this . metadataClient ) {
832+ return null ;
827833 }
828834
829- const metadataClient = new MetadataClient ( this . env . TRIGGER_METADATA_URL ) ;
830- const overrides = await metadataClient . getEnvOverrides ( ) ;
835+ const [ error , overrides ] = await this . metadataClient . getEnvOverrides ( ) ;
831836
832- if ( ! overrides ) {
833- this . sendDebugLog ( "no env overrides, skipping" , { reason } ) ;
834- return ;
837+ if ( error ) {
838+ this . sendDebugLog ( "[override] failed to fetch" , { error : error . message } ) ;
839+ return null ;
840+ }
841+
842+ if ( overrides . TRIGGER_RUN_ID && overrides . TRIGGER_RUN_ID !== this . runFriendlyId ) {
843+ this . sendDebugLog ( "[override] run ID mismatch, ignoring overrides" , {
844+ currentRunId : this . runFriendlyId ,
845+ overrideRunId : overrides . TRIGGER_RUN_ID ,
846+ } ) ;
847+ return null ;
835848 }
836849
837- this . sendDebugLog ( `processing env overrides : ${ reason } ` , {
850+ this . sendDebugLog ( `[override] processing : ${ reason } ` , {
838851 overrides,
839852 currentEnv : this . env . raw ,
840853 } ) ;
841854
855+ let executionWasRestored = false ;
856+
842857 if ( this . env . TRIGGER_RUNNER_ID !== overrides . TRIGGER_RUNNER_ID ) {
843- this . sendDebugLog ( "runner ID changed -> run was restored from a checkpoint " , {
858+ this . sendDebugLog ( "[override] runner ID changed -> execution was restored" , {
844859 currentRunnerId : this . env . TRIGGER_RUNNER_ID ,
845860 newRunnerId : overrides . TRIGGER_RUNNER_ID ,
846861 } ) ;
862+
863+ executionWasRestored = true ;
847864 }
848865
849866 // Override the env with the new values
@@ -863,6 +880,11 @@ export class RunExecution {
863880 if ( overrides . TRIGGER_RUNNER_ID ) {
864881 this . httpClient . updateRunnerId ( this . env . TRIGGER_RUNNER_ID ) ;
865882 }
883+
884+ return {
885+ executionWasRestored,
886+ overrides,
887+ } ;
866888 }
867889
868890 private async onHeartbeat ( ) {
@@ -1125,21 +1147,34 @@ export class RunExecution {
11251147
11261148 // If any previous snapshot is QUEUED or SUSPENDED, deprecate this worker
11271149 const deprecatedStatus : TaskRunExecutionStatus [ ] = [ "QUEUED" , "SUSPENDED" ] ;
1128- const foundDeprecated = previousSnapshots . find ( ( snap ) =>
1150+ const deprecatedSnapshots = previousSnapshots . filter ( ( snap ) =>
11291151 deprecatedStatus . includes ( snap . snapshot . executionStatus )
11301152 ) ;
11311153
1132- if ( foundDeprecated ) {
1133- this . sendDebugLog (
1134- `fetchAndProcessSnapshotChanges: found deprecation marker in previous snapshots, exiting` ,
1135- {
1136- source,
1137- status : foundDeprecated . snapshot . executionStatus ,
1138- snapshotId : foundDeprecated . snapshot . friendlyId ,
1139- }
1154+ if ( deprecatedSnapshots . length ) {
1155+ const result = await this . processEnvOverrides (
1156+ "found deprecation marker in previous snapshots"
11401157 ) ;
1141- await this . exitTaskRunProcessWithoutFailingRun ( { flush : false } ) ;
1142- return ;
1158+
1159+ if ( ! result ) {
1160+ return ;
1161+ }
1162+
1163+ const { executionWasRestored } = result ;
1164+
1165+ if ( executionWasRestored ) {
1166+ // It's normal for a restored run to have deprecation markers, e.g. it will have been SUSPENDED
1167+ } else {
1168+ this . sendDebugLog (
1169+ `fetchAndProcessSnapshotChanges: found deprecation marker in previous snapshots, exiting` ,
1170+ {
1171+ source,
1172+ deprecatedSnapshots : deprecatedSnapshots . map ( ( s ) => s . snapshot ) ,
1173+ }
1174+ ) ;
1175+ await this . exitTaskRunProcessWithoutFailingRun ( { flush : false } ) ;
1176+ return ;
1177+ }
11431178 }
11441179
11451180 const [ error ] = await tryCatch ( this . enqueueSnapshotChangeAndWait ( lastSnapshot ) ) ;
0 commit comments