1- import { prisma } from "~/db.server" ;
1+ import { $replica , prisma } from "~/db.server" ;
22import {
33 createExceptionPropertiesFromError ,
44 eventRepository ,
@@ -16,23 +16,30 @@ import { RunId } from "@trigger.dev/core/v3/apps";
1616import { updateMetadataService } from "~/services/metadata/updateMetadata.server" ;
1717import { findEnvironmentFromRun } from "~/models/runtimeEnvironment.server" ;
1818import { env } from "~/env.server" ;
19+ import { getTaskEventStoreTableForRun } from "./taskEventStore.server" ;
1920
2021export function registerRunEngineEventBusHandlers ( ) {
2122 engine . eventBus . on ( "runSucceeded" , async ( { time, run } ) => {
2223 try {
23- const completedEvent = await eventRepository . completeEvent ( run . spanId , {
24- endTime : time ,
25- attributes : {
26- isError : false ,
27- output :
28- run . outputType === "application/store" || run . outputType === "text/plain"
29- ? run . output
30- : run . output
31- ? ( safeJsonParse ( run . output ) as Attributes )
32- : undefined ,
33- outputType : run . outputType ,
34- } ,
35- } ) ;
24+ const completedEvent = await eventRepository . completeEvent (
25+ getTaskEventStoreTableForRun ( run ) ,
26+ run . spanId ,
27+ run . createdAt ,
28+ run . completedAt ?? undefined ,
29+ {
30+ endTime : time ,
31+ attributes : {
32+ isError : false ,
33+ output :
34+ run . outputType === "application/store" || run . outputType === "text/plain"
35+ ? run . output
36+ : run . output
37+ ? ( safeJsonParse ( run . output ) as Attributes )
38+ : undefined ,
39+ outputType : run . outputType ,
40+ } ,
41+ }
42+ ) ;
3643
3744 if ( ! completedEvent ) {
3845 logger . error ( "[runSucceeded] Failed to complete event for unknown reason" , {
@@ -69,21 +76,29 @@ export function registerRunEngineEventBusHandlers() {
6976 const sanitizedError = sanitizeError ( run . error ) ;
7077 const exception = createExceptionPropertiesFromError ( sanitizedError ) ;
7178
72- const completedEvent = await eventRepository . completeEvent ( run . spanId , {
73- endTime : time ,
74- attributes : {
75- isError : true ,
76- } ,
77- events : [
78- {
79- name : "exception" ,
80- time,
81- properties : {
82- exception,
83- } ,
79+ const eventStore = getTaskEventStoreTableForRun ( run ) ;
80+
81+ const completedEvent = await eventRepository . completeEvent (
82+ eventStore ,
83+ run . spanId ,
84+ run . createdAt ,
85+ run . completedAt ?? undefined ,
86+ {
87+ endTime : time ,
88+ attributes : {
89+ isError : true ,
8490 } ,
85- ] ,
86- } ) ;
91+ events : [
92+ {
93+ name : "exception" ,
94+ time,
95+ properties : {
96+ exception,
97+ } ,
98+ } ,
99+ ] ,
100+ }
101+ ) ;
87102
88103 if ( ! completedEvent ) {
89104 logger . error ( "[runFailed] Failed to complete event for unknown reason" , {
@@ -93,28 +108,39 @@ export function registerRunEngineEventBusHandlers() {
93108 return ;
94109 }
95110
96- const inProgressEvents = await eventRepository . queryIncompleteEvents ( {
97- runId : completedEvent ?. runId ,
98- } ) ;
111+ const inProgressEvents = await eventRepository . queryIncompleteEvents (
112+ eventStore ,
113+ {
114+ runId : completedEvent ?. runId ,
115+ } ,
116+ run . createdAt ,
117+ run . completedAt ?? undefined
118+ ) ;
99119
100120 await Promise . all (
101121 inProgressEvents . map ( ( event ) => {
102122 try {
103- const completedEvent = eventRepository . completeEvent ( event . spanId , {
104- endTime : time ,
105- attributes : {
106- isError : true ,
107- } ,
108- events : [
109- {
110- name : "exception" ,
111- time,
112- properties : {
113- exception,
114- } ,
123+ const completedEvent = eventRepository . completeEvent (
124+ eventStore ,
125+ run . spanId ,
126+ run . createdAt ,
127+ run . completedAt ?? undefined ,
128+ {
129+ endTime : time ,
130+ attributes : {
131+ isError : true ,
115132 } ,
116- ] ,
117- } ) ;
133+ events : [
134+ {
135+ name : "exception" ,
136+ time,
137+ properties : {
138+ exception,
139+ } ,
140+ } ,
141+ ] ,
142+ }
143+ ) ;
118144
119145 if ( ! completedEvent ) {
120146 logger . error ( "[runFailed] Failed to complete in-progress event for unknown reason" , {
@@ -147,13 +173,19 @@ export function registerRunEngineEventBusHandlers() {
147173 try {
148174 const sanitizedError = sanitizeError ( run . error ) ;
149175 const exception = createExceptionPropertiesFromError ( sanitizedError ) ;
176+ const eventStore = getTaskEventStoreTableForRun ( run ) ;
150177
151- const inProgressEvents = await eventRepository . queryIncompleteEvents ( {
152- runId : RunId . toFriendlyId ( run . id ) ,
153- spanId : {
154- not : run . spanId ,
178+ const inProgressEvents = await eventRepository . queryIncompleteEvents (
179+ eventStore ,
180+ {
181+ runId : RunId . toFriendlyId ( run . id ) ,
182+ spanId : {
183+ not : run . spanId ,
184+ } ,
155185 } ,
156- } ) ;
186+ run . createdAt ,
187+ run . completedAt ?? undefined
188+ ) ;
157189
158190 await Promise . all (
159191 inProgressEvents . map ( ( event ) => {
@@ -173,48 +205,80 @@ export function registerRunEngineEventBusHandlers() {
173205 }
174206 } ) ;
175207
176- engine . eventBus . on ( "cachedRunCompleted" , async ( { time, spanId , hasError } ) => {
208+ engine . eventBus . on ( "cachedRunCompleted" , async ( { time, span , blockedRunId , hasError } ) => {
177209 try {
178- const completedEvent = await eventRepository . completeEvent ( spanId , {
179- endTime : time ,
180- attributes : {
181- isError : hasError ,
210+ const blockedRun = await $replica . taskRun . findFirst ( {
211+ select : {
212+ taskEventStore : true ,
213+ } ,
214+ where : {
215+ id : blockedRunId ,
182216 } ,
183217 } ) ;
184218
219+ if ( ! blockedRun ) {
220+ logger . error ( "[cachedRunCompleted] Blocked run not found" , {
221+ blockedRunId,
222+ } ) ;
223+ return ;
224+ }
225+
226+ const eventStore = getTaskEventStoreTableForRun ( blockedRun ) ;
227+
228+ const completedEvent = await eventRepository . completeEvent (
229+ eventStore ,
230+ span . id ,
231+ span . createdAt ,
232+ time ,
233+ {
234+ endTime : time ,
235+ attributes : {
236+ isError : hasError ,
237+ } ,
238+ }
239+ ) ;
240+
185241 if ( ! completedEvent ) {
186242 logger . error ( "[cachedRunCompleted] Failed to complete event for unknown reason" , {
187- spanId ,
243+ span ,
188244 } ) ;
189245 return ;
190246 }
191247 } catch ( error ) {
192248 logger . error ( "[cachedRunCompleted] Failed to complete event for unknown reason" , {
193249 error : error instanceof Error ? error . message : error ,
194- spanId ,
250+ span ,
195251 } ) ;
196252 }
197253 } ) ;
198254
199255 engine . eventBus . on ( "runExpired" , async ( { time, run } ) => {
200256 try {
201- const completedEvent = await eventRepository . completeEvent ( run . spanId , {
202- endTime : time ,
203- attributes : {
204- isError : true ,
205- } ,
206- events : [
207- {
208- name : "exception" ,
209- time,
210- properties : {
211- exception : {
212- message : `Run expired because the TTL (${ run . ttl } ) was reached` ,
257+ const eventStore = getTaskEventStoreTableForRun ( run ) ;
258+
259+ const completedEvent = await eventRepository . completeEvent (
260+ eventStore ,
261+ run . spanId ,
262+ run . createdAt ,
263+ run . completedAt ?? undefined ,
264+ {
265+ endTime : time ,
266+ attributes : {
267+ isError : true ,
268+ } ,
269+ events : [
270+ {
271+ name : "exception" ,
272+ time,
273+ properties : {
274+ exception : {
275+ message : `Run expired because the TTL (${ run . ttl } ) was reached` ,
276+ } ,
213277 } ,
214278 } ,
215- } ,
216- ] ,
217- } ) ;
279+ ] ,
280+ }
281+ ) ;
218282
219283 if ( ! completedEvent ) {
220284 logger . error ( "[runFailed] Failed to complete event for unknown reason" , {
@@ -234,9 +298,16 @@ export function registerRunEngineEventBusHandlers() {
234298
235299 engine . eventBus . on ( "runCancelled" , async ( { time, run } ) => {
236300 try {
237- const inProgressEvents = await eventRepository . queryIncompleteEvents ( {
238- runId : run . friendlyId ,
239- } ) ;
301+ const eventStore = getTaskEventStoreTableForRun ( run ) ;
302+
303+ const inProgressEvents = await eventRepository . queryIncompleteEvents (
304+ eventStore ,
305+ {
306+ runId : run . friendlyId ,
307+ } ,
308+ run . createdAt ,
309+ run . completedAt ?? undefined
310+ ) ;
240311
241312 await Promise . all (
242313 inProgressEvents . map ( ( event ) => {
0 commit comments