@@ -5,7 +5,13 @@ import {
55 packetRequiresOffloading ,
66 parsePacket ,
77} from "@trigger.dev/core/v3" ;
8- import { BatchTaskRun , Prisma , TaskRunAttempt } from "@trigger.dev/database" ;
8+ import {
9+ BatchTaskRun ,
10+ isUniqueConstraintError ,
11+ Prisma ,
12+ TaskRunAttempt ,
13+ } from "@trigger.dev/database" ;
14+ import { z } from "zod" ;
915import { $transaction , prisma , PrismaClientOrTransaction } from "~/db.server" ;
1016import { env } from "~/env.server" ;
1117import { batchTaskRunItemStatusForRunStatus } from "~/models/taskRun.server" ;
@@ -20,9 +26,8 @@ import { downloadPacketFromObjectStore, uploadPacketToObjectStore } from "../r2.
2026import { isFinalAttemptStatus , isFinalRunStatus } from "../taskStatus" ;
2127import { startActiveSpan } from "../tracer.server" ;
2228import { BaseService , ServiceValidationError } from "./baseService.server" ;
23- import { OutOfEntitlementError , TriggerTaskService } from "./triggerTask.server" ;
24- import { z } from "zod" ;
2529import { ResumeBatchRunService } from "./resumeBatchRun.server" ;
30+ import { OutOfEntitlementError , TriggerTaskService } from "./triggerTask.server" ;
2631
2732const PROCESSING_BATCH_SIZE = 50 ;
2833const ASYNC_BATCH_PROCESS_SIZE_THRESHOLD = 20 ;
@@ -819,20 +824,39 @@ export class BatchTriggerV3Service extends BaseService {
819824 }
820825
821826 if ( ! result . isCached ) {
822- await $transaction ( this . _prisma , async ( tx ) => {
823- await tx . batchTaskRunItem . create ( {
824- data : {
825- batchTaskRunId : batch . id ,
826- taskRunId : result . run . id ,
827- status : batchTaskRunItemStatusForRunStatus ( result . run . status ) ,
828- } ,
829- } ) ;
827+ try {
828+ await $transaction ( this . _prisma , async ( tx ) => {
829+ // [batchTaskRunId, taskRunId] is a unique index
830+ await tx . batchTaskRunItem . create ( {
831+ data : {
832+ batchTaskRunId : batch . id ,
833+ taskRunId : result . run . id ,
834+ status : batchTaskRunItemStatusForRunStatus ( result . run . status ) ,
835+ } ,
836+ } ) ;
830837
831- await tx . batchTaskRun . update ( {
832- where : { id : batch . id } ,
833- data : { expectedCount : { increment : 1 } } ,
838+ await tx . batchTaskRun . update ( {
839+ where : { id : batch . id } ,
840+ data : { expectedCount : { increment : 1 } } ,
841+ } ) ;
834842 } ) ;
835- } ) ;
843+ } catch ( error ) {
844+ if ( isUniqueConstraintError ( error , [ "batchTaskRunId" , "taskRunId" ] ) ) {
845+ // This means there is already a batchTaskRunItem for this batch and taskRun
846+ logger . debug (
847+ "[BatchTriggerV2][processBatchTaskRunItem] BatchTaskRunItem already exists" ,
848+ {
849+ batchId : batch . friendlyId ,
850+ runId : task . runId ,
851+ currentIndex,
852+ }
853+ ) ;
854+
855+ return ;
856+ }
857+
858+ throw error ;
859+ }
836860 }
837861 }
838862
0 commit comments