@@ -16,12 +16,11 @@ import {
1616import { roomFromFriendlyRunId , socketIo } from "./handleSocketIo.server" ;
1717import { engine } from "./runEngine.server" ;
1818import { PerformTaskRunAlertsService } from "./services/alerts/performTaskRunAlerts.server" ;
19- import { getTaskEventStoreTableForRun } from "./taskEventStore.server" ;
2019
2120export function registerRunEngineEventBusHandlers ( ) {
2221 engine . eventBus . on ( "runSucceeded" , async ( { time, run } ) => {
2322 const [ taskRunError , taskRun ] = await tryCatch (
24- $replica . taskRun . findFirst ( {
23+ $replica . taskRun . findFirstOrThrow ( {
2524 where : {
2625 id : run . id ,
2726 } ,
@@ -51,13 +50,6 @@ export function registerRunEngineEventBusHandlers() {
5150 return ;
5251 }
5352
54- if ( ! taskRun ) {
55- logger . error ( "[runSucceeded] Task run not found" , {
56- runId : run . id ,
57- } ) ;
58- return ;
59- }
60-
6153 const [ completeSuccessfulRunEventError ] = await tryCatch (
6254 eventRepository . completeSuccessfulRunEvent ( {
6355 run : taskRun ,
@@ -92,7 +84,7 @@ export function registerRunEngineEventBusHandlers() {
9284 const exception = createExceptionPropertiesFromError ( sanitizedError ) ;
9385
9486 const [ taskRunError , taskRun ] = await tryCatch (
95- $replica . taskRun . findFirst ( {
87+ $replica . taskRun . findFirstOrThrow ( {
9688 where : {
9789 id : run . id ,
9890 } ,
@@ -122,13 +114,6 @@ export function registerRunEngineEventBusHandlers() {
122114 return ;
123115 }
124116
125- if ( ! taskRun ) {
126- logger . error ( "[runFailed] Task run not found" , {
127- runId : run . id ,
128- } ) ;
129- return ;
130- }
131-
132117 const [ completeFailedRunEventError ] = await tryCatch (
133118 eventRepository . completeFailedRunEvent ( {
134119 run : taskRun ,
@@ -150,7 +135,7 @@ export function registerRunEngineEventBusHandlers() {
150135 const exception = createExceptionPropertiesFromError ( sanitizedError ) ;
151136
152137 const [ taskRunError , taskRun ] = await tryCatch (
153- $replica . taskRun . findFirst ( {
138+ $replica . taskRun . findFirstOrThrow ( {
154139 where : {
155140 id : run . id ,
156141 } ,
@@ -180,13 +165,6 @@ export function registerRunEngineEventBusHandlers() {
180165 return ;
181166 }
182167
183- if ( ! taskRun ) {
184- logger . error ( "[runAttemptFailed] Task run not found" , {
185- runId : run . id ,
186- } ) ;
187- return ;
188- }
189-
190168 const [ createAttemptFailedRunEventError ] = await tryCatch (
191169 eventRepository . createAttemptFailedRunEvent ( {
192170 run : taskRun ,
@@ -218,7 +196,7 @@ export function registerRunEngineEventBusHandlers() {
218196 }
219197
220198 const [ cachedRunError , cachedRun ] = await tryCatch (
221- $replica . taskRun . findFirst ( {
199+ $replica . taskRun . findFirstOrThrow ( {
222200 where : {
223201 id : cachedRunId ,
224202 } ,
@@ -248,13 +226,6 @@ export function registerRunEngineEventBusHandlers() {
248226 return ;
249227 }
250228
251- if ( ! cachedRun ) {
252- logger . error ( "[cachedRunCompleted] Cached run not found" , {
253- cachedRunId,
254- } ) ;
255- return ;
256- }
257-
258229 const [ blockedRunError , blockedRun ] = await tryCatch (
259230 $replica . taskRun . findFirst ( {
260231 where : {
@@ -314,52 +285,60 @@ export function registerRunEngineEventBusHandlers() {
314285 ) ;
315286
316287 engine . eventBus . on ( "runExpired" , async ( { time, run } ) => {
317- try {
318- const eventStore = getTaskEventStoreTableForRun ( run ) ;
288+ if ( ! run . ttl ) {
289+ return ;
290+ }
319291
320- const completedEvent = await eventRepository . completeEvent (
321- eventStore ,
322- run . spanId ,
323- run . createdAt ,
324- run . completedAt ?? undefined ,
325- {
326- endTime : time ,
327- attributes : {
328- isError : true ,
329- } ,
330- events : [
331- {
332- name : "exception" ,
333- time,
334- properties : {
335- exception : {
336- message : `Run expired because the TTL (${ run . ttl } ) was reached` ,
337- } ,
338- } ,
339- } ,
340- ] ,
341- }
342- ) ;
292+ const [ taskRunError , taskRun ] = await tryCatch (
293+ $replica . taskRun . findFirstOrThrow ( {
294+ where : {
295+ id : run . id ,
296+ } ,
297+ select : {
298+ id : true ,
299+ friendlyId : true ,
300+ traceId : true ,
301+ spanId : true ,
302+ parentSpanId : true ,
303+ createdAt : true ,
304+ completedAt : true ,
305+ taskIdentifier : true ,
306+ projectId : true ,
307+ runtimeEnvironmentId : true ,
308+ environmentType : true ,
309+ isTest : true ,
310+ organizationId : true ,
311+ } ,
312+ } )
313+ ) ;
343314
344- if ( ! completedEvent ) {
345- logger . error ( "[runFailed] Failed to complete event for unknown reason" , {
346- runId : run . id ,
347- spanId : run . spanId ,
348- } ) ;
349- return ;
350- }
351- } catch ( error ) {
352- logger . error ( "[runExpired] Failed to complete event" , {
353- error : error instanceof Error ? error . message : error ,
315+ if ( taskRunError ) {
316+ logger . error ( "[runExpired] Failed to find task run" , {
317+ error : taskRunError ,
318+ runId : run . id ,
319+ } ) ;
320+ return ;
321+ }
322+
323+ const [ completeExpiredRunEventError ] = await tryCatch (
324+ eventRepository . completeExpiredRunEvent ( {
325+ run : taskRun ,
326+ endTime : time ,
327+ ttl : run . ttl ,
328+ } )
329+ ) ;
330+
331+ if ( completeExpiredRunEventError ) {
332+ logger . error ( "[runExpired] Failed to complete expired run event" , {
333+ error : completeExpiredRunEventError ,
354334 runId : run . id ,
355- spanId : run . spanId ,
356335 } ) ;
357336 }
358337 } ) ;
359338
360339 engine . eventBus . on ( "runCancelled" , async ( { time, run } ) => {
361- try {
362- const taskRun = await $replica . taskRun . findFirst ( {
340+ const [ taskRunError , taskRun ] = await tryCatch (
341+ $replica . taskRun . findFirstOrThrow ( {
363342 where : {
364343 id : run . id ,
365344 } ,
@@ -378,27 +357,31 @@ export function registerRunEngineEventBusHandlers() {
378357 isTest : true ,
379358 organizationId : true ,
380359 } ,
381- } ) ;
360+ } )
361+ ) ;
382362
383- if ( ! taskRun ) {
384- logger . error ( "[runCancelled] Task run not found" , {
385- runId : run . id ,
386- } ) ;
387- return ;
388- }
363+ if ( taskRunError ) {
364+ logger . error ( "[runCancelled] Task run not found" , {
365+ error : taskRunError ,
366+ runId : run . id ,
367+ } ) ;
368+ return ;
369+ }
389370
390- const error = createJsonErrorObject ( run . error ) ;
371+ const error = createJsonErrorObject ( run . error ) ;
391372
392- await eventRepository . cancelRunEvent ( {
373+ const [ cancelRunEventError ] = await tryCatch (
374+ eventRepository . cancelRunEvent ( {
393375 reason : error . message ,
394376 run : taskRun ,
395377 cancelledAt : time ,
396- } ) ;
397- } catch ( error ) {
398- logger . error ( "[runCancelled] Failed to cancel event" , {
399- error : error instanceof Error ? error . message : error ,
378+ } )
379+ ) ;
380+
381+ if ( cancelRunEventError ) {
382+ logger . error ( "[runCancelled] Failed to cancel run event" , {
383+ error : cancelRunEventError ,
400384 runId : run . id ,
401- spanId : run . spanId ,
402385 } ) ;
403386 }
404387 } ) ;
0 commit comments