@@ -3,6 +3,25 @@ import { $transaction, PrismaClientOrTransaction, PrismaErrorSchema, prisma } fr
3
3
import type { AuthenticatedEnvironment } from "~/services/apiAuth.server" ;
4
4
import { workerQueue } from "~/services/worker.server" ;
5
5
import { logger } from "../logger.server" ;
6
+ import { EventRecord , ExternalAccount } from "@trigger.dev/database" ;
7
+
8
+ type UpdateEventInput = {
9
+ tx : PrismaClientOrTransaction ;
10
+ existingEventLog : EventRecord ;
11
+ reqEvent : RawEvent ;
12
+ deliverAt ?: Date ;
13
+ } ;
14
+
15
+ type CreateEventInput = {
16
+ tx : PrismaClientOrTransaction ;
17
+ event : RawEvent ;
18
+ environment : AuthenticatedEnvironment ;
19
+ deliverAt ?: Date ;
20
+ sourceContext ?: { id : string ; metadata ?: any } ;
21
+ externalAccount ?: ExternalAccount ;
22
+ } ;
23
+
24
+ const EVENT_UPDATE_THRESHOLD_WINDOW_IN_MSECS = 5 * 1000 ; // 5 seconds
6
25
7
26
export class IngestSendEvent {
8
27
#prismaClient: PrismaClientOrTransaction ;
@@ -52,34 +71,25 @@ export class IngestSendEvent {
52
71
} )
53
72
: undefined ;
54
73
55
- // Create a new event in the database
56
- const eventLog = await tx . eventRecord . create ( {
57
- data : {
58
- organizationId : environment . organizationId ,
59
- projectId : environment . projectId ,
60
- environmentId : environment . id ,
61
- eventId : event . id ,
62
- name : event . name ,
63
- timestamp : event . timestamp ?? new Date ( ) ,
64
- payload : event . payload ?? { } ,
65
- context : event . context ?? { } ,
66
- source : event . source ?? "trigger.dev" ,
67
- sourceContext,
68
- deliverAt : deliverAt ,
69
- externalAccountId : externalAccount ? externalAccount . id : undefined ,
74
+ const existingEventLog = await tx . eventRecord . findUnique ( {
75
+ where : {
76
+ eventId_environmentId : {
77
+ eventId : event . id ,
78
+ environmentId : environment . id ,
79
+ } ,
70
80
} ,
71
81
} ) ;
72
82
73
- if ( this . deliverEvents ) {
74
- // Produce a message to the event bus
75
- await workerQueue . enqueue (
76
- "deliverEvent" ,
77
- {
78
- id : eventLog . id ,
79
- } ,
80
- { runAt : eventLog . deliverAt , tx , jobKey : `event: ${ eventLog . id } ` }
81
- ) ;
82
- }
83
+ const eventLog = await ( existingEventLog
84
+ ? this . updateEvent ( { tx , existingEventLog , reqEvent : event , deliverAt } )
85
+ : this . createEvent ( {
86
+ tx ,
87
+ event ,
88
+ environment ,
89
+ deliverAt ,
90
+ sourceContext ,
91
+ externalAccount ,
92
+ } ) ) ;
83
93
84
94
return eventLog ;
85
95
} ) ;
@@ -95,21 +105,81 @@ export class IngestSendEvent {
95
105
throw error ;
96
106
}
97
107
98
- // If the error is a Prisma unique constraint error, it means that the event already exists
99
- if ( prismaError . success && prismaError . data . code === "P2002" ) {
100
- logger . debug ( "Event already exists, finding and returning" , { event , environment } ) ;
108
+ throw error ;
109
+ }
110
+ }
101
111
102
- return this . #prismaClient. eventRecord . findUniqueOrThrow ( {
103
- where : {
104
- eventId_environmentId : {
105
- eventId : event . id ,
106
- environmentId : environment . id ,
107
- } ,
108
- } ,
109
- } ) ;
110
- }
112
+ private async createEvent ( {
113
+ tx,
114
+ event,
115
+ environment,
116
+ deliverAt,
117
+ sourceContext,
118
+ externalAccount,
119
+ } : CreateEventInput ) {
120
+ const eventLog = await tx . eventRecord . create ( {
121
+ data : {
122
+ organizationId : environment . organizationId ,
123
+ projectId : environment . projectId ,
124
+ environmentId : environment . id ,
125
+ eventId : event . id ,
126
+ name : event . name ,
127
+ timestamp : event . timestamp ?? new Date ( ) ,
128
+ payload : event . payload ?? { } ,
129
+ context : event . context ?? { } ,
130
+ source : event . source ?? "trigger.dev" ,
131
+ sourceContext,
132
+ deliverAt : deliverAt ,
133
+ externalAccountId : externalAccount ? externalAccount . id : undefined ,
134
+ } ,
135
+ } ) ;
111
136
112
- throw error ;
137
+ await this . enqueueWorkerEvent ( tx , eventLog ) ;
138
+
139
+ return eventLog ;
140
+ }
141
+
142
+ private async updateEvent ( { tx, existingEventLog, reqEvent, deliverAt } : UpdateEventInput ) {
143
+ if ( ! this . shouldUpdateEvent ( existingEventLog ) ) {
144
+ logger . debug ( `not updating event for event id: ${ existingEventLog . eventId } ` ) ;
145
+ return existingEventLog ;
146
+ }
147
+
148
+ const updatedEventLog = await tx . eventRecord . update ( {
149
+ where : {
150
+ eventId_environmentId : {
151
+ eventId : existingEventLog . eventId ,
152
+ environmentId : existingEventLog . environmentId ,
153
+ } ,
154
+ } ,
155
+ data : {
156
+ payload : reqEvent . payload ?? existingEventLog . payload ,
157
+ context : reqEvent . context ?? existingEventLog . context ,
158
+ deliverAt : deliverAt ?? new Date ( ) ,
159
+ } ,
160
+ } ) ;
161
+
162
+ await this . enqueueWorkerEvent ( tx , updatedEventLog ) ;
163
+
164
+ return updatedEventLog ;
165
+ }
166
+
167
+ private shouldUpdateEvent ( eventLog : EventRecord ) {
168
+ const thresholdTime = new Date ( Date . now ( ) + EVENT_UPDATE_THRESHOLD_WINDOW_IN_MSECS ) ;
169
+
170
+ return eventLog . deliverAt >= thresholdTime ;
171
+ }
172
+
173
+ private async enqueueWorkerEvent ( tx : PrismaClientOrTransaction , eventLog : EventRecord ) {
174
+ if ( this . deliverEvents ) {
175
+ // Produce a message to the event bus
176
+ await workerQueue . enqueue (
177
+ "deliverEvent" ,
178
+ {
179
+ id : eventLog . id ,
180
+ } ,
181
+ { runAt : eventLog . deliverAt , tx, jobKey : `event:${ eventLog . id } ` }
182
+ ) ;
113
183
}
114
184
}
115
185
}
0 commit comments