@@ -329,24 +329,40 @@ export class BatchTriggerV3Service extends BaseService {
329329 } ) ) ;
330330 }
331331
332- const idempotencyKeys = body . items . map ( ( i ) => i . options ?. idempotencyKey ) . filter ( Boolean ) ;
332+ // Group items by taskIdentifier
333+ const itemsByTask = body . items . reduce ( ( acc , item ) => {
334+ if ( ! item . options ?. idempotencyKey ) return acc ;
333335
334- const cachedRuns =
335- idempotencyKeys . length > 0
336- ? await this . _prisma . taskRun . findMany ( {
337- where : {
338- runtimeEnvironmentId : environment . id ,
339- idempotencyKey : {
340- in : body . items . map ( ( i ) => i . options ?. idempotencyKey ) . filter ( Boolean ) ,
341- } ,
342- } ,
343- select : {
344- friendlyId : true ,
345- idempotencyKey : true ,
346- idempotencyKeyExpiresAt : true ,
336+ if ( ! acc [ item . task ] ) {
337+ acc [ item . task ] = [ ] ;
338+ }
339+ acc [ item . task ] . push ( item ) ;
340+ return acc ;
341+ } , { } as Record < string , typeof body . items > ) ;
342+
343+ logger . debug ( "[BatchTriggerV2][call] Grouped items by task identifier" , {
344+ itemsByTask,
345+ } ) ;
346+
347+ // Fetch cached runs for each task identifier separately to make use of the index
348+ const cachedRuns = await Promise . all (
349+ Object . entries ( itemsByTask ) . map ( ( [ taskIdentifier , items ] ) =>
350+ this . _prisma . taskRun . findMany ( {
351+ where : {
352+ runtimeEnvironmentId : environment . id ,
353+ taskIdentifier,
354+ idempotencyKey : {
355+ in : items . map ( ( i ) => i . options ?. idempotencyKey ) . filter ( Boolean ) ,
347356 } ,
348- } )
349- : [ ] ;
357+ } ,
358+ select : {
359+ friendlyId : true ,
360+ idempotencyKey : true ,
361+ idempotencyKeyExpiresAt : true ,
362+ } ,
363+ } )
364+ )
365+ ) . then ( ( results ) => results . flat ( ) ) ;
350366
351367 // Now we need to create an array of all the run IDs, in order
352368 // If we have a cached run, that isn't expired, we should use that run ID
0 commit comments