Skip to content

Commit d7fab45

Browse files
committed
Moved metric enums to types package
1 parent d239244 commit d7fab45

File tree

15 files changed

+121
-133
lines changed

15 files changed

+121
-133
lines changed

modules/module-mongodb/src/replication/ChangeStream.ts

Lines changed: 4 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -8,22 +8,15 @@ import {
88
ReplicationAssertionError,
99
ServiceError
1010
} from '@powersync/lib-services-framework';
11-
import {
12-
BSON_DESERIALIZE_DATA_OPTIONS,
13-
MetricsEngine,
14-
ReplicationMetricType,
15-
SaveOperationTag,
16-
SourceEntityDescriptor,
17-
SourceTable,
18-
storage
19-
} from '@powersync/service-core';
11+
import { MetricsEngine, SaveOperationTag, SourceEntityDescriptor, SourceTable, storage } from '@powersync/service-core';
2012
import { DatabaseInputRow, SqliteRow, SqlSyncRules, TablePattern } from '@powersync/service-sync-rules';
2113
import { MongoLSN } from '../common/MongoLSN.js';
2214
import { PostImagesOption } from '../types/types.js';
2315
import { escapeRegExp } from '../utils.js';
2416
import { MongoManager } from './MongoManager.js';
2517
import { constructAfterRecord, createCheckpoint, getCacheIdentifier, getMongoRelation } from './MongoRelation.js';
2618
import { CHECKPOINTS_COLLECTION } from './replication-utils.js';
19+
import { ReplicationMetric } from '@powersync/service-types';
2720

