Commit 437bd7a

Updated replication modules to use the MetricsEngine instead of the old singleton metrics instance
1 parent c655147

18 files changed, +106 -44 lines changed

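Every hunk in this commit applies the same substitution: the old `Metrics.getInstance()` singleton lookup becomes a call on a `MetricsEngine` that is injected through the options of each stream and job. A minimal sketch of the pattern, assuming only the `getCounter(...).add(...)` surface that the hunks themselves use (the helper name is illustrative, not part of the commit):

import { MetricsEngine, ReplicationMetricType } from '@powersync/service-core';

// Before this commit, call sites reached for a global singleton:
//   Metrics.getInstance().rows_replicated_total.add(1);
// After, the engine is passed in by the caller and counters are resolved
// through a typed metric identifier.
function recordReplicatedRows(metrics: MetricsEngine, count: number) {
  metrics.getCounter(ReplicationMetricType.ROWS_REPLICATED_TOTAL).add(count);
}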

modules/module-mongodb/src/module/MongoModule.ts
Lines changed: 3 additions & 0 deletions

@@ -37,11 +37,14 @@ export class MongoModule extends replication.ReplicationModule<types.MongoConnec
       id: this.getDefaultId(normalisedConfig.database ?? ''),
       syncRuleProvider: syncRuleProvider,
       storageEngine: context.storageEngine,
+      metricsEngine: context.metricsEngine,
       connectionFactory: connectionFactory,
       rateLimiter: new MongoErrorRateLimiter()
     });
   }
 
+  async onInitialized(context: system.ServiceContextContainer): Promise<void> {}
+
   /**
    * Combines base config with normalized connection settings
    */
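
Two things happen at the module level, here and again in the MySQL and Postgres modules further down: the `MetricsEngine` is taken from the service context and handed to the replicator options, and the old `initialize`/`super.initialize` override is replaced by an `onInitialized` hook. That split suggests the base `ReplicationModule` now owns `initialize` and calls back into `onInitialized`; the base class is not part of this diff, so the stand-in below is an assumption used only to illustrate the lifecycle:

// Illustrative stand-in, not the real ReplicationModule: the base class owns
// common initialization and calls back into a connector-specific hook.
abstract class LifecycleSketch<Context> {
  async initialize(context: Context): Promise<void> {
    // ...shared setup (config, storage, metrics registration) assumed to live here...
    await this.onInitialized(context);
  }
  abstract onInitialized(context: Context): Promise<void>;
}

// Mongo and MySQL need no extra setup, so their hook is a no-op;
// PostgresModule (below) uses it for JWKS checks and jpgwire metrics wiring.
class NoopModuleSketch extends LifecycleSketch<unknown> {
  async onInitialized(_context: unknown): Promise<void> {}
}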

modules/module-mongodb/src/replication/ChangeStream.ts
Lines changed: 13 additions & 3 deletions

@@ -8,7 +8,14 @@ import {
   ReplicationAssertionError,
   ServiceError
 } from '@powersync/lib-services-framework';
-import { Metrics, SaveOperationTag, SourceEntityDescriptor, SourceTable, storage } from '@powersync/service-core';
+import {
+  MetricsEngine,
+  ReplicationMetricType,
+  SaveOperationTag,
+  SourceEntityDescriptor,
+  SourceTable,
+  storage
+} from '@powersync/service-core';
 import { DatabaseInputRow, SqliteRow, SqlSyncRules, TablePattern } from '@powersync/service-sync-rules';
 import { MongoLSN } from '../common/MongoLSN.js';
 import { PostImagesOption } from '../types/types.js';
@@ -20,6 +27,7 @@ import { CHECKPOINTS_COLLECTION } from './replication-utils.js';
 export interface ChangeStreamOptions {
   connections: MongoManager;
   storage: storage.SyncRulesBucketStorage;
+  metrics: MetricsEngine;
   abort_signal: AbortSignal;
 }
 
@@ -52,13 +60,15 @@ export class ChangeStream {
   private connections: MongoManager;
   private readonly client: mongo.MongoClient;
   private readonly defaultDb: mongo.Db;
+  private readonly metrics: MetricsEngine;
 
   private abort_signal: AbortSignal;
 
   private relation_cache = new Map<string | number, storage.SourceTable>();
 
   constructor(options: ChangeStreamOptions) {
     this.storage = options.storage;
+    this.metrics = options.metrics;
     this.group_id = options.storage.group_id;
     this.connections = options.connections;
     this.client = this.connections.client;
@@ -322,7 +332,7 @@
         logger.info(`[${this.group_id}] Replicating ${table.qualifiedName} ${at}/${estimatedCount}`);
         lastLogIndex = at;
       }
-      Metrics.getInstance().rows_replicated_total.add(1);
+      this.metrics.getCounter(ReplicationMetricType.ROWS_REPLICATED_TOTAL).add(1);
 
       await touch();
     }
@@ -438,7 +448,7 @@
      return null;
    }
 
-    Metrics.getInstance().rows_replicated_total.add(1);
+    this.metrics.getCounter(ReplicationMetricType.ROWS_REPLICATED_TOTAL).add(1);
    if (change.operationType == 'insert') {
      const baseRecord = constructAfterRecord(change.fullDocument);
      return await batch.save({
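
One practical upside of injecting the engine instead of touching a singleton is that the counters above become observable in isolation. A hypothetical in-memory stand-in for tests; only the `getCounter(...).add(...)` surface used in these hunks is stubbed, and the cast papers over whatever else the real `MetricsEngine` interface carries, since this diff does not show it:

import { MetricsEngine, ReplicationMetricType } from '@powersync/service-core';

// Collects counter increments in a Map so a test can assert on them after
// running a stream constructed with `metrics: engine` in its options.
function inMemoryMetrics() {
  const counts = new Map<ReplicationMetricType, number>();
  const engine = {
    getCounter(type: ReplicationMetricType) {
      return {
        add(value: number) {
          counts.set(type, (counts.get(type) ?? 0) + value);
        }
      };
    }
  } as unknown as MetricsEngine;
  return { engine, counts };
}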

modules/module-mongodb/src/replication/ChangeStreamReplicationJob.ts
Lines changed: 1 addition & 0 deletions

@@ -71,6 +71,7 @@ export class ChangeStreamReplicationJob extends replication.AbstractReplicationJ
       const stream = new ChangeStream({
         abort_signal: this.abortController.signal,
         storage: this.options.storage,
+        metrics: this.options.metrics,
         connections: connectionManager
       });
       await stream.replicate();

modules/module-mongodb/src/replication/ChangeStreamReplicator.ts
Lines changed: 1 addition & 0 deletions

@@ -20,6 +20,7 @@ export class ChangeStreamReplicator extends replication.AbstractReplicator<Chang
     return new ChangeStreamReplicationJob({
       id: this.createJobId(options.storage.group_id),
       storage: options.storage,
+      metrics: this.metrics,
       connectionFactory: this.connectionFactory,
       lock: options.lock,
       rateLimiter: new MongoErrorRateLimiter()

modules/module-mysql/src/module/MySQLModule.ts
Lines changed: 2 additions & 3 deletions

@@ -25,9 +25,7 @@ export class MySQLModule extends replication.ReplicationModule<types.MySQLConnec
     });
   }
 
-  async initialize(context: system.ServiceContextContainer): Promise<void> {
-    await super.initialize(context);
-  }
+  async onInitialized(context: system.ServiceContextContainer): Promise<void> {}
 
   protected createRouteAPIAdapter(): api.RouteAPI {
     return new MySQLRouteAPIAdapter(this.resolveConfig(this.decodedConfig!));
@@ -42,6 +40,7 @@ export class MySQLModule extends replication.ReplicationModule<types.MySQLConnec
       id: this.getDefaultId(normalisedConfig.database),
       syncRuleProvider: syncRuleProvider,
       storageEngine: context.storageEngine,
+      metricsEngine: context.metricsEngine,
       connectionFactory: connectionFactory,
       rateLimiter: new MySQLErrorRateLimiter()
     });

modules/module-mysql/src/replication/BinLogReplicationJob.ts
Lines changed: 1 addition & 0 deletions

@@ -63,6 +63,7 @@ export class BinLogReplicationJob extends replication.AbstractReplicationJob {
       const stream = new BinLogStream({
         abortSignal: this.abortController.signal,
         storage: this.options.storage,
+        metrics: this.options.metrics,
         connections: connectionManager
       });
       await stream.replicate();

modules/module-mysql/src/replication/BinLogReplicator.ts
Lines changed: 1 addition & 0 deletions

@@ -19,6 +19,7 @@ export class BinLogReplicator extends replication.AbstractReplicator<BinLogRepli
     return new BinLogReplicationJob({
       id: this.createJobId(options.storage.group_id),
       storage: options.storage,
+      metrics: this.metrics,
       lock: options.lock,
       connectionFactory: this.connectionFactory,
       rateLimiter: this.rateLimiter

modules/module-mysql/src/replication/BinLogStream.ts
Lines changed: 20 additions & 6 deletions

@@ -2,7 +2,14 @@ import { logger, ReplicationAbortedError, ReplicationAssertionError } from '@pow
 import * as sync_rules from '@powersync/service-sync-rules';
 import async from 'async';
 
-import { ColumnDescriptor, framework, getUuidReplicaIdentityBson, Metrics, storage } from '@powersync/service-core';
+import {
+  ColumnDescriptor,
+  framework,
+  getUuidReplicaIdentityBson,
+  MetricsEngine,
+  ReplicationMetricType,
+  storage
+} from '@powersync/service-core';
 import mysql, { FieldPacket } from 'mysql2';
 
 import { BinLogEvent, StartOptions, TableMapEntry } from '@powersync/mysql-zongji';
@@ -16,6 +23,7 @@ import * as zongji_utils from './zongji/zongji-utils.js';
 export interface BinLogStreamOptions {
   connections: MySQLConnectionManager;
   storage: storage.SyncRulesBucketStorage;
+  metrics: MetricsEngine;
   abortSignal: AbortSignal;
 }
 
@@ -62,7 +70,7 @@ export class BinLogStream {
 
   private tableCache = new Map<string | number, storage.SourceTable>();
 
-  constructor(protected options: BinLogStreamOptions) {
+  constructor(private options: BinLogStreamOptions) {
     this.storage = options.storage;
     this.connections = options.connections;
     this.syncRules = options.storage.getParsedSyncRules({ defaultSchema: this.defaultSchema });
@@ -74,6 +82,10 @@
     return this.connections.connectionTag;
   }
 
+  private get metrics() {
+    return this.options.metrics;
+  }
+
   get connectionId() {
     const { connectionId } = this.connections;
     // Default to 1 if not set
@@ -334,6 +346,8 @@ AND table_type = 'BASE TABLE';`,
          after: record,
          afterReplicaId: getUuidReplicaIdentityBson(record, table.replicaIdColumns)
        });
+
+        this.metrics.getCounter(ReplicationMetricType.ROWS_REPLICATED_TOTAL).add(1);
      }
      await batch.flush();
    }
@@ -460,7 +474,7 @@ AND table_type = 'BASE TABLE';`,
          });
          break;
        case zongji_utils.eventIsXid(evt):
-          Metrics.getInstance().transactions_replicated_total.add(1);
+          this.metrics.getCounter(ReplicationMetricType.TRANSACTIONS_REPLICATED_TOTAL).add(1);
          // Need to commit with a replicated GTID with updated next position
          await batch.commit(
            new common.ReplicatedGTID({
@@ -602,7 +616,7 @@ AND table_type = 'BASE TABLE';`,
  ): Promise<storage.FlushedResult | null> {
    switch (payload.type) {
      case storage.SaveOperationTag.INSERT:
-        Metrics.getInstance().rows_replicated_total.add(1);
+        this.metrics.getCounter(ReplicationMetricType.ROWS_REPLICATED_TOTAL).add(1);
        const record = common.toSQLiteRow(payload.data, payload.columns);
        return await batch.save({
          tag: storage.SaveOperationTag.INSERT,
@@ -613,7 +627,7 @@ AND table_type = 'BASE TABLE';`,
          afterReplicaId: getUuidReplicaIdentityBson(record, payload.sourceTable.replicaIdColumns)
        });
      case storage.SaveOperationTag.UPDATE:
-        Metrics.getInstance().rows_replicated_total.add(1);
+        this.metrics.getCounter(ReplicationMetricType.ROWS_REPLICATED_TOTAL).add(1);
        // "before" may be null if the replica id columns are unchanged
        // It's fine to treat that the same as an insert.
        const beforeUpdated = payload.previous_data
@@ -633,7 +647,7 @@ AND table_type = 'BASE TABLE';`,
        });
 
      case storage.SaveOperationTag.DELETE:
-        Metrics.getInstance().rows_replicated_total.add(1);
+        this.metrics.getCounter(ReplicationMetricType.ROWS_REPLICATED_TOTAL).add(1);
        const beforeDeleted = common.toSQLiteRow(payload.data, payload.columns);
 
        return await batch.save({
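
Two details on this file: the hunk at -334,6 +346,8 adds a row-counter increment where the old code had none, and the engine is read through a private getter over a now-private options bag rather than being copied into a field as ChangeStream and WalStream do. The two storage styles are equivalent; a side-by-side sketch (class and interface names are illustrative only):

import { MetricsEngine } from '@powersync/service-core';

interface StreamOptionsSketch {
  metrics: MetricsEngine;
}

// BinLogStream style in this commit: keep the options bag, read through a getter.
class GetterStyle {
  constructor(private options: StreamOptionsSketch) {}
  get metrics(): MetricsEngine {
    return this.options.metrics;
  }
}

// ChangeStream / WalStream style: copy the engine into a readonly field.
class FieldStyle {
  readonly metrics: MetricsEngine;
  constructor(options: StreamOptionsSketch) {
    this.metrics = options.metrics;
  }
}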

modules/module-postgres/src/module/PostgresModule.ts
Lines changed: 8 additions & 10 deletions

@@ -5,6 +5,7 @@ import {
   ConnectionTestResult,
   modules,
   replication,
+  ReplicationMetricType,
   system
 } from '@powersync/service-core';
 import * as jpgwire from '@powersync/service-jpgwire';
@@ -29,9 +30,7 @@ export class PostgresModule extends replication.ReplicationModule<types.Postgres
     });
   }
 
-  async initialize(context: system.ServiceContextContainer): Promise<void> {
-    await super.initialize(context);
-
+  async onInitialized(context: system.ServiceContextContainer): Promise<void> {
     const client_auth = context.configuration.base_config.client_auth;
 
     if (client_auth?.supabase && client_auth?.supabase_jwt_secret == null) {
@@ -46,13 +45,11 @@
     }
 
     // Record replicated bytes using global jpgwire metrics.
-    if (context.metrics) {
-      jpgwire.setMetricsRecorder({
-        addBytesRead(bytes) {
-          context.metrics!.data_replicated_bytes.add(bytes);
-        }
-      });
-    }
+    jpgwire.setMetricsRecorder({
+      addBytesRead(bytes) {
+        context.metricsEngine.getCounter(ReplicationMetricType.DATA_REPLICATED_BYTES).add(bytes);
+      }
+    });
   }
 
   protected createRouteAPIAdapter(): api.RouteAPI {
@@ -68,6 +65,7 @@ export class PostgresModule extends replication.ReplicationModule<types.Postgres
       id: this.getDefaultId(normalisedConfig.database),
       syncRuleProvider: syncRuleProvider,
       storageEngine: context.storageEngine,
+      metricsEngine: context.metricsEngine,
       connectionFactory: connectionFactory,
       rateLimiter: new PostgresErrorRateLimiter()
     });
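
Because the engine is now always present on the service context, the `if (context.metrics)` guard around the jpgwire recorder is gone and the byte counter is wired unconditionally. The same wiring pulled out into a free function for clarity; the function name is illustrative, while the `setMetricsRecorder` and `getCounter` calls are exactly those in the hunk:

import { MetricsEngine, ReplicationMetricType } from '@powersync/service-core';
import * as jpgwire from '@powersync/service-jpgwire';

// Bridges jpgwire's global byte accounting into the injected metrics engine.
function wireReplicatedBytes(metricsEngine: MetricsEngine) {
  jpgwire.setMetricsRecorder({
    addBytesRead(bytes) {
      metricsEngine.getCounter(ReplicationMetricType.DATA_REPLICATED_BYTES).add(bytes);
    }
  });
}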

modules/module-postgres/src/replication/WalStream.ts
Lines changed: 16 additions & 8 deletions

@@ -8,7 +8,13 @@ import {
   ReplicationAbortedError,
   ReplicationAssertionError
 } from '@powersync/lib-services-framework';
-import { getUuidReplicaIdentityBson, Metrics, SourceEntityDescriptor, storage } from '@powersync/service-core';
+import {
+  getUuidReplicaIdentityBson,
+  MetricsEngine,
+  ReplicationMetricType,
+  SourceEntityDescriptor,
+  storage
+} from '@powersync/service-core';
 import * as pgwire from '@powersync/service-jpgwire';
 import { DatabaseInputRow, SqliteRow, SqlSyncRules, TablePattern, toSyncRulesRow } from '@powersync/service-sync-rules';
 import * as pg_utils from '../utils/pgwire_utils.js';
@@ -20,6 +26,7 @@ import { checkSourceConfiguration, getReplicationIdentityColumns } from './repli
 export interface WalStreamOptions {
   connections: PgManager;
   storage: storage.SyncRulesBucketStorage;
+  metrics: MetricsEngine;
   abort_signal: AbortSignal;
 }
 
@@ -72,7 +79,7 @@ export class WalStream {
   connection_id = 1;
 
   private readonly storage: storage.SyncRulesBucketStorage;
-
+  private readonly metrics: MetricsEngine;
   private readonly slot_name: string;
 
   private connections: PgManager;
@@ -85,6 +92,7 @@
 
   constructor(options: WalStreamOptions) {
     this.storage = options.storage;
+    this.metrics = options.metrics;
     this.sync_rules = options.storage.getParsedSyncRules({ defaultSchema: POSTGRES_DEFAULT_SCHEMA });
     this.group_id = options.storage.group_id;
     this.slot_name = options.storage.slot_name;
@@ -471,7 +479,7 @@ WHERE oid = $1::regclass`,
      }
 
      at += rows.length;
-      Metrics.getInstance().rows_replicated_total.add(rows.length);
+      this.metrics.getCounter(ReplicationMetricType.ROWS_REPLICATED_TOTAL).add(rows.length);
 
      await touch();
    }
@@ -562,7 +570,7 @@
    }
 
    if (msg.tag == 'insert') {
-      Metrics.getInstance().rows_replicated_total.add(1);
+      this.metrics.getCounter(ReplicationMetricType.ROWS_REPLICATED_TOTAL).add(1);
      const baseRecord = pg_utils.constructAfterRecord(msg);
      return await batch.save({
        tag: storage.SaveOperationTag.INSERT,
@@ -573,7 +581,7 @@
        afterReplicaId: getUuidReplicaIdentityBson(baseRecord, table.replicaIdColumns)
      });
    } else if (msg.tag == 'update') {
-      Metrics.getInstance().rows_replicated_total.add(1);
+      this.metrics.getCounter(ReplicationMetricType.ROWS_REPLICATED_TOTAL).add(1);
      // "before" may be null if the replica id columns are unchanged
      // It's fine to treat that the same as an insert.
      const before = pg_utils.constructBeforeRecord(msg);
@@ -587,7 +595,7 @@
        afterReplicaId: getUuidReplicaIdentityBson(after, table.replicaIdColumns)
      });
    } else if (msg.tag == 'delete') {
-      Metrics.getInstance().rows_replicated_total.add(1);
+      this.metrics.getCounter(ReplicationMetricType.ROWS_REPLICATED_TOTAL).add(1);
      const before = pg_utils.constructBeforeRecord(msg)!;
 
      return await batch.save({
@@ -697,7 +705,7 @@
        } else if (msg.tag == 'begin') {
          inTx = true;
        } else if (msg.tag == 'commit') {
-          Metrics.getInstance().transactions_replicated_total.add(1);
+          this.metrics.getCounter(ReplicationMetricType.TRANSACTIONS_REPLICATED_TOTAL).add(1);
          inTx = false;
          await batch.commit(msg.lsn!, { createEmptyCheckpoints });
          await this.ack(msg.lsn!, replicationStream);
@@ -739,7 +747,7 @@
          await this.ack(chunkLastLsn, replicationStream);
        }
 
-        Metrics.getInstance().chunks_replicated_total.add(1);
+        this.metrics.getCounter(ReplicationMetricType.CHUNKS_REPLICATED_TOTAL).add(1);
      }
    }
  );
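
The WAL stream now feeds three counters through the injected engine: rows (once per insert/update/delete message, and by rows.length per batch in the -471,7 hunk), transactions on each commit message, and chunks once per processed replication chunk. A hypothetical grouping helper, purely to summarise the call sites shown above:

import { MetricsEngine, ReplicationMetricType } from '@powersync/service-core';

// Not part of the commit: a compact view of the WalStream counter call sites.
function walStreamCounters(metrics: MetricsEngine) {
  return {
    rows: (count = 1) => metrics.getCounter(ReplicationMetricType.ROWS_REPLICATED_TOTAL).add(count),
    transactions: () => metrics.getCounter(ReplicationMetricType.TRANSACTIONS_REPLICATED_TOTAL).add(1),
    chunks: () => metrics.getCounter(ReplicationMetricType.CHUNKS_REPLICATED_TOTAL).add(1)
  };
}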
