diff --git a/.changeset/wicked-rockets-add.md b/.changeset/wicked-rockets-add.md new file mode 100644 index 000000000..a79e3034f --- /dev/null +++ b/.changeset/wicked-rockets-add.md @@ -0,0 +1,8 @@ +--- +'@powersync/service-module-postgres-storage': patch +'@powersync/service-module-mongodb-storage': patch +'@powersync/service-core-tests': patch +'@powersync/service-image': patch +--- + +Fix rare issue of incorrect checksums on fallback after checksum query timed out. diff --git a/modules/module-mongodb-storage/src/storage/MongoBucketStorage.ts b/modules/module-mongodb-storage/src/storage/MongoBucketStorage.ts index 82bbdf0bd..50459b84a 100644 --- a/modules/module-mongodb-storage/src/storage/MongoBucketStorage.ts +++ b/modules/module-mongodb-storage/src/storage/MongoBucketStorage.ts @@ -11,7 +11,7 @@ import { mongo } from '@powersync/lib-service-mongodb'; import { PowerSyncMongo } from './implementation/db.js'; import { SyncRuleDocument } from './implementation/models.js'; import { MongoPersistedSyncRulesContent } from './implementation/MongoPersistedSyncRulesContent.js'; -import { MongoSyncBucketStorage } from './implementation/MongoSyncBucketStorage.js'; +import { MongoSyncBucketStorage, MongoSyncBucketStorageOptions } from './implementation/MongoSyncBucketStorage.js'; import { generateSlotName } from './implementation/util.js'; export class MongoBucketStorage @@ -31,7 +31,8 @@ export class MongoBucketStorage db: PowerSyncMongo, options: { slot_name_prefix: string; - } + }, + private internalOptions?: MongoSyncBucketStorageOptions ) { super(); this.client = db.client; @@ -49,7 +50,7 @@ export class MongoBucketStorage if ((typeof id as any) == 'bigint') { id = Number(id); } - const storage = new MongoSyncBucketStorage(this, id, syncRules, slot_name); + const storage = new MongoSyncBucketStorage(this, id, syncRules, slot_name, undefined, this.internalOptions); if (!options?.skipLifecycleHooks) { this.iterateListeners((cb) => cb.syncStorageCreated?.(storage)); } diff --git a/modules/module-mongodb-storage/src/storage/implementation/MongoChecksums.ts b/modules/module-mongodb-storage/src/storage/implementation/MongoChecksums.ts index ef7c41c03..6930b8817 100644 --- a/modules/module-mongodb-storage/src/storage/implementation/MongoChecksums.ts +++ b/modules/module-mongodb-storage/src/storage/implementation/MongoChecksums.ts @@ -1,3 +1,4 @@ +import * as lib_mongo from '@powersync/lib-service-mongodb'; import { addPartialChecksums, bson, @@ -11,27 +12,51 @@ import { PartialChecksumMap, PartialOrFullChecksum } from '@powersync/service-core'; -import * as lib_mongo from '@powersync/lib-service-mongodb'; -import { logger } from '@powersync/lib-services-framework'; import { PowerSyncMongo } from './db.js'; +/** + * Checksum calculation options, primarily for tests. + */ +export interface MongoChecksumOptions { + /** + * How many buckets to process in a batch when calculating checksums. + */ + bucketBatchLimit?: number; + + /** + * Limit on the number of documents to calculate a checksum on at a time. + */ + operationBatchLimit?: number; +} + +const DEFAULT_BUCKET_BATCH_LIMIT = 200; +const DEFAULT_OPERATION_BATCH_LIMIT = 50_000; + /** * Checksum query implementation. + * + * General implementation flow is: + * 1. getChecksums() -> check cache for (partial) matches. If not found or partial match, query the remainder using computePartialChecksums(). + * 2. computePartialChecksums() -> query bucket_state for partial matches. Query the remainder using computePartialChecksumsDirect(). + * 3. 
computePartialChecksumsDirect() -> split into batches of 200 buckets at a time -> computePartialChecksumsInternal() + * 4. computePartialChecksumsInternal() -> aggregate over 50_000 operations in bucket_data at a time */ export class MongoChecksums { private cache = new ChecksumCache({ fetchChecksums: (batch) => { - return this.getChecksumsInternal(batch); + return this.computePartialChecksums(batch); } }); constructor( private db: PowerSyncMongo, - private group_id: number + private group_id: number, + private options?: MongoChecksumOptions ) {} /** - * Calculate checksums, utilizing the cache. + * Calculate checksums, utilizing the cache for partial checksums, and querying the remainder from + * the database (bucket_state + bucket_data). */ async getChecksums(checkpoint: InternalOpId, buckets: string[]): Promise { return this.cache.getChecksumMap(checkpoint, buckets); } @@ -42,11 +67,15 @@ } /** - * Calculate (partial) checksums from bucket_state and the data collection. + * Calculate (partial) checksums from bucket_state (pre-aggregated) and bucket_data (individual operations). + * + * Results are not cached here. This method is only called by {@link ChecksumCache.getChecksumMap}, + * which is responsible for caching its result. - * Results are not cached. + * As long as data is compacted regularly, this should be fast. Large buckets without pre-compacted bucket_state + * can be slow. */ - private async getChecksumsInternal(batch: FetchPartialBucketChecksum[]): Promise { + private async computePartialChecksums(batch: FetchPartialBucketChecksum[]): Promise { if (batch.length == 0) { return new Map(); } @@ -100,7 +129,7 @@ }; }); - const queriedChecksums = await this.queryPartialChecksums(mappedRequests); + const queriedChecksums = await this.computePartialChecksumsDirect(mappedRequests); return new Map( batch.map((request) => { @@ -117,61 +146,150 @@ } /** - * Calculate (partial) checksums from the data collection directly. + * Calculate (partial) checksums from the data collection directly, bypassing the cache and bucket_state. + * + * Can be used directly in cases where the cache should be bypassed, such as from a compact job. + * + * Internally, we do calculations in smaller batches of buckets as appropriate. + * + * For large buckets, this can be slow, but should not time out as the underlying queries are performed in + * smaller batches. */ - async queryPartialChecksums(batch: FetchPartialBucketChecksum[]): Promise { - try { - return await this.queryPartialChecksumsInternal(batch); - } catch (e) { - if (e.codeName == 'MaxTimeMSExpired') { - logger.warn(`Checksum query timed out; falling back to slower version`, e); - // Timeout - try the slower but more robust version - return await this.queryPartialChecksumsFallback(batch); + public async computePartialChecksumsDirect(batch: FetchPartialBucketChecksum[]): Promise { + // Limit the number of buckets we query for at a time. + const bucketBatchLimit = this.options?.bucketBatchLimit ?? 
DEFAULT_BUCKET_BATCH_LIMIT; + + if (batch.length < bucketBatchLimit) { + // Single batch - no need for splitting the batch and merging results + return await this.computePartialChecksumsInternal(batch); + } + // Split the batch and merge results + let results = new Map(); + for (let i = 0; i < batch.length; i += bucketBatchLimit) { + const bucketBatch = batch.slice(i, i + bucketBatchLimit); + const batchResults = await this.computePartialChecksumsInternal(bucketBatch); + for (let r of batchResults.values()) { + results.set(r.bucket, r); } - throw lib_mongo.mapQueryError(e, 'while reading checksums'); } + return results; } - private async queryPartialChecksumsInternal(batch: FetchPartialBucketChecksum[]): Promise { - const filters: any[] = []; + /** + * Query a batch of checksums. + * + * We limit the number of operations that the query aggregates in each sub-batch, to avoid potential query timeouts. + * + * `batch` must be limited to DEFAULT_BUCKET_BATCH_LIMIT buckets before calling this. + */ + private async computePartialChecksumsInternal(batch: FetchPartialBucketChecksum[]): Promise { + const batchLimit = this.options?.operationBatchLimit ?? DEFAULT_OPERATION_BATCH_LIMIT; + + // Map requests by bucket. We adjust this as we get partial results. + let requests = new Map(); for (let request of batch) { - filters.push({ - _id: { - $gt: { - g: this.group_id, - b: request.bucket, - o: request.start ?? new bson.MinKey() - }, - $lte: { - g: this.group_id, - b: request.bucket, - o: request.end - } - } - }); + requests.set(request.bucket, request); } - const aggregate = await this.db.bucket_data - .aggregate( - [ - { - $match: { - $or: filters + const partialChecksums = new Map(); + + while (requests.size > 0) { + const filters: any[] = []; + for (let request of requests.values()) { + filters.push({ + _id: { + $gt: { + g: this.group_id, + b: request.bucket, + o: request.start ?? new bson.MinKey() + }, + $lte: { + g: this.group_id, + b: request.bucket, + o: request.end } - }, - CHECKSUM_QUERY_GROUP_STAGE - ], - { session: undefined, readConcern: 'snapshot', maxTimeMS: lib_mongo.MONGO_CHECKSUM_TIMEOUT_MS } - ) - // Don't map the error here - we want to keep timeout errors as-is - .toArray(); + } + }); + } - const partialChecksums = new Map( - aggregate.map((doc) => { + // Aggregate over a max of `batchLimit` operations at a time. + // Let's say we have 3 buckets (A, B, C), each with 10 operations, and our batch limit is 12. + // Then we'll do three batches: + // 1. Query: A[1-end], B[1-end], C[1-end] + // Returns: A[1-10], B[1-2] + // 2. Query: B[3-end], C[1-end] + // Returns: B[3-10], C[1-4] + // 3. Query: C[5-end] + // Returns: C[5-10] + const aggregate = await this.db.bucket_data + .aggregate( + [ + { + $match: { + $or: filters + } + }, + // sort and limit _before_ grouping + { $sort: { _id: 1 } }, + { $limit: batchLimit }, + { + $group: { + _id: '$_id.b', + // Historically, checksum may be stored as 'int' or 'double'. + // More recently, this should be a 'long'. + // $toLong ensures that we always sum it as a long, avoiding inaccuracies in the calculations. 
+ checksum_total: { $sum: { $toLong: '$checksum' } }, + count: { $sum: 1 }, + has_clear_op: { + $max: { + $cond: [{ $eq: ['$op', 'CLEAR'] }, 1, 0] + } + }, + last_op: { $max: '$_id.o' } + } + } + ], + { session: undefined, readConcern: 'snapshot', maxTimeMS: lib_mongo.MONGO_CHECKSUM_TIMEOUT_MS } + ) + .toArray() + .catch((e) => { + throw lib_mongo.mapQueryError(e, 'while reading checksums'); + }); + + let batchCount = 0; + let limitReached = false; + for (let doc of aggregate) { const bucket = doc._id; - return [bucket, checksumFromAggregate(doc)]; - }) - ); + const checksum = checksumFromAggregate(doc); + + const existing = partialChecksums.get(bucket); + if (existing != null) { + partialChecksums.set(bucket, addPartialChecksums(bucket, existing, checksum)); + } else { + partialChecksums.set(bucket, checksum); + } + + batchCount += doc.count; + if (batchCount == batchLimit) { + // Limit reached. Request more in the next batch. + // Note that this only affects the _last_ bucket in a batch. + limitReached = true; + const req = requests.get(bucket); + requests.set(bucket, { + bucket, + start: doc.last_op, + end: req!.end + }); + } else { + // All done for this bucket + requests.delete(bucket); + } + } + if (!limitReached) { + break; + } + } return new Map( batch.map((request) => { @@ -197,106 +315,10 @@ }) ); } - - /** - * Checksums for large buckets can run over the query timeout. - * To avoid this, we query in batches. - * This version can handle larger amounts of data, but is slower, especially for many buckets. - */ - async queryPartialChecksumsFallback(batch: FetchPartialBucketChecksum[]): Promise { - const partialChecksums = new Map(); - for (let request of batch) { - const checksum = await this.slowChecksum(request); - partialChecksums.set(request.bucket, checksum); - } - - return partialChecksums; - } - - private async slowChecksum(request: FetchPartialBucketChecksum): Promise { - const batchLimit = 50_000; - - let lowerBound = 0n; - const bucket = request.bucket; - - let runningChecksum: PartialOrFullChecksum = { - bucket, - partialCount: 0, - partialChecksum: 0 - }; - if (request.start == null) { - runningChecksum = { - bucket, - count: 0, - checksum: 0 - }; - } - - while (true) { - const filter = { - _id: { - $gt: { - g: this.group_id, - b: bucket, - o: lowerBound - }, - $lte: { - g: this.group_id, - b: bucket, - o: request.end - } - } - }; - const docs = await this.db.bucket_data - .aggregate( - [ - { - $match: filter - }, - // sort and limit _before_ grouping - { $sort: { _id: 1 } }, - { $limit: batchLimit }, - CHECKSUM_QUERY_GROUP_STAGE - ], - { session: undefined, readConcern: 'snapshot', maxTimeMS: lib_mongo.MONGO_CHECKSUM_TIMEOUT_MS } - ) - .toArray(); - const doc = docs[0]; - if (doc == null) { - return runningChecksum; - } - const partial = checksumFromAggregate(doc); - runningChecksum = addPartialChecksums(bucket, runningChecksum, partial); - const isFinal = doc.count != batchLimit; - if (isFinal) { - break; - } else { - lowerBound = doc.last_op; - } - } - return runningChecksum; - } } -const CHECKSUM_QUERY_GROUP_STAGE = { - $group: { - _id: '$_id.b', - // Historically, checksum may be stored as 'int' or 'double'. - // More recently, this should be a 'long'. - // $toLong ensures that we always sum it as a long, avoiding inaccuracies in the calculations. 
- checksum_total: { $sum: { $toLong: '$checksum' } }, - count: { $sum: 1 }, - has_clear_op: { - $max: { - $cond: [{ $eq: ['$op', 'CLEAR'] }, 1, 0] - } - }, - last_op: { $max: '$_id.o' } - } -}; - /** - * Convert output of CHECKSUM_QUERY_GROUP_STAGE into a checksum. + * Convert output of the $group stage into a checksum. */ function checksumFromAggregate(doc: bson.Document): PartialOrFullChecksum { const partialChecksum = Number(BigInt(doc.checksum_total) & 0xffffffffn) & 0xffffffff; diff --git a/modules/module-mongodb-storage/src/storage/implementation/MongoCompactor.ts b/modules/module-mongodb-storage/src/storage/implementation/MongoCompactor.ts index 7c62717af..ab776ccea 100644 --- a/modules/module-mongodb-storage/src/storage/implementation/MongoCompactor.ts +++ b/modules/module-mongodb-storage/src/storage/implementation/MongoCompactor.ts @@ -520,7 +520,7 @@ export class MongoCompactor { } private async updateChecksumsBatch(buckets: string[]) { - const checksums = await this.storage.checksums.queryPartialChecksums( + const checksums = await this.storage.checksums.computePartialChecksumsDirect( buckets.map((bucket) => { return { bucket, diff --git a/modules/module-mongodb-storage/src/storage/implementation/MongoSyncBucketStorage.ts b/modules/module-mongodb-storage/src/storage/implementation/MongoSyncBucketStorage.ts index e566620e9..18ab73ea5 100644 --- a/modules/module-mongodb-storage/src/storage/implementation/MongoSyncBucketStorage.ts +++ b/modules/module-mongodb-storage/src/storage/implementation/MongoSyncBucketStorage.ts @@ -31,12 +31,16 @@ import { MongoBucketStorage } from '../MongoBucketStorage.js'; import { PowerSyncMongo } from './db.js'; import { BucketDataDocument, BucketDataKey, BucketStateDocument, SourceKey, SourceTableDocument } from './models.js'; import { MongoBucketBatch } from './MongoBucketBatch.js'; -import { MongoChecksums } from './MongoChecksums.js'; +import { MongoChecksumOptions, MongoChecksums } from './MongoChecksums.js'; import { MongoCompactor } from './MongoCompactor.js'; import { MongoParameterCompactor } from './MongoParameterCompactor.js'; import { MongoWriteCheckpointAPI } from './MongoWriteCheckpointAPI.js'; import { idPrefixFilter, mapOpEntry, readSingleBatch, setSessionSnapshotTime } from './util.js'; +export interface MongoSyncBucketStorageOptions { + checksumOptions?: MongoChecksumOptions; +} + export class MongoSyncBucketStorage extends BaseObserver implements storage.SyncRulesBucketStorage @@ -52,14 +56,15 @@ export class MongoSyncBucketStorage public readonly group_id: number, private readonly sync_rules: storage.PersistedSyncRulesContent, public readonly slot_name: string, - writeCheckpointMode: storage.WriteCheckpointMode = storage.WriteCheckpointMode.MANAGED + writeCheckpointMode?: storage.WriteCheckpointMode, + options?: MongoSyncBucketStorageOptions ) { super(); this.db = factory.db; - this.checksums = new MongoChecksums(this.db, this.group_id); + this.checksums = new MongoChecksums(this.db, this.group_id, options?.checksumOptions); this.writeCheckpointAPI = new MongoWriteCheckpointAPI({ db: this.db, - mode: writeCheckpointMode, + mode: writeCheckpointMode ?? 
storage.WriteCheckpointMode.MANAGED, sync_rules_id: group_id }); } diff --git a/modules/module-mongodb-storage/src/storage/implementation/MongoTestStorageFactoryGenerator.ts b/modules/module-mongodb-storage/src/storage/implementation/MongoTestStorageFactoryGenerator.ts index a7457b43e..72be4547e 100644 --- a/modules/module-mongodb-storage/src/storage/implementation/MongoTestStorageFactoryGenerator.ts +++ b/modules/module-mongodb-storage/src/storage/implementation/MongoTestStorageFactoryGenerator.ts @@ -1,10 +1,12 @@ import { TestStorageOptions } from '@powersync/service-core'; import { MongoBucketStorage } from '../MongoBucketStorage.js'; import { connectMongoForTests } from './util.js'; +import { MongoSyncBucketStorageOptions } from './MongoSyncBucketStorage.js'; export type MongoTestStorageOptions = { url: string; isCI: boolean; + internalOptions?: MongoSyncBucketStorageOptions; }; export const MongoTestStorageFactoryGenerator = (factoryOptions: MongoTestStorageOptions) => { @@ -23,6 +25,6 @@ export const MongoTestStorageFactoryGenerator = (factoryOptions: MongoTestStorag await db.clear(); } - return new MongoBucketStorage(db, { slot_name_prefix: 'test_' }); + return new MongoBucketStorage(db, { slot_name_prefix: 'test_' }, factoryOptions.internalOptions); }; }; diff --git a/modules/module-mongodb-storage/test/src/__snapshots__/storage.test.ts.snap b/modules/module-mongodb-storage/test/src/__snapshots__/storage.test.ts.snap index 7c072c0a1..c852d392d 100644 --- a/modules/module-mongodb-storage/test/src/__snapshots__/storage.test.ts.snap +++ b/modules/module-mongodb-storage/test/src/__snapshots__/storage.test.ts.snap @@ -1,6 +1,22 @@ // Vitest Snapshot v1, https://vitest.dev/guide/snapshot.html -exports[`Mongo Sync Bucket Storage > empty storage metrics 1`] = ` +exports[`Mongo Sync Bucket Storage - Data > empty storage metrics 1`] = ` +{ + "operations_size_bytes": 0, + "parameters_size_bytes": 0, + "replication_size_bytes": 0, +} +`; + +exports[`Mongo Sync Bucket Storage - split buckets > empty storage metrics 1`] = ` +{ + "operations_size_bytes": 0, + "parameters_size_bytes": 0, + "replication_size_bytes": 0, +} +`; + +exports[`Mongo Sync Bucket Storage - split operations > empty storage metrics 1`] = ` { "operations_size_bytes": 0, "parameters_size_bytes": 0, diff --git a/modules/module-mongodb-storage/test/src/storage.test.ts b/modules/module-mongodb-storage/test/src/storage.test.ts index c84e4657c..858c51b87 100644 --- a/modules/module-mongodb-storage/test/src/storage.test.ts +++ b/modules/module-mongodb-storage/test/src/storage.test.ts @@ -1,7 +1,44 @@ import { register } from '@powersync/service-core-tests'; import { describe } from 'vitest'; import { INITIALIZED_MONGO_STORAGE_FACTORY } from './util.js'; +import { env } from './env.js'; +import { MongoTestStorageFactoryGenerator } from '@module/storage/implementation/MongoTestStorageFactoryGenerator.js'; -describe('Mongo Sync Bucket Storage', () => register.registerDataStorageTests(INITIALIZED_MONGO_STORAGE_FACTORY)); +describe('Mongo Sync Bucket Storage - Parameters', () => + register.registerDataStorageParameterTests(INITIALIZED_MONGO_STORAGE_FACTORY)); + +describe('Mongo Sync Bucket Storage - Data', () => + register.registerDataStorageDataTests(INITIALIZED_MONGO_STORAGE_FACTORY)); + +describe('Mongo Sync Bucket Storage - Checkpoints', () => + register.registerDataStorageCheckpointTests(INITIALIZED_MONGO_STORAGE_FACTORY)); describe('Sync Bucket Validation', register.registerBucketValidationTests); + +describe('Mongo Sync Bucket 
Storage - split operations', () => + register.registerDataStorageDataTests( + MongoTestStorageFactoryGenerator({ + url: env.MONGO_TEST_URL, + isCI: env.CI, + internalOptions: { + checksumOptions: { + bucketBatchLimit: 100, + operationBatchLimit: 1 + } + } + }) + )); + +describe('Mongo Sync Bucket Storage - split buckets', () => + register.registerDataStorageDataTests( + MongoTestStorageFactoryGenerator({ + url: env.MONGO_TEST_URL, + isCI: env.CI, + internalOptions: { + checksumOptions: { + bucketBatchLimit: 1, + operationBatchLimit: 100 + } + } + }) + )); diff --git a/modules/module-postgres-storage/test/src/__snapshots__/storage.test.ts.snap b/modules/module-postgres-storage/test/src/__snapshots__/storage.test.ts.snap index 10eb83682..d1b24f45b 100644 --- a/modules/module-postgres-storage/test/src/__snapshots__/storage.test.ts.snap +++ b/modules/module-postgres-storage/test/src/__snapshots__/storage.test.ts.snap @@ -1,6 +1,6 @@ // Vitest Snapshot v1, https://vitest.dev/guide/snapshot.html -exports[`Postgres Sync Bucket Storage > empty storage metrics 1`] = ` +exports[`Postgres Sync Bucket Storage - Data > empty storage metrics 1`] = ` { "operations_size_bytes": 16384, "parameters_size_bytes": 32768, diff --git a/modules/module-postgres-storage/test/src/storage.test.ts b/modules/module-postgres-storage/test/src/storage.test.ts index 5551b9f88..9fb1ce197 100644 --- a/modules/module-postgres-storage/test/src/storage.test.ts +++ b/modules/module-postgres-storage/test/src/storage.test.ts @@ -3,11 +3,17 @@ import { register, TEST_TABLE, test_utils } from '@powersync/service-core-tests' import { describe, expect, test } from 'vitest'; import { POSTGRES_STORAGE_FACTORY } from './util.js'; -describe('Sync Bucket Validation', register.registerBucketValidationTests); +describe('Postgres Sync Bucket Storage - Parameters', () => + register.registerDataStorageParameterTests(POSTGRES_STORAGE_FACTORY)); + +describe('Postgres Sync Bucket Storage - Data', () => register.registerDataStorageDataTests(POSTGRES_STORAGE_FACTORY)); -describe('Postgres Sync Bucket Storage', () => { - register.registerDataStorageTests(POSTGRES_STORAGE_FACTORY); +describe('Postgres Sync Bucket Storage - Checkpoints', () => + register.registerDataStorageCheckpointTests(POSTGRES_STORAGE_FACTORY)); + +describe('Sync Bucket Validation', register.registerBucketValidationTests); +describe('Postgres Sync Bucket Storage - pg-specific', () => { /** * The split of returned results can vary depending on storage drivers. * The large rows here are 2MB large while the default chunk limit is 1mb. 
diff --git a/packages/service-core-tests/src/tests/register-compacting-tests.ts b/packages/service-core-tests/src/tests/register-compacting-tests.ts index 8edad4516..06504a606 100644 --- a/packages/service-core-tests/src/tests/register-compacting-tests.ts +++ b/packages/service-core-tests/src/tests/register-compacting-tests.ts @@ -521,4 +521,67 @@ bucket_definitions: checksum: 1874612650 }); }); + + test('partial checksums after compacting (2)', async () => { + await using factory = await generateStorageFactory(); + const syncRules = await factory.updateSyncRules({ + content: ` +bucket_definitions: + global: + data: [select * from test] + ` + }); + const bucketStorage = factory.getInstance(syncRules); + + const result = await bucketStorage.startBatch(test_utils.BATCH_OPTIONS, async (batch) => { + await batch.save({ + sourceTable: TEST_TABLE, + tag: storage.SaveOperationTag.INSERT, + after: { + id: 't1' + }, + afterReplicaId: 't1' + }); + + await batch.save({ + sourceTable: TEST_TABLE, + tag: storage.SaveOperationTag.UPDATE, + after: { + id: 't1' + }, + afterReplicaId: 't1' + }); + + await batch.commit('1/1'); + }); + + // Get checksums here just to populate the cache + await bucketStorage.getChecksums(result!.flushed_op, ['global[]']); + const result2 = await bucketStorage.startBatch(test_utils.BATCH_OPTIONS, async (batch) => { + await batch.save({ + sourceTable: TEST_TABLE, + tag: storage.SaveOperationTag.DELETE, + before: { + id: 't1' + }, + beforeReplicaId: 't1' + }); + await batch.commit('2/1'); + }); + + await bucketStorage.compact({ + clearBatchLimit: 20, + moveBatchLimit: 10, + moveBatchQueryLimit: 10 + }); + + const checkpoint2 = result2!.flushed_op; + // Check that the checksum was correctly updated with the clear operation after having a cached checksum + const checksumAfter = await bucketStorage.getChecksums(checkpoint2, ['global[]']); + expect(checksumAfter.get('global[]')).toMatchObject({ + bucket: 'global[]', + count: 1, + checksum: -1481659821 + }); + }); } diff --git a/packages/service-core-tests/src/tests/register-data-storage-checkpoint-tests.ts b/packages/service-core-tests/src/tests/register-data-storage-checkpoint-tests.ts new file mode 100644 index 000000000..72dd7dced --- /dev/null +++ b/packages/service-core-tests/src/tests/register-data-storage-checkpoint-tests.ts @@ -0,0 +1,277 @@ +import { storage } from '@powersync/service-core'; +import { expect, test } from 'vitest'; +import * as test_utils from '../test-utils/test-utils-index.js'; + +/** + * @example + * ```TypeScript + * + * describe('store - mongodb', function () { + * registerDataStorageCheckpointTests(MONGO_STORAGE_FACTORY); + * }); + * + * ``` + */ +export function registerDataStorageCheckpointTests(generateStorageFactory: storage.TestStorageFactory) { + test('managed write checkpoints - checkpoint after write', async (context) => { + await using factory = await generateStorageFactory(); + const r = await factory.configureSyncRules({ + content: ` +bucket_definitions: + mybucket: + data: [] + `, + validate: false + }); + const bucketStorage = factory.getInstance(r.persisted_sync_rules!); + + const abortController = new AbortController(); + context.onTestFinished(() => abortController.abort()); + const iter = bucketStorage + .watchCheckpointChanges({ user_id: 'user1', signal: abortController.signal }) + [Symbol.asyncIterator](); + + const writeCheckpoint = await bucketStorage.createManagedWriteCheckpoint({ + heads: { '1': '5/0' }, + user_id: 'user1' + }); + + await 
bucketStorage.startBatch(test_utils.BATCH_OPTIONS, async (batch) => { + await batch.keepalive('5/0'); + }); + + const result = await iter.next(); + expect(result).toMatchObject({ + done: false, + value: { + base: { + checkpoint: 0n, + lsn: '5/0' + }, + writeCheckpoint: writeCheckpoint + } + }); + }); + + test('managed write checkpoints - write after checkpoint', async (context) => { + await using factory = await generateStorageFactory(); + const r = await factory.configureSyncRules({ + content: ` +bucket_definitions: + mybucket: + data: [] + `, + validate: false + }); + const bucketStorage = factory.getInstance(r.persisted_sync_rules!); + + const abortController = new AbortController(); + context.onTestFinished(() => abortController.abort()); + const iter = bucketStorage + .watchCheckpointChanges({ user_id: 'user1', signal: abortController.signal }) + [Symbol.asyncIterator](); + + await bucketStorage.startBatch(test_utils.BATCH_OPTIONS, async (batch) => { + await batch.keepalive('5/0'); + }); + + const result = await iter.next(); + expect(result).toMatchObject({ + done: false, + value: { + base: { + checkpoint: 0n, + lsn: '5/0' + }, + writeCheckpoint: null + } + }); + + const writeCheckpoint = await bucketStorage.createManagedWriteCheckpoint({ + heads: { '1': '6/0' }, + user_id: 'user1' + }); + // We have to trigger a new keepalive after the checkpoint, at least to cover postgres storage. + // This is what is effectively triggered with RouteAPI.createReplicationHead(). + // MongoDB storage doesn't explicitly need this anymore. + await bucketStorage.startBatch(test_utils.BATCH_OPTIONS, async (batch) => { + await batch.keepalive('6/0'); + }); + + let result2 = await iter.next(); + if (result2.value?.base?.lsn == '5/0') { + // Events could arrive in a different order in some cases - this caters for it + result2 = await iter.next(); + } + expect(result2).toMatchObject({ + done: false, + value: { + base: { + checkpoint: 0n, + lsn: '6/0' + }, + writeCheckpoint: writeCheckpoint + } + }); + }); + + test('custom write checkpoints - checkpoint after write', async (context) => { + await using factory = await generateStorageFactory(); + const r = await factory.configureSyncRules({ + content: ` +bucket_definitions: + mybucket: + data: [] + `, + validate: false + }); + const bucketStorage = factory.getInstance(r.persisted_sync_rules!); + bucketStorage.setWriteCheckpointMode(storage.WriteCheckpointMode.CUSTOM); + + const abortController = new AbortController(); + context.onTestFinished(() => abortController.abort()); + const iter = bucketStorage + .watchCheckpointChanges({ user_id: 'user1', signal: abortController.signal }) + [Symbol.asyncIterator](); + + await bucketStorage.startBatch(test_utils.BATCH_OPTIONS, async (batch) => { + await batch.addCustomWriteCheckpoint({ + checkpoint: 5n, + user_id: 'user1' + }); + await batch.flush(); + await batch.keepalive('5/0'); + }); + + const result = await iter.next(); + expect(result).toMatchObject({ + done: false, + value: { + base: { + lsn: '5/0' + }, + writeCheckpoint: 5n + } + }); + }); + + test('custom write checkpoints - standalone checkpoint', async (context) => { + await using factory = await generateStorageFactory(); + const r = await factory.configureSyncRules({ + content: ` +bucket_definitions: + mybucket: + data: [] + `, + validate: false + }); + const bucketStorage = factory.getInstance(r.persisted_sync_rules!); + bucketStorage.setWriteCheckpointMode(storage.WriteCheckpointMode.CUSTOM); + + const abortController = new AbortController(); + 
context.onTestFinished(() => abortController.abort()); + const iter = bucketStorage + .watchCheckpointChanges({ user_id: 'user1', signal: abortController.signal }) + [Symbol.asyncIterator](); + + await bucketStorage.startBatch(test_utils.BATCH_OPTIONS, async (batch) => { + // Flush to clear state + await batch.flush(); + + await batch.addCustomWriteCheckpoint({ + checkpoint: 5n, + user_id: 'user1' + }); + await batch.flush(); + await batch.keepalive('5/0'); + }); + + const result = await iter.next(); + expect(result).toMatchObject({ + done: false, + value: { + base: { + lsn: '5/0' + }, + writeCheckpoint: 5n + } + }); + }); + + test('custom write checkpoints - write after checkpoint', async (context) => { + await using factory = await generateStorageFactory(); + const r = await factory.configureSyncRules({ + content: ` +bucket_definitions: + mybucket: + data: [] + `, + validate: false + }); + const bucketStorage = factory.getInstance(r.persisted_sync_rules!); + bucketStorage.setWriteCheckpointMode(storage.WriteCheckpointMode.CUSTOM); + + const abortController = new AbortController(); + context.onTestFinished(() => abortController.abort()); + const iter = bucketStorage + .watchCheckpointChanges({ user_id: 'user1', signal: abortController.signal }) + [Symbol.asyncIterator](); + + await bucketStorage.startBatch(test_utils.BATCH_OPTIONS, async (batch) => { + await batch.keepalive('5/0'); + }); + + const result = await iter.next(); + expect(result).toMatchObject({ + done: false, + value: { + base: { + lsn: '5/0' + }, + writeCheckpoint: null + } + }); + + await bucketStorage.startBatch(test_utils.BATCH_OPTIONS, async (batch) => { + batch.addCustomWriteCheckpoint({ + checkpoint: 6n, + user_id: 'user1' + }); + await batch.flush(); + await batch.keepalive('6/0'); + }); + + let result2 = await iter.next(); + expect(result2).toMatchObject({ + done: false, + value: { + base: { + // can be 5/0 or 6/0 - actual value not relevant for custom write checkpoints + // lsn: '6/0' + }, + writeCheckpoint: 6n + } + }); + + await bucketStorage.startBatch(test_utils.BATCH_OPTIONS, async (batch) => { + batch.addCustomWriteCheckpoint({ + checkpoint: 7n, + user_id: 'user1' + }); + await batch.flush(); + await batch.keepalive('7/0'); + }); + + let result3 = await iter.next(); + expect(result3).toMatchObject({ + done: false, + value: { + base: { + // can be 5/0, 6/0 or 7/0 - actual value not relevant for custom write checkpoints + // lsn: '7/0' + }, + writeCheckpoint: 7n + } + }); + }); +} diff --git a/packages/service-core-tests/src/tests/register-data-storage-tests.ts b/packages/service-core-tests/src/tests/register-data-storage-data-tests.ts similarity index 57% rename from packages/service-core-tests/src/tests/register-data-storage-tests.ts rename to packages/service-core-tests/src/tests/register-data-storage-data-tests.ts index d9b2c066e..1aeeb36d7 100644 --- a/packages/service-core-tests/src/tests/register-data-storage-tests.ts +++ b/packages/service-core-tests/src/tests/register-data-storage-data-tests.ts @@ -1,17 +1,14 @@ import { BucketDataBatchOptions, getUuidReplicaIdentityBson, - InternalOpId, OplogEntry, SaveOptions, storage } from '@powersync/service-core'; -import { DateTimeValue, ParameterLookup, RequestParameters } from '@powersync/service-sync-rules'; -import { expect, test, describe, beforeEach } from 'vitest'; +import { DateTimeValue } from '@powersync/service-sync-rules'; +import { describe, expect, test } from 'vitest'; import * as test_utils from '../test-utils/test-utils-index.js'; -import { 
SqlBucketDescriptor } from '@powersync/service-sync-rules/src/SqlBucketDescriptor.js'; - -export const TEST_TABLE = test_utils.makeTestTable('test', ['id']); +import { TEST_TABLE } from './util.js'; /** * Normalize data from OplogEntries for comparison in tests. @@ -29,296 +26,12 @@ const normalizeOplogData = (data: OplogEntry['data']) => { * ```TypeScript * * describe('store - mongodb', function () { - * registerDataStorageTests(MONGO_STORAGE_FACTORY); + * registerDataStorageDataTests(MONGO_STORAGE_FACTORY); * }); * * ``` */ -export function registerDataStorageTests(generateStorageFactory: storage.TestStorageFactory) { - test('save and load parameters', async () => { - await using factory = await generateStorageFactory(); - const syncRules = await factory.updateSyncRules({ - content: ` -bucket_definitions: - mybucket: - parameters: - - SELECT group_id FROM test WHERE id1 = token_parameters.user_id OR id2 = token_parameters.user_id - data: [] - ` - }); - const bucketStorage = factory.getInstance(syncRules); - - await bucketStorage.startBatch(test_utils.BATCH_OPTIONS, async (batch) => { - await batch.save({ - sourceTable: TEST_TABLE, - tag: storage.SaveOperationTag.INSERT, - after: { - id: 't2', - id1: 'user3', - id2: 'user4', - group_id: 'group2a' - }, - afterReplicaId: test_utils.rid('t2') - }); - - await batch.save({ - sourceTable: TEST_TABLE, - tag: storage.SaveOperationTag.INSERT, - after: { - id: 't1', - id1: 'user1', - id2: 'user2', - group_id: 'group1a' - }, - afterReplicaId: test_utils.rid('t1') - }); - - await batch.commit('1/1'); - }); - - const checkpoint = await bucketStorage.getCheckpoint(); - const parameters = await checkpoint.getParameterSets([ParameterLookup.normalized('mybucket', '1', ['user1'])]); - expect(parameters).toEqual([ - { - group_id: 'group1a' - } - ]); - }); - - test('it should use the latest version', async () => { - await using factory = await generateStorageFactory(); - const syncRules = await factory.updateSyncRules({ - content: ` -bucket_definitions: - mybucket: - parameters: - - SELECT group_id FROM test WHERE id = token_parameters.user_id - data: [] - ` - }); - const bucketStorage = factory.getInstance(syncRules); - - await bucketStorage.startBatch(test_utils.BATCH_OPTIONS, async (batch) => { - await batch.save({ - sourceTable: TEST_TABLE, - tag: storage.SaveOperationTag.INSERT, - after: { - id: 'user1', - group_id: 'group1' - }, - afterReplicaId: test_utils.rid('user1') - }); - await batch.commit('1/1'); - }); - const checkpoint1 = await bucketStorage.getCheckpoint(); - await bucketStorage.startBatch(test_utils.BATCH_OPTIONS, async (batch) => { - await batch.save({ - sourceTable: TEST_TABLE, - tag: storage.SaveOperationTag.INSERT, - after: { - id: 'user1', - group_id: 'group2' - }, - afterReplicaId: test_utils.rid('user1') - }); - await batch.commit('1/2'); - }); - const checkpoint2 = await bucketStorage.getCheckpoint(); - - const parameters = await checkpoint2.getParameterSets([ParameterLookup.normalized('mybucket', '1', ['user1'])]); - expect(parameters).toEqual([ - { - group_id: 'group2' - } - ]); - - // Use the checkpoint to get older data if relevant - const parameters2 = await checkpoint1.getParameterSets([ParameterLookup.normalized('mybucket', '1', ['user1'])]); - expect(parameters2).toEqual([ - { - group_id: 'group1' - } - ]); - }); - - test('it should use the latest version after updates', async () => { - await using factory = await generateStorageFactory(); - const syncRules = await factory.updateSyncRules({ - content: ` -bucket_definitions: 
- mybucket: - parameters: - - SELECT id AS todo_id - FROM todos - WHERE list_id IN token_parameters.list_id - data: [] - ` - }); - const bucketStorage = factory.getInstance(syncRules); - - const table = test_utils.makeTestTable('todos', ['id', 'list_id']); - - await bucketStorage.startBatch(test_utils.BATCH_OPTIONS, async (batch) => { - // Create two todos which initially belong to different lists - await batch.save({ - sourceTable: table, - tag: storage.SaveOperationTag.INSERT, - after: { - id: 'todo1', - list_id: 'list1' - }, - afterReplicaId: test_utils.rid('todo1') - }); - await batch.save({ - sourceTable: table, - tag: storage.SaveOperationTag.INSERT, - after: { - id: 'todo2', - list_id: 'list2' - }, - afterReplicaId: test_utils.rid('todo2') - }); - - await batch.commit('1/1'); - }); - - await bucketStorage.startBatch(test_utils.BATCH_OPTIONS, async (batch) => { - // Update the second todo item to now belong to list 1 - await batch.save({ - sourceTable: table, - tag: storage.SaveOperationTag.UPDATE, - after: { - id: 'todo2', - list_id: 'list1' - }, - afterReplicaId: test_utils.rid('todo2') - }); - - await batch.commit('1/1'); - }); - - // We specifically request the todo_ids for both lists. - // There removal operation for the association of `list2`::`todo2` should not interfere with the new - // association of `list1`::`todo2` - const checkpoint = await bucketStorage.getCheckpoint(); - const parameters = await checkpoint.getParameterSets([ - ParameterLookup.normalized('mybucket', '1', ['list1']), - ParameterLookup.normalized('mybucket', '1', ['list2']) - ]); - - expect(parameters.sort((a, b) => (a.todo_id as string).localeCompare(b.todo_id as string))).toEqual([ - { - todo_id: 'todo1' - }, - { - todo_id: 'todo2' - } - ]); - }); - - test('save and load parameters with different number types', async () => { - await using factory = await generateStorageFactory(); - const syncRules = await factory.updateSyncRules({ - content: ` -bucket_definitions: - mybucket: - parameters: - - SELECT group_id FROM test WHERE n1 = token_parameters.n1 and f2 = token_parameters.f2 and f3 = token_parameters.f3 - data: [] - ` - }); - const bucketStorage = factory.getInstance(syncRules); - - await bucketStorage.startBatch(test_utils.BATCH_OPTIONS, async (batch) => { - await batch.save({ - sourceTable: TEST_TABLE, - tag: storage.SaveOperationTag.INSERT, - after: { - id: 't1', - group_id: 'group1', - n1: 314n, - f2: 314, - f3: 3.14 - }, - afterReplicaId: test_utils.rid('t1') - }); - - await batch.commit('1/1'); - }); - - const TEST_PARAMS = { group_id: 'group1' }; - - const checkpoint = await bucketStorage.getCheckpoint(); - - const parameters1 = await checkpoint.getParameterSets([ - ParameterLookup.normalized('mybucket', '1', [314n, 314, 3.14]) - ]); - expect(parameters1).toEqual([TEST_PARAMS]); - const parameters2 = await checkpoint.getParameterSets([ - ParameterLookup.normalized('mybucket', '1', [314, 314n, 3.14]) - ]); - expect(parameters2).toEqual([TEST_PARAMS]); - const parameters3 = await checkpoint.getParameterSets([ - ParameterLookup.normalized('mybucket', '1', [314n, 314, 3]) - ]); - expect(parameters3).toEqual([]); - }); - - test('save and load parameters with large numbers', async () => { - // This ensures serialization / deserialization of "current_data" is done correctly. - // This specific case tested here cannot happen with postgres in practice, but we still - // test this to ensure correct deserialization. 
- - await using factory = await generateStorageFactory(); - const syncRules = await factory.updateSyncRules({ - content: ` -bucket_definitions: - mybucket: - parameters: - - SELECT group_id FROM test WHERE n1 = token_parameters.n1 - data: [] - ` - }); - const bucketStorage = factory.getInstance(syncRules); - - await bucketStorage.startBatch(test_utils.BATCH_OPTIONS, async (batch) => { - await batch.save({ - sourceTable: TEST_TABLE, - tag: storage.SaveOperationTag.INSERT, - after: { - id: 't1', - group_id: 'group1', - n1: 1152921504606846976n // 2^60 - }, - afterReplicaId: test_utils.rid('t1') - }); - - await batch.save({ - sourceTable: TEST_TABLE, - tag: storage.SaveOperationTag.UPDATE, - after: { - id: 't1', - group_id: 'group1', - // Simulate a TOAST value, even though it can't happen for values like this - // in practice. - n1: undefined - }, - afterReplicaId: test_utils.rid('t1') - }); - - await batch.commit('1/1'); - }); - - const TEST_PARAMS = { group_id: 'group1' }; - - const checkpoint = await bucketStorage.getCheckpoint(); - - const parameters1 = await checkpoint.getParameterSets([ - ParameterLookup.normalized('mybucket', '1', [1152921504606846976n]) - ]); - expect(parameters1).toEqual([TEST_PARAMS]); - }); - +export function registerDataStorageDataTests(generateStorageFactory: storage.TestStorageFactory) { test('removing row', async () => { await using factory = await generateStorageFactory(); const syncRules = await factory.updateSyncRules({ @@ -380,247 +93,6 @@ bucket_definitions: ]); }); - test('save and load parameters with workspaceId', async () => { - const WORKSPACE_TABLE = test_utils.makeTestTable('workspace', ['id']); - - await using factory = await generateStorageFactory(); - const syncRules = await factory.updateSyncRules({ - content: ` -bucket_definitions: - by_workspace: - parameters: - - SELECT id as workspace_id FROM workspace WHERE - workspace."userId" = token_parameters.user_id - data: [] - ` - }); - const sync_rules = syncRules.parsed(test_utils.PARSE_OPTIONS).sync_rules; - const bucketStorage = factory.getInstance(syncRules); - - await bucketStorage.startBatch(test_utils.BATCH_OPTIONS, async (batch) => { - await batch.save({ - sourceTable: WORKSPACE_TABLE, - tag: storage.SaveOperationTag.INSERT, - after: { - id: 'workspace1', - userId: 'u1' - }, - afterReplicaId: test_utils.rid('workspace1') - }); - await batch.commit('1/1'); - }); - const checkpoint = await bucketStorage.getCheckpoint(); - - const parameters = new RequestParameters({ sub: 'u1' }, {}); - - const q1 = (sync_rules.bucketSources[0] as SqlBucketDescriptor).parameterQueries[0]; - - const lookups = q1.getLookups(parameters); - expect(lookups).toEqual([ParameterLookup.normalized('by_workspace', '1', ['u1'])]); - - const parameter_sets = await checkpoint.getParameterSets(lookups); - expect(parameter_sets).toEqual([{ workspace_id: 'workspace1' }]); - - const buckets = await sync_rules - .getBucketParameterQuerier(test_utils.querierOptions(parameters)) - .querier.queryDynamicBucketDescriptions({ - getParameterSets(lookups) { - return checkpoint.getParameterSets(lookups); - } - }); - expect(buckets).toEqual([ - { bucket: 'by_workspace["workspace1"]', priority: 3, definition: 'by_workspace', inclusion_reasons: ['default'] } - ]); - }); - - test('save and load parameters with dynamic global buckets', async () => { - const WORKSPACE_TABLE = test_utils.makeTestTable('workspace'); - - await using factory = await generateStorageFactory(); - const syncRules = await factory.updateSyncRules({ - content: ` 
-bucket_definitions: - by_public_workspace: - parameters: - - SELECT id as workspace_id FROM workspace WHERE - workspace.visibility = 'public' - data: [] - ` - }); - const sync_rules = syncRules.parsed(test_utils.PARSE_OPTIONS).sync_rules; - const bucketStorage = factory.getInstance(syncRules); - - await bucketStorage.startBatch(test_utils.BATCH_OPTIONS, async (batch) => { - await batch.save({ - sourceTable: WORKSPACE_TABLE, - tag: storage.SaveOperationTag.INSERT, - after: { - id: 'workspace1', - visibility: 'public' - }, - afterReplicaId: test_utils.rid('workspace1') - }); - - await batch.save({ - sourceTable: WORKSPACE_TABLE, - tag: storage.SaveOperationTag.INSERT, - after: { - id: 'workspace2', - visibility: 'private' - }, - afterReplicaId: test_utils.rid('workspace2') - }); - - await batch.save({ - sourceTable: WORKSPACE_TABLE, - tag: storage.SaveOperationTag.INSERT, - after: { - id: 'workspace3', - visibility: 'public' - }, - afterReplicaId: test_utils.rid('workspace3') - }); - - await batch.commit('1/1'); - }); - - const checkpoint = await bucketStorage.getCheckpoint(); - - const parameters = new RequestParameters({ sub: 'unknown' }, {}); - - const q1 = (sync_rules.bucketSources[0] as SqlBucketDescriptor).parameterQueries[0]; - - const lookups = q1.getLookups(parameters); - expect(lookups).toEqual([ParameterLookup.normalized('by_public_workspace', '1', [])]); - - const parameter_sets = await checkpoint.getParameterSets(lookups); - parameter_sets.sort((a, b) => JSON.stringify(a).localeCompare(JSON.stringify(b))); - expect(parameter_sets).toEqual([{ workspace_id: 'workspace1' }, { workspace_id: 'workspace3' }]); - - const buckets = await sync_rules - .getBucketParameterQuerier(test_utils.querierOptions(parameters)) - .querier.queryDynamicBucketDescriptions({ - getParameterSets(lookups) { - return checkpoint.getParameterSets(lookups); - } - }); - buckets.sort((a, b) => a.bucket.localeCompare(b.bucket)); - expect(buckets).toEqual([ - { - bucket: 'by_public_workspace["workspace1"]', - priority: 3, - definition: 'by_public_workspace', - inclusion_reasons: ['default'] - }, - { - bucket: 'by_public_workspace["workspace3"]', - priority: 3, - definition: 'by_public_workspace', - inclusion_reasons: ['default'] - } - ]); - }); - - test('multiple parameter queries', async () => { - const WORKSPACE_TABLE = test_utils.makeTestTable('workspace'); - - await using factory = await generateStorageFactory(); - const syncRules = await factory.updateSyncRules({ - content: ` -bucket_definitions: - by_workspace: - parameters: - - SELECT id as workspace_id FROM workspace WHERE - workspace.visibility = 'public' - - SELECT id as workspace_id FROM workspace WHERE - workspace.user_id = token_parameters.user_id - data: [] - ` - }); - const sync_rules = syncRules.parsed(test_utils.PARSE_OPTIONS).sync_rules; - const bucketStorage = factory.getInstance(syncRules); - - await bucketStorage.startBatch(test_utils.BATCH_OPTIONS, async (batch) => { - await batch.save({ - sourceTable: WORKSPACE_TABLE, - tag: storage.SaveOperationTag.INSERT, - after: { - id: 'workspace1', - visibility: 'public' - }, - afterReplicaId: test_utils.rid('workspace1') - }); - - await batch.save({ - sourceTable: WORKSPACE_TABLE, - tag: storage.SaveOperationTag.INSERT, - after: { - id: 'workspace2', - visibility: 'private' - }, - afterReplicaId: test_utils.rid('workspace2') - }); - - await batch.save({ - sourceTable: WORKSPACE_TABLE, - tag: storage.SaveOperationTag.INSERT, - after: { - id: 'workspace3', - user_id: 'u1', - visibility: 'private' - }, 
- afterReplicaId: test_utils.rid('workspace3') - }); - - await batch.save({ - sourceTable: WORKSPACE_TABLE, - tag: storage.SaveOperationTag.INSERT, - after: { - id: 'workspace4', - user_id: 'u2', - visibility: 'private' - }, - afterReplicaId: test_utils.rid('workspace4') - }); - - await batch.commit('1/1'); - }); - - const checkpoint = await bucketStorage.getCheckpoint(); - - const parameters = new RequestParameters({ sub: 'u1' }, {}); - - // Test intermediate values - could be moved to sync_rules.test.ts - const q1 = (sync_rules.bucketSources[0] as SqlBucketDescriptor).parameterQueries[0]; - const lookups1 = q1.getLookups(parameters); - expect(lookups1).toEqual([ParameterLookup.normalized('by_workspace', '1', [])]); - - const parameter_sets1 = await checkpoint.getParameterSets(lookups1); - parameter_sets1.sort((a, b) => JSON.stringify(a).localeCompare(JSON.stringify(b))); - expect(parameter_sets1).toEqual([{ workspace_id: 'workspace1' }]); - - const q2 = (sync_rules.bucketSources[0] as SqlBucketDescriptor).parameterQueries[1]; - const lookups2 = q2.getLookups(parameters); - expect(lookups2).toEqual([ParameterLookup.normalized('by_workspace', '2', ['u1'])]); - - const parameter_sets2 = await checkpoint.getParameterSets(lookups2); - parameter_sets2.sort((a, b) => JSON.stringify(a).localeCompare(JSON.stringify(b))); - expect(parameter_sets2).toEqual([{ workspace_id: 'workspace3' }]); - - // Test final values - the important part - const buckets = ( - await sync_rules - .getBucketParameterQuerier(test_utils.querierOptions(parameters)) - .querier.queryDynamicBucketDescriptions({ - getParameterSets(lookups) { - return checkpoint.getParameterSets(lookups); - } - }) - ).map((e) => e.bucket); - buckets.sort(); - expect(buckets).toEqual(['by_workspace["workspace1"]', 'by_workspace["workspace3"]']); - }); - test('changing client ids', async () => { await using factory = await generateStorageFactory(); const syncRules = await factory.updateSyncRules({ @@ -885,41 +357,6 @@ bucket_definitions: ]); }); - test('truncate parameters', async () => { - await using factory = await generateStorageFactory(); - const syncRules = await factory.updateSyncRules({ - content: ` -bucket_definitions: - mybucket: - parameters: - - SELECT group_id FROM test WHERE id1 = token_parameters.user_id OR id2 = token_parameters.user_id - data: [] - ` - }); - const bucketStorage = factory.getInstance(syncRules); - - await bucketStorage.startBatch(test_utils.BATCH_OPTIONS, async (batch) => { - await batch.save({ - sourceTable: TEST_TABLE, - tag: storage.SaveOperationTag.INSERT, - after: { - id: 't2', - id1: 'user3', - id2: 'user4', - group_id: 'group2a' - }, - afterReplicaId: test_utils.rid('t2') - }); - - await batch.truncate([TEST_TABLE]); - }); - - const checkpoint = await bucketStorage.getCheckpoint(); - - const parameters = await checkpoint.getParameterSets([ParameterLookup.normalized('mybucket', '1', ['user1'])]); - expect(parameters).toEqual([]); - }); - test('batch with overlapping replica ids', async () => { // This test checks that we get the correct output when processing rows with: // 1. 
changing replica ids @@ -1650,303 +1087,6 @@ bucket_definitions: expect(metrics2).toMatchSnapshot(); }); - test('invalidate cached parsed sync rules', async () => { - await using bucketStorageFactory = await generateStorageFactory(); - const syncRules = await bucketStorageFactory.updateSyncRules({ - content: ` -bucket_definitions: - by_workspace: - parameters: - - SELECT id as workspace_id FROM workspace WHERE - workspace."userId" = token_parameters.user_id - data: [] - ` - }); - const syncBucketStorage = bucketStorageFactory.getInstance(syncRules); - - const parsedSchema1 = syncBucketStorage.getParsedSyncRules({ - defaultSchema: 'public' - }); - - const parsedSchema2 = syncBucketStorage.getParsedSyncRules({ - defaultSchema: 'public' - }); - - // These should be cached, this will be the same instance - expect(parsedSchema2).equals(parsedSchema1); - expect(parsedSchema1.getSourceTables()[0].schema).equals('public'); - - const parsedSchema3 = syncBucketStorage.getParsedSyncRules({ - defaultSchema: 'databasename' - }); - - // The cache should not be used - expect(parsedSchema3).not.equals(parsedSchema2); - expect(parsedSchema3.getSourceTables()[0].schema).equals('databasename'); - }); - - test('managed write checkpoints - checkpoint after write', async (context) => { - await using factory = await generateStorageFactory(); - const r = await factory.configureSyncRules({ - content: ` -bucket_definitions: - mybucket: - data: [] - `, - validate: false - }); - const bucketStorage = factory.getInstance(r.persisted_sync_rules!); - - const abortController = new AbortController(); - context.onTestFinished(() => abortController.abort()); - const iter = bucketStorage - .watchCheckpointChanges({ user_id: 'user1', signal: abortController.signal }) - [Symbol.asyncIterator](); - - const writeCheckpoint = await bucketStorage.createManagedWriteCheckpoint({ - heads: { '1': '5/0' }, - user_id: 'user1' - }); - - await bucketStorage.startBatch(test_utils.BATCH_OPTIONS, async (batch) => { - await batch.keepalive('5/0'); - }); - - const result = await iter.next(); - expect(result).toMatchObject({ - done: false, - value: { - base: { - checkpoint: 0n, - lsn: '5/0' - }, - writeCheckpoint: writeCheckpoint - } - }); - }); - - test('managed write checkpoints - write after checkpoint', async (context) => { - await using factory = await generateStorageFactory(); - const r = await factory.configureSyncRules({ - content: ` -bucket_definitions: - mybucket: - data: [] - `, - validate: false - }); - const bucketStorage = factory.getInstance(r.persisted_sync_rules!); - - const abortController = new AbortController(); - context.onTestFinished(() => abortController.abort()); - const iter = bucketStorage - .watchCheckpointChanges({ user_id: 'user1', signal: abortController.signal }) - [Symbol.asyncIterator](); - - await bucketStorage.startBatch(test_utils.BATCH_OPTIONS, async (batch) => { - await batch.keepalive('5/0'); - }); - - const result = await iter.next(); - expect(result).toMatchObject({ - done: false, - value: { - base: { - checkpoint: 0n, - lsn: '5/0' - }, - writeCheckpoint: null - } - }); - - const writeCheckpoint = await bucketStorage.createManagedWriteCheckpoint({ - heads: { '1': '6/0' }, - user_id: 'user1' - }); - // We have to trigger a new keepalive after the checkpoint, at least to cover postgres storage. - // This is what is effetively triggered with RouteAPI.createReplicationHead(). - // MongoDB storage doesn't explicitly need this anymore. 
- await bucketStorage.startBatch(test_utils.BATCH_OPTIONS, async (batch) => { - await batch.keepalive('6/0'); - }); - - let result2 = await iter.next(); - if (result2.value?.base?.lsn == '5/0') { - // Events could arrive in a different order in some cases - this caters for it - result2 = await iter.next(); - } - expect(result2).toMatchObject({ - done: false, - value: { - base: { - checkpoint: 0n, - lsn: '6/0' - }, - writeCheckpoint: writeCheckpoint - } - }); - }); - - test('custom write checkpoints - checkpoint after write', async (context) => { - await using factory = await generateStorageFactory(); - const r = await factory.configureSyncRules({ - content: ` -bucket_definitions: - mybucket: - data: [] - `, - validate: false - }); - const bucketStorage = factory.getInstance(r.persisted_sync_rules!); - bucketStorage.setWriteCheckpointMode(storage.WriteCheckpointMode.CUSTOM); - - const abortController = new AbortController(); - context.onTestFinished(() => abortController.abort()); - const iter = bucketStorage - .watchCheckpointChanges({ user_id: 'user1', signal: abortController.signal }) - [Symbol.asyncIterator](); - - await bucketStorage.startBatch(test_utils.BATCH_OPTIONS, async (batch) => { - await batch.addCustomWriteCheckpoint({ - checkpoint: 5n, - user_id: 'user1' - }); - await batch.flush(); - await batch.keepalive('5/0'); - }); - - const result = await iter.next(); - expect(result).toMatchObject({ - done: false, - value: { - base: { - lsn: '5/0' - }, - writeCheckpoint: 5n - } - }); - }); - - test('custom write checkpoints - standalone checkpoint', async (context) => { - await using factory = await generateStorageFactory(); - const r = await factory.configureSyncRules({ - content: ` -bucket_definitions: - mybucket: - data: [] - `, - validate: false - }); - const bucketStorage = factory.getInstance(r.persisted_sync_rules!); - bucketStorage.setWriteCheckpointMode(storage.WriteCheckpointMode.CUSTOM); - - const abortController = new AbortController(); - context.onTestFinished(() => abortController.abort()); - const iter = bucketStorage - .watchCheckpointChanges({ user_id: 'user1', signal: abortController.signal }) - [Symbol.asyncIterator](); - - await bucketStorage.startBatch(test_utils.BATCH_OPTIONS, async (batch) => { - // Flush to clear state - await batch.flush(); - - await batch.addCustomWriteCheckpoint({ - checkpoint: 5n, - user_id: 'user1' - }); - await batch.flush(); - await batch.keepalive('5/0'); - }); - - const result = await iter.next(); - expect(result).toMatchObject({ - done: false, - value: { - base: { - lsn: '5/0' - }, - writeCheckpoint: 5n - } - }); - }); - - test('custom write checkpoints - write after checkpoint', async (context) => { - await using factory = await generateStorageFactory(); - const r = await factory.configureSyncRules({ - content: ` -bucket_definitions: - mybucket: - data: [] - `, - validate: false - }); - const bucketStorage = factory.getInstance(r.persisted_sync_rules!); - bucketStorage.setWriteCheckpointMode(storage.WriteCheckpointMode.CUSTOM); - - const abortController = new AbortController(); - context.onTestFinished(() => abortController.abort()); - const iter = bucketStorage - .watchCheckpointChanges({ user_id: 'user1', signal: abortController.signal }) - [Symbol.asyncIterator](); - - await bucketStorage.startBatch(test_utils.BATCH_OPTIONS, async (batch) => { - await batch.keepalive('5/0'); - }); - - const result = await iter.next(); - expect(result).toMatchObject({ - done: false, - value: { - base: { - lsn: '5/0' - }, - writeCheckpoint: null - } 
- }); - - await bucketStorage.startBatch(test_utils.BATCH_OPTIONS, async (batch) => { - batch.addCustomWriteCheckpoint({ - checkpoint: 6n, - user_id: 'user1' - }); - await batch.flush(); - await batch.keepalive('6/0'); - }); - - let result2 = await iter.next(); - expect(result2).toMatchObject({ - done: false, - value: { - base: { - // can be 5/0 or 6/0 - actual value not relevant for custom write checkpoints - // lsn: '6/0' - }, - writeCheckpoint: 6n - } - }); - - await bucketStorage.startBatch(test_utils.BATCH_OPTIONS, async (batch) => { - batch.addCustomWriteCheckpoint({ - checkpoint: 7n, - user_id: 'user1' - }); - await batch.flush(); - await batch.keepalive('7/0'); - }); - - let result3 = await iter.next(); - expect(result3).toMatchObject({ - done: false, - value: { - base: { - // can be 5/0, 6/0 or 7/0 - actual value not relevant for custom write checkpoints - // lsn: '7/0' - }, - writeCheckpoint: 7n - } - }); - }); - test('op_id initialization edge case', async () => { // Test syncing a batch of data that is small in count, // but large enough in size to be split over multiple returned chunks. @@ -2062,4 +1202,37 @@ bucket_definitions: } ]); }); + + test('unchanged checksums', async () => { + await using factory = await generateStorageFactory(); + const syncRules = await factory.updateSyncRules({ + content: ` +bucket_definitions: + global: + data: + - SELECT client_id as id, description FROM "%" +` + }); + const bucketStorage = factory.getInstance(syncRules); + + const sourceTable = TEST_TABLE; + await bucketStorage.startBatch(test_utils.BATCH_OPTIONS, async (batch) => { + await batch.save({ + sourceTable, + tag: storage.SaveOperationTag.INSERT, + after: { + id: 'test1', + description: 'test1a' + }, + afterReplicaId: test_utils.rid('test1') + }); + await batch.commit('1/1'); + }); + const { checkpoint } = await bucketStorage.getCheckpoint(); + + const checksums = [...(await bucketStorage.getChecksums(checkpoint, ['global[]'])).values()]; + expect(checksums).toEqual([{ bucket: 'global[]', checksum: 1917136889, count: 1 }]); + const checksums2 = [...(await bucketStorage.getChecksums(checkpoint + 1n, ['global[]'])).values()]; + expect(checksums2).toEqual([{ bucket: 'global[]', checksum: 1917136889, count: 1 }]); + }); } diff --git a/packages/service-core-tests/src/tests/register-data-storage-parameter-tests.ts b/packages/service-core-tests/src/tests/register-data-storage-parameter-tests.ts new file mode 100644 index 000000000..d079aaa8c --- /dev/null +++ b/packages/service-core-tests/src/tests/register-data-storage-parameter-tests.ts @@ -0,0 +1,613 @@ +import { storage } from '@powersync/service-core'; +import { ParameterLookup, RequestParameters } from '@powersync/service-sync-rules'; +import { SqlBucketDescriptor } from '@powersync/service-sync-rules/src/SqlBucketDescriptor.js'; +import { expect, test } from 'vitest'; +import * as test_utils from '../test-utils/test-utils-index.js'; +import { TEST_TABLE } from './util.js'; + +/** + * @example + * ```TypeScript + * + * describe('store - mongodb', function () { + * registerDataStorageParameterTests(MONGO_STORAGE_FACTORY); + * }); + * + * ``` + */ +export function registerDataStorageParameterTests(generateStorageFactory: storage.TestStorageFactory) { + test('save and load parameters', async () => { + await using factory = await generateStorageFactory(); + const syncRules = await factory.updateSyncRules({ + content: ` +bucket_definitions: + mybucket: + parameters: + - SELECT group_id FROM test WHERE id1 = token_parameters.user_id OR id2 = 
token_parameters.user_id + data: [] + ` + }); + const bucketStorage = factory.getInstance(syncRules); + + await bucketStorage.startBatch(test_utils.BATCH_OPTIONS, async (batch) => { + await batch.save({ + sourceTable: TEST_TABLE, + tag: storage.SaveOperationTag.INSERT, + after: { + id: 't2', + id1: 'user3', + id2: 'user4', + group_id: 'group2a' + }, + afterReplicaId: test_utils.rid('t2') + }); + + await batch.save({ + sourceTable: TEST_TABLE, + tag: storage.SaveOperationTag.INSERT, + after: { + id: 't1', + id1: 'user1', + id2: 'user2', + group_id: 'group1a' + }, + afterReplicaId: test_utils.rid('t1') + }); + + await batch.commit('1/1'); + }); + + const checkpoint = await bucketStorage.getCheckpoint(); + const parameters = await checkpoint.getParameterSets([ParameterLookup.normalized('mybucket', '1', ['user1'])]); + expect(parameters).toEqual([ + { + group_id: 'group1a' + } + ]); + }); + + test('it should use the latest version', async () => { + await using factory = await generateStorageFactory(); + const syncRules = await factory.updateSyncRules({ + content: ` +bucket_definitions: + mybucket: + parameters: + - SELECT group_id FROM test WHERE id = token_parameters.user_id + data: [] + ` + }); + const bucketStorage = factory.getInstance(syncRules); + + await bucketStorage.startBatch(test_utils.BATCH_OPTIONS, async (batch) => { + await batch.save({ + sourceTable: TEST_TABLE, + tag: storage.SaveOperationTag.INSERT, + after: { + id: 'user1', + group_id: 'group1' + }, + afterReplicaId: test_utils.rid('user1') + }); + await batch.commit('1/1'); + }); + const checkpoint1 = await bucketStorage.getCheckpoint(); + await bucketStorage.startBatch(test_utils.BATCH_OPTIONS, async (batch) => { + await batch.save({ + sourceTable: TEST_TABLE, + tag: storage.SaveOperationTag.INSERT, + after: { + id: 'user1', + group_id: 'group2' + }, + afterReplicaId: test_utils.rid('user1') + }); + await batch.commit('1/2'); + }); + const checkpoint2 = await bucketStorage.getCheckpoint(); + + const parameters = await checkpoint2.getParameterSets([ParameterLookup.normalized('mybucket', '1', ['user1'])]); + expect(parameters).toEqual([ + { + group_id: 'group2' + } + ]); + + // Use the checkpoint to get older data if relevant + const parameters2 = await checkpoint1.getParameterSets([ParameterLookup.normalized('mybucket', '1', ['user1'])]); + expect(parameters2).toEqual([ + { + group_id: 'group1' + } + ]); + }); + + test('it should use the latest version after updates', async () => { + await using factory = await generateStorageFactory(); + const syncRules = await factory.updateSyncRules({ + content: ` +bucket_definitions: + mybucket: + parameters: + - SELECT id AS todo_id + FROM todos + WHERE list_id IN token_parameters.list_id + data: [] + ` + }); + const bucketStorage = factory.getInstance(syncRules); + + const table = test_utils.makeTestTable('todos', ['id', 'list_id']); + + await bucketStorage.startBatch(test_utils.BATCH_OPTIONS, async (batch) => { + // Create two todos which initially belong to different lists + await batch.save({ + sourceTable: table, + tag: storage.SaveOperationTag.INSERT, + after: { + id: 'todo1', + list_id: 'list1' + }, + afterReplicaId: test_utils.rid('todo1') + }); + await batch.save({ + sourceTable: table, + tag: storage.SaveOperationTag.INSERT, + after: { + id: 'todo2', + list_id: 'list2' + }, + afterReplicaId: test_utils.rid('todo2') + }); + + await batch.commit('1/1'); + }); + + await bucketStorage.startBatch(test_utils.BATCH_OPTIONS, async (batch) => { + // Update the second todo item to now 
belong to list 1 + await batch.save({ + sourceTable: table, + tag: storage.SaveOperationTag.UPDATE, + after: { + id: 'todo2', + list_id: 'list1' + }, + afterReplicaId: test_utils.rid('todo2') + }); + + await batch.commit('1/1'); + }); + + // We specifically request the todo_ids for both lists. + // The removal operation for the association of `list2`::`todo2` should not interfere with the new + // association of `list1`::`todo2` + const checkpoint = await bucketStorage.getCheckpoint(); + const parameters = await checkpoint.getParameterSets([ + ParameterLookup.normalized('mybucket', '1', ['list1']), + ParameterLookup.normalized('mybucket', '1', ['list2']) + ]); + + expect(parameters.sort((a, b) => (a.todo_id as string).localeCompare(b.todo_id as string))).toEqual([ + { + todo_id: 'todo1' + }, + { + todo_id: 'todo2' + } + ]); + }); + + test('save and load parameters with different number types', async () => { + await using factory = await generateStorageFactory(); + const syncRules = await factory.updateSyncRules({ + content: ` +bucket_definitions: + mybucket: + parameters: + - SELECT group_id FROM test WHERE n1 = token_parameters.n1 and f2 = token_parameters.f2 and f3 = token_parameters.f3 + data: [] + ` + }); + const bucketStorage = factory.getInstance(syncRules); + + await bucketStorage.startBatch(test_utils.BATCH_OPTIONS, async (batch) => { + await batch.save({ + sourceTable: TEST_TABLE, + tag: storage.SaveOperationTag.INSERT, + after: { + id: 't1', + group_id: 'group1', + n1: 314n, + f2: 314, + f3: 3.14 + }, + afterReplicaId: test_utils.rid('t1') + }); + + await batch.commit('1/1'); + }); + + const TEST_PARAMS = { group_id: 'group1' }; + + const checkpoint = await bucketStorage.getCheckpoint(); + + const parameters1 = await checkpoint.getParameterSets([ + ParameterLookup.normalized('mybucket', '1', [314n, 314, 3.14]) + ]); + expect(parameters1).toEqual([TEST_PARAMS]); + const parameters2 = await checkpoint.getParameterSets([ + ParameterLookup.normalized('mybucket', '1', [314, 314n, 3.14]) + ]); + expect(parameters2).toEqual([TEST_PARAMS]); + const parameters3 = await checkpoint.getParameterSets([ + ParameterLookup.normalized('mybucket', '1', [314n, 314, 3]) + ]); + expect(parameters3).toEqual([]); + }); + + test('save and load parameters with large numbers', async () => { + // This ensures serialization / deserialization of "current_data" is done correctly. + // This specific case tested here cannot happen with postgres in practice, but we still + // test this to ensure correct deserialization. + + await using factory = await generateStorageFactory(); + const syncRules = await factory.updateSyncRules({ + content: ` +bucket_definitions: + mybucket: + parameters: + - SELECT group_id FROM test WHERE n1 = token_parameters.n1 + data: [] + ` + }); + const bucketStorage = factory.getInstance(syncRules); + + await bucketStorage.startBatch(test_utils.BATCH_OPTIONS, async (batch) => { + await batch.save({ + sourceTable: TEST_TABLE, + tag: storage.SaveOperationTag.INSERT, + after: { + id: 't1', + group_id: 'group1', + n1: 1152921504606846976n // 2^60 + }, + afterReplicaId: test_utils.rid('t1') + }); + + await batch.save({ + sourceTable: TEST_TABLE, + tag: storage.SaveOperationTag.UPDATE, + after: { + id: 't1', + group_id: 'group1', + // Simulate a TOAST value, even though it can't happen for values like this + // in practice. 
+ n1: undefined + }, + afterReplicaId: test_utils.rid('t1') + }); + + await batch.commit('1/1'); + }); + + const TEST_PARAMS = { group_id: 'group1' }; + + const checkpoint = await bucketStorage.getCheckpoint(); + + const parameters1 = await checkpoint.getParameterSets([ + ParameterLookup.normalized('mybucket', '1', [1152921504606846976n]) + ]); + expect(parameters1).toEqual([TEST_PARAMS]); + }); + + test('save and load parameters with workspaceId', async () => { + const WORKSPACE_TABLE = test_utils.makeTestTable('workspace', ['id']); + + await using factory = await generateStorageFactory(); + const syncRules = await factory.updateSyncRules({ + content: ` +bucket_definitions: + by_workspace: + parameters: + - SELECT id as workspace_id FROM workspace WHERE + workspace."userId" = token_parameters.user_id + data: [] + ` + }); + const sync_rules = syncRules.parsed(test_utils.PARSE_OPTIONS).sync_rules; + const bucketStorage = factory.getInstance(syncRules); + + await bucketStorage.startBatch(test_utils.BATCH_OPTIONS, async (batch) => { + await batch.save({ + sourceTable: WORKSPACE_TABLE, + tag: storage.SaveOperationTag.INSERT, + after: { + id: 'workspace1', + userId: 'u1' + }, + afterReplicaId: test_utils.rid('workspace1') + }); + await batch.commit('1/1'); + }); + const checkpoint = await bucketStorage.getCheckpoint(); + + const parameters = new RequestParameters({ sub: 'u1' }, {}); + + const q1 = (sync_rules.bucketSources[0] as SqlBucketDescriptor).parameterQueries[0]; + + const lookups = q1.getLookups(parameters); + expect(lookups).toEqual([ParameterLookup.normalized('by_workspace', '1', ['u1'])]); + + const parameter_sets = await checkpoint.getParameterSets(lookups); + expect(parameter_sets).toEqual([{ workspace_id: 'workspace1' }]); + + const buckets = await sync_rules + .getBucketParameterQuerier(test_utils.querierOptions(parameters)) + .querier.queryDynamicBucketDescriptions({ + getParameterSets(lookups) { + return checkpoint.getParameterSets(lookups); + } + }); + expect(buckets).toEqual([ + { bucket: 'by_workspace["workspace1"]', priority: 3, definition: 'by_workspace', inclusion_reasons: ['default'] } + ]); + }); + + test('save and load parameters with dynamic global buckets', async () => { + const WORKSPACE_TABLE = test_utils.makeTestTable('workspace'); + + await using factory = await generateStorageFactory(); + const syncRules = await factory.updateSyncRules({ + content: ` +bucket_definitions: + by_public_workspace: + parameters: + - SELECT id as workspace_id FROM workspace WHERE + workspace.visibility = 'public' + data: [] + ` + }); + const sync_rules = syncRules.parsed(test_utils.PARSE_OPTIONS).sync_rules; + const bucketStorage = factory.getInstance(syncRules); + + await bucketStorage.startBatch(test_utils.BATCH_OPTIONS, async (batch) => { + await batch.save({ + sourceTable: WORKSPACE_TABLE, + tag: storage.SaveOperationTag.INSERT, + after: { + id: 'workspace1', + visibility: 'public' + }, + afterReplicaId: test_utils.rid('workspace1') + }); + + await batch.save({ + sourceTable: WORKSPACE_TABLE, + tag: storage.SaveOperationTag.INSERT, + after: { + id: 'workspace2', + visibility: 'private' + }, + afterReplicaId: test_utils.rid('workspace2') + }); + + await batch.save({ + sourceTable: WORKSPACE_TABLE, + tag: storage.SaveOperationTag.INSERT, + after: { + id: 'workspace3', + visibility: 'public' + }, + afterReplicaId: test_utils.rid('workspace3') + }); + + await batch.commit('1/1'); + }); + + const checkpoint = await bucketStorage.getCheckpoint(); + + const parameters = new 
RequestParameters({ sub: 'unknown' }, {}); + + const q1 = (sync_rules.bucketSources[0] as SqlBucketDescriptor).parameterQueries[0]; + + const lookups = q1.getLookups(parameters); + expect(lookups).toEqual([ParameterLookup.normalized('by_public_workspace', '1', [])]); + + const parameter_sets = await checkpoint.getParameterSets(lookups); + parameter_sets.sort((a, b) => JSON.stringify(a).localeCompare(JSON.stringify(b))); + expect(parameter_sets).toEqual([{ workspace_id: 'workspace1' }, { workspace_id: 'workspace3' }]); + + const buckets = await sync_rules + .getBucketParameterQuerier(test_utils.querierOptions(parameters)) + .querier.queryDynamicBucketDescriptions({ + getParameterSets(lookups) { + return checkpoint.getParameterSets(lookups); + } + }); + buckets.sort((a, b) => a.bucket.localeCompare(b.bucket)); + expect(buckets).toEqual([ + { + bucket: 'by_public_workspace["workspace1"]', + priority: 3, + definition: 'by_public_workspace', + inclusion_reasons: ['default'] + }, + { + bucket: 'by_public_workspace["workspace3"]', + priority: 3, + definition: 'by_public_workspace', + inclusion_reasons: ['default'] + } + ]); + }); + + test('multiple parameter queries', async () => { + const WORKSPACE_TABLE = test_utils.makeTestTable('workspace'); + + await using factory = await generateStorageFactory(); + const syncRules = await factory.updateSyncRules({ + content: ` +bucket_definitions: + by_workspace: + parameters: + - SELECT id as workspace_id FROM workspace WHERE + workspace.visibility = 'public' + - SELECT id as workspace_id FROM workspace WHERE + workspace.user_id = token_parameters.user_id + data: [] + ` + }); + const sync_rules = syncRules.parsed(test_utils.PARSE_OPTIONS).sync_rules; + const bucketStorage = factory.getInstance(syncRules); + + await bucketStorage.startBatch(test_utils.BATCH_OPTIONS, async (batch) => { + await batch.save({ + sourceTable: WORKSPACE_TABLE, + tag: storage.SaveOperationTag.INSERT, + after: { + id: 'workspace1', + visibility: 'public' + }, + afterReplicaId: test_utils.rid('workspace1') + }); + + await batch.save({ + sourceTable: WORKSPACE_TABLE, + tag: storage.SaveOperationTag.INSERT, + after: { + id: 'workspace2', + visibility: 'private' + }, + afterReplicaId: test_utils.rid('workspace2') + }); + + await batch.save({ + sourceTable: WORKSPACE_TABLE, + tag: storage.SaveOperationTag.INSERT, + after: { + id: 'workspace3', + user_id: 'u1', + visibility: 'private' + }, + afterReplicaId: test_utils.rid('workspace3') + }); + + await batch.save({ + sourceTable: WORKSPACE_TABLE, + tag: storage.SaveOperationTag.INSERT, + after: { + id: 'workspace4', + user_id: 'u2', + visibility: 'private' + }, + afterReplicaId: test_utils.rid('workspace4') + }); + + await batch.commit('1/1'); + }); + + const checkpoint = await bucketStorage.getCheckpoint(); + + const parameters = new RequestParameters({ sub: 'u1' }, {}); + + // Test intermediate values - could be moved to sync_rules.test.ts + const q1 = (sync_rules.bucketSources[0] as SqlBucketDescriptor).parameterQueries[0]; + const lookups1 = q1.getLookups(parameters); + expect(lookups1).toEqual([ParameterLookup.normalized('by_workspace', '1', [])]); + + const parameter_sets1 = await checkpoint.getParameterSets(lookups1); + parameter_sets1.sort((a, b) => JSON.stringify(a).localeCompare(JSON.stringify(b))); + expect(parameter_sets1).toEqual([{ workspace_id: 'workspace1' }]); + + const q2 = (sync_rules.bucketSources[0] as SqlBucketDescriptor).parameterQueries[1]; + const lookups2 = q2.getLookups(parameters); + 
expect(lookups2).toEqual([ParameterLookup.normalized('by_workspace', '2', ['u1'])]); + + const parameter_sets2 = await checkpoint.getParameterSets(lookups2); + parameter_sets2.sort((a, b) => JSON.stringify(a).localeCompare(JSON.stringify(b))); + expect(parameter_sets2).toEqual([{ workspace_id: 'workspace3' }]); + + // Test final values - the important part + const buckets = ( + await sync_rules + .getBucketParameterQuerier(test_utils.querierOptions(parameters)) + .querier.queryDynamicBucketDescriptions({ + getParameterSets(lookups) { + return checkpoint.getParameterSets(lookups); + } + }) + ).map((e) => e.bucket); + buckets.sort(); + expect(buckets).toEqual(['by_workspace["workspace1"]', 'by_workspace["workspace3"]']); + }); + + test('truncate parameters', async () => { + await using factory = await generateStorageFactory(); + const syncRules = await factory.updateSyncRules({ + content: ` +bucket_definitions: + mybucket: + parameters: + - SELECT group_id FROM test WHERE id1 = token_parameters.user_id OR id2 = token_parameters.user_id + data: [] + ` + }); + const bucketStorage = factory.getInstance(syncRules); + + await bucketStorage.startBatch(test_utils.BATCH_OPTIONS, async (batch) => { + await batch.save({ + sourceTable: TEST_TABLE, + tag: storage.SaveOperationTag.INSERT, + after: { + id: 't2', + id1: 'user3', + id2: 'user4', + group_id: 'group2a' + }, + afterReplicaId: test_utils.rid('t2') + }); + + await batch.truncate([TEST_TABLE]); + }); + + const checkpoint = await bucketStorage.getCheckpoint(); + + const parameters = await checkpoint.getParameterSets([ParameterLookup.normalized('mybucket', '1', ['user1'])]); + expect(parameters).toEqual([]); + }); + + test('invalidate cached parsed sync rules', async () => { + await using bucketStorageFactory = await generateStorageFactory(); + const syncRules = await bucketStorageFactory.updateSyncRules({ + content: ` +bucket_definitions: + by_workspace: + parameters: + - SELECT id as workspace_id FROM workspace WHERE + workspace."userId" = token_parameters.user_id + data: [] + ` + }); + const syncBucketStorage = bucketStorageFactory.getInstance(syncRules); + + const parsedSchema1 = syncBucketStorage.getParsedSyncRules({ + defaultSchema: 'public' + }); + + const parsedSchema2 = syncBucketStorage.getParsedSyncRules({ + defaultSchema: 'public' + }); + + // These should be cached, this will be the same instance + expect(parsedSchema2).equals(parsedSchema1); + expect(parsedSchema1.getSourceTables()[0].schema).equals('public'); + + const parsedSchema3 = syncBucketStorage.getParsedSyncRules({ + defaultSchema: 'databasename' + }); + + // The cache should not be used + expect(parsedSchema3).not.equals(parsedSchema2); + expect(parsedSchema3.getSourceTables()[0].schema).equals('databasename'); + }); +} diff --git a/packages/service-core-tests/src/tests/register-sync-tests.ts b/packages/service-core-tests/src/tests/register-sync-tests.ts index df7d9d904..56d29336b 100644 --- a/packages/service-core-tests/src/tests/register-sync-tests.ts +++ b/packages/service-core-tests/src/tests/register-sync-tests.ts @@ -1347,7 +1347,8 @@ async function consumeIterator( return lines; } catch (e) { if (options?.consume) { - iter.throw?.(e); + // iter.throw here would result in an uncaught error + iter.return?.(e); } throw e; } diff --git a/packages/service-core-tests/src/tests/tests-index.ts b/packages/service-core-tests/src/tests/tests-index.ts index 1917a43c0..6d35089e1 100644 --- a/packages/service-core-tests/src/tests/tests-index.ts +++ 
b/packages/service-core-tests/src/tests/tests-index.ts @@ -1,6 +1,9 @@ export * from './register-bucket-validation-tests.js'; export * from './register-compacting-tests.js'; export * from './register-parameter-compacting-tests.js'; -export * from './register-data-storage-tests.js'; +export * from './register-data-storage-parameter-tests.js'; +export * from './register-data-storage-data-tests.js'; +export * from './register-data-storage-checkpoint-tests.js'; export * from './register-migration-tests.js'; export * from './register-sync-tests.js'; +export * from './util.js'; diff --git a/packages/service-core-tests/src/tests/util.ts b/packages/service-core-tests/src/tests/util.ts new file mode 100644 index 000000000..67e90f11c --- /dev/null +++ b/packages/service-core-tests/src/tests/util.ts @@ -0,0 +1,3 @@ +import { test_utils } from '../index.js'; + +export const TEST_TABLE = test_utils.makeTestTable('test', ['id']); diff --git a/packages/service-core/src/storage/SyncRulesBucketStorage.ts b/packages/service-core/src/storage/SyncRulesBucketStorage.ts index e10ef392e..b6f9adff2 100644 --- a/packages/service-core/src/storage/SyncRulesBucketStorage.ts +++ b/packages/service-core/src/storage/SyncRulesBucketStorage.ts @@ -111,6 +111,9 @@ export interface SyncRulesBucketStorage * Compute checksums for a given list of buckets. * * Returns zero checksums for any buckets not found. + * + * This may be slow, depending on the size of the buckets. + * The checksums are cached internally to compensate for this, but this does not cover all cases. */ getChecksums(checkpoint: util.InternalOpId, buckets: string[]): Promise;
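For illustration, here is a minimal sketch of how the getChecksums() contract documented above is exercised, modelled on the 'unchanged checksums' test added in this change. The logBucketChecksums wrapper, the 'global[]' bucket name, and the storage.SyncRulesBucketStorage typing are assumptions made for the sketch, not part of this diff:

```TypeScript
import { storage } from '@powersync/service-core';

// Sketch only: assumes `bucketStorage` was obtained from a storage factory with
// deployed sync rules, as in the tests above.
async function logBucketChecksums(bucketStorage: storage.SyncRulesBucketStorage) {
  const { checkpoint } = await bucketStorage.getCheckpoint();

  // May be slow for large buckets; repeated calls for the same checkpoint are
  // served from the internal checksum cache where possible.
  const checksums = await bucketStorage.getChecksums(checkpoint, ['global[]']);
  for (const { bucket, checksum, count } of checksums.values()) {
    console.log(bucket, checksum, count);
  }
}
```

As in the test, calling getChecksums() again with a later checkpoint over unchanged data is expected to return the same checksum and count.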