1- import { Attributes , Link , TraceFlags } from "@opentelemetry/api" ;
1+ import { Attributes , Link , trace , TraceFlags , Tracer } from "@opentelemetry/api" ;
22import { RandomIdGenerator } from "@opentelemetry/sdk-trace-base" ;
33import { SemanticResourceAttributes } from "@opentelemetry/semantic-conventions" ;
44import {
@@ -32,6 +32,8 @@ import { singleton } from "~/utils/singleton";
3232import { DynamicFlushScheduler } from "./dynamicFlushScheduler.server" ;
3333import { startActiveSpan } from "./tracer.server" ;
3434import { createRedisClient , RedisClient , RedisWithClusterOptions } from "~/redis.server" ;
35+ import { startSpan } from "./tracing.server" ;
36+ import { nanoid } from "nanoid" ;
3537
3638const MAX_FLUSH_DEPTH = 5 ;
3739
@@ -99,6 +101,7 @@ export type EventRepoConfig = {
99101 batchInterval : number ;
100102 redis : RedisWithClusterOptions ;
101103 retentionInDays : number ;
104+ tracer ?: Tracer ;
102105} ;
103106
104107export type QueryOptions = Prisma . TaskEventWhereInput ;
@@ -202,6 +205,8 @@ export class EventRepository {
202205 private _randomIdGenerator = new RandomIdGenerator ( ) ;
203206 private _redisPublishClient : RedisClient ;
204207 private _subscriberCount = 0 ;
208+ private _tracer : Tracer ;
209+ private _lastFlushedAt : Date | undefined ;
205210
206211 get subscriberCount ( ) {
207212 return this . _subscriberCount ;
@@ -219,22 +224,23 @@ export class EventRepository {
219224 } ) ;
220225
221226 this . _redisPublishClient = createRedisClient ( "trigger:eventRepoPublisher" , this . _config . redis ) ;
227+ this . _tracer = _config . tracer ?? trace . getTracer ( "eventRepo" , "0.0.1" ) ;
222228 }
223229
224230 async insert ( event : CreatableEvent ) {
225231 this . _flushScheduler . addToBatch ( [ event ] ) ;
226232 }
227233
228234 async insertImmediate ( event : CreatableEvent ) {
229- await this . #flushBatch( [ event ] ) ;
235+ await this . #flushBatch( nanoid ( ) , [ event ] ) ;
230236 }
231237
232238 async insertMany ( events : CreatableEvent [ ] ) {
233239 this . _flushScheduler . addToBatch ( events ) ;
234240 }
235241
236242 async insertManyImmediate ( events : CreatableEvent [ ] ) {
237- return await this . #flushBatch( events ) ;
243+ return await this . #flushBatch( nanoid ( ) , events ) ;
238244 }
239245
240246 async completeEvent ( spanId : string , options ?: UpdateEventOptions ) {
@@ -1019,42 +1025,56 @@ export class EventRepository {
10191025 } ;
10201026 }
10211027
1022- async #flushBatch( batch : CreatableEvent [ ] ) {
1023- const events = excludePartialEventsWithCorrespondingFullEvent ( batch ) ;
1028+ async #flushBatch( flushId : string , batch : CreatableEvent [ ] ) {
1029+ return await startSpan ( this . _tracer , "flushBatch" , async ( span ) => {
1030+ const events = excludePartialEventsWithCorrespondingFullEvent ( batch ) ;
10241031
1025- const flushedEvents = await this . #doFlushBatch( events ) ;
1032+ span . setAttribute ( "flush_id" , flushId ) ;
1033+ span . setAttribute ( "event_count" , events . length ) ;
1034+ span . setAttribute ( "partial_event_count" , batch . length - events . length ) ;
1035+ span . setAttribute (
1036+ "last_flush_in_ms" ,
1037+ this . _lastFlushedAt ? new Date ( ) . getTime ( ) - this . _lastFlushedAt . getTime ( ) : 0
1038+ ) ;
10261039
1027- if ( flushedEvents . length !== events . length ) {
1028- logger . debug ( "[EventRepository][flushBatch] Failed to insert all events" , {
1029- attemptCount : events . length ,
1030- successCount : flushedEvents . length ,
1031- } ) ;
1032- }
1040+ const flushedEvents = await this . #doFlushBatch( flushId , events ) ;
1041+
1042+ this . _lastFlushedAt = new Date ( ) ;
1043+
1044+ if ( flushedEvents . length !== events . length ) {
1045+ logger . debug ( "[EventRepository][flushBatch] Failed to insert all events" , {
1046+ attemptCount : events . length ,
1047+ successCount : flushedEvents . length ,
1048+ } ) ;
1049+
1050+ span . setAttribute ( "failed_event_count" , events . length - flushedEvents . length ) ;
1051+ }
10331052
1034- this . #publishToRedis( flushedEvents ) ;
1053+ this . #publishToRedis( flushedEvents ) ;
1054+ } ) ;
10351055 }
10361056
1037- async #doFlushBatch( events : CreatableEvent [ ] , depth : number = 1 ) : Promise < CreatableEvent [ ] > {
1038- try {
1039- await this . db . taskEvent . createMany ( {
1040- data : events as Prisma . TaskEventCreateManyInput [ ] ,
1041- } ) ;
1057+ async #doFlushBatch(
1058+ flushId : string ,
1059+ events : CreatableEvent [ ] ,
1060+ depth : number = 1
1061+ ) : Promise < CreatableEvent [ ] > {
1062+ return await startSpan ( this . _tracer , "doFlushBatch" , async ( span ) => {
1063+ try {
1064+ span . setAttribute ( "event_count" , events . length ) ;
1065+ span . setAttribute ( "depth" , depth ) ;
1066+ span . setAttribute ( "flush_id" , flushId ) ;
10421067
1043- return events ;
1044- } catch ( error ) {
1045- if ( error instanceof Prisma . PrismaClientUnknownRequestError ) {
1046- logger . error ( "Failed to insert events, most likely because of null characters" , {
1047- error : {
1048- name : error . name ,
1049- message : error . message ,
1050- stack : error . stack ,
1051- clientVersion : error . clientVersion ,
1052- } ,
1068+ await this . db . taskEvent . createMany ( {
1069+ data : events as Prisma . TaskEventCreateManyInput [ ] ,
10531070 } ) ;
10541071
1055- if ( events . length === 1 ) {
1056- logger . debug ( "Attempting to insert event individually and it failed" , {
1057- event : events [ 0 ] ,
1072+ span . setAttribute ( "inserted_event_count" , events . length ) ;
1073+
1074+ return events ;
1075+ } catch ( error ) {
1076+ if ( error instanceof Prisma . PrismaClientUnknownRequestError ) {
1077+ logger . error ( "Failed to insert events, most likely because of null characters" , {
10581078 error : {
10591079 name : error . name ,
10601080 message : error . message ,
@@ -1063,38 +1083,62 @@ export class EventRepository {
10631083 } ,
10641084 } ) ;
10651085
1066- return [ ] ;
1067- }
1086+ if ( events . length === 1 ) {
1087+ logger . debug ( "Attempting to insert event individually and it failed" , {
1088+ event : events [ 0 ] ,
1089+ error : {
1090+ name : error . name ,
1091+ message : error . message ,
1092+ stack : error . stack ,
1093+ clientVersion : error . clientVersion ,
1094+ } ,
1095+ } ) ;
10681096
1069- if ( depth > MAX_FLUSH_DEPTH ) {
1070- logger . error ( "Failed to insert events, reached maximum depth" , {
1071- error : {
1072- name : error . name ,
1073- message : error . message ,
1074- stack : error . stack ,
1075- clientVersion : error . clientVersion ,
1076- } ,
1077- depth,
1078- eventsCount : events . length ,
1079- } ) ;
1097+ span . setAttribute ( "failed_event_count" , 1 ) ;
10801098
1081- return [ ] ;
1082- }
1099+ return [ ] ;
1100+ }
10831101
1084- // Split the events into two batches, and recursively try to insert them.
1085- const middle = Math . floor ( events . length / 2 ) ;
1086- const [ firstHalf , secondHalf ] = [ events . slice ( 0 , middle ) , events . slice ( middle ) ] ;
1102+ if ( depth > MAX_FLUSH_DEPTH ) {
1103+ logger . error ( "Failed to insert events, reached maximum depth" , {
1104+ error : {
1105+ name : error . name ,
1106+ message : error . message ,
1107+ stack : error . stack ,
1108+ clientVersion : error . clientVersion ,
1109+ } ,
1110+ depth,
1111+ eventsCount : events . length ,
1112+ } ) ;
10871113
1088- const [ firstHalfEvents , secondHalfEvents ] = await Promise . all ( [
1089- this . #doFlushBatch( firstHalf , depth + 1 ) ,
1090- this . #doFlushBatch( secondHalf , depth + 1 ) ,
1091- ] ) ;
1114+ span . setAttribute ( "reached_max_flush_depth" , true ) ;
1115+ span . setAttribute ( "failed_event_count" , events . length ) ;
10921116
1093- return firstHalfEvents . concat ( secondHalfEvents ) ;
1094- }
1117+ return [ ] ;
1118+ }
10951119
1096- throw error ;
1097- }
1120+ // Split the events into two batches, and recursively try to insert them.
1121+ const middle = Math . floor ( events . length / 2 ) ;
1122+ const [ firstHalf , secondHalf ] = [ events . slice ( 0 , middle ) , events . slice ( middle ) ] ;
1123+
1124+ return await startSpan ( this . _tracer , "bisectBatch" , async ( span ) => {
1125+ span . setAttribute ( "first_half_count" , firstHalf . length ) ;
1126+ span . setAttribute ( "second_half_count" , secondHalf . length ) ;
1127+ span . setAttribute ( "depth" , depth ) ;
1128+ span . setAttribute ( "flush_id" , flushId ) ;
1129+
1130+ const [ firstHalfEvents , secondHalfEvents ] = await Promise . all ( [
1131+ this . #doFlushBatch( flushId , firstHalf , depth + 1 ) ,
1132+ this . #doFlushBatch( flushId , secondHalf , depth + 1 ) ,
1133+ ] ) ;
1134+
1135+ return firstHalfEvents . concat ( secondHalfEvents ) ;
1136+ } ) ;
1137+ }
1138+
1139+ throw error ;
1140+ }
1141+ } ) ;
10981142 }
10991143
11001144 async #publishToRedis( events : CreatableEvent [ ] ) {
0 commit comments