@@ -162,6 +162,49 @@ class TaskCoordinator {
162162
163163 taskSocket . emit ( "RESUME_AFTER_DEPENDENCY" , message ) ;
164164 } ,
165+ RESUME_AFTER_DEPENDENCY_WITH_ACK : async ( message ) => {
166+ const taskSocket = await this . #getAttemptSocket( message . attemptFriendlyId ) ;
167+
168+ if ( ! taskSocket ) {
169+ logger . log ( "Socket for attempt not found" , {
170+ attemptFriendlyId : message . attemptFriendlyId ,
171+ } ) ;
172+ return {
173+ success : false ,
174+ error : {
175+ name : "SocketNotFoundError" ,
176+ message : "Socket for attempt not found" ,
177+ } ,
178+ } ;
179+ }
180+
181+ //if this is set, we want to kill the process because it will be resumed with the checkpoint from the queue
182+ if ( taskSocket . data . requiresCheckpointResumeWithMessage ) {
183+ logger . log ( "RESUME_AFTER_DEPENDENCY_WITH_ACK: Checkpoint is set so going to nack" , {
184+ socketData : taskSocket . data ,
185+ } ) ;
186+
187+ return {
188+ success : false ,
189+ error : {
190+ name : "CheckpointMessagePresentError" ,
191+ message :
192+ "Checkpoint message is present, so we need to kill the process and resume from the queue." ,
193+ } ,
194+ } ;
195+ }
196+
197+ await chaosMonkey . call ( ) ;
198+
199+ // In case the task resumed faster than we could checkpoint
200+ this . #cancelCheckpoint( message . runId ) ;
201+
202+ taskSocket . emit ( "RESUME_AFTER_DEPENDENCY" , message ) ;
203+
204+ return {
205+ success : true ,
206+ } ;
207+ } ,
165208 RESUME_AFTER_DURATION : async ( message ) => {
166209 const taskSocket = await this . #getAttemptSocket( message . attemptFriendlyId ) ;
167210
@@ -792,6 +835,18 @@ class TaskCoordinator {
792835 return ;
793836 }
794837
838+ logger . log ( "WAIT_FOR_TASK checkpoint created" , {
839+ checkpoint,
840+ socketData : socket . data ,
841+ } ) ;
842+
843+ //setting this means we can only resume from a checkpoint
844+ socket . data . requiresCheckpointResumeWithMessage = `location:${ checkpoint . location } -docker:${ checkpoint . docker } ` ;
845+ logger . log ( "WAIT_FOR_TASK set requiresCheckpointResumeWithMessage" , {
846+ checkpoint,
847+ socketData : socket . data ,
848+ } ) ;
849+
795850 const ack = await this . #platformSocket?. sendWithAck ( "CHECKPOINT_CREATED" , {
796851 version : "v1" ,
797852 attemptFriendlyId : message . attemptFriendlyId ,
@@ -804,6 +859,7 @@ class TaskCoordinator {
804859 } ) ;
805860
806861 if ( ack ?. keepRunAlive ) {
862+ socket . data . requiresCheckpointResumeWithMessage = undefined ;
807863 logger . log ( "keeping run alive after task checkpoint" , { runId : socket . data . runId } ) ;
808864 return ;
809865 }
@@ -862,6 +918,18 @@ class TaskCoordinator {
862918 return ;
863919 }
864920
921+ logger . log ( "WAIT_FOR_BATCH checkpoint created" , {
922+ checkpoint,
923+ socketData : socket . data ,
924+ } ) ;
925+
926+ //setting this means we can only resume from a checkpoint
927+ socket . data . requiresCheckpointResumeWithMessage = `location:${ checkpoint . location } -docker:${ checkpoint . docker } ` ;
928+ logger . log ( "WAIT_FOR_BATCH set checkpoint" , {
929+ checkpoint,
930+ socketData : socket . data ,
931+ } ) ;
932+
865933 const ack = await this . #platformSocket?. sendWithAck ( "CHECKPOINT_CREATED" , {
866934 version : "v1" ,
867935 attemptFriendlyId : message . attemptFriendlyId ,
@@ -875,6 +943,7 @@ class TaskCoordinator {
875943 } ) ;
876944
877945 if ( ack ?. keepRunAlive ) {
946+ socket . data . requiresCheckpointResumeWithMessage = undefined ;
878947 logger . log ( "keeping run alive after batch checkpoint" , { runId : socket . data . runId } ) ;
879948 return ;
880949 }
0 commit comments