@@ -330,10 +330,6 @@ export class RunsReplicationService {
330330 return ;
331331 }
332332
333- this . logger . debug ( "Handling transaction" , {
334- transaction,
335- } ) ;
336-
337333 const lsnToUInt64Start = process . hrtime . bigint ( ) ;
338334
339335 // If there are events, we need to handle them
@@ -349,20 +345,32 @@ export class RunsReplicationService {
349345 } ) )
350346 ) ;
351347
352- const currentSpan = this . _tracer . startSpan ( "handle_transaction" , {
353- attributes : {
354- "transaction.xid" : transaction . xid ,
355- "transaction.replication_lag_ms" : transaction . replicationLagMs ,
356- "transaction.events" : transaction . events . length ,
357- "transaction.commit_end_lsn" : transaction . commitEndLsn ,
358- "transaction.parse_duration_ms" : this . _currentParseDurationMs ?? undefined ,
359- "transaction.lsn_to_uint64_ms" : lsnToUInt64DurationMs ,
360- "transaction.version" : _version . toString ( ) ,
348+ this . _tracer
349+ . startSpan ( "handle_transaction" , {
350+ attributes : {
351+ "transaction.xid" : transaction . xid ,
352+ "transaction.replication_lag_ms" : transaction . replicationLagMs ,
353+ "transaction.events" : transaction . events . length ,
354+ "transaction.commit_end_lsn" : transaction . commitEndLsn ,
355+ "transaction.parse_duration_ms" : this . _currentParseDurationMs ?? undefined ,
356+ "transaction.lsn_to_uint64_ms" : lsnToUInt64DurationMs ,
357+ "transaction.version" : _version . toString ( ) ,
358+ } ,
359+ startTime : transaction . beginStartTimestamp ,
360+ } )
361+ . end ( ) ;
362+
363+ this . logger . debug ( "handle_transaction" , {
364+ transaction : {
365+ xid : transaction . xid ,
366+ commitLsn : transaction . commitLsn ,
367+ commitEndLsn : transaction . commitEndLsn ,
368+ events : transaction . events . length ,
369+ parseDurationMs : this . _currentParseDurationMs ,
370+ lsnToUInt64DurationMs,
371+ version : _version . toString ( ) ,
361372 } ,
362- startTime : transaction . beginStartTimestamp ,
363373 } ) ;
364-
365- currentSpan . end ( ) ;
366374 }
367375
368376 async #acknowledgeLatestTransaction( ) {
@@ -387,7 +395,7 @@ export class RunsReplicationService {
387395 this . _lastAcknowledgedAt = now ;
388396 this . _lastAcknowledgedLsn = this . _latestCommitEndLsn ;
389397
390- this . logger . debug ( "Acknowledging transaction " , {
398+ this . logger . debug ( "acknowledge_latest_transaction " , {
391399 commitEndLsn : this . _latestCommitEndLsn ,
392400 lastAcknowledgedAt : this . _lastAcknowledgedAt ,
393401 } ) ;
@@ -747,7 +755,7 @@ export class ConcurrentFlushScheduler<T> {
747755 const callback = this . config . callback ;
748756
749757 const promise = this . concurrencyLimiter ( async ( ) => {
750- await startSpan ( this . _tracer , "flushNextBatch" , async ( span ) => {
758+ return await startSpan ( this . _tracer , "flushNextBatch" , async ( span ) => {
751759 const batchId = nanoid ( ) ;
752760
753761 span . setAttribute ( "batch_id" , batchId ) ;
@@ -756,26 +764,47 @@ export class ConcurrentFlushScheduler<T> {
756764 span . setAttribute ( "concurrency_pending_count" , this . concurrencyLimiter . pendingCount ) ;
757765 span . setAttribute ( "concurrency_concurrency" , this . concurrencyLimiter . concurrency ) ;
758766
767+ this . logger . debug ( "flush_next_batch" , {
768+ batchId,
769+ batchSize : batch . length ,
770+ concurrencyActiveCount : this . concurrencyLimiter . activeCount ,
771+ concurrencyPendingCount : this . concurrencyLimiter . pendingCount ,
772+ concurrencyConcurrency : this . concurrencyLimiter . concurrency ,
773+ } ) ;
774+
775+ const start = performance . now ( ) ;
776+
759777 await callback ( batchId , batch ) ;
778+
779+ const end = performance . now ( ) ;
780+
781+ const duration = end - start ;
782+
783+ return {
784+ batchId,
785+ duration,
786+ } ;
760787 } ) ;
761788 } ) ;
762789
763- const [ error ] = await tryCatch ( promise ) ;
790+ const [ error , result ] = await tryCatch ( promise ) ;
764791
765792 if ( error ) {
766- this . logger . error ( "Error flushing batch " , {
793+ this . logger . error ( "flush_batch_error " , {
767794 error,
768795 } ) ;
769796
770797 this . failedBatchCount ++ ;
798+ } else {
799+ this . logger . debug ( "flush_batch_complete" , {
800+ totalBatches : 1 ,
801+ successfulBatches : 1 ,
802+ failedBatches : 0 ,
803+ totalFailedBatches : this . failedBatchCount ,
804+ duration : result ?. duration ,
805+ batchId : result ?. batchId ,
806+ } ) ;
771807 }
772-
773- this . logger . debug ( "Batch flush complete" , {
774- totalBatches : 1 ,
775- successfulBatches : 1 ,
776- failedBatches : 0 ,
777- totalFailedBatches : this . failedBatchCount ,
778- } ) ;
779808 }
780809}
781810
0 commit comments