@@ -7,7 +7,16 @@ import {
77 type MessageUpdate ,
88 type PgoutputMessage ,
99} from "@internal/replication" ;
10- import { recordSpanError , startSpan , trace , type Tracer } from "@internal/tracing" ;
10+ import {
11+ getMeter ,
12+ recordSpanError ,
13+ startSpan ,
14+ trace ,
15+ type Counter ,
16+ type Histogram ,
17+ type Meter ,
18+ type Tracer ,
19+ } from "@internal/tracing" ;
1120import { Logger , type LogLevel } from "@trigger.dev/core/logger" ;
1221import { tryCatch } from "@trigger.dev/core/utils" ;
1322import { parsePacketAsJson } from "@trigger.dev/core/v3/utils/ioSerialization" ;
@@ -51,6 +60,7 @@ export type RunsReplicationServiceOptions = {
5160 logger ?: Logger ;
5261 logLevel ?: LogLevel ;
5362 tracer ?: Tracer ;
63+ meter ?: Meter ;
5464 waitForAsyncInsert ?: boolean ;
5565 insertStrategy ?: "insert" | "insert_async" ;
5666 // Retry configuration for insert operations
@@ -90,6 +100,7 @@ export class RunsReplicationService {
90100 private _isShuttingDown = false ;
91101 private _isShutDownComplete = false ;
92102 private _tracer : Tracer ;
103+ private _meter : Meter ;
93104 private _currentParseDurationMs : number | null = null ;
94105 private _lastAcknowledgedAt : number | null = null ;
95106 private _acknowledgeTimeoutMs : number ;
@@ -103,13 +114,77 @@ export class RunsReplicationService {
103114 private _insertStrategy : "insert" | "insert_async" ;
104115 private _disablePayloadInsert : boolean ;
105116
117+ // Metrics
118+ private _replicationLagHistogram : Histogram ;
119+ private _batchesFlushedCounter : Counter ;
120+ private _batchSizeHistogram : Histogram ;
121+ private _taskRunsInsertedCounter : Counter ;
122+ private _payloadsInsertedCounter : Counter ;
123+ private _insertRetriesCounter : Counter ;
124+ private _eventsProcessedCounter : Counter ;
125+ private _flushDurationHistogram : Histogram ;
126+
106127 public readonly events : EventEmitter < RunsReplicationServiceEvents > ;
107128
108129 constructor ( private readonly options : RunsReplicationServiceOptions ) {
109130 this . logger =
110131 options . logger ?? new Logger ( "RunsReplicationService" , options . logLevel ?? "info" ) ;
111132 this . events = new EventEmitter ( ) ;
112133 this . _tracer = options . tracer ?? trace . getTracer ( "runs-replication-service" ) ;
134+ this . _meter = options . meter ?? getMeter ( "runs-replication" ) ;
135+
136+ // Initialize metrics
137+ this . _replicationLagHistogram = this . _meter . createHistogram (
138+ "runs_replication.replication_lag_ms" ,
139+ {
140+ description : "Replication lag from Postgres commit to processing" ,
141+ unit : "ms" ,
142+ }
143+ ) ;
144+
145+ this . _batchesFlushedCounter = this . _meter . createCounter ( "runs_replication.batches_flushed" , {
146+ description : "Total batches flushed to ClickHouse" ,
147+ } ) ;
148+
149+ this . _batchSizeHistogram = this . _meter . createHistogram ( "runs_replication.batch_size" , {
150+ description : "Number of items per batch flush" ,
151+ unit : "items" ,
152+ } ) ;
153+
154+ this . _taskRunsInsertedCounter = this . _meter . createCounter (
155+ "runs_replication.task_runs_inserted" ,
156+ {
157+ description : "Task run inserts to ClickHouse" ,
158+ unit : "inserts" ,
159+ }
160+ ) ;
161+
162+ this . _payloadsInsertedCounter = this . _meter . createCounter (
163+ "runs_replication.payloads_inserted" ,
164+ {
165+ description : "Payload inserts to ClickHouse" ,
166+ unit : "inserts" ,
167+ }
168+ ) ;
169+
170+ this . _insertRetriesCounter = this . _meter . createCounter ( "runs_replication.insert_retries" , {
171+ description : "Insert retry attempts" ,
172+ } ) ;
173+
174+ this . _eventsProcessedCounter = this . _meter . createCounter (
175+ "runs_replication.events_processed" ,
176+ {
177+ description : "Replication events processed (inserts, updates, deletes)" ,
178+ }
179+ ) ;
180+
181+ this . _flushDurationHistogram = this . _meter . createHistogram (
182+ "runs_replication.flush_duration_ms" ,
183+ {
184+ description : "Duration of batch flush operations" ,
185+ unit : "ms" ,
186+ }
187+ ) ;
113188
114189 this . _acknowledgeTimeoutMs = options . acknowledgeTimeoutMs ?? 1_000 ;
115190
@@ -423,20 +498,13 @@ export class RunsReplicationService {
423498 } ) )
424499 ) ;
425500
426- this . _tracer
427- . startSpan ( "handle_transaction" , {
428- attributes : {
429- "transaction.xid" : transaction . xid ,
430- "transaction.replication_lag_ms" : transaction . replicationLagMs ,
431- "transaction.events" : transaction . events . length ,
432- "transaction.commit_end_lsn" : transaction . commitEndLsn ,
433- "transaction.parse_duration_ms" : this . _currentParseDurationMs ?? undefined ,
434- "transaction.lsn_to_uint64_ms" : lsnToUInt64DurationMs ,
435- "transaction.version" : _version . toString ( ) ,
436- } ,
437- startTime : transaction . beginStartTimestamp ,
438- } )
439- . end ( ) ;
501+ // Record metrics
502+ this . _replicationLagHistogram . record ( transaction . replicationLagMs ) ;
503+
504+ // Count events by type
505+ for ( const event of transaction . events ) {
506+ this . _eventsProcessedCounter . add ( 1 , { event_type : event . tag } ) ;
507+ }
440508
441509 this . logger . info ( "handle_transaction" , {
442510 transaction : {
@@ -501,6 +569,8 @@ export class RunsReplicationService {
501569 batchSize : batch . length ,
502570 } ) ;
503571
572+ const flushStartTime = performance . now ( ) ;
573+
504574 await startSpan ( this . _tracer , "flushBatch" , async ( span ) => {
505575 const preparedInserts = await startSpan ( this . _tracer , "prepare_inserts" , async ( span ) => {
506576 return await Promise . all ( batch . map ( this . #prepareRunInserts. bind ( this ) ) ) ;
@@ -584,6 +654,22 @@ export class RunsReplicationService {
584654 } ) ;
585655
586656 this . events . emit ( "batchFlushed" , { flushId, taskRunInserts, payloadInserts } ) ;
657+
658+ // Record metrics
659+ const flushDurationMs = performance . now ( ) - flushStartTime ;
660+ const hasErrors = taskRunError !== null || payloadError !== null ;
661+
662+ this . _batchSizeHistogram . record ( batch . length ) ;
663+ this . _flushDurationHistogram . record ( flushDurationMs ) ;
664+ this . _batchesFlushedCounter . add ( 1 , { success : ! hasErrors } ) ;
665+
666+ if ( ! taskRunError ) {
667+ this . _taskRunsInsertedCounter . add ( taskRunInserts . length ) ;
668+ }
669+
670+ if ( ! payloadError ) {
671+ this . _payloadsInsertedCounter . add ( payloadInserts . length ) ;
672+ }
587673 } ) ;
588674 }
589675
@@ -615,6 +701,10 @@ export class RunsReplicationService {
615701 delay,
616702 } ) ;
617703
704+ // Record retry metric
705+ const operation = operationName . includes ( "task run" ) ? "task_runs" : "payloads" ;
706+ this . _insertRetriesCounter . add ( 1 , { operation } ) ;
707+
618708 await new Promise ( ( resolve ) => setTimeout ( resolve , delay ) ) ;
619709 continue ;
620710 }
0 commit comments