@@ -326,6 +326,43 @@ export class EventRepository {
326326 } ) ;
327327 }
328328
329+ async cancelEvents ( events : TaskEventRecord [ ] , cancelledAt : Date , reason : string ) {
330+ const eventsToCancel = events . filter ( ( event ) => event . isPartial ) ;
331+
332+ if ( eventsToCancel . length === 0 ) {
333+ return ;
334+ }
335+
336+ await this . insertMany (
337+ eventsToCancel . map ( ( event ) => ( {
338+ ...omit ( event , "id" ) ,
339+ isPartial : false ,
340+ isError : false ,
341+ isCancelled : true ,
342+ status : "ERROR" ,
343+ links : event . links ?? [ ] ,
344+ events : [
345+ {
346+ name : "cancellation" ,
347+ time : cancelledAt ,
348+ properties : {
349+ reason,
350+ } ,
351+ } ,
352+ ...( ( event . events as any [ ] ) ?? [ ] ) ,
353+ ] ,
354+ duration : calculateDurationFromStart ( event . startTime , cancelledAt ) ,
355+ properties : event . properties as Attributes ,
356+ metadata : event . metadata as Attributes ,
357+ style : event . style as Attributes ,
358+ output : event . output as Attributes ,
359+ outputType : event . outputType ,
360+ payload : event . payload as Attributes ,
361+ payloadType : event . payloadType ,
362+ } ) )
363+ ) ;
364+ }
365+
329366 async crashEvent ( {
330367 event,
331368 crashedAt,
@@ -394,28 +431,35 @@ export class EventRepository {
394431 queryOptions ,
395432 startCreatedAt ,
396433 endCreatedAt ,
397- { spanId : true , isPartial : true , isCancelled : true }
434+ { spanId : true , isPartial : true , isCancelled : true } ,
435+ undefined ,
436+ { limit : 500 }
398437 ) ;
399438
400- const filteredTaskEvents = taskEvents . filter ( ( event ) => {
401- // Event must be partial
402- if ( ! event . isPartial ) return false ;
403-
404- // If the event is cancelled, it is not incomplete
405- if ( event . isCancelled ) return false ;
406-
407- if ( allowCompleteDuplicate ) {
408- return true ;
439+ // Optimize the filtering by pre-processing the data
440+ const completeEventSpanIds = new Set < string > ( ) ;
441+ const incompleteEvents : Array < { spanId : string } > = [ ] ;
442+
443+ // Single pass to categorize events and build lookup structures
444+ for ( const event of taskEvents ) {
445+ if ( ! event . isPartial && ! event . isCancelled ) {
446+ // This is a complete event
447+ completeEventSpanIds . add ( event . spanId ) ;
448+ } else if ( event . isPartial && ! event . isCancelled ) {
449+ // This is a potentially incomplete event
450+ incompleteEvents . push ( event ) ;
409451 }
452+ // Skip cancelled events as they are not incomplete
453+ }
410454
411- // There must not be another complete event with the same spanId
412- const hasCompleteDuplicate = taskEvents . some (
413- ( otherEvent ) =>
414- otherEvent . spanId === event . spanId && ! otherEvent . isPartial && ! otherEvent . isCancelled
415- ) ;
455+ // Filter incomplete events, excluding those with complete duplicates
456+ const filteredTaskEvents = allowCompleteDuplicate
457+ ? incompleteEvents
458+ : incompleteEvents . filter ( ( event ) => ! completeEventSpanIds . has ( event . spanId ) ) ;
416459
417- return ! hasCompleteDuplicate ;
418- } ) ;
460+ if ( filteredTaskEvents . length === 0 ) {
461+ return [ ] ;
462+ }
419463
420464 return this . #queryEvents(
421465 storeTable ,
0 commit comments