diff --git a/.changeset/good-years-marry.md b/.changeset/good-years-marry.md new file mode 100644 index 000000000..9886a8e5d --- /dev/null +++ b/.changeset/good-years-marry.md @@ -0,0 +1,12 @@ +--- +'@powersync/service-module-postgres-storage': minor +'@powersync/service-module-mongodb-storage': minor +'@powersync/service-core-tests': minor +'@powersync/service-module-postgres': minor +'@powersync/service-module-mongodb': minor +'@powersync/service-core': minor +'@powersync/service-module-mysql': minor +'@powersync/service-types': minor +--- + +Add powersync_replication_lag_seconds metric diff --git a/modules/module-mongodb-storage/src/storage/implementation/MongoBucketBatch.ts b/modules/module-mongodb-storage/src/storage/implementation/MongoBucketBatch.ts index c1640c890..b72f97617 100644 --- a/modules/module-mongodb-storage/src/storage/implementation/MongoBucketBatch.ts +++ b/modules/module-mongodb-storage/src/storage/implementation/MongoBucketBatch.ts @@ -112,12 +112,12 @@ export class MongoBucketBatch return this.last_checkpoint_lsn; } - async flush(): Promise { + async flush(options?: storage.BucketBatchCommitOptions): Promise { let result: storage.FlushedResult | null = null; // One flush may be split over multiple transactions. // Each flushInner() is one transaction. 
while (this.batch != null) { - let r = await this.flushInner(); + let r = await this.flushInner(options); if (r) { result = r; } @@ -127,7 +127,7 @@ export class MongoBucketBatch return result; } - private async flushInner(): Promise { + private async flushInner(options?: storage.BucketBatchCommitOptions): Promise { const batch = this.batch; if (batch == null) { return null; @@ -137,7 +137,7 @@ export class MongoBucketBatch let resumeBatch: OperationBatch | null = null; await this.withReplicationTransaction(`Flushing ${batch.length} ops`, async (session, opSeq) => { - resumeBatch = await this.replicateBatch(session, batch, opSeq); + resumeBatch = await this.replicateBatch(session, batch, opSeq, options); last_op = opSeq.last(); }); @@ -157,7 +157,8 @@ export class MongoBucketBatch private async replicateBatch( session: mongo.ClientSession, batch: OperationBatch, - op_seq: MongoIdSequence + op_seq: MongoIdSequence, + options?: storage.BucketBatchCommitOptions ): Promise { let sizes: Map | undefined = undefined; if (this.storeCurrentData && !this.skipExistingRows) { @@ -253,7 +254,7 @@ export class MongoBucketBatch if (persistedBatch!.shouldFlushTransaction()) { // Transaction is getting big. // Flush, and resume in a new transaction. - await persistedBatch!.flush(this.db, this.session); + await persistedBatch!.flush(this.db, this.session, options); persistedBatch = null; // Computing our current progress is a little tricky here, since // we're stopping in the middle of a batch. 
@@ -264,7 +265,7 @@ export class MongoBucketBatch if (persistedBatch) { transactionSize = persistedBatch.currentSize; - await persistedBatch.flush(this.db, this.session); + await persistedBatch.flush(this.db, this.session, options); } } @@ -613,12 +614,13 @@ export class MongoBucketBatch async commit(lsn: string, options?: storage.BucketBatchCommitOptions): Promise { const { createEmptyCheckpoints } = { ...storage.DEFAULT_BUCKET_BATCH_COMMIT_OPTIONS, ...options }; - await this.flush(); + await this.flush(options); if (this.last_checkpoint_lsn != null && lsn < this.last_checkpoint_lsn) { // When re-applying transactions, don't create a new checkpoint until // we are past the last transaction. logger.info(`Re-applied transaction ${lsn} - skipping checkpoint`); + // Cannot create a checkpoint yet - return false return false; } if (lsn < this.no_checkpoint_before_lsn) { @@ -647,11 +649,13 @@ export class MongoBucketBatch { session: this.session } ); + // Cannot create a checkpoint yet - return false return false; } if (!createEmptyCheckpoints && this.persisted_op == null) { - return false; + // Nothing to commit - also return true + return true; } const now = new Date(); diff --git a/modules/module-mongodb-storage/src/storage/implementation/MongoPersistedSyncRulesContent.ts b/modules/module-mongodb-storage/src/storage/implementation/MongoPersistedSyncRulesContent.ts index b828cf342..64b38dc22 100644 --- a/modules/module-mongodb-storage/src/storage/implementation/MongoPersistedSyncRulesContent.ts +++ b/modules/module-mongodb-storage/src/storage/implementation/MongoPersistedSyncRulesContent.ts @@ -15,6 +15,7 @@ export class MongoPersistedSyncRulesContent implements storage.PersistedSyncRule public readonly last_fatal_error: string | null; public readonly last_keepalive_ts: Date | null; public readonly last_checkpoint_ts: Date | null; + public readonly active: boolean; public current_lock: MongoSyncRulesLock | null = null; @@ -30,6 +31,7 @@ export class 
MongoPersistedSyncRulesContent implements storage.PersistedSyncRule this.last_fatal_error = doc.last_fatal_error; this.last_checkpoint_ts = doc.last_checkpoint_ts; this.last_keepalive_ts = doc.last_keepalive_ts; + this.active = doc.state == 'ACTIVE'; } parsed(options: storage.ParseSyncRulesOptions) { diff --git a/modules/module-mongodb-storage/src/storage/implementation/PersistedBatch.ts b/modules/module-mongodb-storage/src/storage/implementation/PersistedBatch.ts index 427059ed7..800308391 100644 --- a/modules/module-mongodb-storage/src/storage/implementation/PersistedBatch.ts +++ b/modules/module-mongodb-storage/src/storage/implementation/PersistedBatch.ts @@ -270,9 +270,11 @@ export class PersistedBatch { ); } - async flush(db: PowerSyncMongo, session: mongo.ClientSession) { + async flush(db: PowerSyncMongo, session: mongo.ClientSession, options?: storage.BucketBatchCommitOptions) { const startAt = performance.now(); + let flushedSomething = false; if (this.bucketData.length > 0) { + flushedSomething = true; await db.bucket_data.bulkWrite(this.bucketData, { session, // inserts only - order doesn't matter @@ -280,6 +282,7 @@ export class PersistedBatch { }); } if (this.bucketParameters.length > 0) { + flushedSomething = true; await db.bucket_parameters.bulkWrite(this.bucketParameters, { session, // inserts only - order doesn't matter @@ -287,6 +290,7 @@ export class PersistedBatch { }); } if (this.currentData.length > 0) { + flushedSomething = true; await db.current_data.bulkWrite(this.currentData, { session, // may update and delete data within the same batch - order matters @@ -295,6 +299,7 @@ export class PersistedBatch { } if (this.bucketStates.size > 0) { + flushedSomething = true; await db.bucket_state.bulkWrite(this.getBucketStateUpdates(), { session, // Per-bucket operation - order doesn't matter @@ -302,12 +307,43 @@ export class PersistedBatch { }); } - const duration = performance.now() - startAt; - logger.info( - `powersync_${this.group_id} Flushed 
${this.bucketData.length} + ${this.bucketParameters.length} + ${ - this.currentData.length - } updates, ${Math.round(this.currentSize / 1024)}kb in ${duration.toFixed(0)}ms. Last op_id: ${this.debugLastOpId}` - ); + if (flushedSomething) { + const duration = Math.round(performance.now() - startAt); + if (options?.oldestUncommittedChange != null) { + const replicationLag = Math.round((Date.now() - options.oldestUncommittedChange.getTime()) / 1000); + + logger.info( + `powersync_${this.group_id} Flushed ${this.bucketData.length} + ${this.bucketParameters.length} + ${ + this.currentData.length + } updates, ${Math.round(this.currentSize / 1024)}kb in ${duration}ms. Last op_id: ${this.debugLastOpId}. Replication lag: ${replicationLag}s`, + { + flushed: { + duration: duration, + size: this.currentSize, + bucket_data_count: this.bucketData.length, + parameter_data_count: this.bucketParameters.length, + current_data_count: this.currentData.length, + replication_lag_seconds: replicationLag + } + } + ); + } else { + logger.info( + `powersync_${this.group_id} Flushed ${this.bucketData.length} + ${this.bucketParameters.length} + ${ + this.currentData.length + } updates, ${Math.round(this.currentSize / 1024)}kb in ${duration}ms. 
Last op_id: ${this.debugLastOpId}`, + { + flushed: { + duration: duration, + size: this.currentSize, + bucket_data_count: this.bucketData.length, + parameter_data_count: this.bucketParameters.length, + current_data_count: this.currentData.length + } + } + ); + } + } this.bucketData = []; this.bucketParameters = []; diff --git a/modules/module-mongodb/src/api/MongoRouteAPIAdapter.ts b/modules/module-mongodb/src/api/MongoRouteAPIAdapter.ts index 958517b99..f7c523b92 100644 --- a/modules/module-mongodb/src/api/MongoRouteAPIAdapter.ts +++ b/modules/module-mongodb/src/api/MongoRouteAPIAdapter.ts @@ -200,7 +200,7 @@ export class MongoRouteAPIAdapter implements api.RouteAPI { return result; } - async getReplicationLag(options: api.ReplicationLagOptions): Promise { + async getReplicationLagBytes(options: api.ReplicationLagOptions): Promise { // There is no fast way to get replication lag in bytes in MongoDB. // We can get replication lag in seconds, but need a different API for that. return undefined; diff --git a/modules/module-mongodb/src/replication/ChangeStream.ts b/modules/module-mongodb/src/replication/ChangeStream.ts index 958872216..4e6e26eae 100644 --- a/modules/module-mongodb/src/replication/ChangeStream.ts +++ b/modules/module-mongodb/src/replication/ChangeStream.ts @@ -6,6 +6,7 @@ import { logger, ReplicationAbortedError, ReplicationAssertionError, + ServiceAssertionError, ServiceError } from '@powersync/lib-services-framework'; import { MetricsEngine, SaveOperationTag, SourceEntityDescriptor, SourceTable, storage } from '@powersync/service-core'; @@ -22,7 +23,7 @@ import { getMongoRelation, STANDALONE_CHECKPOINT_ID } from './MongoRelation.js'; -import { CHECKPOINTS_COLLECTION } from './replication-utils.js'; +import { CHECKPOINTS_COLLECTION, timestampToDate } from './replication-utils.js'; export interface ChangeStreamOptions { connections: MongoManager; @@ -75,6 +76,17 @@ export class ChangeStream { private relation_cache = new Map(); + /** + * Time of the 
oldest uncommitted change, according to the source db. + * This is used to determine the replication lag. + */ + private oldestUncommittedChange: Date | null = null; + /** + * Keep track of whether we have done a commit or keepalive yet. + * We can only compute replication lag if isStartingReplication == false, or oldestUncommittedChange is present. + */ + private isStartingReplication = true; + private checkpointStreamId = new mongo.ObjectId(); constructor(options: ChangeStreamOptions) { @@ -553,10 +565,18 @@ export class ChangeStream { async (batch) => { const { lastCheckpointLsn } = batch; const lastLsn = lastCheckpointLsn ? MongoLSN.fromSerialized(lastCheckpointLsn) : null; - const startAfter = lastLsn?.timestamp; - const resumeAfter = lastLsn?.resumeToken; - - logger.info(`${this.logPrefix} Resume streaming at ${startAfter?.inspect()} / ${lastLsn}`); + if (lastLsn == null) { + throw new ServiceAssertionError(`No resume timestamp found`); + } + const startAfter = lastLsn.timestamp; + const resumeAfter = lastLsn.resumeToken; + // It is normal for this to be a minute or two old when there is a low volume + // of ChangeStream events. + const tokenAgeSeconds = Math.round((Date.now() - timestampToDate(startAfter).getTime()) / 1000); + + logger.info( + `${this.logPrefix} Resume streaming at ${startAfter.inspect()} / ${lastLsn} | Token age: ${tokenAgeSeconds}s` + ); const filters = this.getSourceNamespaceFilters(); @@ -655,10 +675,16 @@ export class ChangeStream { // We add an additional check for waitForCheckpointLsn == null, to make sure we're not // doing a keepalive in the middle of a transaction. if (waitForCheckpointLsn == null && performance.now() - lastEmptyResume > 60_000) { - const { comparable: lsn } = MongoLSN.fromResumeToken(stream.resumeToken); + const { comparable: lsn, timestamp } = MongoLSN.fromResumeToken(stream.resumeToken); await batch.keepalive(lsn); await touch(); lastEmptyResume = performance.now(); + // Log the token update. 
This helps as a general "replication is still active" message in the logs. + // This token would typically be around 10s behind. + logger.info( + `${this.logPrefix} Idle change stream. Persisted resumeToken for ${timestampToDate(timestamp).toISOString()}` + ); + this.isStartingReplication = false; } continue; } @@ -771,7 +797,12 @@ export class ChangeStream { if (waitForCheckpointLsn != null && lsn >= waitForCheckpointLsn) { waitForCheckpointLsn = null; } - await batch.commit(lsn); + const didCommit = await batch.commit(lsn, { oldestUncommittedChange: this.oldestUncommittedChange }); + + if (didCommit) { + this.oldestUncommittedChange = null; + this.isStartingReplication = false; + } } else if ( changeDocument.operationType == 'insert' || changeDocument.operationType == 'update' || @@ -790,6 +821,9 @@ export class ChangeStream { snapshot: true }); if (table.syncAny) { + if (this.oldestUncommittedChange == null && changeDocument.clusterTime != null) { + this.oldestUncommittedChange = timestampToDate(changeDocument.clusterTime); + } await this.writeChange(batch, table, changeDocument); } } else if (changeDocument.operationType == 'drop') { @@ -825,6 +859,19 @@ export class ChangeStream { } ); } + + async getReplicationLagMillis(): Promise { + if (this.oldestUncommittedChange == null) { + if (this.isStartingReplication) { + // We don't have anything to compute replication lag with yet. + return undefined; + } else { + // We don't have any uncommitted changes, so replication is up-to-date. 
+ return 0; + } + } + return Date.now() - this.oldestUncommittedChange.getTime(); + } } async function touch() { diff --git a/modules/module-mongodb/src/replication/ChangeStreamReplicationJob.ts b/modules/module-mongodb/src/replication/ChangeStreamReplicationJob.ts index 497c48307..9498cfdf5 100644 --- a/modules/module-mongodb/src/replication/ChangeStreamReplicationJob.ts +++ b/modules/module-mongodb/src/replication/ChangeStreamReplicationJob.ts @@ -1,4 +1,3 @@ -import { isMongoServerError } from '@powersync/lib-service-mongodb'; import { container } from '@powersync/lib-services-framework'; import { replication } from '@powersync/service-core'; @@ -11,6 +10,7 @@ export interface ChangeStreamReplicationJobOptions extends replication.AbstractR export class ChangeStreamReplicationJob extends replication.AbstractReplicationJob { private connectionFactory: ConnectionManagerFactory; + private lastStream: ChangeStream | null = null; constructor(options: ChangeStreamReplicationJobOptions) { super(options); @@ -18,11 +18,11 @@ export class ChangeStreamReplicationJob extends replication.AbstractReplicationJ } async cleanUp(): Promise { - // TODO: Implement? + // Nothing needed here } async keepAlive() { - // TODO: Implement? 
+ // Nothing needed here } private get slotName() { @@ -74,6 +74,7 @@ export class ChangeStreamReplicationJob extends replication.AbstractReplicationJ metrics: this.options.metrics, connections: connectionManager }); + this.lastStream = stream; await stream.replicate(); } catch (e) { if (this.abortController.signal.aborted) { @@ -98,4 +99,8 @@ export class ChangeStreamReplicationJob extends replication.AbstractReplicationJ await connectionManager.end(); } } + + async getReplicationLagMillis(): Promise { + return this.lastStream?.getReplicationLagMillis(); + } } diff --git a/modules/module-mongodb/src/replication/ChangeStreamReplicator.ts b/modules/module-mongodb/src/replication/ChangeStreamReplicator.ts index 99930e2eb..5140ae5d7 100644 --- a/modules/module-mongodb/src/replication/ChangeStreamReplicator.ts +++ b/modules/module-mongodb/src/replication/ChangeStreamReplicator.ts @@ -3,6 +3,8 @@ import { ChangeStreamReplicationJob } from './ChangeStreamReplicationJob.js'; import { ConnectionManagerFactory } from './ConnectionManagerFactory.js'; import { MongoErrorRateLimiter } from './MongoErrorRateLimiter.js'; import { MongoModule } from '../module/MongoModule.js'; +import { MongoLSN } from '../common/MongoLSN.js'; +import { timestampToDate } from './replication-utils.js'; export interface ChangeStreamReplicatorOptions extends replication.AbstractReplicatorOptions { connectionFactory: ConnectionManagerFactory; @@ -39,4 +41,25 @@ export class ChangeStreamReplicator extends replication.AbstractReplicator { + const lag = await super.getReplicationLagMillis(); + if (lag != null) { + return lag; + } + + // Booting or in an error loop. Check last active replication status. + // This includes sync rules in an ERROR state. 
+ const content = await this.storage.getActiveSyncRulesContent(); + if (content == null) { + return undefined; + } + // Measure the lag from the last resume token's time + const lsn = content.last_checkpoint_lsn; + if (lsn == null) { + return undefined; + } + const { timestamp } = MongoLSN.fromSerialized(lsn); + return Date.now() - timestampToDate(timestamp).getTime(); + } } diff --git a/modules/module-mongodb/src/replication/replication-utils.ts b/modules/module-mongodb/src/replication/replication-utils.ts index fc095bbb8..f3fbb4f4b 100644 --- a/modules/module-mongodb/src/replication/replication-utils.ts +++ b/modules/module-mongodb/src/replication/replication-utils.ts @@ -1,6 +1,7 @@ import { ErrorCode, ServiceError } from '@powersync/lib-services-framework'; import { MongoManager } from './MongoManager.js'; import { PostImagesOption } from '../types/types.js'; +import * as bson from 'bson'; export const CHECKPOINTS_COLLECTION = '_powersync_checkpoints'; @@ -86,3 +87,7 @@ export async function checkSourceConfiguration(connectionManager: MongoManager): .toArray(); } } + +export function timestampToDate(timestamp: bson.Timestamp) { + return new Date(timestamp.getHighBitsUnsigned() * 1000); +} diff --git a/modules/module-mysql/src/api/MySQLRouteAPIAdapter.ts b/modules/module-mysql/src/api/MySQLRouteAPIAdapter.ts index ab0a74817..a76751ef3 100644 --- a/modules/module-mysql/src/api/MySQLRouteAPIAdapter.ts +++ b/modules/module-mysql/src/api/MySQLRouteAPIAdapter.ts @@ -249,7 +249,7 @@ export class MySQLRouteAPIAdapter implements api.RouteAPI { }; } - async getReplicationLag(options: api.ReplicationLagOptions): Promise { + async getReplicationLagBytes(options: api.ReplicationLagOptions): Promise { const { bucketStorage } = options; const lastCheckpoint = await bucketStorage.getCheckpoint(); diff --git a/modules/module-mysql/src/replication/BinLogReplicationJob.ts b/modules/module-mysql/src/replication/BinLogReplicationJob.ts index 18a09c501..c02f8a0f6 100644 --- 
a/modules/module-mysql/src/replication/BinLogReplicationJob.ts +++ b/modules/module-mysql/src/replication/BinLogReplicationJob.ts @@ -9,6 +9,7 @@ export interface BinLogReplicationJobOptions extends replication.AbstractReplica export class BinLogReplicationJob extends replication.AbstractReplicationJob { private connectionFactory: MySQLConnectionManagerFactory; + private lastStream: BinLogStream | null = null; constructor(options: BinLogReplicationJobOptions) { super(options); @@ -66,6 +67,7 @@ export class BinLogReplicationJob extends replication.AbstractReplicationJob { metrics: this.options.metrics, connections: connectionManager }); + this.lastStream = stream; await stream.replicate(); } catch (e) { if (this.abortController.signal.aborted) { @@ -92,4 +94,8 @@ export class BinLogReplicationJob extends replication.AbstractReplicationJob { await connectionManager.end(); } } + + async getReplicationLagMillis(): Promise { + return this.lastStream?.getReplicationLagMillis(); + } } diff --git a/modules/module-mysql/src/replication/BinLogReplicator.ts b/modules/module-mysql/src/replication/BinLogReplicator.ts index bc2687535..eeb477fdf 100644 --- a/modules/module-mysql/src/replication/BinLogReplicator.ts +++ b/modules/module-mysql/src/replication/BinLogReplicator.ts @@ -38,4 +38,29 @@ export class BinLogReplicator extends replication.AbstractReplicator { + const lag = await super.getReplicationLagMillis(); + if (lag != null) { + return lag; + } + + // Booting or in an error loop. Check last active replication status. + // This includes sync rules in an ERROR state. + const content = await this.storage.getActiveSyncRulesContent(); + if (content == null) { + return undefined; + } + // Measure the lag from the last commit or keepalive timestamp. + // This is not 100% accurate since it is the commit time in the storage db rather than + // the source db, but it's the best we currently have for mysql. + const checkpointTs = content.last_checkpoint_ts?.getTime() ?? 
0; + const keepaliveTs = content.last_keepalive_ts?.getTime() ?? 0; + const latestTs = Math.max(checkpointTs, keepaliveTs); + if (latestTs != 0) { + return Date.now() - latestTs; + } + + return undefined; + } } diff --git a/modules/module-mysql/src/replication/BinLogStream.ts b/modules/module-mysql/src/replication/BinLogStream.ts index fe137ae04..4272eff11 100644 --- a/modules/module-mysql/src/replication/BinLogStream.ts +++ b/modules/module-mysql/src/replication/BinLogStream.ts @@ -66,6 +66,17 @@ export class BinLogStream { private tableCache = new Map(); + /** + * Time of the oldest uncommitted change, according to the source db. + * This is used to determine the replication lag. + */ + private oldestUncommittedChange: Date | null = null; + /** + * Keep track of whether we have done a commit or keepalive yet. + * We can only compute replication lag if isStartingReplication == false, or oldestUncommittedChange is present. + */ + private isStartingReplication = true; + constructor(private options: BinLogStreamOptions) { this.storage = options.storage; this.connections = options.connections; @@ -473,7 +484,19 @@ AND table_type = 'BASE TABLE';`, }, onCommit: async (lsn: string) => { this.metrics.getCounter(ReplicationMetric.TRANSACTIONS_REPLICATED).add(1); - await batch.commit(lsn); + const didCommit = await batch.commit(lsn, { oldestUncommittedChange: this.oldestUncommittedChange }); + if (didCommit) { + this.oldestUncommittedChange = null; + this.isStartingReplication = false; + } + }, + onTransactionStart: async (options) => { + if (this.oldestUncommittedChange == null) { + this.oldestUncommittedChange = options.timestamp; + } + }, + onRotate: async () => { + this.isStartingReplication = false; } }; } @@ -560,6 +583,19 @@ AND table_type = 'BASE TABLE';`, return null; } } + + async getReplicationLagMillis(): Promise { + if (this.oldestUncommittedChange == null) { + if (this.isStartingReplication) { + // We don't have anything to compute replication lag with yet. 
+ return undefined; + } else { + // We don't have any uncommitted changes, so replication is up-to-date. + return 0; + } + } + return Date.now() - this.oldestUncommittedChange.getTime(); + } } async function tryRollback(promiseConnection: mysqlPromise.Connection) { diff --git a/modules/module-mysql/src/replication/zongji/BinLogListener.ts b/modules/module-mysql/src/replication/zongji/BinLogListener.ts index e22ff8b23..bbd9c8b29 100644 --- a/modules/module-mysql/src/replication/zongji/BinLogListener.ts +++ b/modules/module-mysql/src/replication/zongji/BinLogListener.ts @@ -12,6 +12,8 @@ const MAX_QUEUE_PAUSE_TIME_MS = 45_000; export type Row = Record; export interface BinLogEventHandler { + onTransactionStart: (options: { timestamp: Date }) => Promise; + onRotate: () => Promise; onWrite: (rows: Row[], tableMap: TableMapEntry) => Promise; onUpdate: (rowsAfter: Row[], rowsBefore: Row[], tableMap: TableMapEntry) => Promise; onDelete: (rows: Row[], tableMap: TableMapEntry) => Promise; @@ -196,10 +198,12 @@ export class BinLogListener { offset: evt.nextPosition } }); + await this.eventHandler.onTransactionStart({ timestamp: new Date(evt.timestamp) }); break; case zongji_utils.eventIsRotation(evt): this.binLogPosition.filename = evt.binlogName; this.binLogPosition.offset = evt.position; + await this.eventHandler.onRotate(); break; case zongji_utils.eventIsWriteMutation(evt): await this.eventHandler.onWrite(evt.rows, evt.tableMap[evt.tableId]); diff --git a/modules/module-mysql/test/src/BinLogListener.test.ts b/modules/module-mysql/test/src/BinLogListener.test.ts index 263eff262..c36bf1cf0 100644 --- a/modules/module-mysql/test/src/BinLogListener.test.ts +++ b/modules/module-mysql/test/src/BinLogListener.test.ts @@ -155,4 +155,7 @@ class TestBinLogEventHandler implements BinLogEventHandler { async onCommit(lsn: string) { this.commitCount++; } + + async onTransactionStart(options: { timestamp: Date }) {} + async onRotate() {} } diff --git 
a/modules/module-postgres-storage/src/storage/batch/PostgresBucketBatch.ts b/modules/module-postgres-storage/src/storage/batch/PostgresBucketBatch.ts index f11b730dd..735aa7d32 100644 --- a/modules/module-postgres-storage/src/storage/batch/PostgresBucketBatch.ts +++ b/modules/module-postgres-storage/src/storage/batch/PostgresBucketBatch.ts @@ -280,6 +280,7 @@ export class PostgresBucketBatch // When re-applying transactions, don't create a new checkpoint until // we are past the last transaction. logger.info(`Re-applied transaction ${lsn} - skipping checkpoint`); + // Cannot create a checkpoint yet - return false return false; } @@ -305,12 +306,14 @@ export class PostgresBucketBatch id = ${{ type: 'int4', value: this.group_id }} `.execute(); + // Cannot create a checkpoint yet - return false return false; } // Don't create a checkpoint if there were no changes if (!createEmptyCheckpoints && this.persisted_op == null) { - return false; + // Nothing to commit - return true + return true; } const now = new Date().toISOString(); diff --git a/modules/module-postgres-storage/src/storage/sync-rules/PostgresPersistedSyncRulesContent.ts b/modules/module-postgres-storage/src/storage/sync-rules/PostgresPersistedSyncRulesContent.ts index 9bef5a000..c55cea108 100644 --- a/modules/module-postgres-storage/src/storage/sync-rules/PostgresPersistedSyncRulesContent.ts +++ b/modules/module-postgres-storage/src/storage/sync-rules/PostgresPersistedSyncRulesContent.ts @@ -14,6 +14,7 @@ export class PostgresPersistedSyncRulesContent implements storage.PersistedSyncR public readonly last_fatal_error: string | null; public readonly last_keepalive_ts: Date | null; public readonly last_checkpoint_ts: Date | null; + public readonly active: boolean; current_lock: storage.ReplicationLock | null = null; constructor( @@ -27,6 +28,7 @@ export class PostgresPersistedSyncRulesContent implements storage.PersistedSyncR this.last_fatal_error = row.last_fatal_error; this.last_checkpoint_ts = 
row.last_checkpoint_ts ? new Date(row.last_checkpoint_ts) : null; this.last_keepalive_ts = row.last_keepalive_ts ? new Date(row.last_keepalive_ts) : null; + this.active = row.state == 'ACTIVE'; } parsed(options: storage.ParseSyncRulesOptions): storage.PersistedSyncRules { diff --git a/modules/module-postgres/src/api/PostgresRouteAPIAdapter.ts b/modules/module-postgres/src/api/PostgresRouteAPIAdapter.ts index 19a8acb9e..634ece0f3 100644 --- a/modules/module-postgres/src/api/PostgresRouteAPIAdapter.ts +++ b/modules/module-postgres/src/api/PostgresRouteAPIAdapter.ts @@ -212,7 +212,7 @@ export class PostgresRouteAPIAdapter implements api.RouteAPI { }); } - async getReplicationLag(options: api.ReplicationLagOptions): Promise { + async getReplicationLagBytes(options: api.ReplicationLagOptions): Promise { const { bucketStorage } = options; const slotName = bucketStorage.slot_name; const results = await lib_postgres.retriedQuery(this.pool, { diff --git a/modules/module-postgres/src/replication/WalStream.ts b/modules/module-postgres/src/replication/WalStream.ts index df589c63c..4176b9eb9 100644 --- a/modules/module-postgres/src/replication/WalStream.ts +++ b/modules/module-postgres/src/replication/WalStream.ts @@ -85,6 +85,17 @@ export class WalStream { private startedStreaming = false; + /** + * Time of the oldest uncommitted change, according to the source db. + * This is used to determine the replication lag. + */ + private oldestUncommittedChange: Date | null = null; + /** + * Keep track of whether we have done a commit or keepalive yet. + * We can only compute replication lag if isStartingReplication == false, or oldestUncommittedChange is present. + */ + private isStartingReplication = true; + constructor(options: WalStreamOptions) { this.storage = options.storage; this.metrics = options.metrics; @@ -708,6 +719,9 @@ WHERE oid = $1::regclass`, } else if (msg.tag == 'begin') { // This may span multiple transactions in the same chunk, or even across chunks. 
skipKeepalive = true; + if (this.oldestUncommittedChange == null) { + this.oldestUncommittedChange = new Date(Number(msg.commitTime / 1000n)); + } } else if (msg.tag == 'commit') { this.metrics.getCounter(ReplicationMetric.TRANSACTIONS_REPLICATED).add(1); if (msg == lastCommit) { @@ -715,8 +729,15 @@ WHERE oid = $1::regclass`, // This effectively lets us batch multiple transactions within the same chunk // into a single flush, increasing throughput for many small transactions. skipKeepalive = false; - await batch.commit(msg.lsn!, { createEmptyCheckpoints }); + const didCommit = await batch.commit(msg.lsn!, { + createEmptyCheckpoints, + oldestUncommittedChange: this.oldestUncommittedChange + }); await this.ack(msg.lsn!, replicationStream); + if (didCommit) { + this.oldestUncommittedChange = null; + this.isStartingReplication = false; + } } } else { if (count % 100 == 0) { @@ -749,6 +770,7 @@ WHERE oid = $1::regclass`, // may be in the middle of the next transaction. // It must only be used to associate checkpoints with LSNs. await batch.keepalive(chunkLastLsn); + this.isStartingReplication = false; } // We receive chunks with empty messages often (about each second). @@ -781,7 +803,8 @@ WHERE oid = $1::regclass`, if (storageIdentifier.type != lib_postgres.POSTGRES_CONNECTION_TYPE) { return { // Keep the same behaviour as before allowing Postgres storage. - createEmptyCheckpoints: true + createEmptyCheckpoints: true, + oldestUncommittedChange: null }; } @@ -804,7 +827,8 @@ WHERE oid = $1::regclass`, * Don't create empty checkpoints if the same Postgres database is used for the data source * and sync bucket storage. Creating empty checkpoints will cause WAL feedback loops. 
*/ - createEmptyCheckpoints: replicationIdentifier.database_name != parsedStorageIdentifier.database_name + createEmptyCheckpoints: replicationIdentifier.database_name != parsedStorageIdentifier.database_name, + oldestUncommittedChange: null }; } @@ -816,6 +840,19 @@ WHERE oid = $1::regclass`, const version = await this.connections.getServerVersion(); return version ? version.compareMain('14.0.0') >= 0 : false; } + + async getReplicationLagMillis(): Promise { + if (this.oldestUncommittedChange == null) { + if (this.isStartingReplication) { + // We don't have anything to compute replication lag with yet. + return undefined; + } else { + // We don't have any uncommitted changes, so replication is up-to-date. + return 0; + } + } + return Date.now() - this.oldestUncommittedChange.getTime(); + } } async function touch() { diff --git a/modules/module-postgres/src/replication/WalStreamReplicationJob.ts b/modules/module-postgres/src/replication/WalStreamReplicationJob.ts index 3f8dad99a..899cb6a71 100644 --- a/modules/module-postgres/src/replication/WalStreamReplicationJob.ts +++ b/modules/module-postgres/src/replication/WalStreamReplicationJob.ts @@ -12,6 +12,7 @@ export interface WalStreamReplicationJobOptions extends replication.AbstractRepl export class WalStreamReplicationJob extends replication.AbstractReplicationJob { private connectionFactory: ConnectionManagerFactory; private readonly connectionManager: PgManager; + private lastStream: WalStream | null = null; constructor(options: WalStreamReplicationJobOptions) { super(options); @@ -98,6 +99,7 @@ export class WalStreamReplicationJob extends replication.AbstractReplicationJob metrics: this.options.metrics, connections: connectionManager }); + this.lastStream = stream; await stream.replicate(); } catch (e) { this.logger.error(`${this.slotName} Replication error`, e); @@ -140,4 +142,8 @@ export class WalStreamReplicationJob extends replication.AbstractReplicationJob await connectionManager.end(); } } + + async 
getReplicationLagMillis(): Promise { + return this.lastStream?.getReplicationLagMillis(); + } } diff --git a/modules/module-postgres/src/replication/WalStreamReplicator.ts b/modules/module-postgres/src/replication/WalStreamReplicator.ts index a806ff793..2d8c482c6 100644 --- a/modules/module-postgres/src/replication/WalStreamReplicator.ts +++ b/modules/module-postgres/src/replication/WalStreamReplicator.ts @@ -48,4 +48,30 @@ export class WalStreamReplicator extends replication.AbstractReplicator { + const lag = await super.getReplicationLagMillis(); + if (lag != null) { + return lag; + } + + // Booting or in an error loop. Check last active replication status. + // This includes sync rules in an ERROR state. + const content = await this.storage.getActiveSyncRulesContent(); + if (content == null) { + return undefined; + } + // Measure the lag from the last commit or keepalive timestamp. + // This is not 100% accurate since it is the commit time in the storage db rather than + // the source db, but it's the best we have for postgres. + + const checkpointTs = content.last_checkpoint_ts?.getTime() ?? 0; + const keepaliveTs = content.last_keepalive_ts?.getTime() ?? 
0; + const latestTs = Math.max(checkpointTs, keepaliveTs); + if (latestTs != 0) { + return Date.now() - latestTs; + } + + return undefined; + } } diff --git a/packages/service-core-tests/src/test-utils/general-utils.ts b/packages/service-core-tests/src/test-utils/general-utils.ts index 03a818799..e9c07c85b 100644 --- a/packages/service-core-tests/src/test-utils/general-utils.ts +++ b/packages/service-core-tests/src/test-utils/general-utils.ts @@ -19,6 +19,8 @@ export function testRules(content: string): storage.PersistedSyncRulesContent { id: 1, sync_rules_content: content, slot_name: 'test', + active: true, + last_checkpoint_lsn: '', parsed(options) { return { id: 1, diff --git a/packages/service-core/src/api/RouteAPI.ts b/packages/service-core/src/api/RouteAPI.ts index 38a37d59f..4c92516a3 100644 --- a/packages/service-core/src/api/RouteAPI.ts +++ b/packages/service-core/src/api/RouteAPI.ts @@ -47,7 +47,7 @@ export interface RouteAPI { * @returns The replication lag: that is the amount of data which has not been * replicated yet, in bytes. */ - getReplicationLag(options: ReplicationLagOptions): Promise; + getReplicationLagBytes(options: ReplicationLagOptions): Promise; /** * Get the current LSN or equivalent replication HEAD position identifier. 
diff --git a/packages/service-core/src/api/diagnostics.ts b/packages/service-core/src/api/diagnostics.ts index 3d562e5fd..a4c6af3c0 100644 --- a/packages/service-core/src/api/diagnostics.ts +++ b/packages/service-core/src/api/diagnostics.ts @@ -78,7 +78,7 @@ export async function getSyncRulesStatus( if (systemStorage) { try { - replication_lag_bytes = await apiHandler.getReplicationLag({ + replication_lag_bytes = await apiHandler.getReplicationLagBytes({ bucketStorage: systemStorage }); } catch (e) { diff --git a/packages/service-core/src/metrics/open-telemetry/OpenTelemetryMetricsFactory.ts b/packages/service-core/src/metrics/open-telemetry/OpenTelemetryMetricsFactory.ts index f90b972c9..21604e070 100644 --- a/packages/service-core/src/metrics/open-telemetry/OpenTelemetryMetricsFactory.ts +++ b/packages/service-core/src/metrics/open-telemetry/OpenTelemetryMetricsFactory.ts @@ -1,11 +1,11 @@ import { Meter, ValueType } from '@opentelemetry/api'; import { Counter, - ObservableGauge, - UpDownCounter, MetricMetadata, MetricsFactory, - Precision + ObservableGauge, + Precision, + UpDownCounter } from '../metrics-interfaces.js'; export class OpenTelemetryMetricsFactory implements MetricsFactory { diff --git a/packages/service-core/src/replication/AbstractReplicationJob.ts b/packages/service-core/src/replication/AbstractReplicationJob.ts index e38cbd3fb..02f03bad5 100644 --- a/packages/service-core/src/replication/AbstractReplicationJob.ts +++ b/packages/service-core/src/replication/AbstractReplicationJob.ts @@ -78,4 +78,9 @@ export abstract class AbstractReplicationJob { public get isStopped(): boolean { return this.abortController.signal.aborted; } + + /** + * Get replication lag for this job in ms. 
+ */ + abstract getReplicationLagMillis(): Promise; } diff --git a/packages/service-core/src/replication/AbstractReplicator.ts b/packages/service-core/src/replication/AbstractReplicator.ts index cb6cc870f..a475124d0 100644 --- a/packages/service-core/src/replication/AbstractReplicator.ts +++ b/packages/service-core/src/replication/AbstractReplicator.ts @@ -8,6 +8,7 @@ import { AbstractReplicationJob } from './AbstractReplicationJob.js'; import { ErrorRateLimiter } from './ErrorRateLimiter.js'; import { ConnectionTestResult } from './ReplicationModule.js'; import { MetricsEngine } from '../metrics/MetricsEngine.js'; +import { ReplicationMetric } from '@powersync/service-types'; // 5 minutes const PING_INTERVAL = 1_000_000_000n * 300n; @@ -42,6 +43,11 @@ export abstract class AbstractReplicator(); + /** + * Used for replication lag computation. + */ + private activeReplicationJob: T | undefined = undefined; + private stopped = false; // First ping is only after 5 minutes, not when starting @@ -87,6 +93,17 @@ export abstract class AbstractReplicator { + const lag = await this.getReplicationLagMillis().catch((e) => { + this.logger.error('Failed to get replication lag', e); + return undefined; + }); + if (lag == null) { + return undefined; + } + // ms to seconds + return Math.round(lag / 1000); + }); } public async stop(): Promise { @@ -161,8 +178,12 @@ export abstract class AbstractReplicator(this.replicationJobs.entries()); const replicatingSyncRules = await this.storage.getReplicatingSyncRules(); const newJobs = new Map(); + let activeJob: T | undefined = undefined; for (let syncRules of replicatingSyncRules) { const existingJob = existingJobs.get(syncRules.id); + if (syncRules.active && activeJob == null) { + activeJob = existingJob; + } if (existingJob && !existingJob.isStopped) { // No change existingJobs.delete(syncRules.id); @@ -188,6 +209,9 @@ export abstract class AbstractReplicator; + + /** + * Measure replication lag in milliseconds. 
+ * + * In general, this is the difference between now() and the time the oldest record, that we haven't committed yet, + * has been written (committed) to the source database. + * + * This is roughly a measure of the _average_ amount of time we're behind. + * If we get a new change as soon as each previous one has finished processing, and each change takes 1000ms + * to process, the average replication lag will be 500ms, not 1000ms. + * + * 1. When we are actively replicating, this is the difference between now and the time the change was + * written to the source database. + * 2. When the replication stream is idle, this is either 0, or the delay for keepalive messages to make it to us. + * 3. When the active replication stream is in an error state, this is the time since the last successful commit. + * 4. If there is no active replication stream, this is undefined. + * + * "processing" replication streams are not taken into account for this metric. + */ + async getReplicationLagMillis(): Promise { + return this.activeReplicationJob?.getReplicationLagMillis(); + } } diff --git a/packages/service-core/src/replication/replication-metrics.ts b/packages/service-core/src/replication/replication-metrics.ts index d589696ec..eed0a3793 100644 --- a/packages/service-core/src/replication/replication-metrics.ts +++ b/packages/service-core/src/replication/replication-metrics.ts @@ -26,6 +26,11 @@ export function createCoreReplicationMetrics(engine: MetricsEngine): void { name: ReplicationMetric.CHUNKS_REPLICATED, description: 'Total number of replication chunks' }); + + engine.createObservableGauge({ + name: ReplicationMetric.REPLICATION_LAG_SECONDS, + description: 'Replication lag between the source database and PowerSync instance' + }); } /** @@ -42,4 +47,6 @@ export function initializeCoreReplicationMetrics(engine: MetricsEngine): void { rows_replicated_total.add(0); transactions_replicated_total.add(0); chunks_replicated_total.add(0); + // REPLICATION_LAG_SECONDS is not
explicitly initialized - the value remains "unknown" until the first value + // is reported. } diff --git a/packages/service-core/src/routes/endpoints/admin.ts b/packages/service-core/src/routes/endpoints/admin.ts index 2ec4fb492..b4064f328 100644 --- a/packages/service-core/src/routes/endpoints/admin.ts +++ b/packages/service-core/src/routes/endpoints/admin.ts @@ -168,6 +168,8 @@ export const validate = routeDefinition({ // Dummy values id: 0, slot_name: '', + active: false, + last_checkpoint_lsn: '', parsed() { return { diff --git a/packages/service-core/src/storage/BucketStorageBatch.ts b/packages/service-core/src/storage/BucketStorageBatch.ts index e8dfc1c5f..da256bf07 100644 --- a/packages/service-core/src/storage/BucketStorageBatch.ts +++ b/packages/service-core/src/storage/BucketStorageBatch.ts @@ -7,7 +7,8 @@ import { BatchedCustomWriteCheckpointOptions } from './storage-index.js'; import { InternalOpId } from '../util/utils.js'; export const DEFAULT_BUCKET_BATCH_COMMIT_OPTIONS: ResolvedBucketBatchCommitOptions = { - createEmptyCheckpoints: true + createEmptyCheckpoints: true, + oldestUncommittedChange: null }; export interface BucketStorageBatch extends ObserverClient, AsyncDisposable { @@ -44,6 +45,8 @@ export interface BucketStorageBatch extends ObserverClient; @@ -154,6 +157,13 @@ export interface BucketBatchCommitOptions { * Defaults to true. */ createEmptyCheckpoints?: boolean; + + /** + * The timestamp of the first change in this batch, according to the source database. + * + * Used to estimate replication lag. 
+ */ + oldestUncommittedChange?: Date | null; } export type ResolvedBucketBatchCommitOptions = Required; diff --git a/packages/service-core/src/storage/PersistedSyncRulesContent.ts b/packages/service-core/src/storage/PersistedSyncRulesContent.ts index e1d5a4654..0b1024bab 100644 --- a/packages/service-core/src/storage/PersistedSyncRulesContent.ts +++ b/packages/service-core/src/storage/PersistedSyncRulesContent.ts @@ -9,6 +9,12 @@ export interface PersistedSyncRulesContent { readonly id: number; readonly sync_rules_content: string; readonly slot_name: string; + /** + * True if this is the "active" copy of the sync rules. + */ + readonly active: boolean; + + readonly last_checkpoint_lsn: string | null; readonly last_fatal_error?: string | null; readonly last_keepalive_ts?: Date | null; diff --git a/packages/types/src/metrics.ts b/packages/types/src/metrics.ts index 360c0f961..3f67ef53a 100644 --- a/packages/types/src/metrics.ts +++ b/packages/types/src/metrics.ts @@ -15,7 +15,10 @@ export enum ReplicationMetric { // Total number of replicated transactions. Not used for pricing. TRANSACTIONS_REPLICATED = 'powersync_transactions_replicated_total', // Total number of replication chunks. Not used for pricing. - CHUNKS_REPLICATED = 'powersync_chunks_replicated_total' + CHUNKS_REPLICATED = 'powersync_chunks_replicated_total', + // Replication lag between the source database and PowerSync instance (estimated). + // This is estimated, and may have delays in reporting. + REPLICATION_LAG_SECONDS = 'powersync_replication_lag_seconds' } export enum StorageMetric {