@@ -216,7 +216,7 @@ export class RunExecution {
216216 *
217217 * This is the main entry point for snapshot changes, but processing is deferred to the snapshot manager.
218218 */
219- public async enqueueSnapshotChangeAndWait ( runData : RunExecutionData ) : Promise < void > {
219+ private async enqueueSnapshotChangesAndWait ( snapshots : RunExecutionData [ ] ) : Promise < void > {
220220 if ( this . isShuttingDown ) {
221221 this . sendDebugLog ( "enqueueSnapshotChangeAndWait: shutting down, skipping" ) ;
222222 return ;
@@ -227,10 +227,13 @@ export class RunExecution {
227227 return ;
228228 }
229229
230- await this . snapshotManager . handleSnapshotChange ( runData ) ;
230+ await this . snapshotManager . handleSnapshotChanges ( snapshots ) ;
231231 }
232232
233- private async processSnapshotChange ( runData : RunExecutionData ) : Promise < void > {
233+ private async processSnapshotChange (
234+ runData : RunExecutionData ,
235+ deprecated : boolean
236+ ) : Promise < void > {
234237 const { run, snapshot, completedWaitpoints } = runData ;
235238
236239 const snapshotMetadata = {
@@ -257,6 +260,13 @@ export class RunExecution {
257260 this . snapshotPoller ?. updateSnapshotId ( snapshot . friendlyId ) ;
258261 this . snapshotPoller ?. resetCurrentInterval ( ) ;
259262
263+ if ( deprecated ) {
264+ this . sendDebugLog ( "run execution is deprecated" , { incomingSnapshot : snapshot } ) ;
265+
266+ await this . exitTaskRunProcessWithoutFailingRun ( { flush : false } ) ;
267+ return ;
268+ }
269+
260270 switch ( snapshot . executionStatus ) {
261271 case "PENDING_CANCEL" : {
262272 this . sendDebugLog ( "run was cancelled" , snapshotMetadata ) ;
@@ -276,14 +286,12 @@ export class RunExecution {
276286 case "QUEUED" : {
277287 this . sendDebugLog ( "run was re-queued" , snapshotMetadata ) ;
278288
279- // Pretend we've just suspended the run. This will kill the process without failing the run.
280289 await this . exitTaskRunProcessWithoutFailingRun ( { flush : true } ) ;
281290 return ;
282291 }
283292 case "FINISHED" : {
284293 this . sendDebugLog ( "run is finished" , snapshotMetadata ) ;
285294
286- // Pretend we've just suspended the run. This will kill the process without failing the run.
287295 await this . exitTaskRunProcessWithoutFailingRun ( { flush : true } ) ;
288296 return ;
289297 }
@@ -434,10 +442,12 @@ export class RunExecution {
434442 // Create snapshot manager
435443 this . snapshotManager = new SnapshotManager ( {
436444 runFriendlyId : runOpts . runFriendlyId ,
445+ runnerId : this . env . TRIGGER_RUNNER_ID ,
437446 initialSnapshotId : runOpts . snapshotFriendlyId ,
438447 // We're just guessing here, but "PENDING_EXECUTING" is probably fine
439448 initialStatus : "PENDING_EXECUTING" ,
440449 logger : this . logger ,
450+ metadataClient : this . metadataClient ,
441451 onSnapshotChange : this . processSnapshotChange . bind ( this ) ,
442452 onSuspendable : this . handleSuspendable . bind ( this ) ,
443453 } ) ;
@@ -835,14 +845,18 @@ export class RunExecution {
835845 const [ error , overrides ] = await this . metadataClient . getEnvOverrides ( ) ;
836846
837847 if ( error ) {
838- this . sendDebugLog ( "[override] failed to fetch" , { error : error . message } ) ;
848+ this . sendDebugLog ( "[override] failed to fetch" , {
849+ reason,
850+ error : error . message ,
851+ } ) ;
839852 return null ;
840853 }
841854
842855 if ( overrides . TRIGGER_RUN_ID && overrides . TRIGGER_RUN_ID !== this . runFriendlyId ) {
843856 this . sendDebugLog ( "[override] run ID mismatch, ignoring overrides" , {
857+ reason,
844858 currentRunId : this . runFriendlyId ,
845- overrideRunId : overrides . TRIGGER_RUN_ID ,
859+ incomingRunId : overrides . TRIGGER_RUN_ID ,
846860 } ) ;
847861 return null ;
848862 }
@@ -855,11 +869,14 @@ export class RunExecution {
855869 let executionWasRestored = false ;
856870
857871 if ( this . env . TRIGGER_RUNNER_ID !== overrides . TRIGGER_RUNNER_ID ) {
858- this . sendDebugLog ( "[override] runner ID changed -> execution was restored" , {
872+ this . sendDebugLog ( "[override] runner ID mismatch, execution was restored" , {
873+ reason,
859874 currentRunnerId : this . env . TRIGGER_RUNNER_ID ,
860- newRunnerId : overrides . TRIGGER_RUNNER_ID ,
875+ incomingRunnerId : overrides . TRIGGER_RUNNER_ID ,
861876 } ) ;
862877
878+ // we should keep a list of restored snapshots
879+
863880 executionWasRestored = true ;
864881 }
865882
@@ -1124,7 +1141,6 @@ export class RunExecution {
11241141 } ) ;
11251142
11261143 await this . processEnvOverrides ( "snapshots since error" ) ;
1127-
11281144 return ;
11291145 }
11301146
@@ -1135,49 +1151,7 @@ export class RunExecution {
11351151 return ;
11361152 }
11371153
1138- // Only act on the last snapshot
1139- const lastSnapshot = snapshots [ snapshots . length - 1 ] ;
1140-
1141- if ( ! lastSnapshot ) {
1142- this . sendDebugLog ( `fetchAndProcessSnapshotChanges: no last snapshot` , { source } ) ;
1143- return ;
1144- }
1145-
1146- const previousSnapshots = snapshots . slice ( 0 , - 1 ) ;
1147-
1148- // If any previous snapshot is QUEUED or SUSPENDED, deprecate this worker
1149- const deprecatedStatus : TaskRunExecutionStatus [ ] = [ "QUEUED" , "SUSPENDED" ] ;
1150- const deprecatedSnapshots = previousSnapshots . filter ( ( snap ) =>
1151- deprecatedStatus . includes ( snap . snapshot . executionStatus )
1152- ) ;
1153-
1154- if ( deprecatedSnapshots . length ) {
1155- const result = await this . processEnvOverrides (
1156- "found deprecation marker in previous snapshots"
1157- ) ;
1158-
1159- if ( ! result ) {
1160- return ;
1161- }
1162-
1163- const { executionWasRestored } = result ;
1164-
1165- if ( executionWasRestored ) {
1166- // It's normal for a restored run to have deprecation markers, e.g. it will have been SUSPENDED
1167- } else {
1168- this . sendDebugLog (
1169- `fetchAndProcessSnapshotChanges: found deprecation marker in previous snapshots, exiting` ,
1170- {
1171- source,
1172- deprecatedSnapshots : deprecatedSnapshots . map ( ( s ) => s . snapshot ) ,
1173- }
1174- ) ;
1175- await this . exitTaskRunProcessWithoutFailingRun ( { flush : false } ) ;
1176- return ;
1177- }
1178- }
1179-
1180- const [ error ] = await tryCatch ( this . enqueueSnapshotChangeAndWait ( lastSnapshot ) ) ;
1154+ const [ error ] = await tryCatch ( this . enqueueSnapshotChangesAndWait ( snapshots ) ) ;
11811155
11821156 if ( error ) {
11831157 this . sendDebugLog (
0 commit comments