From 253e9ba3b38c20ff87b04b28e8e08af6e806ad08 Mon Sep 17 00:00:00 2001 From: Ralf Kistner Date: Tue, 18 Feb 2025 18:33:08 +0200 Subject: [PATCH 1/3] Workaround for negative int64 issue on mongodb replication. --- .../implementation/MongoBucketBatch.ts | 7 +-- .../implementation/MongoSyncBucketStorage.ts | 2 +- .../src/storage/implementation/db.ts | 2 +- .../src/replication/ChangeStream.ts | 1 - .../src/replication/MongoManager.ts | 7 ++- .../test/src/mongo_test.test.ts | 59 +++++++++++++++---- modules/module-mongodb/test/src/util.ts | 3 +- packages/service-core/src/storage/bson.ts | 25 +++++++- service/tsconfig.json | 3 + 9 files changed, 82 insertions(+), 27 deletions(-) diff --git a/modules/module-mongodb-storage/src/storage/implementation/MongoBucketBatch.ts b/modules/module-mongodb-storage/src/storage/implementation/MongoBucketBatch.ts index 2b329f233..d463ee40d 100644 --- a/modules/module-mongodb-storage/src/storage/implementation/MongoBucketBatch.ts +++ b/modules/module-mongodb-storage/src/storage/implementation/MongoBucketBatch.ts @@ -11,7 +11,7 @@ import { ReplicationAssertionError, ServiceError } from '@powersync/lib-services-framework'; -import { SaveOperationTag, storage, utils } from '@powersync/service-core'; +import { deserializeBson, SaveOperationTag, storage, utils } from '@powersync/service-core'; import * as timers from 'node:timers/promises'; import { PowerSyncMongo } from './db.js'; import { CurrentBucket, CurrentDataDocument, SourceKey, SyncRuleDocument } from './models.js'; @@ -322,10 +322,7 @@ export class MongoBucketBatch existing_buckets = result.buckets; existing_lookups = result.lookups; if (this.storeCurrentData) { - const data = bson.deserialize( - (result.data as mongo.Binary).buffer, - storage.BSON_DESERIALIZE_OPTIONS - ) as SqliteRow; + const data = deserializeBson((result.data as mongo.Binary).buffer) as SqliteRow; after = storage.mergeToast(after!, data); } } diff --git a/modules/module-mongodb-storage/src/storage/implementation/MongoSyncBucketStorage.ts b/modules/module-mongodb-storage/src/storage/implementation/MongoSyncBucketStorage.ts index a2d51edfa..43f604fe1 100644 --- a/modules/module-mongodb-storage/src/storage/implementation/MongoSyncBucketStorage.ts +++ b/modules/module-mongodb-storage/src/storage/implementation/MongoSyncBucketStorage.ts @@ -324,7 +324,7 @@ export class MongoSyncBucketStorage // Ordered by _id, meaning buckets are grouped together for (let rawData of data) { - const row = bson.deserialize(rawData, storage.BSON_DESERIALIZE_OPTIONS) as BucketDataDocument; + const row = bson.deserialize(rawData, storage.BSON_DESERIALIZE_INTERNAL_OPTIONS) as BucketDataDocument; const bucket = row._id.b; if (currentBatch == null || currentBatch.bucket != bucket || batchSize >= sizeLimit) { diff --git a/modules/module-mongodb-storage/src/storage/implementation/db.ts b/modules/module-mongodb-storage/src/storage/implementation/db.ts index a32df2ae1..44cbbd610 100644 --- a/modules/module-mongodb-storage/src/storage/implementation/db.ts +++ b/modules/module-mongodb-storage/src/storage/implementation/db.ts @@ -41,7 +41,7 @@ export class PowerSyncMongo { this.client = client; const db = client.db(options?.database, { - ...storage.BSON_DESERIALIZE_OPTIONS + ...storage.BSON_DESERIALIZE_INTERNAL_OPTIONS }); this.db = db; diff --git a/modules/module-mongodb/src/replication/ChangeStream.ts b/modules/module-mongodb/src/replication/ChangeStream.ts index c0e9e56e1..c3ccef811 100644 --- a/modules/module-mongodb/src/replication/ChangeStream.ts +++ 
b/modules/module-mongodb/src/replication/ChangeStream.ts @@ -552,7 +552,6 @@ export class ChangeStream { const streamOptions: mongo.ChangeStreamOptions = { showExpandedEvents: true, - useBigInt64: true, maxAwaitTimeMS: 200, fullDocument: fullDocument }; diff --git a/modules/module-mongodb/src/replication/MongoManager.ts b/modules/module-mongodb/src/replication/MongoManager.ts index 14568f471..d2cc2cee2 100644 --- a/modules/module-mongodb/src/replication/MongoManager.ts +++ b/modules/module-mongodb/src/replication/MongoManager.ts @@ -1,11 +1,9 @@ import { mongo } from '@powersync/lib-service-mongodb'; import { NormalizedMongoConnectionConfig } from '../types/types.js'; +import { BSON_DESERIALIZE_DATA_OPTIONS } from '@powersync/service-core'; export class MongoManager { - /** - * Do not use this for any transactions. - */ public readonly client: mongo.MongoClient; public readonly db: mongo.Db; @@ -35,6 +33,9 @@ export class MongoManager { maxConnecting: 3, maxIdleTimeMS: 60_000, + + ...BSON_DESERIALIZE_DATA_OPTIONS, + ...overrides }); this.db = this.client.db(options.database, {}); diff --git a/modules/module-mongodb/test/src/mongo_test.test.ts b/modules/module-mongodb/test/src/mongo_test.test.ts index d458da7b8..864feef66 100644 --- a/modules/module-mongodb/test/src/mongo_test.test.ts +++ b/modules/module-mongodb/test/src/mongo_test.test.ts @@ -47,6 +47,13 @@ describe('mongo data types', () => { 'mydb', { foo: 'bar' } ) + }, + { + _id: 6 as any, + int4: -1, + int8: -9007199254740993n, + float: -3.14, + decimal: new mongo.Decimal128('-3.14') } ]); } @@ -109,6 +116,13 @@ describe('mongo data types', () => { }, { _id: 2 as any, nested: [{ test: 'thing' }] }, { _id: 3 as any, date: [new Date('2023-03-06 15:47+02')] }, + { + _id: 6 as any, + int4: [-1], + int8: [-9007199254740993n], + float: [-3.14], + decimal: [new mongo.Decimal128('-3.14')] + }, { _id: 10 as any, timestamp: [mongo.Timestamp.fromBits(123, 456)], @@ -164,6 +178,14 @@ describe('mongo data types', () => { // This must specifically be null, and not undefined. expect(transformed[4].undefined).toBeNull(); + + expect(transformed[5]).toMatchObject({ + _id: 6n, + int4: -1n, + int8: -9007199254740993n, + float: -3.14, + decimal: '-3.14' + }); } function checkResultsNested(transformed: Record[]) { @@ -193,6 +215,19 @@ describe('mongo data types', () => { }); expect(transformed[3]).toMatchObject({ + _id: 5n, + undefined: '[null]' + }); + + expect(transformed[4]).toMatchObject({ + _id: 6n, + int4: '[-1]', + int8: '[-9007199254740993]', + float: '[-3.14]', + decimal: '["-3.14"]' + }); + + expect(transformed[5]).toMatchObject({ _id: 10n, objectId: '["66e834cc91d805df11fa0ecb"]', timestamp: '[1958505087099]', @@ -203,10 +238,6 @@ describe('mongo data types', () => { minKey: '[null]', maxKey: '[null]' }); - - expect(transformed[4]).toMatchObject({ - undefined: '[null]' - }); } test('test direct queries', async () => { @@ -218,11 +249,13 @@ describe('mongo data types', () => { await insert(collection); await insertUndefined(db, 'test_data'); - const rawResults = await db.collection('test_data').find().toArray(); + const rawResults = await db + .collection('test_data') + .find({}, { sort: { _id: 1 } }) + .toArray(); // It is tricky to save "undefined" with mongo, so we check that it succeeded. 
expect(rawResults[4].undefined).toBeUndefined(); const transformed = [...ChangeStream.getQueryData(rawResults)]; - checkResults(transformed); } finally { await client.close(); @@ -238,8 +271,11 @@ describe('mongo data types', () => { await insertNested(collection); await insertUndefined(db, 'test_data_arrays', true); - const rawResults = await db.collection('test_data_arrays').find().toArray(); - expect(rawResults[4].undefined).toEqual([undefined]); + const rawResults = await db + .collection('test_data_arrays') + .find({}, { sort: { _id: 1 } }) + .toArray(); + expect(rawResults[3].undefined).toEqual([undefined]); const transformed = [...ChangeStream.getQueryData(rawResults)]; checkResultsNested(transformed); @@ -257,7 +293,6 @@ describe('mongo data types', () => { await setupTable(db); const stream = db.watch([], { - useBigInt64: true, maxAwaitTimeMS: 50, fullDocument: 'updateLookup' }); @@ -267,7 +302,7 @@ describe('mongo data types', () => { await insert(collection); await insertUndefined(db, 'test_data'); - const transformed = await getReplicationTx(stream, 5); + const transformed = await getReplicationTx(stream, 6); checkResults(transformed); } finally { @@ -282,7 +317,6 @@ describe('mongo data types', () => { await setupTable(db); const stream = db.watch([], { - useBigInt64: true, maxAwaitTimeMS: 50, fullDocument: 'updateLookup' }); @@ -292,7 +326,7 @@ describe('mongo data types', () => { await insertNested(collection); await insertUndefined(db, 'test_data_arrays', true); - const transformed = await getReplicationTx(stream, 5); + const transformed = await getReplicationTx(stream, 6); checkResultsNested(transformed); } finally { @@ -505,5 +539,6 @@ async function getReplicationTx(replicationStream: mongo.ChangeStream, count: nu break; } } + transformed.sort((a, b) => Number(a._id) - Number(b._id)); return transformed; } diff --git a/modules/module-mongodb/test/src/util.ts b/modules/module-mongodb/test/src/util.ts index aaa566370..c6aa6cc22 100644 --- a/modules/module-mongodb/test/src/util.ts +++ b/modules/module-mongodb/test/src/util.ts @@ -4,6 +4,7 @@ import * as postgres_storage from '@powersync/service-module-postgres-storage'; import * as types from '@module/types/types.js'; import { env } from './env.js'; +import { BSON_DESERIALIZE_DATA_OPTIONS } from '@powersync/service-core'; export const TEST_URI = env.MONGO_TEST_DATA_URL; @@ -30,7 +31,7 @@ export async function connectMongoData() { connectTimeoutMS: env.CI ? 15_000 : 5_000, socketTimeoutMS: env.CI ? 15_000 : 5_000, serverSelectionTimeoutMS: env.CI ? 15_000 : 2_500, - useBigInt64: true + ...BSON_DESERIALIZE_DATA_OPTIONS }); const dbname = new URL(env.MONGO_TEST_DATA_URL).pathname.substring(1); return { client, db: client.db(dbname) }; diff --git a/packages/service-core/src/storage/bson.ts b/packages/service-core/src/storage/bson.ts index dd1c726a8..af453cc3c 100644 --- a/packages/service-core/src/storage/bson.ts +++ b/packages/service-core/src/storage/bson.ts @@ -5,11 +5,22 @@ import { ReplicaId } from './BucketStorage.js'; type NodeBuffer = Buffer; -export const BSON_DESERIALIZE_OPTIONS: bson.DeserializeOptions = { +/** + * Use for internal (bucket storage) data, where we control each field. + */ +export const BSON_DESERIALIZE_INTERNAL_OPTIONS: bson.DeserializeOptions = { // use bigint instead of Long useBigInt64: true }; +/** + * Use for data from external sources. 
+ */ +export const BSON_DESERIALIZE_DATA_OPTIONS: bson.DeserializeOptions = { + // Temporarily disable due to https://jira.mongodb.org/browse/NODE-6764 + useBigInt64: false +}; + /** * Lookup serialization must be number-agnostic. I.e. normalize numbers, instead of preserving numbers. * @param lookup @@ -51,8 +62,16 @@ export const deserializeReplicaId = (id: Buffer): ReplicaId => { return deserialized.id; }; -export const deserializeBson = (buffer: Buffer) => { - return bson.deserialize(buffer, BSON_DESERIALIZE_OPTIONS); +export const deserializeBson = (buffer: Uint8Array): bson.Document => { + const doc = bson.deserialize(buffer, BSON_DESERIALIZE_DATA_OPTIONS); + // Temporary workaround due to https://jira.mongodb.org/browse/NODE-6764 + for (let key in doc) { + const value = doc[key]; + if (value instanceof bson.Long) { + doc[key] = value.toBigInt(); + } + } + return doc; }; export const serializeBson = (document: any): NodeBuffer => { diff --git a/service/tsconfig.json b/service/tsconfig.json index 574744f72..e92cccf9c 100644 --- a/service/tsconfig.json +++ b/service/tsconfig.json @@ -39,6 +39,9 @@ { "path": "../modules/module-mongodb-storage" }, + { + "path": "../modules/module-postgres-storage" + }, { "path": "../modules/module-mysql" } From 88d4cb38bb82eb84f69bfaa5e984c72aa43f97f1 Mon Sep 17 00:00:00 2001 From: Ralf Kistner Date: Tue, 18 Feb 2025 18:34:01 +0200 Subject: [PATCH 2/3] Add changeset. --- .changeset/smooth-yaks-float.md | 8 ++++++++ 1 file changed, 8 insertions(+) create mode 100644 .changeset/smooth-yaks-float.md diff --git a/.changeset/smooth-yaks-float.md b/.changeset/smooth-yaks-float.md new file mode 100644 index 000000000..aacc131c1 --- /dev/null +++ b/.changeset/smooth-yaks-float.md @@ -0,0 +1,8 @@ +--- +'@powersync/service-module-mongodb-storage': patch +'@powersync/service-module-mongodb': patch +'@powersync/service-core': patch +'@powersync/service-image': patch +--- + +Fix signed integer overflow issue for int64 values from MongoDB. From edb02607eac26870e44b5088b740bb0a86ff0b20 Mon Sep 17 00:00:00 2001 From: Ralf Kistner Date: Wed, 19 Feb 2025 14:23:50 +0200 Subject: [PATCH 3/3] Fix merge conflict. --- packages/service-core/src/storage/bson.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/service-core/src/storage/bson.ts b/packages/service-core/src/storage/bson.ts index c8f0f5a03..d0df1fedf 100644 --- a/packages/service-core/src/storage/bson.ts +++ b/packages/service-core/src/storage/bson.ts @@ -41,7 +41,7 @@ export const serializeLookup = (lookup: SqliteJsonValue[]) => { }; export const getLookupBucketDefinitionName = (lookup: bson.Binary) => { - const parsed = bson.deserialize(lookup.buffer, BSON_DESERIALIZE_OPTIONS).l as SqliteJsonValue[]; + const parsed = bson.deserialize(lookup.buffer, BSON_DESERIALIZE_INTERNAL_OPTIONS).l as SqliteJsonValue[]; return parsed[0] as string; };
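
For context on the change in this series: the patch works around a MongoDB Node.js driver issue (https://jira.mongodb.org/browse/NODE-6764) affecting negative int64 values when the `useBigInt64` deserialization option is enabled. External/replicated data therefore now uses `BSON_DESERIALIZE_DATA_OPTIONS` (`useBigInt64: false`) and converts `bson.Long` values to `bigint` by hand in `deserializeBson`, while internal bucket-storage documents keep `BSON_DESERIALIZE_INTERNAL_OPTIONS` (`useBigInt64: true`), since the service controls those fields. Below is a minimal standalone TypeScript sketch of the same round-trip; the helper name here is hypothetical, and the shipped implementation is `deserializeBson` in packages/service-core/src/storage/bson.ts:

    import * as bson from 'bson';

    // Deserialize without useBigInt64, then convert top-level Long values to BigInt.
    // This mirrors the workaround for https://jira.mongodb.org/browse/NODE-6764.
    function deserializeWithBigIntWorkaround(buffer: Uint8Array): bson.Document {
      const doc = bson.deserialize(buffer, { useBigInt64: false });
      for (const key in doc) {
        const value = doc[key];
        if (value instanceof bson.Long) {
          doc[key] = value.toBigInt();
        }
      }
      return doc;
    }

    // An int64 outside the double-safe range must round-trip exactly,
    // matching the -9007199254740993n case exercised in mongo_test.test.ts.
    const bytes = bson.serialize({ int8: bson.Long.fromBigInt(-9007199254740993n) });
    const doc = deserializeWithBigIntWorkaround(bytes);
    console.log(doc.int8 === -9007199254740993n); // true

Note that, like the shipped helper, this only converts top-level fields; Long values nested inside documents or arrays are still returned as `bson.Long`.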