@@ -8,25 +8,20 @@ import {
88 ReplicationAssertionError ,
99 ServiceError
1010} from '@powersync/lib-services-framework' ;
11- import {
12- BSON_DESERIALIZE_DATA_OPTIONS ,
13- Metrics ,
14- SaveOperationTag ,
15- SourceEntityDescriptor ,
16- SourceTable ,
17- storage
18- } from '@powersync/service-core' ;
11+ import { MetricsEngine , SaveOperationTag , SourceEntityDescriptor , SourceTable , storage } from '@powersync/service-core' ;
1912import { DatabaseInputRow , SqliteRow , SqlSyncRules , TablePattern } from '@powersync/service-sync-rules' ;
2013import { MongoLSN } from '../common/MongoLSN.js' ;
2114import { PostImagesOption } from '../types/types.js' ;
2215import { escapeRegExp } from '../utils.js' ;
2316import { MongoManager } from './MongoManager.js' ;
2417import { constructAfterRecord , createCheckpoint , getCacheIdentifier , getMongoRelation } from './MongoRelation.js' ;
2518import { CHECKPOINTS_COLLECTION } from './replication-utils.js' ;
19+ import { ReplicationMetric } from '@powersync/service-types' ;
2620
2721export interface ChangeStreamOptions {
2822 connections : MongoManager ;
2923 storage : storage . SyncRulesBucketStorage ;
24+ metrics : MetricsEngine ;
3025 abort_signal : AbortSignal ;
3126}
3227
@@ -59,13 +54,15 @@ export class ChangeStream {
5954 private connections : MongoManager ;
6055 private readonly client : mongo . MongoClient ;
6156 private readonly defaultDb : mongo . Db ;
57+ private readonly metrics : MetricsEngine ;
6258
6359 private abort_signal : AbortSignal ;
6460
6561 private relation_cache = new Map < string | number , storage . SourceTable > ( ) ;
6662
6763 constructor ( options : ChangeStreamOptions ) {
6864 this . storage = options . storage ;
65+ this . metrics = options . metrics ;
6966 this . group_id = options . storage . group_id ;
7067 this . connections = options . connections ;
7168 this . client = this . connections . client ;
@@ -318,7 +315,7 @@ export class ChangeStream {
318315 }
319316
320317 at += docBatch . length ;
321- Metrics . getInstance ( ) . rows_replicated_total . add ( docBatch . length ) ;
318+ this . metrics . getCounter ( ReplicationMetric . ROWS_REPLICATED ) . add ( docBatch . length ) ;
322319 const duration = performance . now ( ) - lastBatch ;
323320 lastBatch = performance . now ( ) ;
324321 logger . info (
@@ -446,7 +443,7 @@ export class ChangeStream {
446443 return null ;
447444 }
448445
449- Metrics . getInstance ( ) . rows_replicated_total . add ( 1 ) ;
446+ this . metrics . getCounter ( ReplicationMetric . ROWS_REPLICATED ) . add ( 1 ) ;
450447 if ( change . operationType == 'insert' ) {
451448 const baseRecord = constructAfterRecord ( change . fullDocument ) ;
452449 return await batch . save ( {
0 commit comments