diff --git a/.changeset/swift-wolves-sleep.md b/.changeset/swift-wolves-sleep.md new file mode 100644 index 000000000..c6255a754 --- /dev/null +++ b/.changeset/swift-wolves-sleep.md @@ -0,0 +1,9 @@ +--- +'@powersync/service-module-postgres-storage': minor +'@powersync/service-module-mongodb-storage': minor +'@powersync/service-core-tests': minor +'@powersync/service-core': minor +'@powersync/service-sync-rules': minor +--- + +Cache parameter queries and buckets to reduce incremental sync overhead diff --git a/modules/module-mongodb-storage/src/migrations/db/migrations/1741697235857-bucket-state-index.ts b/modules/module-mongodb-storage/src/migrations/db/migrations/1741697235857-bucket-state-index.ts new file mode 100644 index 000000000..33220d9cc --- /dev/null +++ b/modules/module-mongodb-storage/src/migrations/db/migrations/1741697235857-bucket-state-index.ts @@ -0,0 +1,40 @@ +import { migrations } from '@powersync/service-core'; +import * as storage from '../../../storage/storage-index.js'; +import { MongoStorageConfig } from '../../../types/types.js'; + +const INDEX_NAME = 'bucket_updates'; + +export const up: migrations.PowerSyncMigrationFunction = async (context) => { + const { + service_context: { configuration } + } = context; + const db = storage.createPowerSyncMongo(configuration.storage as MongoStorageConfig); + + try { + await db.bucket_state.createIndex( + { + '_id.g': 1, + last_op: 1 + }, + { name: INDEX_NAME, unique: true } + ); + } finally { + await db.client.close(); + } +}; + +export const down: migrations.PowerSyncMigrationFunction = async (context) => { + const { + service_context: { configuration } + } = context; + + const db = storage.createPowerSyncMongo(configuration.storage as MongoStorageConfig); + + try { + if (await db.bucket_state.indexExists(INDEX_NAME)) { + await db.bucket_state.dropIndex(INDEX_NAME); + } + } finally { + await db.client.close(); + } +}; diff --git a/modules/module-mongodb-storage/src/storage/implementation/MongoCompactor.ts b/modules/module-mongodb-storage/src/storage/implementation/MongoCompactor.ts index 8d83a7998..612ad1aba 100644 --- a/modules/module-mongodb-storage/src/storage/implementation/MongoCompactor.ts +++ b/modules/module-mongodb-storage/src/storage/implementation/MongoCompactor.ts @@ -314,10 +314,12 @@ export class MongoCompactor { let lastOpId: BucketDataKey | null = null; let targetOp: bigint | null = null; let gotAnOp = false; + let numberOfOpsToClear = 0; for await (let op of query.stream()) { if (op.op == 'MOVE' || op.op == 'REMOVE' || op.op == 'CLEAR') { checksum = utils.addChecksums(checksum, op.checksum); lastOpId = op._id; + numberOfOpsToClear += 1; if (op.op != 'CLEAR') { gotAnOp = true; } @@ -337,7 +339,7 @@ export class MongoCompactor { return; } - logger.info(`Flushing CLEAR at ${lastOpId?.o}`); + logger.info(`Flushing CLEAR for ${numberOfOpsToClear} ops at ${lastOpId?.o}`); await this.db.bucket_data.deleteMany( { _id: { diff --git a/modules/module-mongodb-storage/src/storage/implementation/MongoSyncBucketStorage.ts b/modules/module-mongodb-storage/src/storage/implementation/MongoSyncBucketStorage.ts index 760703116..57fc44ecb 100644 --- a/modules/module-mongodb-storage/src/storage/implementation/MongoSyncBucketStorage.ts +++ b/modules/module-mongodb-storage/src/storage/implementation/MongoSyncBucketStorage.ts @@ -9,27 +9,29 @@ import { } from '@powersync/lib-services-framework'; import { BroadcastIterable, - CHECKPOINT_INVALIDATE_ALL, CheckpointChanges, GetCheckpointChangesOptions, InternalOpId, 
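A rough sketch of the read path the new `bucket_updates` index serves (the real query lives in `MongoSyncBucketStorage.getDataBucketChanges` further down in this diff); `db` here is assumed to be a connected `PowerSyncMongo` instance:

import { PowerSyncMongo } from './db.js';

// Equality on `_id.g` plus a range on `last_op` matches the compound index
// key order created by the migration above, so changed buckets are found
// without a collection scan.
async function bucketsChangedSince(db: PowerSyncMongo, groupId: number, lastCheckpoint: bigint): Promise<string[]> {
  const docs = await db.bucket_state
    .find({ '_id.g': groupId, last_op: { $gt: lastCheckpoint } }, { projection: { '_id.b': 1 } })
    .toArray();
  return docs.map((doc) => doc._id.b);
}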
internalToExternalOpId, ProtocolOpId, ReplicationCheckpoint, - SourceTable, storage, utils, - WatchWriteCheckpointOptions + WatchWriteCheckpointOptions, + CHECKPOINT_INVALIDATE_ALL, + deserializeParameterLookup } from '@powersync/service-core'; -import { SqliteJsonRow, SqliteJsonValue, SqlSyncRules } from '@powersync/service-sync-rules'; +import { SqliteJsonRow, ParameterLookup, SqlSyncRules } from '@powersync/service-sync-rules'; import * as bson from 'bson'; import { wrapWithAbort } from 'ix/asynciterable/operators/withabort.js'; +import { LRUCache } from 'lru-cache'; import * as timers from 'timers/promises'; import { MongoBucketStorage } from '../MongoBucketStorage.js'; import { PowerSyncMongo } from './db.js'; import { BucketDataDocument, BucketDataKey, + BucketStateDocument, SourceKey, SourceTableDocument, SyncRuleCheckpointState, @@ -39,6 +41,7 @@ import { MongoBucketBatch } from './MongoBucketBatch.js'; import { MongoCompactor } from './MongoCompactor.js'; import { MongoWriteCheckpointAPI } from './MongoWriteCheckpointAPI.js'; import { idPrefixFilter, mapOpEntry, readSingleBatch } from './util.js'; +import { JSONBig } from '@powersync/service-jsonbig'; export class MongoSyncBucketStorage extends BaseObserver @@ -154,7 +157,7 @@ export class MongoSyncBucketStorage await callback(batch); await batch.flush(); - if (batch.last_flushed_op) { + if (batch.last_flushed_op != null) { return { flushed_op: batch.last_flushed_op }; } else { return null; @@ -252,7 +255,7 @@ export class MongoSyncBucketStorage return result!; } - async getParameterSets(checkpoint: utils.InternalOpId, lookups: SqliteJsonValue[][]): Promise { + async getParameterSets(checkpoint: utils.InternalOpId, lookups: ParameterLookup[]): Promise { const lookupFilter = lookups.map((lookup) => { return storage.serializeLookup(lookup); }); @@ -585,6 +588,13 @@ export class MongoSyncBucketStorage { maxTimeMS: lib_mongo.db.MONGO_CLEAR_OPERATION_TIMEOUT_MS } ); + await this.db.bucket_state.deleteMany( + { + _id: idPrefixFilter({ g: this.group_id }, ['b']) + }, + { maxTimeMS: lib_mongo.db.MONGO_CLEAR_OPERATION_TIMEOUT_MS } + ); + await this.db.source_tables.deleteMany( { group_id: this.group_id @@ -795,12 +805,7 @@ export class MongoSyncBucketStorage const updates: CheckpointChanges = lastCheckpoint == null - ? { - invalidateDataBuckets: true, - invalidateParameterBuckets: true, - updatedDataBuckets: [], - updatedParameterBucketDefinitions: [] - } + ? CHECKPOINT_INVALIDATE_ALL : await this.getCheckpointChanges({ lastCheckpoint: lastCheckpoint, nextCheckpoint: checkpoint @@ -869,7 +874,105 @@ export class MongoSyncBucketStorage return pipeline; } + private async getDataBucketChanges( + options: GetCheckpointChangesOptions + ): Promise> { + const limit = 1000; + const bucketStateUpdates = await this.db.bucket_state + .find( + { + // We have an index on (_id.g, last_op). + '_id.g': this.group_id, + last_op: { $gt: BigInt(options.lastCheckpoint) } + }, + { + projection: { + '_id.b': 1 + }, + limit: limit + 1, + batchSize: limit + 1, + singleBatch: true + } + ) + .toArray(); + + const buckets = bucketStateUpdates.map((doc) => doc._id.b); + const invalidateDataBuckets = buckets.length > limit; + + return { + invalidateDataBuckets: invalidateDataBuckets, + updatedDataBuckets: invalidateDataBuckets ? 
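The `limit + 1` reads above implement a simple overflow check, sketched here in isolation: fetch one row more than the diff limit, and if that extra row arrives, report a full invalidation rather than an incomplete change set (names are illustrative):

// Returns either the exact set of changed keys, or an "invalidate all" signal
// when the result was truncated at `limit`.
function classifyChanges<T>(rows: T[], limit: number): { invalidate: boolean; updated: Set<T> } {
  const invalidate = rows.length > limit;
  return { invalidate, updated: invalidate ? new Set<T>() : new Set(rows) };
}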
new Set() : new Set(buckets) + }; + } + + private async getParameterBucketChanges( + options: GetCheckpointChangesOptions + ): Promise> { + const limit = 1000; + const parameterUpdates = await this.db.bucket_parameters + .find( + { + _id: { $gt: BigInt(options.lastCheckpoint), $lte: BigInt(options.nextCheckpoint) }, + 'key.g': this.group_id + }, + { + projection: { + lookup: 1 + }, + limit: limit + 1, + batchSize: limit + 1, + singleBatch: true + } + ) + .toArray(); + const invalidateParameterUpdates = parameterUpdates.length > limit; + + return { + invalidateParameterBuckets: invalidateParameterUpdates, + updatedParameterLookups: invalidateParameterUpdates + ? new Set() + : new Set(parameterUpdates.map((p) => JSONBig.stringify(deserializeParameterLookup(p.lookup)))) + }; + } + + // If we processed all connections together for each checkpoint, we could do a single lookup for all connections. + // In practice, specific connections may fall behind. So instead, we just cache the results of each specific lookup. + // TODO (later): + // We can optimize this by implementing it like ChecksumCache: We can use partial cache results to do + // more efficient lookups in some cases. + private checkpointChangesCache = new LRUCache({ + // Limit to 50 cache entries, or 10MB, whichever comes first. + // Some rough calculations: + // If we process 10 checkpoints per second, and a connection may be 2 seconds behind, we could have + // up to 20 relevant checkpoints. That gives us 20*20 = 400 potentially-relevant cache entries. + // That is a worst-case scenario, so we don't actually store that many. In real life, the cache keys + // would likely be clustered around a few values, rather than spread over all 400 potential values. + max: 50, + maxSize: 10 * 1024 * 1024, + sizeCalculation: (value: CheckpointChanges) => { + // Estimate of memory usage + const paramSize = [...value.updatedParameterLookups].reduce((a, b) => a + b.length, 0); + const bucketSize = [...value.updatedDataBuckets].reduce((a, b) => a + b.length, 0); + return 100 + paramSize + bucketSize; + }, + fetchMethod: async (_key, _staleValue, options) => { + return this.getCheckpointChangesInternal(options.context.options); + } + }); + async getCheckpointChanges(options: GetCheckpointChangesOptions): Promise { - return CHECKPOINT_INVALIDATE_ALL; + const key = `${options.lastCheckpoint}_${options.nextCheckpoint}`; + const result = await this.checkpointChangesCache.fetch(key, { context: { options } }); + return result!; + } + + private async getCheckpointChangesInternal(options: GetCheckpointChangesOptions): Promise { + const dataUpdates = await this.getDataBucketChanges(options); + const parameterUpdates = await this.getParameterBucketChanges(options); + + return { + ...dataUpdates, + ...parameterUpdates + }; } } diff --git a/modules/module-mongodb-storage/src/storage/implementation/PersistedBatch.ts b/modules/module-mongodb-storage/src/storage/implementation/PersistedBatch.ts index 5e3ddfa71..b258bfa71 100644 --- a/modules/module-mongodb-storage/src/storage/implementation/PersistedBatch.ts +++ b/modules/module-mongodb-storage/src/storage/implementation/PersistedBatch.ts @@ -11,6 +11,7 @@ import { PowerSyncMongo } from './db.js'; import { BucketDataDocument, BucketParameterDocument, + BucketStateDocument, CurrentBucket, CurrentDataDocument, SourceKey @@ -48,6 +49,7 @@ export class PersistedBatch { bucketData: mongo.AnyBulkWriteOperation[] = []; bucketParameters: mongo.AnyBulkWriteOperation[] = []; currentData: mongo.AnyBulkWriteOperation[] = []; + 
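A minimal sketch of the `lru-cache` fetchMethod pattern that `checkpointChangesCache` above relies on: concurrent `fetch()` calls for the same key share a single in-flight lookup, so many connections observing the same checkpoint transition trigger one storage query (key and context types here are illustrative):

import { LRUCache } from 'lru-cache';

const cache = new LRUCache<string, string, { multiplier: number }>({
  max: 50,
  fetchMethod: async (key, _staleValue, { context }) => {
    // Stands in for an expensive storage lookup; runs at most once per key.
    return `${key}:${context.multiplier}`;
  }
});

async function demo() {
  const [a, b] = await Promise.all([
    cache.fetch('k1', { context: { multiplier: 2 } }),
    cache.fetch('k1', { context: { multiplier: 2 } })
  ]);
  // a === b: the second call reused the first call's in-flight promise.
}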
bucketStates: Map = new Map(); /** * For debug logging only. @@ -66,6 +68,19 @@ export class PersistedBatch { this.currentSize = writtenSize; } + private incrementBucket(bucket: string, op_id: InternalOpId) { + let existingState = this.bucketStates.get(bucket); + if (existingState) { + existingState.lastOp = op_id; + existingState.incrementCount += 1; + } else { + this.bucketStates.set(bucket, { + lastOp: op_id, + incrementCount: 1 + }); + } + } + saveBucketData(options: { op_seq: MongoIdSequence; sourceKey: storage.ReplicaId; @@ -120,6 +135,7 @@ export class PersistedBatch { } } }); + this.incrementBucket(k.bucket, op_id); } for (let bd of remaining_buckets.values()) { @@ -147,6 +163,7 @@ export class PersistedBatch { } }); this.currentSize += 200; + this.incrementBucket(bd.bucket, op_id); } } @@ -277,6 +294,14 @@ export class PersistedBatch { }); } + if (this.bucketStates.size > 0) { + await db.bucket_state.bulkWrite(this.getBucketStateUpdates(), { + session, + // Per-bucket operation - order doesn't matter + ordered: false + }); + } + const duration = performance.now() - startAt; logger.info( `powersync_${this.group_id} Flushed ${this.bucketData.length} + ${this.bucketParameters.length} + ${ @@ -287,7 +312,34 @@ export class PersistedBatch { this.bucketData = []; this.bucketParameters = []; this.currentData = []; + this.bucketStates.clear(); this.currentSize = 0; this.debugLastOpId = null; } + + private getBucketStateUpdates(): mongo.AnyBulkWriteOperation[] { + return Array.from(this.bucketStates.entries()).map(([bucket, state]) => { + return { + updateOne: { + filter: { + _id: { + g: this.group_id, + b: bucket + } + }, + update: { + $set: { + last_op: state.lastOp + } + }, + upsert: true + } + } satisfies mongo.AnyBulkWriteOperation; + }); + } +} + +interface BucketStateUpdate { + lastOp: InternalOpId; + incrementCount: number; } diff --git a/modules/module-mongodb-storage/src/storage/implementation/db.ts b/modules/module-mongodb-storage/src/storage/implementation/db.ts index 969bf9ea4..56d6d9da6 100644 --- a/modules/module-mongodb-storage/src/storage/implementation/db.ts +++ b/modules/module-mongodb-storage/src/storage/implementation/db.ts @@ -6,6 +6,7 @@ import { MongoStorageConfig } from '../../types/types.js'; import { BucketDataDocument, BucketParameterDocument, + BucketStateDocument, CurrentDataDocument, CustomWriteCheckpointDocument, IdSequenceDocument, @@ -33,6 +34,7 @@ export class PowerSyncMongo { readonly write_checkpoints: mongo.Collection; readonly instance: mongo.Collection; readonly locks: mongo.Collection; + readonly bucket_state: mongo.Collection; readonly client: mongo.MongoClient; readonly db: mongo.Db; @@ -55,6 +57,7 @@ export class PowerSyncMongo { this.write_checkpoints = db.collection('write_checkpoints'); this.instance = db.collection('instance'); this.locks = this.db.collection('locks'); + this.bucket_state = this.db.collection('bucket_state'); } /** @@ -70,6 +73,7 @@ export class PowerSyncMongo { await this.write_checkpoints.deleteMany({}); await this.instance.deleteOne({}); await this.locks.deleteMany({}); + await this.bucket_state.deleteMany({}); } /** diff --git a/modules/module-mongodb-storage/src/storage/implementation/models.ts b/modules/module-mongodb-storage/src/storage/implementation/models.ts index f12447613..de36aaba5 100644 --- a/modules/module-mongodb-storage/src/storage/implementation/models.ts +++ b/modules/module-mongodb-storage/src/storage/implementation/models.ts @@ -75,6 +75,24 @@ export interface SourceTableDocument { snapshot_done: boolean 
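The `incrementBucket` bookkeeping above can be read in isolation as follows; this hypothetical helper shows how many per-op updates collapse into one `last_op` upsert per bucket per flush:

// Op ids are assumed to increase monotonically within a batch, so the last
// one seen per bucket becomes that bucket's new `last_op`.
function collapseBucketOps(ops: Array<{ bucket: string; opId: bigint }>): Map<string, { lastOp: bigint; incrementCount: number }> {
  const state = new Map<string, { lastOp: bigint; incrementCount: number }>();
  for (const { bucket, opId } of ops) {
    const existing = state.get(bucket);
    if (existing) {
      existing.lastOp = opId;
      existing.incrementCount += 1;
    } else {
      state.set(bucket, { lastOp: opId, incrementCount: 1 });
    }
  }
  return state;
}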
| undefined; } +/** + * Record the state of each bucket. + * + * Right now, this is just used to track when buckets are updated, for efficient incremental sync. + * In the future, this could be used to track operation counts, both for diagnostic purposes, and for + * determining when a compact and/or defragment could be beneficial. + * + * Note: There is currently no migration to populate this collection from existing data - it is only + * populated by new updates. + */ +export interface BucketStateDocument { + _id: { + g: number; + b: string; + }; + last_op: bigint; +} + export interface IdSequenceDocument { _id: string; op_id: bigint; diff --git a/modules/module-mongodb-storage/test/src/__snapshots__/storage_sync.test.ts.snap b/modules/module-mongodb-storage/test/src/__snapshots__/storage_sync.test.ts.snap index 717e52de5..e43ec47b8 100644 --- a/modules/module-mongodb-storage/test/src/__snapshots__/storage_sync.test.ts.snap +++ b/modules/module-mongodb-storage/test/src/__snapshots__/storage_sync.test.ts.snap @@ -357,6 +357,74 @@ exports[`sync - mongodb > sync legacy non-raw data 1`] = ` ] `; +exports[`sync - mongodb > sync updates to data query only 1`] = ` +[ + { + "checkpoint": { + "buckets": [ + { + "bucket": "by_user["user1"]", + "checksum": 0, + "count": 0, + "priority": 3, + }, + ], + "last_op_id": "1", + "write_checkpoint": undefined, + }, + }, + { + "checkpoint_complete": { + "last_op_id": "1", + }, + }, +] +`; + +exports[`sync - mongodb > sync updates to data query only 2`] = ` +[ + { + "checkpoint_diff": { + "last_op_id": "2", + "removed_buckets": [], + "updated_buckets": [ + { + "bucket": "by_user["user1"]", + "checksum": 1418351250, + "count": 1, + "priority": 3, + }, + ], + "write_checkpoint": undefined, + }, + }, + { + "data": { + "after": "0", + "bucket": "by_user["user1"]", + "data": [ + { + "checksum": 1418351250n, + "data": "{"id":"list1","user_id":"user1","name":"User 1"}", + "object_id": "list1", + "object_type": "lists", + "op": "PUT", + "op_id": "2", + "subkey": "0ffb7b58-d14d-5efa-be6c-c8eda74ab7a8", + }, + ], + "has_more": false, + "next_after": "2", + }, + }, + { + "checkpoint_complete": { + "last_op_id": "2", + }, + }, +] +`; + exports[`sync - mongodb > sync updates to global data 1`] = ` [ { @@ -468,3 +536,106 @@ exports[`sync - mongodb > sync updates to global data 3`] = ` }, ] `; + +exports[`sync - mongodb > sync updates to parameter query + data 1`] = ` +[ + { + "checkpoint": { + "buckets": [], + "last_op_id": "0", + "write_checkpoint": undefined, + }, + }, + { + "checkpoint_complete": { + "last_op_id": "0", + }, + }, +] +`; + +exports[`sync - mongodb > sync updates to parameter query + data 2`] = ` +[ + { + "checkpoint_diff": { + "last_op_id": "2", + "removed_buckets": [], + "updated_buckets": [ + { + "bucket": "by_user["user1"]", + "checksum": 1418351250, + "count": 1, + "priority": 3, + }, + ], + "write_checkpoint": undefined, + }, + }, + { + "data": { + "after": "0", + "bucket": "by_user["user1"]", + "data": [ + { + "checksum": 1418351250n, + "data": "{"id":"list1","user_id":"user1","name":"User 1"}", + "object_id": "list1", + "object_type": "lists", + "op": "PUT", + "op_id": "1", + "subkey": "0ffb7b58-d14d-5efa-be6c-c8eda74ab7a8", + }, + ], + "has_more": false, + "next_after": "1", + }, + }, + { + "checkpoint_complete": { + "last_op_id": "2", + }, + }, +] +`; + +exports[`sync - mongodb > sync updates to parameter query only 1`] = ` +[ + { + "checkpoint": { + "buckets": [], + "last_op_id": "0", + "write_checkpoint": undefined, + }, + }, + { + 
"checkpoint_complete": { + "last_op_id": "0", + }, + }, +] +`; + +exports[`sync - mongodb > sync updates to parameter query only 2`] = ` +[ + { + "checkpoint_diff": { + "last_op_id": "1", + "removed_buckets": [], + "updated_buckets": [ + { + "bucket": "by_user["user1"]", + "checksum": 0, + "count": 0, + "priority": 3, + }, + ], + "write_checkpoint": undefined, + }, + }, + { + "checkpoint_complete": { + "last_op_id": "1", + }, + }, +] +`; diff --git a/modules/module-postgres-storage/src/storage/PostgresSyncRulesStorage.ts b/modules/module-postgres-storage/src/storage/PostgresSyncRulesStorage.ts index d4e94166a..b7d896f9f 100644 --- a/modules/module-postgres-storage/src/storage/PostgresSyncRulesStorage.ts +++ b/modules/module-postgres-storage/src/storage/PostgresSyncRulesStorage.ts @@ -1,5 +1,4 @@ import * as lib_postgres from '@powersync/lib-service-postgres'; -import { ReplicationAssertionError } from '@powersync/lib-services-framework'; import { BroadcastIterable, CHECKPOINT_INVALIDATE_ALL, @@ -14,23 +13,22 @@ import { } from '@powersync/service-core'; import { JSONBig } from '@powersync/service-jsonbig'; import * as sync_rules from '@powersync/service-sync-rules'; +import * as timers from 'timers/promises'; import * as uuid from 'uuid'; import { BIGINT_MAX } from '../types/codecs.js'; import { models, RequiredOperationBatchLimits } from '../types/types.js'; import { replicaIdToSubkey } from '../utils/bson.js'; import { mapOpEntry } from '../utils/bucket-data.js'; -import * as timers from 'timers/promises'; import * as framework from '@powersync/lib-services-framework'; import { StatementParam } from '@powersync/service-jpgwire'; +import { wrapWithAbort } from 'ix/asynciterable/operators/withabort.js'; import { SourceTableDecoded, StoredRelationId } from '../types/models/SourceTable.js'; import { pick } from '../utils/ts-codec.js'; import { PostgresBucketBatch } from './batch/PostgresBucketBatch.js'; import { PostgresWriteCheckpointAPI } from './checkpoints/PostgresWriteCheckpointAPI.js'; import { PostgresBucketStorageFactory } from './PostgresBucketStorageFactory.js'; import { PostgresCompactor } from './PostgresCompactor.js'; -import { wrapWithAbort } from 'ix/asynciterable/operators/withabort.js'; -import { Decoded } from 'ts-codec'; export type PostgresSyncRulesStorageOptions = { factory: PostgresBucketStorageFactory; @@ -354,7 +352,7 @@ export class PostgresSyncRulesStorage async getParameterSets( checkpoint: utils.InternalOpId, - lookups: sync_rules.SqliteJsonValue[][] + lookups: sync_rules.ParameterLookup[] ): Promise { const rows = await this.db.sql` SELECT DISTINCT diff --git a/modules/module-postgres-storage/test/src/__snapshots__/storage_sync.test.ts.snap b/modules/module-postgres-storage/test/src/__snapshots__/storage_sync.test.ts.snap index 0fc987919..785430590 100644 --- a/modules/module-postgres-storage/test/src/__snapshots__/storage_sync.test.ts.snap +++ b/modules/module-postgres-storage/test/src/__snapshots__/storage_sync.test.ts.snap @@ -357,6 +357,74 @@ exports[`sync - postgres > sync legacy non-raw data 1`] = ` ] `; +exports[`sync - postgres > sync updates to data query only 1`] = ` +[ + { + "checkpoint": { + "buckets": [ + { + "bucket": "by_user["user1"]", + "checksum": 0, + "count": 0, + "priority": 3, + }, + ], + "last_op_id": "1", + "write_checkpoint": undefined, + }, + }, + { + "checkpoint_complete": { + "last_op_id": "1", + }, + }, +] +`; + +exports[`sync - postgres > sync updates to data query only 2`] = ` +[ + { + "checkpoint_diff": { + "last_op_id": "2", + 
"removed_buckets": [], + "updated_buckets": [ + { + "bucket": "by_user["user1"]", + "checksum": 1418351250, + "count": 1, + "priority": 3, + }, + ], + "write_checkpoint": undefined, + }, + }, + { + "data": { + "after": "0", + "bucket": "by_user["user1"]", + "data": [ + { + "checksum": 1418351250n, + "data": "{"id":"list1","user_id":"user1","name":"User 1"}", + "object_id": "list1", + "object_type": "lists", + "op": "PUT", + "op_id": "2", + "subkey": "5ad0aa14-3d5e-5428-ad5b-2c33927d991c", + }, + ], + "has_more": false, + "next_after": "2", + }, + }, + { + "checkpoint_complete": { + "last_op_id": "2", + }, + }, +] +`; + exports[`sync - postgres > sync updates to global data 1`] = ` [ { @@ -468,3 +536,106 @@ exports[`sync - postgres > sync updates to global data 3`] = ` }, ] `; + +exports[`sync - postgres > sync updates to parameter query + data 1`] = ` +[ + { + "checkpoint": { + "buckets": [], + "last_op_id": "0", + "write_checkpoint": undefined, + }, + }, + { + "checkpoint_complete": { + "last_op_id": "0", + }, + }, +] +`; + +exports[`sync - postgres > sync updates to parameter query + data 2`] = ` +[ + { + "checkpoint_diff": { + "last_op_id": "2", + "removed_buckets": [], + "updated_buckets": [ + { + "bucket": "by_user["user1"]", + "checksum": 1418351250, + "count": 1, + "priority": 3, + }, + ], + "write_checkpoint": undefined, + }, + }, + { + "data": { + "after": "0", + "bucket": "by_user["user1"]", + "data": [ + { + "checksum": 1418351250n, + "data": "{"id":"list1","user_id":"user1","name":"User 1"}", + "object_id": "list1", + "object_type": "lists", + "op": "PUT", + "op_id": "1", + "subkey": "5ad0aa14-3d5e-5428-ad5b-2c33927d991c", + }, + ], + "has_more": false, + "next_after": "1", + }, + }, + { + "checkpoint_complete": { + "last_op_id": "2", + }, + }, +] +`; + +exports[`sync - postgres > sync updates to parameter query only 1`] = ` +[ + { + "checkpoint": { + "buckets": [], + "last_op_id": "0", + "write_checkpoint": undefined, + }, + }, + { + "checkpoint_complete": { + "last_op_id": "0", + }, + }, +] +`; + +exports[`sync - postgres > sync updates to parameter query only 2`] = ` +[ + { + "checkpoint_diff": { + "last_op_id": "1", + "removed_buckets": [], + "updated_buckets": [ + { + "bucket": "by_user["user1"]", + "checksum": 0, + "count": 0, + "priority": 3, + }, + ], + "write_checkpoint": undefined, + }, + }, + { + "checkpoint_complete": { + "last_op_id": "1", + }, + }, +] +`; diff --git a/packages/service-core-tests/src/test-utils/general-utils.ts b/packages/service-core-tests/src/test-utils/general-utils.ts index 7cbcf577c..faff7adfa 100644 --- a/packages/service-core-tests/src/test-utils/general-utils.ts +++ b/packages/service-core-tests/src/test-utils/general-utils.ts @@ -32,8 +32,8 @@ export function testRules(content: string): storage.PersistedSyncRulesContent { }; } -export function makeTestTable(name: string, columns?: string[] | undefined) { - const relId = utils.hashData('table', name, (columns ?? ['id']).join(',')); +export function makeTestTable(name: string, replicaIdColumns?: string[] | undefined) { + const relId = utils.hashData('table', name, (replicaIdColumns ?? ['id']).join(',')); const id = new bson.ObjectId('6544e3899293153fa7b38331'); return new storage.SourceTable( id, @@ -41,7 +41,7 @@ export function makeTestTable(name: string, columns?: string[] | undefined) { relId, 'public', name, - (columns ?? ['id']).map((column) => ({ name: column, type: 'VARCHAR', typeId: 25 })), + (replicaIdColumns ?? 
['id']).map((column) => ({ name: column, type: 'VARCHAR', typeId: 25 })), true ); } diff --git a/packages/service-core-tests/src/tests/register-data-storage-tests.ts b/packages/service-core-tests/src/tests/register-data-storage-tests.ts index a88551e56..bdb8b8511 100644 --- a/packages/service-core-tests/src/tests/register-data-storage-tests.ts +++ b/packages/service-core-tests/src/tests/register-data-storage-tests.ts @@ -1,5 +1,5 @@ import { getUuidReplicaIdentityBson, OplogEntry, storage } from '@powersync/service-core'; -import { RequestParameters } from '@powersync/service-sync-rules'; +import { ParameterLookup, RequestParameters } from '@powersync/service-sync-rules'; import { expect, test } from 'vitest'; import * as test_utils from '../test-utils/test-utils-index.js'; @@ -65,7 +65,9 @@ bucket_definitions: }); }); - const parameters = await bucketStorage.getParameterSets(result!.flushed_op, [['mybucket', '1', 'user1']]); + const parameters = await bucketStorage.getParameterSets(result!.flushed_op, [ + ParameterLookup.normalized('mybucket', '1', ['user1']) + ]); expect(parameters).toEqual([ { group_id: 'group1a' @@ -110,7 +112,9 @@ bucket_definitions: }); }); - const parameters = await bucketStorage.getParameterSets(result2!.flushed_op, [['mybucket', '1', 'user1']]); + const parameters = await bucketStorage.getParameterSets(result2!.flushed_op, [ + ParameterLookup.normalized('mybucket', '1', ['user1']) + ]); expect(parameters).toEqual([ { group_id: 'group2' @@ -118,7 +122,9 @@ bucket_definitions: ]); // Use the checkpoint to get older data if relevant - const parameters2 = await bucketStorage.getParameterSets(result1!.flushed_op, [['mybucket', '1', 'user1']]); + const parameters2 = await bucketStorage.getParameterSets(result1!.flushed_op, [ + ParameterLookup.normalized('mybucket', '1', ['user1']) + ]); expect(parameters2).toEqual([ { group_id: 'group1' @@ -183,8 +189,8 @@ bucket_definitions: // There removal operation for the association of `list2`::`todo2` should not interfere with the new // association of `list1`::`todo2` const parameters = await bucketStorage.getParameterSets(result2!.flushed_op, [ - ['mybucket', '1', 'list1'], - ['mybucket', '1', 'list2'] + ParameterLookup.normalized('mybucket', '1', ['list1']), + ParameterLookup.normalized('mybucket', '1', ['list2']) ]); expect(parameters.sort((a, b) => (a.todo_id as string).localeCompare(b.todo_id as string))).toEqual([ @@ -230,11 +236,17 @@ bucket_definitions: const checkpoint = result!.flushed_op; - const parameters1 = await bucketStorage.getParameterSets(checkpoint, [['mybucket', '1', 314n, 314, 3.14]]); + const parameters1 = await bucketStorage.getParameterSets(checkpoint, [ + ParameterLookup.normalized('mybucket', '1', [314n, 314, 3.14]) + ]); expect(parameters1).toEqual([TEST_PARAMS]); - const parameters2 = await bucketStorage.getParameterSets(checkpoint, [['mybucket', '1', 314, 314n, 3.14]]); + const parameters2 = await bucketStorage.getParameterSets(checkpoint, [ + ParameterLookup.normalized('mybucket', '1', [314, 314n, 3.14]) + ]); expect(parameters2).toEqual([TEST_PARAMS]); - const parameters3 = await bucketStorage.getParameterSets(checkpoint, [['mybucket', '1', 314n, 314, 3]]); + const parameters3 = await bucketStorage.getParameterSets(checkpoint, [ + ParameterLookup.normalized('mybucket', '1', [314n, 314, 3]) + ]); expect(parameters3).toEqual([]); }); @@ -286,7 +298,9 @@ bucket_definitions: const checkpoint = result!.flushed_op; - const parameters1 = await bucketStorage.getParameterSets(checkpoint, [['mybucket', '1', 
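The equality expectations in the tests above hinge on lookup normalization. A sketch of what `ParameterLookup.normalized` produces, assuming integers normalize to bigint as in the previous `serializeLookupBuffer` implementation shown later in this diff:

import { ParameterLookup } from '@powersync/service-sync-rules';

const lookup = ParameterLookup.normalized('mybucket', '1', [314, 3.14, 'user1']);
// lookup.values: ['mybucket', '1', 314n, 3.14, 'user1']
// 314 and 314n produce identical lookups, while 3 does not match 3.14,
// which is exactly what the parameters1/parameters2/parameters3 assertions check.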
1152921504606846976n]]); + const parameters1 = await bucketStorage.getParameterSets(checkpoint, [ + ParameterLookup.normalized('mybucket', '1', [1152921504606846976n]) + ]); expect(parameters1).toEqual([TEST_PARAMS]); }); @@ -387,7 +401,7 @@ bucket_definitions: const q1 = sync_rules.bucket_descriptors[0].parameter_queries[0]; const lookups = q1.getLookups(parameters); - expect(lookups).toEqual([['by_workspace', '1', 'u1']]); + expect(lookups).toEqual([ParameterLookup.normalized('by_workspace', '1', ['u1'])]); const parameter_sets = await bucketStorage.getParameterSets(checkpoint, lookups); expect(parameter_sets).toEqual([{ workspace_id: 'workspace1' }]); @@ -457,7 +471,7 @@ bucket_definitions: const q1 = sync_rules.bucket_descriptors[0].parameter_queries[0]; const lookups = q1.getLookups(parameters); - expect(lookups).toEqual([['by_public_workspace', '1']]); + expect(lookups).toEqual([ParameterLookup.normalized('by_public_workspace', '1', [])]); const parameter_sets = await bucketStorage.getParameterSets(checkpoint, lookups); parameter_sets.sort((a, b) => JSON.stringify(a).localeCompare(JSON.stringify(b))); @@ -546,7 +560,7 @@ bucket_definitions: // Test intermediate values - could be moved to sync_rules.test.ts const q1 = sync_rules.bucket_descriptors[0].parameter_queries[0]; const lookups1 = q1.getLookups(parameters); - expect(lookups1).toEqual([['by_workspace', '1']]); + expect(lookups1).toEqual([ParameterLookup.normalized('by_workspace', '1', [])]); const parameter_sets1 = await bucketStorage.getParameterSets(checkpoint, lookups1); parameter_sets1.sort((a, b) => JSON.stringify(a).localeCompare(JSON.stringify(b))); @@ -554,7 +568,7 @@ bucket_definitions: const q2 = sync_rules.bucket_descriptors[0].parameter_queries[1]; const lookups2 = q2.getLookups(parameters); - expect(lookups2).toEqual([['by_workspace', '2', 'u1']]); + expect(lookups2).toEqual([ParameterLookup.normalized('by_workspace', '2', ['u1'])]); const parameter_sets2 = await bucketStorage.getParameterSets(checkpoint, lookups2); parameter_sets2.sort((a, b) => JSON.stringify(a).localeCompare(JSON.stringify(b))); @@ -861,7 +875,9 @@ bucket_definitions: const { checkpoint } = await bucketStorage.getCheckpoint(); - const parameters = await bucketStorage.getParameterSets(checkpoint, [['mybucket', '1', 'user1']]); + const parameters = await bucketStorage.getParameterSets(checkpoint, [ + ParameterLookup.normalized('mybucket', '1', ['user1']) + ]); expect(parameters).toEqual([]); }); diff --git a/packages/service-core-tests/src/tests/register-sync-tests.ts b/packages/service-core-tests/src/tests/register-sync-tests.ts index b908444ec..1d830aaff 100644 --- a/packages/service-core-tests/src/tests/register-sync-tests.ts +++ b/packages/service-core-tests/src/tests/register-sync-tests.ts @@ -1,4 +1,11 @@ -import { storage, sync, utils } from '@powersync/service-core'; +import { + CheckpointLine, + storage, + StreamingSyncCheckpoint, + StreamingSyncCheckpointDiff, + sync, + utils +} from '@powersync/service-core'; import { JSONBig } from '@powersync/service-jsonbig'; import { RequestParameters } from '@powersync/service-sync-rules'; import path from 'path'; @@ -398,7 +405,7 @@ bucket_definitions: expect(lines).toMatchSnapshot(); }); - test('sync updates to global data', async () => { + test('sync updates to global data', async (context) => { await using f = await factory(); const syncRules = await f.updateSyncRules({ @@ -422,6 +429,9 @@ bucket_definitions: token: { exp: Date.now() / 1000 + 10 } as any }); const iter = 
stream[Symbol.asyncIterator](); + context.onTestFinished(() => { + iter.return?.(); + }); expect(await getCheckpointLines(iter)).toMatchSnapshot(); @@ -456,11 +466,221 @@ bucket_definitions: }); expect(await getCheckpointLines(iter)).toMatchSnapshot(); + }); + + test('sync updates to parameter query only', async (context) => { + await using f = await factory(); + + const syncRules = await f.updateSyncRules({ + content: `bucket_definitions: + by_user: + parameters: select users.id as user_id from users where users.id = request.user_id() + data: + - select * from lists where user_id = bucket.user_id +` + }); + + const usersTable = test_utils.makeTestTable('users', ['id']); + const listsTable = test_utils.makeTestTable('lists', ['id']); + + const bucketStorage = await f.getInstance(syncRules); + await bucketStorage.autoActivate(); + + const stream = sync.streamResponse({ + syncContext, + bucketStorage, + syncRules: bucketStorage.getParsedSyncRules(test_utils.PARSE_OPTIONS), + params: { + buckets: [], + include_checksum: true, + raw_data: true + }, + tracker, + syncParams: new RequestParameters({ sub: 'user1' }, {}), + token: { exp: Date.now() / 1000 + 100 } as any + }); + const iter = stream[Symbol.asyncIterator](); + context.onTestFinished(() => { + iter.return?.(); + }); + + // Initial empty checkpoint + const checkpoint1 = await getCheckpointLines(iter); + expect((checkpoint1[0] as StreamingSyncCheckpoint).checkpoint?.buckets?.map((b) => b.bucket)).toEqual([]); + expect(checkpoint1).toMatchSnapshot(); + + // Add user + await bucketStorage.startBatch(test_utils.BATCH_OPTIONS, async (batch) => { + await batch.save({ + sourceTable: usersTable, + tag: storage.SaveOperationTag.INSERT, + after: { + id: 'user1', + name: 'User 1' + }, + afterReplicaId: 'user1' + }); + + await batch.commit('0/1'); + }); + + const checkpoint2 = await getCheckpointLines(iter); + expect( + (checkpoint2[0] as StreamingSyncCheckpointDiff).checkpoint_diff?.updated_buckets?.map((b) => b.bucket) + ).toEqual(['by_user["user1"]']); + expect(checkpoint2).toMatchSnapshot(); + }); + + test('sync updates to data query only', async (context) => { + await using f = await factory(); + + const syncRules = await f.updateSyncRules({ + content: `bucket_definitions: + by_user: + parameters: select users.id as user_id from users where users.id = request.user_id() + data: + - select * from lists where user_id = bucket.user_id +` + }); + + const usersTable = test_utils.makeTestTable('users', ['id']); + const listsTable = test_utils.makeTestTable('lists', ['id']); + + const bucketStorage = await f.getInstance(syncRules); + await bucketStorage.autoActivate(); + + await bucketStorage.startBatch(test_utils.BATCH_OPTIONS, async (batch) => { + await batch.save({ + sourceTable: usersTable, + tag: storage.SaveOperationTag.INSERT, + after: { + id: 'user1', + name: 'User 1' + }, + afterReplicaId: 'user1' + }); + + await batch.commit('0/1'); + }); + + const stream = sync.streamResponse({ + syncContext, + bucketStorage, + syncRules: bucketStorage.getParsedSyncRules(test_utils.PARSE_OPTIONS), + params: { + buckets: [], + include_checksum: true, + raw_data: true + }, + tracker, + syncParams: new RequestParameters({ sub: 'user1' }, {}), + token: { exp: Date.now() / 1000 + 100 } as any + }); + const iter = stream[Symbol.asyncIterator](); + context.onTestFinished(() => { + iter.return?.(); + }); + + const checkpoint1 = await getCheckpointLines(iter); + expect((checkpoint1[0] as StreamingSyncCheckpoint).checkpoint?.buckets?.map((b) => b.bucket)).toEqual([ + 
'by_user["user1"]' + ]); + expect(checkpoint1).toMatchSnapshot(); + + await bucketStorage.startBatch(test_utils.BATCH_OPTIONS, async (batch) => { + await batch.save({ + sourceTable: listsTable, + tag: storage.SaveOperationTag.INSERT, + after: { + id: 'list1', + user_id: 'user1', + name: 'User 1' + }, + afterReplicaId: 'list1' + }); + + await batch.commit('0/1'); + }); + + const checkpoint2 = await getCheckpointLines(iter); + expect( + (checkpoint2[0] as StreamingSyncCheckpointDiff).checkpoint_diff?.updated_buckets?.map((b) => b.bucket) + ).toEqual(['by_user["user1"]']); + expect(checkpoint2).toMatchSnapshot(); + }); + + test('sync updates to parameter query + data', async (context) => { + await using f = await factory(); + + const syncRules = await f.updateSyncRules({ + content: `bucket_definitions: + by_user: + parameters: select users.id as user_id from users where users.id = request.user_id() + data: + - select * from lists where user_id = bucket.user_id +` + }); + + const usersTable = test_utils.makeTestTable('users', ['id']); + const listsTable = test_utils.makeTestTable('lists', ['id']); + + const bucketStorage = await f.getInstance(syncRules); + await bucketStorage.autoActivate(); + + const stream = sync.streamResponse({ + syncContext, + bucketStorage, + syncRules: bucketStorage.getParsedSyncRules(test_utils.PARSE_OPTIONS), + params: { + buckets: [], + include_checksum: true, + raw_data: true + }, + tracker, + syncParams: new RequestParameters({ sub: 'user1' }, {}), + token: { exp: Date.now() / 1000 + 100 } as any + }); + const iter = stream[Symbol.asyncIterator](); + context.onTestFinished(() => { + iter.return?.(); + }); + + // Initial empty checkpoint + expect(await getCheckpointLines(iter)).toMatchSnapshot(); + + await bucketStorage.startBatch(test_utils.BATCH_OPTIONS, async (batch) => { + await batch.save({ + sourceTable: listsTable, + tag: storage.SaveOperationTag.INSERT, + after: { + id: 'list1', + user_id: 'user1', + name: 'User 1' + }, + afterReplicaId: 'list1' + }); - iter.return?.(); + await batch.save({ + sourceTable: usersTable, + tag: storage.SaveOperationTag.INSERT, + after: { + id: 'user1', + name: 'User 1' + }, + afterReplicaId: 'user1' + }); + + await batch.commit('0/1'); + }); + + const checkpoint2 = await getCheckpointLines(iter); + expect( + (checkpoint2[0] as StreamingSyncCheckpointDiff).checkpoint_diff?.updated_buckets?.map((b) => b.bucket) + ).toEqual(['by_user["user1"]']); + expect(checkpoint2).toMatchSnapshot(); }); - test('expiring token', async () => { + test('expiring token', async (context) => { await using f = await factory(); const syncRules = await f.updateSyncRules({ @@ -486,6 +706,9 @@ bucket_definitions: token: { exp: exp } as any }); const iter = stream[Symbol.asyncIterator](); + context.onTestFinished(() => { + iter.return?.(); + }); const checkpoint = await getCheckpointLines(iter); expect(checkpoint).toMatchSnapshot(); @@ -494,7 +717,7 @@ bucket_definitions: expect(expLines).toMatchSnapshot(); }); - test('compacting data - invalidate checkpoint', async () => { + test('compacting data - invalidate checkpoint', async (context) => { // This tests a case of a compact operation invalidating a checkpoint in the // middle of syncing data. // This is expected to be rare in practice, but it is important to handle @@ -548,6 +771,9 @@ bucket_definitions: }); const iter = stream[Symbol.asyncIterator](); + context.onTestFinished(() => { + iter.return?.(); + }); // Only consume the first "checkpoint" message, and pause before receiving data. 
const lines = await consumeIterator(iter, { consume: false, isDone: (line) => (line as any)?.checkpoint != null }); diff --git a/packages/service-core/src/storage/SyncRulesBucketStorage.ts b/packages/service-core/src/storage/SyncRulesBucketStorage.ts index bf350e7d7..e59b2455a 100644 --- a/packages/service-core/src/storage/SyncRulesBucketStorage.ts +++ b/packages/service-core/src/storage/SyncRulesBucketStorage.ts @@ -1,5 +1,5 @@ import { ObserverClient } from '@powersync/lib-services-framework'; -import { SqlSyncRules, SqliteJsonRow, SqliteJsonValue } from '@powersync/service-sync-rules'; +import { ParameterLookup, SqlSyncRules, SqliteJsonRow } from '@powersync/service-sync-rules'; import * as util from '../util/util-index.js'; import { BucketStorageBatch, FlushedResult } from './BucketStorageBatch.js'; import { BucketStorageFactory } from './BucketStorageFactory.js'; @@ -71,8 +71,18 @@ export interface SyncRulesBucketStorage /** * Used to resolve "dynamic" parameter queries. */ - getParameterSets(checkpoint: util.InternalOpId, lookups: SqliteJsonValue[][]): Promise<SqliteJsonRow[]>; + getParameterSets(checkpoint: util.InternalOpId, lookups: ParameterLookup[]): Promise<SqliteJsonRow[]>; + /** + * Given two checkpoints, return the changes in bucket data and parameters that may have occurred + * in that period. + * + * This is a best-effort optimization: + * 1. This may include more changes than what actually occurred. + * 2. This may return invalidateDataBuckets or invalidateParameterBuckets instead of returning + * specific changes. + * @param options + */ getCheckpointChanges(options: GetCheckpointChangesOptions): Promise<CheckpointChanges>; /** @@ -251,15 +261,16 @@ export interface GetCheckpointChangesOptions { } export interface CheckpointChanges { - updatedDataBuckets: string[]; + updatedDataBuckets: Set<string>; invalidateDataBuckets: boolean; - updatedParameterBucketDefinitions: string[]; + /** Serialized using JSONBig */ + updatedParameterLookups: Set<string>; invalidateParameterBuckets: boolean; } export const CHECKPOINT_INVALIDATE_ALL: CheckpointChanges = { - updatedDataBuckets: [], + updatedDataBuckets: new Set(), invalidateDataBuckets: true, - updatedParameterBucketDefinitions: [], + updatedParameterLookups: new Set(), invalidateParameterBuckets: true }; diff --git a/packages/service-core/src/storage/bson.ts b/packages/service-core/src/storage/bson.ts index 9d69af220..d205c7ac4 100644 --- a/packages/service-core/src/storage/bson.ts +++ b/packages/service-core/src/storage/bson.ts @@ -1,6 +1,6 @@ import * as bson from 'bson'; -import { SqliteJsonValue } from '@powersync/service-sync-rules'; +import { ParameterLookup, SqliteJsonValue } from '@powersync/service-sync-rules'; import { ReplicaId } from './BucketStorageBatch.js'; type NodeBuffer = Buffer; @@ -24,23 +24,21 @@ export const BSON_DESERIALIZE_DATA_OPTIONS: bson.DeserializeOptions = { * Lookup serialization must be number-agnostic. I.e. normalize numbers, instead of preserving numbers.
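How a caller is expected to treat this best-effort contract, as a short sketch (assuming `CheckpointChanges` is re-exported under the `storage` namespace of `@powersync/service-core`, as the diff above suggests): the invalidation flags may only widen the work, never narrow it.

import { storage } from '@powersync/service-core';

function bucketsToRecheck(changes: storage.CheckpointChanges, myBuckets: Set<string>): Set<string> {
  if (changes.invalidateDataBuckets) {
    // The result was truncated or unavailable: re-check everything we sync.
    return myBuckets;
  }
  return new Set([...changes.updatedDataBuckets].filter((bucket) => myBuckets.has(bucket)));
}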
* @param lookup */ -export const serializeLookupBuffer = (lookup: SqliteJsonValue[]): NodeBuffer => { - const normalized = lookup.map((value) => { - if (typeof value == 'number' && Number.isInteger(value)) { - return BigInt(value); - } else { - return value; - } - }); - return bson.serialize({ l: normalized }) as NodeBuffer; +export const serializeLookupBuffer = (lookup: ParameterLookup): NodeBuffer => { + return bson.serialize({ l: lookup.values }) as NodeBuffer; }; -export const serializeLookup = (lookup: SqliteJsonValue[]) => { +export const serializeLookup = (lookup: ParameterLookup) => { return new bson.Binary(serializeLookupBuffer(lookup)); }; -export const getLookupBucketDefinitionName = (lookup: bson.Binary) => { +export const deserializeParameterLookup = (lookup: bson.Binary) => { const parsed = bson.deserialize(lookup.buffer, BSON_DESERIALIZE_INTERNAL_OPTIONS).l as SqliteJsonValue[]; + return parsed; +}; + +export const getLookupBucketDefinitionName = (lookup: bson.Binary) => { + const parsed = deserializeParameterLookup(lookup); return parsed[0] as string; }; diff --git a/packages/service-core/src/sync/BucketChecksumState.ts b/packages/service-core/src/sync/BucketChecksumState.ts index 88e1cba92..78319f60a 100644 --- a/packages/service-core/src/sync/BucketChecksumState.ts +++ b/packages/service-core/src/sync/BucketChecksumState.ts @@ -4,9 +4,11 @@ import * as storage from '../storage/storage-index.js'; import * as util from '../util/util-index.js'; import { ErrorCode, logger, ServiceAssertionError, ServiceError } from '@powersync/lib-services-framework'; +import { JSONBig } from '@powersync/service-jsonbig'; import { BucketParameterQuerier } from '@powersync/service-sync-rules/src/BucketParameterQuerier.js'; import { BucketSyncState } from './sync.js'; import { SyncContext } from './SyncContext.js'; +import { getIntersection, hasIntersection } from './util.js'; export interface BucketChecksumStateOptions { syncContext: SyncContext; @@ -68,10 +70,9 @@ export class BucketChecksumState { const storage = this.bucketStorage; const update = await this.parameterState.getCheckpointUpdate(next); - if (update == null) { + if (update == null && this.lastWriteCheckpoint == writeCheckpoint) { return null; } - const { buckets: allBuckets, updatedBuckets } = update; let dataBucketsNew = new Map(); @@ -90,7 +91,7 @@ export class BucketChecksumState { } let checksumMap: util.ChecksumMap; - if (updatedBuckets != null) { + if (updatedBuckets != INVALIDATE_ALL_BUCKETS) { if (this.lastChecksums == null) { throw new ServiceAssertionError(`Bucket diff received without existing checksums`); } @@ -113,9 +114,11 @@ export class BucketChecksumState { } } - let updatedChecksums = await storage.getChecksums(base.checkpoint, checksumLookups); - for (let [bucket, value] of updatedChecksums.entries()) { - newChecksums.set(bucket, value); + if (checksumLookups.length > 0) { + let updatedChecksums = await storage.getChecksums(base.checkpoint, checksumLookups); + for (let [bucket, value] of updatedChecksums.entries()) { + newChecksums.set(bucket, value); + } } checksumMap = newChecksums; } else { @@ -123,6 +126,7 @@ export class BucketChecksumState { const bucketList = [...dataBucketsNew.keys()]; checksumMap = await storage.getChecksums(base.checkpoint, bucketList); } + // Subset of buckets for which there may be new data in this batch. 
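A simplified sketch of the incremental checksum reuse implemented above: carry the previous checksum map forward for unchanged buckets and only overwrite entries that were re-fetched from storage (names are illustrative):

function mergeChecksums<T>(
  previous: Map<string, T>,
  keptBuckets: Iterable<string>,
  refreshed: Map<string, T>
): Map<string, T> {
  const next = new Map<string, T>();
  for (const bucket of keptBuckets) {
    const existing = previous.get(bucket);
    if (existing != null) {
      next.set(bucket, existing); // unchanged bucket: no storage round-trip
    }
  }
  for (const [bucket, value] of refreshed) {
    next.set(bucket, value); // updated or newly-added bucket
  }
  return next;
}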
let bucketsToFetch: BucketDescription[]; @@ -247,6 +251,8 @@ export class BucketChecksumState { } } +const INVALIDATE_ALL_BUCKETS = Symbol('INVALIDATE_ALL_BUCKETS'); + export interface CheckpointUpdate { /** * All buckets forming part of the checkpoint. @@ -258,7 +264,7 @@ export interface CheckpointUpdate { * * If null, assume that any bucket in `buckets` may have been updated. */ - updatedBuckets: Set<string> | null; + updatedBuckets: Set<string> | typeof INVALIDATE_ALL_BUCKETS; } export class BucketParameterState { @@ -268,6 +274,10 @@ export class BucketParameterState { public readonly syncParams: RequestParameters; private readonly querier: BucketParameterQuerier; private readonly staticBuckets: Map<string, BucketDescription>; + private cachedDynamicBuckets: BucketDescription[] | null = null; + private cachedDynamicBucketSet: Set<string> | null = null; + + private readonly lookups: Set<string>; constructor( context: SyncContext, @@ -282,21 +292,18 @@ export class BucketParameterState { this.querier = syncRules.getBucketParameterQuerier(this.syncParams); this.staticBuckets = new Map(this.querier.staticBuckets.map((b) => [b.bucket, b])); + this.lookups = new Set(this.querier.parameterQueryLookups.map((l) => JSONBig.stringify(l.values))); } - async getCheckpointUpdate(checkpoint: storage.StorageCheckpointUpdate): Promise<CheckpointUpdate | null> { + async getCheckpointUpdate(checkpoint: storage.StorageCheckpointUpdate): Promise<CheckpointUpdate> { const querier = this.querier; - let update: CheckpointUpdate | null; + let update: CheckpointUpdate; if (querier.hasDynamicBuckets) { update = await this.getCheckpointUpdateDynamic(checkpoint); } else { update = await this.getCheckpointUpdateStatic(checkpoint); } - if (update == null) { - return null; - } - if (update.buckets.length > this.context.maxParameterQueryResults) { // TODO: Limit number of results even before we get to this point // This limit applies _before_ we get the unique set @@ -318,32 +325,18 @@ export class BucketParameterState { /** * For static buckets, we can keep track of which buckets have been updated. */ - private async getCheckpointUpdateStatic( - checkpoint: storage.StorageCheckpointUpdate - ): Promise<CheckpointUpdate | null> { + private async getCheckpointUpdateStatic(checkpoint: storage.StorageCheckpointUpdate): Promise<CheckpointUpdate> { const querier = this.querier; const update = checkpoint.update; if (update.invalidateDataBuckets) { return { buckets: querier.staticBuckets, - updatedBuckets: null + updatedBuckets: INVALIDATE_ALL_BUCKETS }; } - let updatedBuckets = new Set<string>(); - - for (let bucket of update.updatedDataBuckets ?? []) { - if (this.staticBuckets.has(bucket)) { - updatedBuckets.add(bucket); - } - } - - if (updatedBuckets.size == 0) { - // No change - skip this checkpoint - return null; - } - + const updatedBuckets = new Set(getIntersection(this.staticBuckets, update.updatedDataBuckets)); return { buckets: querier.staticBuckets, updatedBuckets @@ -353,44 +346,67 @@ export class BucketParameterState { /** * For dynamic buckets, we need to re-query the list of buckets every time. */ - private async getCheckpointUpdateDynamic( - checkpoint: storage.StorageCheckpointUpdate - ): Promise<CheckpointUpdate | null> { + private async getCheckpointUpdateDynamic(checkpoint: storage.StorageCheckpointUpdate): Promise<CheckpointUpdate> { const querier = this.querier; const storage = this.bucketStorage; const staticBuckets = querier.staticBuckets; const update = checkpoint.update; - let hasChange = false; - if (update.invalidateDataBuckets || update.updatedDataBuckets?.length > 0) { - hasChange = true; - } else if (update.invalidateParameterBuckets) { - hasChange = true; + let hasParameterChange = false; + let invalidateDataBuckets = false; + // If hasParameterChange == true, then invalidateDataBuckets = true + // If invalidateDataBuckets == true, we ignore updatedBuckets + let updatedBuckets = new Set<string>(); + + if (update.invalidateDataBuckets) { + invalidateDataBuckets = true; + } + + if (update.invalidateParameterBuckets) { + hasParameterChange = true; } else { - for (let bucket of update.updatedParameterBucketDefinitions ?? []) { - if (querier.dynamicBucketDefinitions.has(bucket)) { - hasChange = true; - break; - } + if (hasIntersection(this.lookups, update.updatedParameterLookups)) { + // This is a very coarse re-check of all queries + hasParameterChange = true; } } - if (!hasChange) { - return null; - } + let dynamicBuckets: BucketDescription[]; + if (hasParameterChange || this.cachedDynamicBuckets == null || this.cachedDynamicBucketSet == null) { + dynamicBuckets = await querier.queryDynamicBucketDescriptions({ + getParameterSets(lookups) { + return storage.getParameterSets(checkpoint.base.checkpoint, lookups); + } + }); + this.cachedDynamicBuckets = dynamicBuckets; + this.cachedDynamicBucketSet = new Set(dynamicBuckets.map((b) => b.bucket)); + invalidateDataBuckets = true; + } else { + dynamicBuckets = this.cachedDynamicBuckets; - const dynamicBuckets = await querier.queryDynamicBucketDescriptions({ - getParameterSets(lookups) { - return storage.getParameterSets(checkpoint.base.checkpoint, lookups); + if (!invalidateDataBuckets) { + for (let bucket of getIntersection(this.staticBuckets, update.updatedDataBuckets)) { + updatedBuckets.add(bucket); + } + for (let bucket of getIntersection(this.cachedDynamicBucketSet, update.updatedDataBuckets)) { + updatedBuckets.add(bucket); + } } - }); + } const allBuckets = [...staticBuckets, ...dynamicBuckets]; - return { - buckets: allBuckets, - // We cannot track individual bucket updates for dynamic lookups yet - updatedBuckets: null - }; + if (invalidateDataBuckets) { + return { + buckets: allBuckets, + // We cannot track individual bucket updates for dynamic lookups yet + updatedBuckets: INVALIDATE_ALL_BUCKETS + }; + } else { + return { + buckets: allBuckets, + updatedBuckets: updatedBuckets + }; + } } } diff --git a/packages/service-core/src/sync/util.ts b/packages/service-core/src/sync/util.ts index 7ccb95746..d9ef40ce4 100644 --- a/packages/service-core/src/sync/util.ts +++ b/packages/service-core/src/sync/util.ts @@ -1,8 +1,8 @@ import * as timers from 'timers/promises'; +import { SemaphoreInterface } from 'async-mutex'; import * as util from '../util/util-index.js'; import { RequestTracker } from './RequestTracker.js'; -import { SemaphoreInterface } from 'async-mutex'; export type TokenStreamOptions = { /** @@ -153,3 +153,36 @@ export function settledPromise<T>(promise: Promise<T>): Promise<PromiseSettledResult<T>> +type MapOrSet<T> = Map<T, any> | Set<T>; + +/** + * Check if two sets have any element(s) in common. + */ +export function hasIntersection<T>(a: MapOrSet<T>, b: MapOrSet<T>) { + for (let _ of getIntersection(a, b)) { + return true; + } + return false; +} + +/** + * Return the intersection of two sets or maps. + */ +export function* getIntersection<T>(a: MapOrSet<T>, b: MapOrSet<T>): IterableIterator<T> { + // Iterate over the smaller set to reduce the number of lookups + if (a.size < b.size) { + for (let key of a.keys()) { + if (b.has(key)) { + yield key; + } + } + return; + } else { + for (let key of b.keys()) { + if (a.has(key)) { + yield key; + } + } + } +} diff --git a/packages/service-core/test/src/sync/BucketChecksumState.test.ts b/packages/service-core/test/src/sync/BucketChecksumState.test.ts index 77c7b110e..7196fed1e 100644 --- a/packages/service-core/test/src/sync/BucketChecksumState.test.ts +++ b/packages/service-core/test/src/sync/BucketChecksumState.test.ts @@ -8,7 +8,8 @@ import { SyncContext, WatchFilterEvent } from '@/index.js'; -import { RequestParameters, SqliteJsonRow, SqliteJsonValue, SqlSyncRules } from '@powersync/service-sync-rules'; +import { JSONBig } from '@powersync/service-jsonbig'; +import { RequestParameters, SqliteJsonRow, ParameterLookup, SqlSyncRules } from '@powersync/service-sync-rules'; import { describe, expect, test } from 'vitest'; describe('BucketChecksumState', () => { @@ -97,9 +98,9 @@ bucket_definitions: base: { checkpoint: 2n, lsn: '2' }, writeCheckpoint: null, update: { - updatedDataBuckets: ['global[]'], + updatedDataBuckets: new Set(['global[]']), invalidateDataBuckets: false, - updatedParameterBucketDefinitions: [], + updatedParameterLookups: new Set(), invalidateParameterBuckets: false } }))!; @@ -200,7 +201,7 @@ bucket_definitions: writeCheckpoint: null, update: { ...CHECKPOINT_INVALIDATE_ALL, - updatedDataBuckets: ['global[1]', 'global[2]'], + updatedDataBuckets: new Set(['global[1]', 'global[2]']), invalidateDataBuckets: false } }))!; @@ -293,7 +294,7 @@ bucket_definitions: // Invalidate the state for global[1] - will only re-check the single bucket. // This is essentially inconsistent state, but is the simplest way to test that // the filter is working.
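A usage sketch for the two helpers added in sync/util.ts above; the test file later in this diff imports them from the package root ('@/index.js'), so they are assumed to be re-exported from `@powersync/service-core`:

import { getIntersection, hasIntersection } from '@powersync/service-core';

// A Map keyed by bucket name intersected with a Set of updated bucket names;
// the iteration happens over whichever side is smaller.
const staticBuckets = new Map([['global[]', { bucket: 'global[]' }]]);
const updated = new Set(['global[]', 'other[]']);
console.log([...getIntersection(staticBuckets, updated)]); // ['global[]']
console.log(hasIntersection(staticBuckets, updated)); // true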
- updatedDataBuckets: ['global[1]'], + updatedDataBuckets: new Set(['global[1]']), invalidateDataBuckets: false } }))!; @@ -420,7 +421,7 @@ bucket_definitions: update: { ...CHECKPOINT_INVALIDATE_ALL, invalidateDataBuckets: false, - updatedDataBuckets: ['global[1]'] + updatedDataBuckets: new Set(['global[1]']) } }))!; expect(line2.checkpointLine).toEqual({ @@ -474,10 +475,10 @@ bucket_definitions: storage.getParameterSets = async ( checkpoint: InternalOpId, - lookups: SqliteJsonValue[][] + lookups: ParameterLookup[] ): Promise => { expect(checkpoint).toEqual(1n); - expect(lookups).toEqual([['by_project', '1', 'u1']]); + expect(lookups).toEqual([ParameterLookup.normalized('by_project', '1', ['u1'])]); return [{ id: 1 }, { id: 2 }]; }; @@ -519,10 +520,10 @@ bucket_definitions: storage.getParameterSets = async ( checkpoint: InternalOpId, - lookups: SqliteJsonValue[][] + lookups: ParameterLookup[] ): Promise => { expect(checkpoint).toEqual(2n); - expect(lookups).toEqual([['by_project', '1', 'u1']]); + expect(lookups).toEqual([ParameterLookup.normalized('by_project', '1', ['u1'])]); return [{ id: 1 }, { id: 2 }, { id: 3 }]; }; @@ -532,8 +533,8 @@ bucket_definitions: writeCheckpoint: null, update: { invalidateDataBuckets: false, - updatedDataBuckets: [], - updatedParameterBucketDefinitions: ['by_project'], + updatedDataBuckets: new Set(), + updatedParameterLookups: new Set([JSONBig.stringify(['by_project', '1', 'u1'])]), invalidateParameterBuckets: false } }))!; @@ -580,7 +581,7 @@ class MockBucketChecksumStateStorage implements BucketChecksumStateStorage { ); } - async getParameterSets(checkpoint: InternalOpId, lookups: SqliteJsonValue[][]): Promise { + async getParameterSets(checkpoint: InternalOpId, lookups: ParameterLookup[]): Promise { throw new Error('Method not implemented.'); } } diff --git a/packages/service-core/test/src/util.test.ts b/packages/service-core/test/src/util.test.ts new file mode 100644 index 000000000..12f85c04e --- /dev/null +++ b/packages/service-core/test/src/util.test.ts @@ -0,0 +1,48 @@ +import { getIntersection, hasIntersection } from '@/index.js'; +import { describe, expect, test } from 'vitest'; + +describe('utils', () => { + function testInstersection(a: Set, b: Set, expected: boolean) { + expect(hasIntersection(a, b)).toBe(expected); + expect(hasIntersection(b, a)).toBe(expected); + const mapA = new Map([...a].map((v) => [v, 1])); + const mapB = new Map([...b].map((v) => [v, 1])); + expect(hasIntersection(mapA, b)).toBe(expected); + expect(hasIntersection(mapB, a)).toBe(expected); + expect(hasIntersection(mapA, mapB)).toBe(expected); + } + + test('hasIntersection', async () => { + testInstersection(new Set(['a']), new Set(['a']), true); + testInstersection(new Set(['a', 'b', 'c']), new Set(['a', 'b', 'c']), true); + testInstersection(new Set(['a', 'b', 'c']), new Set(['d', 'e']), false); + testInstersection(new Set(['a', 'b', 'c']), new Set(['d', 'c', 'e']), true); + testInstersection(new Set(['a', 'b', 'c']), new Set(['c', 'e']), true); + testInstersection(new Set(['a', 'b', 'c', 2]), new Set([1, 2, 3]), true); + testInstersection(new Set(['a', 'b', 'c', 4]), new Set([1, 2, 3]), false); + testInstersection(new Set([]), new Set([1, 2, 3]), false); + testInstersection(new Set([]), new Set([]), false); + }); + + function testGetIntersection(a: Set, b: Set, expected: any[]) { + expect([...getIntersection(a, b)]).toEqual(expected); + expect([...getIntersection(b, a)]).toEqual(expected); + const mapA = new Map([...a].map((v) => [v, 1])); + const mapB = new 
+    expect([...getIntersection(mapA, b)]).toEqual(expected);
+    expect([...getIntersection(mapB, a)]).toEqual(expected);
+    expect([...getIntersection(mapA, mapB)]).toEqual(expected);
+  }
+
+  test('getIntersection', async () => {
+    testGetIntersection(new Set(['a']), new Set(['a']), ['a']);
+    testGetIntersection(new Set(['a', 'b', 'c']), new Set(['a', 'b', 'c']), ['a', 'b', 'c']);
+    testGetIntersection(new Set(['a', 'b', 'c']), new Set(['d', 'e']), []);
+    testGetIntersection(new Set(['a', 'b', 'c']), new Set(['d', 'c', 'e']), ['c']);
+    testGetIntersection(new Set(['a', 'b', 'c']), new Set(['c', 'e']), ['c']);
+    testGetIntersection(new Set(['a', 'b', 'c', 2]), new Set([1, 2, 3]), [2]);
+    testGetIntersection(new Set(['a', 'b', 'c', 4]), new Set([1, 2, 3]), []);
+    testGetIntersection(new Set([]), new Set([1, 2, 3]), []);
+    testGetIntersection(new Set([]), new Set([]), []);
+  });
+});
diff --git a/packages/sync-rules/src/BucketParameterQuerier.ts b/packages/sync-rules/src/BucketParameterQuerier.ts
index c229fd6e1..8a21ac4be 100644
--- a/packages/sync-rules/src/BucketParameterQuerier.ts
+++ b/packages/sync-rules/src/BucketParameterQuerier.ts
@@ -1,5 +1,6 @@
 import { BucketDescription } from './BucketDescription.js';
 import { RequestParameters, SqliteJsonRow, SqliteJsonValue } from './types.js';
+import { normalizeParameterValue } from './utils.js';

 /**
  * Represents a set of parameter queries for a specific request.
@@ -19,11 +20,11 @@ export interface BucketParameterQuerier {
    * True if there are dynamic buckets, meaning queryDynamicBucketDescriptions() should be used.
    *
    * If this is false, queryDynamicBucketDescriptions() will always return an empty array,
-   * and dynamicBucketDefinitions.size == 0.
+   * and parameterQueryLookups.length == 0.
    */
   readonly hasDynamicBuckets: boolean;

-  readonly dynamicBucketDefinitions: Set<string>;
+  readonly parameterQueryLookups: ParameterLookup[];

   /**
    * These buckets depend on parameter storage, and need to be retrieved dynamically for each checkpoint.
@@ -39,7 +40,7 @@ export interface BucketParameterQuerier {
 }

 export interface ParameterLookupSource {
-  getParameterSets: (lookups: SqliteJsonValue[][]) => Promise<SqliteJsonRow[]>;
+  getParameterSets: (lookups: ParameterLookup[]) => Promise<SqliteJsonRow[]>;
 }

 export interface QueryBucketDescriptorOptions extends ParameterLookupSource {
@@ -47,11 +48,11 @@ export interface QueryBucketDescriptorOptions extends ParameterLookupSource {
 }

 export function mergeBucketParameterQueriers(queriers: BucketParameterQuerier[]): BucketParameterQuerier {
-  const dynamicBucketDefinitions = new Set(queriers.flatMap((q) => [...q.dynamicBucketDefinitions]));
+  const parameterQueryLookups = queriers.flatMap((q) => q.parameterQueryLookups);
   return {
     staticBuckets: queriers.flatMap((q) => q.staticBuckets),
-    hasDynamicBuckets: dynamicBucketDefinitions.size > 0,
-    dynamicBucketDefinitions,
+    hasDynamicBuckets: parameterQueryLookups.length > 0,
+    parameterQueryLookups: parameterQueryLookups,
     async queryDynamicBucketDescriptions(source: ParameterLookupSource) {
       let results: BucketDescription[] = [];
       for (let q of queriers) {
@@ -63,3 +64,25 @@ export function mergeBucketParameterQueriers(queriers: BucketParameterQuerier[])
     }
   };
 }
+
+/**
+ * Represents an equality filter from a parameter query.
+ *
+ * Other query types are not supported yet.
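+ *
+ * Illustrative example: for a bucket definition "by_user" whose first parameter
+ * query is `SELECT id FROM users WHERE id = token_parameters.user_id`, the lookup
+ * for user "u1" is ParameterLookup.normalized('by_user', '1', ['u1']).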
+ */
+export class ParameterLookup {
+  // bucket definition name, parameter query index, ...lookup values
+  readonly values: SqliteJsonValue[];
+
+  static normalized(bucketDefinitionName: string, queryIndex: string, values: SqliteJsonValue[]): ParameterLookup {
+    return new ParameterLookup([bucketDefinitionName, queryIndex, ...values.map(normalizeParameterValue)]);
+  }
+
+  /**
+   * @param values must be pre-normalized (any integer converted into bigint)
+   */
+  constructor(values: SqliteJsonValue[]) {
+    this.values = values;
+  }
+}
diff --git a/packages/sync-rules/src/SqlBucketDescriptor.ts b/packages/sync-rules/src/SqlBucketDescriptor.ts
index a0fa59f0f..1e6f6162a 100644
--- a/packages/sync-rules/src/SqlBucketDescriptor.ts
+++ b/packages/sync-rules/src/SqlBucketDescriptor.ts
@@ -113,7 +113,7 @@ export class SqlBucketDescriptor {
     const staticQuerier = {
       staticBuckets,
       hasDynamicBuckets: false,
-      dynamicBucketDefinitions: new Set<string>(),
+      parameterQueryLookups: [],
       queryDynamicBucketDescriptions: async () => []
     } satisfies BucketParameterQuerier;
@@ -133,6 +133,10 @@ export class SqlBucketDescriptor {
     return results;
   }

+  hasDynamicBucketQueries(): boolean {
+    return this.parameter_queries.length > 0;
+  }
+
   getSourceTables(): Set<TablePattern> {
     let result = new Set<TablePattern>();
     for (let query of this.parameter_queries) {
diff --git a/packages/sync-rules/src/SqlParameterQuery.ts b/packages/sync-rules/src/SqlParameterQuery.ts
index 5c008ddc8..2df59e7d6 100644
--- a/packages/sync-rules/src/SqlParameterQuery.ts
+++ b/packages/sync-rules/src/SqlParameterQuery.ts
@@ -1,6 +1,6 @@
 import { parse, SelectedColumn } from 'pgsql-ast-parser';
 import { BucketDescription, BucketPriority, defaultBucketPriority } from './BucketDescription.js';
-import { BucketParameterQuerier, ParameterLookupSource } from './BucketParameterQuerier.js';
+import { BucketParameterQuerier, ParameterLookup, ParameterLookupSource } from './BucketParameterQuerier.js';
 import { SqlRuleError } from './errors.js';
 import { SourceTableInterface } from './SourceTableInterface.js';
 import { SqlTools } from './sql_filters.js';
@@ -23,7 +23,7 @@ import {
   SqliteJsonValue,
   SqliteRow
 } from './types.js';
-import { filterJsonRow, getBucketId, isJsonValue, isSelectStatement } from './utils.js';
+import { filterJsonRow, getBucketId, isJsonValue, isSelectStatement, normalizeParameterValue } from './utils.js';

 /**
  * Represents a parameter query, such as:
@@ -230,7 +230,7 @@
     let lookup: SqliteJsonValue[] = [this.descriptor_name!, this.id!];
     lookup.push(
       ...this.input_parameters!.map((param) => {
-        return param.filteredRowToLookupValue(filterParamSet);
+        return normalizeParameterValue(param.filteredRowToLookupValue(filterParamSet));
       })
     );

@@ -238,7 +238,7 @@
     const role: EvaluatedParameters = {
       bucket_parameters: data.map((row) => filterJsonRow(row)),
-      lookup: lookup
+      lookup: new ParameterLookup(lookup)
     };
     result.push(role);
   }
@@ -297,7 +297,7 @@
    *
    * Each lookup is [bucket definition name, parameter query index, ...lookup values]
    */
-  getLookups(parameters: RequestParameters): SqliteJsonValue[][] {
+  getLookups(parameters: RequestParameters): ParameterLookup[] {
     if (!this.expanded_input_parameter) {
       let lookup: SqliteJsonValue[] = [this.descriptor_name!, this.id!];
@@ -308,7 +308,7 @@
         const value = param.parametersToLookupValue(parameters);

         if (isJsonValue(value)) {
-          return value;
+          return normalizeParameterValue(value);
         } else {
           valid = false;
          return null;
@@ -318,7 +318,7 @@
       if (!valid) {
         return [];
       }
-      return [lookup];
+      return [new ParameterLookup(lookup)];
     } else {
       const arrayString = this.expanded_input_parameter.parametersToLookupValue(parameters);
@@ -339,17 +339,18 @@
         .map((expandedValue) => {
           let lookup: SqliteJsonValue[] = [this.descriptor_name!, this.id!];
           let valid = true;
+          const normalizedExpandedValue = normalizeParameterValue(expandedValue);
           lookup.push(
             ...this.input_parameters!.map((param): SqliteJsonValue => {
               if (param == this.expanded_input_parameter) {
                 // Expand array value
-                return expandedValue;
+                return normalizedExpandedValue;
               } else {
                 // Scalar value
                 const value = param.parametersToLookupValue(parameters);
                 if (isJsonValue(value)) {
-                  return value;
+                  return normalizeParameterValue(value);
                 } else {
                   valid = false;
                   return null;
@@ -361,9 +362,9 @@
             return null;
           }

-          return lookup;
+          return new ParameterLookup(lookup);
         })
-        .filter((lookup) => lookup != null) as SqliteJsonValue[][];
+        .filter((lookup) => lookup != null) as ParameterLookup[];
     }
   }
@@ -375,7 +376,7 @@
       return {
         staticBuckets: [],
         hasDynamicBuckets: false,
-        dynamicBucketDefinitions: new Set<string>(),
+        parameterQueryLookups: [],
         queryDynamicBucketDescriptions: async () => []
       };
     }
@@ -383,7 +384,7 @@
     return {
       staticBuckets: [],
       hasDynamicBuckets: true,
-      dynamicBucketDefinitions: new Set([this.descriptor_name!]),
+      parameterQueryLookups: lookups,
       queryDynamicBucketDescriptions: async (source: ParameterLookupSource) => {
         const bucketParameters = await source.getParameterSets(lookups);
         return this.resolveBucketDescriptions(bucketParameters, requestParameters);
diff --git a/packages/sync-rules/src/SqlSyncRules.ts b/packages/sync-rules/src/SqlSyncRules.ts
index 2db268f2a..d22be2520 100644
--- a/packages/sync-rules/src/SqlSyncRules.ts
+++ b/packages/sync-rules/src/SqlSyncRules.ts
@@ -326,6 +326,10 @@
     return mergeBucketParameterQueriers(queriers);
   }

+  hasDynamicBucketQueries() {
+    return this.bucket_descriptors.some((query) => query.hasDynamicBucketQueries());
+  }
+
   getSourceTables(): TablePattern[] {
     const sourceTables = new Map();
     for (const bucket of this.bucket_descriptors) {
diff --git a/packages/sync-rules/src/index.ts b/packages/sync-rules/src/index.ts
index bb8248613..4a80c9dcf 100644
--- a/packages/sync-rules/src/index.ts
+++ b/packages/sync-rules/src/index.ts
@@ -21,3 +21,4 @@ export * from './TablePattern.js';
 export * from './TsSchemaGenerator.js';
 export * from './types.js';
 export * from './utils.js';
+export * from './BucketParameterQuerier.js';
diff --git a/packages/sync-rules/src/types.ts b/packages/sync-rules/src/types.ts
index f4828d0d1..a637c9ab2 100644
--- a/packages/sync-rules/src/types.ts
+++ b/packages/sync-rules/src/types.ts
@@ -5,6 +5,7 @@ import { SyncRulesOptions } from './SqlSyncRules.js';
 import { TablePattern } from './TablePattern.js';
 import { toSyncRulesParameters } from './utils.js';
 import { BucketDescription, BucketPriority } from './BucketDescription.js';
+import { ParameterLookup } from './BucketParameterQuerier.js';

 export interface SyncRules {
   evaluateRow(options: EvaluateRowOptions): EvaluationResult[];
@@ -18,7 +19,7 @@ export interface QueryParseOptions extends SyncRulesOptions {
 }

 export interface EvaluatedParameters {
-  lookup: SqliteJsonValue[];
+  lookup: ParameterLookup;

   /**
    * Parameters used to generate bucket id. May be incomplete.
@@ -61,7 +62,7 @@ export function isEvaluatedRow(e: EvaluationResult): e is EvaluatedRow {
 }

 export function isEvaluatedParameters(e: EvaluatedParametersResult): e is EvaluatedParameters {
-  return Array.isArray((e as any).lookup);
+  return 'lookup' in e;
 }

 export type EvaluationResult = EvaluatedRow | EvaluationError;
diff --git a/packages/sync-rules/src/utils.ts b/packages/sync-rules/src/utils.ts
index af4fbf7b6..dd1cfb558 100644
--- a/packages/sync-rules/src/utils.ts
+++ b/packages/sync-rules/src/utils.ts
@@ -170,3 +170,13 @@ export const JSONBucketNameSerialize = {
     return stringifyRaw(value, replacer, space)!;
   }
 };
+
+/**
+ * Lookup serialization must be number-agnostic, i.e. normalize numbers instead of
+ * preserving the exact numeric type.
+ */
+export function normalizeParameterValue(value: SqliteJsonValue): SqliteJsonValue {
+  if (typeof value == 'number' && Number.isInteger(value)) {
+    return BigInt(value);
+  }
+  return value;
+}
diff --git a/packages/sync-rules/test/src/parameter_queries.test.ts b/packages/sync-rules/test/src/parameter_queries.test.ts
index e3f88cd43..0566a58fb 100644
--- a/packages/sync-rules/test/src/parameter_queries.test.ts
+++ b/packages/sync-rules/test/src/parameter_queries.test.ts
@@ -1,5 +1,5 @@
 import { describe, expect, test } from 'vitest';
-import { SqlParameterQuery } from '../../src/index.js';
+import { ParameterLookup, SqlParameterQuery } from '../../src/index.js';
 import { BASIC_SCHEMA, normalizeTokenParameters, PARSE_OPTIONS } from './util.js';
 import { StaticSqlParameterQuery } from '../../src/StaticSqlParameterQuery.js';

@@ -11,7 +11,7 @@ describe('parameter queries', () => {
     query.id = '1';
     expect(query.evaluateParameterRow({ id: 'group1', user_ids: JSON.stringify(['user1', 'user2']) })).toEqual([
       {
-        lookup: ['mybucket', '1', 'user1'],
+        lookup: ParameterLookup.normalized('mybucket', '1', ['user1']),
         bucket_parameters: [
           {
             group_id: 'group1'
@@ -19,7 +19,7 @@
         ]
       },
       {
-        lookup: ['mybucket', '1', 'user2'],
+        lookup: ParameterLookup.normalized('mybucket', '1', ['user2']),
         bucket_parameters: [
           {
             group_id: 'group1'
@@ -33,7 +33,7 @@
           user_id: 'user1'
         })
       )
-    ).toEqual([['mybucket', '1', 'user1']]);
+    ).toEqual([ParameterLookup.normalized('mybucket', '1', ['user1'])]);
   });

   test('IN token_parameters query', function () {
@@ -43,7 +43,7 @@
     query.id = '1';
     expect(query.evaluateParameterRow({ id: 'region1', name: 'colorado' })).toEqual([
       {
-        lookup: ['mybucket', '1', 'colorado'],
+        lookup: ParameterLookup.normalized('mybucket', '1', ['colorado']),
         bucket_parameters: [
           {
             region_id: 'region1'
@@ -58,8 +58,8 @@
         })
       )
     ).toEqual([
-      ['mybucket', '1', 'colorado'],
-      ['mybucket', '1', 'texas']
+      ParameterLookup.normalized('mybucket', '1', ['colorado']),
+      ParameterLookup.normalized('mybucket', '1', ['texas'])
     ]);
   });

@@ -72,7 +72,7 @@
     // Note: We don't need to worry about numeric vs decimal types in the lookup - JSONB handles normalization for us.
     expect(query.evaluateParameterRow({ int1: 314n, float1: 3.14, float2: 314 })).toEqual([
       {
-        lookup: ['mybucket', '1', 314n, 3.14, 314],
+        lookup: ParameterLookup.normalized('mybucket', '1', [314n, 3.14, 314]),
         bucket_parameters: [{ int1: 314n, float1: 3.14, float2: 314 }]
       }
@@ -81,7 +81,7 @@
     // Similarly, we don't need to worry about the types here.
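    // (For example, getLookups normalizes the integer 314 to the bigint 314n below,
    // per normalizeParameterValue; the float 3.14 is left unchanged.)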
    // This test just checks the current behavior.
     expect(query.getLookups(normalizeTokenParameters({ int1: 314n, float1: 3.14, float2: 314 }))).toEqual([
-      ['mybucket', '1', 314n, 3.14, 314n]
+      ParameterLookup.normalized('mybucket', '1', [314n, 3.14, 314n])
     ]);

     // We _do_ need to care about the bucket string representation.
@@ -98,49 +98,56 @@
     const sql = 'SELECT id from users WHERE filter_param = token_parameters.user_id';
     const query = SqlParameterQuery.fromSql('mybucket', sql, PARSE_OPTIONS) as SqlParameterQuery;
     expect(query.errors).toEqual([]);
+    query.id = '1';
     expect(query.evaluateParameterRow({ id: 'test_id', filter_param: 'test_param' })).toEqual([
       {
-        lookup: ['mybucket', undefined, 'test_param'],
+        lookup: ParameterLookup.normalized('mybucket', '1', ['test_param']),
         bucket_parameters: [{ id: 'test_id' }]
       }
     ]);
-    expect(query.getLookups(normalizeTokenParameters({ user_id: 'test' }))).toEqual([['mybucket', undefined, 'test']]);
+    expect(query.getLookups(normalizeTokenParameters({ user_id: 'test' }))).toEqual([
+      ParameterLookup.normalized('mybucket', '1', ['test'])
+    ]);
   });

   test('function on token_parameter', () => {
     const sql = 'SELECT id from users WHERE filter_param = upper(token_parameters.user_id)';
     const query = SqlParameterQuery.fromSql('mybucket', sql, PARSE_OPTIONS) as SqlParameterQuery;
     expect(query.errors).toEqual([]);
+    query.id = '1';
     expect(query.evaluateParameterRow({ id: 'test_id', filter_param: 'test_param' })).toEqual([
       {
-        lookup: ['mybucket', undefined, 'test_param'],
+        lookup: ParameterLookup.normalized('mybucket', '1', ['test_param']),
         bucket_parameters: [{ id: 'test_id' }]
       }
     ]);
-    expect(query.getLookups(normalizeTokenParameters({ user_id: 'test' }))).toEqual([['mybucket', undefined, 'TEST']]);
+    expect(query.getLookups(normalizeTokenParameters({ user_id: 'test' }))).toEqual([
+      ParameterLookup.normalized('mybucket', '1', ['TEST'])
+    ]);
   });

   test('token parameter member operator', () => {
     const sql = "SELECT id from users WHERE filter_param = token_parameters.some_param ->> 'description'";
     const query = SqlParameterQuery.fromSql('mybucket', sql, PARSE_OPTIONS) as SqlParameterQuery;
     expect(query.errors).toEqual([]);
+    query.id = '1';
     expect(query.evaluateParameterRow({ id: 'test_id', filter_param: 'test_param' })).toEqual([
       {
-        lookup: ['mybucket', undefined, 'test_param'],
+        lookup: ParameterLookup.normalized('mybucket', '1', ['test_param']),
         bucket_parameters: [{ id: 'test_id' }]
       }
     ]);
     expect(query.getLookups(normalizeTokenParameters({ some_param: { description: 'test_description' } }))).toEqual([
-      ['mybucket', undefined, 'test_description']
+      ParameterLookup.normalized('mybucket', '1', ['test_description'])
     ]);
   });

@@ -148,36 +155,45 @@
     const sql = 'SELECT id from users WHERE filter_param = token_parameters.some_param + 2';
     const query = SqlParameterQuery.fromSql('mybucket', sql, PARSE_OPTIONS) as SqlParameterQuery;
     expect(query.errors).toEqual([]);
+    query.id = '1';

-    expect(query.getLookups(normalizeTokenParameters({ some_param: 3 }))).toEqual([['mybucket', undefined, 5n]]);
+    expect(query.getLookups(normalizeTokenParameters({ some_param: 3 }))).toEqual([
+      ParameterLookup.normalized('mybucket', '1', [5n])
+    ]);
   });

   test('token parameter IS NULL as filter', () => {
     const sql = 'SELECT id from users WHERE filter_param = (token_parameters.some_param IS NULL)';
     const query = SqlParameterQuery.fromSql('mybucket', sql, PARSE_OPTIONS) as SqlParameterQuery;
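    // ((token_parameters.some_param IS NULL) evaluates to a SQLite boolean, so the
    // lookup value is 1n when some_param is null and 0n otherwise.)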
     expect(query.errors).toEqual([]);
+    query.id = '1';

-    expect(query.getLookups(normalizeTokenParameters({ some_param: null }))).toEqual([['mybucket', undefined, 1n]]);
-    expect(query.getLookups(normalizeTokenParameters({ some_param: 'test' }))).toEqual([['mybucket', undefined, 0n]]);
+    expect(query.getLookups(normalizeTokenParameters({ some_param: null }))).toEqual([
+      ParameterLookup.normalized('mybucket', '1', [1n])
+    ]);
+    expect(query.getLookups(normalizeTokenParameters({ some_param: 'test' }))).toEqual([
+      ParameterLookup.normalized('mybucket', '1', [0n])
+    ]);
   });

   test('direct token parameter', () => {
     const sql = 'SELECT FROM users WHERE token_parameters.some_param';
     const query = SqlParameterQuery.fromSql('mybucket', sql, PARSE_OPTIONS) as SqlParameterQuery;
     expect(query.errors).toEqual([]);
+    query.id = '1';
     expect(query.evaluateParameterRow({ id: 'user1' })).toEqual([
       {
-        lookup: ['mybucket', undefined, 1n],
+        lookup: ParameterLookup.normalized('mybucket', '1', [1n]),
         bucket_parameters: [{}]
       }
     ]);
     expect(query.getLookups(normalizeTokenParameters({ user_id: 'user1', some_param: null }))).toEqual([
-      ['mybucket', undefined, 0n]
+      ParameterLookup.normalized('mybucket', '1', [0n])
     ]);
     expect(query.getLookups(normalizeTokenParameters({ user_id: 'user1', some_param: 123 }))).toEqual([
-      ['mybucket', undefined, 1n]
+      ParameterLookup.normalized('mybucket', '1', [1n])
     ]);
   });

@@ -185,19 +201,20 @@
     const sql = 'SELECT FROM users WHERE token_parameters.some_param IS NULL';
     const query = SqlParameterQuery.fromSql('mybucket', sql, PARSE_OPTIONS) as SqlParameterQuery;
     expect(query.errors).toEqual([]);
+    query.id = '1';
     expect(query.evaluateParameterRow({ id: 'user1' })).toEqual([
       {
-        lookup: ['mybucket', undefined, 1n],
+        lookup: ParameterLookup.normalized('mybucket', '1', [1n]),
         bucket_parameters: [{}]
       }
     ]);
     expect(query.getLookups(normalizeTokenParameters({ user_id: 'user1', some_param: null }))).toEqual([
-      ['mybucket', undefined, 1n]
+      ParameterLookup.normalized('mybucket', '1', [1n])
     ]);
     expect(query.getLookups(normalizeTokenParameters({ user_id: 'user1', some_param: 123 }))).toEqual([
-      ['mybucket', undefined, 0n]
+      ParameterLookup.normalized('mybucket', '1', [0n])
     ]);
   });

@@ -205,19 +222,20 @@
     const sql = 'SELECT FROM users WHERE token_parameters.some_param IS NOT NULL';
     const query = SqlParameterQuery.fromSql('mybucket', sql, PARSE_OPTIONS) as SqlParameterQuery;
     expect(query.errors).toEqual([]);
+    query.id = '1';
     expect(query.evaluateParameterRow({ id: 'user1' })).toEqual([
       {
-        lookup: ['mybucket', undefined, 1n],
+        lookup: ParameterLookup.normalized('mybucket', '1', [1n]),
         bucket_parameters: [{}]
       }
     ]);
     expect(query.getLookups(normalizeTokenParameters({ user_id: 'user1', some_param: null }))).toEqual([
-      ['mybucket', undefined, 0n]
+      ParameterLookup.normalized('mybucket', '1', [0n])
     ]);
     expect(query.getLookups(normalizeTokenParameters({ user_id: 'user1', some_param: 123 }))).toEqual([
-      ['mybucket', undefined, 1n]
+      ParameterLookup.normalized('mybucket', '1', [1n])
     ]);
   });

@@ -225,19 +243,20 @@
     const sql = 'SELECT FROM users WHERE NOT token_parameters.is_admin';
     const query = SqlParameterQuery.fromSql('mybucket', sql, PARSE_OPTIONS) as SqlParameterQuery;
     expect(query.errors).toEqual([]);
+    query.id = '1';
     expect(query.evaluateParameterRow({ id: 'user1' })).toEqual([
       {
-        lookup: ['mybucket', undefined, 1n],
+        lookup: ParameterLookup.normalized('mybucket', '1', [1n]),
        bucket_parameters: [{}]
       }
     ]);
     expect(query.getLookups(normalizeTokenParameters({ user_id: 'user1', is_admin: false }))).toEqual([
-      ['mybucket', undefined, 1n]
+      ParameterLookup.normalized('mybucket', '1', [1n])
     ]);
     expect(query.getLookups(normalizeTokenParameters({ user_id: 'user1', is_admin: 123 }))).toEqual([
-      ['mybucket', undefined, 0n]
+      ParameterLookup.normalized('mybucket', '1', [0n])
     ]);
   });

@@ -245,19 +264,20 @@
     const sql = 'SELECT FROM users WHERE users.id = token_parameters.user_id AND token_parameters.some_param IS NULL';
     const query = SqlParameterQuery.fromSql('mybucket', sql, PARSE_OPTIONS) as SqlParameterQuery;
     expect(query.errors).toEqual([]);
+    query.id = '1';
     expect(query.evaluateParameterRow({ id: 'user1' })).toEqual([
       {
-        lookup: ['mybucket', undefined, 'user1', 1n],
+        lookup: ParameterLookup.normalized('mybucket', '1', ['user1', 1n]),
         bucket_parameters: [{}]
       }
     ]);
     expect(query.getLookups(normalizeTokenParameters({ user_id: 'user1', some_param: null }))).toEqual([
-      ['mybucket', undefined, 'user1', 1n]
+      ParameterLookup.normalized('mybucket', '1', ['user1', 1n])
     ]);
     expect(query.getLookups(normalizeTokenParameters({ user_id: 'user1', some_param: 123 }))).toEqual([
-      ['mybucket', undefined, 'user1', 0n]
+      ParameterLookup.normalized('mybucket', '1', ['user1', 0n])
     ]);
   });

@@ -265,19 +285,20 @@
     const sql = 'SELECT FROM users WHERE users.id = token_parameters.user_id AND token_parameters.some_param';
     const query = SqlParameterQuery.fromSql('mybucket', sql, PARSE_OPTIONS) as SqlParameterQuery;
     expect(query.errors).toEqual([]);
+    query.id = '1';
     expect(query.evaluateParameterRow({ id: 'user1' })).toEqual([
       {
-        lookup: ['mybucket', undefined, 'user1', 1n],
+        lookup: ParameterLookup.normalized('mybucket', '1', ['user1', 1n]),
         bucket_parameters: [{}]
       }
     ]);
     expect(query.getLookups(normalizeTokenParameters({ user_id: 'user1', some_param: 123 }))).toEqual([
-      ['mybucket', undefined, 'user1', 1n]
+      ParameterLookup.normalized('mybucket', '1', ['user1', 1n])
     ]);
     expect(query.getLookups(normalizeTokenParameters({ user_id: 'user1', some_param: null }))).toEqual([
-      ['mybucket', undefined, 'user1', 0n]
+      ParameterLookup.normalized('mybucket', '1', ['user1', 0n])
     ]);
   });

@@ -285,26 +306,32 @@
     const sql = 'SELECT FROM users WHERE users.id = cast(token_parameters.user_id as text)';
     const query = SqlParameterQuery.fromSql('mybucket', sql, PARSE_OPTIONS) as SqlParameterQuery;
     expect(query.errors).toEqual([]);
+    query.id = '1';
     expect(query.getLookups(normalizeTokenParameters({ user_id: 'user1' }))).toEqual([
-      ['mybucket', undefined, 'user1']
+      ParameterLookup.normalized('mybucket', '1', ['user1'])
+    ]);
+    expect(query.getLookups(normalizeTokenParameters({ user_id: 123 }))).toEqual([
+      ParameterLookup.normalized('mybucket', '1', ['123'])
     ]);
-    expect(query.getLookups(normalizeTokenParameters({ user_id: 123 }))).toEqual([['mybucket', undefined, '123']]);
   });

   test('IS NULL row filter', () => {
     const sql = 'SELECT id FROM users WHERE role IS NULL';
     const query = SqlParameterQuery.fromSql('mybucket', sql, PARSE_OPTIONS) as SqlParameterQuery;
     expect(query.errors).toEqual([]);
+    query.id = '1';
     expect(query.evaluateParameterRow({ id: 'user1', role: null })).toEqual([
       {
-        lookup: ['mybucket', undefined],
+        lookup: ParameterLookup.normalized('mybucket', '1', []),
         bucket_parameters: [{ id: 'user1' }]
       }
     ]);
-    expect(query.getLookups(normalizeTokenParameters({ user_id: 'user1' }))).toEqual([['mybucket', undefined]]);
+    expect(query.getLookups(normalizeTokenParameters({ user_id: 'user1' }))).toEqual([
+      ParameterLookup.normalized('mybucket', '1', [])
+    ]);
   });

   test('token filter (1)', () => {
@@ -318,17 +345,17 @@
     expect(query.evaluateParameterRow({ id: 'user1' })).toEqual([
       {
-        lookup: ['mybucket', '1', 'user1', 1n],
+        lookup: ParameterLookup.normalized('mybucket', '1', ['user1', 1n]),
         bucket_parameters: [{}]
       }
     ]);
     expect(query.getLookups(normalizeTokenParameters({ user_id: 'user1', is_admin: true }))).toEqual([
-      ['mybucket', '1', 'user1', 1n]
+      ParameterLookup.normalized('mybucket', '1', ['user1', 1n])
     ]);
     // Would not match any actual lookups
     expect(query.getLookups(normalizeTokenParameters({ user_id: 'user1', is_admin: false }))).toEqual([
-      ['mybucket', '1', 'user1', 0n]
+      ParameterLookup.normalized('mybucket', '1', ['user1', 0n])
     ]);
   });

@@ -341,14 +368,14 @@
     expect(query.evaluateParameterRow({ id: 'user1' })).toEqual([
       {
-        lookup: ['mybucket', '1', 'user1', 1n],
+        lookup: ParameterLookup.normalized('mybucket', '1', ['user1', 1n]),
         bucket_parameters: [{ user_id: 'user1' }]
       }
     ]);
     expect(query.getLookups(normalizeTokenParameters({ user_id: 'user1', is_admin: true }))).toEqual([
-      ['mybucket', '1', 'user1', 1n]
+      ParameterLookup.normalized('mybucket', '1', ['user1', 1n])
     ]);

     expect(
@@ -367,7 +394,7 @@
     expect(query.evaluateParameterRow({ userId: 'user1' })).toEqual([
       {
-        lookup: ['mybucket', '1', 'user1'],
+        lookup: ParameterLookup.normalized('mybucket', '1', ['user1']),
         bucket_parameters: [{ user_id: 'user1' }]
       }
@@ -389,7 +416,7 @@
     expect(query.evaluateParameterRow({ userId: 'user1' })).toEqual([]);
     expect(query.evaluateParameterRow({ userid: 'user1' })).toEqual([
       {
-        lookup: ['mybucket', '1', 'user1'],
+        lookup: ParameterLookup.normalized('mybucket', '1', ['user1']),
         bucket_parameters: [{ user_id: 'user1' }]
       }
@@ -444,7 +471,7 @@
     expect(query.evaluateParameterRow({ id: 'workspace1', visibility: 'public' })).toEqual([
       {
-        lookup: ['mybucket', '1'],
+        lookup: ParameterLookup.normalized('mybucket', '1', []),
         bucket_parameters: [{ workspace_id: 'workspace1' }]
       }
@@ -459,17 +486,18 @@
       'SELECT id from users WHERE filter_param = upper(token_parameters.user_id) AND filter_param = lower(token_parameters.user_id)';
     const query = SqlParameterQuery.fromSql('mybucket', sql, PARSE_OPTIONS) as SqlParameterQuery;
     expect(query.errors).toEqual([]);
+    query.id = '1';
     expect(query.evaluateParameterRow({ id: 'test_id', filter_param: 'test_param' })).toEqual([
       {
-        lookup: ['mybucket', undefined, 'test_param', 'test_param'],
+        lookup: ParameterLookup.normalized('mybucket', '1', ['test_param', 'test_param']),
         bucket_parameters: [{ id: 'test_id' }]
       }
     ]);

     expect(query.getLookups(normalizeTokenParameters({ user_id: 'test' }))).toEqual([
-      ['mybucket', undefined, 'TEST', 'test']
+      ParameterLookup.normalized('mybucket', '1', ['TEST', 'test'])
     ]);
   });

@@ -479,19 +507,22 @@
       'SELECT id from users WHERE filter_param1 = upper(token_parameters.user_id) OR filter_param2 = upper(token_parameters.user_id)';
     const query = SqlParameterQuery.fromSql('mybucket', sql, PARSE_OPTIONS) as SqlParameterQuery;
     expect(query.errors).toEqual([]);
+    query.id = '1';
     expect(query.evaluateParameterRow({ id: 'test_id', filter_param1: 'test1', filter_param2: 'test2' })).toEqual([
      {
-        lookup: ['mybucket', undefined, 'test1'],
+        lookup: ParameterLookup.normalized('mybucket', '1', ['test1']),
         bucket_parameters: [{ id: 'test_id' }]
       },
       {
-        lookup: ['mybucket', undefined, 'test2'],
+        lookup: ParameterLookup.normalized('mybucket', '1', ['test2']),
         bucket_parameters: [{ id: 'test_id' }]
       }
     ]);
-    expect(query.getLookups(normalizeTokenParameters({ user_id: 'test' }))).toEqual([['mybucket', undefined, 'TEST']]);
+    expect(query.getLookups(normalizeTokenParameters({ user_id: 'test' }))).toEqual([
+      ParameterLookup.normalized('mybucket', '1', ['TEST'])
+    ]);
   });

   test('request.parameters()', function () {
@@ -504,11 +535,13 @@
     query.id = '1';
     expect(query.evaluateParameterRow({ id: 'group1', category: 'red' })).toEqual([
       {
-        lookup: ['mybucket', '1', 'red'],
+        lookup: ParameterLookup.normalized('mybucket', '1', ['red']),
         bucket_parameters: [{}]
       }
     ]);
-    expect(query.getLookups(normalizeTokenParameters({}, { category_id: 'red' }))).toEqual([['mybucket', '1', 'red']]);
+    expect(query.getLookups(normalizeTokenParameters({}, { category_id: 'red' }))).toEqual([
+      ParameterLookup.normalized('mybucket', '1', ['red'])
+    ]);
   });

   test('nested request.parameters() (1)', function () {
@@ -520,7 +553,7 @@
     expect(query.errors).toEqual([]);
     query.id = '1';
     expect(query.getLookups(normalizeTokenParameters({}, { details: { category: 'red' } }))).toEqual([
-      ['mybucket', '1', 'red']
+      ParameterLookup.normalized('mybucket', '1', ['red'])
     ]);
   });

@@ -533,7 +566,7 @@
     expect(query.errors).toEqual([]);
     query.id = '1';
     expect(query.getLookups(normalizeTokenParameters({}, { details: { category: 'red' } }))).toEqual([
-      ['mybucket', '1', 'red']
+      ParameterLookup.normalized('mybucket', '1', ['red'])
     ]);
   });

@@ -548,7 +581,7 @@
     query.id = '1';
     expect(query.evaluateParameterRow({ id: 'region1', name: 'colorado' })).toEqual([
       {
-        lookup: ['mybucket', '1', 'colorado'],
+        lookup: ParameterLookup.normalized('mybucket', '1', ['colorado']),
         bucket_parameters: [
           {
             region_id: 'region1'
@@ -566,8 +599,8 @@
         )
       )
     ).toEqual([
-      ['mybucket', '1', 'colorado'],
-      ['mybucket', '1', 'texas']
+      ParameterLookup.normalized('mybucket', '1', ['colorado']),
+      ParameterLookup.normalized('mybucket', '1', ['texas'])
     ]);
   });

@@ -578,12 +611,12 @@
     query.id = '1';
     expect(query.evaluateParameterRow({ id: 'user1' })).toEqual([
       {
-        lookup: ['mybucket', '1', 'user1'],
+        lookup: ParameterLookup.normalized('mybucket', '1', ['user1']),
         bucket_parameters: [{ id: 'user1' }]
       }
     ]);
     const requestParams = normalizeTokenParameters({ user_id: 'user1' }, { other_id: 'red' });
-    expect(query.getLookups(requestParams)).toEqual([['mybucket', '1', 'user1']]);
+    expect(query.getLookups(requestParams)).toEqual([ParameterLookup.normalized('mybucket', '1', ['user1'])]);
   });

   test('request.parameters() in SELECT', function () {
@@ -594,30 +627,32 @@
     query.id = '1';
     expect(query.evaluateParameterRow({ id: 'user1' })).toEqual([
       {
-        lookup: ['mybucket', '1', 'user1'],
+        lookup: ParameterLookup.normalized('mybucket', '1', ['user1']),
         bucket_parameters: [{ id: 'user1' }]
       }
     ]);
     const requestParams = normalizeTokenParameters({ user_id: 'user1' }, { other_id: 'red' });
-    expect(query.getLookups(requestParams)).toEqual([['mybucket', '1', 'user1']]);
+    expect(query.getLookups(requestParams)).toEqual([ParameterLookup.normalized('mybucket', '1', ['user1'])]);
   });

   test('request.jwt()', function () {
     const sql = "SELECT FROM users WHERE id = request.jwt() ->> 'sub'";
     const query = SqlParameterQuery.fromSql('mybucket', sql, PARSE_OPTIONS) as SqlParameterQuery;
     expect(query.errors).toEqual([]);
+    query.id = '1';
     const requestParams = normalizeTokenParameters({ user_id: 'user1' });
-    expect(query.getLookups(requestParams)).toEqual([['mybucket', undefined, 'user1']]);
+    expect(query.getLookups(requestParams)).toEqual([ParameterLookup.normalized('mybucket', '1', ['user1'])]);
   });

   test('request.user_id()', function () {
     const sql = 'SELECT FROM users WHERE id = request.user_id()';
     const query = SqlParameterQuery.fromSql('mybucket', sql, PARSE_OPTIONS) as SqlParameterQuery;
     expect(query.errors).toEqual([]);
+    query.id = '1';
     const requestParams = normalizeTokenParameters({ user_id: 'user1' });
-    expect(query.getLookups(requestParams)).toEqual([['mybucket', undefined, 'user1']]);
+    expect(query.getLookups(requestParams)).toEqual([ParameterLookup.normalized('mybucket', '1', ['user1'])]);
   });

   test('invalid OR in parameter queries', () => {
diff --git a/packages/sync-rules/test/src/sync_rules.test.ts b/packages/sync-rules/test/src/sync_rules.test.ts
index 1ffa64cb4..e5b277da7 100644
--- a/packages/sync-rules/test/src/sync_rules.test.ts
+++ b/packages/sync-rules/test/src/sync_rules.test.ts
@@ -1,5 +1,5 @@
 import { describe, expect, test } from 'vitest';
-import { SqlSyncRules } from '../../src/index.js';
+import { ParameterLookup, SqlSyncRules } from '../../src/index.js';
 import { ASSETS, BASIC_SCHEMA, PARSE_OPTIONS, TestSourceTable, USERS, normalizeTokenParameters } from './util.js';

@@ -37,10 +37,10 @@ bucket_definitions:
       bucket: 'mybucket[]'
     }
   ]);
+  expect(rules.hasDynamicBucketQueries()).toBe(false);
   expect(rules.getBucketParameterQuerier(normalizeTokenParameters({}))).toMatchObject({
     staticBuckets: [{ bucket: 'mybucket[]', priority: 3 }],
-    hasDynamicBuckets: false,
-    dynamicBucketDefinitions: new Set()
+    hasDynamicBuckets: false
   });
 });

@@ -72,8 +72,7 @@
   });
   expect(rules.getBucketParameterQuerier(normalizeTokenParameters({}))).toMatchObject({
     staticBuckets: [],
-    hasDynamicBuckets: false,
-    dynamicBucketDefinitions: new Set()
+    hasDynamicBuckets: false
   });
 });

@@ -94,7 +93,7 @@
   expect(rules.evaluateParameterRow(USERS, { id: 'user1', is_admin: 1 })).toEqual([
     {
       bucket_parameters: [{}],
-      lookup: ['mybucket', '1', 'user1']
+      lookup: ParameterLookup.normalized('mybucket', '1', ['user1'])
     }
   ]);
   expect(rules.evaluateParameterRow(USERS, { id: 'user1', is_admin: 0 })).toEqual([]);
@@ -936,15 +935,16 @@
   );
   const bucket = rules.bucket_descriptors[0];
   expect(bucket.bucket_parameters).toEqual(['user_id']);
+  expect(rules.hasDynamicBucketQueries()).toBe(true);
   expect(rules.getBucketParameterQuerier(normalizeTokenParameters({ user_id: 'user1' }))).toMatchObject({
     hasDynamicBuckets: true,
-    dynamicBucketDefinitions: new Set([
-      'mybucket',
-      'by_list',
+    parameterQueryLookups: [
+      ParameterLookup.normalized('mybucket', '2', ['user1']),
+      ParameterLookup.normalized('by_list', '1', ['user1']),
       // These are not filtered out yet, due to how the lookups are structured internally
-      'admin_only'
-    ]),
+      ParameterLookup.normalized('admin_only', '1', [1])
+    ],
     staticBuckets: [
       {
         bucket: 'mybucket["user1"]',