From c75f480e62679fcd6771c193c3518757358dfe00 Mon Sep 17 00:00:00 2001 From: Ralf Kistner Date: Wed, 6 Nov 2024 18:02:02 +0200 Subject: [PATCH 01/10] Add configuration for using mongodb postimages. --- .../src/api/MongoRouteAPIAdapter.ts | 3 +- .../src/replication/ChangeStream.ts | 124 +++++++++++++++--- .../src/replication/MongoRelation.ts | 3 +- .../src/replication/replication-utils.ts | 3 +- modules/module-mongodb/src/types/types.ts | 10 +- 5 files changed, 122 insertions(+), 21 deletions(-) diff --git a/modules/module-mongodb/src/api/MongoRouteAPIAdapter.ts b/modules/module-mongodb/src/api/MongoRouteAPIAdapter.ts index b9b740e24..5acad6eb0 100644 --- a/modules/module-mongodb/src/api/MongoRouteAPIAdapter.ts +++ b/modules/module-mongodb/src/api/MongoRouteAPIAdapter.ts @@ -7,6 +7,7 @@ import { MongoManager } from '../replication/MongoManager.js'; import { constructAfterRecord, createCheckpoint } from '../replication/MongoRelation.js'; import * as types from '../types/types.js'; import { escapeRegExp } from '../utils.js'; +import { CHECKPOINTS_COLLECTION } from '../replication/replication-utils.js'; export class MongoRouteAPIAdapter implements api.RouteAPI { protected client: mongo.MongoClient; @@ -202,7 +203,7 @@ export class MongoRouteAPIAdapter implements api.RouteAPI { let tables: service_types.TableSchema[] = []; for (let collection of collections) { - if (['_powersync_checkpoints'].includes(collection.name)) { + if ([CHECKPOINTS_COLLECTION].includes(collection.name)) { continue; } if (collection.name.startsWith('system.')) { diff --git a/modules/module-mongodb/src/replication/ChangeStream.ts b/modules/module-mongodb/src/replication/ChangeStream.ts index 9e8a8acab..f7a7676b0 100644 --- a/modules/module-mongodb/src/replication/ChangeStream.ts +++ b/modules/module-mongodb/src/replication/ChangeStream.ts @@ -11,6 +11,7 @@ import { mongoLsnToTimestamp } from './MongoRelation.js'; import { escapeRegExp } from '../utils.js'; +import { CHECKPOINTS_COLLECTION } from 
'./replication-utils.js'; export const ZERO_LSN = '0000000000000000'; @@ -70,7 +71,17 @@ export class ChangeStream { return this.abort_signal.aborted; } - async getQualifiedTableNames( + private get postImages() { + return this.connections.options.postImages; + } + + /** + * This resolves a pattern, persists the related metadata, and returns + * the resulting SourceTables. + * + * This implicitly checks the collection postImage configuration. + */ + async resolveQualifiedTableNames( batch: storage.BucketStorageBatch, tablePattern: TablePattern ): Promise { @@ -94,10 +105,14 @@ export class ChangeStream { { name: nameFilter }, - { nameOnly: true } + { nameOnly: false } ) .toArray(); + if (!tablePattern.isWildcard && collections.length == 0) { + logger.warn(`Collection ${schema}.${tablePattern.name} not found`); + } + for (let collection of collections) { const table = await this.handleRelation( batch, @@ -108,7 +123,7 @@ export class ChangeStream { replicationColumns: [{ name: '_id' }] } as SourceEntityDescriptor, // This is done as part of the initial setup - snapshot is handled elsewhere - { snapshot: false } + { snapshot: false, collectionInfo: collection } ); result.push(table); @@ -165,14 +180,20 @@ export class ChangeStream { await this.storage.startBatch( { zeroLSN: ZERO_LSN, defaultSchema: this.defaultDb.databaseName }, async (batch) => { + // Start by resolving all tables. + // This checks postImage configuration, and that should fail as + // earlier as possible. 
+ let allSourceTables: SourceTable[] = []; for (let tablePattern of sourceTables) { - const tables = await this.getQualifiedTableNames(batch, tablePattern); - for (let table of tables) { - await this.snapshotTable(batch, table, session); - await batch.markSnapshotDone([table], ZERO_LSN); + const tables = await this.resolveQualifiedTableNames(batch, tablePattern); + allSourceTables.push(...tables); + } - await touch(); - } + for (let table of allSourceTables) { + await this.snapshotTable(batch, table, session); + await batch.markSnapshotDone([table], ZERO_LSN); + + await touch(); } const snapshotTime = session.clusterTime?.clusterTime ?? startTime; @@ -193,10 +214,24 @@ export class ChangeStream { } } + private async setupCheckpointsCollection() { + const collection = await this.getCollectionInfo(this.defaultDb.databaseName, CHECKPOINTS_COLLECTION); + if (collection == null) { + await this.defaultDb.createCollection(CHECKPOINTS_COLLECTION, { + changeStreamPreAndPostImages: { enabled: true } + }); + } else if (this.postImages != 'updateLookup' && collection.options?.changeStreamPreAndPostImages?.enabled != true) { + await this.defaultDb.command({ + collMod: CHECKPOINTS_COLLECTION, + changeStreamPreAndPostImages: { enabled: true } + }); + } + } + private getSourceNamespaceFilters(): { $match: any; multipleDatabases: boolean } { const sourceTables = this.sync_rules.getSourceTables(); - let $inFilters: any[] = [{ db: this.defaultDb.databaseName, coll: '_powersync_checkpoints' }]; + let $inFilters: any[] = [{ db: this.defaultDb.databaseName, coll: CHECKPOINTS_COLLECTION }]; let $refilters: any[] = []; let multipleDatabases = false; for (let tablePattern of sourceTables) { @@ -278,14 +313,60 @@ export class ChangeStream { if (existing != null) { return existing; } - return this.handleRelation(batch, descriptor, { snapshot: false }); + + // Note: collection may have been dropped at this point, so we handle + // missing values. 
+ const collection = await this.getCollectionInfo(descriptor.schema, descriptor.name); + + return this.handleRelation(batch, descriptor, { snapshot: false, collectionInfo: collection }); + } + + private async getCollectionInfo(db: string, name: string): Promise { + const collection = ( + await this.client + .db(db) + .listCollections( + { + name: name + }, + { nameOnly: false } + ) + .toArray() + )[0]; + return collection; + } + + private async checkPostImages(db: string, collectionInfo: mongo.CollectionInfo) { + if (this.postImages == 'updateLookup') { + // Nothing to check + return; + } + + const enabled = collectionInfo.options?.changeStreamPreAndPostImages?.enabled == true; + + if (!enabled && this.postImages == 'autoConfigure') { + await this.client.db(db).command({ + collMod: collectionInfo.name, + changeStreamPreAndPostImages: { enabled: true } + }); + logger.info(`Enabled postImages on ${db}.${collectionInfo.name}`); + } else if (!enabled) { + throw new Error(`postImages not enabled on ${db}.${collectionInfo.name}`); + } } async handleRelation( batch: storage.BucketStorageBatch, descriptor: SourceEntityDescriptor, - options: { snapshot: boolean } + options: { snapshot: boolean; collectionInfo: mongo.CollectionInfo | undefined } ) { + if (options.collectionInfo != null) { + await this.checkPostImages(descriptor.schema, options.collectionInfo); + } else { + // If collectionInfo is null, the collection may have been dropped. + // Ignore the postImages check in this case. 
+ } + const snapshot = options.snapshot; if (!descriptor.objectId && typeof descriptor.objectId != 'string') { throw new Error('objectId expected'); @@ -388,6 +469,7 @@ export class ChangeStream { async initReplication() { const result = await this.initSlot(); + await this.setupCheckpointsCollection(); if (result.needsInitialSync) { await this.startInitialReplication(); } @@ -412,12 +494,23 @@ export class ChangeStream { } ]; + let fullDocument: 'required' | 'updateLookup'; + + if (this.connections.options.postImages == 'updateLookup') { + fullDocument = 'updateLookup'; + } else { + // 'on' or 'autoConfigure' + // Configuration happens during snapshot + fullDocument = 'required'; + } + console.log({ fullDocument }); + const streamOptions: mongo.ChangeStreamOptions = { startAtOperationTime: startAfter, showExpandedEvents: true, useBigInt64: true, maxAwaitTimeMS: 200, - fullDocument: 'updateLookup' + fullDocument: fullDocument }; let stream: mongo.ChangeStream; if (filters.multipleDatabases) { @@ -461,7 +554,7 @@ export class ChangeStream { (changeDocument.operationType == 'insert' || changeDocument.operationType == 'update' || changeDocument.operationType == 'replace') && - changeDocument.ns.coll == '_powersync_checkpoints' + changeDocument.ns.coll == CHECKPOINTS_COLLECTION ) { const lsn = getMongoLsn(changeDocument.clusterTime!); if (waitForCheckpointLsn != null && lsn >= waitForCheckpointLsn) { @@ -499,7 +592,8 @@ export class ChangeStream { this.relation_cache.delete(tableFrom.objectId); } // Here we do need to snapshot the new table - await this.handleRelation(batch, relTo, { snapshot: true }); + const collection = await this.getCollectionInfo(relTo.schema, relTo.name); + await this.handleRelation(batch, relTo, { snapshot: true, collectionInfo: collection }); } } }); diff --git a/modules/module-mongodb/src/replication/MongoRelation.ts b/modules/module-mongodb/src/replication/MongoRelation.ts index 1a418a6ae..b027b4e98 100644 --- 
a/modules/module-mongodb/src/replication/MongoRelation.ts +++ b/modules/module-mongodb/src/replication/MongoRelation.ts @@ -2,6 +2,7 @@ import { storage } from '@powersync/service-core'; import { SqliteRow, SqliteValue, toSyncRulesRow } from '@powersync/service-sync-rules'; import * as mongo from 'mongodb'; import { JSONBig, JsonContainer } from '@powersync/service-jsonbig'; +import { CHECKPOINTS_COLLECTION } from './replication-utils.js'; export function getMongoRelation(source: mongo.ChangeStreamNameSpace): storage.SourceEntityDescriptor { return { @@ -145,7 +146,7 @@ function filterJsonData(data: any, depth = 0): any { export async function createCheckpoint(client: mongo.MongoClient, db: mongo.Db): Promise { const session = client.startSession(); try { - const result = await db.collection('_powersync_checkpoints').findOneAndUpdate( + const result = await db.collection(CHECKPOINTS_COLLECTION).findOneAndUpdate( { _id: 'checkpoint' as any }, diff --git a/modules/module-mongodb/src/replication/replication-utils.ts b/modules/module-mongodb/src/replication/replication-utils.ts index 71a521d77..5370fdd5e 100644 --- a/modules/module-mongodb/src/replication/replication-utils.ts +++ b/modules/module-mongodb/src/replication/replication-utils.ts @@ -1,6 +1,7 @@ -import * as mongo from 'mongodb'; import { MongoManager } from './MongoManager.js'; +export const CHECKPOINTS_COLLECTION = '_powersync_checkpoints'; + export async function checkSourceConfiguration(connectionManager: MongoManager): Promise { const db = connectionManager.db; const hello = await db.command({ hello: 1 }); diff --git a/modules/module-mongodb/src/types/types.ts b/modules/module-mongodb/src/types/types.ts index ac7873876..e53c0419e 100644 --- a/modules/module-mongodb/src/types/types.ts +++ b/modules/module-mongodb/src/types/types.ts @@ -13,6 +13,8 @@ export interface NormalizedMongoConnectionConfig { username?: string; password?: string; + + postImages: 'on' | 'autoConfigure' | 'updateLookup'; } export 
const MongoConnectionConfig = service_types.configFile.DataSourceConfig.and( @@ -25,7 +27,9 @@ export const MongoConnectionConfig = service_types.configFile.DataSourceConfig.a uri: t.string, username: t.string.optional(), password: t.string.optional(), - database: t.string.optional() + database: t.string.optional(), + + postImages: t.literal('on').or(t.literal('autoConfigure')).or(t.literal('updateLookup')).optional() }) ); @@ -48,10 +52,10 @@ export function normalizeConnectionConfig(options: MongoConnectionConfig): Norma const base = normalizeMongoConfig(options); return { + ...base, id: options.id ?? 'default', tag: options.tag ?? 'default', - - ...base + postImages: options.postImages ?? 'updateLookup' }; } From e4c60cdff4d229b9d7263772bc6eff3e771cef62 Mon Sep 17 00:00:00 2001 From: Ralf Kistner Date: Thu, 7 Nov 2024 12:29:33 +0200 Subject: [PATCH 02/10] Increase timeout between writes in tests. --- .../test/src/change_stream.test.ts | 17 ++++++----------- 1 file changed, 6 insertions(+), 11 deletions(-) diff --git a/modules/module-mongodb/test/src/change_stream.test.ts b/modules/module-mongodb/test/src/change_stream.test.ts index 2b5c949d4..06d72ce96 100644 --- a/modules/module-mongodb/test/src/change_stream.test.ts +++ b/modules/module-mongodb/test/src/change_stream.test.ts @@ -16,13 +16,9 @@ bucket_definitions: - SELECT _id as id, description FROM "test_data" `; -describe( - 'change stream - mongodb', - function () { - defineChangeStreamTests(MONGO_STORAGE_FACTORY); - }, - { timeout: 20_000 } -); +describe('change stream - mongodb', { timeout: 20_000 }, function () { + defineChangeStreamTests(MONGO_STORAGE_FACTORY); +}); function defineChangeStreamTests(factory: StorageFactory) { test( @@ -46,11 +42,11 @@ bucket_definitions: const result = await collection.insertOne({ description: 'test1', num: 1152921504606846976n }); const test_id = result.insertedId; - await setTimeout(10); + await setTimeout(30); await collection.updateOne({ _id: test_id }, { $set: { 
description: 'test2' } }); - await setTimeout(10); + await setTimeout(30); await collection.replaceOne({ _id: test_id }, { description: 'test3' }); - await setTimeout(10); + await setTimeout(30); await collection.deleteOne({ _id: test_id }); const data = await context.getBucketData('global[]'); @@ -213,7 +209,6 @@ bucket_definitions: await context.replicateSnapshot(); context.startStreaming(); - console.log('insert1', db.databaseName); const collection = db.collection('test_data1'); const result = await collection.insertOne({ description: 'test1' }); const test_id = result.insertedId.toHexString(); From 9c3e7d6432705d03437e674aeb653c62da1ee8dc Mon Sep 17 00:00:00 2001 From: Ralf Kistner Date: Thu, 7 Nov 2024 12:40:06 +0200 Subject: [PATCH 03/10] Use "await using" to simplify tests. --- .../test/src/change_stream.test.ts | 406 +++++++++--------- .../test/src/change_stream_utils.ts | 42 +- 2 files changed, 212 insertions(+), 236 deletions(-) diff --git a/modules/module-mongodb/test/src/change_stream.test.ts b/modules/module-mongodb/test/src/change_stream.test.ts index 06d72ce96..dd96b9126 100644 --- a/modules/module-mongodb/test/src/change_stream.test.ts +++ b/modules/module-mongodb/test/src/change_stream.test.ts @@ -3,7 +3,7 @@ import { MONGO_STORAGE_FACTORY } from '@core-tests/util.js'; import { BucketStorageFactory } from '@powersync/service-core'; import * as crypto from 'crypto'; import { describe, expect, test } from 'vitest'; -import { changeStreamTest } from './change_stream_utils.js'; +import { ChangeStreamTestContext } from './change_stream_utils.js'; import * as mongo from 'mongodb'; import { setTimeout } from 'node:timers/promises'; @@ -21,152 +21,143 @@ describe('change stream - mongodb', { timeout: 20_000 }, function () { }); function defineChangeStreamTests(factory: StorageFactory) { - test( - 'replicating basic values', - changeStreamTest(factory, async (context) => { - const { db } = context; - await context.updateSyncRules(` + test('replicating 
basic values', async () => { + await using context = await ChangeStreamTestContext.open(factory); + const { db } = context; + await context.updateSyncRules(` bucket_definitions: global: data: - SELECT _id as id, description, num FROM "test_data"`); - db.createCollection('test_data', { - changeStreamPreAndPostImages: { enabled: true } - }); - const collection = db.collection('test_data'); - - await context.replicateSnapshot(); - - context.startStreaming(); - - const result = await collection.insertOne({ description: 'test1', num: 1152921504606846976n }); - const test_id = result.insertedId; - await setTimeout(30); - await collection.updateOne({ _id: test_id }, { $set: { description: 'test2' } }); - await setTimeout(30); - await collection.replaceOne({ _id: test_id }, { description: 'test3' }); - await setTimeout(30); - await collection.deleteOne({ _id: test_id }); - - const data = await context.getBucketData('global[]'); - - expect(data).toMatchObject([ - putOp('test_data', { id: test_id.toHexString(), description: 'test1', num: 1152921504606846976n }), - putOp('test_data', { id: test_id.toHexString(), description: 'test2', num: 1152921504606846976n }), - putOp('test_data', { id: test_id.toHexString(), description: 'test3' }), - removeOp('test_data', test_id.toHexString()) - ]); - }) - ); - - test( - 'no fullDocument available', - changeStreamTest(factory, async (context) => { - const { db, client } = context; - await context.updateSyncRules(` + db.createCollection('test_data', { + changeStreamPreAndPostImages: { enabled: true } + }); + const collection = db.collection('test_data'); + + await context.replicateSnapshot(); + + context.startStreaming(); + + const result = await collection.insertOne({ description: 'test1', num: 1152921504606846976n }); + const test_id = result.insertedId; + await setTimeout(30); + await collection.updateOne({ _id: test_id }, { $set: { description: 'test2' } }); + await setTimeout(30); + await collection.replaceOne({ _id: test_id }, { 
description: 'test3' }); + await setTimeout(30); + await collection.deleteOne({ _id: test_id }); + + const data = await context.getBucketData('global[]'); + + expect(data).toMatchObject([ + putOp('test_data', { id: test_id.toHexString(), description: 'test1', num: 1152921504606846976n }), + putOp('test_data', { id: test_id.toHexString(), description: 'test2', num: 1152921504606846976n }), + putOp('test_data', { id: test_id.toHexString(), description: 'test3' }), + removeOp('test_data', test_id.toHexString()) + ]); + }); + + test('no fullDocument available', async () => { + await using context = await ChangeStreamTestContext.open(factory); + const { db, client } = context; + await context.updateSyncRules(` bucket_definitions: global: data: - SELECT _id as id, description, num FROM "test_data"`); - db.createCollection('test_data', { - changeStreamPreAndPostImages: { enabled: false } + db.createCollection('test_data', { + changeStreamPreAndPostImages: { enabled: false } + }); + const collection = db.collection('test_data'); + + await context.replicateSnapshot(); + + context.startStreaming(); + + const session = client.startSession(); + let test_id: mongo.ObjectId | undefined; + try { + await session.withTransaction(async () => { + const result = await collection.insertOne({ description: 'test1', num: 1152921504606846976n }, { session }); + test_id = result.insertedId; + await collection.updateOne({ _id: test_id }, { $set: { description: 'test2' } }, { session }); + await collection.replaceOne({ _id: test_id }, { description: 'test3' }, { session }); + await collection.deleteOne({ _id: test_id }, { session }); }); - const collection = db.collection('test_data'); - - await context.replicateSnapshot(); - - context.startStreaming(); - - const session = client.startSession(); - let test_id: mongo.ObjectId | undefined; - try { - await session.withTransaction(async () => { - const result = await collection.insertOne({ description: 'test1', num: 1152921504606846976n }, { 
session }); - test_id = result.insertedId; - await collection.updateOne({ _id: test_id }, { $set: { description: 'test2' } }, { session }); - await collection.replaceOne({ _id: test_id }, { description: 'test3' }, { session }); - await collection.deleteOne({ _id: test_id }, { session }); - }); - } finally { - await session.endSession(); - } - - const data = await context.getBucketData('global[]'); - - expect(data).toMatchObject([ - putOp('test_data', { id: test_id!.toHexString(), description: 'test1', num: 1152921504606846976n }), - // fullDocument is not available at the point this is replicated, resulting in it treated as a remove - removeOp('test_data', test_id!.toHexString()), - putOp('test_data', { id: test_id!.toHexString(), description: 'test3' }), - removeOp('test_data', test_id!.toHexString()) - ]); - }) - ); - - test( - 'replicating case sensitive table', - changeStreamTest(factory, async (context) => { - const { db } = context; - await context.updateSyncRules(` + } finally { + await session.endSession(); + } + + const data = await context.getBucketData('global[]'); + + expect(data).toMatchObject([ + putOp('test_data', { id: test_id!.toHexString(), description: 'test1', num: 1152921504606846976n }), + // fullDocument is not available at the point this is replicated, resulting in it treated as a remove + removeOp('test_data', test_id!.toHexString()), + putOp('test_data', { id: test_id!.toHexString(), description: 'test3' }), + removeOp('test_data', test_id!.toHexString()) + ]); + }); + + test('replicating case sensitive table', async () => { + await using context = await ChangeStreamTestContext.open(factory); + const { db } = context; + await context.updateSyncRules(` bucket_definitions: global: data: - SELECT _id as id, description FROM "test_DATA" `); - await context.replicateSnapshot(); + await context.replicateSnapshot(); - context.startStreaming(); + context.startStreaming(); - const collection = db.collection('test_DATA'); - const result = await 
collection.insertOne({ description: 'test1' }); - const test_id = result.insertedId.toHexString(); + const collection = db.collection('test_DATA'); + const result = await collection.insertOne({ description: 'test1' }); + const test_id = result.insertedId.toHexString(); - const data = await context.getBucketData('global[]'); + const data = await context.getBucketData('global[]'); - expect(data).toMatchObject([putOp('test_DATA', { id: test_id, description: 'test1' })]); - }) - ); + expect(data).toMatchObject([putOp('test_DATA', { id: test_id, description: 'test1' })]); + }); - test( - 'replicating large values', - changeStreamTest(factory, async (context) => { - const { db } = context; - await context.updateSyncRules(` + test('replicating large values', async () => { + await using context = await ChangeStreamTestContext.open(factory); + const { db } = context; + await context.updateSyncRules(` bucket_definitions: global: data: - SELECT _id as id, name, description FROM "test_data" `); - await context.replicateSnapshot(); - context.startStreaming(); + await context.replicateSnapshot(); + context.startStreaming(); - const largeDescription = crypto.randomBytes(20_000).toString('hex'); + const largeDescription = crypto.randomBytes(20_000).toString('hex'); - const collection = db.collection('test_data'); - const result = await collection.insertOne({ name: 'test1', description: largeDescription }); - const test_id = result.insertedId; + const collection = db.collection('test_data'); + const result = await collection.insertOne({ name: 'test1', description: largeDescription }); + const test_id = result.insertedId; - await collection.updateOne({ _id: test_id }, { $set: { name: 'test2' } }); + await collection.updateOne({ _id: test_id }, { $set: { name: 'test2' } }); - const data = await context.getBucketData('global[]'); - expect(data.slice(0, 1)).toMatchObject([ - putOp('test_data', { id: test_id.toHexString(), name: 'test1', description: largeDescription }) - ]); - 
expect(data.slice(1)).toMatchObject([ - putOp('test_data', { id: test_id.toHexString(), name: 'test2', description: largeDescription }) - ]); - }) - ); + const data = await context.getBucketData('global[]'); + expect(data.slice(0, 1)).toMatchObject([ + putOp('test_data', { id: test_id.toHexString(), name: 'test1', description: largeDescription }) + ]); + expect(data.slice(1)).toMatchObject([ + putOp('test_data', { id: test_id.toHexString(), name: 'test2', description: largeDescription }) + ]); + }); - test( - 'replicating dropCollection', - changeStreamTest(factory, async (context) => { - const { db } = context; - const syncRuleContent = ` + test('replicating dropCollection', async () => { + await using context = await ChangeStreamTestContext.open(factory); + const { db } = context; + const syncRuleContent = ` bucket_definitions: global: data: @@ -175,127 +166,118 @@ bucket_definitions: parameters: SELECT _id as id FROM test_data WHERE id = token_parameters.user_id data: [] `; - await context.updateSyncRules(syncRuleContent); - await context.replicateSnapshot(); - context.startStreaming(); - - const collection = db.collection('test_data'); - const result = await collection.insertOne({ description: 'test1' }); - const test_id = result.insertedId.toHexString(); - - await collection.drop(); - - const data = await context.getBucketData('global[]'); - - expect(data).toMatchObject([ - putOp('test_data', { id: test_id, description: 'test1' }), - removeOp('test_data', test_id) - ]); - }) - ); - - test( - 'replicating renameCollection', - changeStreamTest(factory, async (context) => { - const { db } = context; - const syncRuleContent = ` + await context.updateSyncRules(syncRuleContent); + await context.replicateSnapshot(); + context.startStreaming(); + + const collection = db.collection('test_data'); + const result = await collection.insertOne({ description: 'test1' }); + const test_id = result.insertedId.toHexString(); + + await collection.drop(); + + const data = await 
context.getBucketData('global[]'); + + expect(data).toMatchObject([ + putOp('test_data', { id: test_id, description: 'test1' }), + removeOp('test_data', test_id) + ]); + }); + + test('replicating renameCollection', async () => { + await using context = await ChangeStreamTestContext.open(factory); + const { db } = context; + const syncRuleContent = ` bucket_definitions: global: data: - SELECT _id as id, description FROM "test_data1" - SELECT _id as id, description FROM "test_data2" `; - await context.updateSyncRules(syncRuleContent); - await context.replicateSnapshot(); - context.startStreaming(); + await context.updateSyncRules(syncRuleContent); + await context.replicateSnapshot(); + context.startStreaming(); - const collection = db.collection('test_data1'); - const result = await collection.insertOne({ description: 'test1' }); - const test_id = result.insertedId.toHexString(); + const collection = db.collection('test_data1'); + const result = await collection.insertOne({ description: 'test1' }); + const test_id = result.insertedId.toHexString(); - await collection.rename('test_data2'); + await collection.rename('test_data2'); - const data = await context.getBucketData('global[]'); + const data = await context.getBucketData('global[]'); - expect(data).toMatchObject([ - putOp('test_data1', { id: test_id, description: 'test1' }), - removeOp('test_data1', test_id), - putOp('test_data2', { id: test_id, description: 'test1' }) - ]); - }) - ); + expect(data).toMatchObject([ + putOp('test_data1', { id: test_id, description: 'test1' }), + removeOp('test_data1', test_id), + putOp('test_data2', { id: test_id, description: 'test1' }) + ]); + }); - test( - 'initial sync', - changeStreamTest(factory, async (context) => { - const { db } = context; - await context.updateSyncRules(BASIC_SYNC_RULES); + test('initial sync', async () => { + await using context = await ChangeStreamTestContext.open(factory); + const { db } = context; + await context.updateSyncRules(BASIC_SYNC_RULES); - 
const collection = db.collection('test_data'); - const result = await collection.insertOne({ description: 'test1' }); - const test_id = result.insertedId.toHexString(); + const collection = db.collection('test_data'); + const result = await collection.insertOne({ description: 'test1' }); + const test_id = result.insertedId.toHexString(); - await context.replicateSnapshot(); - context.startStreaming(); + await context.replicateSnapshot(); + context.startStreaming(); - const data = await context.getBucketData('global[]'); - expect(data).toMatchObject([putOp('test_data', { id: test_id, description: 'test1' })]); - }) - ); + const data = await context.getBucketData('global[]'); + expect(data).toMatchObject([putOp('test_data', { id: test_id, description: 'test1' })]); + }); // Not correctly implemented yet - test.skip( - 'large record', - changeStreamTest(factory, async (context) => { - await context.updateSyncRules(`bucket_definitions: + test.skip('large record', async () => { + await using context = await ChangeStreamTestContext.open(factory); + await context.updateSyncRules(`bucket_definitions: global: data: - SELECT _id as id, description, other FROM "test_data"`); - const { db } = context; - - await context.replicateSnapshot(); - - // 16MB - const largeDescription = crypto.randomBytes(8_000_000 - 100).toString('hex'); - - const collection = db.collection('test_data'); - const result = await collection.insertOne({ description: largeDescription }); - const test_id = result.insertedId; - - await collection.updateOne({ _id: test_id }, { $set: { name: 't2' } }); - context.startStreaming(); - - const data = await context.getBucketData('global[]'); - expect(data.length).toEqual(2); - const row = JSON.parse(data[0].data as string); - delete row.description; - expect(row).toEqual({ id: test_id.toHexString() }); - delete data[0].data; - expect(data[0]).toMatchObject({ - object_id: test_id.toHexString(), - object_type: 'test_data', - op: 'PUT', - op_id: '1' - }); - }) - ); + 
const { db } = context; + + await context.replicateSnapshot(); + + // 16MB + const largeDescription = crypto.randomBytes(8_000_000 - 100).toString('hex'); + + const collection = db.collection('test_data'); + const result = await collection.insertOne({ description: largeDescription }); + const test_id = result.insertedId; + + await collection.updateOne({ _id: test_id }, { $set: { name: 't2' } }); + context.startStreaming(); + + const data = await context.getBucketData('global[]'); + expect(data.length).toEqual(2); + const row = JSON.parse(data[0].data as string); + delete row.description; + expect(row).toEqual({ id: test_id.toHexString() }); + delete data[0].data; + expect(data[0]).toMatchObject({ + object_id: test_id.toHexString(), + object_type: 'test_data', + op: 'PUT', + op_id: '1' + }); + }); - test( - 'table not in sync rules', - changeStreamTest(factory, async (context) => { - const { db } = context; - await context.updateSyncRules(BASIC_SYNC_RULES); + test('table not in sync rules', async () => { + await using context = await ChangeStreamTestContext.open(factory); + const { db } = context; + await context.updateSyncRules(BASIC_SYNC_RULES); - await context.replicateSnapshot(); + await context.replicateSnapshot(); - context.startStreaming(); + context.startStreaming(); - const collection = db.collection('test_donotsync'); - const result = await collection.insertOne({ description: 'test' }); + const collection = db.collection('test_donotsync'); + const result = await collection.insertOne({ description: 'test' }); - const data = await context.getBucketData('global[]'); + const data = await context.getBucketData('global[]'); - expect(data).toMatchObject([]); - }) - ); + expect(data).toMatchObject([]); + }); } diff --git a/modules/module-mongodb/test/src/change_stream_utils.ts b/modules/module-mongodb/test/src/change_stream_utils.ts index e533c56ca..2739d1939 100644 --- a/modules/module-mongodb/test/src/change_stream_utils.ts +++ 
b/modules/module-mongodb/test/src/change_stream_utils.ts @@ -7,36 +7,26 @@ import { ChangeStream, ChangeStreamOptions } from '@module/replication/ChangeStr import * as mongo from 'mongodb'; import { createCheckpoint } from '@module/replication/MongoRelation.js'; -/** - * Tests operating on the mongo change stream need to configure the stream and manage asynchronous - * replication, which gets a little tricky. - * - * This wraps a test in a function that configures all the context, and tears it down afterwards. - */ -export function changeStreamTest( - factory: () => Promise, - test: (context: ChangeStreamTestContext) => Promise -): () => Promise { - return async () => { - const f = await factory(); - const connectionManager = new MongoManager(TEST_CONNECTION_OPTIONS); - - await clearTestDb(connectionManager.db); - const context = new ChangeStreamTestContext(f, connectionManager); - try { - await test(context); - } finally { - await context.dispose(); - } - }; -} - export class ChangeStreamTestContext { private _walStream?: ChangeStream; private abortController = new AbortController(); private streamPromise?: Promise; public storage?: SyncRulesBucketStorage; + /** + * Tests operating on the mongo change stream need to configure the stream and manage asynchronous + * replication, which gets a little tricky. + * + * This configures all the context, and tears it down afterwards. 
+ */ + static async open(factory: () => Promise) { + const f = await factory(); + const connectionManager = new MongoManager(TEST_CONNECTION_OPTIONS); + + await clearTestDb(connectionManager.db); + return new ChangeStreamTestContext(f, connectionManager); + } + constructor( public factory: BucketStorageFactory, public connectionManager: MongoManager @@ -48,6 +38,10 @@ export class ChangeStreamTestContext { await this.connectionManager.destroy(); } + async [Symbol.asyncDispose]() { + await this.dispose(); + } + get client() { return this.connectionManager.client; } From a6cd6f90a55a15432793f0527ab1402a77ffedfd Mon Sep 17 00:00:00 2001 From: Ralf Kistner Date: Thu, 7 Nov 2024 12:50:16 +0200 Subject: [PATCH 04/10] Add postImage tests. --- .../test/src/change_stream.test.ts | 97 ++++++++++++++++++- .../test/src/change_stream_utils.ts | 5 +- 2 files changed, 97 insertions(+), 5 deletions(-) diff --git a/modules/module-mongodb/test/src/change_stream.test.ts b/modules/module-mongodb/test/src/change_stream.test.ts index dd96b9126..f32e227e6 100644 --- a/modules/module-mongodb/test/src/change_stream.test.ts +++ b/modules/module-mongodb/test/src/change_stream.test.ts @@ -2,7 +2,7 @@ import { putOp, removeOp } from '@core-tests/stream_utils.js'; import { MONGO_STORAGE_FACTORY } from '@core-tests/util.js'; import { BucketStorageFactory } from '@powersync/service-core'; import * as crypto from 'crypto'; -import { describe, expect, test } from 'vitest'; +import { afterEach, beforeEach, describe, expect, test } from 'vitest'; import { ChangeStreamTestContext } from './change_stream_utils.js'; import * as mongo from 'mongodb'; import { setTimeout } from 'node:timers/promises'; @@ -58,8 +58,8 @@ bucket_definitions: ]); }); - test('no fullDocument available', async () => { - await using context = await ChangeStreamTestContext.open(factory); + test('updateLookup - no fullDocument available', async () => { + await using context = await ChangeStreamTestContext.open(factory, { 
postImages: 'updateLookup' }); const { db, client } = context; await context.updateSyncRules(` bucket_definitions: @@ -101,6 +101,97 @@ bucket_definitions: ]); }); + test('postImages - autoConfigure', async () => { + // Similar to the above test, but with postImages enabled. + // This resolves the consistency issue. + await using context = await ChangeStreamTestContext.open(factory, { postImages: 'autoConfigure' }); + const { db, client } = context; + await context.updateSyncRules(` +bucket_definitions: + global: + data: + - SELECT _id as id, description, num FROM "test_data"`); + + db.createCollection('test_data', { + // enabled: false here, but autoConfigure will enable it. + changeStreamPreAndPostImages: { enabled: false } + }); + const collection = db.collection('test_data'); + + await context.replicateSnapshot(); + + context.startStreaming(); + + const session = client.startSession(); + let test_id: mongo.ObjectId | undefined; + try { + await session.withTransaction(async () => { + const result = await collection.insertOne({ description: 'test1', num: 1152921504606846976n }, { session }); + test_id = result.insertedId; + await collection.updateOne({ _id: test_id }, { $set: { description: 'test2' } }, { session }); + await collection.replaceOne({ _id: test_id }, { description: 'test3' }, { session }); + await collection.deleteOne({ _id: test_id }, { session }); + }); + } finally { + await session.endSession(); + } + + const data = await context.getBucketData('global[]'); + + expect(data).toMatchObject([ + putOp('test_data', { id: test_id!.toHexString(), description: 'test1', num: 1152921504606846976n }), + // The postImage helps us get this data + putOp('test_data', { id: test_id!.toHexString(), description: 'test2', num: 1152921504606846976n }), + putOp('test_data', { id: test_id!.toHexString(), description: 'test3' }), + removeOp('test_data', test_id!.toHexString()) + ]); + }); + + test('postImages - on', async () => { + // Similar to postImages - 
autoConfigure, but does not auto-configure. + // changeStreamPreAndPostImages must be manually configured. + await using context = await ChangeStreamTestContext.open(factory, { postImages: 'on' }); + const { db, client } = context; + await context.updateSyncRules(` +bucket_definitions: + global: + data: + - SELECT _id as id, description, num FROM "test_data"`); + + db.createCollection('test_data', { + changeStreamPreAndPostImages: { enabled: true } + }); + const collection = db.collection('test_data'); + + await context.replicateSnapshot(); + + context.startStreaming(); + + const session = client.startSession(); + let test_id: mongo.ObjectId | undefined; + try { + await session.withTransaction(async () => { + const result = await collection.insertOne({ description: 'test1', num: 1152921504606846976n }, { session }); + test_id = result.insertedId; + await collection.updateOne({ _id: test_id }, { $set: { description: 'test2' } }, { session }); + await collection.replaceOne({ _id: test_id }, { description: 'test3' }, { session }); + await collection.deleteOne({ _id: test_id }, { session }); + }); + } finally { + await session.endSession(); + } + + const data = await context.getBucketData('global[]'); + + expect(data).toMatchObject([ + putOp('test_data', { id: test_id!.toHexString(), description: 'test1', num: 1152921504606846976n }), + // The postImage helps us get this data + putOp('test_data', { id: test_id!.toHexString(), description: 'test2', num: 1152921504606846976n }), + putOp('test_data', { id: test_id!.toHexString(), description: 'test3' }), + removeOp('test_data', test_id!.toHexString()) + ]); + }); + test('replicating case sensitive table', async () => { await using context = await ChangeStreamTestContext.open(factory); const { db } = context; diff --git a/modules/module-mongodb/test/src/change_stream_utils.ts b/modules/module-mongodb/test/src/change_stream_utils.ts index 2739d1939..76735d380 100644 --- 
a/modules/module-mongodb/test/src/change_stream_utils.ts +++ b/modules/module-mongodb/test/src/change_stream_utils.ts @@ -6,6 +6,7 @@ import { MongoManager } from '@module/replication/MongoManager.js'; import { ChangeStream, ChangeStreamOptions } from '@module/replication/ChangeStream.js'; import * as mongo from 'mongodb'; import { createCheckpoint } from '@module/replication/MongoRelation.js'; +import { NormalizedMongoConnectionConfig } from '@module/types/types.js'; export class ChangeStreamTestContext { private _walStream?: ChangeStream; @@ -19,9 +20,9 @@ export class ChangeStreamTestContext { * * This configures all the context, and tears it down afterwards. */ - static async open(factory: () => Promise) { + static async open(factory: () => Promise, options?: Partial) { const f = await factory(); - const connectionManager = new MongoManager(TEST_CONNECTION_OPTIONS); + const connectionManager = new MongoManager({ ...TEST_CONNECTION_OPTIONS, ...options }); await clearTestDb(connectionManager.db); return new ChangeStreamTestContext(f, connectionManager); From 0b9d0e70d63561d48feea6bb7e51f7eb881ee707 Mon Sep 17 00:00:00 2001 From: Ralf Kistner Date: Thu, 7 Nov 2024 13:41:22 +0200 Subject: [PATCH 05/10] Test and fix wildcard collections. 
--- .../src/replication/ChangeStream.ts | 9 +++- .../test/src/change_stream.test.ts | 49 +++++++++++++++---- 2 files changed, 47 insertions(+), 11 deletions(-) diff --git a/modules/module-mongodb/src/replication/ChangeStream.ts b/modules/module-mongodb/src/replication/ChangeStream.ts index f7a7676b0..afdc33cad 100644 --- a/modules/module-mongodb/src/replication/ChangeStream.ts +++ b/modules/module-mongodb/src/replication/ChangeStream.ts @@ -244,7 +244,10 @@ export class ChangeStream { } if (tablePattern.isWildcard) { - $refilters.push({ db: tablePattern.schema, coll: new RegExp('^' + escapeRegExp(tablePattern.tablePrefix)) }); + $refilters.push({ + 'ns.db': tablePattern.schema, + 'ns.coll': new RegExp('^' + escapeRegExp(tablePattern.tablePrefix)) + }); } else { $inFilters.push({ db: tablePattern.schema, @@ -284,6 +287,8 @@ export class ChangeStream { throw new Error(`Aborted initial replication`); } + at += 1; + const record = constructAfterRecord(document); // This auto-flushes when the batch reaches its size limit @@ -303,6 +308,7 @@ export class ChangeStream { } await batch.flush(); + logger.info(`Replicated ${at} documents for ${table.qualifiedName}`); } private async getRelation( @@ -503,7 +509,6 @@ export class ChangeStream { // Configuration happens during snapshot fullDocument = 'required'; } - console.log({ fullDocument }); const streamOptions: mongo.ChangeStreamOptions = { startAtOperationTime: startAfter, diff --git a/modules/module-mongodb/test/src/change_stream.test.ts b/modules/module-mongodb/test/src/change_stream.test.ts index f32e227e6..63b96bf05 100644 --- a/modules/module-mongodb/test/src/change_stream.test.ts +++ b/modules/module-mongodb/test/src/change_stream.test.ts @@ -2,10 +2,10 @@ import { putOp, removeOp } from '@core-tests/stream_utils.js'; import { MONGO_STORAGE_FACTORY } from '@core-tests/util.js'; import { BucketStorageFactory } from '@powersync/service-core'; import * as crypto from 'crypto'; -import { afterEach, beforeEach, describe, 
expect, test } from 'vitest'; -import { ChangeStreamTestContext } from './change_stream_utils.js'; import * as mongo from 'mongodb'; import { setTimeout } from 'node:timers/promises'; +import { describe, expect, test } from 'vitest'; +import { ChangeStreamTestContext } from './change_stream_utils.js'; type StorageFactory = () => Promise; @@ -30,8 +30,8 @@ bucket_definitions: data: - SELECT _id as id, description, num FROM "test_data"`); - db.createCollection('test_data', { - changeStreamPreAndPostImages: { enabled: true } + await db.createCollection('test_data', { + changeStreamPreAndPostImages: { enabled: false } }); const collection = db.collection('test_data'); @@ -58,6 +58,38 @@ bucket_definitions: ]); }); + test('replicating wildcard', async () => { + await using context = await ChangeStreamTestContext.open(factory); + const { db } = context; + await context.updateSyncRules(` +bucket_definitions: + global: + data: + - SELECT _id as id, description, num FROM "test_%"`); + + await db.createCollection('test_data', { + changeStreamPreAndPostImages: { enabled: false } + }); + const collection = db.collection('test_data'); + + const result = await collection.insertOne({ description: 'test1', num: 1152921504606846976n }); + const test_id = result.insertedId; + + await context.replicateSnapshot(); + + context.startStreaming(); + + await setTimeout(30); + await collection.updateOne({ _id: test_id }, { $set: { description: 'test2' } }); + + const data = await context.getBucketData('global[]'); + + expect(data).toMatchObject([ + putOp('test_data', { id: test_id.toHexString(), description: 'test1', num: 1152921504606846976n }), + putOp('test_data', { id: test_id.toHexString(), description: 'test2', num: 1152921504606846976n }) + ]); + }); + test('updateLookup - no fullDocument available', async () => { await using context = await ChangeStreamTestContext.open(factory, { postImages: 'updateLookup' }); const { db, client } = context; @@ -67,13 +99,12 @@ bucket_definitions: 
data: - SELECT _id as id, description, num FROM "test_data"`); - db.createCollection('test_data', { + await db.createCollection('test_data', { changeStreamPreAndPostImages: { enabled: false } }); const collection = db.collection('test_data'); await context.replicateSnapshot(); - context.startStreaming(); const session = client.startSession(); @@ -112,7 +143,7 @@ bucket_definitions: data: - SELECT _id as id, description, num FROM "test_data"`); - db.createCollection('test_data', { + await db.createCollection('test_data', { // enabled: false here, but autoConfigure will enable it. changeStreamPreAndPostImages: { enabled: false } }); @@ -158,7 +189,7 @@ bucket_definitions: data: - SELECT _id as id, description, num FROM "test_data"`); - db.createCollection('test_data', { + await db.createCollection('test_data', { changeStreamPreAndPostImages: { enabled: true } }); const collection = db.collection('test_data'); @@ -355,7 +386,7 @@ bucket_definitions: }); }); - test('table not in sync rules', async () => { + test('collection not in sync rules', async () => { await using context = await ChangeStreamTestContext.open(factory); const { db } = context; await context.updateSyncRules(BASIC_SYNC_RULES); From ef85649c328b3aea1a65ce35fc7f455623c40ae6 Mon Sep 17 00:00:00 2001 From: Ralf Kistner Date: Thu, 7 Nov 2024 14:41:18 +0200 Subject: [PATCH 06/10] Invalidate changestream if postImage is not available. 
--- .../src/replication/ChangeStream.ts | 25 +++++++- .../replication/ChangeStreamReplicationJob.ts | 10 ++-- .../test/src/change_stream.test.ts | 57 +++++++++++++++++++ 3 files changed, 87 insertions(+), 5 deletions(-) diff --git a/modules/module-mongodb/src/replication/ChangeStream.ts b/modules/module-mongodb/src/replication/ChangeStream.ts index afdc33cad..ada11ac73 100644 --- a/modules/module-mongodb/src/replication/ChangeStream.ts +++ b/modules/module-mongodb/src/replication/ChangeStream.ts @@ -25,7 +25,15 @@ interface InitResult { needsInitialSync: boolean; } -export class MissingReplicationSlotError extends Error { +/** + * Thrown when the change stream is not valid anymore, and replication + * must be restarted. + * + * Possible reasons: + * * Some change stream documents do not have postImages. + * * startAfter/resumeToken is not valid anymore. + */ +export class ChangeStreamInvalidatedError extends Error { constructor(message: string) { super(message); } @@ -482,6 +490,21 @@ export class ChangeStream { } async streamChanges() { + try { + await this.streamChangesInternal(); + } catch (e) { + if ( + e instanceof mongo.MongoServerError && + e.codeName == 'NoMatchingDocument' && + e.errmsg?.includes('post-image was not found') + ) { + throw new ChangeStreamInvalidatedError(e.errmsg); + } + throw e; + } + } + + async streamChangesInternal() { // Auto-activate as soon as initial replication is done await this.storage.autoActivate(); diff --git a/modules/module-mongodb/src/replication/ChangeStreamReplicationJob.ts b/modules/module-mongodb/src/replication/ChangeStreamReplicationJob.ts index 06583d571..78842fd36 100644 --- a/modules/module-mongodb/src/replication/ChangeStreamReplicationJob.ts +++ b/modules/module-mongodb/src/replication/ChangeStreamReplicationJob.ts @@ -1,5 +1,5 @@ import { container } from '@powersync/lib-services-framework'; -import { MissingReplicationSlotError, ChangeStream } from './ChangeStream.js'; +import { ChangeStreamInvalidatedError, 
ChangeStream } from './ChangeStream.js'; import { replication } from '@powersync/service-core'; import { ConnectionManagerFactory } from './ConnectionManagerFactory.js'; @@ -40,7 +40,7 @@ export class ChangeStreamReplicationJob extends replication.AbstractReplicationJ }); this.logger.error(`Replication failed`, e); - if (e instanceof MissingReplicationSlotError) { + if (e instanceof ChangeStreamInvalidatedError) { // This stops replication on this slot, and creates a new slot await this.options.storage.factory.slotRemoved(this.slotName); } @@ -84,8 +84,10 @@ export class ChangeStreamReplicationJob extends replication.AbstractReplicationJ // Without this additional log, the cause may not be visible in the logs. this.logger.error(`cause`, e.cause); } - if (e instanceof mongo.MongoError && e.hasErrorLabel('NonResumableChangeStreamError')) { - throw new MissingReplicationSlotError(e.message); + if (e instanceof ChangeStreamInvalidatedError) { + throw e; + } else if (e instanceof mongo.MongoError && e.hasErrorLabel('NonResumableChangeStreamError')) { + throw new ChangeStreamInvalidatedError(e.message); } else { // Report the error if relevant, before retrying container.reporter.captureException(e, { diff --git a/modules/module-mongodb/test/src/change_stream.test.ts b/modules/module-mongodb/test/src/change_stream.test.ts index 63b96bf05..5bb19f71e 100644 --- a/modules/module-mongodb/test/src/change_stream.test.ts +++ b/modules/module-mongodb/test/src/change_stream.test.ts @@ -402,4 +402,61 @@ bucket_definitions: expect(data).toMatchObject([]); }); + + test('postImages - new collection with postImages enabled', async () => { + await using context = await ChangeStreamTestContext.open(factory, { postImages: 'autoConfigure' }); + const { db } = context; + await context.updateSyncRules(` +bucket_definitions: + global: + data: + - SELECT _id as id, description FROM "test_%"`); + + await context.replicateSnapshot(); + + await db.createCollection('test_data', { + // enabled: 
true here - everything should work + changeStreamPreAndPostImages: { enabled: true } + }); + const collection = db.collection('test_data'); + const result = await collection.insertOne({ description: 'test1' }); + const test_id = result.insertedId; + await collection.updateOne({ _id: test_id }, { $set: { description: 'test2' } }); + + context.startStreaming(); + + const data = await context.getBucketData('global[]'); + expect(data).toMatchObject([ + putOp('test_data', { id: test_id!.toHexString(), description: 'test1' }), + putOp('test_data', { id: test_id!.toHexString(), description: 'test2' }) + ]); + }); + + test.only('postImages - new collection with postImages disabled', async () => { + await using context = await ChangeStreamTestContext.open(factory, { postImages: 'autoConfigure' }); + const { db } = context; + await context.updateSyncRules(` +bucket_definitions: + global: + data: + - SELECT _id as id, description FROM "test_data%"`); + + await context.replicateSnapshot(); + + await db.createCollection('test_data', { + // enabled: false here, but autoConfigure will enable it. + // Unfortunately, that is too late, and replication must be restarted. + changeStreamPreAndPostImages: { enabled: false } + }); + const collection = db.collection('test_data'); + const result = await collection.insertOne({ description: 'test1' }); + const test_id = result.insertedId; + await collection.updateOne({ _id: test_id }, { $set: { description: 'test2' } }); + + context.startStreaming(); + + await expect(() => context.getBucketData('global[]')).rejects.toMatchObject({ + message: expect.stringContaining('stream was configured to require a post-image for all update events') + }); + }); } From 0206aaab0264362a4cff4c514b510836ade107b9 Mon Sep 17 00:00:00 2001 From: Ralf Kistner Date: Thu, 7 Nov 2024 14:52:43 +0200 Subject: [PATCH 07/10] Rename post_images config option. 
--- .../src/replication/ChangeStream.ts | 26 +++++++----- modules/module-mongodb/src/types/types.ts | 42 +++++++++++++++++-- .../test/src/change_stream.test.ts | 13 +++--- 3 files changed, 62 insertions(+), 19 deletions(-) diff --git a/modules/module-mongodb/src/replication/ChangeStream.ts b/modules/module-mongodb/src/replication/ChangeStream.ts index ada11ac73..250217c95 100644 --- a/modules/module-mongodb/src/replication/ChangeStream.ts +++ b/modules/module-mongodb/src/replication/ChangeStream.ts @@ -12,6 +12,7 @@ import { } from './MongoRelation.js'; import { escapeRegExp } from '../utils.js'; import { CHECKPOINTS_COLLECTION } from './replication-utils.js'; +import { PostImagesOption } from '../types/types.js'; export const ZERO_LSN = '0000000000000000'; @@ -79,8 +80,12 @@ export class ChangeStream { return this.abort_signal.aborted; } - private get postImages() { - return this.connections.options.postImages; + private get usePostImages() { + return this.connections.options.postImages != PostImagesOption.OFF; + } + + private get configurePostImages() { + return this.connections.options.postImages == PostImagesOption.AUTO_CONFIGURE; } /** @@ -228,7 +233,7 @@ export class ChangeStream { await this.defaultDb.createCollection(CHECKPOINTS_COLLECTION, { changeStreamPreAndPostImages: { enabled: true } }); - } else if (this.postImages != 'updateLookup' && collection.options?.changeStreamPreAndPostImages?.enabled != true) { + } else if (this.usePostImages && collection.options?.changeStreamPreAndPostImages?.enabled != true) { await this.defaultDb.command({ collMod: CHECKPOINTS_COLLECTION, changeStreamPreAndPostImages: { enabled: true } @@ -351,14 +356,14 @@ export class ChangeStream { } private async checkPostImages(db: string, collectionInfo: mongo.CollectionInfo) { - if (this.postImages == 'updateLookup') { + if (!this.usePostImages) { // Nothing to check return; } const enabled = collectionInfo.options?.changeStreamPreAndPostImages?.enabled == true; - if (!enabled && 
this.postImages == 'autoConfigure') {
+    if (!enabled && this.configurePostImages) {
       await this.client.db(db).command({
         collMod: collectionInfo.name,
         changeStreamPreAndPostImages: { enabled: true }
@@ -525,12 +530,13 @@ export class ChangeStream {
 
     let fullDocument: 'required' | 'updateLookup';
 
-    if (this.connections.options.postImages == 'updateLookup') {
-      fullDocument = 'updateLookup';
-    } else {
-      // 'on' or 'autoConfigure'
-      // Configuration happens during snapshot
+    if (this.usePostImages) {
+      // 'read_only' or 'auto_configure'
+      // Configuration happens during snapshot, or when we see new
+      // collections.
       fullDocument = 'required';
+    } else {
+      fullDocument = 'updateLookup';
     }
 
     const streamOptions: mongo.ChangeStreamOptions = {
diff --git a/modules/module-mongodb/src/types/types.ts b/modules/module-mongodb/src/types/types.ts
index e53c0419e..1498193f5 100644
--- a/modules/module-mongodb/src/types/types.ts
+++ b/modules/module-mongodb/src/types/types.ts
@@ -4,6 +4,42 @@ import * as t from 'ts-codec';
 
 export const MONGO_CONNECTION_TYPE = 'mongodb' as const;
 
+export enum PostImagesOption {
+  /**
+   * Use fullDocument: updateLookup on the changeStream.
+   *
+   * This does not guarantee consistency - operations may
+   * arrive out of order, especially when there is replication lag.
+   *
+   * This is the default option for backwards-compatibility.
+   */
+  OFF = 'off',
+
+  /**
+   * Use fullDocument: required on the changeStream.
+   *
+   * Collections are automatically configured with:
+   * `changeStreamPreAndPostImages: { enabled: true }`
+   *
+   * This is the recommended behavior for new instances.
+   */
+  AUTO_CONFIGURE = 'auto_configure',
+
+  /**
+   *
+   * Use fullDocument: required on the changeStream.
+   *
+   * Collections are not automatically configured. Each
+   * collection must be configured manually before
+   * replicating with:
+   *
+   * `changeStreamPreAndPostImages: { enabled: true }`
+   *
+   * Use when the collMod permission is not available.
+ */ + READ_ONLY = 'read_only' +} + export interface NormalizedMongoConnectionConfig { id: string; tag: string; @@ -14,7 +50,7 @@ export interface NormalizedMongoConnectionConfig { username?: string; password?: string; - postImages: 'on' | 'autoConfigure' | 'updateLookup'; + postImages: PostImagesOption; } export const MongoConnectionConfig = service_types.configFile.DataSourceConfig.and( @@ -29,7 +65,7 @@ export const MongoConnectionConfig = service_types.configFile.DataSourceConfig.a password: t.string.optional(), database: t.string.optional(), - postImages: t.literal('on').or(t.literal('autoConfigure')).or(t.literal('updateLookup')).optional() + post_images: t.literal('off').or(t.literal('auto_configure')).or(t.literal('read_only')).optional() }) ); @@ -55,7 +91,7 @@ export function normalizeConnectionConfig(options: MongoConnectionConfig): Norma ...base, id: options.id ?? 'default', tag: options.tag ?? 'default', - postImages: options.postImages ?? 'updateLookup' + postImages: (options.post_images as PostImagesOption | undefined) ?? 
PostImagesOption.OFF }; } diff --git a/modules/module-mongodb/test/src/change_stream.test.ts b/modules/module-mongodb/test/src/change_stream.test.ts index 5bb19f71e..bff0a6857 100644 --- a/modules/module-mongodb/test/src/change_stream.test.ts +++ b/modules/module-mongodb/test/src/change_stream.test.ts @@ -6,6 +6,7 @@ import * as mongo from 'mongodb'; import { setTimeout } from 'node:timers/promises'; import { describe, expect, test } from 'vitest'; import { ChangeStreamTestContext } from './change_stream_utils.js'; +import { PostImagesOption } from '@module/types/types.js'; type StorageFactory = () => Promise; @@ -91,7 +92,7 @@ bucket_definitions: }); test('updateLookup - no fullDocument available', async () => { - await using context = await ChangeStreamTestContext.open(factory, { postImages: 'updateLookup' }); + await using context = await ChangeStreamTestContext.open(factory, { postImages: PostImagesOption.OFF }); const { db, client } = context; await context.updateSyncRules(` bucket_definitions: @@ -135,7 +136,7 @@ bucket_definitions: test('postImages - autoConfigure', async () => { // Similar to the above test, but with postImages enabled. // This resolves the consistency issue. - await using context = await ChangeStreamTestContext.open(factory, { postImages: 'autoConfigure' }); + await using context = await ChangeStreamTestContext.open(factory, { postImages: PostImagesOption.AUTO_CONFIGURE }); const { db, client } = context; await context.updateSyncRules(` bucket_definitions: @@ -181,7 +182,7 @@ bucket_definitions: test('postImages - on', async () => { // Similar to postImages - autoConfigure, but does not auto-configure. // changeStreamPreAndPostImages must be manually configured. 
- await using context = await ChangeStreamTestContext.open(factory, { postImages: 'on' }); + await using context = await ChangeStreamTestContext.open(factory, { postImages: PostImagesOption.READ_ONLY }); const { db, client } = context; await context.updateSyncRules(` bucket_definitions: @@ -404,7 +405,7 @@ bucket_definitions: }); test('postImages - new collection with postImages enabled', async () => { - await using context = await ChangeStreamTestContext.open(factory, { postImages: 'autoConfigure' }); + await using context = await ChangeStreamTestContext.open(factory, { postImages: PostImagesOption.AUTO_CONFIGURE }); const { db } = context; await context.updateSyncRules(` bucket_definitions: @@ -432,8 +433,8 @@ bucket_definitions: ]); }); - test.only('postImages - new collection with postImages disabled', async () => { - await using context = await ChangeStreamTestContext.open(factory, { postImages: 'autoConfigure' }); + test('postImages - new collection with postImages disabled', async () => { + await using context = await ChangeStreamTestContext.open(factory, { postImages: PostImagesOption.AUTO_CONFIGURE }); const { db } = context; await context.updateSyncRules(` bucket_definitions: From 225af878791c2d979b76df6a457a34c87de0d589 Mon Sep 17 00:00:00 2001 From: Ralf Kistner Date: Wed, 13 Nov 2024 12:06:21 +0200 Subject: [PATCH 08/10] Avoid collMod permission on _powersync_checkpoints. 
--- modules/module-mongodb/src/replication/ChangeStream.ts | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/modules/module-mongodb/src/replication/ChangeStream.ts b/modules/module-mongodb/src/replication/ChangeStream.ts index 250217c95..4e3548f12 100644 --- a/modules/module-mongodb/src/replication/ChangeStream.ts +++ b/modules/module-mongodb/src/replication/ChangeStream.ts @@ -234,8 +234,10 @@ export class ChangeStream { changeStreamPreAndPostImages: { enabled: true } }); } else if (this.usePostImages && collection.options?.changeStreamPreAndPostImages?.enabled != true) { - await this.defaultDb.command({ - collMod: CHECKPOINTS_COLLECTION, + // Drop + create requires less permissions than collMod, + // and we don't care about the data in this collection. + await this.defaultDb.dropCollection(CHECKPOINTS_COLLECTION); + await this.defaultDb.createCollection(CHECKPOINTS_COLLECTION, { changeStreamPreAndPostImages: { enabled: true } }); } From 267ec11b35815a633149762b054c21220d3f7961 Mon Sep 17 00:00:00 2001 From: Ralf Kistner Date: Wed, 13 Nov 2024 12:10:54 +0200 Subject: [PATCH 09/10] Minor cleanup. --- modules/module-mongodb/src/replication/MongoRelation.ts | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/modules/module-mongodb/src/replication/MongoRelation.ts b/modules/module-mongodb/src/replication/MongoRelation.ts index cdb9299ed..e2dc675e1 100644 --- a/modules/module-mongodb/src/replication/MongoRelation.ts +++ b/modules/module-mongodb/src/replication/MongoRelation.ts @@ -146,7 +146,10 @@ function filterJsonData(data: any, depth = 0): any { export async function createCheckpoint(client: mongo.MongoClient, db: mongo.Db): Promise { const session = client.startSession(); try { - const result = await db.collection(CHECKPOINTS_COLLECTION).findOneAndUpdate( + // Note: If multiple PowerSync instances are replicating the same source database, + // they'll modify the same checkpoint document. 
This is fine - it could create + // more replication load than required, but won't break anything. + await db.collection(CHECKPOINTS_COLLECTION).findOneAndUpdate( { _id: 'checkpoint' as any }, @@ -160,7 +163,6 @@ export async function createCheckpoint(client: mongo.MongoClient, db: mongo.Db): } ); const time = session.operationTime!; - // console.log('marked checkpoint at', time, getMongoLsn(time)); // TODO: Use the above when we support custom write checkpoints return getMongoLsn(time); } finally { From c5ca6e728e1f3ab03c4ab4bf86f20ceaae64233c Mon Sep 17 00:00:00 2001 From: Ralf Kistner Date: Wed, 13 Nov 2024 12:45:40 +0200 Subject: [PATCH 10/10] Validate changeStreamPreAndPostImages on existing collections. --- .../src/api/MongoRouteAPIAdapter.ts | 71 ++++-- .../test/src/mongo_test.test.ts | 226 ++++++++++++++---- 2 files changed, 231 insertions(+), 66 deletions(-) diff --git a/modules/module-mongodb/src/api/MongoRouteAPIAdapter.ts b/modules/module-mongodb/src/api/MongoRouteAPIAdapter.ts index 5acad6eb0..a0bc519ec 100644 --- a/modules/module-mongodb/src/api/MongoRouteAPIAdapter.ts +++ b/modules/module-mongodb/src/api/MongoRouteAPIAdapter.ts @@ -34,6 +34,10 @@ export class MongoRouteAPIAdapter implements api.RouteAPI { await this.client.close(); } + async [Symbol.asyncDispose]() { + await this.shutdown(); + } + async getSourceConfig(): Promise { return this.config; } @@ -77,6 +81,28 @@ export class MongoRouteAPIAdapter implements api.RouteAPI { sqlSyncRules: sync_rules.SqlSyncRules ): Promise { let result: api.PatternResult[] = []; + + const validatePostImages = (schema: string, collection: mongo.CollectionInfo): service_types.ReplicationError[] => { + if (this.config.postImages == types.PostImagesOption.OFF) { + return []; + } else if (!collection.options?.changeStreamPreAndPostImages?.enabled) { + if (this.config.postImages == types.PostImagesOption.READ_ONLY) { + return [ + { level: 'fatal', message: `changeStreamPreAndPostImages not enabled on 
${schema}.${collection.name}` } + ]; + } else { + return [ + { + level: 'warning', + message: `changeStreamPreAndPostImages not enabled on ${schema}.${collection.name}, will be enabled automatically` + } + ]; + } + } else { + return []; + } + }; + for (let tablePattern of tablePatterns) { const schema = tablePattern.schema; @@ -101,7 +127,7 @@ export class MongoRouteAPIAdapter implements api.RouteAPI { { name: nameFilter }, - { nameOnly: true } + { nameOnly: false } ) .toArray(); @@ -117,6 +143,12 @@ export class MongoRouteAPIAdapter implements api.RouteAPI { [], true ); + let errors: service_types.ReplicationError[] = []; + if (collection.type == 'view') { + errors.push({ level: 'warning', message: `Collection ${schema}.${tablePattern.name} is a view` }); + } else { + errors.push(...validatePostImages(schema, collection)); + } const syncData = sqlSyncRules.tableSyncsData(sourceTable); const syncParameters = sqlSyncRules.tableSyncsParameters(sourceTable); patternResult.tables.push({ @@ -125,7 +157,7 @@ export class MongoRouteAPIAdapter implements api.RouteAPI { replication_id: ['_id'], data_queries: syncData, parameter_queries: syncParameters, - errors: [] + errors: errors }); } } else { @@ -141,26 +173,25 @@ export class MongoRouteAPIAdapter implements api.RouteAPI { const syncData = sqlSyncRules.tableSyncsData(sourceTable); const syncParameters = sqlSyncRules.tableSyncsParameters(sourceTable); + const collection = collections[0]; - if (collections.length == 1) { - patternResult.table = { - schema, - name: tablePattern.name, - replication_id: ['_id'], - data_queries: syncData, - parameter_queries: syncParameters, - errors: [] - }; - } else { - patternResult.table = { - schema, - name: tablePattern.name, - replication_id: ['_id'], - data_queries: syncData, - parameter_queries: syncParameters, - errors: [{ level: 'warning', message: `Collection ${schema}.${tablePattern.name} not found` }] - }; + let errors: service_types.ReplicationError[] = []; + if 
(collections.length != 1) { + errors.push({ level: 'warning', message: `Collection ${schema}.${tablePattern.name} not found` }); + } else if (collection.type == 'view') { + errors.push({ level: 'warning', message: `Collection ${schema}.${tablePattern.name} is a view` }); + } else if (!collection.options?.changeStreamPreAndPostImages?.enabled) { + errors.push(...validatePostImages(schema, collection)); } + + patternResult.table = { + schema, + name: tablePattern.name, + replication_id: ['_id'], + data_queries: syncData, + parameter_queries: syncParameters, + errors + }; } } return result; diff --git a/modules/module-mongodb/test/src/mongo_test.test.ts b/modules/module-mongodb/test/src/mongo_test.test.ts index 16cca3be7..5d30067da 100644 --- a/modules/module-mongodb/test/src/mongo_test.test.ts +++ b/modules/module-mongodb/test/src/mongo_test.test.ts @@ -1,10 +1,11 @@ import { MongoRouteAPIAdapter } from '@module/api/MongoRouteAPIAdapter.js'; import { ChangeStream } from '@module/replication/ChangeStream.js'; import { constructAfterRecord } from '@module/replication/MongoRelation.js'; -import { SqliteRow } from '@powersync/service-sync-rules'; +import { SqliteRow, SqlSyncRules } from '@powersync/service-sync-rules'; import * as mongo from 'mongodb'; import { describe, expect, test } from 'vitest'; import { clearTestDb, connectMongoData, TEST_CONNECTION_OPTIONS } from './util.js'; +import { PostImagesOption } from '@module/types/types.js'; describe('mongo data types', () => { async function setupTable(db: mongo.Db) { @@ -245,58 +246,191 @@ describe('mongo data types', () => { }); test('connection schema', async () => { - const adapter = new MongoRouteAPIAdapter({ + await using adapter = new MongoRouteAPIAdapter({ type: 'mongodb', ...TEST_CONNECTION_OPTIONS }); - try { - const db = adapter.db; - await clearTestDb(db); + const db = adapter.db; + await clearTestDb(db); - const collection = db.collection('test_data'); - await setupTable(db); - await insert(collection); + 
const collection = db.collection('test_data'); + await setupTable(db); + await insert(collection); + + const schema = await adapter.getConnectionSchema(); + const dbSchema = schema.filter((s) => s.name == TEST_CONNECTION_OPTIONS.database)[0]; + expect(dbSchema).not.toBeNull(); + expect(dbSchema.tables).toMatchObject([ + { + name: 'test_data', + columns: [ + { name: '_id', sqlite_type: 4, internal_type: 'Integer' }, + { name: 'bool', sqlite_type: 4, internal_type: 'Boolean' }, + { name: 'bytea', sqlite_type: 1, internal_type: 'Binary' }, + { name: 'date', sqlite_type: 2, internal_type: 'Date' }, + { name: 'decimal', sqlite_type: 2, internal_type: 'Decimal' }, + { name: 'float', sqlite_type: 8, internal_type: 'Double' }, + { name: 'int2', sqlite_type: 4, internal_type: 'Integer' }, + { name: 'int4', sqlite_type: 4, internal_type: 'Integer' }, + { name: 'int8', sqlite_type: 4, internal_type: 'Long' }, + // We can fix these later + { name: 'js', sqlite_type: 2, internal_type: 'Object' }, + { name: 'js2', sqlite_type: 2, internal_type: 'Object' }, + { name: 'maxKey', sqlite_type: 0, internal_type: 'MaxKey' }, + { name: 'minKey', sqlite_type: 0, internal_type: 'MinKey' }, + { name: 'nested', sqlite_type: 2, internal_type: 'Object' }, + { name: 'null', sqlite_type: 0, internal_type: 'Null' }, + { name: 'objectId', sqlite_type: 2, internal_type: 'ObjectId' }, + // We can fix these later + { name: 'pointer', sqlite_type: 2, internal_type: 'Object' }, + { name: 'pointer2', sqlite_type: 2, internal_type: 'Object' }, + { name: 'regexp', sqlite_type: 2, internal_type: 'RegExp' }, + // Can fix this later + { name: 'symbol', sqlite_type: 2, internal_type: 'String' }, + { name: 'text', sqlite_type: 2, internal_type: 'String' }, + { name: 'timestamp', sqlite_type: 4, internal_type: 'Timestamp' }, + { name: 'undefined', sqlite_type: 0, internal_type: 'Null' }, + { name: 'uuid', sqlite_type: 2, internal_type: 'UUID' } + ] + } + ]); + }); + + test('validate postImages', async () => { 
+ await using adapter = new MongoRouteAPIAdapter({ + type: 'mongodb', + ...TEST_CONNECTION_OPTIONS, + postImages: PostImagesOption.READ_ONLY + }); + const db = adapter.db; + await clearTestDb(db); + + const collection = db.collection('test_data'); + await setupTable(db); + await insert(collection); + + const rules = SqlSyncRules.fromYaml( + ` +bucket_definitions: + global: + data: + - select _id as id, * from test_data - const schema = await adapter.getConnectionSchema(); - const dbSchema = schema.filter((s) => s.name == TEST_CONNECTION_OPTIONS.database)[0]; - expect(dbSchema).not.toBeNull(); - expect(dbSchema.tables).toMatchObject([ + `, + { + ...adapter.getParseSyncRulesOptions(), + // No schema-based validation at this point + schema: undefined + } + ); + const source_table_patterns = rules.getSourceTables(); + const results = await adapter.getDebugTablesInfo(source_table_patterns, rules); + + const result = results[0]; + expect(result).not.toBeNull(); + expect(result.table).toMatchObject({ + schema: 'powersync_test_data', + name: 'test_data', + replication_id: ['_id'], + data_queries: true, + parameter_queries: false, + errors: [ { - name: 'test_data', - columns: [ - { name: '_id', sqlite_type: 4, internal_type: 'Integer' }, - { name: 'bool', sqlite_type: 4, internal_type: 'Boolean' }, - { name: 'bytea', sqlite_type: 1, internal_type: 'Binary' }, - { name: 'date', sqlite_type: 2, internal_type: 'Date' }, - { name: 'decimal', sqlite_type: 2, internal_type: 'Decimal' }, - { name: 'float', sqlite_type: 8, internal_type: 'Double' }, - { name: 'int2', sqlite_type: 4, internal_type: 'Integer' }, - { name: 'int4', sqlite_type: 4, internal_type: 'Integer' }, - { name: 'int8', sqlite_type: 4, internal_type: 'Long' }, - // We can fix these later - { name: 'js', sqlite_type: 2, internal_type: 'Object' }, - { name: 'js2', sqlite_type: 2, internal_type: 'Object' }, - { name: 'maxKey', sqlite_type: 0, internal_type: 'MaxKey' }, - { name: 'minKey', sqlite_type: 0, 
internal_type: 'MinKey' }, - { name: 'nested', sqlite_type: 2, internal_type: 'Object' }, - { name: 'null', sqlite_type: 0, internal_type: 'Null' }, - { name: 'objectId', sqlite_type: 2, internal_type: 'ObjectId' }, - // We can fix these later - { name: 'pointer', sqlite_type: 2, internal_type: 'Object' }, - { name: 'pointer2', sqlite_type: 2, internal_type: 'Object' }, - { name: 'regexp', sqlite_type: 2, internal_type: 'RegExp' }, - // Can fix this later - { name: 'symbol', sqlite_type: 2, internal_type: 'String' }, - { name: 'text', sqlite_type: 2, internal_type: 'String' }, - { name: 'timestamp', sqlite_type: 4, internal_type: 'Timestamp' }, - { name: 'undefined', sqlite_type: 0, internal_type: 'Null' }, - { name: 'uuid', sqlite_type: 2, internal_type: 'UUID' } - ] + level: 'fatal', + message: 'changeStreamPreAndPostImages not enabled on powersync_test_data.test_data' } - ]); - } finally { - await adapter.shutdown(); - } + ] + }); + }); + + test('validate postImages - auto-configure', async () => { + await using adapter = new MongoRouteAPIAdapter({ + type: 'mongodb', + ...TEST_CONNECTION_OPTIONS, + postImages: PostImagesOption.AUTO_CONFIGURE + }); + const db = adapter.db; + await clearTestDb(db); + + const collection = db.collection('test_data'); + await setupTable(db); + await insert(collection); + + const rules = SqlSyncRules.fromYaml( + ` +bucket_definitions: + global: + data: + - select _id as id, * from test_data + + `, + { + ...adapter.getParseSyncRulesOptions(), + // No schema-based validation at this point + schema: undefined + } + ); + const source_table_patterns = rules.getSourceTables(); + const results = await adapter.getDebugTablesInfo(source_table_patterns, rules); + + const result = results[0]; + expect(result).not.toBeNull(); + expect(result.table).toMatchObject({ + schema: 'powersync_test_data', + name: 'test_data', + replication_id: ['_id'], + data_queries: true, + parameter_queries: false, + errors: [ + { + level: 'warning', + message: + 
'changeStreamPreAndPostImages not enabled on powersync_test_data.test_data, will be enabled automatically' + } + ] + }); + }); + + test('validate postImages - off', async () => { + await using adapter = new MongoRouteAPIAdapter({ + type: 'mongodb', + ...TEST_CONNECTION_OPTIONS, + postImages: PostImagesOption.OFF + }); + const db = adapter.db; + await clearTestDb(db); + + const collection = db.collection('test_data'); + await setupTable(db); + await insert(collection); + + const rules = SqlSyncRules.fromYaml( + ` +bucket_definitions: + global: + data: + - select _id as id, * from test_data + + `, + { + ...adapter.getParseSyncRulesOptions(), + // No schema-based validation at this point + schema: undefined + } + ); + const source_table_patterns = rules.getSourceTables(); + const results = await adapter.getDebugTablesInfo(source_table_patterns, rules); + + const result = results[0]; + expect(result).not.toBeNull(); + expect(result.table).toMatchObject({ + schema: 'powersync_test_data', + name: 'test_data', + replication_id: ['_id'], + data_queries: true, + parameter_queries: false, + errors: [] + }); }); });