@@ -3,6 +3,7 @@ import type { PrismaClientOrTransaction, TaskRun } from "@trigger.dev/database";
3
3
import { logger } from "~/services/logger.server" ;
4
4
import { resolveIdempotencyKeyTTL } from "~/utils/idempotencyKeys.server" ;
5
5
import type { RunEngine } from "~/v3/runEngine.server" ;
6
+ import { shouldIdempotencyKeyBeCleared } from "~/v3/taskStatus" ;
6
7
import type { TraceEventConcern , TriggerTaskRequest } from "../types" ;
7
8
8
9
export type IdempotencyKeyConcernResult =
@@ -41,6 +42,7 @@ export class IdempotencyKeyConcern {
41
42
: undefined ;
42
43
43
44
if ( existingRun ) {
45
+ // The idempotency key has expired
44
46
if ( existingRun . idempotencyKeyExpiresAt && existingRun . idempotencyKeyExpiresAt < new Date ( ) ) {
45
47
logger . debug ( "[TriggerTaskService][call] Idempotency key has expired" , {
46
48
idempotencyKey : request . options ?. idempotencyKey ,
@@ -52,42 +54,62 @@ export class IdempotencyKeyConcern {
52
54
where : { id : existingRun . id , idempotencyKey } ,
53
55
data : { idempotencyKey : null , idempotencyKeyExpiresAt : null } ,
54
56
} ) ;
55
- } else {
56
- const associatedWaitpoint = existingRun . associatedWaitpoint ;
57
- const parentRunId = request . body . options ?. parentRunId ;
58
- const resumeParentOnCompletion = request . body . options ?. resumeParentOnCompletion ;
59
- //We're using `andWait` so we need to block the parent run with a waitpoint
60
- if ( associatedWaitpoint && resumeParentOnCompletion && parentRunId ) {
61
- await this . traceEventConcern . traceIdempotentRun (
62
- request ,
63
- {
64
- existingRun,
65
- idempotencyKey,
66
- incomplete : associatedWaitpoint . status === "PENDING" ,
67
- isError : associatedWaitpoint . outputIsError ,
68
- } ,
69
- async ( event ) => {
70
- //block run with waitpoint
71
- await this . engine . blockRunWithWaitpoint ( {
72
- runId : RunId . fromFriendlyId ( parentRunId ) ,
73
- waitpoints : associatedWaitpoint . id ,
74
- spanIdToComplete : event . spanId ,
75
- batch : request . options ?. batchId
76
- ? {
77
- id : request . options . batchId ,
78
- index : request . options . batchIndex ?? 0 ,
79
- }
80
- : undefined ,
81
- projectId : request . environment . projectId ,
82
- organizationId : request . environment . organizationId ,
83
- tx : this . prisma ,
84
- } ) ;
85
- }
86
- ) ;
87
- }
88
57
89
- return { isCached : true , run : existingRun } ;
58
+ return { isCached : false , idempotencyKey , idempotencyKeyExpiresAt } ;
90
59
}
60
+
61
+ // If the existing run failed or was expired, we clear the key and do a new run
62
+ if ( shouldIdempotencyKeyBeCleared ( existingRun . status ) ) {
63
+ logger . debug ( "[TriggerTaskService][call] Idempotency key should be cleared" , {
64
+ idempotencyKey : request . options ?. idempotencyKey ,
65
+ runStatus : existingRun . status ,
66
+ runId : existingRun . id ,
67
+ } ) ;
68
+
69
+ // Update the existing run to remove the idempotency key
70
+ await this . prisma . taskRun . updateMany ( {
71
+ where : { id : existingRun . id , idempotencyKey } ,
72
+ data : { idempotencyKey : null , idempotencyKeyExpiresAt : null } ,
73
+ } ) ;
74
+
75
+ return { isCached : false , idempotencyKey, idempotencyKeyExpiresAt } ;
76
+ }
77
+
78
+ // We have an idempotent run, so we return it
79
+ const associatedWaitpoint = existingRun . associatedWaitpoint ;
80
+ const parentRunId = request . body . options ?. parentRunId ;
81
+ const resumeParentOnCompletion = request . body . options ?. resumeParentOnCompletion ;
82
+ //We're using `andWait` so we need to block the parent run with a waitpoint
83
+ if ( associatedWaitpoint && resumeParentOnCompletion && parentRunId ) {
84
+ await this . traceEventConcern . traceIdempotentRun (
85
+ request ,
86
+ {
87
+ existingRun,
88
+ idempotencyKey,
89
+ incomplete : associatedWaitpoint . status === "PENDING" ,
90
+ isError : associatedWaitpoint . outputIsError ,
91
+ } ,
92
+ async ( event ) => {
93
+ //block run with waitpoint
94
+ await this . engine . blockRunWithWaitpoint ( {
95
+ runId : RunId . fromFriendlyId ( parentRunId ) ,
96
+ waitpoints : associatedWaitpoint . id ,
97
+ spanIdToComplete : event . spanId ,
98
+ batch : request . options ?. batchId
99
+ ? {
100
+ id : request . options . batchId ,
101
+ index : request . options . batchIndex ?? 0 ,
102
+ }
103
+ : undefined ,
104
+ projectId : request . environment . projectId ,
105
+ organizationId : request . environment . organizationId ,
106
+ tx : this . prisma ,
107
+ } ) ;
108
+ }
109
+ ) ;
110
+ }
111
+
112
+ return { isCached : true , run : existingRun } ;
91
113
}
92
114
93
115
return { isCached : false , idempotencyKey, idempotencyKeyExpiresAt } ;
0 commit comments