@@ -725,20 +725,47 @@ export class SharedQueueConsumer {
725725 }
726726
727727 try {
728- logger . debug ( "Broadcasting RESUME_AFTER_DEPENDENCY" , {
729- runId : resumableAttempt . taskRunId ,
730- attemptId : resumableAttempt . id ,
731- } ) ;
732-
733- // The attempt should still be running so we can broadcast to all coordinators to resume immediately
734- socketIo . coordinatorNamespace . emit ( "RESUME_AFTER_DEPENDENCY" , {
735- version : "v1" ,
728+ const resumeMessage = {
729+ version : "v1" as const ,
736730 runId : resumableAttempt . taskRunId ,
737731 attemptId : resumableAttempt . id ,
738732 attemptFriendlyId : resumableAttempt . friendlyId ,
739733 completions,
740734 executions,
735+ } ;
736+
737+ logger . debug ( "Broadcasting RESUME_AFTER_DEPENDENCY_WITH_ACK" , { resumeMessage, message } ) ;
738+
739+ // The attempt should still be running so we can broadcast to all coordinators to resume immediately
740+ const responses = await socketIo . coordinatorNamespace
741+ . timeout ( 10_000 )
742+ . emitWithAck ( "RESUME_AFTER_DEPENDENCY_WITH_ACK" , resumeMessage ) ;
743+
744+ logger . debug ( "RESUME_AFTER_DEPENDENCY_WITH_ACK received" , {
745+ resumeMessage,
746+ responses,
747+ message,
741748 } ) ;
749+
750+ if ( responses . length === 0 ) {
751+ logger . error ( "RESUME_AFTER_DEPENDENCY_WITH_ACK no response" , {
752+ resumeMessage,
753+ message,
754+ } ) ;
755+ await this . #nackAndDoMoreWork( message . messageId , this . _options . nextTickInterval , 5_000 ) ;
756+ return ;
757+ }
758+
759+ const hasSuccess = responses . some ( ( response ) => response . success ) ;
760+ if ( ! hasSuccess ) {
761+ logger . warn ( "RESUME_AFTER_DEPENDENCY_WITH_ACK failed" , {
762+ resumeMessage,
763+ responses,
764+ message,
765+ } ) ;
766+ await this . #nackAndDoMoreWork( message . messageId , this . _options . nextTickInterval , 5_000 ) ;
767+ return ;
768+ }
742769 } catch ( e ) {
743770 if ( e instanceof Error ) {
744771 this . _currentSpan ?. recordException ( e ) ;
@@ -748,7 +775,12 @@ export class SharedQueueConsumer {
748775
749776 this . _endSpanInNextIteration = true ;
750777
751- await this . #nackAndDoMoreWork( message . messageId ) ;
778+ logger . error ( "RESUME_AFTER_DEPENDENCY_WITH_ACK threw, nacking with delay" , {
779+ message,
780+ error : e ,
781+ } ) ;
782+
783+ await this . #nackAndDoMoreWork( message . messageId , this . _options . nextTickInterval , 5_000 ) ;
752784 return ;
753785 }
754786
@@ -1169,8 +1201,8 @@ class SharedQueueTasks {
11691201 } satisfies TaskRunExecutionLazyAttemptPayload ;
11701202 }
11711203
1172- async taskHeartbeat ( attemptFriendlyId : string , seconds : number = 60 ) {
1173- logger . debug ( "[SharedQueueConsumer] taskHeartbeat()" , { id : attemptFriendlyId , seconds } ) ;
1204+ async taskHeartbeat ( attemptFriendlyId : string ) {
1205+ logger . debug ( "[SharedQueueConsumer] taskHeartbeat()" , { id : attemptFriendlyId } ) ;
11741206
11751207 const taskRunAttempt = await prisma . taskRunAttempt . findUnique ( {
11761208 where : { friendlyId : attemptFriendlyId } ,
@@ -1180,13 +1212,13 @@ class SharedQueueTasks {
11801212 return ;
11811213 }
11821214
1183- await marqs ?. heartbeatMessage ( taskRunAttempt . taskRunId , seconds ) ;
1215+ await marqs ?. heartbeatMessage ( taskRunAttempt . taskRunId ) ;
11841216 }
11851217
/**
 * Logs a debug entry for the run, then calls `marqs.heartbeatMessage` with the run id.
 *
 * This span is a diff hunk, so both versions of the method are shown:
 * the removed (`-`) lines accepted an optional `seconds` argument (default 60) and
 * passed it through to `heartbeatMessage`; the added (`+`) lines accept only `runId`
 * and call `heartbeatMessage` with that single argument.
 * NOTE(review): any caller still passing `seconds` would need updating — confirm against call sites.
 *
 * @param runId - Run identifier written to the debug log and passed to `marqs.heartbeatMessage`.
 */
1186- async taskRunHeartbeat ( runId : string , seconds : number = 60 ) {
1187- logger . debug ( "[SharedQueueConsumer] taskRunHeartbeat()" , { runId, seconds } ) ;
1218+ async taskRunHeartbeat ( runId : string ) {
1219+ logger . debug ( "[SharedQueueConsumer] taskRunHeartbeat()" , { runId } ) ;
11881220
// Optional chaining: `heartbeatMessage` is not invoked when `marqs` is null or undefined.
1189- await marqs ?. heartbeatMessage ( runId , seconds ) ;
1221+ await marqs ?. heartbeatMessage ( runId ) ;
11901222 }
11911223
11921224 public async taskRunFailed ( completion : TaskRunFailedExecutionResult ) {
0 commit comments