diff --git a/modules/module-mongodb/src/api/MongoRouteAPIAdapter.ts b/modules/module-mongodb/src/api/MongoRouteAPIAdapter.ts
index b9b740e24..a0bc519ec 100644
--- a/modules/module-mongodb/src/api/MongoRouteAPIAdapter.ts
+++ b/modules/module-mongodb/src/api/MongoRouteAPIAdapter.ts
@@ -7,6 +7,7 @@ import { MongoManager } from '../replication/MongoManager.js';
 import { constructAfterRecord, createCheckpoint } from '../replication/MongoRelation.js';
 import * as types from '../types/types.js';
 import { escapeRegExp } from '../utils.js';
+import { CHECKPOINTS_COLLECTION } from '../replication/replication-utils.js';
 
 export class MongoRouteAPIAdapter implements api.RouteAPI {
   protected client: mongo.MongoClient;
@@ -33,6 +34,10 @@ export class MongoRouteAPIAdapter implements api.RouteAPI {
     await this.client.close();
   }
 
+  async [Symbol.asyncDispose]() {
+    await this.shutdown();
+  }
+
   async getSourceConfig(): Promise {
     return this.config;
   }
@@ -76,6 +81,28 @@
     sqlSyncRules: sync_rules.SqlSyncRules
   ): Promise<api.PatternResult[]> {
     let result: api.PatternResult[] = [];
+
+    const validatePostImages = (schema: string, collection: mongo.CollectionInfo): service_types.ReplicationError[] => {
+      if (this.config.postImages == types.PostImagesOption.OFF) {
+        return [];
+      } else if (!collection.options?.changeStreamPreAndPostImages?.enabled) {
+        if (this.config.postImages == types.PostImagesOption.READ_ONLY) {
+          return [
+            { level: 'fatal', message: `changeStreamPreAndPostImages not enabled on ${schema}.${collection.name}` }
+          ];
+        } else {
+          return [
+            {
+              level: 'warning',
+              message: `changeStreamPreAndPostImages not enabled on ${schema}.${collection.name}, will be enabled automatically`
+            }
+          ];
+        }
+      } else {
+        return [];
+      }
+    };
+
     for (let tablePattern of tablePatterns) {
       const schema = tablePattern.schema;
 
@@ -100,7 +127,7 @@
         {
           name: nameFilter
         },
-        { nameOnly: true }
+        { nameOnly: false }
       )
       .toArray();
 
@@ -116,6 +143,12 @@
           [],
           true
         );
+        let errors: service_types.ReplicationError[] = [];
+        if (collection.type == 'view') {
+          errors.push({ level: 'warning', message: `Collection ${schema}.${tablePattern.name} is a view` });
+        } else {
+          errors.push(...validatePostImages(schema, collection));
+        }
         const syncData = sqlSyncRules.tableSyncsData(sourceTable);
         const syncParameters = sqlSyncRules.tableSyncsParameters(sourceTable);
         patternResult.tables.push({
@@ -124,7 +157,7 @@
           replication_id: ['_id'],
           data_queries: syncData,
           parameter_queries: syncParameters,
-          errors: []
+          errors: errors
         });
       }
     } else {
@@ -140,26 +173,25 @@
 
       const syncData = sqlSyncRules.tableSyncsData(sourceTable);
       const syncParameters = sqlSyncRules.tableSyncsParameters(sourceTable);
+      const collection = collections[0];
 
-      if (collections.length == 1) {
-        patternResult.table = {
-          schema,
-          name: tablePattern.name,
-          replication_id: ['_id'],
-          data_queries: syncData,
-          parameter_queries: syncParameters,
-          errors: []
-        };
-      } else {
-        patternResult.table = {
-          schema,
-          name: tablePattern.name,
-          replication_id: ['_id'],
-          data_queries: syncData,
-          parameter_queries: syncParameters,
-          errors: [{ level: 'warning', message: `Collection ${schema}.${tablePattern.name} not found` }]
-        };
+      let errors: service_types.ReplicationError[] = [];
+      if (collections.length != 1) {
+        errors.push({ level: 'warning', message: `Collection ${schema}.${tablePattern.name} not found` });
+      } else if (collection.type == 'view') {
+        errors.push({ level: 'warning', message: `Collection ${schema}.${tablePattern.name} is a view` });
+      } else if (!collection.options?.changeStreamPreAndPostImages?.enabled) {
+        errors.push(...validatePostImages(schema, collection));
       }
+
+      patternResult.table = {
+        schema,
+        name: tablePattern.name,
+        replication_id: ['_id'],
+        data_queries: syncData,
+        parameter_queries: syncParameters,
+        errors
+      };
     }
   }
   return result;
@@ -202,7 +234,7 @@
     let tables: service_types.TableSchema[] = [];
 
     for (let collection of collections) {
-      if (['_powersync_checkpoints'].includes(collection.name)) {
+      if ([CHECKPOINTS_COLLECTION].includes(collection.name)) {
        continue;
       }
       if (collection.name.startsWith('system.')) {
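The `[Symbol.asyncDispose]` method above makes the adapter compatible with TypeScript's explicit resource management (`await using`). A minimal usage sketch, assuming a `config` value for the connection (names here are illustrative, not part of the diff):

```ts
import { MongoRouteAPIAdapter } from './api/MongoRouteAPIAdapter.js';

async function inspectSchema(config: any) {
  // `await using` invokes adapter[Symbol.asyncDispose]() - and therefore
  // shutdown() - when this scope exits, even if an error is thrown.
  await using adapter = new MongoRouteAPIAdapter(config);
  return await adapter.getConnectionSchema();
}
```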
diff --git a/modules/module-mongodb/src/replication/ChangeStream.ts b/modules/module-mongodb/src/replication/ChangeStream.ts
index 9e8a8acab..4e3548f12 100644
--- a/modules/module-mongodb/src/replication/ChangeStream.ts
+++ b/modules/module-mongodb/src/replication/ChangeStream.ts
@@ -11,6 +11,8 @@ import { mongoLsnToTimestamp } from './MongoRelation.js';
 import { escapeRegExp } from '../utils.js';
+import { CHECKPOINTS_COLLECTION } from './replication-utils.js';
+import { PostImagesOption } from '../types/types.js';
 
 export const ZERO_LSN = '0000000000000000';
 
@@ -24,7 +26,15 @@ interface InitResult {
   needsInitialSync: boolean;
 }
 
-export class MissingReplicationSlotError extends Error {
+/**
+ * Thrown when the change stream is not valid anymore, and replication
+ * must be restarted.
+ *
+ * Possible reasons:
+ * * Some change stream documents do not have postImages.
+ * * startAfter/resumeToken is not valid anymore.
+ */
+export class ChangeStreamInvalidatedError extends Error {
   constructor(message: string) {
     super(message);
   }
@@ -70,7 +80,21 @@ export class ChangeStream {
     return this.abort_signal.aborted;
   }
 
-  async getQualifiedTableNames(
+  private get usePostImages() {
+    return this.connections.options.postImages != PostImagesOption.OFF;
+  }
+
+  private get configurePostImages() {
+    return this.connections.options.postImages == PostImagesOption.AUTO_CONFIGURE;
+  }
+
+  /**
+   * This resolves a pattern, persists the related metadata, and returns
+   * the resulting SourceTables.
+   *
+   * This implicitly checks the collection postImage configuration.
+   */
+  async resolveQualifiedTableNames(
     batch: storage.BucketStorageBatch,
     tablePattern: TablePattern
   ): Promise<SourceTable[]> {
@@ -94,10 +118,14 @@ export class ChangeStream {
         {
           name: nameFilter
         },
-        { nameOnly: true }
+        { nameOnly: false }
       )
       .toArray();
 
+    if (!tablePattern.isWildcard && collections.length == 0) {
+      logger.warn(`Collection ${schema}.${tablePattern.name} not found`);
+    }
+
     for (let collection of collections) {
       const table = await this.handleRelation(
@@ -108,7 +136,7 @@ export class ChangeStream {
           replicationColumns: [{ name: '_id' }]
         } as SourceEntityDescriptor,
         // This is done as part of the initial setup - snapshot is handled elsewhere
-        { snapshot: false }
+        { snapshot: false, collectionInfo: collection }
       );
 
       result.push(table);
@@ -165,14 +193,20 @@ export class ChangeStream {
     await this.storage.startBatch(
       { zeroLSN: ZERO_LSN, defaultSchema: this.defaultDb.databaseName },
       async (batch) => {
+        // Start by resolving all tables.
+        // This checks postImage configuration, and that should fail as
+        // early as possible.
+        let allSourceTables: SourceTable[] = [];
         for (let tablePattern of sourceTables) {
-          const tables = await this.getQualifiedTableNames(batch, tablePattern);
-          for (let table of tables) {
-            await this.snapshotTable(batch, table, session);
-            await batch.markSnapshotDone([table], ZERO_LSN);
+          const tables = await this.resolveQualifiedTableNames(batch, tablePattern);
+          allSourceTables.push(...tables);
+        }
+
+        for (let table of allSourceTables) {
+          await this.snapshotTable(batch, table, session);
+          await batch.markSnapshotDone([table], ZERO_LSN);
 
-          await touch();
-        }
+          await touch();
         }
 
         const snapshotTime = session.clusterTime?.clusterTime ?? startTime;
@@ -193,10 +227,26 @@ export class ChangeStream {
     }
   }
 
+  private async setupCheckpointsCollection() {
+    const collection = await this.getCollectionInfo(this.defaultDb.databaseName, CHECKPOINTS_COLLECTION);
+    if (collection == null) {
+      await this.defaultDb.createCollection(CHECKPOINTS_COLLECTION, {
+        changeStreamPreAndPostImages: { enabled: true }
+      });
+    } else if (this.usePostImages && collection.options?.changeStreamPreAndPostImages?.enabled != true) {
+      // Drop + create requires fewer permissions than collMod,
+      // and we don't care about the data in this collection.
+      await this.defaultDb.dropCollection(CHECKPOINTS_COLLECTION);
+      await this.defaultDb.createCollection(CHECKPOINTS_COLLECTION, {
+        changeStreamPreAndPostImages: { enabled: true }
+      });
+    }
+  }
+
   private getSourceNamespaceFilters(): { $match: any; multipleDatabases: boolean } {
     const sourceTables = this.sync_rules.getSourceTables();
 
-    let $inFilters: any[] = [{ db: this.defaultDb.databaseName, coll: '_powersync_checkpoints' }];
+    let $inFilters: any[] = [{ db: this.defaultDb.databaseName, coll: CHECKPOINTS_COLLECTION }];
     let $refilters: any[] = [];
     let multipleDatabases = false;
     for (let tablePattern of sourceTables) {
@@ -209,7 +259,10 @@ export class ChangeStream {
       }
 
       if (tablePattern.isWildcard) {
-        $refilters.push({ db: tablePattern.schema, coll: new RegExp('^' + escapeRegExp(tablePattern.tablePrefix)) });
+        $refilters.push({
+          'ns.db': tablePattern.schema,
+          'ns.coll': new RegExp('^' + escapeRegExp(tablePattern.tablePrefix))
+        });
       } else {
         $inFilters.push({
           db: tablePattern.schema,
@@ -249,6 +302,8 @@ export class ChangeStream {
         throw new Error(`Aborted initial replication`);
       }
 
+      at += 1;
+
       const record = constructAfterRecord(document);
 
       // This auto-flushes when the batch reaches its size limit
@@ -268,6 +323,7 @@ export class ChangeStream {
     }
 
     await batch.flush();
+    logger.info(`Replicated ${at} documents for ${table.qualifiedName}`);
   }
 
   private async getRelation(
@@ -278,14 +334,60 @@ export class ChangeStream {
     if (existing != null) {
       return existing;
     }
-    return this.handleRelation(batch, descriptor, { snapshot: false });
+
+    // Note: collection may have been dropped at this point, so we handle
+    // missing values.
+    const collection = await this.getCollectionInfo(descriptor.schema, descriptor.name);
+
+    return this.handleRelation(batch, descriptor, { snapshot: false, collectionInfo: collection });
+  }
+
+  private async getCollectionInfo(db: string, name: string): Promise<mongo.CollectionInfo | undefined> {
+    const collection = (
+      await this.client
+        .db(db)
+        .listCollections(
+          {
+            name: name
+          },
+          { nameOnly: false }
+        )
+        .toArray()
+    )[0];
+    return collection;
+  }
+
+  private async checkPostImages(db: string, collectionInfo: mongo.CollectionInfo) {
+    if (!this.usePostImages) {
+      // Nothing to check
+      return;
+    }
+
+    const enabled = collectionInfo.options?.changeStreamPreAndPostImages?.enabled == true;
+
+    if (!enabled && this.configurePostImages) {
+      await this.client.db(db).command({
+        collMod: collectionInfo.name,
+        changeStreamPreAndPostImages: { enabled: true }
+      });
+      logger.info(`Enabled postImages on ${db}.${collectionInfo.name}`);
+    } else if (!enabled) {
+      throw new Error(`postImages not enabled on ${db}.${collectionInfo.name}`);
+    }
   }
 
   async handleRelation(
     batch: storage.BucketStorageBatch,
     descriptor: SourceEntityDescriptor,
-    options: { snapshot: boolean }
+    options: { snapshot: boolean; collectionInfo: mongo.CollectionInfo | undefined }
   ) {
+    if (options.collectionInfo != null) {
+      await this.checkPostImages(descriptor.schema, options.collectionInfo);
+    } else {
+      // If collectionInfo is null, the collection may have been dropped.
+      // Ignore the postImages check in this case.
+    }
+
     const snapshot = options.snapshot;
     if (!descriptor.objectId && typeof descriptor.objectId != 'string') {
       throw new Error('objectId expected');
     }
@@ -388,12 +490,28 @@ export class ChangeStream {
 
   async initReplication() {
     const result = await this.initSlot();
+    await this.setupCheckpointsCollection();
     if (result.needsInitialSync) {
       await this.startInitialReplication();
     }
   }
 
   async streamChanges() {
+    try {
+      await this.streamChangesInternal();
+    } catch (e) {
+      if (
+        e instanceof mongo.MongoServerError &&
+        e.codeName == 'NoMatchingDocument' &&
+        e.errmsg?.includes('post-image was not found')
+      ) {
+        throw new ChangeStreamInvalidatedError(e.errmsg);
+      }
+      throw e;
+    }
+  }
+
+  async streamChangesInternal() {
     // Auto-activate as soon as initial replication is done
     await this.storage.autoActivate();
@@ -412,12 +530,23 @@ export class ChangeStream {
       }
     ];
 
+    let fullDocument: 'required' | 'updateLookup';
+
+    if (this.usePostImages) {
+      // 'read_only' or 'auto_configure'
+      // Configuration happens during snapshot, or when we see new
+      // collections.
+      fullDocument = 'required';
+    } else {
+      fullDocument = 'updateLookup';
+    }
+
     const streamOptions: mongo.ChangeStreamOptions = {
       startAtOperationTime: startAfter,
       showExpandedEvents: true,
       useBigInt64: true,
       maxAwaitTimeMS: 200,
-      fullDocument: 'updateLookup'
+      fullDocument: fullDocument
     };
     let stream: mongo.ChangeStream;
     if (filters.multipleDatabases) {
@@ -461,7 +590,7 @@ export class ChangeStream {
         (changeDocument.operationType == 'insert' ||
           changeDocument.operationType == 'update' ||
           changeDocument.operationType == 'replace') &&
-        changeDocument.ns.coll == '_powersync_checkpoints'
+        changeDocument.ns.coll == CHECKPOINTS_COLLECTION
       ) {
         const lsn = getMongoLsn(changeDocument.clusterTime!);
         if (waitForCheckpointLsn != null && lsn >= waitForCheckpointLsn) {
@@ -499,7 +628,8 @@ export class ChangeStream {
             this.relation_cache.delete(tableFrom.objectId);
           }
           // Here we do need to snapshot the new table
-          await this.handleRelation(batch, relTo, { snapshot: true });
+          const collection = await this.getCollectionInfo(relTo.schema, relTo.name);
+          await this.handleRelation(batch, relTo, { snapshot: true, collectionInfo: collection });
         }
       }
     });
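With post-images enabled, the stream above is opened with `fullDocument: 'required'` instead of `'updateLookup'`. A rough sketch of the resulting driver call, assuming a connected `db` and the `pipeline` built from `getSourceNamespaceFilters()` (illustrative only, not part of the diff):

```ts
// 'required' makes MongoDB fail the change stream when a post-image is
// missing, rather than silently substituting a later document state.
const stream = db.watch(pipeline, {
  showExpandedEvents: true,
  useBigInt64: true,
  maxAwaitTimeMS: 200,
  fullDocument: 'required'
});
```

The resulting `NoMatchingDocument` server error is what `streamChanges()` rewraps as `ChangeStreamInvalidatedError`, so the replication job knows to restart from a fresh snapshot.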
diff --git a/modules/module-mongodb/src/replication/ChangeStreamReplicationJob.ts b/modules/module-mongodb/src/replication/ChangeStreamReplicationJob.ts
index 06583d571..78842fd36 100644
--- a/modules/module-mongodb/src/replication/ChangeStreamReplicationJob.ts
+++ b/modules/module-mongodb/src/replication/ChangeStreamReplicationJob.ts
@@ -1,5 +1,5 @@
 import { container } from '@powersync/lib-services-framework';
-import { MissingReplicationSlotError, ChangeStream } from './ChangeStream.js';
+import { ChangeStreamInvalidatedError, ChangeStream } from './ChangeStream.js';
 import { replication } from '@powersync/service-core';
 import { ConnectionManagerFactory } from './ConnectionManagerFactory.js';
 
@@ -40,7 +40,7 @@ export class ChangeStreamReplicationJob extends replication.AbstractReplicationJ
       });
       this.logger.error(`Replication failed`, e);
 
-      if (e instanceof MissingReplicationSlotError) {
+      if (e instanceof ChangeStreamInvalidatedError) {
        // This stops replication on this slot, and creates a new slot
         await this.options.storage.factory.slotRemoved(this.slotName);
       }
@@ -84,8 +84,10 @@ export class ChangeStreamReplicationJob extends replication.AbstractReplicationJ
        // Without this additional log, the cause may not be visible in the logs.
         this.logger.error(`cause`, e.cause);
       }
-      if (e instanceof mongo.MongoError && e.hasErrorLabel('NonResumableChangeStreamError')) {
-        throw new MissingReplicationSlotError(e.message);
+      if (e instanceof ChangeStreamInvalidatedError) {
+        throw e;
+      } else if (e instanceof mongo.MongoError && e.hasErrorLabel('NonResumableChangeStreamError')) {
+        throw new ChangeStreamInvalidatedError(e.message);
       } else {
         // Report the error if relevant, before retrying
         container.reporter.captureException(e, {
diff --git a/modules/module-mongodb/src/replication/MongoRelation.ts b/modules/module-mongodb/src/replication/MongoRelation.ts
index 7670daec1..e2dc675e1 100644
--- a/modules/module-mongodb/src/replication/MongoRelation.ts
+++ b/modules/module-mongodb/src/replication/MongoRelation.ts
@@ -2,6 +2,7 @@ import { storage } from '@powersync/service-core';
 import { SqliteRow, SqliteValue, toSyncRulesRow } from '@powersync/service-sync-rules';
 import * as mongo from 'mongodb';
 import { JSONBig, JsonContainer } from '@powersync/service-jsonbig';
+import { CHECKPOINTS_COLLECTION } from './replication-utils.js';
 
 export function getMongoRelation(source: mongo.ChangeStreamNameSpace): storage.SourceEntityDescriptor {
   return {
@@ -145,7 +146,10 @@ function filterJsonData(data: any, depth = 0): any {
 export async function createCheckpoint(client: mongo.MongoClient, db: mongo.Db): Promise<string> {
   const session = client.startSession();
   try {
-    const result = await db.collection('_powersync_checkpoints').findOneAndUpdate(
+    // Note: If multiple PowerSync instances are replicating the same source database,
+    // they'll modify the same checkpoint document. This is fine - it could create
+    // more replication load than required, but won't break anything.
+    await db.collection(CHECKPOINTS_COLLECTION).findOneAndUpdate(
       {
         _id: 'checkpoint' as any
       },
@@ -159,7 +163,6 @@ export async function createCheckpoint(client: mongo.MongoClient, db: mongo.Db):
       }
     );
     const time = session.operationTime!;
-    // console.log('marked checkpoint at', time, getMongoLsn(time));
     // TODO: Use the above when we support custom write checkpoints
     return getMongoLsn(time);
   } finally {
diff --git a/modules/module-mongodb/src/replication/replication-utils.ts b/modules/module-mongodb/src/replication/replication-utils.ts
index 71a521d77..5370fdd5e 100644
--- a/modules/module-mongodb/src/replication/replication-utils.ts
+++ b/modules/module-mongodb/src/replication/replication-utils.ts
@@ -1,6 +1,7 @@
-import * as mongo from 'mongodb';
 import { MongoManager } from './MongoManager.js';
 
+export const CHECKPOINTS_COLLECTION = '_powersync_checkpoints';
+
 export async function checkSourceConfiguration(connectionManager: MongoManager): Promise<void> {
   const db = connectionManager.db;
   const hello = await db.command({ hello: 1 });
diff --git a/modules/module-mongodb/src/types/types.ts b/modules/module-mongodb/src/types/types.ts
index ac7873876..1498193f5 100644
--- a/modules/module-mongodb/src/types/types.ts
+++ b/modules/module-mongodb/src/types/types.ts
@@ -4,6 +4,42 @@ import * as t from 'ts-codec';
 
 export const MONGO_CONNECTION_TYPE = 'mongodb' as const;
 
+export enum PostImagesOption {
+  /**
+   * Use fullDocument: updateLookup on the changeStream.
+   *
+   * This does not guarantee consistency - operations may
+   * arrive out of order, especially when there is replication lag.
+   *
+   * This is the default option for backwards-compatibility.
+   */
+  OFF = 'off',
+
+  /**
+   * Use fullDocument: required on the changeStream.
+   *
+   * Collections are automatically configured with:
+   * `changeStreamPreAndPostImages: { enabled: true }`
+   *
+   * This is the recommended behavior for new instances.
+   */
+  AUTO_CONFIGURE = 'auto_configure',
+
+  /**
+   *
+   * Use fullDocument: required on the changeStream.
+   *
+   * Collections are not automatically configured. Each
+   * collection must be configured manually before
+   * replicating with:
+   *
+   * `changeStreamPreAndPostImages: { enabled: true }`
+   *
+   * Use when the collMod permission is not available.
+   */
+  READ_ONLY = 'read_only'
+}
+
 export interface NormalizedMongoConnectionConfig {
   id: string;
   tag: string;
@@ -13,6 +49,8 @@ export interface NormalizedMongoConnectionConfig {
 
   username?: string;
   password?: string;
+
+  postImages: PostImagesOption;
 }
 
 export const MongoConnectionConfig = service_types.configFile.DataSourceConfig.and(
@@ -25,7 +63,9 @@ export const MongoConnectionConfig = service_types.configFile.DataSourceConfig.a
     uri: t.string,
     username: t.string.optional(),
     password: t.string.optional(),
-    database: t.string.optional()
+    database: t.string.optional(),
+
+    post_images: t.literal('off').or(t.literal('auto_configure')).or(t.literal('read_only')).optional()
   })
 );
 
@@ -48,10 +88,10 @@ export function normalizeConnectionConfig(options: MongoConnectionConfig): Norma
   const base = normalizeMongoConfig(options);
 
   return {
+    ...base,
     id: options.id ?? 'default',
     tag: options.tag ?? 'default',
-
-    ...base
+    postImages: (options.post_images as PostImagesOption | undefined) ?? PostImagesOption.OFF
   };
 }
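In `read_only` mode each replicated collection must have pre- and post-images enabled before replication starts; `auto_configure` issues the equivalent `collMod` itself. A sketch of the manual setup (collection names illustrative; requires the `collMod` privilege and MongoDB 6.0+):

```ts
// Enable post-images on an existing collection:
await db.command({
  collMod: 'test_data',
  changeStreamPreAndPostImages: { enabled: true }
});

// Or enable them at creation time:
await db.createCollection('new_collection', {
  changeStreamPreAndPostImages: { enabled: true }
});
```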
diff --git a/modules/module-mongodb/test/src/change_stream.test.ts b/modules/module-mongodb/test/src/change_stream.test.ts
index 2b5c949d4..bff0a6857 100644
--- a/modules/module-mongodb/test/src/change_stream.test.ts
+++ b/modules/module-mongodb/test/src/change_stream.test.ts
@@ -2,10 +2,11 @@ import { putOp, removeOp } from '@core-tests/stream_utils.js';
 import { MONGO_STORAGE_FACTORY } from '@core-tests/util.js';
 import { BucketStorageFactory } from '@powersync/service-core';
 import * as crypto from 'crypto';
-import { describe, expect, test } from 'vitest';
-import { changeStreamTest } from './change_stream_utils.js';
 import * as mongo from 'mongodb';
 import { setTimeout } from 'node:timers/promises';
+import { describe, expect, test } from 'vitest';
+import { ChangeStreamTestContext } from './change_stream_utils.js';
+import { PostImagesOption } from '@module/types/types.js';
 
 type StorageFactory = () => Promise<BucketStorageFactory>;
 
@@ -16,161 +17,270 @@ bucket_definitions:
     - SELECT _id as id, description FROM "test_data"
 `;
 
-describe(
-  'change stream - mongodb',
-  function () {
-    defineChangeStreamTests(MONGO_STORAGE_FACTORY);
-  },
-  { timeout: 20_000 }
-);
+describe('change stream - mongodb', { timeout: 20_000 }, function () {
+  defineChangeStreamTests(MONGO_STORAGE_FACTORY);
+});
 
 function defineChangeStreamTests(factory: StorageFactory) {
-  test(
-    'replicating basic values',
-    changeStreamTest(factory, async (context) => {
-      const { db } = context;
-      await context.updateSyncRules(`
+  test('replicating basic values', async () => {
+    await using context = await ChangeStreamTestContext.open(factory);
+    const { db } = context;
+    await context.updateSyncRules(`
 bucket_definitions:
   global:
     data:
       - SELECT _id as id, description, num FROM "test_data"`);
-      db.createCollection('test_data', {
-        changeStreamPreAndPostImages: { enabled: true }
+    await db.createCollection('test_data', {
+      changeStreamPreAndPostImages: { enabled: false }
+    });
+    const collection = db.collection('test_data');
+
+    await context.replicateSnapshot();
+
+    context.startStreaming();
+
+    const result = await collection.insertOne({ description: 'test1', num: 1152921504606846976n });
+    const test_id = result.insertedId;
+    await setTimeout(30);
+    await collection.updateOne({ _id: test_id }, { $set: { description: 'test2' } });
+    await setTimeout(30);
+    await collection.replaceOne({ _id: test_id }, { description: 'test3' });
+    await setTimeout(30);
+    await collection.deleteOne({ _id: test_id });
+
+    const data = await context.getBucketData('global[]');
+
+    expect(data).toMatchObject([
+      putOp('test_data', { id: test_id.toHexString(), description: 'test1', num: 1152921504606846976n }),
+      putOp('test_data', { id: test_id.toHexString(), description: 'test2', num: 1152921504606846976n }),
+      putOp('test_data', { id: test_id.toHexString(), description: 'test3' }),
+      removeOp('test_data', test_id.toHexString())
+    ]);
+  });
+
+  test('replicating wildcard', async () => {
+    await using context = await ChangeStreamTestContext.open(factory);
+    const { db } = context;
+    await context.updateSyncRules(`
+bucket_definitions:
+  global:
+    data:
+      - SELECT _id as id, description, num FROM "test_%"`);
+
+    await db.createCollection('test_data', {
+      changeStreamPreAndPostImages: { enabled: false }
+    });
+    const collection = db.collection('test_data');
+
+    const result = await collection.insertOne({ description: 'test1', num: 1152921504606846976n });
+    const test_id = result.insertedId;
+
+    await context.replicateSnapshot();
+
+    context.startStreaming();
+
+    await setTimeout(30);
+    await collection.updateOne({ _id: test_id }, { $set: { description: 'test2' } });
+
+    const data = await context.getBucketData('global[]');
+
+    expect(data).toMatchObject([
+      putOp('test_data', { id: test_id.toHexString(), description: 'test1', num: 1152921504606846976n }),
+      putOp('test_data', { id: test_id.toHexString(), description: 'test2', num: 1152921504606846976n })
+    ]);
+  });
+
+  test('updateLookup - no fullDocument available', async () => {
+    await using context = await ChangeStreamTestContext.open(factory, { postImages: PostImagesOption.OFF });
+    const { db, client } = context;
+    await context.updateSyncRules(`
+bucket_definitions:
+  global:
+    data:
+      - SELECT _id as id, description, num FROM "test_data"`);
+
+    await db.createCollection('test_data', {
+      changeStreamPreAndPostImages: { enabled: false }
+    });
+    const collection = db.collection('test_data');
+
+    await context.replicateSnapshot();
+    context.startStreaming();
+
+    const session = client.startSession();
+    let test_id: mongo.ObjectId | undefined;
+    try {
+      await session.withTransaction(async () => {
+        const result = await collection.insertOne({ description: 'test1', num: 1152921504606846976n }, { session });
+        test_id = result.insertedId;
+        await collection.updateOne({ _id: test_id }, { $set: { description: 'test2' } }, { session });
+        await collection.replaceOne({ _id: test_id }, { description: 'test3' }, { session });
+        await collection.deleteOne({ _id: test_id }, { session });
       });
-      const collection = db.collection('test_data');
-
-      await context.replicateSnapshot();
-
-      context.startStreaming();
-
-      const result = await collection.insertOne({ description: 'test1', num: 1152921504606846976n });
-      const test_id = result.insertedId;
-      await setTimeout(10);
-      await collection.updateOne({ _id: test_id }, { $set: { description: 'test2' } });
-      await setTimeout(10);
-      await collection.replaceOne({ _id: test_id }, { description: 'test3' });
-      await setTimeout(10);
-      await collection.deleteOne({ _id: test_id });
-
-      const data = await context.getBucketData('global[]');
-
-      expect(data).toMatchObject([
-        putOp('test_data', { id: test_id.toHexString(), description: 'test1', num: 1152921504606846976n }),
-        putOp('test_data', { id: test_id.toHexString(), description: 'test2', num: 1152921504606846976n }),
-        putOp('test_data', { id: test_id.toHexString(), description: 'test3' }),
-        removeOp('test_data', test_id.toHexString())
-      ]);
-    })
-  );
-
-  test(
-    'no fullDocument available',
-    changeStreamTest(factory, async (context) => {
-      const { db, client } = context;
-      await context.updateSyncRules(`
 bucket_definitions:
   global:
     data:
       - SELECT _id as id, description, num FROM "test_data"`);
-      db.createCollection('test_data', {
-        changeStreamPreAndPostImages: { enabled: false }
+    } finally {
+      await session.endSession();
+    }
+
+    const data = await context.getBucketData('global[]');
+
+    expect(data).toMatchObject([
+      putOp('test_data', { id: test_id!.toHexString(), description: 'test1', num: 1152921504606846976n }),
+      // fullDocument is not available at the point this is replicated, resulting in it being treated as a remove
+      removeOp('test_data', test_id!.toHexString()),
+      putOp('test_data', { id: test_id!.toHexString(), description: 'test3' }),
+      removeOp('test_data', test_id!.toHexString())
+    ]);
+  });
+
+  test('postImages - autoConfigure', async () => {
+    // Similar to the above test, but with postImages enabled.
+    // This resolves the consistency issue.
+    await using context = await ChangeStreamTestContext.open(factory, { postImages: PostImagesOption.AUTO_CONFIGURE });
+    const { db, client } = context;
+    await context.updateSyncRules(`
+bucket_definitions:
+  global:
+    data:
+      - SELECT _id as id, description, num FROM "test_data"`);
+
+    await db.createCollection('test_data', {
+      // enabled: false here, but autoConfigure will enable it.
+      changeStreamPreAndPostImages: { enabled: false }
+    });
+    const collection = db.collection('test_data');
+
+    await context.replicateSnapshot();
+
+    context.startStreaming();
+
+    const session = client.startSession();
+    let test_id: mongo.ObjectId | undefined;
+    try {
+      await session.withTransaction(async () => {
+        const result = await collection.insertOne({ description: 'test1', num: 1152921504606846976n }, { session });
+        test_id = result.insertedId;
+        await collection.updateOne({ _id: test_id }, { $set: { description: 'test2' } }, { session });
+        await collection.replaceOne({ _id: test_id }, { description: 'test3' }, { session });
+        await collection.deleteOne({ _id: test_id }, { session });
       });
-      const collection = db.collection('test_data');
-
-      await context.replicateSnapshot();
-
-      context.startStreaming();
-
-      const session = client.startSession();
-      let test_id: mongo.ObjectId | undefined;
-      try {
-        await session.withTransaction(async () => {
-          const result = await collection.insertOne({ description: 'test1', num: 1152921504606846976n }, { session });
-          test_id = result.insertedId;
-          await collection.updateOne({ _id: test_id }, { $set: { description: 'test2' } }, { session });
-          await collection.replaceOne({ _id: test_id }, { description: 'test3' }, { session });
-          await collection.deleteOne({ _id: test_id }, { session });
-        });
-      } finally {
-        await session.endSession();
-      }
-
-      const data = await context.getBucketData('global[]');
-
-      expect(data).toMatchObject([
-        putOp('test_data', { id: test_id!.toHexString(), description: 'test1', num: 1152921504606846976n }),
-        // fullDocument is not available at the point this is replicated, resulting in it being treated as a remove
-        removeOp('test_data', test_id!.toHexString()),
-        putOp('test_data', { id: test_id!.toHexString(), description: 'test3' }),
-        removeOp('test_data', test_id!.toHexString())
-      ]);
-    })
-  );
-
-  test(
-    'replicating case sensitive table',
-    changeStreamTest(factory, async (context) => {
-      const { db } = context;
-      await context.updateSyncRules(`
+    } finally {
+      await session.endSession();
+    }
+
+    const data = await context.getBucketData('global[]');
+
+    expect(data).toMatchObject([
+      putOp('test_data', { id: test_id!.toHexString(), description: 'test1', num: 1152921504606846976n }),
+      // The postImage helps us get this data
+      putOp('test_data', { id: test_id!.toHexString(), description: 'test2', num: 1152921504606846976n }),
+      putOp('test_data', { id: test_id!.toHexString(), description: 'test3' }),
+      removeOp('test_data', test_id!.toHexString())
+    ]);
+  });
+
+  test('postImages - on', async () => {
+    // Similar to postImages - autoConfigure, but does not auto-configure.
+    // changeStreamPreAndPostImages must be manually configured.
+    await using context = await ChangeStreamTestContext.open(factory, { postImages: PostImagesOption.READ_ONLY });
+    const { db, client } = context;
+    await context.updateSyncRules(`
+bucket_definitions:
+  global:
+    data:
+      - SELECT _id as id, description, num FROM "test_data"`);
+
+    await db.createCollection('test_data', {
+      changeStreamPreAndPostImages: { enabled: true }
+    });
+    const collection = db.collection('test_data');
+
+    await context.replicateSnapshot();
+
+    context.startStreaming();
+
+    const session = client.startSession();
+    let test_id: mongo.ObjectId | undefined;
+    try {
+      await session.withTransaction(async () => {
+        const result = await collection.insertOne({ description: 'test1', num: 1152921504606846976n }, { session });
+        test_id = result.insertedId;
+        await collection.updateOne({ _id: test_id }, { $set: { description: 'test2' } }, { session });
+        await collection.replaceOne({ _id: test_id }, { description: 'test3' }, { session });
+        await collection.deleteOne({ _id: test_id }, { session });
+      });
+    } finally {
+      await session.endSession();
+    }
+
+    const data = await context.getBucketData('global[]');
+
+    expect(data).toMatchObject([
+      putOp('test_data', { id: test_id!.toHexString(), description: 'test1', num: 1152921504606846976n }),
+      // The postImage helps us get this data
+      putOp('test_data', { id: test_id!.toHexString(), description: 'test2', num: 1152921504606846976n }),
+      putOp('test_data', { id: test_id!.toHexString(), description: 'test3' }),
+      removeOp('test_data', test_id!.toHexString())
+    ]);
+  });
+
+  test('replicating case sensitive table', async () => {
+    await using context = await ChangeStreamTestContext.open(factory);
+    const { db } = context;
+    await context.updateSyncRules(`
 bucket_definitions:
   global:
     data:
       - SELECT _id as id, description FROM "test_DATA"
 `);
-      await context.replicateSnapshot();
+    await context.replicateSnapshot();
 
-      context.startStreaming();
+    context.startStreaming();
 
-      const collection = db.collection('test_DATA');
-      const result = await collection.insertOne({ description: 'test1' });
-      const test_id = result.insertedId.toHexString();
+    const collection = db.collection('test_DATA');
+    const result = await collection.insertOne({ description: 'test1' });
+    const test_id = result.insertedId.toHexString();
 
-      const data = await context.getBucketData('global[]');
+    const data = await context.getBucketData('global[]');
 
-      expect(data).toMatchObject([putOp('test_DATA', { id: test_id, description: 'test1' })]);
-    })
-  );
+    expect(data).toMatchObject([putOp('test_DATA', { id: test_id, description: 'test1' })]);
+  });
 
-  test(
-    'replicating large values',
-    changeStreamTest(factory, async (context) => {
-      const { db } = context;
-      await context.updateSyncRules(`
+  test('replicating large values', async () => {
+    await using context = await ChangeStreamTestContext.open(factory);
+    const { db } = context;
+    await context.updateSyncRules(`
 bucket_definitions:
   global:
     data:
       - SELECT _id as id, name, description FROM "test_data"
 `);
-      await context.replicateSnapshot();
-      context.startStreaming();
+    await context.replicateSnapshot();
+    context.startStreaming();
 
-      const largeDescription = crypto.randomBytes(20_000).toString('hex');
+    const largeDescription = crypto.randomBytes(20_000).toString('hex');
 
-      const collection = db.collection('test_data');
-      const result = await collection.insertOne({ name: 'test1', description: largeDescription });
-      const test_id = result.insertedId;
+    const collection = db.collection('test_data');
+    const result = await collection.insertOne({ name: 'test1', description: largeDescription });
+    const test_id = result.insertedId;
 
-      await collection.updateOne({ _id: test_id }, { $set: { name: 'test2' } });
+    await collection.updateOne({ _id: test_id }, { $set: { name: 'test2' } });
 
-      const data = await context.getBucketData('global[]');
-      expect(data.slice(0, 1)).toMatchObject([
-        putOp('test_data', { id: test_id.toHexString(), name: 'test1', description: largeDescription })
-      ]);
-      expect(data.slice(1)).toMatchObject([
-        putOp('test_data', { id: test_id.toHexString(), name: 'test2', description: largeDescription })
-      ]);
-    })
-  );
+    const data = await context.getBucketData('global[]');
+    expect(data.slice(0, 1)).toMatchObject([
+      putOp('test_data', { id: test_id.toHexString(), name: 'test1', description: largeDescription })
+    ]);
+    expect(data.slice(1)).toMatchObject([
+      putOp('test_data', { id: test_id.toHexString(), name: 'test2', description: largeDescription })
+    ]);
+  });
 
-  test(
-    'replicating dropCollection',
-    changeStreamTest(factory, async (context) => {
-      const { db } = context;
-      const syncRuleContent = `
+  test('replicating dropCollection', async () => {
+    await using context = await ChangeStreamTestContext.open(factory);
+    const { db } = context;
+    const syncRuleContent = `
 bucket_definitions:
   global:
     data:
       - SELECT _id as id, description FROM "test_data"
   by_test_data:
@@ -179,128 +289,175 @@ bucket_definitions:
     parameters: SELECT _id as id FROM test_data WHERE id = token_parameters.user_id
     data: []
 `;
-      await context.updateSyncRules(syncRuleContent);
-      await context.replicateSnapshot();
-      context.startStreaming();
-
-      const collection = db.collection('test_data');
-      const result = await collection.insertOne({ description: 'test1' });
-      const test_id = result.insertedId.toHexString();
-
-      await collection.drop();
-
-      const data = await context.getBucketData('global[]');
-
-      expect(data).toMatchObject([
-        putOp('test_data', { id: test_id, description: 'test1' }),
-        removeOp('test_data', test_id)
-      ]);
-    })
-  );
-
-  test(
-    'replicating renameCollection',
-    changeStreamTest(factory, async (context) => {
-      const { db } = context;
-      const syncRuleContent = `
+    await context.updateSyncRules(syncRuleContent);
+    await context.replicateSnapshot();
+    context.startStreaming();
+
+    const collection = db.collection('test_data');
+    const result = await collection.insertOne({ description: 'test1' });
+    const test_id = result.insertedId.toHexString();
+
+    await collection.drop();
+
+    const data = await context.getBucketData('global[]');
+
+    expect(data).toMatchObject([
+      putOp('test_data', { id: test_id, description: 'test1' }),
+      removeOp('test_data', test_id)
+    ]);
+  });
+
+  test('replicating renameCollection', async () => {
+    await using context = await ChangeStreamTestContext.open(factory);
+    const { db } = context;
+    const syncRuleContent = `
 bucket_definitions:
   global:
     data:
       - SELECT _id as id, description FROM "test_data1"
       - SELECT _id as id, description FROM "test_data2"
 `;
-      await context.updateSyncRules(syncRuleContent);
-      await context.replicateSnapshot();
-      context.startStreaming();
+    await context.updateSyncRules(syncRuleContent);
+    await context.replicateSnapshot();
+    context.startStreaming();
 
-      console.log('insert1', db.databaseName);
-      const collection = db.collection('test_data1');
-      const result = await collection.insertOne({ description: 'test1' });
-      const test_id = result.insertedId.toHexString();
+    const collection = db.collection('test_data1');
+    const result = await collection.insertOne({ description: 'test1' });
+    const test_id = result.insertedId.toHexString();
 
-      await collection.rename('test_data2');
+    await collection.rename('test_data2');
 
-      const data = await context.getBucketData('global[]');
+    const data = await context.getBucketData('global[]');
 
-      expect(data).toMatchObject([
-        putOp('test_data1', { id: test_id, description: 'test1' }),
-        removeOp('test_data1', test_id),
-        putOp('test_data2', { id: test_id, description: 'test1' })
-      ]);
-    })
-  );
+    expect(data).toMatchObject([
+      putOp('test_data1', { id: test_id, description: 'test1' }),
+      removeOp('test_data1', test_id),
+      putOp('test_data2', { id: test_id, description: 'test1' })
+    ]);
+  });
 
-  test(
-    'initial sync',
-    changeStreamTest(factory, async (context) => {
-      const { db } = context;
-      await context.updateSyncRules(BASIC_SYNC_RULES);
+  test('initial sync', async () => {
+    await using context = await ChangeStreamTestContext.open(factory);
+    const { db } = context;
+    await context.updateSyncRules(BASIC_SYNC_RULES);
 
-      const collection = db.collection('test_data');
-      const result = await collection.insertOne({ description: 'test1' });
-      const test_id = result.insertedId.toHexString();
+    const collection = db.collection('test_data');
+    const result = await collection.insertOne({ description: 'test1' });
+    const test_id = result.insertedId.toHexString();
 
-      await context.replicateSnapshot();
-      context.startStreaming();
+    await context.replicateSnapshot();
+    context.startStreaming();
 
-      const data = await context.getBucketData('global[]');
-      expect(data).toMatchObject([putOp('test_data', { id: test_id, description: 'test1' })]);
-    })
-  );
+    const data = await context.getBucketData('global[]');
+    expect(data).toMatchObject([putOp('test_data', { id: test_id, description: 'test1' })]);
+  });
 
   // Not correctly implemented yet
-  test.skip(
-    'large record',
-    changeStreamTest(factory, async (context) => {
-      await context.updateSyncRules(`bucket_definitions:
+  test.skip('large record', async () => {
+    await using context = await ChangeStreamTestContext.open(factory);
+    await context.updateSyncRules(`bucket_definitions:
   global:
     data:
       - SELECT _id as id, description, other FROM "test_data"`);
-      const { db } = context;
-
-      await context.replicateSnapshot();
-
-      // 16MB
-      const largeDescription = crypto.randomBytes(8_000_000 - 100).toString('hex');
-
-      const collection = db.collection('test_data');
-      const result = await collection.insertOne({ description: largeDescription });
-      const test_id = result.insertedId;
-
-      await collection.updateOne({ _id: test_id }, { $set: { name: 't2' } });
-      context.startStreaming();
-
-      const data = await context.getBucketData('global[]');
-      expect(data.length).toEqual(2);
-      const row = JSON.parse(data[0].data as string);
-      delete row.description;
-      expect(row).toEqual({ id: test_id.toHexString() });
-      delete data[0].data;
-      expect(data[0]).toMatchObject({
-        object_id: test_id.toHexString(),
-        object_type: 'test_data',
-        op: 'PUT',
-        op_id: '1'
-      });
-    })
-  );
+    const { db } = context;
+
+    await context.replicateSnapshot();
 
-  test(
-    'table not in sync rules',
-    changeStreamTest(factory, async (context) => {
-      const { db } = context;
-      await context.updateSyncRules(BASIC_SYNC_RULES);
+    // 16MB
+    const largeDescription = crypto.randomBytes(8_000_000 - 100).toString('hex');
 
-      await context.replicateSnapshot();
+    const collection = db.collection('test_data');
+    const result = await collection.insertOne({ description: largeDescription });
+    const test_id = result.insertedId;
 
-      context.startStreaming();
+    await collection.updateOne({ _id: test_id }, { $set: { name: 't2' } });
+    context.startStreaming();
 
-      const collection = db.collection('test_donotsync');
-      const result = await collection.insertOne({ description: 'test' });
+    const data = await context.getBucketData('global[]');
+    expect(data.length).toEqual(2);
+    const row = JSON.parse(data[0].data as string);
+    delete row.description;
+    expect(row).toEqual({ id: test_id.toHexString() });
+    delete data[0].data;
+    expect(data[0]).toMatchObject({
+      object_id: test_id.toHexString(),
+      object_type: 'test_data',
+      op: 'PUT',
+      op_id: '1'
+    });
+  });
 
-      const data = await context.getBucketData('global[]');
+  test('collection not in sync rules', async () => {
+    await using context = await ChangeStreamTestContext.open(factory);
+    const { db } = context;
+    await context.updateSyncRules(BASIC_SYNC_RULES);
 
-      expect(data).toMatchObject([]);
-    })
-  );
+    await context.replicateSnapshot();
+
+    context.startStreaming();
+
+    const collection = db.collection('test_donotsync');
+    const result = await collection.insertOne({ description: 'test' });
+
+    const data = await context.getBucketData('global[]');
+
+    expect(data).toMatchObject([]);
+  });
+
+  test('postImages - new collection with postImages enabled', async () => {
+    await using context = await ChangeStreamTestContext.open(factory, { postImages: PostImagesOption.AUTO_CONFIGURE });
+    const { db } = context;
+    await context.updateSyncRules(`
+bucket_definitions:
+  global:
+    data:
+      - SELECT _id as id, description FROM "test_%"`);
+
+    await context.replicateSnapshot();
+
+    await db.createCollection('test_data', {
+      // enabled: true here - everything should work
+      changeStreamPreAndPostImages: { enabled: true }
+    });
+    const collection = db.collection('test_data');
+    const result = await collection.insertOne({ description: 'test1' });
+    const test_id = result.insertedId;
+    await collection.updateOne({ _id: test_id }, { $set: { description: 'test2' } });
+
+    context.startStreaming();
+
+    const data = await context.getBucketData('global[]');
+    expect(data).toMatchObject([
+      putOp('test_data', { id: test_id!.toHexString(), description: 'test1' }),
+      putOp('test_data', { id: test_id!.toHexString(), description: 'test2' })
+    ]);
+  });
+
+  test('postImages - new collection with postImages disabled', async () => {
+    await using context = await ChangeStreamTestContext.open(factory, { postImages: PostImagesOption.AUTO_CONFIGURE });
+    const { db } = context;
+    await context.updateSyncRules(`
+bucket_definitions:
+  global:
+    data:
+      - SELECT _id as id, description FROM "test_data%"`);
+
+    await context.replicateSnapshot();
+
+    await db.createCollection('test_data', {
+      // enabled: false here, but autoConfigure will enable it.
+      // Unfortunately, that is too late, and replication must be restarted.
+      changeStreamPreAndPostImages: { enabled: false }
+    });
+    const collection = db.collection('test_data');
+    const result = await collection.insertOne({ description: 'test1' });
+    const test_id = result.insertedId;
+    await collection.updateOne({ _id: test_id }, { $set: { description: 'test2' } });
+
+    context.startStreaming();
+
+    await expect(() => context.getBucketData('global[]')).rejects.toMatchObject({
+      message: expect.stringContaining('stream was configured to require a post-image for all update events')
+    });
+  });
 }
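When debugging these tests, it can help to confirm what the module actually sees for a collection. A small sketch using the same `listCollections` call the adapter and change stream use (names illustrative):

```ts
// `options.changeStreamPreAndPostImages.enabled` is the flag the
// postImages checks read.
const [info] = await db
  .listCollections({ name: 'test_data' }, { nameOnly: false })
  .toArray();
console.log(info?.options?.changeStreamPreAndPostImages?.enabled == true);
```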
diff --git a/modules/module-mongodb/test/src/change_stream_utils.ts b/modules/module-mongodb/test/src/change_stream_utils.ts
index e533c56ca..76735d380 100644
--- a/modules/module-mongodb/test/src/change_stream_utils.ts
+++ b/modules/module-mongodb/test/src/change_stream_utils.ts
@@ -6,30 +6,7 @@ import { MongoManager } from '@module/replication/MongoManager.js';
 import { ChangeStream, ChangeStreamOptions } from '@module/replication/ChangeStream.js';
 import * as mongo from 'mongodb';
 import { createCheckpoint } from '@module/replication/MongoRelation.js';
-
-/**
- * Tests operating on the mongo change stream need to configure the stream and manage asynchronous
- * replication, which gets a little tricky.
- *
- * This wraps a test in a function that configures all the context, and tears it down afterwards.
- */
-export function changeStreamTest(
-  factory: () => Promise<BucketStorageFactory>,
-  test: (context: ChangeStreamTestContext) => Promise<void>
-): () => Promise<void> {
-  return async () => {
-    const f = await factory();
-    const connectionManager = new MongoManager(TEST_CONNECTION_OPTIONS);
-
-    await clearTestDb(connectionManager.db);
-    const context = new ChangeStreamTestContext(f, connectionManager);
-    try {
-      await test(context);
-    } finally {
-      await context.dispose();
-    }
-  };
-}
+import { NormalizedMongoConnectionConfig } from '@module/types/types.js';
 
 export class ChangeStreamTestContext {
   private _walStream?: ChangeStream;
@@ -37,6 +14,20 @@ export class ChangeStreamTestContext {
   private streamPromise?: Promise<void>;
   public storage?: SyncRulesBucketStorage;
 
+  /**
+   * Tests operating on the mongo change stream need to configure the stream and manage asynchronous
+   * replication, which gets a little tricky.
+   *
+   * This configures all the context, and tears it down afterwards.
+   */
+  static async open(factory: () => Promise<BucketStorageFactory>, options?: Partial<NormalizedMongoConnectionConfig>) {
+    const f = await factory();
+    const connectionManager = new MongoManager({ ...TEST_CONNECTION_OPTIONS, ...options });
+
+    await clearTestDb(connectionManager.db);
+    return new ChangeStreamTestContext(f, connectionManager);
+  }
+
   constructor(
     public factory: BucketStorageFactory,
     public connectionManager: MongoManager
@@ -48,6 +39,10 @@ export class ChangeStreamTestContext {
     await this.connectionManager.destroy();
   }
 
+  async [Symbol.asyncDispose]() {
+    await this.dispose();
+  }
+
   get client() {
     return this.connectionManager.client;
   }
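Note that `await using` requires TypeScript 5.2+ with the `esnext.disposable` lib, plus a runtime definition of `Symbol.asyncDispose`. On runtimes that don't define it yet, a shim along these lines is commonly loaded first (an assumption about the test setup, not part of this diff):

```ts
// Older Node versions do not define Symbol.asyncDispose natively.
(Symbol as any).asyncDispose ??= Symbol.for('Symbol.asyncDispose');
(Symbol as any).dispose ??= Symbol.for('Symbol.dispose');
```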
diff --git a/modules/module-mongodb/test/src/mongo_test.test.ts b/modules/module-mongodb/test/src/mongo_test.test.ts
index 16cca3be7..5d30067da 100644
--- a/modules/module-mongodb/test/src/mongo_test.test.ts
+++ b/modules/module-mongodb/test/src/mongo_test.test.ts
@@ -1,10 +1,11 @@
 import { MongoRouteAPIAdapter } from '@module/api/MongoRouteAPIAdapter.js';
 import { ChangeStream } from '@module/replication/ChangeStream.js';
 import { constructAfterRecord } from '@module/replication/MongoRelation.js';
-import { SqliteRow } from '@powersync/service-sync-rules';
+import { SqliteRow, SqlSyncRules } from '@powersync/service-sync-rules';
 import * as mongo from 'mongodb';
 import { describe, expect, test } from 'vitest';
 import { clearTestDb, connectMongoData, TEST_CONNECTION_OPTIONS } from './util.js';
+import { PostImagesOption } from '@module/types/types.js';
 
 describe('mongo data types', () => {
   async function setupTable(db: mongo.Db) {
@@ -245,58 +246,191 @@ describe('mongo data types', () => {
   });
 
   test('connection schema', async () => {
-    const adapter = new MongoRouteAPIAdapter({
+    await using adapter = new MongoRouteAPIAdapter({
       type: 'mongodb',
       ...TEST_CONNECTION_OPTIONS
     });
-    try {
-      const db = adapter.db;
-      await clearTestDb(db);
+    const db = adapter.db;
+    await clearTestDb(db);
 
-      const collection = db.collection('test_data');
-      await setupTable(db);
-      await insert(collection);
+    const collection = db.collection('test_data');
+    await setupTable(db);
+    await insert(collection);
+
+    const schema = await adapter.getConnectionSchema();
+    const dbSchema = schema.filter((s) => s.name == TEST_CONNECTION_OPTIONS.database)[0];
+    expect(dbSchema).not.toBeNull();
+    expect(dbSchema.tables).toMatchObject([
+      {
+        name: 'test_data',
+        columns: [
+          { name: '_id', sqlite_type: 4, internal_type: 'Integer' },
+          { name: 'bool', sqlite_type: 4, internal_type: 'Boolean' },
+          { name: 'bytea', sqlite_type: 1, internal_type: 'Binary' },
+          { name: 'date', sqlite_type: 2, internal_type: 'Date' },
+          { name: 'decimal', sqlite_type: 2, internal_type: 'Decimal' },
+          { name: 'float', sqlite_type: 8, internal_type: 'Double' },
+          { name: 'int2', sqlite_type: 4, internal_type: 'Integer' },
+          { name: 'int4', sqlite_type: 4, internal_type: 'Integer' },
+          { name: 'int8', sqlite_type: 4, internal_type: 'Long' },
+          // We can fix these later
+          { name: 'js', sqlite_type: 2, internal_type: 'Object' },
+          { name: 'js2', sqlite_type: 2, internal_type: 'Object' },
+          { name: 'maxKey', sqlite_type: 0, internal_type: 'MaxKey' },
+          { name: 'minKey', sqlite_type: 0, internal_type: 'MinKey' },
+          { name: 'nested', sqlite_type: 2, internal_type: 'Object' },
+          { name: 'null', sqlite_type: 0, internal_type: 'Null' },
+          { name: 'objectId', sqlite_type: 2, internal_type: 'ObjectId' },
+          // We can fix these later
+          { name: 'pointer', sqlite_type: 2, internal_type: 'Object' },
+          { name: 'pointer2', sqlite_type: 2, internal_type: 'Object' },
+          { name: 'regexp', sqlite_type: 2, internal_type: 'RegExp' },
+          // Can fix this later
+          { name: 'symbol', sqlite_type: 2, internal_type: 'String' },
+          { name: 'text', sqlite_type: 2, internal_type: 'String' },
+          { name: 'timestamp', sqlite_type: 4, internal_type: 'Timestamp' },
+          { name: 'undefined', sqlite_type: 0, internal_type: 'Null' },
+          { name: 'uuid', sqlite_type: 2, internal_type: 'UUID' }
+        ]
+      }
+    ]);
+  });
+
+  test('validate postImages', async () => {
+    await using adapter = new MongoRouteAPIAdapter({
+      type: 'mongodb',
+      ...TEST_CONNECTION_OPTIONS,
+      postImages: PostImagesOption.READ_ONLY
+    });
+    const db = adapter.db;
+    await clearTestDb(db);
+
+    const collection = db.collection('test_data');
+    await setupTable(db);
+    await insert(collection);
+
+    const rules = SqlSyncRules.fromYaml(
+      `
+bucket_definitions:
+  global:
+    data:
+      - select _id as id, * from test_data
 
-      const schema = await adapter.getConnectionSchema();
-      const dbSchema = schema.filter((s) => s.name == TEST_CONNECTION_OPTIONS.database)[0];
-      expect(dbSchema).not.toBeNull();
-      expect(dbSchema.tables).toMatchObject([
+      `,
+      {
+        ...adapter.getParseSyncRulesOptions(),
+        // No schema-based validation at this point
+        schema: undefined
+      }
+    );
+    const source_table_patterns = rules.getSourceTables();
+    const results = await adapter.getDebugTablesInfo(source_table_patterns, rules);
+
+    const result = results[0];
+    expect(result).not.toBeNull();
+    expect(result.table).toMatchObject({
+      schema: 'powersync_test_data',
+      name: 'test_data',
+      replication_id: ['_id'],
+      data_queries: true,
+      parameter_queries: false,
+      errors: [
         {
-          name: 'test_data',
-          columns: [
-            { name: '_id', sqlite_type: 4, internal_type: 'Integer' },
-            { name: 'bool', sqlite_type: 4, internal_type: 'Boolean' },
-            { name: 'bytea', sqlite_type: 1, internal_type: 'Binary' },
-            { name: 'date', sqlite_type: 2, internal_type: 'Date' },
-            { name: 'decimal', sqlite_type: 2, internal_type: 'Decimal' },
-            { name: 'float', sqlite_type: 8, internal_type: 'Double' },
-            { name: 'int2', sqlite_type: 4, internal_type: 'Integer' },
-            { name: 'int4', sqlite_type: 4, internal_type: 'Integer' },
-            { name: 'int8', sqlite_type: 4, internal_type: 'Long' },
-            // We can fix these later
-            { name: 'js', sqlite_type: 2, internal_type: 'Object' },
-            { name: 'js2', sqlite_type: 2, internal_type: 'Object' },
-            { name: 'maxKey', sqlite_type: 0, internal_type: 'MaxKey' },
-            { name: 'minKey', sqlite_type: 0, internal_type: 'MinKey' },
-            { name: 'nested', sqlite_type: 2, internal_type: 'Object' },
-            { name: 'null', sqlite_type: 0, internal_type: 'Null' },
-            { name: 'objectId', sqlite_type: 2, internal_type: 'ObjectId' },
-            // We can fix these later
-            { name: 'pointer', sqlite_type: 2, internal_type: 'Object' },
-            { name: 'pointer2', sqlite_type: 2, internal_type: 'Object' },
-            { name: 'regexp', sqlite_type: 2, internal_type: 'RegExp' },
-            // Can fix this later
-            { name: 'symbol', sqlite_type: 2, internal_type: 'String' },
-            { name: 'text', sqlite_type: 2, internal_type: 'String' },
-            { name: 'timestamp', sqlite_type: 4, internal_type: 'Timestamp' },
-            { name: 'undefined', sqlite_type: 0, internal_type: 'Null' },
-            { name: 'uuid', sqlite_type: 2, internal_type: 'UUID' }
-          ]
+          level: 'fatal',
+          message: 'changeStreamPreAndPostImages not enabled on powersync_test_data.test_data'
         }
-      ]);
-    } finally {
-      await adapter.shutdown();
-    }
+      ]
+    });
+  });
+
+  test('validate postImages - auto-configure', async () => {
+    await using adapter = new MongoRouteAPIAdapter({
+      type: 'mongodb',
+      ...TEST_CONNECTION_OPTIONS,
+      postImages: PostImagesOption.AUTO_CONFIGURE
+    });
+    const db = adapter.db;
+    await clearTestDb(db);
+
+    const collection = db.collection('test_data');
+    await setupTable(db);
+    await insert(collection);
+
+    const rules = SqlSyncRules.fromYaml(
+      `
+bucket_definitions:
+  global:
+    data:
+      - select _id as id, * from test_data
+
+      `,
+      {
+        ...adapter.getParseSyncRulesOptions(),
+        // No schema-based validation at this point
+        schema: undefined
+      }
+    );
+    const source_table_patterns = rules.getSourceTables();
+    const results = await adapter.getDebugTablesInfo(source_table_patterns, rules);
+
+    const result = results[0];
+    expect(result).not.toBeNull();
+    expect(result.table).toMatchObject({
+      schema: 'powersync_test_data',
+      name: 'test_data',
+      replication_id: ['_id'],
+      data_queries: true,
+      parameter_queries: false,
+      errors: [
+        {
+          level: 'warning',
+          message:
+            'changeStreamPreAndPostImages not enabled on powersync_test_data.test_data, will be enabled automatically'
+        }
+      ]
+    });
+  });
+
+  test('validate postImages - off', async () => {
+    await using adapter = new MongoRouteAPIAdapter({
+      type: 'mongodb',
+      ...TEST_CONNECTION_OPTIONS,
+      postImages: PostImagesOption.OFF
+    });
+    const db = adapter.db;
+    await clearTestDb(db);
+
+    const collection = db.collection('test_data');
+    await setupTable(db);
+    await insert(collection);
+
+    const rules = SqlSyncRules.fromYaml(
+      `
+bucket_definitions:
+  global:
+    data:
+      - select _id as id, * from test_data
+
+      `,
+      {
+        ...adapter.getParseSyncRulesOptions(),
+        // No schema-based validation at this point
+        schema: undefined
+      }
+    );
+    const source_table_patterns = rules.getSourceTables();
+    const results = await adapter.getDebugTablesInfo(source_table_patterns, rules);
+
+    const result = results[0];
+    expect(result).not.toBeNull();
+    expect(result.table).toMatchObject({
+      schema: 'powersync_test_data',
+      name: 'test_data',
+      replication_id: ['_id'],
+      data_queries: true,
+      parameter_queries: false,
+      errors: []
+    });
   });
 });