diff --git a/.changeset/odd-snails-end.md b/.changeset/odd-snails-end.md new file mode 100644 index 000000000..2f29459f9 --- /dev/null +++ b/.changeset/odd-snails-end.md @@ -0,0 +1,5 @@ +--- +'@powersync/service-core': patch +--- + +Stream changes in priority order. diff --git a/.changeset/tall-peas-cough.md b/.changeset/tall-peas-cough.md new file mode 100644 index 000000000..ef52a9dec --- /dev/null +++ b/.changeset/tall-peas-cough.md @@ -0,0 +1,5 @@ +--- +'@powersync/service-sync-rules': minor +--- + +Replace bucket ids from queries with a description also containing a priority. diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index cbb49b62b..b8b03d717 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -151,6 +151,9 @@ jobs: - name: Test run: pnpm test --filter='./modules/module-postgres' + - name: Test + run: pnpm test --filter='./modules/module-postgres-storage' + run-mysql-tests: name: MySQL Test runs-on: ubuntu-latest @@ -287,3 +290,6 @@ jobs: - name: Test run: pnpm test --filter='./modules/module-mongodb' + + - name: Test Storage + run: pnpm test --filter='./modules/module-mongodb-storage' diff --git a/docs/compacting-operations.md b/docs/compacting-operations.md index 6caf9e269..48e2ab89e 100644 --- a/docs/compacting-operations.md +++ b/docs/compacting-operations.md @@ -75,8 +75,10 @@ The second part is compacting to CLEAR operations. For each bucket, we keep trac For an initial workaround, defragmenting can be performed outside powersync by touching all rows in a bucket: ```sql -update mytable set id = id --- Repeat the above for other tables in the same bucket if relevant +UPDATE mytable +SET + id = id + -- Repeat the above for other tables in the same bucket if relevant ``` After this, the normal MOVE + CLEAR compacting will compact the bucket to only have a single operation per active row. @@ -86,7 +88,11 @@ This would cause existing clients to re-sync every row, while reducing the numbe If an updated_at column or similar is present, we can use this to defragment more incrementally: ```sql -update mytable set id = id where updated_at < now() - interval '1 week' +UPDATE mytable +SET + id = id +WHERE + updated_at < now() - interval '1 week' ``` This version avoids unnecessary defragmentation of rows modified recently. diff --git a/docs/postgres-initial-replication.md b/docs/postgres-initial-replication.md index 351293583..15229e6f1 100644 --- a/docs/postgres-initial-replication.md +++ b/docs/postgres-initial-replication.md @@ -11,7 +11,7 @@ This is our first approach. 
We start by creating a logical replication slot, exporting a snapshot: ```sql -CREATE_REPLICATION_SLOT LOGICAL pgoutput EXPORT_SNAPSHOT +CREATE_REPLICATION_SLOT < slot > LOGICAL pgoutput EXPORT_SNAPSHOT ``` While that connection stays open, we create another connection with a transaction, and read each table: diff --git a/modules/module-mongodb-storage/test/src/__snapshots__/storage_sync.test.ts.snap b/modules/module-mongodb-storage/test/src/__snapshots__/storage_sync.test.ts.snap index 233f10df8..efb28bcd3 100644 --- a/modules/module-mongodb-storage/test/src/__snapshots__/storage_sync.test.ts.snap +++ b/modules/module-mongodb-storage/test/src/__snapshots__/storage_sync.test.ts.snap @@ -9,6 +9,7 @@ exports[`sync - mongodb > compacting data - invalidate checkpoint 1`] = ` "bucket": "mybucket[]", "checksum": -93886621, "count": 2, + "priority": 3, }, ], "last_op_id": "2", @@ -44,6 +45,7 @@ exports[`sync - mongodb > compacting data - invalidate checkpoint 2`] = ` "bucket": "mybucket[]", "checksum": 499012468, "count": 4, + "priority": 3, }, ], "write_checkpoint": undefined, @@ -102,6 +104,7 @@ exports[`sync - mongodb > expiring token 1`] = ` "bucket": "mybucket[]", "checksum": 0, "count": 0, + "priority": 3, }, ], "last_op_id": "0", @@ -124,6 +127,80 @@ exports[`sync - mongodb > expiring token 2`] = ` ] `; +exports[`sync - mongodb > sync buckets in order 1`] = ` +[ + { + "checkpoint": { + "buckets": [ + { + "bucket": "b0[]", + "checksum": 920318466, + "count": 1, + "priority": 2, + }, + { + "bucket": "b1[]", + "checksum": -1382098757, + "count": 1, + "priority": 1, + }, + ], + "last_op_id": "2", + "write_checkpoint": undefined, + }, + }, + { + "data": { + "after": "0", + "bucket": "b1[]", + "data": [ + { + "checksum": 2912868539n, + "data": "{"id":"earlier","description":"Test 2"}", + "object_id": "earlier", + "object_type": "test", + "op": "PUT", + "op_id": "2", + "subkey": "0dfe86bd-d15b-5fd0-9c7b-a31693030ee0", + }, + ], + "has_more": false, + "next_after": "2", + }, + }, + { + "partial_checkpoint_complete": { + "last_op_id": "2", + "priority": 1, + }, + }, + { + "data": { + "after": "0", + "bucket": "b0[]", + "data": [ + { + "checksum": 920318466n, + "data": "{"id":"t1","description":"Test 1"}", + "object_id": "t1", + "object_type": "test", + "op": "PUT", + "op_id": "1", + "subkey": "e5aa2ddc-1328-58fa-a000-0b5ed31eaf1a", + }, + ], + "has_more": false, + "next_after": "1", + }, + }, + { + "checkpoint_complete": { + "last_op_id": "2", + }, + }, +] +`; + exports[`sync - mongodb > sync global data 1`] = ` [ { @@ -133,6 +210,7 @@ exports[`sync - mongodb > sync global data 1`] = ` "bucket": "mybucket[]", "checksum": -93886621, "count": 2, + "priority": 3, }, ], "last_op_id": "2", @@ -184,6 +262,7 @@ exports[`sync - mongodb > sync legacy non-raw data 1`] = ` "bucket": "mybucket[]", "checksum": -852817836, "count": 1, + "priority": 3, }, ], "last_op_id": "1", @@ -231,6 +310,7 @@ exports[`sync - mongodb > sync updates to global data 1`] = ` "bucket": "mybucket[]", "checksum": 0, "count": 0, + "priority": 3, }, ], "last_op_id": "0", @@ -256,6 +336,7 @@ exports[`sync - mongodb > sync updates to global data 2`] = ` "bucket": "mybucket[]", "checksum": 920318466, "count": 1, + "priority": 3, }, ], "write_checkpoint": undefined, @@ -299,6 +380,7 @@ exports[`sync - mongodb > sync updates to global data 3`] = ` "bucket": "mybucket[]", "checksum": -93886621, "count": 2, + "priority": 3, }, ], "write_checkpoint": undefined, diff --git 
a/modules/module-postgres-storage/test/src/__snapshots__/storage_sync.test.ts.snap b/modules/module-postgres-storage/test/src/__snapshots__/storage_sync.test.ts.snap index 64e792029..52af71a0c 100644 --- a/modules/module-postgres-storage/test/src/__snapshots__/storage_sync.test.ts.snap +++ b/modules/module-postgres-storage/test/src/__snapshots__/storage_sync.test.ts.snap @@ -9,6 +9,7 @@ exports[`sync - postgres > compacting data - invalidate checkpoint 1`] = ` "bucket": "mybucket[]", "checksum": -93886621, "count": 2, + "priority": 3, }, ], "last_op_id": "2", @@ -44,6 +45,7 @@ exports[`sync - postgres > compacting data - invalidate checkpoint 2`] = ` "bucket": "mybucket[]", "checksum": 499012468, "count": 4, + "priority": 3, }, ], "write_checkpoint": undefined, @@ -102,6 +104,7 @@ exports[`sync - postgres > expiring token 1`] = ` "bucket": "mybucket[]", "checksum": 0, "count": 0, + "priority": 3, }, ], "last_op_id": "0", @@ -124,6 +127,80 @@ exports[`sync - postgres > expiring token 2`] = ` ] `; +exports[`sync - postgres > sync buckets in order 1`] = ` +[ + { + "checkpoint": { + "buckets": [ + { + "bucket": "b0[]", + "checksum": 920318466, + "count": 1, + "priority": 2, + }, + { + "bucket": "b1[]", + "checksum": -1382098757, + "count": 1, + "priority": 1, + }, + ], + "last_op_id": "2", + "write_checkpoint": undefined, + }, + }, + { + "data": { + "after": "0", + "bucket": "b1[]", + "data": [ + { + "checksum": 2912868539n, + "data": "{"id":"earlier","description":"Test 2"}", + "object_id": "earlier", + "object_type": "test", + "op": "PUT", + "op_id": "2", + "subkey": "243b0e26-87b2-578a-993c-5ac5b6f7fd64", + }, + ], + "has_more": false, + "next_after": "2", + }, + }, + { + "partial_checkpoint_complete": { + "last_op_id": "2", + "priority": 1, + }, + }, + { + "data": { + "after": "0", + "bucket": "b0[]", + "data": [ + { + "checksum": 920318466n, + "data": "{"id":"t1","description":"Test 1"}", + "object_id": "t1", + "object_type": "test", + "op": "PUT", + "op_id": "1", + "subkey": "02d285ac-4f96-5124-8fba-c6d1df992dd1", + }, + ], + "has_more": false, + "next_after": "1", + }, + }, + { + "checkpoint_complete": { + "last_op_id": "2", + }, + }, +] +`; + exports[`sync - postgres > sync global data 1`] = ` [ { @@ -133,6 +210,7 @@ exports[`sync - postgres > sync global data 1`] = ` "bucket": "mybucket[]", "checksum": -93886621, "count": 2, + "priority": 3, }, ], "last_op_id": "2", @@ -184,6 +262,7 @@ exports[`sync - postgres > sync legacy non-raw data 1`] = ` "bucket": "mybucket[]", "checksum": -852817836, "count": 1, + "priority": 3, }, ], "last_op_id": "1", @@ -231,6 +310,7 @@ exports[`sync - postgres > sync updates to global data 1`] = ` "bucket": "mybucket[]", "checksum": 0, "count": 0, + "priority": 3, }, ], "last_op_id": "0", @@ -256,6 +336,7 @@ exports[`sync - postgres > sync updates to global data 2`] = ` "bucket": "mybucket[]", "checksum": 920318466, "count": 1, + "priority": 3, }, ], "write_checkpoint": undefined, @@ -299,6 +380,7 @@ exports[`sync - postgres > sync updates to global data 3`] = ` "bucket": "mybucket[]", "checksum": -93886621, "count": 2, + "priority": 3, }, ], "write_checkpoint": undefined, diff --git a/package.json b/package.json index 4c4d625aa..b2df21c8a 100644 --- a/package.json +++ b/package.json @@ -18,7 +18,8 @@ "start:service": "pnpm --filter @powersync/service-image watch", "clean": "pnpm run -r clean", "release": "pnpm build:production && pnpm changeset publish", - "test": "pnpm run -r test" + "test": "pnpm run -r test", + "vitest": "vitest" }, "devDependencies": 
{ "@changesets/cli": "^2.27.8", diff --git a/packages/service-core-tests/README.md b/packages/service-core-tests/README.md index bc376c699..a35c19fbd 100644 --- a/packages/service-core-tests/README.md +++ b/packages/service-core-tests/README.md @@ -2,4 +2,4 @@ A small helper package which exposes common unit tests and test utility functions. -This package is used in various modules for their unit tests. \ No newline at end of file +This package is used in various modules for their unit tests. diff --git a/packages/service-core-tests/src/tests/register-data-storage-tests.ts b/packages/service-core-tests/src/tests/register-data-storage-tests.ts index fdfde9b66..a67ae9cab 100644 --- a/packages/service-core-tests/src/tests/register-data-storage-tests.ts +++ b/packages/service-core-tests/src/tests/register-data-storage-tests.ts @@ -394,13 +394,13 @@ bucket_definitions: const parameter_sets = await bucketStorage.getParameterSets(checkpoint, lookups); expect(parameter_sets).toEqual([{ workspace_id: 'workspace1' }]); - const buckets = await sync_rules.queryBucketIds({ + const buckets = await sync_rules.queryBucketDescriptions({ getParameterSets(lookups) { return bucketStorage.getParameterSets(checkpoint, lookups); }, parameters }); - expect(buckets).toEqual(['by_workspace["workspace1"]']); + expect(buckets).toEqual([{ bucket: 'by_workspace["workspace1"]', priority: 3 }]); }); test('save and load parameters with dynamic global buckets', async () => { @@ -466,14 +466,17 @@ bucket_definitions: parameter_sets.sort((a, b) => JSON.stringify(a).localeCompare(JSON.stringify(b))); expect(parameter_sets).toEqual([{ workspace_id: 'workspace1' }, { workspace_id: 'workspace3' }]); - const buckets = await sync_rules.queryBucketIds({ + const buckets = await sync_rules.queryBucketDescriptions({ getParameterSets(lookups) { return bucketStorage.getParameterSets(checkpoint, lookups); }, parameters }); - buckets.sort(); - expect(buckets).toEqual(['by_public_workspace["workspace1"]', 'by_public_workspace["workspace3"]']); + buckets.sort((a, b) => a.bucket.localeCompare(b.bucket)); + expect(buckets).toEqual([ + { bucket: 'by_public_workspace["workspace1"]', priority: 3 }, + { bucket: 'by_public_workspace["workspace3"]', priority: 3 } + ]); }); test('multiple parameter queries', async () => { @@ -562,12 +565,14 @@ bucket_definitions: expect(parameter_sets2).toEqual([{ workspace_id: 'workspace3' }]); // Test final values - the important part - const buckets = await sync_rules.queryBucketIds({ - getParameterSets(lookups) { - return bucketStorage.getParameterSets(checkpoint, lookups); - }, - parameters - }); + const buckets = ( + await sync_rules.queryBucketDescriptions({ + getParameterSets(lookups) { + return bucketStorage.getParameterSets(checkpoint, lookups); + }, + parameters + }) + ).map((e) => e.bucket); buckets.sort(); expect(buckets).toEqual(['by_workspace["workspace1"]', 'by_workspace["workspace3"]']); }); diff --git a/packages/service-core-tests/src/tests/register-sync-tests.ts b/packages/service-core-tests/src/tests/register-sync-tests.ts index 2cf14cb54..d96dc34ad 100644 --- a/packages/service-core-tests/src/tests/register-sync-tests.ts +++ b/packages/service-core-tests/src/tests/register-sync-tests.ts @@ -83,6 +83,159 @@ export function registerSyncTests(factory: storage.TestStorageFactory) { expect(lines).toMatchSnapshot(); }); + test('sync buckets in order', async () => { + await using f = await factory(); + + const syncRules = await f.updateSyncRules({ + content: ` +bucket_definitions: + b0: + priority: 2 + 
data: + - SELECT * FROM test WHERE LENGTH(id) <= 2; + b1: + priority: 1 + data: + - SELECT * FROM test WHERE LENGTH(id) > 2; + ` + }); + + const bucketStorage = f.getInstance(syncRules); + await bucketStorage.autoActivate(); + + const result = await bucketStorage.startBatch(test_utils.BATCH_OPTIONS, async (batch) => { + await batch.save({ + sourceTable: TEST_TABLE, + tag: storage.SaveOperationTag.INSERT, + after: { + id: 't1', + description: 'Test 1' + }, + afterReplicaId: 't1' + }); + + await batch.save({ + sourceTable: TEST_TABLE, + tag: storage.SaveOperationTag.INSERT, + after: { + id: 'earlier', + description: 'Test 2' + }, + afterReplicaId: 'earlier' + }); + + await batch.commit('0/1'); + }); + + const stream = sync.streamResponse({ + storage: f, + params: { + buckets: [], + include_checksum: true, + raw_data: true + }, + parseOptions: test_utils.PARSE_OPTIONS, + tracker, + syncParams: new RequestParameters({ sub: '' }, {}), + token: { exp: Date.now() / 1000 + 10 } as any + }); + + const lines = await consumeCheckpointLines(stream); + expect(lines).toMatchSnapshot(); + }); + + test('sync interrupts low-priority buckets on new checkpoints', async () => { + await using f = await factory(); + + const syncRules = await f.updateSyncRules({ + content: ` +bucket_definitions: + b0: + priority: 2 + data: + - SELECT * FROM test WHERE LENGTH(id) <= 5; + b1: + priority: 1 + data: + - SELECT * FROM test WHERE LENGTH(id) > 5; + ` + }); + + const bucketStorage = f.getInstance(syncRules); + await bucketStorage.autoActivate(); + + await bucketStorage.startBatch(test_utils.BATCH_OPTIONS, async (batch) => { + // Initial data: Add one priority row and 10k low-priority rows. + await batch.save({ + sourceTable: TEST_TABLE, + tag: storage.SaveOperationTag.INSERT, + after: { + id: 'highprio', + description: 'High priority row' + }, + afterReplicaId: 'highprio' + }); + for (let i = 0; i < 10_000; i++) { + await batch.save({ + sourceTable: TEST_TABLE, + tag: storage.SaveOperationTag.INSERT, + after: { + id: `${i}`, + description: 'low prio' + }, + afterReplicaId: `${i}` + }); + } + + await batch.commit('0/1'); + }); + + const stream = sync.streamResponse({ + storage: f, + params: { + buckets: [], + include_checksum: true, + raw_data: true + }, + parseOptions: test_utils.PARSE_OPTIONS, + tracker, + syncParams: new RequestParameters({ sub: '' }, {}), + token: { exp: Date.now() / 1000 + 10 } as any + }); + + let sentCheckpoints = 0; + for await (const next of stream) { + if (typeof next === 'object' && next !== null) { + if ('partial_checkpoint_complete' in next) { + expect(sentCheckpoints).toBe(1); + + await bucketStorage.startBatch(test_utils.BATCH_OPTIONS, async (batch) => { + // Add another high-priority row. This should interrupt the long-running low-priority sync. 
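+            // After this commit, the stream should emit a second checkpoint line (counted
+            // below) and re-sync the high-priority bucket before the remaining low-priority
+            // data completes with the final checkpoint_complete.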
+ await batch.save({ + sourceTable: TEST_TABLE, + tag: storage.SaveOperationTag.INSERT, + after: { + id: 'highprio2', + description: 'Another high-priority row' + }, + afterReplicaId: 'highprio2' + }); + + await batch.commit('0/2'); + }); + } + if ('checkpoint' in next || 'checkpoint_diff' in next) { + sentCheckpoints += 1; + } + if ('checkpoint_complete' in next) { + break; + } + } + } + + expect(sentCheckpoints).toBe(2); + }); + test('sync legacy non-raw data', async () => { const f = await factory(); diff --git a/packages/service-core/src/storage/ChecksumCache.ts b/packages/service-core/src/storage/ChecksumCache.ts index 64daad3ec..671c33706 100644 --- a/packages/service-core/src/storage/ChecksumCache.ts +++ b/packages/service-core/src/storage/ChecksumCache.ts @@ -2,6 +2,7 @@ import { BucketChecksum, OpId } from '../util/protocol-types.js'; import { ChecksumMap, addBucketChecksums } from '../util/utils.js'; import { LRUCache } from 'lru-cache/min'; import { OrderedSet } from '@js-sdsl/ordered-set'; +import { BucketPriority } from '@powersync/service-sync-rules'; interface ChecksumFetchContext { fetch(bucket: string): Promise; diff --git a/packages/service-core/src/sync/sync.ts b/packages/service-core/src/sync/sync.ts index 0b2578209..fdf1b7f70 100644 --- a/packages/service-core/src/sync/sync.ts +++ b/packages/service-core/src/sync/sync.ts @@ -1,5 +1,5 @@ import { JSONBig, JsonContainer } from '@powersync/service-jsonbig'; -import { RequestParameters } from '@powersync/service-sync-rules'; +import { BucketDescription, BucketPriority, RequestParameters } from '@powersync/service-sync-rules'; import { Semaphore, withTimeout } from 'async-mutex'; import { AbortError } from 'ix/aborterror.js'; @@ -11,7 +11,7 @@ import * as util from '../util/util-index.js'; import { logger } from '@powersync/lib-services-framework'; import { mergeAsyncIterables } from './merge.js'; import { RequestTracker } from './RequestTracker.js'; -import { TokenStreamOptions, tokenStream } from './util.js'; +import { TokenStreamOptions, acquireSemaphoreAbortable, tokenStream } from './util.js'; /** * Maximum number of connections actively fetching data. @@ -84,6 +84,11 @@ export async function* streamResponse( } } +type BucketSyncState = { + description?: BucketDescription; // Undefined if the bucket has not yet been resolved by us. + start_op_id: string; +}; + async function* streamResponseInner( storage: storage.BucketStorageFactory, params: util.StreamingSyncRequest, @@ -94,7 +99,7 @@ async function* streamResponseInner( ): AsyncGenerator { // Bucket state of bucket id -> op_id. // This starts with the state from the client. May contain buckets that the user do not have access to (anymore). - let dataBuckets = new Map(); + let dataBuckets = new Map(); let lastChecksums: util.ChecksumMap | null = null; let lastWriteCheckpoint: bigint | null = null; @@ -103,138 +108,218 @@ async function* streamResponseInner( if (params.buckets) { for (let { name, after: start } of params.buckets) { - dataBuckets.set(name, start); + dataBuckets.set(name, { start_op_id: start }); } } const checkpointUserId = util.checkpointUserId(syncParams.token_parameters.user_id as string, params.client_id); const stream = storage.watchWriteCheckpoint(checkpointUserId, signal); - for await (const next of stream) { - const { base, writeCheckpoint } = next; - const checkpoint = base.checkpoint; - - const storage = await base.getBucketStorage(); - if (storage == null) { - // Sync rules deleted in the meantime - try again with the next checkpoint. 
- continue; - } - const syncRules = storage.getParsedSyncRules(parseOptions); + const newCheckpoints = stream[Symbol.asyncIterator](); - const allBuckets = await syncRules.queryBucketIds({ - getParameterSets(lookups) { - return storage.getParameterSets(checkpoint, lookups); - }, - parameters: syncParams - }); - - if (allBuckets.length > 1000) { - logger.error(`Too many buckets`, { - checkpoint, - user_id: syncParams.user_id, - buckets: allBuckets.length - }); - // TODO: Limit number of buckets even before we get to this point - throw new Error(`Too many buckets: ${allBuckets.length}`); - } + try { + let nextCheckpointPromise: Promise> | undefined; - let dataBucketsNew = new Map(); - for (let bucket of allBuckets) { - dataBucketsNew.set(bucket, dataBuckets.get(bucket) ?? '0'); - } - dataBuckets = dataBucketsNew; - - const bucketList = [...dataBuckets.keys()]; - const checksumMap = await storage.getChecksums(checkpoint, bucketList); - // Subset of buckets for which there may be new data in this batch. - let bucketsToFetch: string[]; - - if (lastChecksums) { - const diff = util.checksumsDiff(lastChecksums, checksumMap); - - if ( - lastWriteCheckpoint == writeCheckpoint && - diff.removedBuckets.length == 0 && - diff.updatedBuckets.length == 0 - ) { - // No changes - don't send anything to the client + do { + if (!nextCheckpointPromise) { + nextCheckpointPromise = newCheckpoints.next(); + } + const next = await nextCheckpointPromise; + nextCheckpointPromise = undefined; + if (next.done) { + break; + } + + const { base, writeCheckpoint } = next.value; + const checkpoint = base.checkpoint; + + const storage = await base.getBucketStorage(); + if (storage == null) { + // Sync rules deleted in the meantime - try again with the next checkpoint. continue; } - bucketsToFetch = diff.updatedBuckets.map((c) => c.bucket); - - let message = `Updated checkpoint: ${checkpoint} | `; - message += `write: ${writeCheckpoint} | `; - message += `buckets: ${allBuckets.length} | `; - message += `updated: ${limitedBuckets(diff.updatedBuckets, 20)} | `; - message += `removed: ${limitedBuckets(diff.removedBuckets, 20)}`; - logger.info(message, { - checkpoint, - user_id: syncParams.user_id, - buckets: allBuckets.length, - updated: diff.updatedBuckets.length, - removed: diff.removedBuckets.length + const syncRules = storage.getParsedSyncRules(parseOptions); + + const allBuckets = await syncRules.queryBucketDescriptions({ + getParameterSets(lookups) { + return storage.getParameterSets(checkpoint, lookups); + }, + parameters: syncParams }); - const checksum_line: util.StreamingSyncCheckpointDiff = { - checkpoint_diff: { - last_op_id: checkpoint, - write_checkpoint: writeCheckpoint ? String(writeCheckpoint) : undefined, - removed_buckets: diff.removedBuckets, - updated_buckets: diff.updatedBuckets + if (allBuckets.length > 1000) { + logger.error(`Too many buckets`, { + checkpoint, + user_id: syncParams.user_id, + buckets: allBuckets.length + }); + // TODO: Limit number of buckets even before we get to this point + throw new Error(`Too many buckets: ${allBuckets.length}`); + } + + let dataBucketsNew = new Map(); + for (let bucket of allBuckets) { + dataBucketsNew.set(bucket.bucket, { + description: bucket, + start_op_id: dataBuckets.get(bucket.bucket)?.start_op_id ?? '0' + }); + } + dataBuckets = dataBucketsNew; + + const bucketList = [...dataBuckets.keys()]; + const checksumMap = await storage.getChecksums(checkpoint, bucketList); + // Subset of buckets for which there may be new data in this batch. 
+ let bucketsToFetch: BucketDescription[]; + + if (lastChecksums) { + const diff = util.checksumsDiff(lastChecksums, checksumMap); + + if ( + lastWriteCheckpoint == writeCheckpoint && + diff.removedBuckets.length == 0 && + diff.updatedBuckets.length == 0 + ) { + // No changes - don't send anything to the client + continue; } - }; + const updatedBucketDescriptions = diff.updatedBuckets.map((e) => ({ + ...e, + priority: dataBuckets.get(e.bucket)!.description!.priority + })); + bucketsToFetch = updatedBucketDescriptions; + + let message = `Updated checkpoint: ${checkpoint} | `; + message += `write: ${writeCheckpoint} | `; + message += `buckets: ${allBuckets.length} | `; + message += `updated: ${limitedBuckets(diff.updatedBuckets, 20)} | `; + message += `removed: ${limitedBuckets(diff.removedBuckets, 20)}`; + logger.info(message, { + checkpoint, + user_id: syncParams.user_id, + buckets: allBuckets.length, + updated: diff.updatedBuckets.length, + removed: diff.removedBuckets.length + }); + + const checksum_line: util.StreamingSyncCheckpointDiff = { + checkpoint_diff: { + last_op_id: checkpoint, + write_checkpoint: writeCheckpoint ? String(writeCheckpoint) : undefined, + removed_buckets: diff.removedBuckets, + updated_buckets: updatedBucketDescriptions + } + }; - yield checksum_line; - } else { - let message = `New checkpoint: ${checkpoint} | write: ${writeCheckpoint} | `; - message += `buckets: ${allBuckets.length} ${limitedBuckets(allBuckets, 20)}`; - logger.info(message, { checkpoint, user_id: syncParams.user_id, buckets: allBuckets.length }); - bucketsToFetch = allBuckets; - const checksum_line: util.StreamingSyncCheckpoint = { - checkpoint: { - last_op_id: checkpoint, - write_checkpoint: writeCheckpoint ? String(writeCheckpoint) : undefined, - buckets: [...checksumMap.values()] + yield checksum_line; + } else { + let message = `New checkpoint: ${checkpoint} | write: ${writeCheckpoint} | `; + message += `buckets: ${allBuckets.length} ${limitedBuckets(allBuckets, 20)}`; + logger.info(message, { checkpoint, user_id: syncParams.user_id, buckets: allBuckets.length }); + bucketsToFetch = allBuckets; + const checksum_line: util.StreamingSyncCheckpoint = { + checkpoint: { + last_op_id: checkpoint, + write_checkpoint: writeCheckpoint ? String(writeCheckpoint) : undefined, + buckets: [...checksumMap.values()].map((e) => ({ + ...e, + priority: dataBuckets.get(e.bucket)!.description!.priority + })) + } + }; + yield checksum_line; + } + lastChecksums = checksumMap; + lastWriteCheckpoint = writeCheckpoint; + + // Start syncing data for buckets up to the checkpoint. As soon as we have completed at least one priority and + // at least 1000 operations, we also start listening for new checkpoints concurrently. When a new checkpoint comes + // in while we're still busy syncing data for lower priorities, interrupt the current operation and start syncing + // the new checkpoint. 
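+        //
+        // Concretely: `abortCheckpointController` below is aborted by the race in
+        // maybeRaceForNewCheckpoint() once a newer checkpoint arrives, and AbortSignal.any()
+        // merges it with the connection's own `signal`, so a data batch stops either because
+        // the client disconnected or because a newer checkpoint should take over.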
+ const abortCheckpointController = new AbortController(); + let syncedOperations = 0; + + const abortCheckpointSignal = AbortSignal.any([abortCheckpointController.signal, signal]); + + const bucketsByPriority = [...Map.groupBy(bucketsToFetch, (bucket) => bucket.priority).entries()]; + bucketsByPriority.sort((a, b) => a[0] - b[0]); // Sort from high to lower priorities + const lowestPriority = bucketsByPriority.at(-1)?.[0]; + + function maybeRaceForNewCheckpoint() { + if (syncedOperations >= 1000 && nextCheckpointPromise === undefined) { + nextCheckpointPromise = (async () => { + const next = await newCheckpoints.next(); + if (!next.done) { + // Stop the running bucketDataInBatches() iterations, making the main flow reach the new checkpoint. + abortCheckpointController.abort(); + } + + return next; + })(); } - }; - yield checksum_line; - } - lastChecksums = checksumMap; - lastWriteCheckpoint = writeCheckpoint; - - // This incrementally updates dataBuckets with each individual bucket position. - // At the end of this, we can be sure that all buckets have data up to the checkpoint. - yield* bucketDataInBatches({ - storage, - checkpoint, - bucketsToFetch, - dataBuckets, - raw_data, - binary_data, - signal, - tracker, - user_id: syncParams.user_id - }); - - await new Promise((resolve) => setTimeout(resolve, 10)); + } + + function markOperationsSent(operations: number) { + syncedOperations += operations; + tracker.addOperationsSynced(operations); + maybeRaceForNewCheckpoint(); + } + + // This incrementally updates dataBuckets with each individual bucket position. + // At the end of this, we can be sure that all buckets have data up to the checkpoint. + for (const [priority, buckets] of bucketsByPriority) { + const isLast = priority === lowestPriority; + if (abortCheckpointSignal.aborted) { + break; + } + + yield* bucketDataInBatches({ + storage, + checkpoint, + bucketsToFetch: buckets, + dataBuckets, + raw_data, + binary_data, + onRowsSent: markOperationsSent, + abort_connection: signal, + abort_batch: abortCheckpointSignal, + user_id: syncParams.user_id, + // Passing undefined will emit a full sync complete message at the end. If we pass a priority, we'll emit a partial + // sync complete message. + forPriority: !isLast ? priority : undefined + }); + } + + if (!abortCheckpointSignal.aborted) { + await new Promise((resolve) => setTimeout(resolve, 10)); + } + } while (!signal.aborted); + } finally { + await newCheckpoints.return?.(); } } interface BucketDataRequest { storage: storage.SyncRulesBucketStorage; checkpoint: string; - bucketsToFetch: string[]; + bucketsToFetch: BucketDescription[]; /** Bucket data position, modified by the request. */ - dataBuckets: Map; + dataBuckets: Map; raw_data: boolean | undefined; binary_data: boolean | undefined; - tracker: RequestTracker; - signal: AbortSignal; + /** Signals that the connection was aborted and that streaming should stop ASAP. */ + abort_connection: AbortSignal; + /** + * Signals that higher-priority batches are available. The current batch can stop at a sensible point. + * This signal also fires when abort_connection fires. + */ + abort_batch: AbortSignal; user_id?: string; + forPriority?: BucketPriority; + onRowsSent: (amount: number) => void; } async function* bucketDataInBatches(request: BucketDataRequest) { let isDone = false; - while (!request.signal.aborted && !isDone) { + while (!request.abort_batch.aborted && !isDone) { // The code below is functionally the same as this for-await loop below. 
// However, the for-await loop appears to have a memory leak, so we avoid it. // for await (const { done, data } of bucketDataBatch(storage, checkpoint, dataBuckets, raw_data, signal)) { @@ -273,7 +358,17 @@ interface BucketDataBatchResult { * Extracted as a separate internal function just to avoid memory leaks. */ async function* bucketDataBatch(request: BucketDataRequest): AsyncGenerator { - const { storage, checkpoint, bucketsToFetch, dataBuckets, raw_data, binary_data, tracker, signal } = request; + const { + storage, + checkpoint, + bucketsToFetch, + dataBuckets, + raw_data, + binary_data, + abort_connection, + abort_batch, + onRowsSent + } = request; const checkpointOp = BigInt(checkpoint); let checkpointInvalidated = false; @@ -281,7 +376,12 @@ async function* bucketDataBatch(request: BucketDataRequest): AsyncGenerator [bucket, dataBuckets.get(bucket)!])); + const filteredBuckets = new Map( + bucketsToFetch.map((bucket) => [bucket.bucket, dataBuckets.get(bucket.bucket)?.start_op_id!]) + ); const data = storage.getBucketDataBatch(checkpoint, filteredBuckets); let has_more = false; for await (let { batch: r, targetOp } of data) { - if (signal.aborted) { + // Abort in current batch if the connection is closed + if (abort_connection.aborted) { return; } if (r.has_more) { @@ -339,9 +442,15 @@ async function* bucketDataBatch(request: BucketDataRequest): AsyncGenerator { if (typeof b != 'string') { return b.bucket; diff --git a/packages/service-core/src/sync/util.ts b/packages/service-core/src/sync/util.ts index 2437d38f5..f55b0b15f 100644 --- a/packages/service-core/src/sync/util.ts +++ b/packages/service-core/src/sync/util.ts @@ -2,6 +2,7 @@ import * as timers from 'timers/promises'; import * as util from '../util/util-index.js'; import { RequestTracker } from './RequestTracker.js'; +import { SemaphoreInterface } from 'async-mutex'; export type TokenStreamOptions = { /** @@ -99,3 +100,33 @@ export async function* transformToBytesTracked( yield encoded; } } + +export function acquireSemaphoreAbortable( + semaphone: SemaphoreInterface, + abort: AbortSignal +): Promise<[number, SemaphoreInterface.Releaser] | 'aborted'> { + return new Promise((resolve, reject) => { + let aborted = false; + let hasSemaphore = false; + + const listener = () => { + if (!hasSemaphore) { + aborted = true; + abort.removeEventListener('abort', listener); + resolve('aborted'); + } + }; + abort.addEventListener('abort', listener); + + semaphone.acquire().then((acquired) => { + hasSemaphore = true; + if (aborted) { + // Release semaphore, already aborted + acquired[1](); + } else { + abort.removeEventListener('abort', listener); + resolve(acquired); + } + }, reject); + }); +} diff --git a/packages/service-core/src/util/protocol-types.ts b/packages/service-core/src/util/protocol-types.ts index 5c7e47064..5faea4cff 100644 --- a/packages/service-core/src/util/protocol-types.ts +++ b/packages/service-core/src/util/protocol-types.ts @@ -1,5 +1,5 @@ import * as t from 'ts-codec'; -import { SqliteJsonValue } from '@powersync/service-sync-rules'; +import { BucketDescription, BucketPriority, SqliteJsonValue } from '@powersync/service-sync-rules'; export const BucketRequest = t.object({ name: t.string, @@ -59,7 +59,7 @@ export interface StreamingSyncCheckpointDiff { checkpoint_diff: { last_op_id: OpId; write_checkpoint?: OpId; - updated_buckets: BucketChecksum[]; + updated_buckets: BucketChecksumWithDescription[]; removed_buckets: string[]; }; } @@ -74,13 +74,23 @@ export interface StreamingSyncCheckpointComplete { }; } -export 
interface StreamingSyncKeepalive {}
+export interface StreamingSyncCheckpointPartiallyComplete {
+  partial_checkpoint_complete: {
+    last_op_id: OpId;
+    priority: BucketPriority;
+  };
+}
+
+export interface StreamingSyncKeepalive {
+  token_expires_in: number;
+}

 export type StreamingSyncLine =
   | StreamingSyncData
   | StreamingSyncCheckpoint
   | StreamingSyncCheckpointDiff
   | StreamingSyncCheckpointComplete
+  | StreamingSyncCheckpointPartiallyComplete
   | StreamingSyncKeepalive;

 /**
@@ -91,7 +101,7 @@ export type OpId = string;
 export interface Checkpoint {
   last_op_id: OpId;
   write_checkpoint?: OpId;
-  buckets: BucketChecksum[];
+  buckets: BucketChecksumWithDescription[];
 }

 export interface BucketState {
@@ -142,3 +152,5 @@ export interface BucketChecksum {
    */
   count: number;
 }
+
+export interface BucketChecksumWithDescription extends BucketChecksum, BucketDescription {}
diff --git a/packages/service-core/test/src/sync/util.test.ts b/packages/service-core/test/src/sync/util.test.ts
new file mode 100644
index 000000000..75803a994
--- /dev/null
+++ b/packages/service-core/test/src/sync/util.test.ts
@@ -0,0 +1,34 @@
+import { acquireSemaphoreAbortable } from '@/index.js';
+import { Semaphore, SemaphoreInterface } from 'async-mutex';
+import { describe, expect, test, vi } from 'vitest';
+
+describe('acquireSemaphoreAbortable', () => {
+  test('can acquire', async () => {
+    const semaphore = new Semaphore(1);
+    const controller = new AbortController();
+
+    expect(await acquireSemaphoreAbortable(semaphore, controller.signal)).not.toBe('aborted');
+  });
+
+  test('can cancel', async () => {
+    const semaphore = new Semaphore(1);
+    const controller = new AbortController();
+
+    const resolve = vi.fn();
+    const reject = vi.fn();
+
+    // First invocation: Lock the semaphore
+    const result = await acquireSemaphoreAbortable(semaphore, controller.signal);
+    expect(result).not.toBe('aborted');
+    const [count, release] = result as [number, SemaphoreInterface.Releaser];
+
+    acquireSemaphoreAbortable(semaphore, controller.signal).then(resolve, reject);
+    controller.abort();
+    await Promise.resolve();
+    expect(reject).not.toHaveBeenCalled();
+    expect(resolve).toHaveBeenCalledWith('aborted');
+
+    // Releasing the semaphore should not invoke resolve again
+    release();
+  });
+});
diff --git a/packages/sync-rules/src/BucketDescription.ts b/packages/sync-rules/src/BucketDescription.ts
new file mode 100644
index 000000000..668ac9453
--- /dev/null
+++ b/packages/sync-rules/src/BucketDescription.ts
@@ -0,0 +1,32 @@
+/**
+ * The priority in which to synchronize buckets.
+ *
+ * Lower numbers represent higher priorities.
+ * Generally, the sync service _may_ synchronize buckets with higher priorities first.
+ * Priorities also refine the consistency model offered by the sync service: clients may
+ * choose to make data visible as soon as all buckets of a given priority have been
+ * synchronized. So, when clients synchronize buckets with different priorities, they are
+ * only guaranteed consistent views within each priority.
+ *
+ * Additionally, data from buckets with priority 0 may be made visible while clients still
+ * have data in their upload queue.
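+ *
+ * For example, a bucket declared with `priority: 1` in the sync rules is synchronized
+ * before buckets using the default priority of 3, and a client may reveal its contents
+ * as soon as the corresponding `partial_checkpoint_complete` line arrives.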
+ */ +export type BucketPriority = 0 | 1 | 2 | 3; + +export const defaultBucketPriority: BucketPriority = 3; + +export const isValidPriority = (i: number): i is BucketPriority => { + return Number.isInteger(i) && i >= 0 && i <= 3; +}; + +export interface BucketDescription { + /** + * The id of the bucket, which is derived from the name of the bucket's definition + * in the sync rules as well as the values returned by the parameter queries. + */ + bucket: string; + /** + * The priority used to synchronize this bucket, derived from its definition. + */ + priority: BucketPriority; +} diff --git a/packages/sync-rules/src/SqlBucketDescriptor.ts b/packages/sync-rules/src/SqlBucketDescriptor.ts index 0af7b1e33..8071a0012 100644 --- a/packages/sync-rules/src/SqlBucketDescriptor.ts +++ b/packages/sync-rules/src/SqlBucketDescriptor.ts @@ -1,3 +1,4 @@ +import { BucketDescription } from './BucketDescription.js'; import { IdSequence } from './IdSequence.js'; import { SourceTableInterface } from './SourceTableInterface.js'; import { SqlDataQuery } from './SqlDataQuery.js'; @@ -13,7 +14,6 @@ import { QueryBucketIdOptions, QueryParseOptions, RequestParameters, - SourceSchema, SqliteRow } from './types.js'; @@ -108,18 +108,18 @@ export class SqlBucketDescriptor { return results; } - getStaticBucketIds(parameters: RequestParameters) { - let results: string[] = []; + getStaticBucketDescriptions(parameters: RequestParameters): BucketDescription[] { + let results: BucketDescription[] = []; for (let query of this.global_parameter_queries) { - results.push(...query.getStaticBucketIds(parameters)); + results.push(...query.getStaticBucketDescriptions(parameters)); } return results; } - async queryBucketIds(options: QueryBucketIdOptions): Promise { - let result: string[] = this.getStaticBucketIds(options.parameters); + async queryBucketDescriptions(options: QueryBucketIdOptions): Promise { + let result = this.getStaticBucketDescriptions(options.parameters); for (let query of this.parameter_queries) { - result.push(...(await query.queryBucketIds(options))); + result.push(...(await query.queryBucketDescriptions(options))); } return result; } diff --git a/packages/sync-rules/src/SqlParameterQuery.ts b/packages/sync-rules/src/SqlParameterQuery.ts index 92c6cf527..32e681437 100644 --- a/packages/sync-rules/src/SqlParameterQuery.ts +++ b/packages/sync-rules/src/SqlParameterQuery.ts @@ -17,14 +17,13 @@ import { QuerySchema, RequestParameters, RowValueClause, - SourceSchema, SqliteJsonRow, SqliteJsonValue, SqliteRow } from './types.js'; import { filterJsonRow, getBucketId, isJsonValue, isSelectStatement } from './utils.js'; -import { SyncRulesOptions } from './SqlSyncRules.js'; import { TableValuedFunctionSqlParameterQuery } from './TableValuedFunctionSqlParameterQuery.js'; +import { BucketDescription, BucketPriority, defaultBucketPriority } from './BucketDescription.js'; /** * Represents a parameter query, such as: @@ -107,7 +106,9 @@ export class SqlParameterQuery { const where = q.where; const filter = tools.compileWhereClause(where); - const bucket_parameters = (q.columns ?? []).map((column) => tools.getOutputName(column)); + const bucket_parameters = (q.columns ?? 
[]) + .map((column) => tools.getOutputName(column)) + .filter((c) => !tools.isBucketPriorityParameter(c)); rows.sourceTable = sourceTable; rows.table = alias; rows.sql = sql; @@ -115,6 +116,7 @@ export class SqlParameterQuery { rows.descriptor_name = descriptor_name; rows.bucket_parameters = bucket_parameters; rows.input_parameters = filter.inputParameters!; + rows.priority = options.priority; const expandedParams = rows.input_parameters!.filter((param) => param.expands); if (expandedParams.length > 1) { rows.errors.push(new SqlRuleError('Cannot have multiple array input parameters', sql)); @@ -129,7 +131,14 @@ export class SqlParameterQuery { if (column.alias != null) { tools.checkSpecificNameCase(column.alias); } - if (tools.isTableRef(column.expr)) { + if (tools.isBucketPriorityParameter(name)) { + if (rows.priority !== undefined) { + rows.errors.push(new SqlRuleError('Cannot set priority multiple times.', sql)); + continue; + } + + rows.priority = tools.extractBucketPriority(column.expr); + } else if (tools.isTableRef(column.expr)) { rows.lookup_columns.push(column); const extractor = tools.compileRowValueExtractor(column.expr); if (isClauseError(extractor)) { @@ -177,6 +186,7 @@ export class SqlParameterQuery { * Example: SELECT *token_parameters.user_id* */ parameter_extractors: Record = {}; + priority?: BucketPriority; filter?: ParameterMatchClause; descriptor_name?: string; @@ -247,7 +257,7 @@ export class SqlParameterQuery { /** * Given partial parameter rows, turn into bucket ids. */ - resolveBucketIds(bucketParameters: SqliteJsonRow[], parameters: RequestParameters): string[] { + resolveBucketDescriptions(bucketParameters: SqliteJsonRow[], parameters: RequestParameters): BucketDescription[] { // Filters have already been applied and gotten us the set of bucketParameters - don't attempt to filter again. // We _do_ need to evaluate the output columns here, using a combination of precomputed bucketParameters, // and values from token parameters. @@ -270,9 +280,12 @@ export class SqlParameterQuery { } } - return getBucketId(this.descriptor_name!, this.bucket_parameters!, result); + return { + bucket: getBucketId(this.descriptor_name!, this.bucket_parameters!, result), + priority: this.priority ?? defaultBucketPriority + }; }) - .filter((lookup) => lookup != null) as string[]; + .filter((lookup) => lookup != null); } /** @@ -351,21 +364,21 @@ export class SqlParameterQuery { } /** - * Given sync parameters (token and user parameters), return bucket ids. + * Given sync parameters (token and user parameters), return bucket ids and priorities. * * This is done in three steps: * 1. Given the parameters, get lookups we need to perform on the database. * 2. Perform the lookups, returning parameter sets (partial rows). * 3. Given the parameter sets, resolve bucket ids. 
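 *
 * As a hypothetical example, for
 * `SELECT id as group_id, 1 AS _priority FROM groups WHERE token_parameters.user_id IN groups.user_ids`,
 * step 1 derives a lookup on `groups` from the token's user_id, step 2 returns partial
 * rows such as `{ group_id: 'g1' }`, and step 3 resolves each row to a description like
 * `{ bucket: 'mybucket["g1"]', priority: 1 }`.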
*/ - async queryBucketIds(options: QueryBucketIdOptions): Promise { + async queryBucketDescriptions(options: QueryBucketIdOptions): Promise { let lookups = this.getLookups(options.parameters); if (lookups.length == 0) { return []; } const parameters = await options.getParameterSets(lookups); - return this.resolveBucketIds(parameters, options.parameters); + return this.resolveBucketDescriptions(parameters, options.parameters); } get hasAuthenticatedBucketParameters(): boolean { diff --git a/packages/sync-rules/src/SqlSyncRules.ts b/packages/sync-rules/src/SqlSyncRules.ts index e6462b62d..99f08b083 100644 --- a/packages/sync-rules/src/SqlSyncRules.ts +++ b/packages/sync-rules/src/SqlSyncRules.ts @@ -23,6 +23,7 @@ import { SqliteRow, SyncRules } from './types.js'; +import { BucketDescription, BucketPriority, isValidPriority } from './BucketDescription.js'; const ACCEPT_POTENTIALLY_DANGEROUS_QUERIES = Symbol('ACCEPT_POTENTIALLY_DANGEROUS_QUERIES'); @@ -117,9 +118,22 @@ export class SqlSyncRules implements SyncRules { const accept_potentially_dangerous_queries = value.get('accept_potentially_dangerous_queries', true)?.value == true; + let parseOptionPriority: BucketPriority | undefined = undefined; + if (value.has('priority')) { + const priorityValue = value.get('priority', true)!; + if (typeof priorityValue.value != 'number' || !isValidPriority(priorityValue.value)) { + rules.errors.push( + this.tokenError(priorityValue, 'Invalid priority, expected a number between 0 and 3 (inclusive).') + ); + } else { + parseOptionPriority = priorityValue.value; + } + } + const queryOptions: QueryParseOptions = { ...options, - accept_potentially_dangerous_queries + accept_potentially_dangerous_queries, + priority: parseOptionPriority }; const parameters = value.get('parameters', true) as unknown; const dataQueries = value.get('data', true) as unknown; @@ -306,10 +320,10 @@ export class SqlSyncRules implements SyncRules { /** * @deprecated For testing only. */ - getStaticBucketIds(parameters: RequestParameters) { - let results: string[] = []; + getStaticBucketDescriptions(parameters: RequestParameters) { + let results: BucketDescription[] = []; for (let bucket of this.bucket_descriptors) { - results.push(...bucket.getStaticBucketIds(parameters)); + results.push(...bucket.getStaticBucketDescriptions(parameters)); } return results; } @@ -317,10 +331,10 @@ export class SqlSyncRules implements SyncRules { /** * Note: This can error hard. 
*/ - async queryBucketIds(options: QueryBucketIdOptions): Promise { - let results: string[] = []; + async queryBucketDescriptions(options: QueryBucketIdOptions): Promise { + let results: BucketDescription[] = []; for (let bucket of this.bucket_descriptors) { - results.push(...(await bucket.queryBucketIds(options))); + results.push(...(await bucket.queryBucketDescriptions(options))); } return results; } diff --git a/packages/sync-rules/src/StaticSqlParameterQuery.ts b/packages/sync-rules/src/StaticSqlParameterQuery.ts index bcc21c7b4..0c39cbd43 100644 --- a/packages/sync-rules/src/StaticSqlParameterQuery.ts +++ b/packages/sync-rules/src/StaticSqlParameterQuery.ts @@ -4,6 +4,7 @@ import { SqlTools } from './sql_filters.js'; import { checkUnsupportedFeatures, isClauseError, isParameterValueClause, sqliteBool } from './sql_support.js'; import { ParameterValueClause, QueryParseOptions, RequestParameters, SqliteJsonValue } from './types.js'; import { getBucketId, isJsonValue } from './utils.js'; +import { BucketDescription, BucketPriority, defaultBucketPriority } from './BucketDescription.js'; /** * Represents a bucket parameter query without any tables, e.g.: @@ -27,13 +28,16 @@ export class StaticSqlParameterQuery { const filter = tools.compileParameterValueExtractor(where); const columns = q.columns ?? []; - const bucket_parameters = columns.map((column) => tools.getOutputName(column)); + const bucket_parameters = (q.columns ?? []) + .map((column) => tools.getOutputName(column)) + .filter((c) => !tools.isBucketPriorityParameter(c)); query.sql = sql; query.descriptor_name = descriptor_name; query.bucket_parameters = bucket_parameters; query.columns = columns; query.tools = tools; + query.priority = options?.priority; if (!isClauseError(filter)) { query.filter = filter; } @@ -43,6 +47,16 @@ export class StaticSqlParameterQuery { tools.checkSpecificNameCase(column.alias); } const name = tools.getSpecificOutputName(column); + if (tools.isBucketPriorityParameter(name)) { + if (query.priority !== undefined) { + query.errors.push(new SqlRuleError('Cannot set priority multiple times.', sql)); + continue; + } + + query.priority = tools.extractBucketPriority(column.expr); + continue; + } + const extractor = tools.compileParameterValueExtractor(column.expr); if (isClauseError(extractor)) { // Error logged already @@ -67,6 +81,7 @@ export class StaticSqlParameterQuery { sql?: string; columns?: SelectedColumn[]; parameter_extractors: Record = {}; + priority?: BucketPriority; descriptor_name?: string; /** _Output_ bucket parameters */ bucket_parameters?: string[]; @@ -77,7 +92,7 @@ export class StaticSqlParameterQuery { errors: SqlRuleError[] = []; - getStaticBucketIds(parameters: RequestParameters): string[] { + getStaticBucketDescriptions(parameters: RequestParameters): BucketDescription[] { if (this.filter == null) { // Error in filter clause return []; @@ -99,7 +114,12 @@ export class StaticSqlParameterQuery { } } - return [getBucketId(this.descriptor_name!, this.bucket_parameters!, result)]; + return [ + { + bucket: getBucketId(this.descriptor_name!, this.bucket_parameters!, result), + priority: this.priority ?? 
defaultBucketPriority + } + ]; } get hasAuthenticatedBucketParameters(): boolean { diff --git a/packages/sync-rules/src/TableValuedFunctionSqlParameterQuery.ts b/packages/sync-rules/src/TableValuedFunctionSqlParameterQuery.ts index 5537fe2bb..a11b415bd 100644 --- a/packages/sync-rules/src/TableValuedFunctionSqlParameterQuery.ts +++ b/packages/sync-rules/src/TableValuedFunctionSqlParameterQuery.ts @@ -12,6 +12,7 @@ import { SqliteRow } from './types.js'; import { getBucketId, isJsonValue } from './utils.js'; +import { BucketDescription, BucketPriority, defaultBucketPriority } from './BucketDescription.js'; /** * Represents a parameter query using a table-valued function. @@ -74,6 +75,11 @@ export class TableValuedFunctionSqlParameterQuery { tools.checkSpecificNameCase(column.alias); } const name = tools.getSpecificOutputName(column); + if (tools.isBucketPriorityParameter(name)) { + query.priority = tools.extractBucketPriority(column.expr); + continue; + } + const extractor = tools.compileParameterValueExtractor(column.expr); if (isClauseError(extractor)) { // Error logged already @@ -98,6 +104,7 @@ export class TableValuedFunctionSqlParameterQuery { sql?: string; columns?: SelectedColumn[]; parameter_extractors: Record = {}; + priority?: BucketPriority; descriptor_name?: string; /** _Output_ bucket parameters */ bucket_parameters?: string[]; @@ -111,7 +118,7 @@ export class TableValuedFunctionSqlParameterQuery { errors: SqlRuleError[] = []; - getStaticBucketIds(parameters: RequestParameters): string[] { + getStaticBucketDescriptions(parameters: RequestParameters): BucketDescription[] { if (this.filter == null || this.callClause == null) { // Error in filter clause return []; @@ -119,14 +126,17 @@ export class TableValuedFunctionSqlParameterQuery { const valueString = this.callClause.lookupParameterValue(parameters); const rows = this.function!.call([valueString]); - let total: string[] = []; + let total: BucketDescription[] = []; for (let row of rows) { - total.push(...this.getIndividualBucketIds(row, parameters)); + const description = this.getIndividualBucketDescription(row, parameters); + if (description !== null) { + total.push(description); + } } return total; } - private getIndividualBucketIds(row: SqliteRow, parameters: RequestParameters): string[] { + private getIndividualBucketDescription(row: SqliteRow, parameters: RequestParameters): BucketDescription | null { const mergedParams: ParameterValueSet = { raw_token_payload: parameters.raw_token_payload, raw_user_parameters: parameters.raw_user_parameters, @@ -141,7 +151,7 @@ export class TableValuedFunctionSqlParameterQuery { }; const filterValue = this.filter!.lookupParameterValue(mergedParams); if (sqliteBool(filterValue) === 0n) { - return []; + return null; } let result: Record = {}; @@ -154,7 +164,10 @@ export class TableValuedFunctionSqlParameterQuery { } } - return [getBucketId(this.descriptor_name!, this.bucket_parameters!, result)]; + return { + bucket: getBucketId(this.descriptor_name!, this.bucket_parameters!, result), + priority: this.priority ?? 
defaultBucketPriority + }; } get hasAuthenticatedBucketParameters(): boolean { diff --git a/packages/sync-rules/src/index.ts b/packages/sync-rules/src/index.ts index 877d52c37..bb8248613 100644 --- a/packages/sync-rules/src/index.ts +++ b/packages/sync-rules/src/index.ts @@ -1,3 +1,4 @@ +export * from './BucketDescription.js'; export * from './DartSchemaGenerator.js'; export * from './errors.js'; export * from './events/SqlEventDescriptor.js'; diff --git a/packages/sync-rules/src/json_schema.ts b/packages/sync-rules/src/json_schema.ts index b2f9367a9..5d21169d4 100644 --- a/packages/sync-rules/src/json_schema.ts +++ b/packages/sync-rules/src/json_schema.ts @@ -20,6 +20,10 @@ export const syncRulesSchema: ajvModule.Schema = { description: 'If true, disables warnings on potentially dangerous queries', type: 'boolean' }, + priority: { + description: 'Priority for the bucket (lower values indicate higher priority).', + type: 'integer' + }, parameters: { description: 'Parameter query(ies)', anyOf: [ diff --git a/packages/sync-rules/src/sql_filters.ts b/packages/sync-rules/src/sql_filters.ts index 14b41c824..04bbe7edc 100644 --- a/packages/sync-rules/src/sql_filters.ts +++ b/packages/sync-rules/src/sql_filters.ts @@ -46,6 +46,7 @@ import { TrueIfParametersMatch } from './types.js'; import { isJsonValue } from './utils.js'; +import { BucketPriority, isValidPriority } from './BucketDescription.js'; export const MATCH_CONST_FALSE: TrueIfParametersMatch = []; export const MATCH_CONST_TRUE: TrueIfParametersMatch = [{}]; @@ -738,6 +739,25 @@ export class SqlTools { argsType }; } + + isBucketPriorityParameter(name: string): boolean { + return name == '_priority'; + } + + extractBucketPriority(expr: Expr): BucketPriority | undefined { + if (expr.type !== 'integer') { + this.error('Priority must be a simple integer literal', expr); + return; + } + + const value = expr.value; + if (!isValidPriority(value)) { + this.error('Invalid value for priority, must be between 0 and 3 (inclusive).', expr); + return; + } + + return value as BucketPriority; + } } function isStatic(expr: Expr) { diff --git a/packages/sync-rules/src/types.ts b/packages/sync-rules/src/types.ts index 4621ae916..6ae8ca013 100644 --- a/packages/sync-rules/src/types.ts +++ b/packages/sync-rules/src/types.ts @@ -4,6 +4,7 @@ import { SourceTableInterface } from './SourceTableInterface.js'; import { SyncRulesOptions } from './SqlSyncRules.js'; import { TablePattern } from './TablePattern.js'; import { toSyncRulesParameters } from './utils.js'; +import { BucketPriority } from './BucketDescription.js'; export interface SyncRules { evaluateRow(options: EvaluateRowOptions): EvaluationResult[]; @@ -13,6 +14,7 @@ export interface SyncRules { export interface QueryParseOptions extends SyncRulesOptions { accept_potentially_dangerous_queries?: boolean; + priority?: BucketPriority; } export interface EvaluatedParameters { diff --git a/packages/sync-rules/test/src/parameter_queries.test.ts b/packages/sync-rules/test/src/parameter_queries.test.ts index 59d0f1147..e3f88cd43 100644 --- a/packages/sync-rules/test/src/parameter_queries.test.ts +++ b/packages/sync-rules/test/src/parameter_queries.test.ts @@ -1,6 +1,7 @@ import { describe, expect, test } from 'vitest'; import { SqlParameterQuery } from '../../src/index.js'; import { BASIC_SCHEMA, normalizeTokenParameters, PARSE_OPTIONS } from './util.js'; +import { StaticSqlParameterQuery } from '../../src/StaticSqlParameterQuery.js'; describe('parameter queries', () => { test('token_parameters IN query', 
function () { @@ -84,13 +85,13 @@ describe('parameter queries', () => { ]); // We _do_ need to care about the bucket string representation. - expect(query.resolveBucketIds([{ int1: 314, float1: 3.14, float2: 314 }], normalizeTokenParameters({}))).toEqual([ - 'mybucket[314,3.14,314]' - ]); + expect( + query.resolveBucketDescriptions([{ int1: 314, float1: 3.14, float2: 314 }], normalizeTokenParameters({})) + ).toEqual([{ bucket: 'mybucket[314,3.14,314]', priority: 3 }]); - expect(query.resolveBucketIds([{ int1: 314n, float1: 3.14, float2: 314 }], normalizeTokenParameters({}))).toEqual([ - 'mybucket[314,3.14,314]' - ]); + expect( + query.resolveBucketDescriptions([{ int1: 314n, float1: 3.14, float2: 314 }], normalizeTokenParameters({})) + ).toEqual([{ bucket: 'mybucket[314,3.14,314]', priority: 3 }]); }); test('plain token_parameter (baseline)', () => { @@ -351,8 +352,11 @@ describe('parameter queries', () => { ]); expect( - query.resolveBucketIds([{ user_id: 'user1' }], normalizeTokenParameters({ user_id: 'user1', is_admin: true })) - ).toEqual(['mybucket["user1",1]']); + query.resolveBucketDescriptions( + [{ user_id: 'user1' }], + normalizeTokenParameters({ user_id: 'user1', is_admin: true }) + ) + ).toEqual([{ bucket: 'mybucket["user1",1]', priority: 3 }]); }); test('case-sensitive parameter queries (1)', () => { @@ -732,4 +736,48 @@ describe('parameter queries', () => { // Can be safe, but better to opt in testDangerousQuery("SELECT id as category_id FROM categories WHERE request.parameters() ->> 'include_categories'"); }); + + describe('bucket priorities', () => { + test('valid definition', function () { + const sql = 'SELECT id as group_id, 1 AS _priority FROM groups WHERE token_parameters.user_id IN groups.user_ids'; + const query = SqlParameterQuery.fromSql('mybucket', sql, PARSE_OPTIONS) as SqlParameterQuery; + expect(query.errors).toEqual([]); + expect(Object.entries(query.lookup_extractors)).toHaveLength(1); + expect(Object.entries(query.parameter_extractors)).toHaveLength(0); + expect(query.bucket_parameters).toEqual(['group_id']); + expect(query.priority).toBe(1); + }); + + test('valid definition, static query', function () { + const sql = 'SELECT token_parameters.user_id, 0 AS _priority'; + const query = SqlParameterQuery.fromSql('mybucket', sql, PARSE_OPTIONS) as StaticSqlParameterQuery; + expect(query.errors).toEqual([]); + expect(Object.entries(query.parameter_extractors)).toHaveLength(1); + expect(query.bucket_parameters).toEqual(['user_id']); + expect(query.priority).toBe(0); + }); + + test('invalid dynamic query', function () { + const sql = 'SELECT LENGTH(assets.name) AS _priority FROM assets'; + const query = SqlParameterQuery.fromSql('mybucket', sql, PARSE_OPTIONS) as SqlParameterQuery; + + expect(query.errors).toMatchObject([{ message: 'Priority must be a simple integer literal' }]); + }); + + test('invalid literal type', function () { + const sql = "SELECT 'very fast please' AS _priority"; + const query = SqlParameterQuery.fromSql('mybucket', sql, PARSE_OPTIONS) as StaticSqlParameterQuery; + + expect(query.errors).toMatchObject([{ message: 'Priority must be a simple integer literal' }]); + }); + + test('invalid literal value', function () { + const sql = 'SELECT 15 AS _priority'; + const query = SqlParameterQuery.fromSql('mybucket', sql, PARSE_OPTIONS) as StaticSqlParameterQuery; + + expect(query.errors).toMatchObject([ + { message: 'Invalid value for priority, must be between 0 and 3 (inclusive).' 
} + ]); + }); + }); }); diff --git a/packages/sync-rules/test/src/static_parameter_queries.test.ts b/packages/sync-rules/test/src/static_parameter_queries.test.ts index a82dd9106..b285fad39 100644 --- a/packages/sync-rules/test/src/static_parameter_queries.test.ts +++ b/packages/sync-rules/test/src/static_parameter_queries.test.ts @@ -9,7 +9,9 @@ describe('static parameter queries', () => { const query = SqlParameterQuery.fromSql('mybucket', sql, PARSE_OPTIONS) as StaticSqlParameterQuery; expect(query.errors).toEqual([]); expect(query.bucket_parameters!).toEqual(['user_id']); - expect(query.getStaticBucketIds(normalizeTokenParameters({ user_id: 'user1' }))).toEqual(['mybucket["user1"]']); + expect(query.getStaticBucketDescriptions(normalizeTokenParameters({ user_id: 'user1' }))).toEqual([ + { bucket: 'mybucket["user1"]', priority: 3 } + ]); }); test('global query', function () { @@ -17,24 +19,30 @@ describe('static parameter queries', () => { const query = SqlParameterQuery.fromSql('mybucket', sql, PARSE_OPTIONS) as StaticSqlParameterQuery; expect(query.errors).toEqual([]); expect(query.bucket_parameters!).toEqual([]); - expect(query.getStaticBucketIds(normalizeTokenParameters({ user_id: 'user1' }))).toEqual(['mybucket[]']); + expect(query.getStaticBucketDescriptions(normalizeTokenParameters({ user_id: 'user1' }))).toEqual([ + { bucket: 'mybucket[]', priority: 3 } + ]); }); test('query with filter', function () { const sql = 'SELECT token_parameters.user_id WHERE token_parameters.is_admin'; const query = SqlParameterQuery.fromSql('mybucket', sql, PARSE_OPTIONS) as StaticSqlParameterQuery; expect(query.errors).toEqual([]); - expect(query.getStaticBucketIds(normalizeTokenParameters({ user_id: 'user1', is_admin: true }))).toEqual([ - 'mybucket["user1"]' + expect(query.getStaticBucketDescriptions(normalizeTokenParameters({ user_id: 'user1', is_admin: true }))).toEqual([ + { bucket: 'mybucket["user1"]', priority: 3 } ]); - expect(query.getStaticBucketIds(normalizeTokenParameters({ user_id: 'user1', is_admin: false }))).toEqual([]); + expect(query.getStaticBucketDescriptions(normalizeTokenParameters({ user_id: 'user1', is_admin: false }))).toEqual( + [] + ); }); test('function in select clause', function () { const sql = 'SELECT upper(token_parameters.user_id) as upper_id'; const query = SqlParameterQuery.fromSql('mybucket', sql, PARSE_OPTIONS) as StaticSqlParameterQuery; expect(query.errors).toEqual([]); - expect(query.getStaticBucketIds(normalizeTokenParameters({ user_id: 'user1' }))).toEqual(['mybucket["USER1"]']); + expect(query.getStaticBucketDescriptions(normalizeTokenParameters({ user_id: 'user1' }))).toEqual([ + { bucket: 'mybucket["USER1"]', priority: 3 } + ]); expect(query.bucket_parameters!).toEqual(['upper_id']); }); @@ -42,16 +50,20 @@ describe('static parameter queries', () => { const sql = "SELECT WHERE upper(token_parameters.role) = 'ADMIN'"; const query = SqlParameterQuery.fromSql('mybucket', sql, PARSE_OPTIONS) as StaticSqlParameterQuery; expect(query.errors).toEqual([]); - expect(query.getStaticBucketIds(normalizeTokenParameters({ role: 'admin' }))).toEqual(['mybucket[]']); - expect(query.getStaticBucketIds(normalizeTokenParameters({ role: 'user' }))).toEqual([]); + expect(query.getStaticBucketDescriptions(normalizeTokenParameters({ role: 'admin' }))).toEqual([ + { bucket: 'mybucket[]', priority: 3 } + ]); + expect(query.getStaticBucketDescriptions(normalizeTokenParameters({ role: 'user' }))).toEqual([]); }); test('comparison in filter clause', function () { const sql = 
'SELECT WHERE token_parameters.id1 = token_parameters.id2'; const query = SqlParameterQuery.fromSql('mybucket', sql, PARSE_OPTIONS) as StaticSqlParameterQuery; expect(query.errors).toEqual([]); - expect(query.getStaticBucketIds(normalizeTokenParameters({ id1: 't1', id2: 't1' }))).toEqual(['mybucket[]']); - expect(query.getStaticBucketIds(normalizeTokenParameters({ id1: 't1', id2: 't2' }))).toEqual([]); + expect(query.getStaticBucketDescriptions(normalizeTokenParameters({ id1: 't1', id2: 't1' }))).toEqual([ + { bucket: 'mybucket[]', priority: 3 } + ]); + expect(query.getStaticBucketDescriptions(normalizeTokenParameters({ id1: 't1', id2: 't2' }))).toEqual([]); }); test('request.parameters()', function () { @@ -62,7 +74,9 @@ describe('static parameter queries', () => { }) as StaticSqlParameterQuery; expect(query.errors).toEqual([]); - expect(query.getStaticBucketIds(normalizeTokenParameters({}, { org_id: 'test' }))).toEqual(['mybucket["test"]']); + expect(query.getStaticBucketDescriptions(normalizeTokenParameters({}, { org_id: 'test' }))).toEqual([ + { bucket: 'mybucket["test"]', priority: 3 } + ]); }); test('request.jwt()', function () { @@ -71,7 +85,9 @@ describe('static parameter queries', () => { expect(query.errors).toEqual([]); expect(query.bucket_parameters).toEqual(['user_id']); - expect(query.getStaticBucketIds(normalizeTokenParameters({ user_id: 'user1' }))).toEqual(['mybucket["user1"]']); + expect(query.getStaticBucketDescriptions(normalizeTokenParameters({ user_id: 'user1' }))).toEqual([ + { bucket: 'mybucket["user1"]', priority: 3 } + ]); }); test('request.user_id()', function () { @@ -80,35 +96,43 @@ describe('static parameter queries', () => { expect(query.errors).toEqual([]); expect(query.bucket_parameters).toEqual(['user_id']); - expect(query.getStaticBucketIds(normalizeTokenParameters({ user_id: 'user1' }))).toEqual(['mybucket["user1"]']); + expect(query.getStaticBucketDescriptions(normalizeTokenParameters({ user_id: 'user1' }))).toEqual([ + { bucket: 'mybucket["user1"]', priority: 3 } + ]); }); test('static value', function () { const sql = `SELECT WHERE 1`; const query = SqlParameterQuery.fromSql('mybucket', sql, PARSE_OPTIONS) as StaticSqlParameterQuery; expect(query.errors).toEqual([]); - expect(query.getStaticBucketIds(new RequestParameters({ sub: '' }, {}))).toEqual(['mybucket[]']); + expect(query.getStaticBucketDescriptions(new RequestParameters({ sub: '' }, {}))).toEqual([ + { bucket: 'mybucket[]', priority: 3 } + ]); }); test('static expression (1)', function () { const sql = `SELECT WHERE 1 = 1`; const query = SqlParameterQuery.fromSql('mybucket', sql, PARSE_OPTIONS) as StaticSqlParameterQuery; expect(query.errors).toEqual([]); - expect(query.getStaticBucketIds(new RequestParameters({ sub: '' }, {}))).toEqual(['mybucket[]']); + expect(query.getStaticBucketDescriptions(new RequestParameters({ sub: '' }, {}))).toEqual([ + { bucket: 'mybucket[]', priority: 3 } + ]); }); test('static expression (2)', function () { const sql = `SELECT WHERE 1 != 1`; const query = SqlParameterQuery.fromSql('mybucket', sql, PARSE_OPTIONS) as StaticSqlParameterQuery; expect(query.errors).toEqual([]); - expect(query.getStaticBucketIds(new RequestParameters({ sub: '' }, {}))).toEqual([]); + expect(query.getStaticBucketDescriptions(new RequestParameters({ sub: '' }, {}))).toEqual([]); }); test('static IN expression', function () { const sql = `SELECT WHERE 'admin' IN '["admin", "superuser"]'`; const query = SqlParameterQuery.fromSql('mybucket', sql, PARSE_OPTIONS) as StaticSqlParameterQuery; 
expect(query.errors).toEqual([]); - expect(query.getStaticBucketIds(new RequestParameters({ sub: '' }, {}))).toEqual(['mybucket[]']); + expect(query.getStaticBucketDescriptions(new RequestParameters({ sub: '' }, {}))).toEqual([ + { bucket: 'mybucket[]', priority: 3 } + ]); }); test('IN for permissions in request.jwt() (1)', function () { @@ -117,11 +141,11 @@ describe('static parameter queries', () => { const query = SqlParameterQuery.fromSql('mybucket', sql, PARSE_OPTIONS) as StaticSqlParameterQuery; expect(query.errors).toEqual([]); expect( - query.getStaticBucketIds(new RequestParameters({ sub: '', permissions: ['write', 'read:users'] }, {})) - ).toEqual(['mybucket[1]']); + query.getStaticBucketDescriptions(new RequestParameters({ sub: '', permissions: ['write', 'read:users'] }, {})) + ).toEqual([{ bucket: 'mybucket[1]', priority: 3 }]); expect( - query.getStaticBucketIds(new RequestParameters({ sub: '', permissions: ['write', 'write:users'] }, {})) - ).toEqual(['mybucket[0]']); + query.getStaticBucketDescriptions(new RequestParameters({ sub: '', permissions: ['write', 'write:users'] }, {})) + ).toEqual([{ bucket: 'mybucket[0]', priority: 3 }]); }); test('IN for permissions in request.jwt() (2)', function () { @@ -130,10 +154,10 @@ describe('static parameter queries', () => { const query = SqlParameterQuery.fromSql('mybucket', sql, PARSE_OPTIONS) as StaticSqlParameterQuery; expect(query.errors).toEqual([]); expect( - query.getStaticBucketIds(new RequestParameters({ sub: '', permissions: ['write', 'read:users'] }, {})) - ).toEqual(['mybucket[]']); + query.getStaticBucketDescriptions(new RequestParameters({ sub: '', permissions: ['write', 'read:users'] }, {})) + ).toEqual([{ bucket: 'mybucket[]', priority: 3 }]); expect( - query.getStaticBucketIds(new RequestParameters({ sub: '', permissions: ['write', 'write:users'] }, {})) + query.getStaticBucketDescriptions(new RequestParameters({ sub: '', permissions: ['write', 'write:users'] }, {})) ).toEqual([]); }); @@ -141,8 +165,10 @@ describe('static parameter queries', () => { const sql = `SELECT WHERE request.jwt() ->> 'role' IN '["admin", "superuser"]'`; const query = SqlParameterQuery.fromSql('mybucket', sql, PARSE_OPTIONS) as StaticSqlParameterQuery; expect(query.errors).toEqual([]); - expect(query.getStaticBucketIds(new RequestParameters({ sub: '', role: 'superuser' }, {}))).toEqual(['mybucket[]']); - expect(query.getStaticBucketIds(new RequestParameters({ sub: '', role: 'superadmin' }, {}))).toEqual([]); + expect(query.getStaticBucketDescriptions(new RequestParameters({ sub: '', role: 'superuser' }, {}))).toEqual([ + { bucket: 'mybucket[]', priority: 3 } + ]); + expect(query.getStaticBucketDescriptions(new RequestParameters({ sub: '', role: 'superadmin' }, {}))).toEqual([]); }); test('case-sensitive queries (1)', () => { diff --git a/packages/sync-rules/test/src/sync_rules.test.ts b/packages/sync-rules/test/src/sync_rules.test.ts index 7741dff79..90d75c697 100644 --- a/packages/sync-rules/test/src/sync_rules.test.ts +++ b/packages/sync-rules/test/src/sync_rules.test.ts @@ -37,7 +37,9 @@ bucket_definitions: bucket: 'mybucket[]' } ]); - expect(rules.getStaticBucketIds(normalizeTokenParameters({}))).toEqual(['mybucket[]']); + expect(rules.getStaticBucketDescriptions(normalizeTokenParameters({}))).toEqual([ + { bucket: 'mybucket[]', priority: 3 } + ]); }); test('parse global sync rules with filter', () => { @@ -58,9 +60,11 @@ bucket_definitions: expect(param_query.filter!.lookupParameterValue(normalizeTokenParameters({ is_admin: 1n 
}))).toEqual(1n); expect(param_query.filter!.lookupParameterValue(normalizeTokenParameters({ is_admin: 0n }))).toEqual(0n); - expect(rules.getStaticBucketIds(normalizeTokenParameters({ is_admin: true }))).toEqual(['mybucket[]']); - expect(rules.getStaticBucketIds(normalizeTokenParameters({ is_admin: false }))).toEqual([]); - expect(rules.getStaticBucketIds(normalizeTokenParameters({}))).toEqual([]); + expect(rules.getStaticBucketDescriptions(normalizeTokenParameters({ is_admin: true }))).toEqual([ + { bucket: 'mybucket[]', priority: 3 } + ]); + expect(rules.getStaticBucketDescriptions(normalizeTokenParameters({ is_admin: false }))).toEqual([]); + expect(rules.getStaticBucketDescriptions(normalizeTokenParameters({}))).toEqual([]); }); test('parse global sync rules with table filter', () => { @@ -101,9 +105,9 @@ bucket_definitions: expect(bucket.bucket_parameters).toEqual(['user_id', 'device_id']); const param_query = bucket.global_parameter_queries[0]; expect(param_query.bucket_parameters).toEqual(['user_id', 'device_id']); - expect(rules.getStaticBucketIds(normalizeTokenParameters({ user_id: 'user1' }, { device_id: 'device1' }))).toEqual([ - 'mybucket["user1","device1"]' - ]); + expect( + rules.getStaticBucketDescriptions(normalizeTokenParameters({ user_id: 'user1' }, { device_id: 'device1' })) + ).toEqual([{ bucket: 'mybucket["user1","device1"]', priority: 3 }]); const data_query = bucket.data_queries[0]; expect(data_query.bucket_parameters).toEqual(['user_id', 'device_id']); @@ -147,7 +151,9 @@ bucket_definitions: expect(bucket.bucket_parameters).toEqual(['user_id']); const param_query = bucket.global_parameter_queries[0]; expect(param_query.bucket_parameters).toEqual(['user_id']); - expect(rules.getStaticBucketIds(normalizeTokenParameters({ user_id: 'user1' }))).toEqual(['mybucket["user1"]']); + expect(rules.getStaticBucketDescriptions(normalizeTokenParameters({ user_id: 'user1' }))).toEqual([ + { bucket: 'mybucket["user1"]', priority: 3 } + ]); const data_query = bucket.data_queries[0]; expect(data_query.bucket_parameters).toEqual(['user_id']); @@ -289,7 +295,9 @@ bucket_definitions: ); const bucket = rules.bucket_descriptors[0]; expect(bucket.bucket_parameters).toEqual(['user_id']); - expect(rules.getStaticBucketIds(normalizeTokenParameters({ user_id: 'user1' }))).toEqual(['mybucket["USER1"]']); + expect(rules.getStaticBucketDescriptions(normalizeTokenParameters({ user_id: 'user1' }))).toEqual([ + { bucket: 'mybucket["USER1"]', priority: 3 } + ]); expect( rules.evaluateRow({ @@ -324,7 +332,9 @@ bucket_definitions: ); const bucket = rules.bucket_descriptors[0]; expect(bucket.bucket_parameters).toEqual(['user_id']); - expect(rules.getStaticBucketIds(normalizeTokenParameters({ user_id: 'user1' }))).toEqual(['mybucket["USER1"]']); + expect(rules.getStaticBucketDescriptions(normalizeTokenParameters({ user_id: 'user1' }))).toEqual([ + { bucket: 'mybucket["USER1"]', priority: 3 } + ]); expect( rules.evaluateRow({ @@ -494,7 +504,9 @@ bucket_definitions: } ]); - expect(rules.getStaticBucketIds(normalizeTokenParameters({ is_admin: true }))).toEqual(['mybucket[1]']); + expect(rules.getStaticBucketDescriptions(normalizeTokenParameters({ is_admin: true }))).toEqual([ + { bucket: 'mybucket[1]', priority: 3 } + ]); }); test('some math', () => { @@ -535,9 +547,9 @@ bucket_definitions: `, PARSE_OPTIONS ); - expect(rules.getStaticBucketIds(normalizeTokenParameters({ int1: 314, float1: 3.14, float2: 314 }))).toEqual([ - 'mybucket[314,3.14,314]' - ]); + expect( + 
rules.getStaticBucketDescriptions(normalizeTokenParameters({ int1: 314, float1: 3.14, float2: 314 })) + ).toEqual([{ bucket: 'mybucket[314,3.14,314]', priority: 3 }]); expect( rules.evaluateRow({ sourceTable: ASSETS, record: { id: 'asset1', int1: 314n, float1: 3.14, float2: 314 } }) @@ -565,7 +577,9 @@ bucket_definitions: PARSE_OPTIONS ); expect(rules.errors).toEqual([]); - expect(rules.getStaticBucketIds(normalizeTokenParameters({ user_id: 'test' }))).toEqual(['mybucket["TEST"]']); + expect(rules.getStaticBucketDescriptions(normalizeTokenParameters({ user_id: 'test' }))).toEqual([ + { bucket: 'mybucket["TEST"]', priority: 3 } + ]); }); test('custom table and id', () => { @@ -802,4 +816,81 @@ bucket_definitions: expect(rules.errors).toEqual([]); }); + + test('priorities on queries', () => { + const rules = SqlSyncRules.fromYaml( + ` +bucket_definitions: + highprio: + parameters: SELECT 0 as _priority; + data: + - SELECT * FROM assets WHERE count <= 10; + defaultprio: + data: + - SELECT * FROM assets WHERE count > 10; + `, + { schema: BASIC_SCHEMA, ...PARSE_OPTIONS } + ); + + expect(rules.errors).toEqual([]); + + expect(rules.getStaticBucketDescriptions(normalizeTokenParameters({}))).toEqual([ + { bucket: 'highprio[]', priority: 0 }, + { bucket: 'defaultprio[]', priority: 3 } + ]); + }); + + test('priorities on bucket', () => { + const rules = SqlSyncRules.fromYaml( + ` +bucket_definitions: + highprio: + priority: 0 + data: + - SELECT * FROM assets WHERE count <= 10; + defaultprio: + data: + - SELECT * FROM assets WHERE count > 10; + `, + { schema: BASIC_SCHEMA, ...PARSE_OPTIONS } + ); + + expect(rules.errors).toEqual([]); + + expect(rules.getStaticBucketDescriptions(normalizeTokenParameters({}))).toEqual([ + { bucket: 'highprio[]', priority: 0 }, + { bucket: 'defaultprio[]', priority: 3 } + ]); + }); + + test(`invalid priority on bucket`, () => { + expect(() => + SqlSyncRules.fromYaml( + ` +bucket_definitions: + highprio: + priority: instant + data: + - SELECT * FROM assets WHERE count <= 10; + `, + { schema: BASIC_SCHEMA, ...PARSE_OPTIONS } + ) + ).toThrowError(/Invalid priority/); + }); + + test(`can't duplicate priority`, () => { + expect(() => + SqlSyncRules.fromYaml( + ` +bucket_definitions: + highprio: + priority: 1 + parameters: SELECT 0 as _priority; + data: + - SELECT * FROM assets WHERE count <= 10; + `, + { schema: BASIC_SCHEMA, ...PARSE_OPTIONS } + ) + ).toThrowError(/Cannot set priority multiple times/); + }); }); diff --git a/packages/sync-rules/test/src/table_valued_function_queries.test.ts b/packages/sync-rules/test/src/table_valued_function_queries.test.ts index 324f51f20..fcd6eee06 100644 --- a/packages/sync-rules/test/src/table_valued_function_queries.test.ts +++ b/packages/sync-rules/test/src/table_valued_function_queries.test.ts @@ -13,10 +13,10 @@ describe('table-valued function queries', () => { expect(query.errors).toEqual([]); expect(query.bucket_parameters).toEqual(['v']); - expect(query.getStaticBucketIds(new RequestParameters({ sub: '' }, { array: [1, 2, 3] }))).toEqual([ - 'mybucket[1]', - 'mybucket[2]', - 'mybucket[3]' + expect(query.getStaticBucketDescriptions(new RequestParameters({ sub: '' }, { array: [1, 2, 3] }))).toEqual([ + { bucket: 'mybucket[1]', priority: 3 }, + { bucket: 'mybucket[2]', priority: 3 }, + { bucket: 'mybucket[3]', priority: 3 } ]); }); @@ -26,10 +26,10 @@ describe('table-valued function queries', () => { expect(query.errors).toEqual([]); expect(query.bucket_parameters).toEqual(['v']); - expect(query.getStaticBucketIds(new 
RequestParameters({ sub: '' }, {}))).toEqual([ - 'mybucket[1]', - 'mybucket[2]', - 'mybucket[3]' + expect(query.getStaticBucketDescriptions(new RequestParameters({ sub: '' }, {}))).toEqual([ + { bucket: 'mybucket[1]', priority: 3 }, + { bucket: 'mybucket[2]', priority: 3 }, + { bucket: 'mybucket[3]', priority: 3 } ]); }); @@ -39,7 +39,7 @@ describe('table-valued function queries', () => { expect(query.errors).toEqual([]); expect(query.bucket_parameters).toEqual(['v']); - expect(query.getStaticBucketIds(new RequestParameters({ sub: '' }, {}))).toEqual([]); + expect(query.getStaticBucketDescriptions(new RequestParameters({ sub: '' }, {}))).toEqual([]); }); test('json_each(array param not present)', function () { @@ -51,7 +51,7 @@ describe('table-valued function queries', () => { expect(query.errors).toEqual([]); expect(query.bucket_parameters).toEqual(['v']); - expect(query.getStaticBucketIds(new RequestParameters({ sub: '' }, {}))).toEqual([]); + expect(query.getStaticBucketDescriptions(new RequestParameters({ sub: '' }, {}))).toEqual([]); }); test('json_each(array param not present, ifnull)', function () { @@ -63,7 +63,7 @@ describe('table-valued function queries', () => { expect(query.errors).toEqual([]); expect(query.bucket_parameters).toEqual(['v']); - expect(query.getStaticBucketIds(new RequestParameters({ sub: '' }, {}))).toEqual([]); + expect(query.getStaticBucketDescriptions(new RequestParameters({ sub: '' }, {}))).toEqual([]); }); test('json_each on json_keys', function () { @@ -72,10 +72,10 @@ describe('table-valued function queries', () => { expect(query.errors).toEqual([]); expect(query.bucket_parameters).toEqual(['value']); - expect(query.getStaticBucketIds(new RequestParameters({ sub: '' }, {}))).toEqual([ - 'mybucket["a"]', - 'mybucket["b"]', - 'mybucket["c"]' + expect(query.getStaticBucketDescriptions(new RequestParameters({ sub: '' }, {}))).toEqual([ + { bucket: 'mybucket["a"]', priority: 3 }, + { bucket: 'mybucket["b"]', priority: 3 }, + { bucket: 'mybucket["c"]', priority: 3 } ]); }); @@ -88,10 +88,10 @@ describe('table-valued function queries', () => { expect(query.errors).toEqual([]); expect(query.bucket_parameters).toEqual(['value']); - expect(query.getStaticBucketIds(new RequestParameters({ sub: '' }, { array: [1, 2, 3] }))).toEqual([ - 'mybucket[1]', - 'mybucket[2]', - 'mybucket[3]' + expect(query.getStaticBucketDescriptions(new RequestParameters({ sub: '' }, { array: [1, 2, 3] }))).toEqual([ + { bucket: 'mybucket[1]', priority: 3 }, + { bucket: 'mybucket[2]', priority: 3 }, + { bucket: 'mybucket[3]', priority: 3 } ]); }); @@ -104,10 +104,10 @@ describe('table-valued function queries', () => { expect(query.errors).toEqual([]); expect(query.bucket_parameters).toEqual(['value']); - expect(query.getStaticBucketIds(new RequestParameters({ sub: '' }, { array: [1, 2, 3] }))).toEqual([ - 'mybucket[1]', - 'mybucket[2]', - 'mybucket[3]' + expect(query.getStaticBucketDescriptions(new RequestParameters({ sub: '' }, { array: [1, 2, 3] }))).toEqual([ + { bucket: 'mybucket[1]', priority: 3 }, + { bucket: 'mybucket[2]', priority: 3 }, + { bucket: 'mybucket[3]', priority: 3 } ]); }); @@ -120,9 +120,9 @@ describe('table-valued function queries', () => { expect(query.errors).toEqual([]); expect(query.bucket_parameters).toEqual(['v']); - expect(query.getStaticBucketIds(new RequestParameters({ sub: '' }, { array: [1, 2, 3] }))).toEqual([ - 'mybucket[2]', - 'mybucket[3]' + expect(query.getStaticBucketDescriptions(new RequestParameters({ sub: '' }, { array: [1, 2, 3] }))).toEqual([ + { 
bucket: 'mybucket[2]', priority: 3 }, + { bucket: 'mybucket[3]', priority: 3 } ]); }); @@ -137,7 +137,7 @@ describe('table-valued function queries', () => { expect(query.bucket_parameters).toEqual(['project_id']); expect( - query.getStaticBucketIds( + query.getStaticBucketDescriptions( new RequestParameters( { sub: '', @@ -149,7 +149,7 @@ describe('table-valued function queries', () => { {} ) ) - ).toEqual(['mybucket[1]']); + ).toEqual([{ bucket: 'mybucket[1]', priority: 3 }]); }); describe('dangerous queries', function () { diff --git a/vitest.workspace.ts b/vitest.workspace.ts new file mode 100644 index 000000000..6294a8e57 --- /dev/null +++ b/vitest.workspace.ts @@ -0,0 +1 @@ +export default ['modules/*', 'packages/*'];
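
The `BucketDescription.js` module that the sync-rules changes import from does not appear in this excerpt. A minimal sketch of the shape it implies, assuming only what the imports and test expectations above show (`BucketPriority`, `defaultBucketPriority`, `isValidPriority`, and the `{ bucket, priority }` pairs), might look like this; it is a reconstruction, not the actual file:

```ts
// Hypothetical reconstruction of packages/sync-rules/src/BucketDescription.ts.

// Valid priorities are 0 through 3; lower values are synced first.
export type BucketPriority = 0 | 1 | 2 | 3;

// Buckets that declare no explicit priority fall back to 3,
// matching the `priority: 3` seen throughout the test expectations.
export const defaultBucketPriority: BucketPriority = 3;

// A resolved bucket id together with the priority it should sync at.
export interface BucketDescription {
  bucket: string;
  priority: BucketPriority;
}

// Used by SqlTools.extractBucketPriority to validate `_priority` literals.
export function isValidPriority(value: number): value is BucketPriority {
  return Number.isInteger(value) && value >= 0 && value <= 3;
}
```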
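
To see the pieces together, here is a hedged end-to-end sketch (not part of the patch), reusing the import paths and test helpers visible in the tests above (`BASIC_SCHEMA`, `PARSE_OPTIONS`, and `normalizeTokenParameters` come from the existing `test/src/util.js`):

```ts
import { SqlSyncRules } from '../../src/index.js';
import { BASIC_SCHEMA, normalizeTokenParameters, PARSE_OPTIONS } from './util.js';

// A priority can be set on the bucket definition itself, or inline through a
// `_priority` alias in a parameter query, but not both at once (see the
// "can't duplicate priority" test).
const rules = SqlSyncRules.fromYaml(
  `
bucket_definitions:
  highprio:
    priority: 0
    data:
      - SELECT * FROM assets WHERE count <= 10;
  inlineprio:
    parameters: SELECT 1 as _priority;
    data:
      - SELECT * FROM assets WHERE count > 10;
  defaultprio:
    data:
      - SELECT * FROM assets;
`,
  { schema: BASIC_SCHEMA, ...PARSE_OPTIONS }
);

// Where getStaticBucketIds() used to return plain bucket-id strings, the
// descriptions now pair each id with its priority:
console.log(rules.getStaticBucketDescriptions(normalizeTokenParameters({})));
// => [
//      { bucket: 'highprio[]', priority: 0 },
//      { bucket: 'inlineprio[]', priority: 1 },
//      { bucket: 'defaultprio[]', priority: 3 }
//    ]
```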
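
Worth noting from the validation tests: `_priority` must be a plain integer literal between 0 and 3 (inclusive). Expressions such as `LENGTH(assets.name)`, string literals, and out-of-range values like `15` are rejected at parse time, and combining a top-level `priority:` with a `_priority` column in the same bucket definition raises 'Cannot set priority multiple times'.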