@@ -760,10 +760,21 @@ export class BatchTriggerV3Service extends BaseService {
760760 } ) ) ;
761761
762762 let workingIndex = currentIndex ;
763+ let expectedCount = 0 ;
763764
764765 for ( const item of itemsToProcess ) {
765766 try {
766- await this . #processBatchTaskRunItem( batch , environment , item , workingIndex , options ) ;
767+ const created = await this . #processBatchTaskRunItem(
768+ batch ,
769+ environment ,
770+ item ,
771+ workingIndex ,
772+ options
773+ ) ;
774+
775+ if ( created ) {
776+ expectedCount ++ ;
777+ }
767778
768779 workingIndex ++ ;
769780 } catch ( error ) {
@@ -780,6 +791,17 @@ export class BatchTriggerV3Service extends BaseService {
780791 }
781792 }
782793
794+ if ( expectedCount > 0 ) {
795+ await this . _prisma . batchTaskRun . update ( {
796+ where : { id : batch . id } ,
797+ data : {
798+ expectedCount : {
799+ increment : expectedCount ,
800+ } ,
801+ } ,
802+ } ) ;
803+ }
804+
783805 return { workingIndex } ;
784806 }
785807
@@ -825,21 +847,15 @@ export class BatchTriggerV3Service extends BaseService {
825847
826848 if ( ! result . isCached ) {
827849 try {
828- await $transaction ( this . _prisma , async ( tx ) => {
829- // [batchTaskRunId, taskRunId] is a unique index
830- await tx . batchTaskRunItem . create ( {
831- data : {
832- batchTaskRunId : batch . id ,
833- taskRunId : result . run . id ,
834- status : batchTaskRunItemStatusForRunStatus ( result . run . status ) ,
835- } ,
836- } ) ;
837-
838- await tx . batchTaskRun . update ( {
839- where : { id : batch . id } ,
840- data : { expectedCount : { increment : 1 } } ,
841- } ) ;
850+ await this . _prisma . batchTaskRunItem . create ( {
851+ data : {
852+ batchTaskRunId : batch . id ,
853+ taskRunId : result . run . id ,
854+ status : batchTaskRunItemStatusForRunStatus ( result . run . status ) ,
855+ } ,
842856 } ) ;
857+
858+ return true ;
843859 } catch ( error ) {
844860 if ( isUniqueConstraintError ( error , [ "batchTaskRunId" , "taskRunId" ] ) ) {
845861 // This means there is already a batchTaskRunItem for this batch and taskRun
@@ -852,12 +868,14 @@ export class BatchTriggerV3Service extends BaseService {
852868 }
853869 ) ;
854870
855- return ;
871+ return false ;
856872 }
857873
858874 throw error ;
859875 }
860876 }
877+
878+ return false ;
861879 }
862880
863881 async #enqueueBatchTaskRun( options : BatchProcessingOptions , tx ?: PrismaClientOrTransaction ) {
@@ -907,62 +925,69 @@ export async function completeBatchTaskRunItemV3(
907925 scheduleResumeOnComplete = false ,
908926 taskRunAttemptId ?: string
909927) {
910- await $transaction ( tx , "completeBatchTaskRunItemV3" , async ( tx , span ) => {
911- span ?. setAttribute ( "batch_id" , batchTaskRunId ) ;
912-
913- // Update the item to complete
914- const updated = await tx . batchTaskRunItem . updateMany ( {
915- where : {
916- id : itemId ,
917- status : "PENDING" ,
918- } ,
919- data : {
920- status : "COMPLETED" ,
921- taskRunAttemptId,
922- } ,
923- } ) ;
924-
925- if ( updated . count === 0 ) {
926- return ;
927- }
928-
929- const updatedBatchRun = await tx . batchTaskRun . update ( {
930- where : {
931- id : batchTaskRunId ,
932- } ,
933- data : {
934- completedCount : {
935- increment : 1 ,
928+ await $transaction (
929+ tx ,
930+ "completeBatchTaskRunItemV3" ,
931+ async ( tx , span ) => {
932+ span ?. setAttribute ( "batch_id" , batchTaskRunId ) ;
933+
934+ // Update the item to complete
935+ const updated = await tx . batchTaskRunItem . updateMany ( {
936+ where : {
937+ id : itemId ,
938+ status : "PENDING" ,
936939 } ,
937- } ,
938- select : {
939- sealed : true ,
940- status : true ,
941- completedCount : true ,
942- expectedCount : true ,
943- dependentTaskAttemptId : true ,
944- } ,
945- } ) ;
940+ data : {
941+ status : "COMPLETED" ,
942+ taskRunAttemptId ,
943+ } ,
944+ } ) ;
945+
946+ if ( updated . count === 0 ) {
947+ return ;
948+ }
946949
947- if (
948- updatedBatchRun . status === "PENDING" &&
949- updatedBatchRun . completedCount === updatedBatchRun . expectedCount &&
950- updatedBatchRun . sealed
951- ) {
952- await tx . batchTaskRun . update ( {
950+ const updatedBatchRun = await tx . batchTaskRun . update ( {
953951 where : {
954952 id : batchTaskRunId ,
955953 } ,
956954 data : {
957- status : "COMPLETED" ,
958- completedAt : new Date ( ) ,
955+ completedCount : {
956+ increment : 1 ,
957+ } ,
958+ } ,
959+ select : {
960+ sealed : true ,
961+ status : true ,
962+ completedCount : true ,
963+ expectedCount : true ,
964+ dependentTaskAttemptId : true ,
959965 } ,
960966 } ) ;
961967
962- // We only need to resume the batch if it has a dependent task attempt ID
963- if ( scheduleResumeOnComplete && updatedBatchRun . dependentTaskAttemptId ) {
964- await ResumeBatchRunService . enqueue ( batchTaskRunId , true , tx ) ;
968+ if (
969+ updatedBatchRun . status === "PENDING" &&
970+ updatedBatchRun . completedCount === updatedBatchRun . expectedCount &&
971+ updatedBatchRun . sealed
972+ ) {
973+ await tx . batchTaskRun . update ( {
974+ where : {
975+ id : batchTaskRunId ,
976+ } ,
977+ data : {
978+ status : "COMPLETED" ,
979+ completedAt : new Date ( ) ,
980+ } ,
981+ } ) ;
982+
983+ // We only need to resume the batch if it has a dependent task attempt ID
984+ if ( scheduleResumeOnComplete && updatedBatchRun . dependentTaskAttemptId ) {
985+ await ResumeBatchRunService . enqueue ( batchTaskRunId , true , tx ) ;
986+ }
965987 }
988+ } ,
989+ {
990+ timeout : 10000 ,
966991 }
967- } ) ;
992+ ) ;
968993}
0 commit comments