From 8514fa361917709a14cf78c7a0132a797a7a7e26 Mon Sep 17 00:00:00 2001
From: Ralf Kistner
Date: Mon, 24 Feb 2025 15:05:30 +0200
Subject: [PATCH 1/8] Refactor replica_id handling for MongoDB source database.

---
 .../implementation/MongoSyncBucketStorage.ts  | 67 ++++++++++++-------
 .../src/replication/ChangeStream.ts           | 54 +++++++++------
 .../src/replication/MongoRelation.ts          | 10 ++-
 .../src/replication/BinLogStream.ts           |  3 +-
 .../src/storage/PostgresSyncRulesStorage.ts   | 57 +++++++++++-----
 .../src/types/models/SourceTable.ts           |  2 +-
 .../service-core/src/storage/SourceEntity.ts  |  8 ++-
 .../service-core/src/storage/SourceTable.ts   |  2 +-
 8 files changed, 136 insertions(+), 67 deletions(-)

diff --git a/modules/module-mongodb-storage/src/storage/implementation/MongoSyncBucketStorage.ts b/modules/module-mongodb-storage/src/storage/implementation/MongoSyncBucketStorage.ts
index ce8ab8677..0d0187ef8 100644
--- a/modules/module-mongodb-storage/src/storage/implementation/MongoSyncBucketStorage.ts
+++ b/modules/module-mongodb-storage/src/storage/implementation/MongoSyncBucketStorage.ts
@@ -13,6 +13,7 @@ import {
   CheckpointChanges,
   GetCheckpointChangesOptions,
   ReplicationCheckpoint,
+  SourceTable,
   storage,
   utils,
   WatchWriteCheckpointOptions
@@ -23,7 +24,14 @@ import { wrapWithAbort } from 'ix/asynciterable/operators/withabort.js';
 import * as timers from 'timers/promises';
 import { MongoBucketStorage } from '../MongoBucketStorage.js';
 import { PowerSyncMongo } from './db.js';
-import { BucketDataDocument, BucketDataKey, SourceKey, SyncRuleCheckpointState, SyncRuleDocument } from './models.js';
+import {
+  BucketDataDocument,
+  BucketDataKey,
+  SourceKey,
+  SourceTableDocument,
+  SyncRuleCheckpointState,
+  SyncRuleDocument
+} from './models.js';
 import { MongoBucketBatch } from './MongoBucketBatch.js';
 import { MongoCompactor } from './MongoCompactor.js';
 import { MongoWriteCheckpointAPI } from './MongoWriteCheckpointAPI.js';
@@ -163,17 +171,17 @@ export class MongoSyncBucketStorage
     let result: storage.ResolveTableResult | null = null;
     await this.db.client.withSession(async (session) => {
       const col = this.db.source_tables;
-      let doc = await col.findOne(
-        {
-          group_id: group_id,
-          connection_id: connection_id,
-          relation_id: objectId,
-          schema_name: schema,
-          table_name: table,
-          replica_id_columns2: columns
-        },
-        { session }
-      );
+      let filter: Partial<SourceTableDocument> = {
+        group_id: group_id,
+        connection_id: connection_id,
+        schema_name: schema,
+        table_name: table,
+        replica_id_columns2: columns
+      };
+      if (objectId != null) {
+        filter.relation_id = objectId;
+      }
+      let doc = await col.findOne(filter, { session });
       if (doc == null) {
         doc = {
           _id: new bson.ObjectId(),
@@ -202,31 +210,40 @@ export class MongoSyncBucketStorage
       sourceTable.syncData = options.sync_rules.tableSyncsData(sourceTable);
       sourceTable.syncParameters = options.sync_rules.tableSyncsParameters(sourceTable);
 
+      let dropTables: storage.SourceTable[] = [];
+      // Detect tables that are either renamed, or have different replica_id_columns
+      let truncateFilter = [{ schema_name: schema, table_name: table }] as any[];
+      if (objectId != null) {
+        // Only detect renames if the source uses relation ids.
+        truncateFilter.push({ relation_id: objectId });
+      }
       const truncate = await col
         .find(
           {
             group_id: group_id,
             connection_id: connection_id,
             _id: { $ne: doc._id },
-            $or: [{ relation_id: objectId }, { schema_name: schema, table_name: table }]
+            $or: truncateFilter
           },
           { session }
         )
         .toArray();
 
+      dropTables = truncate.map(
+        (doc) =>
+          new storage.SourceTable(
+            doc._id,
+            connection_tag,
+            doc.relation_id,
+            doc.schema_name,
+            doc.table_name,
+            doc.replica_id_columns2?.map((c) => ({ name: c.name, typeOid: c.type_oid, type: c.type })) ?? [],
+            doc.snapshot_done ?? true
+          )
+      );
+
       result = {
         table: sourceTable,
-        dropTables: truncate.map(
-          (doc) =>
-            new storage.SourceTable(
-              doc._id,
-              connection_tag,
-              doc.relation_id ?? 0,
-              doc.schema_name,
-              doc.table_name,
-              doc.replica_id_columns2?.map((c) => ({ name: c.name, typeOid: c.type_oid, type: c.type })) ?? [],
-              doc.snapshot_done ?? true
-            )
-        )
+        dropTables: dropTables
       };
     });
     return result!;
diff --git a/modules/module-mongodb/src/replication/ChangeStream.ts b/modules/module-mongodb/src/replication/ChangeStream.ts
index c3ccef811..2a297c6be 100644
--- a/modules/module-mongodb/src/replication/ChangeStream.ts
+++ b/modules/module-mongodb/src/replication/ChangeStream.ts
@@ -14,7 +14,7 @@ import { MongoLSN } from '../common/MongoLSN.js';
 import { PostImagesOption } from '../types/types.js';
 import { escapeRegExp } from '../utils.js';
 import { MongoManager } from './MongoManager.js';
-import { constructAfterRecord, createCheckpoint, getMongoRelation } from './MongoRelation.js';
+import { constructAfterRecord, createCheckpoint, getCacheIdentifier, getMongoRelation } from './MongoRelation.js';
 import { CHECKPOINTS_COLLECTION } from './replication-utils.js';
 
 export interface ChangeStreamOptions {
@@ -130,12 +130,7 @@ export class ChangeStream {
     for (let collection of collections) {
       const table = await this.handleRelation(
         batch,
-        {
-          name: collection.name,
-          schema,
-          objectId: collection.name,
-          replicationColumns: [{ name: '_id' }]
-        } as SourceEntityDescriptor,
+        getMongoRelation({ db: schema, coll: collection.name }),
         // This is done as part of the initial setup - snapshot is handled elsewhere
         { snapshot: false, collectionInfo: collection }
       );
@@ -333,9 +328,11 @@ export class ChangeStream {
 
   private async getRelation(
     batch: storage.BucketStorageBatch,
-    descriptor: SourceEntityDescriptor
+    descriptor: SourceEntityDescriptor,
+    options: { snapshot: boolean }
   ): Promise<storage.SourceTable> {
-    const existing = this.relation_cache.get(descriptor.objectId);
+    const cacheId = getCacheIdentifier(descriptor);
+    const existing = this.relation_cache.get(cacheId);
     if (existing != null) {
       return existing;
     }
 
     // missing values.
     const collection = await this.getCollectionInfo(descriptor.schema, descriptor.name);
 
-    return this.handleRelation(batch, descriptor, { snapshot: false, collectionInfo: collection });
+    return this.handleRelation(batch, descriptor, { snapshot: options.snapshot, collectionInfo: collection });
   }
 
   private async getCollectionInfo(db: string, name: string): Promise<mongo.CollectionInfo | undefined> {
@@ -406,8 +403,14 @@ export class ChangeStream {
     });
     this.relation_cache.set(descriptor.objectId, result.table);
 
-    // Drop conflicting tables. This includes for example renamed tables.
-    await batch.drop(result.dropTables);
+    // Drop conflicting collections.
+    // This is generally not expected for MongoDB source dbs, so we log an error.
+    if (result.dropTables.length > 0) {
+      logger.error(
+        `Conflicting collections found for ${JSON.stringify(descriptor)}. Dropping: ${result.dropTables.map((t) => t.id).join(', ')}`
+      );
+      await batch.drop(result.dropTables);
+    }
 
     // Snapshot if:
     // 1. Snapshot is requested (false for initial snapshot, since that process handles it elsewhere)
@@ -596,7 +599,6 @@ export class ChangeStream {
       }
 
       const originalChangeDocument = await stream.tryNext();
-
       // The stream was closed, we will only ever receive `null` from it
       if (!originalChangeDocument && stream.closed) {
         break;
@@ -682,28 +684,42 @@ export class ChangeStream {
           waitForCheckpointLsn = await createCheckpoint(this.client, this.defaultDb);
         }
         const rel = getMongoRelation(changeDocument.ns);
-        const table = await this.getRelation(batch, rel);
+        const table = await this.getRelation(batch, rel, {
+          // In most cases, we should not need to snapshot this. But if this is the first time we see the collection
+          // for whatever reason, then we do need to snapshot it.
+          snapshot: true
+        });
         if (table.syncAny) {
           await this.writeChange(batch, table, changeDocument);
         }
       } else if (changeDocument.operationType == 'drop') {
         const rel = getMongoRelation(changeDocument.ns);
-        const table = await this.getRelation(batch, rel);
+        const table = await this.getRelation(batch, rel, {
+          // We're "dropping" this collection, so never snapshot it.
+          snapshot: false
+        });
         if (table.syncAny) {
           await batch.drop([table]);
-          this.relation_cache.delete(table.objectId);
+          this.relation_cache.delete(getCacheIdentifier(rel));
         }
       } else if (changeDocument.operationType == 'rename') {
         const relFrom = getMongoRelation(changeDocument.ns);
         const relTo = getMongoRelation(changeDocument.to);
-        const tableFrom = await this.getRelation(batch, relFrom);
+        const tableFrom = await this.getRelation(batch, relFrom, {
+          // We're "dropping" this collection, so never snapshot it.
+          snapshot: false
+        });
         if (tableFrom.syncAny) {
           await batch.drop([tableFrom]);
-          this.relation_cache.delete(tableFrom.objectId);
+          this.relation_cache.delete(getCacheIdentifier(relFrom));
         }
         // Here we do need to snapshot the new table
         const collection = await this.getCollectionInfo(relTo.schema, relTo.name);
-        await this.handleRelation(batch, relTo, { snapshot: true, collectionInfo: collection });
+        await this.handleRelation(batch, relTo, {
+          // This is a new (renamed) collection, so always snapshot it.
+          snapshot: true,
+          collectionInfo: collection
+        });
       }
     }
   }
diff --git a/modules/module-mongodb/src/replication/MongoRelation.ts b/modules/module-mongodb/src/replication/MongoRelation.ts
index cc4774a21..62127eb48 100644
--- a/modules/module-mongodb/src/replication/MongoRelation.ts
+++ b/modules/module-mongodb/src/replication/MongoRelation.ts
@@ -11,11 +11,19 @@ export function getMongoRelation(source: mongo.ChangeStreamNameSpace): storage.S
   return {
     name: source.coll,
     schema: source.db,
-    objectId: source.coll,
+    // Not relevant for MongoDB - we use db + coll name as the identifier
+    objectId: undefined,
     replicationColumns: [{ name: '_id' }]
   } satisfies storage.SourceEntityDescriptor;
 }
 
+/**
+ * For in-memory cache only.
+ */
+export function getCacheIdentifier(source: storage.SourceEntityDescriptor): string {
+  return `${source.schema}.${source.name}`;
+}
+
 export function constructAfterRecord(document: mongo.Document): SqliteRow {
   let record: SqliteRow = {};
   for (let key of Object.keys(document)) {
diff --git a/modules/module-mysql/src/replication/BinLogStream.ts b/modules/module-mysql/src/replication/BinLogStream.ts
index 116235afc..408ae4c43 100644
--- a/modules/module-mysql/src/replication/BinLogStream.ts
+++ b/modules/module-mysql/src/replication/BinLogStream.ts
@@ -106,7 +106,8 @@ export class BinLogStream {
       entity_descriptor: entity,
       sync_rules: this.syncRules
     });
-    this.tableCache.set(entity.objectId, result.table);
+    // objectId is always defined for mysql
+    this.tableCache.set(entity.objectId!, result.table);
 
     // Drop conflicting tables. This includes for example renamed tables.
     await batch.drop(result.dropTables);
diff --git a/modules/module-postgres-storage/src/storage/PostgresSyncRulesStorage.ts b/modules/module-postgres-storage/src/storage/PostgresSyncRulesStorage.ts
index 3edcb6825..89f6e6ed3 100644
--- a/modules/module-postgres-storage/src/storage/PostgresSyncRulesStorage.ts
+++ b/modules/module-postgres-storage/src/storage/PostgresSyncRulesStorage.ts
@@ -21,13 +21,14 @@ import * as timers from 'timers/promises';
 
 import * as framework from '@powersync/lib-services-framework';
 import { StatementParam } from '@powersync/service-jpgwire';
-import { StoredRelationId } from '../types/models/SourceTable.js';
+import { SourceTableDecoded, StoredRelationId } from '../types/models/SourceTable.js';
 import { pick } from '../utils/ts-codec.js';
 import { PostgresBucketBatch } from './batch/PostgresBucketBatch.js';
 import { PostgresWriteCheckpointAPI } from './checkpoints/PostgresWriteCheckpointAPI.js';
 import { PostgresBucketStorageFactory } from './PostgresBucketStorageFactory.js';
 import { PostgresCompactor } from './PostgresCompactor.js';
 import { wrapWithAbort } from 'ix/asynciterable/operators/withabort.js';
+import { Decoded } from 'ts-codec';
 
 export type PostgresSyncRulesStorageOptions = {
   factory: PostgresBucketStorageFactory;
@@ -225,25 +226,47 @@ export class PostgresSyncRulesStorage
       sourceTable.syncData = options.sync_rules.tableSyncsData(sourceTable);
       sourceTable.syncParameters = options.sync_rules.tableSyncsParameters(sourceTable);
 
-      const truncatedTables = await db.sql`
-        SELECT
-          *
-        FROM
-          source_tables
-        WHERE
-          group_id = ${{ type: 'int4', value: group_id }}
-          AND connection_id = ${{ type: 'int4', value: connection_id }}
-          AND id != ${{ type: 'varchar', value: sourceTableRow!.id }}
-          AND (
-            relation_id = ${{ type: 'jsonb', value: { object_id: objectId } satisfies StoredRelationId }}
-            OR (
+      let truncatedTables: SourceTableDecoded[] = [];
+      if (objectId != null) {
+        // relation_id present - check for renamed tables
+        truncatedTables = await db.sql`
+          SELECT
+            *
+          FROM
+            source_tables
+          WHERE
+            group_id = ${{ type: 'int4', value: group_id }}
+            AND connection_id = ${{ type: 'int4', value: connection_id }}
+            AND id != ${{ type: 'varchar', value: sourceTableRow!.id }}
+            AND (
+              relation_id = ${{ type: 'jsonb', value: { object_id: objectId } satisfies StoredRelationId }}
+              OR (
+                schema_name = ${{ type: 'varchar', value: schema }}
+                AND table_name = ${{ type: 'varchar', value: table }}
+              )
+            )
+        `
+          .decoded(models.SourceTable)
+          .rows();
+      } else {
+        // relation_id not present - only check for changed replica_id_columns
+        truncatedTables = await db.sql`
+          SELECT
+            *
+          FROM
+            source_tables
+          WHERE
+            group_id = ${{ type: 'int4', value: group_id }}
+            AND connection_id = ${{ type: 'int4', value: connection_id }}
+            AND id != ${{ type: 'varchar', value: sourceTableRow!.id }}
+            AND (
              schema_name = ${{ type: 'varchar', value: schema }}
              AND table_name = ${{ type: 'varchar', value: table }}
            )
-          )
-      `
-        .decoded(models.SourceTable)
-        .rows();
+        `
+          .decoded(models.SourceTable)
+          .rows();
+      }
 
       return {
         table: sourceTable,
diff --git a/modules/module-postgres-storage/src/types/models/SourceTable.ts b/modules/module-postgres-storage/src/types/models/SourceTable.ts
index 1673bc959..9613fd3b3 100644
--- a/modules/module-postgres-storage/src/types/models/SourceTable.ts
+++ b/modules/module-postgres-storage/src/types/models/SourceTable.ts
@@ -2,7 +2,7 @@ import * as t from 'ts-codec';
 import { bigint, jsonb, jsonb_raw, pgwire_number } from '../codecs.js';
 
 export type StoredRelationId = {
-  object_id: string | number;
+  object_id: string | number | undefined;
 };
 
 export const ColumnDescriptor = t.object({
diff --git a/packages/service-core/src/storage/SourceEntity.ts b/packages/service-core/src/storage/SourceEntity.ts
index 2b0031831..1de253885 100644
--- a/packages/service-core/src/storage/SourceEntity.ts
+++ b/packages/service-core/src/storage/SourceEntity.ts
@@ -13,9 +13,13 @@ export interface ColumnDescriptor {
 // TODO: This needs to be consolidated with SourceTable into something new.
 export interface SourceEntityDescriptor {
   /**
-   * The internal id of the data source structure in the database
+   * The internal id of the data source structure in the database.
+   *
+   * If undefined, the schema and name are used as the identifier.
+   *
+   * If specified, this is specifically used to detect renames.
    */
-  objectId: number | string;
+  objectId: number | string | undefined;
   schema: string;
   name: string;
   replicationColumns: ColumnDescriptor[];
diff --git a/packages/service-core/src/storage/SourceTable.ts b/packages/service-core/src/storage/SourceTable.ts
index a2c2f65e7..ab415c913 100644
--- a/packages/service-core/src/storage/SourceTable.ts
+++ b/packages/service-core/src/storage/SourceTable.ts
@@ -35,7 +35,7 @@ export class SourceTable {
   constructor(
     public readonly id: any,
     public readonly connectionTag: string,
-    public readonly objectId: number | string,
+    public readonly objectId: number | string | undefined,
     public readonly schema: string,
     public readonly table: string,

From a1904fd06ec5ef8ff2a3bf89e6ce456c618d8bec Mon Sep 17 00:00:00 2001
From: Ralf Kistner
Date: Mon, 24 Feb 2025 15:20:04 +0200
Subject: [PATCH 2/8] Improve log output.

---
 .../src/replication/ChangeStream.ts | 22 +++++++++++--------
 1 file changed, 13 insertions(+), 9 deletions(-)

diff --git a/modules/module-mongodb/src/replication/ChangeStream.ts b/modules/module-mongodb/src/replication/ChangeStream.ts
index 2a297c6be..45d6f58af 100644
--- a/modules/module-mongodb/src/replication/ChangeStream.ts
+++ b/modules/module-mongodb/src/replication/ChangeStream.ts
@@ -89,6 +89,10 @@ export class ChangeStream {
     return this.connections.options.postImages == PostImagesOption.AUTO_CONFIGURE;
   }
 
+  private get logPrefix() {
+    return `[powersync_${this.group_id}]`;
+  }
+
   /**
    * This resolves a pattern, persists the related metadata, and returns
    * the resulting SourceTables.
@@ -124,7 +128,7 @@ export class ChangeStream { .toArray(); if (!tablePattern.isWildcard && collections.length == 0) { - logger.warn(`Collection ${schema}.${tablePattern.name} not found`); + logger.warn(`${this.logPrefix} Collection ${schema}.${tablePattern.name} not found`); } for (let collection of collections) { @@ -144,7 +148,7 @@ export class ChangeStream { async initSlot(): Promise { const status = await this.storage.getStatus(); if (status.snapshot_done && status.checkpoint_lsn) { - logger.info(`Initial replication already done`); + logger.info(`${this.logPrefix} Initial replication already done`); return { needsInitialSync: false }; } @@ -215,7 +219,7 @@ export class ChangeStream { } const { comparable: lsn } = new MongoLSN({ timestamp: snapshotTime }); - logger.info(`Snapshot commit at ${snapshotTime.inspect()} / ${lsn}`); + logger.info(`${this.logPrefix} Snapshot commit at ${snapshotTime.inspect()} / ${lsn}`); await batch.commit(lsn); } ); @@ -284,7 +288,7 @@ export class ChangeStream { table: storage.SourceTable, session?: mongo.ClientSession ) { - logger.info(`Replicating ${table.qualifiedName}`); + logger.info(`${this.logPrefix} Replicating ${table.qualifiedName}`); const estimatedCount = await this.estimatedCount(table); let at = 0; let lastLogIndex = 0; @@ -314,7 +318,7 @@ export class ChangeStream { at += 1; if (at - lastLogIndex >= 5000) { - logger.info(`[${this.group_id}] Replicating ${table.qualifiedName} ${at}/${estimatedCount}`); + logger.info(`${this.logPrefix} Replicating ${table.qualifiedName} ${at}/${estimatedCount}`); lastLogIndex = at; } Metrics.getInstance().rows_replicated_total.add(1); @@ -323,7 +327,7 @@ export class ChangeStream { } await batch.flush(); - logger.info(`Replicated ${at} documents for ${table.qualifiedName}`); + logger.info(`${this.logPrefix} Replicated ${at} documents for ${table.qualifiedName}`); } private async getRelation( @@ -372,7 +376,7 @@ export class ChangeStream { collMod: collectionInfo.name, changeStreamPreAndPostImages: { enabled: true } }); - logger.info(`Enabled postImages on ${db}.${collectionInfo.name}`); + logger.info(`${this.logPrefix} Enabled postImages on ${db}.${collectionInfo.name}`); } else if (!enabled) { throw new ServiceError(ErrorCode.PSYNC_S1343, `postImages not enabled on ${db}.${collectionInfo.name}`); } @@ -437,7 +441,7 @@ export class ChangeStream { change: mongo.ChangeStreamDocument ): Promise { if (!table.syncAny) { - logger.debug(`Collection ${table.qualifiedName} not used in sync rules - skipping`); + logger.debug(`${this.logPrefix} Collection ${table.qualifiedName} not used in sync rules - skipping`); return null; } @@ -531,7 +535,7 @@ export class ChangeStream { const startAfter = lastLsn?.timestamp; const resumeAfter = lastLsn?.resumeToken; - logger.info(`Resume streaming at ${startAfter?.inspect()} / ${lastLsn}`); + logger.info(`${this.logPrefix} Resume streaming at ${startAfter?.inspect()} / ${lastLsn}`); const filters = this.getSourceNamespaceFilters(); From c4008e6e6c8e6019f43755378aee6cf08b76c982 Mon Sep 17 00:00:00 2001 From: Ralf Kistner Date: Mon, 24 Feb 2025 15:22:02 +0200 Subject: [PATCH 3/8] Fix for postgres storage. 
---
 .../src/storage/PostgresSyncRulesStorage.ts | 50 +++++++++++++------
 1 file changed, 34 insertions(+), 16 deletions(-)

diff --git a/modules/module-postgres-storage/src/storage/PostgresSyncRulesStorage.ts b/modules/module-postgres-storage/src/storage/PostgresSyncRulesStorage.ts
index 89f6e6ed3..24a85f803 100644
--- a/modules/module-postgres-storage/src/storage/PostgresSyncRulesStorage.ts
+++ b/modules/module-postgres-storage/src/storage/PostgresSyncRulesStorage.ts
@@ -166,21 +166,39 @@ export class PostgresSyncRulesStorage
       type_oid: typeof column.typeId !== 'undefined' ? Number(column.typeId) : column.typeId
     }));
     return this.db.transaction(async (db) => {
-      let sourceTableRow = await db.sql`
-        SELECT
-          *
-        FROM
-          source_tables
-        WHERE
-          group_id = ${{ type: 'int4', value: group_id }}
-          AND connection_id = ${{ type: 'int4', value: connection_id }}
-          AND relation_id = ${{ type: 'jsonb', value: { object_id: objectId } satisfies StoredRelationId }}
-          AND schema_name = ${{ type: 'varchar', value: schema }}
-          AND table_name = ${{ type: 'varchar', value: table }}
-          AND replica_id_columns = ${{ type: 'jsonb', value: columns }}
-      `
-        .decoded(models.SourceTable)
-        .first();
+      let sourceTableRow: SourceTableDecoded | null;
+      if (objectId != null) {
+        sourceTableRow = await db.sql`
+          SELECT
+            *
+          FROM
+            source_tables
+          WHERE
+            group_id = ${{ type: 'int4', value: group_id }}
+            AND connection_id = ${{ type: 'int4', value: connection_id }}
+            AND relation_id = ${{ type: 'jsonb', value: { object_id: objectId } satisfies StoredRelationId }}
+            AND schema_name = ${{ type: 'varchar', value: schema }}
+            AND table_name = ${{ type: 'varchar', value: table }}
+            AND replica_id_columns = ${{ type: 'jsonb', value: columns }}
+        `
+          .decoded(models.SourceTable)
+          .first();
+      } else {
+        sourceTableRow = await db.sql`
+          SELECT
+            *
+          FROM
+            source_tables
+          WHERE
+            group_id = ${{ type: 'int4', value: group_id }}
+            AND connection_id = ${{ type: 'int4', value: connection_id }}
+            AND schema_name = ${{ type: 'varchar', value: schema }}
+            AND table_name = ${{ type: 'varchar', value: table }}
+            AND replica_id_columns = ${{ type: 'jsonb', value: columns }}
+        `
+          .decoded(models.SourceTable)
+          .first();
+      }
 
       if (sourceTableRow == null) {
         const row = await db.sql`
@@ -199,7 +217,7 @@ export class PostgresSyncRulesStorage
             ${{ type: 'varchar', value: uuid.v4() }},
             ${{ type: 'int4', value: group_id }},
             ${{ type: 'int4', value: connection_id }},
-            --- The objectId can be string | number, we store it as jsonb value
+            --- The objectId can be string | number | undefined, we store it as jsonb value
             ${{ type: 'jsonb', value: { object_id: objectId } satisfies StoredRelationId }},
             ${{ type: 'varchar', value: schema }},
             ${{ type: 'varchar', value: table }},

From 131f52f17b3d565d6e2daa742943bfe4f4ca8a7e Mon Sep 17 00:00:00 2001
From: Ralf Kistner
Date: Mon, 24 Feb 2025 15:48:41 +0200
Subject: [PATCH 4/8] Fixes.
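
handleRelation previously asserted that descriptor.objectId is defined, which
no longer holds for MongoDB, and the relation cache was still keyed by
objectId. Both are corrected below; the cache is now keyed by the qualified
name (illustrative sketch, mirroring getCacheIdentifier from patch 1):

    // Map<string, storage.SourceTable>, keyed by `${schema}.${name}`.
    this.relation_cache.set(getCacheIdentifier(descriptor), result.table);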
---
 modules/module-mongodb/src/replication/ChangeStream.ts | 6 ++----
 1 file changed, 2 insertions(+), 4 deletions(-)

diff --git a/modules/module-mongodb/src/replication/ChangeStream.ts b/modules/module-mongodb/src/replication/ChangeStream.ts
index 45d6f58af..9754539e7 100644
--- a/modules/module-mongodb/src/replication/ChangeStream.ts
+++ b/modules/module-mongodb/src/replication/ChangeStream.ts
@@ -395,9 +395,6 @@ export class ChangeStream {
     }
 
     const snapshot = options.snapshot;
-    if (!descriptor.objectId && typeof descriptor.objectId != 'string') {
-      throw new ReplicationAssertionError('MongoDB replication - objectId expected');
-    }
     const result = await this.storage.resolveTable({
       group_id: this.group_id,
       connection_id: this.connection_id,
@@ -405,7 +402,7 @@ export class ChangeStream {
       entity_descriptor: descriptor,
       sync_rules: this.sync_rules
     });
-    this.relation_cache.set(descriptor.objectId, result.table);
+    this.relation_cache.set(getCacheIdentifier(descriptor), result.table);
 
     // Drop conflicting collections.
     // This is generally not expected for MongoDB source dbs, so we log an error.
@@ -422,6 +419,7 @@ export class ChangeStream {
     // 3. The table is used in sync rules.
     const shouldSnapshot = snapshot && !result.table.snapshotComplete && result.table.syncAny;
     if (shouldSnapshot) {
+      logger.info(`${this.logPrefix} New collection: ${descriptor.schema}.${descriptor.name}`);
       // Truncate this table, in case a previous snapshot was interrupted.
       await batch.truncate([result.table]);
 
From 4e3d78cb0fc5f3e7801c8965a7a20e637341c7ef Mon Sep 17 00:00:00 2001
From: Ralf Kistner
Date: Mon, 24 Feb 2025 15:48:56 +0200
Subject: [PATCH 5/8] Workaround for Flex instance issue.

---
 .../src/replication/ChangeStream.ts | 27 +++++++++++++++++++
 1 file changed, 27 insertions(+)

diff --git a/modules/module-mongodb/src/replication/ChangeStream.ts b/modules/module-mongodb/src/replication/ChangeStream.ts
index 9754539e7..e166fd7db 100644
--- a/modules/module-mongodb/src/replication/ChangeStream.ts
+++ b/modules/module-mongodb/src/replication/ChangeStream.ts
@@ -595,6 +595,8 @@ export class ChangeStream {
 
     let splitDocument: mongo.ChangeStreamDocument | null = null;
 
+    let flexDbNameWorkaroundLogged = false;
+
     while (true) {
       if (this.abort_signal.aborted) {
         break;
@@ -640,6 +642,31 @@ export class ChangeStream {
         throw new ReplicationAssertionError(`Incomplete splitEvent: ${JSON.stringify(splitDocument.splitEvent)}`);
       }
 
+      console.log('event', changeDocument);
+
+      if (
+        !filters.multipleDatabases &&
+        'ns' in changeDocument &&
+        changeDocument.ns.db != this.defaultDb.databaseName &&
+        changeDocument.ns.db.endsWith(`_${this.defaultDb.databaseName}`)
+      ) {
+        // When all of the following conditions are met:
+        // 1. We're replicating from a Flex instance.
+        // 2. There were changestream events recorded while the PowerSync service is paused.
+        // 3. We're only replicating from a single database.
+        // Then we've obeserved an ns with for example {db: '67b83e86cd20730f1e766dde_ps'},
+        // instead of the expected {db: 'ps'}.
+        if (!flexDbNameWorkaroundLogged) {
+          flexDbNameWorkaroundLogged = true;
+          logger.warn(
+            `${this.logPrefix} Incorrect DB name in change stream: ${changeDocument.ns.db}. Changed to ${this.defaultDb.databaseName}.`
+          );
+        }
+        // We correct this.
+        changeDocument.ns.db = this.defaultDb.databaseName;
+      }
+
       if (
         (changeDocument.operationType == 'insert' ||
           changeDocument.operationType == 'update' ||

From f8c7b0199ec39a0605f579935348711e8ed49fab Mon Sep 17 00:00:00 2001
From: Ralf Kistner
Date: Mon, 24 Feb 2025 15:53:28 +0200
Subject: [PATCH 6/8] Remove log and fix typos.

---
 modules/module-mongodb/src/replication/ChangeStream.ts | 6 ++----
 1 file changed, 2 insertions(+), 4 deletions(-)

diff --git a/modules/module-mongodb/src/replication/ChangeStream.ts b/modules/module-mongodb/src/replication/ChangeStream.ts
index e166fd7db..ae0cab1dc 100644
--- a/modules/module-mongodb/src/replication/ChangeStream.ts
+++ b/modules/module-mongodb/src/replication/ChangeStream.ts
@@ -642,8 +642,6 @@ export class ChangeStream {
         throw new ReplicationAssertionError(`Incomplete splitEvent: ${JSON.stringify(splitDocument.splitEvent)}`);
       }
 
-      console.log('event', changeDocument);
-
       if (
         !filters.multipleDatabases &&
         'ns' in changeDocument &&
         changeDocument.ns.db != this.defaultDb.databaseName &&
         changeDocument.ns.db.endsWith(`_${this.defaultDb.databaseName}`)
       ) {
         // When all of the following conditions are met:
-        // 1. We're replicating from a Flex instance.
+        // 1. We're replicating from an Atlas Flex instance.
         // 2. There were changestream events recorded while the PowerSync service is paused.
         // 3. We're only replicating from a single database.
-        // Then we've obeserved an ns with for example {db: '67b83e86cd20730f1e766dde_ps'},
+        // Then we've observed an ns with for example {db: '67b83e86cd20730f1e766dde_ps'},
         // instead of the expected {db: 'ps'}.
         if (!flexDbNameWorkaroundLogged) {

From 81d5f6acc88bb98ff23ff6b7d9f08e73771faa27 Mon Sep 17 00:00:00 2001
From: Ralf Kistner
Date: Mon, 24 Feb 2025 16:14:55 +0200
Subject: [PATCH 7/8] Pre-create collections in tests.

---
 modules/module-mongodb/src/replication/ChangeStream.ts | 2 ++
 modules/module-mongodb/test/src/change_stream.test.ts  | 6 ++++++
 2 files changed, 8 insertions(+)

diff --git a/modules/module-mongodb/src/replication/ChangeStream.ts b/modules/module-mongodb/src/replication/ChangeStream.ts
index ae0cab1dc..e7930942f 100644
--- a/modules/module-mongodb/src/replication/ChangeStream.ts
+++ b/modules/module-mongodb/src/replication/ChangeStream.ts
@@ -714,6 +714,8 @@ export class ChangeStream {
         const table = await this.getRelation(batch, rel, {
           // In most cases, we should not need to snapshot this. But if this is the first time we see the collection
           // for whatever reason, then we do need to snapshot it.
+          // This may result in some duplicate operations when a collection is created for the first time after
+          // the sync rules were deployed.
           snapshot: true
         });
         if (table.syncAny) {
           await this.writeChange(batch, table, changeDocument);
         }
diff --git a/modules/module-mongodb/test/src/change_stream.test.ts b/modules/module-mongodb/test/src/change_stream.test.ts
index a4c461887..e363e8976 100644
--- a/modules/module-mongodb/test/src/change_stream.test.ts
+++ b/modules/module-mongodb/test/src/change_stream.test.ts
@@ -239,6 +239,7 @@ bucket_definitions:
       - SELECT _id as id, description FROM "test_DATA"
 `);
 
+    await db.createCollection('test_DATA');
     await context.replicateSnapshot();
 
     context.startStreaming();
@@ -261,6 +262,7 @@ bucket_definitions:
     data:
       - SELECT _id as id, name, description FROM "test_data"
 `);
+    await db.createCollection('test_data');
 
     await context.replicateSnapshot();
 
     context.startStreaming();
@@ -371,6 +373,8 @@ bucket_definitions:
       - SELECT _id as id, name, other FROM "test_data"`);
     const { db } = context;
 
+    await db.createCollection('test_data');
+
     await context.replicateSnapshot();
 
     const collection = db.collection('test_data');
@@ -451,6 +455,8 @@ bucket_definitions:
 
     const data = await context.getBucketData('global[]');
     expect(data).toMatchObject([
+      // An extra op here, since this triggers a snapshot in addition to getting the event.
+      test_utils.putOp('test_data', { id: test_id!.toHexString(), description: 'test2' }),
       test_utils.putOp('test_data', { id: test_id!.toHexString(), description: 'test1' }),
       test_utils.putOp('test_data', { id: test_id!.toHexString(), description: 'test2' })
     ]);

From 070957744a2e7e03ac6dffbf3a658f100d851198 Mon Sep 17 00:00:00 2001
From: Ralf Kistner
Date: Mon, 24 Feb 2025 16:17:19 +0200
Subject: [PATCH 8/8] Add changeset.

---
 .changeset/rare-birds-peel.md | 9 +++++++++
 1 file changed, 9 insertions(+)
 create mode 100644 .changeset/rare-birds-peel.md

diff --git a/.changeset/rare-birds-peel.md b/.changeset/rare-birds-peel.md
new file mode 100644
index 000000000..526aff277
--- /dev/null
+++ b/.changeset/rare-birds-peel.md
@@ -0,0 +1,9 @@
+---
+'@powersync/service-module-postgres-storage': patch
+'@powersync/service-module-mongodb-storage': patch
+'@powersync/service-module-mongodb': patch
+'@powersync/service-core': patch
+'@powersync/service-module-mysql': patch
+---
+
+Improve handling of some edge cases which could trigger truncating of synced tables.
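
Taken together, the series makes SourceEntityDescriptor.objectId optional:
sources with stable relation ids (Postgres, MySQL) keep using them, while
MongoDB identifies a collection purely by database + collection name. A
condensed TypeScript sketch of the resulting identity rules (illustrative
only; SourceKeyLike and conflictsWith are hypothetical names, not part of the
patches above):

    interface SourceKeyLike {
      objectId: number | string | undefined;
      schema: string;
      name: string;
    }

    // MongoDB has no stable relation id, so the in-memory cache key falls back
    // to the qualified name (mirrors getCacheIdentifier above).
    function cacheKey(key: SourceKeyLike): string {
      return `${key.schema}.${key.name}`;
    }

    // A rename is only detectable when the source reports a stable objectId:
    // the id stays the same while schema/name change. Without an objectId, only
    // an exact name match can conflict (for example, when the replica id
    // columns changed).
    function conflictsWith(a: SourceKeyLike, b: SourceKeyLike): boolean {
      const sameName = a.schema === b.schema && a.name === b.name;
      if (a.objectId == null || b.objectId == null) {
        return sameName;
      }
      return a.objectId === b.objectId || sameName;
    }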