@@ -760,10 +760,21 @@ export class BatchTriggerV3Service extends BaseService {
760760    } ) ) ; 
761761
762762    let  workingIndex  =  currentIndex ; 
763+     let  expectedCount  =  0 ; 
763764
764765    for  ( const  item  of  itemsToProcess )  { 
765766      try  { 
766-         await  this . #processBatchTaskRunItem( batch ,  environment ,  item ,  workingIndex ,  options ) ; 
767+         const  created  =  await  this . #processBatchTaskRunItem( 
768+           batch , 
769+           environment , 
770+           item , 
771+           workingIndex , 
772+           options 
773+         ) ; 
774+ 
775+         if  ( created )  { 
776+           expectedCount ++ ; 
777+         } 
767778
768779        workingIndex ++ ; 
769780      }  catch  ( error )  { 
@@ -780,6 +791,17 @@ export class BatchTriggerV3Service extends BaseService {
780791      } 
781792    } 
782793
794+     if  ( expectedCount  >  0 )  { 
795+       await  this . _prisma . batchTaskRun . update ( { 
796+         where : {  id : batch . id  } , 
797+         data : { 
798+           expectedCount : { 
799+             increment : expectedCount , 
800+           } , 
801+         } , 
802+       } ) ; 
803+     } 
804+ 
783805    return  {  workingIndex } ; 
784806  } 
785807
@@ -825,21 +847,15 @@ export class BatchTriggerV3Service extends BaseService {
825847
826848    if  ( ! result . isCached )  { 
827849      try  { 
828-         await  $transaction ( this . _prisma ,  async  ( tx )  =>  { 
829-           // [batchTaskRunId, taskRunId] is a unique index 
830-           await  tx . batchTaskRunItem . create ( { 
831-             data : { 
832-               batchTaskRunId : batch . id , 
833-               taskRunId : result . run . id , 
834-               status : batchTaskRunItemStatusForRunStatus ( result . run . status ) , 
835-             } , 
836-           } ) ; 
837- 
838-           await  tx . batchTaskRun . update ( { 
839-             where : {  id : batch . id  } , 
840-             data : {  expectedCount : {  increment : 1  }  } , 
841-           } ) ; 
850+         await  this . _prisma . batchTaskRunItem . create ( { 
851+           data : { 
852+             batchTaskRunId : batch . id , 
853+             taskRunId : result . run . id , 
854+             status : batchTaskRunItemStatusForRunStatus ( result . run . status ) , 
855+           } , 
842856        } ) ; 
857+ 
858+         return  true ; 
843859      }  catch  ( error )  { 
844860        if  ( isUniqueConstraintError ( error ,  [ "batchTaskRunId" ,  "taskRunId" ] ) )  { 
845861          // This means there is already a batchTaskRunItem for this batch and taskRun 
@@ -852,12 +868,14 @@ export class BatchTriggerV3Service extends BaseService {
852868            } 
853869          ) ; 
854870
855-           return ; 
871+           return   false ; 
856872        } 
857873
858874        throw  error ; 
859875      } 
860876    } 
877+ 
878+     return  false ; 
861879  } 
862880
863881  async  #enqueueBatchTaskRun( options : BatchProcessingOptions ,  tx ?: PrismaClientOrTransaction )  { 
@@ -907,62 +925,69 @@ export async function completeBatchTaskRunItemV3(
907925  scheduleResumeOnComplete  =  false , 
908926  taskRunAttemptId ?: string 
909927)  { 
910-   await  $transaction ( tx ,  "completeBatchTaskRunItemV3" ,  async  ( tx ,  span )  =>  { 
911-     span ?. setAttribute ( "batch_id" ,  batchTaskRunId ) ; 
912- 
913-     // Update the item to complete 
914-     const  updated  =  await  tx . batchTaskRunItem . updateMany ( { 
915-       where : { 
916-         id : itemId , 
917-         status : "PENDING" , 
918-       } , 
919-       data : { 
920-         status : "COMPLETED" , 
921-         taskRunAttemptId, 
922-       } , 
923-     } ) ; 
924- 
925-     if  ( updated . count  ===  0 )  { 
926-       return ; 
927-     } 
928- 
929-     const  updatedBatchRun  =  await  tx . batchTaskRun . update ( { 
930-       where : { 
931-         id : batchTaskRunId , 
932-       } , 
933-       data : { 
934-         completedCount : { 
935-           increment : 1 , 
928+   await  $transaction ( 
929+     tx , 
930+     "completeBatchTaskRunItemV3" , 
931+     async  ( tx ,  span )  =>  { 
932+       span ?. setAttribute ( "batch_id" ,  batchTaskRunId ) ; 
933+ 
934+       // Update the item to complete 
935+       const  updated  =  await  tx . batchTaskRunItem . updateMany ( { 
936+         where : { 
937+           id : itemId , 
938+           status : "PENDING" , 
936939        } , 
937-       } , 
938-       select :  { 
939-         sealed :  true , 
940-         status :  true , 
941-          completedCount :  true , 
942-          expectedCount :  true , 
943-          dependentTaskAttemptId :  true , 
944-       } , 
945-     } ) ; 
940+          data :  { 
941+            status :  "COMPLETED" , 
942+           taskRunAttemptId , 
943+         } , 
944+       } ) ; 
945+ 
946+       if   ( updated . count   ===   0 )   { 
947+          return ; 
948+        } 
946949
947-     if  ( 
948-       updatedBatchRun . status  ===  "PENDING"  && 
949-       updatedBatchRun . completedCount  ===  updatedBatchRun . expectedCount  && 
950-       updatedBatchRun . sealed 
951-     )  { 
952-       await  tx . batchTaskRun . update ( { 
950+       const  updatedBatchRun  =  await  tx . batchTaskRun . update ( { 
953951        where : { 
954952          id : batchTaskRunId , 
955953        } , 
956954        data : { 
957-           status : "COMPLETED" , 
958-           completedAt : new  Date ( ) , 
955+           completedCount : { 
956+             increment : 1 , 
957+           } , 
958+         } , 
959+         select : { 
960+           sealed : true , 
961+           status : true , 
962+           completedCount : true , 
963+           expectedCount : true , 
964+           dependentTaskAttemptId : true , 
959965        } , 
960966      } ) ; 
961967
962-       // We only need to resume the batch if it has a dependent task attempt ID 
963-       if  ( scheduleResumeOnComplete  &&  updatedBatchRun . dependentTaskAttemptId )  { 
964-         await  ResumeBatchRunService . enqueue ( batchTaskRunId ,  true ,  tx ) ; 
968+       if  ( 
969+         updatedBatchRun . status  ===  "PENDING"  && 
970+         updatedBatchRun . completedCount  ===  updatedBatchRun . expectedCount  && 
971+         updatedBatchRun . sealed 
972+       )  { 
973+         await  tx . batchTaskRun . update ( { 
974+           where : { 
975+             id : batchTaskRunId , 
976+           } , 
977+           data : { 
978+             status : "COMPLETED" , 
979+             completedAt : new  Date ( ) , 
980+           } , 
981+         } ) ; 
982+ 
983+         // We only need to resume the batch if it has a dependent task attempt ID 
984+         if  ( scheduleResumeOnComplete  &&  updatedBatchRun . dependentTaskAttemptId )  { 
985+           await  ResumeBatchRunService . enqueue ( batchTaskRunId ,  true ,  tx ) ; 
986+         } 
965987      } 
988+     } , 
989+     { 
990+       timeout : 10000 , 
966991    } 
967-   } ) ; 
992+   ) ; 
968993} 
0 commit comments