2821
export interface ChangeStreamOptions {
2922
connections: MongoManager;
@@ -322,7 +315,7 @@ export class ChangeStream {
322315
}
323316

324317
at += docBatch.length;
325-
this.metrics.getCounter(ReplicationMetricType.ROWS_REPLICATED_TOTAL).add(docBatch.length);
318+
this.metrics.getCounter(ReplicationMetric.ROWS_REPLICATED_TOTAL).add(docBatch.length);
326319
const duration = performance.now() - lastBatch;
327320
lastBatch = performance.now();
328321
logger.info(
@@ -450,7 +443,7 @@ export class ChangeStream {
450443
return null;
451444
}
452445

453-
this.metrics.getCounter(ReplicationMetricType.ROWS_REPLICATED_TOTAL).add(1);
446+
this.metrics.getCounter(ReplicationMetric.ROWS_REPLICATED_TOTAL).add(1);
454447
if (change.operationType == 'insert') {
455448
const baseRecord = constructAfterRecord(change.fullDocument);
456449
return await batch.save({

modules/module-mysql/src/replication/BinLogStream.ts

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@ import {
77
framework,
88
getUuidReplicaIdentityBson,
99
MetricsEngine,
10-
ReplicationMetricType,
1110
storage
1211
} from '@powersync/service-core';
1312
import mysql, { FieldPacket } from 'mysql2';
@@ -19,6 +18,7 @@ import { isBinlogStillAvailable, ReplicatedGTID, toColumnDescriptors } from '../
1918
import { createRandomServerId, escapeMysqlTableName } from '../utils/mysql-utils.js';
2019
import { MySQLConnectionManager } from './MySQLConnectionManager.js';
2120
import * as zongji_utils from './zongji/zongji-utils.js';
21+
import { ReplicationMetric } from '@powersync/service-types';
2222

2323
export interface BinLogStreamOptions {
2424
connections: MySQLConnectionManager;
@@ -348,7 +348,7 @@ AND table_type = 'BASE TABLE';`,
348348
afterReplicaId: getUuidReplicaIdentityBson(record, table.replicaIdColumns)
349349
});
350350

351-
this.metrics.getCounter(ReplicationMetricType.ROWS_REPLICATED_TOTAL).add(1);
351+
this.metrics.getCounter(ReplicationMetric.ROWS_REPLICATED_TOTAL).add(1);
352352
}
353353
await batch.flush();
354354
}
@@ -475,7 +475,7 @@ AND table_type = 'BASE TABLE';`,
475475
});
476476
break;
477477
case zongji_utils.eventIsXid(evt):
478-
this.metrics.getCounter(ReplicationMetricType.TRANSACTIONS_REPLICATED_TOTAL).add(1);
478+
this.metrics.getCounter(ReplicationMetric.TRANSACTIONS_REPLICATED_TOTAL).add(1);
479479
// Need to commit with a replicated GTID with updated next position
480480
await batch.commit(
481481
new common.ReplicatedGTID({
@@ -617,7 +617,7 @@ AND table_type = 'BASE TABLE';`,
617617
): Promise<storage.FlushedResult | null> {
618618
switch (payload.type) {
619619
case storage.SaveOperationTag.INSERT:
620-
this.metrics.getCounter(ReplicationMetricType.ROWS_REPLICATED_TOTAL).add(1);
620+
this.metrics.getCounter(ReplicationMetric.ROWS_REPLICATED_TOTAL).add(1);
621621
const record = common.toSQLiteRow(payload.data, payload.columns);
622622
return await batch.save({
623623
tag: storage.SaveOperationTag.INSERT,
@@ -628,7 +628,7 @@ AND table_type = 'BASE TABLE';`,
628628
afterReplicaId: getUuidReplicaIdentityBson(record, payload.sourceTable.replicaIdColumns)
629629
});
630630
case storage.SaveOperationTag.UPDATE:
631-
this.metrics.getCounter(ReplicationMetricType.ROWS_REPLICATED_TOTAL).add(1);
631+
this.metrics.getCounter(ReplicationMetric.ROWS_REPLICATED_TOTAL).add(1);
632632
// "before" may be null if the replica id columns are unchanged
633633
// It's fine to treat that the same as an insert.
634634
const beforeUpdated = payload.previous_data
@@ -648,7 +648,7 @@ AND table_type = 'BASE TABLE';`,
648648
});
649649

650650
case storage.SaveOperationTag.DELETE:
651-
this.metrics.getCounter(ReplicationMetricType.ROWS_REPLICATED_TOTAL).add(1);
651+
this.metrics.getCounter(ReplicationMetric.ROWS_REPLICATED_TOTAL).add(1);
652652
const beforeDeleted = common.toSQLiteRow(payload.data, payload.columns);
653653

654654
return await batch.save({

modules/module-mysql/test/src/BinLogStream.test.ts

Lines changed: 20 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,11 @@
1-
import { ReplicationMetricType, storage } from '@powersync/service-core';
1+
import { storage } from '@powersync/service-core';
22
import { METRICS_HELPER, putOp, removeOp } from '@powersync/service-core-tests';
33
import { v4 as uuid } from 'uuid';
44
import { describe, expect, test } from 'vitest';
55
import { BinlogStreamTestContext } from './BinlogStreamUtils.js';
66
import { env } from './env.js';
77
import { INITIALIZED_MONGO_STORAGE_FACTORY, INITIALIZED_POSTGRES_STORAGE_FACTORY } from './util.js';
8+
import { ReplicationMetric } from '@powersync/service-types';
89

910
const BASIC_SYNC_RULES = `
1011
bucket_definitions:
@@ -35,10 +36,9 @@ function defineBinlogStreamTests(factory: storage.TestStorageFactory) {
3536

3637
await context.replicateSnapshot();
3738

38-
const startRowCount =
39-
(await METRICS_HELPER.getMetricValueForTests(ReplicationMetricType.ROWS_REPLICATED_TOTAL)) ?? 0;
39+
const startRowCount = (await METRICS_HELPER.getMetricValueForTests(ReplicationMetric.ROWS_REPLICATED_TOTAL)) ?? 0;
4040
const startTxCount =
41-
(await METRICS_HELPER.getMetricValueForTests(ReplicationMetricType.TRANSACTIONS_REPLICATED_TOTAL)) ?? 0;
41+
(await METRICS_HELPER.getMetricValueForTests(ReplicationMetric.TRANSACTIONS_REPLICATED_TOTAL)) ?? 0;
4242

4343
context.startStreaming();
4444
const testId = uuid();
@@ -48,9 +48,9 @@ function defineBinlogStreamTests(factory: storage.TestStorageFactory) {
4848
const data = await context.getBucketData('global[]');
4949

5050
expect(data).toMatchObject([putOp('test_data', { id: testId, description: 'test1', num: 1152921504606846976n })]);
51-
const endRowCount = (await METRICS_HELPER.getMetricValueForTests(ReplicationMetricType.ROWS_REPLICATED_TOTAL)) ?? 0;
51+
const endRowCount = (await METRICS_HELPER.getMetricValueForTests(ReplicationMetric.ROWS_REPLICATED_TOTAL)) ?? 0;
5252
const endTxCount =
53-
(await METRICS_HELPER.getMetricValueForTests(ReplicationMetricType.TRANSACTIONS_REPLICATED_TOTAL)) ?? 0;
53+
(await METRICS_HELPER.getMetricValueForTests(ReplicationMetric.TRANSACTIONS_REPLICATED_TOTAL)) ?? 0;
5454
expect(endRowCount - startRowCount).toEqual(1);
5555
expect(endTxCount - startTxCount).toEqual(1);
5656
});
@@ -69,10 +69,9 @@ function defineBinlogStreamTests(factory: storage.TestStorageFactory) {
6969

7070
await context.replicateSnapshot();
7171

72-
const startRowCount =
73-
(await METRICS_HELPER.getMetricValueForTests(ReplicationMetricType.ROWS_REPLICATED_TOTAL)) ?? 0;
72+
const startRowCount = (await METRICS_HELPER.getMetricValueForTests(ReplicationMetric.ROWS_REPLICATED_TOTAL)) ?? 0;
7473
const startTxCount =
75-
(await METRICS_HELPER.getMetricValueForTests(ReplicationMetricType.TRANSACTIONS_REPLICATED_TOTAL)) ?? 0;
74+
(await METRICS_HELPER.getMetricValueForTests(ReplicationMetric.TRANSACTIONS_REPLICATED_TOTAL)) ?? 0;
7675

7776
context.startStreaming();
7877

@@ -82,9 +81,9 @@ function defineBinlogStreamTests(factory: storage.TestStorageFactory) {
8281
const data = await context.getBucketData('global[]');
8382

8483
expect(data).toMatchObject([putOp('test_DATA', { id: testId, description: 'test1' })]);
85-
const endRowCount = (await METRICS_HELPER.getMetricValueForTests(ReplicationMetricType.ROWS_REPLICATED_TOTAL)) ?? 0;
84+
const endRowCount = (await METRICS_HELPER.getMetricValueForTests(ReplicationMetric.ROWS_REPLICATED_TOTAL)) ?? 0;
8685
const endTxCount =
87-
(await METRICS_HELPER.getMetricValueForTests(ReplicationMetricType.TRANSACTIONS_REPLICATED_TOTAL)) ?? 0;
86+
(await METRICS_HELPER.getMetricValueForTests(ReplicationMetric.TRANSACTIONS_REPLICATED_TOTAL)) ?? 0;
8887
expect(endRowCount - startRowCount).toEqual(1);
8988
expect(endTxCount - startTxCount).toEqual(1);
9089
});
@@ -174,12 +173,11 @@ function defineBinlogStreamTests(factory: storage.TestStorageFactory) {
174173
const testId = uuid();
175174
await connectionManager.query(`INSERT INTO test_data(id, description) VALUES('${testId}','test1')`);
176175

177-
const startRowCount =
178-
(await METRICS_HELPER.getMetricValueForTests(ReplicationMetricType.ROWS_REPLICATED_TOTAL)) ?? 0;
176+
const startRowCount = (await METRICS_HELPER.getMetricValueForTests(ReplicationMetric.ROWS_REPLICATED_TOTAL)) ?? 0;
179177

180178
await context.replicateSnapshot();
181179

182-
const endRowCount = (await METRICS_HELPER.getMetricValueForTests(ReplicationMetricType.ROWS_REPLICATED_TOTAL)) ?? 0;
180+
const endRowCount = (await METRICS_HELPER.getMetricValueForTests(ReplicationMetric.ROWS_REPLICATED_TOTAL)) ?? 0;
183181
const data = await context.getBucketData('global[]');
184182
expect(data).toMatchObject([putOp('test_data', { id: testId, description: 'test1' })]);
185183
expect(endRowCount - startRowCount).toEqual(1);
@@ -234,10 +232,9 @@ function defineBinlogStreamTests(factory: storage.TestStorageFactory) {
234232

235233
await context.replicateSnapshot();
236234

237-
const startRowCount =
238-
(await METRICS_HELPER.getMetricValueForTests(ReplicationMetricType.ROWS_REPLICATED_TOTAL)) ?? 0;
235+
const startRowCount = (await METRICS_HELPER.getMetricValueForTests(ReplicationMetric.ROWS_REPLICATED_TOTAL)) ?? 0;
239236
const startTxCount =
240-
(await METRICS_HELPER.getMetricValueForTests(ReplicationMetricType.TRANSACTIONS_REPLICATED_TOTAL)) ?? 0;
237+
(await METRICS_HELPER.getMetricValueForTests(ReplicationMetric.TRANSACTIONS_REPLICATED_TOTAL)) ?? 0;
241238

242239
context.startStreaming();
243240

@@ -264,9 +261,9 @@ function defineBinlogStreamTests(factory: storage.TestStorageFactory) {
264261
timestamp: '2023-03-06T15:47:00.000Z'
265262
})
266263
]);
267-
const endRowCount = (await METRICS_HELPER.getMetricValueForTests(ReplicationMetricType.ROWS_REPLICATED_TOTAL)) ?? 0;
264+
const endRowCount = (await METRICS_HELPER.getMetricValueForTests(ReplicationMetric.ROWS_REPLICATED_TOTAL)) ?? 0;
268265
const endTxCount =
269-
(await METRICS_HELPER.getMetricValueForTests(ReplicationMetricType.TRANSACTIONS_REPLICATED_TOTAL)) ?? 0;
266+
(await METRICS_HELPER.getMetricValueForTests(ReplicationMetric.TRANSACTIONS_REPLICATED_TOTAL)) ?? 0;
270267
expect(endRowCount - startRowCount).toEqual(2);
271268
expect(endTxCount - startTxCount).toEqual(2);
272269
});
@@ -280,20 +277,19 @@ function defineBinlogStreamTests(factory: storage.TestStorageFactory) {
280277

281278
await context.replicateSnapshot();
282279

283-
const startRowCount =
284-
(await METRICS_HELPER.getMetricValueForTests(ReplicationMetricType.ROWS_REPLICATED_TOTAL)) ?? 0;
280+
const startRowCount = (await METRICS_HELPER.getMetricValueForTests(ReplicationMetric.ROWS_REPLICATED_TOTAL)) ?? 0;
285281
const startTxCount =
286-
(await METRICS_HELPER.getMetricValueForTests(ReplicationMetricType.TRANSACTIONS_REPLICATED_TOTAL)) ?? 0;
282+
(await METRICS_HELPER.getMetricValueForTests(ReplicationMetric.TRANSACTIONS_REPLICATED_TOTAL)) ?? 0;
287283

288284
context.startStreaming();
289285

290286
await connectionManager.query(`INSERT INTO test_donotsync(id, description) VALUES('${uuid()}','test1')`);
291287
const data = await context.getBucketData('global[]');
292288

293289
expect(data).toMatchObject([]);
294-
const endRowCount = (await METRICS_HELPER.getMetricValueForTests(ReplicationMetricType.ROWS_REPLICATED_TOTAL)) ?? 0;
290+
const endRowCount = (await METRICS_HELPER.getMetricValueForTests(ReplicationMetric.ROWS_REPLICATED_TOTAL)) ?? 0;
295291
const endTxCount =
296-
(await METRICS_HELPER.getMetricValueForTests(ReplicationMetricType.TRANSACTIONS_REPLICATED_TOTAL)) ?? 0;
292+
(await METRICS_HELPER.getMetricValueForTests(ReplicationMetric.TRANSACTIONS_REPLICATED_TOTAL)) ?? 0;
297293

298294
// There was a transaction, but we should not replicate any actual data
299295
expect(endRowCount - startRowCount).toEqual(0);

modules/module-postgres/src/module/PostgresModule.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@ import {
55
ConnectionTestResult,
66
modules,
77
replication,
8-
ReplicationMetricType,
98
system
109
} from '@powersync/service-core';
1110
import * as jpgwire from '@powersync/service-jpgwire';
@@ -20,6 +19,7 @@ import { WalStreamReplicator } from '../replication/WalStreamReplicator.js';
2019
import * as types from '../types/types.js';
2120
import { PostgresConnectionConfig } from '../types/types.js';
2221
import { baseUri, NormalizedBasePostgresConnectionConfig } from '@powersync/lib-service-postgres';
22+
import { ReplicationMetric } from '@powersync/service-types';
2323

2424
export class PostgresModule extends replication.ReplicationModule<types.PostgresConnectionConfig> {
2525
constructor() {
@@ -47,7 +47,7 @@ export class PostgresModule extends replication.ReplicationModule<types.Postgres
4747
// Record replicated bytes using global jpgwire metrics.
4848
jpgwire.setMetricsRecorder({
4949
addBytesRead(bytes) {
50-
context.metricsEngine.getCounter(ReplicationMetricType.DATA_REPLICATED_BYTES).add(bytes);
50+
context.metricsEngine.getCounter(ReplicationMetric.DATA_REPLICATED_BYTES).add(bytes);
5151
}
5252
});
5353
}

modules/module-postgres/src/replication/WalStream.ts

Lines changed: 8 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -8,20 +8,15 @@ import {
88
ReplicationAbortedError,
99
ReplicationAssertionError
1010
} from '@powersync/lib-services-framework';
11-
import {
12-
getUuidReplicaIdentityBson,
13-
MetricsEngine,
14-
ReplicationMetricType,
15-
SourceEntityDescriptor,
16-
storage
17-
} from '@powersync/service-core';
11+
import { getUuidReplicaIdentityBson, MetricsEngine, SourceEntityDescriptor, storage } from '@powersync/service-core';
1812
import * as pgwire from '@powersync/service-jpgwire';
1913
import { DatabaseInputRow, SqliteRow, SqlSyncRules, TablePattern, toSyncRulesRow } from '@powersync/service-sync-rules';
2014
import * as pg_utils from '../utils/pgwire_utils.js';
2115

2216
import { PgManager } from './PgManager.js';
2317
import { getPgOutputRelation, getRelId } from './PgRelation.js';
2418
import { checkSourceConfiguration, getReplicationIdentityColumns } from './replication-utils.js';
19+
import { ReplicationMetric } from '@powersync/service-types';
2520

2621
export interface WalStreamOptions {
2722
connections: PgManager;
@@ -484,7 +479,7 @@ WHERE oid = $1::regclass`,
484479
}
485480

486481
at += rows.length;
487-
this.metrics.getCounter(ReplicationMetricType.ROWS_REPLICATED_TOTAL).add(rows.length);
482+
this.metrics.getCounter(ReplicationMetric.ROWS_REPLICATED_TOTAL).add(rows.length);
488483

489484
await touch();
490485
}
@@ -575,7 +570,7 @@ WHERE oid = $1::regclass`,
575570
}
576571

577572
if (msg.tag == 'insert') {
578-
this.metrics.getCounter(ReplicationMetricType.ROWS_REPLICATED_TOTAL).add(1);
573+
this.metrics.getCounter(ReplicationMetric.ROWS_REPLICATED_TOTAL).add(1);
579574
const baseRecord = pg_utils.constructAfterRecord(msg);
580575
return await batch.save({
581576
tag: storage.SaveOperationTag.INSERT,
@@ -586,7 +581,7 @@ WHERE oid = $1::regclass`,
586581
afterReplicaId: getUuidReplicaIdentityBson(baseRecord, table.replicaIdColumns)
587582
});
588583
} else if (msg.tag == 'update') {
589-
this.metrics.getCounter(ReplicationMetricType.ROWS_REPLICATED_TOTAL).add(1);
584+
this.metrics.getCounter(ReplicationMetric.ROWS_REPLICATED_TOTAL).add(1);
590585
// "before" may be null if the replica id columns are unchanged
591586
// It's fine to treat that the same as an insert.
592587
const before = pg_utils.constructBeforeRecord(msg);
@@ -600,7 +595,7 @@ WHERE oid = $1::regclass`,
600595
afterReplicaId: getUuidReplicaIdentityBson(after, table.replicaIdColumns)
601596
});
602597
} else if (msg.tag == 'delete') {
603-
this.metrics.getCounter(ReplicationMetricType.ROWS_REPLICATED_TOTAL).add(1);
598+
this.metrics.getCounter(ReplicationMetric.ROWS_REPLICATED_TOTAL).add(1);
604599
const before = pg_utils.constructBeforeRecord(msg)!;
605600

606601
return await batch.save({
@@ -710,7 +705,7 @@ WHERE oid = $1::regclass`,
710705
} else if (msg.tag == 'begin') {
711706
inTx = true;
712707
} else if (msg.tag == 'commit') {
713-
this.metrics.getCounter(ReplicationMetricType.TRANSACTIONS_REPLICATED_TOTAL).add(1);
708+
this.metrics.getCounter(ReplicationMetric.TRANSACTIONS_REPLICATED_TOTAL).add(1);
714709
inTx = false;
715710
await batch.commit(msg.lsn!, { createEmptyCheckpoints });
716711
await this.ack(msg.lsn!, replicationStream);
@@ -752,7 +747,7 @@ WHERE oid = $1::regclass`,
752747
await this.ack(chunkLastLsn, replicationStream);
753748
}
754749

755-
this.metrics.getCounter(ReplicationMetricType.CHUNKS_REPLICATED_TOTAL).add(1);
750+
this.metrics.getCounter(ReplicationMetric.CHUNKS_REPLICATED_TOTAL).add(1);
756751
}
757752
}
758753
);

modules/module-postgres/test/src/large_batch.test.ts

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
import { ReplicationMetricType, storage } from '@powersync/service-core';
1+
import { storage } from '@powersync/service-core';
22
import * as timers from 'timers/promises';
33
import { describe, expect, test } from 'vitest';
44
import { populateData } from '../../dist/utils/populate_test_data.js';
@@ -10,6 +10,7 @@ import {
1010
} from './util.js';
1111
import { WalStreamTestContext } from './wal_stream_utils.js';
1212
import { METRICS_HELPER } from '@powersync/service-core-tests';
13+
import { ReplicationMetric } from '@powersync/service-types';
1314

1415
describe.skipIf(!env.TEST_MONGO_STORAGE)('batch replication tests - mongodb', { timeout: 120_000 }, function () {
1516
// These are slow but consistent tests.
@@ -302,13 +303,12 @@ function defineBatchTests(factory: storage.TestStorageFactory) {
302303

303304
let done = false;
304305

305-
const startRowCount =
306-
(await METRICS_HELPER.getMetricValueForTests(ReplicationMetricType.ROWS_REPLICATED_TOTAL)) ?? 0;
306+
const startRowCount = (await METRICS_HELPER.getMetricValueForTests(ReplicationMetric.ROWS_REPLICATED_TOTAL)) ?? 0;
307307
try {
308308
(async () => {
309309
while (!done) {
310310
const count =
311-
((await METRICS_HELPER.getMetricValueForTests(ReplicationMetricType.ROWS_REPLICATED_TOTAL)) ?? 0) -
311+
((await METRICS_HELPER.getMetricValueForTests(ReplicationMetric.ROWS_REPLICATED_TOTAL)) ?? 0) -
312312
startRowCount;
313313

314314
if (count >= stopAfter) {

0 commit comments

Comments
 (0)