From 52bc6153110ca7df73cfdea8dcce5ccf3a318a31 Mon Sep 17 00:00:00 2001 From: Ralf Kistner Date: Wed, 13 Nov 2024 13:18:41 +0200 Subject: [PATCH 1/2] Avoid current_data document storage for MongoDB. --- .../src/replication/ChangeStream.ts | 203 +++++++++--------- .../src/replication/BinLogStream.ts | 4 +- .../src/replication/WalStream.ts | 104 ++++----- .../service-core/src/storage/BucketStorage.ts | 10 + .../src/storage/mongo/MongoBucketBatch.ts | 128 ++++++----- .../storage/mongo/MongoSyncBucketStorage.ts | 25 +-- .../src/storage/mongo/OperationBatch.ts | 11 +- 7 files changed, 267 insertions(+), 218 deletions(-) diff --git a/modules/module-mongodb/src/replication/ChangeStream.ts b/modules/module-mongodb/src/replication/ChangeStream.ts index 4e3548f12..da9f9b628 100644 --- a/modules/module-mongodb/src/replication/ChangeStream.ts +++ b/modules/module-mongodb/src/replication/ChangeStream.ts @@ -191,7 +191,7 @@ export class ChangeStream { }); try { await this.storage.startBatch( - { zeroLSN: ZERO_LSN, defaultSchema: this.defaultDb.databaseName }, + { zeroLSN: ZERO_LSN, defaultSchema: this.defaultDb.databaseName, storeCurrentData: false }, async (batch) => { // Start by resolving all tables. // This checks postImage configuration, and that should fail as @@ -515,124 +515,127 @@ export class ChangeStream { // Auto-activate as soon as initial replication is done await this.storage.autoActivate(); - await this.storage.startBatch({ zeroLSN: ZERO_LSN, defaultSchema: this.defaultDb.databaseName }, async (batch) => { - const lastLsn = batch.lastCheckpointLsn; - const startAfter = mongoLsnToTimestamp(lastLsn) ?? undefined; - logger.info(`Resume streaming at ${startAfter?.inspect()} / ${lastLsn}`); + await this.storage.startBatch( + { zeroLSN: ZERO_LSN, defaultSchema: this.defaultDb.databaseName, storeCurrentData: false }, + async (batch) => { + const lastLsn = batch.lastCheckpointLsn; + const startAfter = mongoLsnToTimestamp(lastLsn) ?? undefined; + logger.info(`Resume streaming at ${startAfter?.inspect()} / ${lastLsn}`); - // TODO: Use changeStreamSplitLargeEvent + // TODO: Use changeStreamSplitLargeEvent - const filters = this.getSourceNamespaceFilters(); + const filters = this.getSourceNamespaceFilters(); - const pipeline: mongo.Document[] = [ - { - $match: filters.$match - } - ]; - - let fullDocument: 'required' | 'updateLookup'; - - if (this.usePostImages) { - // 'read_only' or 'auto_configure' - // Configuration happens during snapshot, or when we see new - // collections. - fullDocument = 'required'; - } else { - fullDocument = 'updateLookup'; - } - - const streamOptions: mongo.ChangeStreamOptions = { - startAtOperationTime: startAfter, - showExpandedEvents: true, - useBigInt64: true, - maxAwaitTimeMS: 200, - fullDocument: fullDocument - }; - let stream: mongo.ChangeStream; - if (filters.multipleDatabases) { - // Requires readAnyDatabase@admin on Atlas - stream = this.client.watch(pipeline, streamOptions); - } else { - // Same general result, but requires less permissions than the above - stream = this.defaultDb.watch(pipeline, streamOptions); - } + const pipeline: mongo.Document[] = [ + { + $match: filters.$match + } + ]; - if (this.abort_signal.aborted) { - stream.close(); - return; - } + let fullDocument: 'required' | 'updateLookup'; - this.abort_signal.addEventListener('abort', () => { - stream.close(); - }); + if (this.usePostImages) { + // 'read_only' or 'auto_configure' + // Configuration happens during snapshot, or when we see new + // collections. 
+ fullDocument = 'required'; + } else { + fullDocument = 'updateLookup'; + } - let waitForCheckpointLsn: string | null = null; + const streamOptions: mongo.ChangeStreamOptions = { + startAtOperationTime: startAfter, + showExpandedEvents: true, + useBigInt64: true, + maxAwaitTimeMS: 200, + fullDocument: fullDocument + }; + let stream: mongo.ChangeStream; + if (filters.multipleDatabases) { + // Requires readAnyDatabase@admin on Atlas + stream = this.client.watch(pipeline, streamOptions); + } else { + // Same general result, but requires less permissions than the above + stream = this.defaultDb.watch(pipeline, streamOptions); + } - while (true) { if (this.abort_signal.aborted) { - break; + stream.close(); + return; } - const changeDocument = await stream.tryNext(); + this.abort_signal.addEventListener('abort', () => { + stream.close(); + }); - if (changeDocument == null || this.abort_signal.aborted) { - continue; - } - await touch(); + let waitForCheckpointLsn: string | null = null; - if (startAfter != null && changeDocument.clusterTime?.lte(startAfter)) { - continue; - } + while (true) { + if (this.abort_signal.aborted) { + break; + } - // console.log('event', changeDocument); + const changeDocument = await stream.tryNext(); - if ( - (changeDocument.operationType == 'insert' || - changeDocument.operationType == 'update' || - changeDocument.operationType == 'replace') && - changeDocument.ns.coll == CHECKPOINTS_COLLECTION - ) { - const lsn = getMongoLsn(changeDocument.clusterTime!); - if (waitForCheckpointLsn != null && lsn >= waitForCheckpointLsn) { - waitForCheckpointLsn = null; - } - await batch.flush(); - await batch.keepalive(lsn); - } else if ( - changeDocument.operationType == 'insert' || - changeDocument.operationType == 'update' || - changeDocument.operationType == 'replace' || - changeDocument.operationType == 'delete' - ) { - if (waitForCheckpointLsn == null) { - waitForCheckpointLsn = await createCheckpoint(this.client, this.defaultDb); - } - const rel = getMongoRelation(changeDocument.ns); - const table = await this.getRelation(batch, rel); - if (table.syncAny) { - await this.writeChange(batch, table, changeDocument); + if (changeDocument == null || this.abort_signal.aborted) { + continue; } - } else if (changeDocument.operationType == 'drop') { - const rel = getMongoRelation(changeDocument.ns); - const table = await this.getRelation(batch, rel); - if (table.syncAny) { - await batch.drop([table]); - this.relation_cache.delete(table.objectId); + await touch(); + + if (startAfter != null && changeDocument.clusterTime?.lte(startAfter)) { + continue; } - } else if (changeDocument.operationType == 'rename') { - const relFrom = getMongoRelation(changeDocument.ns); - const relTo = getMongoRelation(changeDocument.to); - const tableFrom = await this.getRelation(batch, relFrom); - if (tableFrom.syncAny) { - await batch.drop([tableFrom]); - this.relation_cache.delete(tableFrom.objectId); + + // console.log('event', changeDocument); + + if ( + (changeDocument.operationType == 'insert' || + changeDocument.operationType == 'update' || + changeDocument.operationType == 'replace') && + changeDocument.ns.coll == CHECKPOINTS_COLLECTION + ) { + const lsn = getMongoLsn(changeDocument.clusterTime!); + if (waitForCheckpointLsn != null && lsn >= waitForCheckpointLsn) { + waitForCheckpointLsn = null; + } + await batch.flush(); + await batch.keepalive(lsn); + } else if ( + changeDocument.operationType == 'insert' || + changeDocument.operationType == 'update' || + changeDocument.operationType == 'replace' 
|| + changeDocument.operationType == 'delete' + ) { + if (waitForCheckpointLsn == null) { + waitForCheckpointLsn = await createCheckpoint(this.client, this.defaultDb); + } + const rel = getMongoRelation(changeDocument.ns); + const table = await this.getRelation(batch, rel); + if (table.syncAny) { + await this.writeChange(batch, table, changeDocument); + } + } else if (changeDocument.operationType == 'drop') { + const rel = getMongoRelation(changeDocument.ns); + const table = await this.getRelation(batch, rel); + if (table.syncAny) { + await batch.drop([table]); + this.relation_cache.delete(table.objectId); + } + } else if (changeDocument.operationType == 'rename') { + const relFrom = getMongoRelation(changeDocument.ns); + const relTo = getMongoRelation(changeDocument.to); + const tableFrom = await this.getRelation(batch, relFrom); + if (tableFrom.syncAny) { + await batch.drop([tableFrom]); + this.relation_cache.delete(tableFrom.objectId); + } + // Here we do need to snapshot the new table + const collection = await this.getCollectionInfo(relTo.schema, relTo.name); + await this.handleRelation(batch, relTo, { snapshot: true, collectionInfo: collection }); } - // Here we do need to snapshot the new table - const collection = await this.getCollectionInfo(relTo.schema, relTo.name); - await this.handleRelation(batch, relTo, { snapshot: true, collectionInfo: collection }); } } - }); + ); } } diff --git a/modules/module-mysql/src/replication/BinLogStream.ts b/modules/module-mysql/src/replication/BinLogStream.ts index 44e594f6d..d5e0c43ad 100644 --- a/modules/module-mysql/src/replication/BinLogStream.ts +++ b/modules/module-mysql/src/replication/BinLogStream.ts @@ -260,7 +260,7 @@ AND table_type = 'BASE TABLE';`, await promiseConnection.query('START TRANSACTION'); const sourceTables = this.syncRules.getSourceTables(); await this.storage.startBatch( - { zeroLSN: ReplicatedGTID.ZERO.comparable, defaultSchema: this.defaultSchema }, + { zeroLSN: ReplicatedGTID.ZERO.comparable, defaultSchema: this.defaultSchema, storeCurrentData: true }, async (batch) => { for (let tablePattern of sourceTables) { const tables = await this.getQualifiedTableNames(batch, tablePattern); @@ -383,7 +383,7 @@ AND table_type = 'BASE TABLE';`, if (!this.stopped) { await this.storage.startBatch( - { zeroLSN: ReplicatedGTID.ZERO.comparable, defaultSchema: this.defaultSchema }, + { zeroLSN: ReplicatedGTID.ZERO.comparable, defaultSchema: this.defaultSchema, storeCurrentData: true }, async (batch) => { const zongji = this.connections.createBinlogListener(); diff --git a/modules/module-postgres/src/replication/WalStream.ts b/modules/module-postgres/src/replication/WalStream.ts index 06d9868d2..67436646d 100644 --- a/modules/module-postgres/src/replication/WalStream.ts +++ b/modules/module-postgres/src/replication/WalStream.ts @@ -341,17 +341,20 @@ WHERE oid = $1::regclass`, async initialReplication(db: pgwire.PgConnection, lsn: string) { const sourceTables = this.sync_rules.getSourceTables(); - await this.storage.startBatch({ zeroLSN: ZERO_LSN, defaultSchema: POSTGRES_DEFAULT_SCHEMA }, async (batch) => { - for (let tablePattern of sourceTables) { - const tables = await this.getQualifiedTableNames(batch, db, tablePattern); - for (let table of tables) { - await this.snapshotTable(batch, db, table); - await batch.markSnapshotDone([table], lsn); - await touch(); + await this.storage.startBatch( + { zeroLSN: ZERO_LSN, defaultSchema: POSTGRES_DEFAULT_SCHEMA, storeCurrentData: true }, + async (batch) => { + for (let tablePattern of 
sourceTables) { + const tables = await this.getQualifiedTableNames(batch, db, tablePattern); + for (let table of tables) { + await this.snapshotTable(batch, db, table); + await batch.markSnapshotDone([table], lsn); + await touch(); + } } + await batch.commit(lsn); } - await batch.commit(lsn); - }); + ); } static *getQueryData(results: Iterable): Generator { @@ -577,55 +580,58 @@ WHERE oid = $1::regclass`, // Auto-activate as soon as initial replication is done await this.storage.autoActivate(); - await this.storage.startBatch({ zeroLSN: ZERO_LSN, defaultSchema: POSTGRES_DEFAULT_SCHEMA }, async (batch) => { - // Replication never starts in the middle of a transaction - let inTx = false; - let count = 0; + await this.storage.startBatch( + { zeroLSN: ZERO_LSN, defaultSchema: POSTGRES_DEFAULT_SCHEMA, storeCurrentData: true }, + async (batch) => { + // Replication never starts in the middle of a transaction + let inTx = false; + let count = 0; - for await (const chunk of replicationStream.pgoutputDecode()) { - await touch(); + for await (const chunk of replicationStream.pgoutputDecode()) { + await touch(); - if (this.abort_signal.aborted) { - break; - } + if (this.abort_signal.aborted) { + break; + } - // chunkLastLsn may come from normal messages in the chunk, - // or from a PrimaryKeepalive message. - const { messages, lastLsn: chunkLastLsn } = chunk; - for (const msg of messages) { - if (msg.tag == 'relation') { - await this.handleRelation(batch, getPgOutputRelation(msg), true); - } else if (msg.tag == 'begin') { - inTx = true; - } else if (msg.tag == 'commit') { - Metrics.getInstance().transactions_replicated_total.add(1); - inTx = false; - await batch.commit(msg.lsn!); - await this.ack(msg.lsn!, replicationStream); - } else { - if (count % 100 == 0) { - logger.info(`${this.slot_name} replicating op ${count} ${msg.lsn}`); - } + // chunkLastLsn may come from normal messages in the chunk, + // or from a PrimaryKeepalive message. + const { messages, lastLsn: chunkLastLsn } = chunk; + for (const msg of messages) { + if (msg.tag == 'relation') { + await this.handleRelation(batch, getPgOutputRelation(msg), true); + } else if (msg.tag == 'begin') { + inTx = true; + } else if (msg.tag == 'commit') { + Metrics.getInstance().transactions_replicated_total.add(1); + inTx = false; + await batch.commit(msg.lsn!); + await this.ack(msg.lsn!, replicationStream); + } else { + if (count % 100 == 0) { + logger.info(`${this.slot_name} replicating op ${count} ${msg.lsn}`); + } - count += 1; - await this.writeChange(batch, msg); + count += 1; + await this.writeChange(batch, msg); + } } - } - if (!inTx) { - // In a transaction, we ack and commit according to the transaction progress. - // Outside transactions, we use the PrimaryKeepalive messages to advance progress. - // Big caveat: This _must not_ be used to skip individual messages, since this LSN - // may be in the middle of the next transaction. - // It must only be used to associate checkpoints with LSNs. - if (await batch.keepalive(chunkLastLsn)) { - await this.ack(chunkLastLsn, replicationStream); + if (!inTx) { + // In a transaction, we ack and commit according to the transaction progress. + // Outside transactions, we use the PrimaryKeepalive messages to advance progress. + // Big caveat: This _must not_ be used to skip individual messages, since this LSN + // may be in the middle of the next transaction. + // It must only be used to associate checkpoints with LSNs. 
+ if (await batch.keepalive(chunkLastLsn)) { + await this.ack(chunkLastLsn, replicationStream); + } } - } - Metrics.getInstance().chunks_replicated_total.add(1); + Metrics.getInstance().chunks_replicated_total.add(1); + } } - }); + ); } async ack(lsn: string, replicationStream: pgwire.ReplicationStream) { diff --git a/packages/service-core/src/storage/BucketStorage.ts b/packages/service-core/src/storage/BucketStorage.ts index 96f16ad56..735367700 100644 --- a/packages/service-core/src/storage/BucketStorage.ts +++ b/packages/service-core/src/storage/BucketStorage.ts @@ -201,6 +201,16 @@ export interface BucketDataBatchOptions { export interface StartBatchOptions extends ParseSyncRulesOptions { zeroLSN: string; + /** + * Whether or not to store a copy of the current data. + * + * This is needed if we need to apply partial updates, for example + * when we get TOAST values from Postgres. + * + * This is not needed when we get the full document from the source + * database, for example from MongoDB. + */ + storeCurrentData: boolean; } export interface SyncRulesBucketStorageListener extends DisposableListener { diff --git a/packages/service-core/src/storage/mongo/MongoBucketBatch.ts b/packages/service-core/src/storage/mongo/MongoBucketBatch.ts index 20e200500..54bb81443 100644 --- a/packages/service-core/src/storage/mongo/MongoBucketBatch.ts +++ b/packages/service-core/src/storage/mongo/MongoBucketBatch.ts @@ -33,6 +33,16 @@ const MAX_ROW_SIZE = 15 * 1024 * 1024; // In the future, we can investigate allowing multiple replication streams operating independently. const replicationMutex = new util.Mutex(); +export interface MongoBucketBatchOptions { + db: PowerSyncMongo; + syncRules: SqlSyncRules; + groupId: number; + slotName: string; + lastCheckpointLsn: string | null; + noCheckpointBeforeLsn: string; + storeCurrentData: boolean; +} + export class MongoBucketBatch extends DisposableObserver implements BucketStorageBatch { private readonly client: mongo.MongoClient; public readonly db: PowerSyncMongo; @@ -42,6 +52,7 @@ export class MongoBucketBatch extends DisposableObserver { - // 1. Find sizes of current_data documents, to assist in intelligent batching without - // exceeding memory limits. - // - // A previous attempt tried to do batching by the results of the current_data query - // (automatically limited to 48MB(?) per batch by MongoDB). The issue is that it changes - // the order of processing, which then becomes really tricky to manage. - // This now takes 2+ queries, but doesn't have any issues with order of operations. - const sizeLookups: SourceKey[] = batch.batch.map((r) => { - return { g: this.group_id, t: r.record.sourceTable.id, k: r.beforeId }; - }); + let sizes: Map | undefined = undefined; + if (this.storeCurrentData) { + // We skip this step if we don't store current_data, since the sizes will + // always be small in that case. + + // Find sizes of current_data documents, to assist in intelligent batching without + // exceeding memory limits. + // + // A previous attempt tried to do batching by the results of the current_data query + // (automatically limited to 48MB(?) per batch by MongoDB). The issue is that it changes + // the order of processing, which then becomes really tricky to manage. + // This now takes 2+ queries, but doesn't have any issues with order of operations. 
+ const sizeLookups: SourceKey[] = batch.batch.map((r) => { + return { g: this.group_id, t: r.record.sourceTable.id, k: r.beforeId }; + }); - const sizes = new Map(); + sizes = new Map(); - const sizeCursor: mongo.AggregationCursor<{ _id: SourceKey; size: number }> = this.db.current_data.aggregate( - [ - { - $match: { - _id: { $in: sizeLookups } - } - }, - { - $project: { - _id: 1, - size: { $bsonSize: '$$ROOT' } + const sizeCursor: mongo.AggregationCursor<{ _id: SourceKey; size: number }> = this.db.current_data.aggregate( + [ + { + $match: { + _id: { $in: sizeLookups } + } + }, + { + $project: { + _id: 1, + size: { $bsonSize: '$$ROOT' } + } } - } - ], - { session } - ); - for await (let doc of sizeCursor.stream()) { - const key = cacheKey(doc._id.t, doc._id.k); - sizes.set(key, doc.size); + ], + { session } + ); + for await (let doc of sizeCursor.stream()) { + const key = cacheKey(doc._id.t, doc._id.k); + sizes.set(key, doc.size); + } } // If set, we need to start a new transaction with this batch. @@ -181,6 +192,7 @@ export class MongoBucketBatch extends DisposableObserver 16MB. afterData = new bson.Binary(bson.serialize(after!)); diff --git a/packages/service-core/src/storage/mongo/MongoSyncBucketStorage.ts b/packages/service-core/src/storage/mongo/MongoSyncBucketStorage.ts index 72e007714..88da4540d 100644 --- a/packages/service-core/src/storage/mongo/MongoSyncBucketStorage.ts +++ b/packages/service-core/src/storage/mongo/MongoSyncBucketStorage.ts @@ -53,7 +53,7 @@ export class MongoSyncBucketStorage } }); - private parsedSyncRulesCache: {parsed: SqlSyncRules, options: ParseSyncRulesOptions} | undefined; + private parsedSyncRulesCache: { parsed: SqlSyncRules; options: ParseSyncRulesOptions } | undefined; private writeCheckpointAPI: WriteCheckpointAPI; constructor( @@ -104,13 +104,13 @@ export class MongoSyncBucketStorage } getParsedSyncRules(options: ParseSyncRulesOptions): SqlSyncRules { - const {parsed, options: cachedOptions} = this.parsedSyncRulesCache ?? {}; + const { parsed, options: cachedOptions } = this.parsedSyncRulesCache ?? {}; /** * Check if the cached sync rules, if present, had the same options. * Parse sync rules if the options are different or if there is no cached value. */ - if (!parsed || options.defaultSchema != cachedOptions?.defaultSchema ) { - this.parsedSyncRulesCache = {parsed: this.sync_rules.parsed(options).sync_rules, options}; + if (!parsed || options.defaultSchema != cachedOptions?.defaultSchema) { + this.parsedSyncRulesCache = { parsed: this.sync_rules.parsed(options).sync_rules, options }; } return this.parsedSyncRulesCache!.parsed; @@ -141,14 +141,15 @@ export class MongoSyncBucketStorage ); const checkpoint_lsn = doc?.last_checkpoint_lsn ?? null; - await using batch = new MongoBucketBatch( - this.db, - this.sync_rules.parsed(options).sync_rules, - this.group_id, - this.slot_name, - checkpoint_lsn, - doc?.no_checkpoint_before ?? options.zeroLSN - ); + await using batch = new MongoBucketBatch({ + db: this.db, + syncRules: this.sync_rules.parsed(options).sync_rules, + groupId: this.group_id, + slotName: this.slot_name, + lastCheckpointLsn: checkpoint_lsn, + noCheckpointBeforeLsn: doc?.no_checkpoint_before ?? 
options.zeroLSN, + storeCurrentData: options.storeCurrentData + }); this.iterateListeners((cb) => cb.batchStarted?.(batch)); await callback(batch); diff --git a/packages/service-core/src/storage/mongo/OperationBatch.ts b/packages/service-core/src/storage/mongo/OperationBatch.ts index 4bcc0e64a..127562d54 100644 --- a/packages/service-core/src/storage/mongo/OperationBatch.ts +++ b/packages/service-core/src/storage/mongo/OperationBatch.ts @@ -43,7 +43,16 @@ export class OperationBatch { return this.batch.length >= MAX_BATCH_COUNT || this.currentSize > MAX_RECORD_BATCH_SIZE; } - *batched(sizes: Map): Generator { + /** + * + * @param sizes Map of source key to estimated size of the current_data document, or undefined if current_data is not persisted. + * + */ + *batched(sizes: Map | undefined): Generator { + if (sizes == null) { + yield this.batch; + return; + } let currentBatch: RecordOperation[] = []; let currentBatchSize = 0; for (let op of this.batch) { From 812b5c4f098f5e1b7493fcb8acb0d4087d2b155a Mon Sep 17 00:00:00 2001 From: Ralf Kistner Date: Wed, 13 Nov 2024 13:24:35 +0200 Subject: [PATCH 2/2] Fix tests. --- packages/service-core/test/src/util.ts | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/packages/service-core/test/src/util.ts b/packages/service-core/test/src/util.ts index 421879d3c..138ee5d03 100644 --- a/packages/service-core/test/src/util.ts +++ b/packages/service-core/test/src/util.ts @@ -51,7 +51,8 @@ export const PARSE_OPTIONS: ParseSyncRulesOptions = { export const BATCH_OPTIONS: StartBatchOptions = { ...PARSE_OPTIONS, - zeroLSN: ZERO_LSN + zeroLSN: ZERO_LSN, + storeCurrentData: true }; export function testRules(content: string): PersistedSyncRulesContent {
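
For reference: a minimal sketch of how a replication module chooses the new `storeCurrentData` flag when opening a batch. `BatchStorageLike`, `replicateOnce`, and the placeholder `zeroLSN`/`defaultSchema` values are stand-ins invented for this example; the real interface is `SyncRulesBucketStorage` in packages/service-core, and only the option shape (`zeroLSN`, `defaultSchema`, `storeCurrentData`) is taken from the `StartBatchOptions` change in the patch above.

// Structural stand-in for the storage interface the streams above call into.
interface BatchStorageLike {
  startBatch(
    options: { zeroLSN: string; defaultSchema: string; storeCurrentData: boolean },
    callback: (batch: { commit(lsn: string): Promise<unknown> }) => Promise<void>
  ): Promise<unknown>;
}

async function replicateOnce(
  storage: BatchStorageLike,
  sourceSendsFullRows: boolean,
  commitLsn: string
) {
  await storage.startBatch(
    {
      zeroLSN: '0/0', // placeholder; each module defines its own zero LSN
      defaultSchema: 'public', // placeholder; module-specific default schema
      // MongoDB change streams (with post-images enabled) always deliver the full
      // document, so the current_data copy is unnecessary (storeCurrentData: false).
      // Postgres can deliver partial rows (unchanged TOAST values), so it keeps
      // storeCurrentData: true, as does MySQL.
      storeCurrentData: !sourceSendsFullRows
    },
    async (batch) => {
      // Apply the source database's changes to `batch` here, as the
      // ChangeStream / WalStream / BinLogStream implementations above do,
      // then record the checkpoint.
      await batch.commit(commitLsn);
    }
  );
}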