@@ -448,13 +448,13 @@ export class RunExecution {
448448 snapshotFriendlyId : this . snapshotManager . snapshotId ,
449449 logger : this . logger ,
450450 snapshotPollIntervalSeconds : this . env . TRIGGER_SNAPSHOT_POLL_INTERVAL_SECONDS ,
451- onPoll : this . fetchAndEnqueueSnapshotChange . bind ( this ) ,
451+ onPoll : this . fetchAndProcessSnapshotChanges . bind ( this ) ,
452452 } ) . start ( ) ;
453453
454454 this . notifier = new RunNotifier ( {
455455 runFriendlyId : this . runFriendlyId ,
456456 supervisorSocket : this . supervisorSocket ,
457- onNotify : this . fetchAndEnqueueSnapshotChange . bind ( this ) ,
457+ onNotify : this . fetchAndProcessSnapshotChanges . bind ( this ) ,
458458 logger : this . logger ,
459459 } ) . start ( ) ;
460460
@@ -1081,29 +1081,71 @@ export class RunExecution {
10811081 * Fetches the latest execution data and enqueues snapshot changes. Used by both poller and notification handlers.
10821082 * @param source string - where this call originated (e.g. 'poller', 'notification')
10831083 */
1084- public async fetchAndEnqueueSnapshotChange ( source : string ) : Promise < void > {
1084+ public async fetchAndProcessSnapshotChanges ( source : string ) : Promise < void > {
10851085 if ( ! this . runFriendlyId ) {
1086- this . sendDebugLog ( `fetchAndEnqueueSnapshotChange : missing runFriendlyId` , { source } ) ;
1086+ this . sendDebugLog ( `fetchAndProcessSnapshotChanges : missing runFriendlyId` , { source } ) ;
10871087 return ;
10881088 }
10891089
1090- const latestSnapshot = await this . httpClient . getRunExecutionData ( this . runFriendlyId ) ;
1090+ // Use the last processed snapshot as the since parameter
1091+ const sinceSnapshotId = this . currentSnapshotFriendlyId ;
10911092
1092- if ( ! latestSnapshot . success ) {
1093- this . sendDebugLog ( `fetchAndEnqueueSnapshotChange: failed to get latest snapshot data` , {
1093+ if ( ! sinceSnapshotId ) {
1094+ this . sendDebugLog ( `fetchAndProcessSnapshotChanges: missing sinceSnapshotId` , { source } ) ;
1095+ return ;
1096+ }
1097+
1098+ const response = await this . httpClient . getSnapshotsSince ( this . runFriendlyId , sinceSnapshotId ) ;
1099+
1100+ if ( ! response . success ) {
1101+ this . sendDebugLog ( `fetchAndProcessSnapshotChanges: failed to get snapshots since` , {
10941102 source,
1095- error : latestSnapshot . error ,
1103+ error : response . error ,
10961104 } ) ;
10971105 return ;
10981106 }
10991107
1100- const [ error ] = await tryCatch (
1101- this . enqueueSnapshotChangeAndWait ( latestSnapshot . data . execution )
1108+ const { executions } = response . data ;
1109+
1110+ if ( ! executions . length ) {
1111+ this . sendDebugLog ( `fetchAndProcessSnapshotChanges: no new snapshots` , { source } ) ;
1112+ return ;
1113+ }
1114+
1115+ // Only act on the last snapshot
1116+ const lastSnapshot = executions [ executions . length - 1 ] ;
1117+
1118+ if ( ! lastSnapshot ) {
1119+ this . sendDebugLog ( `fetchAndProcessSnapshotChanges: no last snapshot` , { source } ) ;
1120+ return ;
1121+ }
1122+
1123+ const previousSnapshots = executions . slice ( 0 , - 1 ) ;
1124+
1125+ // If any previous snapshot is QUEUED or SUSPENDED, deprecate this worker
1126+ const deprecatedStatus : TaskRunExecutionStatus [ ] = [ "QUEUED" , "SUSPENDED" ] ;
1127+ const foundDeprecated = previousSnapshots . find ( ( snap ) =>
1128+ deprecatedStatus . includes ( snap . snapshot . executionStatus )
11021129 ) ;
11031130
1131+ if ( foundDeprecated ) {
1132+ this . sendDebugLog (
1133+ `fetchAndProcessSnapshotChanges: found deprecation marker in previous snapshots, exiting` ,
1134+ {
1135+ source,
1136+ status : foundDeprecated . snapshot . executionStatus ,
1137+ snapshotId : foundDeprecated . snapshot . friendlyId ,
1138+ }
1139+ ) ;
1140+ await this . exitTaskRunProcessWithoutFailingRun ( { flush : false } ) ;
1141+ return ;
1142+ }
1143+
1144+ const [ error ] = await tryCatch ( this . enqueueSnapshotChangeAndWait ( lastSnapshot ) ) ;
1145+
11041146 if ( error ) {
11051147 this . sendDebugLog (
1106- `fetchAndEnqueueSnapshotChange : failed to enqueue and process snapshot change` ,
1148+ `fetchAndProcessSnapshotChanges : failed to enqueue and process snapshot change` ,
11071149 {
11081150 source,
11091151 error : error . message ,
0 commit comments