diff --git a/.changeset/great-rabbits-exercise.md b/.changeset/great-rabbits-exercise.md new file mode 100644 index 000000000..fa448e3d4 --- /dev/null +++ b/.changeset/great-rabbits-exercise.md @@ -0,0 +1,7 @@ +--- +'@powersync/service-sync-rules': minor +'@powersync/service-core': patch +'@powersync/service-image': patch +--- + +Refactor interface between service and sync rule bindings in preparation for sync streams. diff --git a/modules/module-mongodb-storage/test/src/__snapshots__/storage_sync.test.ts.snap b/modules/module-mongodb-storage/test/src/__snapshots__/storage_sync.test.ts.snap index ff9bfdaea..2780b3a21 100644 --- a/modules/module-mongodb-storage/test/src/__snapshots__/storage_sync.test.ts.snap +++ b/modules/module-mongodb-storage/test/src/__snapshots__/storage_sync.test.ts.snap @@ -10,9 +10,21 @@ exports[`sync - mongodb > compacting data - invalidate checkpoint 1`] = ` "checksum": -93886621, "count": 2, "priority": 3, + "subscriptions": [ + { + "default": 0, + }, + ], }, ], "last_op_id": "2", + "streams": [ + { + "errors": [], + "is_default": true, + "name": "mybucket", + }, + ], "write_checkpoint": undefined, }, }, @@ -46,6 +58,11 @@ exports[`sync - mongodb > compacting data - invalidate checkpoint 2`] = ` "checksum": 499012468, "count": 4, "priority": 3, + "subscriptions": [ + { + "default": 0, + }, + ], }, ], "write_checkpoint": undefined, @@ -105,9 +122,21 @@ exports[`sync - mongodb > expiring token 1`] = ` "checksum": 0, "count": 0, "priority": 3, + "subscriptions": [ + { + "default": 0, + }, + ], }, ], "last_op_id": "0", + "streams": [ + { + "errors": [], + "is_default": true, + "name": "mybucket", + }, + ], "write_checkpoint": undefined, }, }, @@ -137,9 +166,21 @@ exports[`sync - mongodb > sends checkpoint complete line for empty checkpoint 1` "checksum": -1221282404, "count": 1, "priority": 3, + "subscriptions": [ + { + "default": 0, + }, + ], }, ], "last_op_id": "1", + "streams": [ + { + "errors": [], + "is_default": true, + "name": "mybucket", + }, + ], "write_checkpoint": undefined, }, }, @@ -193,15 +234,37 @@ exports[`sync - mongodb > sync buckets in order 1`] = ` "checksum": 920318466, "count": 1, "priority": 2, + "subscriptions": [ + { + "default": 0, + }, + ], }, { "bucket": "b1[]", "checksum": -1382098757, "count": 1, "priority": 1, + "subscriptions": [ + { + "default": 1, + }, + ], }, ], "last_op_id": "2", + "streams": [ + { + "errors": [], + "is_default": true, + "name": "b0", + }, + { + "errors": [], + "is_default": true, + "name": "b1", + }, + ], "write_checkpoint": undefined, }, }, @@ -267,9 +330,21 @@ exports[`sync - mongodb > sync global data 1`] = ` "checksum": -93886621, "count": 2, "priority": 3, + "subscriptions": [ + { + "default": 0, + }, + ], }, ], "last_op_id": "2", + "streams": [ + { + "errors": [], + "is_default": true, + "name": "mybucket", + }, + ], "write_checkpoint": undefined, }, }, @@ -319,21 +394,53 @@ exports[`sync - mongodb > sync interrupts low-priority buckets on new checkpoint "checksum": -659831575, "count": 2000, "priority": 2, + "subscriptions": [ + { + "default": 0, + }, + ], }, { "bucket": "b0b[]", "checksum": -659831575, "count": 2000, "priority": 2, + "subscriptions": [ + { + "default": 1, + }, + ], }, { "bucket": "b1[]", "checksum": -1096116670, "count": 1, "priority": 1, + "subscriptions": [ + { + "default": 2, + }, + ], }, ], "last_op_id": "4001", + "streams": [ + { + "errors": [], + "is_default": true, + "name": "b0a", + }, + { + "errors": [], + "is_default": true, + "name": "b0b", + }, + { + "errors": [], + 
"is_default": true, + "name": "b1", + }, + ], "write_checkpoint": undefined, }, }, @@ -380,18 +487,33 @@ exports[`sync - mongodb > sync interrupts low-priority buckets on new checkpoint "checksum": 883076828, "count": 2001, "priority": 2, + "subscriptions": [ + { + "default": 0, + }, + ], }, { "bucket": "b0b[]", "checksum": 883076828, "count": 2001, "priority": 2, + "subscriptions": [ + { + "default": 1, + }, + ], }, { "bucket": "b1[]", "checksum": 1841937527, "count": 2, "priority": 1, + "subscriptions": [ + { + "default": 2, + }, + ], }, ], "write_checkpoint": undefined, @@ -466,9 +588,21 @@ exports[`sync - mongodb > sync legacy non-raw data 1`] = ` "checksum": -852817836, "count": 1, "priority": 3, + "subscriptions": [ + { + "default": 0, + }, + ], }, ], "last_op_id": "1", + "streams": [ + { + "errors": [], + "is_default": true, + "name": "mybucket", + }, + ], "write_checkpoint": undefined, }, }, @@ -514,9 +648,21 @@ exports[`sync - mongodb > sync updates to data query only 1`] = ` "checksum": 0, "count": 0, "priority": 3, + "subscriptions": [ + { + "default": 0, + }, + ], }, ], "last_op_id": "1", + "streams": [ + { + "errors": [], + "is_default": true, + "name": "by_user", + }, + ], "write_checkpoint": undefined, }, }, @@ -540,6 +686,11 @@ exports[`sync - mongodb > sync updates to data query only 2`] = ` "checksum": 1418351250, "count": 1, "priority": 3, + "subscriptions": [ + { + "default": 0, + }, + ], }, ], "write_checkpoint": undefined, @@ -582,9 +733,21 @@ exports[`sync - mongodb > sync updates to global data 1`] = ` "checksum": 0, "count": 0, "priority": 3, + "subscriptions": [ + { + "default": 0, + }, + ], }, ], "last_op_id": "0", + "streams": [ + { + "errors": [], + "is_default": true, + "name": "mybucket", + }, + ], "write_checkpoint": undefined, }, }, @@ -608,6 +771,11 @@ exports[`sync - mongodb > sync updates to global data 2`] = ` "checksum": 920318466, "count": 1, "priority": 3, + "subscriptions": [ + { + "default": 0, + }, + ], }, ], "write_checkpoint": undefined, @@ -652,6 +820,11 @@ exports[`sync - mongodb > sync updates to global data 3`] = ` "checksum": -93886621, "count": 2, "priority": 3, + "subscriptions": [ + { + "default": 0, + }, + ], }, ], "write_checkpoint": undefined, @@ -690,6 +863,13 @@ exports[`sync - mongodb > sync updates to parameter query + data 1`] = ` "checkpoint": { "buckets": [], "last_op_id": "0", + "streams": [ + { + "errors": [], + "is_default": true, + "name": "by_user", + }, + ], "write_checkpoint": undefined, }, }, @@ -713,6 +893,11 @@ exports[`sync - mongodb > sync updates to parameter query + data 2`] = ` "checksum": 1418351250, "count": 1, "priority": 3, + "subscriptions": [ + { + "default": 0, + }, + ], }, ], "write_checkpoint": undefined, @@ -751,6 +936,13 @@ exports[`sync - mongodb > sync updates to parameter query only 1`] = ` "checkpoint": { "buckets": [], "last_op_id": "0", + "streams": [ + { + "errors": [], + "is_default": true, + "name": "by_user", + }, + ], "write_checkpoint": undefined, }, }, @@ -774,6 +966,11 @@ exports[`sync - mongodb > sync updates to parameter query only 2`] = ` "checksum": 0, "count": 0, "priority": 3, + "subscriptions": [ + { + "default": 0, + }, + ], }, ], "write_checkpoint": undefined, diff --git a/modules/module-postgres-storage/test/src/__snapshots__/storage_sync.test.ts.snap b/modules/module-postgres-storage/test/src/__snapshots__/storage_sync.test.ts.snap index 54b14a8dc..dbbf0b515 100644 --- a/modules/module-postgres-storage/test/src/__snapshots__/storage_sync.test.ts.snap +++ 
b/modules/module-postgres-storage/test/src/__snapshots__/storage_sync.test.ts.snap @@ -10,9 +10,21 @@ exports[`sync - postgres > compacting data - invalidate checkpoint 1`] = ` "checksum": -93886621, "count": 2, "priority": 3, + "subscriptions": [ + { + "default": 0, + }, + ], }, ], "last_op_id": "2", + "streams": [ + { + "errors": [], + "is_default": true, + "name": "mybucket", + }, + ], "write_checkpoint": undefined, }, }, @@ -46,6 +58,11 @@ exports[`sync - postgres > compacting data - invalidate checkpoint 2`] = ` "checksum": 499012468, "count": 4, "priority": 3, + "subscriptions": [ + { + "default": 0, + }, + ], }, ], "write_checkpoint": undefined, @@ -105,9 +122,21 @@ exports[`sync - postgres > expiring token 1`] = ` "checksum": 0, "count": 0, "priority": 3, + "subscriptions": [ + { + "default": 0, + }, + ], }, ], "last_op_id": "0", + "streams": [ + { + "errors": [], + "is_default": true, + "name": "mybucket", + }, + ], "write_checkpoint": undefined, }, }, @@ -137,9 +166,21 @@ exports[`sync - postgres > sends checkpoint complete line for empty checkpoint 1 "checksum": -1221282404, "count": 1, "priority": 3, + "subscriptions": [ + { + "default": 0, + }, + ], }, ], "last_op_id": "1", + "streams": [ + { + "errors": [], + "is_default": true, + "name": "mybucket", + }, + ], "write_checkpoint": undefined, }, }, @@ -193,15 +234,37 @@ exports[`sync - postgres > sync buckets in order 1`] = ` "checksum": 920318466, "count": 1, "priority": 2, + "subscriptions": [ + { + "default": 0, + }, + ], }, { "bucket": "b1[]", "checksum": -1382098757, "count": 1, "priority": 1, + "subscriptions": [ + { + "default": 1, + }, + ], }, ], "last_op_id": "2", + "streams": [ + { + "errors": [], + "is_default": true, + "name": "b0", + }, + { + "errors": [], + "is_default": true, + "name": "b1", + }, + ], "write_checkpoint": undefined, }, }, @@ -267,9 +330,21 @@ exports[`sync - postgres > sync global data 1`] = ` "checksum": -93886621, "count": 2, "priority": 3, + "subscriptions": [ + { + "default": 0, + }, + ], }, ], "last_op_id": "2", + "streams": [ + { + "errors": [], + "is_default": true, + "name": "mybucket", + }, + ], "write_checkpoint": undefined, }, }, @@ -319,21 +394,53 @@ exports[`sync - postgres > sync interrupts low-priority buckets on new checkpoin "checksum": -659831575, "count": 2000, "priority": 2, + "subscriptions": [ + { + "default": 0, + }, + ], }, { "bucket": "b0b[]", "checksum": -659831575, "count": 2000, "priority": 2, + "subscriptions": [ + { + "default": 1, + }, + ], }, { "bucket": "b1[]", "checksum": -1096116670, "count": 1, "priority": 1, + "subscriptions": [ + { + "default": 2, + }, + ], }, ], "last_op_id": "4001", + "streams": [ + { + "errors": [], + "is_default": true, + "name": "b0a", + }, + { + "errors": [], + "is_default": true, + "name": "b0b", + }, + { + "errors": [], + "is_default": true, + "name": "b1", + }, + ], "write_checkpoint": undefined, }, }, @@ -380,18 +487,33 @@ exports[`sync - postgres > sync interrupts low-priority buckets on new checkpoin "checksum": 883076828, "count": 2001, "priority": 2, + "subscriptions": [ + { + "default": 0, + }, + ], }, { "bucket": "b0b[]", "checksum": 883076828, "count": 2001, "priority": 2, + "subscriptions": [ + { + "default": 1, + }, + ], }, { "bucket": "b1[]", "checksum": 1841937527, "count": 2, "priority": 1, + "subscriptions": [ + { + "default": 2, + }, + ], }, ], "write_checkpoint": undefined, @@ -466,9 +588,21 @@ exports[`sync - postgres > sync legacy non-raw data 1`] = ` "checksum": -852817836, "count": 1, "priority": 3, + 
"subscriptions": [ + { + "default": 0, + }, + ], }, ], "last_op_id": "1", + "streams": [ + { + "errors": [], + "is_default": true, + "name": "mybucket", + }, + ], "write_checkpoint": undefined, }, }, @@ -514,9 +648,21 @@ exports[`sync - postgres > sync updates to data query only 1`] = ` "checksum": 0, "count": 0, "priority": 3, + "subscriptions": [ + { + "default": 0, + }, + ], }, ], "last_op_id": "1", + "streams": [ + { + "errors": [], + "is_default": true, + "name": "by_user", + }, + ], "write_checkpoint": undefined, }, }, @@ -540,6 +686,11 @@ exports[`sync - postgres > sync updates to data query only 2`] = ` "checksum": 1418351250, "count": 1, "priority": 3, + "subscriptions": [ + { + "default": 0, + }, + ], }, ], "write_checkpoint": undefined, @@ -582,9 +733,21 @@ exports[`sync - postgres > sync updates to global data 1`] = ` "checksum": 0, "count": 0, "priority": 3, + "subscriptions": [ + { + "default": 0, + }, + ], }, ], "last_op_id": "0", + "streams": [ + { + "errors": [], + "is_default": true, + "name": "mybucket", + }, + ], "write_checkpoint": undefined, }, }, @@ -608,6 +771,11 @@ exports[`sync - postgres > sync updates to global data 2`] = ` "checksum": 920318466, "count": 1, "priority": 3, + "subscriptions": [ + { + "default": 0, + }, + ], }, ], "write_checkpoint": undefined, @@ -652,6 +820,11 @@ exports[`sync - postgres > sync updates to global data 3`] = ` "checksum": -93886621, "count": 2, "priority": 3, + "subscriptions": [ + { + "default": 0, + }, + ], }, ], "write_checkpoint": undefined, @@ -690,6 +863,13 @@ exports[`sync - postgres > sync updates to parameter query + data 1`] = ` "checkpoint": { "buckets": [], "last_op_id": "0", + "streams": [ + { + "errors": [], + "is_default": true, + "name": "by_user", + }, + ], "write_checkpoint": undefined, }, }, @@ -713,6 +893,11 @@ exports[`sync - postgres > sync updates to parameter query + data 2`] = ` "checksum": 1418351250, "count": 1, "priority": 3, + "subscriptions": [ + { + "default": 0, + }, + ], }, ], "write_checkpoint": undefined, @@ -751,6 +936,13 @@ exports[`sync - postgres > sync updates to parameter query only 1`] = ` "checkpoint": { "buckets": [], "last_op_id": "0", + "streams": [ + { + "errors": [], + "is_default": true, + "name": "by_user", + }, + ], "write_checkpoint": undefined, }, }, @@ -774,6 +966,11 @@ exports[`sync - postgres > sync updates to parameter query only 2`] = ` "checksum": 0, "count": 0, "priority": 3, + "subscriptions": [ + { + "default": 0, + }, + ], }, ], "write_checkpoint": undefined, diff --git a/packages/service-core-tests/src/test-utils/general-utils.ts b/packages/service-core-tests/src/test-utils/general-utils.ts index 738db1c4a..524a00116 100644 --- a/packages/service-core-tests/src/test-utils/general-utils.ts +++ b/packages/service-core-tests/src/test-utils/general-utils.ts @@ -1,5 +1,5 @@ import { storage, utils } from '@powersync/service-core'; -import { SqlSyncRules } from '@powersync/service-sync-rules'; +import { GetQuerierOptions, RequestParameters, SqlSyncRules } from '@powersync/service-sync-rules'; import * as bson from 'bson'; export const ZERO_LSN = '0/0'; @@ -102,3 +102,11 @@ function getFirst( export function rid(id: string): bson.UUID { return utils.getUuidReplicaIdentityBson({ id: id }, [{ name: 'id', type: 'VARCHAR', typeId: 25 }]); } + +export function querierOptions(globalParameters: RequestParameters): GetQuerierOptions { + return { + globalParameters, + hasDefaultStreams: true, + streams: {} + }; +} diff --git 
a/packages/service-core-tests/src/tests/register-data-storage-tests.ts b/packages/service-core-tests/src/tests/register-data-storage-tests.ts index 3eec6dab2..870677144 100644 --- a/packages/service-core-tests/src/tests/register-data-storage-tests.ts +++ b/packages/service-core-tests/src/tests/register-data-storage-tests.ts @@ -8,6 +8,7 @@ import { import { ParameterLookup, RequestParameters } from '@powersync/service-sync-rules'; import { expect, test, describe, beforeEach } from 'vitest'; import * as test_utils from '../test-utils/test-utils-index.js'; +import { SqlBucketDescriptor } from '@powersync/service-sync-rules/src/SqlBucketDescriptor.js'; export const TEST_TABLE = test_utils.makeTestTable('test', ['id']); @@ -411,7 +412,7 @@ bucket_definitions: const parameters = new RequestParameters({ sub: 'u1' }, {}); - const q1 = sync_rules.bucketDescriptors[0].parameterQueries[0]; + const q1 = (sync_rules.bucketSources[0] as SqlBucketDescriptor).parameterQueries[0]; const lookups = q1.getLookups(parameters); expect(lookups).toEqual([ParameterLookup.normalized('by_workspace', '1', ['u1'])]); @@ -419,12 +420,16 @@ bucket_definitions: const parameter_sets = await checkpoint.getParameterSets(lookups); expect(parameter_sets).toEqual([{ workspace_id: 'workspace1' }]); - const buckets = await sync_rules.getBucketParameterQuerier(parameters).queryDynamicBucketDescriptions({ - getParameterSets(lookups) { - return checkpoint.getParameterSets(lookups); - } - }); - expect(buckets).toEqual([{ bucket: 'by_workspace["workspace1"]', priority: 3 }]); + const buckets = await sync_rules + .getBucketParameterQuerier(test_utils.querierOptions(parameters)) + .querier.queryDynamicBucketDescriptions({ + getParameterSets(lookups) { + return checkpoint.getParameterSets(lookups); + } + }); + expect(buckets).toEqual([ + { bucket: 'by_workspace["workspace1"]', priority: 3, definition: 'by_workspace', inclusion_reasons: ['default'] } + ]); }); test('save and load parameters with dynamic global buckets', async () => { @@ -482,7 +487,7 @@ bucket_definitions: const parameters = new RequestParameters({ sub: 'unknown' }, {}); - const q1 = sync_rules.bucketDescriptors[0].parameterQueries[0]; + const q1 = (sync_rules.bucketSources[0] as SqlBucketDescriptor).parameterQueries[0]; const lookups = q1.getLookups(parameters); expect(lookups).toEqual([ParameterLookup.normalized('by_public_workspace', '1', [])]); @@ -491,15 +496,27 @@ bucket_definitions: parameter_sets.sort((a, b) => JSON.stringify(a).localeCompare(JSON.stringify(b))); expect(parameter_sets).toEqual([{ workspace_id: 'workspace1' }, { workspace_id: 'workspace3' }]); - const buckets = await sync_rules.getBucketParameterQuerier(parameters).queryDynamicBucketDescriptions({ - getParameterSets(lookups) { - return checkpoint.getParameterSets(lookups); - } - }); + const buckets = await sync_rules + .getBucketParameterQuerier(test_utils.querierOptions(parameters)) + .querier.queryDynamicBucketDescriptions({ + getParameterSets(lookups) { + return checkpoint.getParameterSets(lookups); + } + }); buckets.sort((a, b) => a.bucket.localeCompare(b.bucket)); expect(buckets).toEqual([ - { bucket: 'by_public_workspace["workspace1"]', priority: 3 }, - { bucket: 'by_public_workspace["workspace3"]', priority: 3 } + { + bucket: 'by_public_workspace["workspace1"]', + priority: 3, + definition: 'by_public_workspace', + inclusion_reasons: ['default'] + }, + { + bucket: 'by_public_workspace["workspace3"]', + priority: 3, + definition: 'by_public_workspace', + inclusion_reasons: ['default'] + } 
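+        // inclusion_reasons: ['default'] marks buckets resolved via a default subscription rather than an explicit stream subscription.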
]); }); @@ -573,7 +590,7 @@ bucket_definitions: const parameters = new RequestParameters({ sub: 'u1' }, {}); // Test intermediate values - could be moved to sync_rules.test.ts - const q1 = sync_rules.bucketDescriptors[0].parameterQueries[0]; + const q1 = (sync_rules.bucketSources[0] as SqlBucketDescriptor).parameterQueries[0]; const lookups1 = q1.getLookups(parameters); expect(lookups1).toEqual([ParameterLookup.normalized('by_workspace', '1', [])]); @@ -581,7 +598,7 @@ bucket_definitions: parameter_sets1.sort((a, b) => JSON.stringify(a).localeCompare(JSON.stringify(b))); expect(parameter_sets1).toEqual([{ workspace_id: 'workspace1' }]); - const q2 = sync_rules.bucketDescriptors[0].parameterQueries[1]; + const q2 = (sync_rules.bucketSources[0] as SqlBucketDescriptor).parameterQueries[1]; const lookups2 = q2.getLookups(parameters); expect(lookups2).toEqual([ParameterLookup.normalized('by_workspace', '2', ['u1'])]); @@ -591,11 +608,13 @@ bucket_definitions: // Test final values - the important part const buckets = ( - await sync_rules.getBucketParameterQuerier(parameters).queryDynamicBucketDescriptions({ - getParameterSets(lookups) { - return checkpoint.getParameterSets(lookups); - } - }) + await sync_rules + .getBucketParameterQuerier(test_utils.querierOptions(parameters)) + .querier.queryDynamicBucketDescriptions({ + getParameterSets(lookups) { + return checkpoint.getParameterSets(lookups); + } + }) ).map((e) => e.bucket); buckets.sort(); expect(buckets).toEqual(['by_workspace["workspace1"]', 'by_workspace["workspace3"]']); diff --git a/packages/service-core-tests/src/tests/register-sync-tests.ts b/packages/service-core-tests/src/tests/register-sync-tests.ts index fc661dd48..4c7224b43 100644 --- a/packages/service-core-tests/src/tests/register-sync-tests.ts +++ b/packages/service-core-tests/src/tests/register-sync-tests.ts @@ -89,8 +89,7 @@ export function registerSyncTests(factory: storage.TestStorageFactory) { raw_data: true }, tracker, - syncParams: new RequestParameters({ sub: '' }, {}), - token: { exp: Date.now() / 1000 + 10 } as any + token: { sub: '', exp: Date.now() / 1000 + 10 } as any }); const lines = await consumeCheckpointLines(stream); @@ -150,8 +149,7 @@ bucket_definitions: raw_data: true }, tracker, - syncParams: new RequestParameters({ sub: '' }, {}), - token: { exp: Date.now() / 1000 + 10 } as any + token: { sub: '', exp: Date.now() / 1000 + 10 } as any }); const lines = await consumeCheckpointLines(stream); @@ -213,8 +211,7 @@ bucket_definitions: raw_data: true }, tracker, - syncParams: new RequestParameters({ sub: '' }, {}), - token: { exp: Date.now() / 1000 + 10 } as any + token: { sub: '', exp: Date.now() / 1000 + 10 } as any }); let sentCheckpoints = 0; @@ -323,7 +320,6 @@ bucket_definitions: raw_data: true }, tracker, - syncParams: new RequestParameters({ sub: 'user_one' }, {}), token: { sub: 'user_one', exp: Date.now() / 1000 + 100000 } as any }); @@ -464,8 +460,7 @@ bucket_definitions: raw_data: true }, tracker, - syncParams: new RequestParameters({ sub: '' }, {}), - token: { exp: Date.now() / 1000 + 10 } as any + token: { sub: '', exp: Date.now() / 1000 + 10 } as any }); let sentRows = 0; @@ -580,8 +575,7 @@ bucket_definitions: raw_data: true }, tracker, - syncParams: new RequestParameters({ sub: '' }, {}), - token: { exp: Date.now() / 1000 + 100000 } as any + token: { sub: '', exp: Date.now() / 1000 + 100000 } as any }); const lines: any[] = []; @@ -646,8 +640,7 @@ bucket_definitions: raw_data: false }, tracker, - syncParams: new RequestParameters({ sub: '' }, 
{}), - token: { exp: Date.now() / 1000 + 10 } as any + token: { sub: '', exp: Date.now() / 1000 + 10 } as any }); const lines = await consumeCheckpointLines(stream); @@ -675,8 +668,7 @@ bucket_definitions: raw_data: true }, tracker, - syncParams: new RequestParameters({ sub: '' }, {}), - token: { exp: 0 } as any + token: { sub: '', exp: 0 } as any }); const lines = await consumeCheckpointLines(stream); @@ -706,8 +698,7 @@ bucket_definitions: raw_data: true }, tracker, - syncParams: new RequestParameters({ sub: '' }, {}), - token: { exp: Date.now() / 1000 + 10 } as any + token: { sub: '', exp: Date.now() / 1000 + 10 } as any }); const iter = stream[Symbol.asyncIterator](); context.onTestFinished(() => { @@ -780,8 +771,7 @@ bucket_definitions: raw_data: true }, tracker, - syncParams: new RequestParameters({ sub: 'user1' }, {}), - token: { exp: Date.now() / 1000 + 100 } as any + token: { sub: 'user1', exp: Date.now() / 1000 + 100 } as any }); const iter = stream[Symbol.asyncIterator](); context.onTestFinished(() => { @@ -856,8 +846,7 @@ bucket_definitions: raw_data: true }, tracker, - syncParams: new RequestParameters({ sub: 'user1' }, {}), - token: { exp: Date.now() / 1000 + 100 } as any + token: { sub: 'user1', exp: Date.now() / 1000 + 100 } as any }); const iter = stream[Symbol.asyncIterator](); context.onTestFinished(() => { @@ -923,8 +912,7 @@ bucket_definitions: raw_data: true }, tracker, - syncParams: new RequestParameters({ sub: 'user1' }, {}), - token: { exp: Date.now() / 1000 + 100 } as any + token: { sub: 'user1', exp: Date.now() / 1000 + 100 } as any }); const iter = stream[Symbol.asyncIterator](); context.onTestFinished(() => { @@ -991,8 +979,7 @@ bucket_definitions: raw_data: true }, tracker, - syncParams: new RequestParameters({ sub: '' }, {}), - token: { exp: exp } as any + token: { sub: '', exp: exp } as any }); const iter = stream[Symbol.asyncIterator](); context.onTestFinished(() => { @@ -1054,8 +1041,7 @@ bucket_definitions: raw_data: true }, tracker, - syncParams: new RequestParameters({ sub: '' }, {}), - token: { exp: Date.now() / 1000 + 10 } as any + token: { sub: '', exp: Date.now() / 1000 + 10 } as any }); const iter = stream[Symbol.asyncIterator](); @@ -1180,7 +1166,6 @@ bucket_definitions: raw_data: true }, tracker, - syncParams: new RequestParameters({ sub: 'test' }, {}), token: { sub: 'test', exp: Date.now() / 1000 + 10 } as any }; const stream1 = sync.streamResponse(params); diff --git a/packages/service-core/src/routes/endpoints/socket-route.ts b/packages/service-core/src/routes/endpoints/socket-route.ts index 7f6a86ddf..b3badfb0e 100644 --- a/packages/service-core/src/routes/endpoints/socket-route.ts +++ b/packages/service-core/src/routes/endpoints/socket-route.ts @@ -58,8 +58,6 @@ export const syncStreamReactive: SocketRouteGenerator = (router) => return; } - const syncParams = new RequestParameters(context.token_payload!, params.parameters ?? 
{}); - const { storageEngine: { activeBucketStorage } } = service_context; @@ -95,7 +93,6 @@ export const syncStreamReactive: SocketRouteGenerator = (router) => ...params, binary_data: true // always true for web sockets }, - syncParams, token: context!.token_payload!, tokenStreamOptions: { // RSocket handles keepalive events by default diff --git a/packages/service-core/src/routes/endpoints/sync-rules.ts b/packages/service-core/src/routes/endpoints/sync-rules.ts index 69d167c03..ebac1c23e 100644 --- a/packages/service-core/src/routes/endpoints/sync-rules.ts +++ b/packages/service-core/src/routes/endpoints/sync-rules.ts @@ -202,34 +202,7 @@ async function debugSyncRules(apiHandler: RouteAPI, sync_rules: string) { return { valid: true, - bucket_definitions: rules.bucketDescriptors.map((d) => { - let all_parameter_queries = [...d.parameterQueries.values()].flat(); - let all_data_queries = [...d.dataQueries.values()].flat(); - return { - name: d.name, - bucket_parameters: d.bucketParameters, - global_parameter_queries: d.globalParameterQueries.map((q) => { - return { - sql: q.sql - }; - }), - parameter_queries: all_parameter_queries.map((q) => { - return { - sql: q.sql, - table: q.sourceTable, - input_parameters: q.inputParameters - }; - }), - - data_queries: all_data_queries.map((q) => { - return { - sql: q.sql, - table: q.sourceTable, - columns: q.columnOutputNames() - }; - }) - }; - }), + bucket_definitions: rules.bucketSources.map((source) => source.debugRepresentation()), source_tables: resolved_tables, data_tables: rules.debugGetOutputTables() }; diff --git a/packages/service-core/src/routes/endpoints/sync-stream.ts b/packages/service-core/src/routes/endpoints/sync-stream.ts index 8ca6306ca..8ce2aadf4 100644 --- a/packages/service-core/src/routes/endpoints/sync-stream.ts +++ b/packages/service-core/src/routes/endpoints/sync-stream.ts @@ -52,9 +52,6 @@ export const syncStreamed = routeDefinition({ }); } - const params: util.StreamingSyncRequest = payload.params; - const syncParams = new RequestParameters(payload.context.token_payload!, payload.params.parameters ?? 
{});
-
   const bucketStorage = await storageEngine.activeBucketStorage.getActiveStorage();

   if (bucketStorage == null) {
@@ -75,8 +72,7 @@
         syncContext: syncContext,
         bucketStorage,
         syncRules: syncRules,
-        params,
-        syncParams,
+        params: payload.params,
         token: payload.context.token_payload!,
         tracker,
         signal: controller.signal,
diff --git a/packages/service-core/src/sync/BucketChecksumState.ts b/packages/service-core/src/sync/BucketChecksumState.ts
index f94720279..2679e21f7 100644
--- a/packages/service-core/src/sync/BucketChecksumState.ts
+++ b/packages/service-core/src/sync/BucketChecksumState.ts
@@ -1,4 +1,13 @@
-import { BucketDescription, RequestParameters, SqlSyncRules } from '@powersync/service-sync-rules';
+import {
+  BucketDescription,
+  BucketPriority,
+  BucketSource,
+  RequestedStream,
+  RequestJwtPayload,
+  RequestParameters,
+  ResolvedBucket,
+  SqlSyncRules
+} from '@powersync/service-sync-rules';
 import * as storage from '../storage/storage-index.js';
 import * as util from '../util/util-index.js';
@@ -11,7 +20,7 @@
 import { logger as defaultLogger } from '@powersync/lib-services-framework';
 import { JSONBig } from '@powersync/service-jsonbig';
-import { BucketParameterQuerier } from '@powersync/service-sync-rules/src/BucketParameterQuerier.js';
+import { BucketParameterQuerier, QuerierError } from '@powersync/service-sync-rules/src/BucketParameterQuerier.js';
 import { SyncContext } from './SyncContext.js';
 import { getIntersection, hasIntersection } from './util.js';
@@ -19,9 +28,9 @@
 export interface BucketChecksumStateOptions {
   syncContext: SyncContext;
   bucketStorage: BucketChecksumStateStorage;
   syncRules: SqlSyncRules;
-  syncParams: RequestParameters;
+  tokenPayload: RequestJwtPayload;
+  syncRequest: util.StreamingSyncRequest;
   logger?: Logger;
-  initialBucketPositions?: { name: string; after: util.InternalOpId }[];
 }

 type BucketSyncState = {
@@ -50,6 +59,17 @@
    */
   private lastChecksums: util.ChecksumMap | null = null;
   private lastWriteCheckpoint: bigint | null = null;
+  /**
+   * Once we've sent the first full checkpoint line including all {@link util.Checkpoint.streams} that the user is
+   * subscribed to, we keep a map from stream names to their index in that array.
+   *
+   * This is used to compress the representation of buckets in `checkpoint` and `checkpoint_diff` lines: For buckets
+   * that are part of sync rules or default streams, we need to include the name of the defining sync rule or
+   * definition yielding that bucket (so that clients can track progress for default streams).
+   * But instead of sending the name for each bucket, we use the fact that the stream is part of the streams array and
+   * only send its index, reducing the size of those messages.
+   */
+  private streamNameToIndex: Map<string, number> | null = null;

   private readonly parameterState: BucketParameterState;
@@ -69,13 +89,14 @@
       options.syncContext,
       options.bucketStorage,
       options.syncRules,
-      options.syncParams,
+      options.tokenPayload,
+      options.syncRequest,
       this.logger
     );
     this.bucketDataPositions = new Map();
-    for (let { name, after: start } of options.initialBucketPositions ?? []) {
-      this.bucketDataPositions.set(name, { start_op_id: start });
+    for (let { name, after: start } of options.syncRequest.buckets ?? []) {
+      this.bucketDataPositions.set(name, { start_op_id: BigInt(start) });
     }
   }
@@ -158,6 +179,7 @@
     // TODO: If updatedBuckets is present, we can use that to more efficiently calculate a diff,
     // and avoid any unnecessary loops through the entire list of buckets.
     const diff = util.checksumsDiff(this.lastChecksums, checksumMap);
+    const streamNameToIndex = this.streamNameToIndex!;

     if (
       this.lastWriteCheckpoint == writeCheckpoint &&
@@ -182,12 +204,12 @@
     const updatedBucketDescriptions = diff.updatedBuckets.map((e) => ({
       ...e,
-      priority: bucketDescriptionMap.get(e.bucket)!.priority
+      ...this.parameterState.translateResolvedBucket(bucketDescriptionMap.get(e.bucket)!, streamNameToIndex)
     }));
     bucketsToFetch = [...generateBucketsToFetch].map((b) => {
       return {
-        bucket: b,
-        priority: bucketDescriptionMap.get(b)!.priority
+        priority: bucketDescriptionMap.get(b)!.priority,
+        bucket: b
       };
     });
@@ -220,15 +242,37 @@
       message += `buckets: ${allBuckets.length} ${limitedBuckets(allBuckets, 20)}`;
       this.logger.info(message, { checkpoint: base.checkpoint, user_id: user_id, buckets: allBuckets.length });
     };
-    bucketsToFetch = allBuckets;
+    bucketsToFetch = allBuckets.map((b) => ({ bucket: b.bucket, priority: b.priority }));
+
+    const subscriptions: util.StreamDescription[] = [];
+    const streamNameToIndex = new Map<string, number>();
+    this.streamNameToIndex = streamNameToIndex;
+
+    for (const source of this.parameterState.syncRules.bucketSources) {
+      if (this.parameterState.isSubscribedToStream(source)) {
+        streamNameToIndex.set(source.name, subscriptions.length);
+
+        subscriptions.push({
+          name: source.name,
+          is_default: source.subscribedToByDefault,
+          errors:
+            this.parameterState.streamErrors[source.name]?.map((e) => ({
+              subscription: e.subscription?.opaque_id ?? 'default',
+              message: e.message
+            })) ?? []
+        });
+      }
+    }
+
     checkpointLine = {
       checkpoint: {
         last_op_id: util.internalToExternalOpId(base.checkpoint),
         write_checkpoint: writeCheckpoint ? String(writeCheckpoint) : undefined,
         buckets: [...checksumMap.values()].map((e) => ({
           ...e,
-          priority: bucketDescriptionMap.get(e.bucket)!.priority
-        }))
+          ...this.parameterState.translateResolvedBucket(bucketDescriptionMap.get(e.bucket)!, streamNameToIndex)
+        })),
+        streams: subscriptions
       }
     } satisfies util.StreamingSyncCheckpoint;
   }
@@ -319,7 +363,7 @@
   /**
    * All buckets forming part of the checkpoint.
    */
-  buckets: BucketDescription[];
+  buckets: ResolvedBucket[];

   /**
    * If present, a set of buckets that have been updated since the last checkpoint.
@@ -335,9 +379,19 @@ export class BucketParameterState {
   public readonly syncRules: SqlSyncRules;
   public readonly syncParams: RequestParameters;
   private readonly querier: BucketParameterQuerier;
-  private readonly staticBuckets: Map<string, BucketDescription>;
+  /**
+   * Static buckets. This map is guaranteed not to change during a request, since resolving static buckets can only
+   * take request parameters into account.
+   */
+  private readonly staticBuckets: Map<string, ResolvedBucket>;
+  private readonly includeDefaultStreams: boolean;
+  // Indexed by the client-side id
+  private readonly explicitStreamSubscriptions: util.RequestedStreamSubscription[];
+  // Indexed by descriptor name.
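+  // (i.e. the stream or bucket-definition name as written in the sync configuration, matching BucketSource.name.)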
+  readonly streamErrors: Record<string, QuerierError[]>;
+  private readonly subscribedStreamNames: Set<string>;
   private readonly logger: Logger;

-  private cachedDynamicBuckets: BucketDescription[] | null = null;
+  private cachedDynamicBuckets: ResolvedBucket[] | null = null;
   private cachedDynamicBucketSet: Set<string> | null = null;
   private readonly lookups: Set<string>;

@@ -346,18 +400,92 @@
   constructor(
     context: SyncContext,
     bucketStorage: BucketChecksumStateStorage,
     syncRules: SqlSyncRules,
-    syncParams: RequestParameters,
+    tokenPayload: RequestJwtPayload,
+    request: util.StreamingSyncRequest,
     logger: Logger
   ) {
     this.context = context;
     this.bucketStorage = bucketStorage;
     this.syncRules = syncRules;
-    this.syncParams = syncParams;
+    this.syncParams = new RequestParameters(tokenPayload, request.parameters ?? {});
     this.logger = logger;

-    this.querier = syncRules.getBucketParameterQuerier(this.syncParams);
-    this.staticBuckets = new Map(this.querier.staticBuckets.map((b) => [b.bucket, b]));
+    const streamsByName: Record<string, RequestedStream[]> = {};
+    const subscriptions = request.streams;
+    const explicitStreamSubscriptions: util.RequestedStreamSubscription[] = subscriptions?.subscriptions ?? [];
+    if (subscriptions) {
+      for (let i = 0; i < explicitStreamSubscriptions.length; i++) {
+        const subscription = explicitStreamSubscriptions[i];
+
+        const syncRuleStream: RequestedStream = {
+          parameters: subscription.parameters ?? {},
+          opaque_id: i
+        };
+        if (Object.hasOwn(streamsByName, subscription.stream)) {
+          streamsByName[subscription.stream].push(syncRuleStream);
+        } else {
+          streamsByName[subscription.stream] = [syncRuleStream];
+        }
+      }
+    }
+    this.includeDefaultStreams = subscriptions?.include_defaults ?? true;
+    this.explicitStreamSubscriptions = explicitStreamSubscriptions;
+
+    const { querier, errors } = syncRules.getBucketParameterQuerier({
+      globalParameters: this.syncParams,
+      hasDefaultStreams: this.includeDefaultStreams,
+      streams: streamsByName
+    });
+    this.querier = querier;
+    this.streamErrors = Object.groupBy(errors, (e) => e.descriptor) as Record<string, QuerierError[]>;
+
+    this.staticBuckets = new Map<string, ResolvedBucket>(
+      mergeBuckets(this.querier.staticBuckets).map((b) => [b.bucket, b])
+    );
     this.lookups = new Set(this.querier.parameterQueryLookups.map((l) => JSONBig.stringify(l.values)));
+    this.subscribedStreamNames = new Set(Object.keys(streamsByName));
+  }
+
+  /**
+   * Translates an internal sync-rules {@link ResolvedBucket} instance to the public
+   * {@link util.ClientBucketDescription}.
+   *
+   * @param lookupIndex A map from stream names to their index in {@link util.Checkpoint.streams}. These are used to
+   * reference default buckets by their stream index instead of duplicating the name on the wire.
+   */
+  translateResolvedBucket(description: ResolvedBucket, lookupIndex: Map<string, number>): util.ClientBucketDescription {
+    // If the client is overriding the priority of any stream that yields this bucket, sync the bucket with that
+    // priority.
+    let priorityOverride: BucketPriority | null = null;
+    for (const reason of description.inclusion_reasons) {
+      if (reason != 'default') {
+        const requestedPriority = this.explicitStreamSubscriptions[reason.subscription]?.override_priority;
+        if (requestedPriority != null) {
+          if (priorityOverride == null) {
+            priorityOverride = requestedPriority as BucketPriority;
+          } else {
+            priorityOverride = Math.min(requestedPriority, priorityOverride) as BucketPriority;
+          }
+        }
+      }
+    }
+
+    return {
+      bucket: description.bucket,
+      priority: priorityOverride ?? description.priority,
+      subscriptions: description.inclusion_reasons.map((reason) => {
+        if (reason == 'default') {
+          const stream = description.definition;
+          return { default: lookupIndex.get(stream)! };
+        } else {
+          return { sub: reason.subscription };
+        }
+      })
+    };
+  }
+
+  isSubscribedToStream(desc: BucketSource): boolean {
+    return (desc.subscribedToByDefault && this.includeDefaultStreams) || this.subscribedStreamNames.has(desc.name);
+  }

   async getCheckpointUpdate(checkpoint: storage.StorageCheckpointUpdate): Promise<CheckpointUpdate> {
@@ -391,19 +519,19 @@
    * For static buckets, we can keep track of which buckets have been updated.
    */
   private async getCheckpointUpdateStatic(checkpoint: storage.StorageCheckpointUpdate): Promise<CheckpointUpdate> {
-    const querier = this.querier;
+    const staticBuckets = [...this.staticBuckets.values()];
     const update = checkpoint.update;
     if (update.invalidateDataBuckets) {
       return {
-        buckets: querier.staticBuckets,
+        buckets: staticBuckets,
         updatedBuckets: INVALIDATE_ALL_BUCKETS
       };
     }

     const updatedBuckets = new Set(getIntersection(this.staticBuckets, update.updatedDataBuckets));
     return {
-      buckets: querier.staticBuckets,
+      buckets: staticBuckets,
       updatedBuckets
     };
   }
@@ -414,7 +542,7 @@
   private async getCheckpointUpdateDynamic(checkpoint: storage.StorageCheckpointUpdate): Promise<CheckpointUpdate> {
     const querier = this.querier;
     const storage = this.bucketStorage;
-    const staticBuckets = querier.staticBuckets;
+    const staticBuckets = this.staticBuckets.values();
     const update = checkpoint.update;

     let hasParameterChange = false;
@@ -436,7 +564,7 @@
       }
     }

-    let dynamicBuckets: BucketDescription[];
+    let dynamicBuckets: ResolvedBucket[];
     if (hasParameterChange || this.cachedDynamicBuckets == null || this.cachedDynamicBucketSet == null) {
       dynamicBuckets = await querier.queryDynamicBucketDescriptions({
         getParameterSets(lookups) {
@@ -458,7 +586,7 @@
         }
       }
     }
-    const allBuckets = [...staticBuckets, ...dynamicBuckets];
+    const allBuckets = [...staticBuckets, ...mergeBuckets(dynamicBuckets)];

     if (invalidateDataBuckets) {
       return {
@@ -517,3 +645,32 @@ function limitedBuckets(buckets: string[] | { bucket: string }[], limit: number) {
   const limited = buckets.slice(0, limit);
   return `${JSON.stringify(limited)}...`;
 }
+
+/**
+ * Resolves duplicate buckets in the given array, merging the inclusion reasons for duplicates.
+ *
+ * It's possible for duplicates to occur when a stream has multiple subscriptions; consider e.g.
+ *
+ * ```
+ * sync_streams:
+ *   assets_by_category:
+ *     query: select * from assets where category in (request.parameters() -> 'categories')
+ * ```
+ *
+ * Here, a client might subscribe once with `{"categories": [1]}` and once with `{"categories": [1, 2]}`. Since each
+ * subscription is evaluated independently, this would lead to three buckets, with a duplicate `assets_by_category[1]`
+ * bucket.
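+ *
+ * A sketch of the merged output for that example (abbreviated: priority and definition fields omitted, assuming
+ * subscription indices 0 and 1):
+ *
+ * ```
+ * // before: [{ bucket: 'assets_by_category[1]', inclusion_reasons: [{ subscription: 0 }] },
+ * //          { bucket: 'assets_by_category[1]', inclusion_reasons: [{ subscription: 1 }] },
+ * //          { bucket: 'assets_by_category[2]', inclusion_reasons: [{ subscription: 1 }] }]
+ * // after:  [{ bucket: 'assets_by_category[1]', inclusion_reasons: [{ subscription: 0 }, { subscription: 1 }] },
+ * //          { bucket: 'assets_by_category[2]', inclusion_reasons: [{ subscription: 1 }] }]
+ * ```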
+ */
+function mergeBuckets(buckets: ResolvedBucket[]): ResolvedBucket[] {
+  const byBucketId: Record<string, ResolvedBucket> = {};
+
+  for (const bucket of buckets) {
+    if (Object.hasOwn(byBucketId, bucket.bucket)) {
+      byBucketId[bucket.bucket].inclusion_reasons.push(...bucket.inclusion_reasons);
+    } else {
+      byBucketId[bucket.bucket] = structuredClone(bucket);
+    }
+  }
+
+  return Object.values(byBucketId);
+}
diff --git a/packages/service-core/src/sync/sync.ts b/packages/service-core/src/sync/sync.ts
index cc8bfea30..fec216a2e 100644
--- a/packages/service-core/src/sync/sync.ts
+++ b/packages/service-core/src/sync/sync.ts
@@ -1,5 +1,11 @@
 import { JSONBig, JsonContainer } from '@powersync/service-jsonbig';
-import { BucketDescription, BucketPriority, RequestParameters, SqlSyncRules } from '@powersync/service-sync-rules';
+import {
+  BucketDescription,
+  BucketPriority,
+  RequestJwtPayload,
+  RequestParameters,
+  SqlSyncRules
+} from '@powersync/service-sync-rules';

 import { AbortError } from 'ix/aborterror.js';

@@ -19,7 +25,6 @@
   bucketStorage: storage.SyncRulesBucketStorage;
   syncRules: SqlSyncRules;
   params: util.StreamingSyncRequest;
-  syncParams: RequestParameters;
   token: auth.JwtPayload;
   logger?: Logger;
   /**
@@ -34,8 +39,7 @@
 export async function* streamResponse(
   options: SyncStreamParameters
 ): AsyncIterable<util.StreamingSyncLine | string | null> {
-  const { syncContext, bucketStorage, syncRules, params, syncParams, token, tokenStreamOptions, tracker, signal } =
-    options;
+  const { syncContext, bucketStorage, syncRules, params, token, tokenStreamOptions, tracker, signal } = options;
   const logger = options.logger ?? defaultLogger;

   // We also need to be able to abort, so we create our own controller.
@@ -58,7 +62,7 @@
     bucketStorage,
     syncRules,
     params,
-    syncParams,
+    token,
     tracker,
     controller.signal,
     logger
@@ -86,24 +90,22 @@
 async function* streamResponseInner(
   bucketStorage: storage.SyncRulesBucketStorage,
   syncRules: SqlSyncRules,
   params: util.StreamingSyncRequest,
-  syncParams: RequestParameters,
+  tokenPayload: RequestJwtPayload,
   tracker: RequestTracker,
   signal: AbortSignal,
   logger: Logger
 ): AsyncGenerator<util.StreamingSyncLine | string | null> {
   const { raw_data, binary_data } = params;

-  const checkpointUserId = util.checkpointUserId(syncParams.tokenParameters.user_id as string, params.client_id);
+  const userId = tokenPayload.sub;
+  const checkpointUserId = util.checkpointUserId(userId as string, params.client_id);
   const checksumState = new BucketChecksumState({
     syncContext,
     bucketStorage,
     syncRules,
-    syncParams,
-    initialBucketPositions: params.buckets?.map((bucket) => ({
-      name: bucket.name,
-      after: BigInt(bucket.after)
-    })),
+    tokenPayload,
+    syncRequest: params,
     logger: logger
   });
   const stream = bucketStorage.watchCheckpointChanges({
@@ -228,7 +230,7 @@
         onRowsSent: markOperationsSent,
         abort_connection: signal,
         abort_batch: abortCheckpointSignal,
-        user_id: syncParams.userId,
+        user_id: userId,
         // Passing null here will emit a full sync complete message at the end. If we pass a priority, we'll emit a partial
         // sync complete message instead.
         forPriority: !isLast ? priority : null,
diff --git a/packages/service-core/src/util/protocol-types.ts b/packages/service-core/src/util/protocol-types.ts
index d061e16d1..6edafbae4 100644
--- a/packages/service-core/src/util/protocol-types.ts
+++ b/packages/service-core/src/util/protocol-types.ts
@@ -13,9 +13,51 @@ export const BucketRequest = t.object({

 export type BucketRequest = t.Decoded<typeof BucketRequest>;

+/**
+ * A sync stream that a client has expressed interest in by explicitly opening it on the client side.
+ */
+export const RequestedStreamSubscription = t.object({
+  /**
+   * The defined name of the stream as it appears in sync stream definitions.
+   */
+  stream: t.string,
+  /**
+   * An optional dictionary of parameters to pass to this specific stream.
+   */
+  parameters: t.record(t.any).optional(),
+  /**
+   * Set when the client wishes to re-assign a different priority to this stream.
+   *
+   * Streams and sync rules can also assign a default priority, but clients are allowed to override those. This can be
+   * useful when the priority for partial syncs depends on e.g. the current page opened in a client.
+   */
+  override_priority: t.union(t.number, t.Null)
+});
+
+export type RequestedStreamSubscription = t.Decoded<typeof RequestedStreamSubscription>;
+
+/**
+ * An overview of all subscribed streams as part of a streaming sync request.
+ */
+export const StreamSubscriptionRequest = t.object({
+  /**
+   * Whether to sync default streams.
+   *
+   * When disabled, only explicitly-opened subscriptions are included.
+   */
+  include_defaults: t.boolean.optional(),
+
+  /**
+   * An array of sync streams the client has opened explicitly.
+   */
+  subscriptions: t.array(RequestedStreamSubscription)
+});
+
+export type StreamSubscriptionRequest = t.Decoded<typeof StreamSubscriptionRequest>;
+
 export const StreamingSyncRequest = t.object({
   /**
-   * Existing bucket states.
+   * Existing client-side bucket states.
    */
   buckets: t.array(BucketRequest).optional(),
@@ -47,7 +89,12 @@
   /**
    * Unique client id.
    */
-  client_id: t.string.optional()
+  client_id: t.string.optional(),
+
+  /**
+   * If the client is aware of streams, an array of streams the client has opened.
+   */
+  streams: StreamSubscriptionRequest.optional()
 });

 export type StreamingSyncRequest = t.Decoded<typeof StreamingSyncRequest>;
@@ -60,7 +107,7 @@
 export interface StreamingSyncCheckpointDiff {
   checkpoint_diff: {
     last_op_id: ProtocolOpId;
     write_checkpoint?: ProtocolOpId;
-    updated_buckets: BucketChecksumWithDescription[];
+    updated_buckets: CheckpointBucket[];
     removed_buckets: string[];
   };
 }
@@ -99,10 +146,54 @@
  */
 export type ProtocolOpId = string;

+export interface StreamDescription {
+  /**
+   * The name of the stream as it appears in the sync configuration.
+   */
+  name: string;
+
+  /**
+   * Whether this stream is subscribed to by default.
+   *
+   * For default streams, this field is still `true` if clients have an explicit subscription to the stream.
+   */
+  is_default: boolean;
+
+  /**
+   * If some subscriptions on this stream could not be resolved, e.g. due to an error, this array contains the faulty
+   * subscriptions along with an error message.
+   */
+  errors: StreamSubscriptionError[];
+}
+
+export interface StreamSubscriptionError {
+  /**
+   * The subscription that errored: either the default subscription or one of the explicit subscriptions.
+   */
+  subscription: 'default' | number;
+  /**
+   * A message describing the error on the subscription.
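+   * For example: a failure to evaluate the subscription's parameters against the stream definition.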
+   */
+  message: string;
+}
+
 export interface Checkpoint {
   last_op_id: ProtocolOpId;
   write_checkpoint?: ProtocolOpId;
-  buckets: BucketChecksumWithDescription[];
+  buckets: CheckpointBucket[];
+
+  /**
+   * All streams that the client is subscribed to.
+   *
+   * This field has two purposes:
+   *
+   * 1. It allows clients to determine which of their subscriptions actually work. E.g. if a user does
+   *    `db.syncStream('non_existent_stream').subscribe()`, clients don't immediately know that the stream doesn't
+   *    exist. Only after the next `checkpoint` line can they query this field and mark unresolved subscriptions.
+   * 2. It allows clients to learn which default streams they have been subscribed to. This is relevant for APIs
+   *    listing all streams on the client side.
+   */
+  streams: StreamDescription[];
 }

 export interface BucketState {
@@ -158,4 +249,46 @@
   count: number;
 }

-export interface BucketChecksumWithDescription extends BucketChecksum, BucketDescription {}
+/**
+ * The reason a particular bucket is included in a checkpoint.
+ *
+ * This information allows clients to associate individual buckets with sync streams they're subscribed to. Having that
+ * association is useful because it enables clients to track progress for individual sync streams.
+ */
+export type BucketSubscriptionReason = BucketDerivedFromDefaultStream | BucketDerivedFromExplicitSubscription;
+
+/**
+ * A bucket has been included in a checkpoint because it's part of a default stream.
+ */
+export type BucketDerivedFromDefaultStream = {
+  /**
+   * The index (into {@link Checkpoint.streams}) of the stream defining the bucket.
+   */
+  default: number;
+};
+
+/**
+ * The bucket has been included in a checkpoint because it's part of a stream that a client has explicitly subscribed
+ * to.
+ */
+export type BucketDerivedFromExplicitSubscription = {
+  /**
+   * The index (into {@link StreamSubscriptionRequest.subscriptions}) of the subscription yielding this bucket.
+   */
+  sub: number;
+};
+
+export interface ClientBucketDescription {
+  /**
+   * An opaque id of the bucket.
+   */
+  bucket: string;
+  /**
+   * The priority used to synchronize this bucket, derived from its definition and an optional priority override from
+   * the stream subscription.
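+   *
+   * When multiple subscriptions override the priority of the same bucket, the numerically lowest (most urgent)
+   * override wins (mirroring the Math.min merge in BucketParameterState.translateResolvedBucket); without any
+   * override, the priority from the stream or bucket definition applies.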
+   */
+  priority: BucketPriority;
+  subscriptions: BucketSubscriptionReason[];
+}
+
+export interface CheckpointBucket extends BucketChecksum, ClientBucketDescription {}
diff --git a/packages/service-core/test/src/sync/BucketChecksumState.test.ts b/packages/service-core/test/src/sync/BucketChecksumState.test.ts
index 312a7dd6c..df43cc9bd 100644
--- a/packages/service-core/test/src/sync/BucketChecksumState.test.ts
+++ b/packages/service-core/test/src/sync/BucketChecksumState.test.ts
@@ -1,17 +1,27 @@
 import {
   BucketChecksum,
   BucketChecksumState,
+  BucketChecksumStateOptions,
   BucketChecksumStateStorage,
   CHECKPOINT_INVALIDATE_ALL,
   ChecksumMap,
   InternalOpId,
   ReplicationCheckpoint,
+  StreamingSyncRequest,
   SyncContext,
   WatchFilterEvent
 } from '@/index.js';
 import { JSONBig } from '@powersync/service-jsonbig';
-import { ParameterLookup, RequestParameters, SqliteJsonRow, SqlSyncRules } from '@powersync/service-sync-rules';
-import { describe, expect, test } from 'vitest';
+import {
+  SqliteJsonRow,
+  ParameterLookup,
+  SqlSyncRules,
+  RequestJwtPayload,
+  BucketSource,
+  BucketSourceType,
+  BucketParameterQuerier
+} from '@powersync/service-sync-rules';
+import { describe, expect, test, beforeEach } from 'vitest';

 describe('BucketChecksumState', () => {
   // Single global[] bucket.
@@ -55,6 +65,9 @@
     maxDataFetchConcurrency: 10
   });

+  const syncRequest: StreamingSyncRequest = {};
+  const tokenPayload: RequestJwtPayload = { sub: '' };
+
   test('global bucket with update', async () => {
     const storage = new MockBucketChecksumStateStorage();
     // Set initial state
@@ -62,7 +75,8 @@
     const state = new BucketChecksumState({
       syncContext,
-      syncParams: new RequestParameters({ sub: '' }, {}),
+      syncRequest,
+      tokenPayload,
       syncRules: SYNC_RULES_GLOBAL,
       bucketStorage: storage
     });
@@ -75,9 +89,10 @@
     line.advance();
     expect(line.checkpointLine).toEqual({
       checkpoint: {
-        buckets: [{ bucket: 'global[]', checksum: 1, count: 1, priority: 3 }],
+        buckets: [{ bucket: 'global[]', checksum: 1, count: 1, priority: 3, subscriptions: [{ default: 0 }] }],
         last_op_id: '1',
-        write_checkpoint: undefined
+        write_checkpoint: undefined,
+        streams: [{ name: 'global', is_default: true, errors: [] }]
       }
     });
     expect(line.bucketsToFetch).toEqual([
@@ -111,7 +126,7 @@
     expect(line2.checkpointLine).toEqual({
       checkpoint_diff: {
         removed_buckets: [],
-        updated_buckets: [{ bucket: 'global[]', checksum: 2, count: 2, priority: 3 }],
+        updated_buckets: [{ bucket: 'global[]', checksum: 2, count: 2, priority: 3, subscriptions: [{ default: 0 }] }],
         last_op_id: '2',
         write_checkpoint: undefined
       }
@@ -129,9 +144,9 @@
     const state = new BucketChecksumState({
       syncContext,
+      tokenPayload,
       // Client sets the initial state here
-      initialBucketPositions: [{ name: 'global[]', after: 1n }],
-      syncParams: new RequestParameters({ sub: '' }, {}),
+      syncRequest: { buckets: [{ name: 'global[]', after: '1' }] },
       syncRules: SYNC_RULES_GLOBAL,
       bucketStorage: storage
     });
@@ -144,9 +159,10 @@
     line.advance();
     expect(line.checkpointLine).toEqual({
       checkpoint: {
-        buckets: [{ bucket: 'global[]', checksum: 1, count: 1, priority: 3 }],
+        buckets: [{ bucket: 'global[]', checksum: 1, count: 1, priority: 3, subscriptions: [{ default: 0 }] }],
         last_op_id: '1',
-        write_checkpoint: undefined
+        write_checkpoint: undefined,
+        streams: [{ name: 'global', is_default: true, errors: [] }]
       }
     });
     expect(line.bucketsToFetch).toEqual([
@@ -167,7 +183,8 @@
     const
state = new BucketChecksumState({ syncContext, - syncParams: new RequestParameters({ sub: '' }, {}), + tokenPayload, + syncRequest, syncRules: SYNC_RULES_GLOBAL_TWO, bucketStorage: storage }); @@ -180,11 +197,12 @@ bucket_definitions: expect(line.checkpointLine).toEqual({ checkpoint: { buckets: [ - { bucket: 'global[1]', checksum: 1, count: 1, priority: 3 }, - { bucket: 'global[2]', checksum: 1, count: 1, priority: 3 } + { bucket: 'global[1]', checksum: 1, count: 1, priority: 3, subscriptions: [{ default: 0 }] }, + { bucket: 'global[2]', checksum: 1, count: 1, priority: 3, subscriptions: [{ default: 0 }] } ], last_op_id: '1', - write_checkpoint: undefined + write_checkpoint: undefined, + streams: [{ name: 'global', is_default: true, errors: [] }] } }); expect(line.bucketsToFetch).toEqual([ @@ -215,8 +233,8 @@ bucket_definitions: checkpoint_diff: { removed_buckets: [], updated_buckets: [ - { bucket: 'global[1]', checksum: 2, count: 2, priority: 3 }, - { bucket: 'global[2]', checksum: 2, count: 2, priority: 3 } + { bucket: 'global[1]', checksum: 2, count: 2, priority: 3, subscriptions: [{ default: 0 }] }, + { bucket: 'global[2]', checksum: 2, count: 2, priority: 3, subscriptions: [{ default: 0 }] } ], last_op_id: '2', write_checkpoint: undefined @@ -232,9 +250,9 @@ bucket_definitions: const state = new BucketChecksumState({ syncContext, + tokenPayload, // Client sets the initial state here - initialBucketPositions: [{ name: 'something_here[]', after: 1n }], - syncParams: new RequestParameters({ sub: '' }, {}), + syncRequest: { buckets: [{ name: 'something_here[]', after: '1' }] }, syncRules: SYNC_RULES_GLOBAL, bucketStorage: storage }); @@ -249,9 +267,10 @@ bucket_definitions: line.advance(); expect(line.checkpointLine).toEqual({ checkpoint: { - buckets: [{ bucket: 'global[]', checksum: 1, count: 1, priority: 3 }], + buckets: [{ bucket: 'global[]', checksum: 1, count: 1, priority: 3, subscriptions: [{ default: 0 }] }], last_op_id: '1', - write_checkpoint: undefined + write_checkpoint: undefined, + streams: [{ name: 'global', is_default: true, errors: [] }] } }); expect(line.bucketsToFetch).toEqual([ @@ -273,7 +292,8 @@ bucket_definitions: const state = new BucketChecksumState({ syncContext, - syncParams: new RequestParameters({ sub: '' }, {}), + tokenPayload, + syncRequest, syncRules: SYNC_RULES_GLOBAL_TWO, bucketStorage: storage }); @@ -310,7 +330,7 @@ bucket_definitions: removed_buckets: [], updated_buckets: [ // This does not include global[2], since it was not invalidated. 
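+          // (checkpoint_diff lines only carry updated_buckets and removed_buckets; unchanged buckets are omitted.)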
-          { bucket: 'global[1]', checksum: 2, count: 2, priority: 3 }
+          { bucket: 'global[1]', checksum: 2, count: 2, priority: 3, subscriptions: [{ default: 0 }] }
         ],
         last_op_id: '2',
         write_checkpoint: undefined
       }
@@ -325,7 +345,8 @@
     const state = new BucketChecksumState({
       syncContext,
-      syncParams: new RequestParameters({ sub: '' }, {}),
+      tokenPayload,
+      syncRequest,
       syncRules: SYNC_RULES_GLOBAL_TWO,
       bucketStorage: storage
     });
@@ -358,8 +379,8 @@
       checkpoint_diff: {
         removed_buckets: [],
         updated_buckets: [
-          { bucket: 'global[1]', checksum: 2, count: 2, priority: 3 },
-          { bucket: 'global[2]', checksum: 2, count: 2, priority: 3 }
+          { bucket: 'global[1]', checksum: 2, count: 2, priority: 3, subscriptions: [{ default: 0 }] },
+          { bucket: 'global[2]', checksum: 2, count: 2, priority: 3, subscriptions: [{ default: 0 }] }
         ],
         last_op_id: '2',
         write_checkpoint: undefined
@@ -379,7 +400,8 @@
     const state = new BucketChecksumState({
       syncContext,
-      syncParams: new RequestParameters({ sub: '' }, {}),
+      tokenPayload,
+      syncRequest,
       syncRules: SYNC_RULES_GLOBAL_TWO,
       bucketStorage: storage
     });
@@ -393,11 +415,12 @@
     expect(line.checkpointLine).toEqual({
       checkpoint: {
         buckets: [
-          { bucket: 'global[1]', checksum: 3, count: 3, priority: 3 },
-          { bucket: 'global[2]', checksum: 3, count: 3, priority: 3 }
+          { bucket: 'global[1]', checksum: 3, count: 3, priority: 3, subscriptions: [{ default: 0 }] },
+          { bucket: 'global[2]', checksum: 3, count: 3, priority: 3, subscriptions: [{ default: 0 }] }
         ],
         last_op_id: '3',
-        write_checkpoint: undefined
+        write_checkpoint: undefined,
+        streams: [{ name: 'global', is_default: true, errors: [] }]
       }
     });
     expect(line.bucketsToFetch).toEqual([
@@ -444,7 +467,8 @@
               bucket: 'global[1]',
               checksum: 4,
               count: 4,
-              priority: 3
+              priority: 3,
+              subscriptions: [{ default: 0 }]
             }
           ],
           last_op_id: '4',
@@ -480,7 +504,8 @@
     const state = new BucketChecksumState({
       syncContext,
-      syncParams: new RequestParameters({ sub: 'u1' }, {}),
+      tokenPayload: { sub: 'u1' },
+      syncRequest,
       syncRules: SYNC_RULES_DYNAMIC,
       bucketStorage: storage
     });
@@ -496,10 +521,29 @@
     expect(line.checkpointLine).toEqual({
       checkpoint: {
         buckets: [
-          { bucket: 'by_project[1]', checksum: 1, count: 1, priority: 3 },
-          { bucket: 'by_project[2]', checksum: 1, count: 1, priority: 3 }
+          {
+            bucket: 'by_project[1]',
+            checksum: 1,
+            count: 1,
+            priority: 3,
+            subscriptions: [{ default: 0 }]
+          },
+          {
+            bucket: 'by_project[2]',
+            checksum: 1,
+            count: 1,
+            priority: 3,
+            subscriptions: [{ default: 0 }]
+          }
         ],
         last_op_id: '1',
+        streams: [
+          {
+            is_default: true,
+            name: 'by_project',
+            errors: []
+          }
+        ],
         write_checkpoint: undefined
       }
     });
@@ -544,13 +588,301 @@
     expect(line2.checkpointLine).toEqual({
       checkpoint_diff: {
         removed_buckets: [],
-        updated_buckets: [{ bucket: 'by_project[3]', checksum: 1, count: 1, priority: 3 }],
+        updated_buckets: [
+          {
+            bucket: 'by_project[3]',
+            checksum: 1,
+            count: 1,
+            priority: 3,
+            subscriptions: [{ default: 0 }]
+          }
+        ],
         last_op_id: '2',
         write_checkpoint: undefined
       }
     });
     expect(line2.getFilteredBucketPositions()).toEqual(new Map([['by_project[3]', 0n]]));
   });
+
+  describe('streams', () => {
+    let source: { -readonly [P in keyof BucketSource]: BucketSource[P] };
+    let storage: MockBucketChecksumStateStorage;
+    let staticBucketIds = ['stream|0[]'];
+
+    function checksumState(options?: Partial<BucketChecksumStateOptions>) {
+      const rules = new SqlSyncRules('');
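+      // new SqlSyncRules('') serves as an empty rule set here; the mocked stream source below is injected directly,
+      // since sync streams can't be expressed as parsable rules yet (see the note in beforeEach).
+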
rules.bucketSources.push(source); + + return new BucketChecksumState({ + syncContext, + syncRequest, + tokenPayload, + syncRules: rules, + bucketStorage: storage, + ...options + }); + } + + function createQuerier(ids: string[], subscription: number | null): BucketParameterQuerier { + return { + staticBuckets: ids.map((bucket) => ({ + definition: 'stream', + inclusion_reasons: subscription == null ? ['default'] : [{ subscription }], + bucket, + priority: 3 + })), + hasDynamicBuckets: false, + parameterQueryLookups: [], + queryDynamicBucketDescriptions: function (): never { + throw new Error('no dynamic buckets.'); + } + }; + } + + beforeEach(() => { + // Currently using mocked streams before streams are actually implemented as parsable rules. + source = { + name: 'stream', + type: BucketSourceType.SYNC_STREAM, + subscribedToByDefault: false, + pushBucketParameterQueriers(result, options) { + // Create a fake querier that resolves the global stream["default"] bucket by default and allows extracting + // additional buckets from parameters. + const subscriptions = options.streams['stream'] ?? []; + if (!this.subscribedToByDefault && !subscriptions.length) { + return; + } + + let hasExplicitDefaultSubscription = false; + for (const subscription of subscriptions) { + try { + let subscriptionParameters = []; + + if (subscription.parameters != null) { + subscriptionParameters = JSON.parse(subscription.parameters['ids'] as string).map( + (e: string) => `stream["${e}"]` + ); + } else { + hasExplicitDefaultSubscription = true; + } + + result.queriers.push(createQuerier([...subscriptionParameters], subscription.opaque_id)); + } catch (e) { + result.errors.push({ + descriptor: 'stream', + subscription, + message: `Error evaluating bucket ids: ${e.message}` + }); + } + } + + // If the stream is subscribed to by default and there is no explicit subscription that would match the default + // subscription, also include the default querier. 
+ if (this.subscribedToByDefault && !hasExplicitDefaultSubscription) { + result.queriers.push(createQuerier(['stream["default"]'], null)); + } + } + } satisfies Partial as any; + + storage = new MockBucketChecksumStateStorage(); + storage.updateTestChecksum({ bucket: 'stream["default"]', checksum: 1, count: 1 }); + storage.updateTestChecksum({ bucket: 'stream["a"]', checksum: 1, count: 1 }); + storage.updateTestChecksum({ bucket: 'stream["b"]', checksum: 1, count: 1 }); + }); + + test('includes defaults', async () => { + source.subscribedToByDefault = true; + const state = checksumState(); + + const line = await state.buildNextCheckpointLine({ + base: storage.makeCheckpoint(1n), + writeCheckpoint: null, + update: CHECKPOINT_INVALIDATE_ALL + })!; + line?.advance(); + expect(line?.checkpointLine).toEqual({ + checkpoint: { + buckets: [ + { bucket: 'stream["default"]', checksum: 1, count: 1, priority: 3, subscriptions: [{ default: 0 }] } + ], + last_op_id: '1', + write_checkpoint: undefined, + streams: [{ name: 'stream', is_default: true, errors: [] }] + } + }); + }); + + test('can exclude defaults', async () => { + source.subscribedToByDefault = true; + const state = checksumState({ syncRequest: { streams: { include_defaults: false, subscriptions: [] } } }); + + const line = await state.buildNextCheckpointLine({ + base: storage.makeCheckpoint(1n), + writeCheckpoint: null, + update: CHECKPOINT_INVALIDATE_ALL + })!; + line?.advance(); + expect(line?.checkpointLine).toEqual({ + checkpoint: { + buckets: [], + last_op_id: '1', + write_checkpoint: undefined, + streams: [] + } + }); + }); + + test('custom subscriptions', async () => { + source.subscribedToByDefault = true; + + const state = checksumState({ + syncRequest: { + streams: { + subscriptions: [ + { stream: 'stream', parameters: { ids: '["a"]' }, override_priority: null }, + { stream: 'stream', parameters: { ids: '["b"]' }, override_priority: 1 } + ] + } + } + }); + + const line = await state.buildNextCheckpointLine({ + base: storage.makeCheckpoint(1n), + writeCheckpoint: null, + update: CHECKPOINT_INVALIDATE_ALL + })!; + line?.advance(); + expect(line?.checkpointLine).toEqual({ + checkpoint: { + buckets: [ + { bucket: 'stream["a"]', checksum: 1, count: 1, priority: 3, subscriptions: [{ sub: 0 }] }, + { bucket: 'stream["b"]', checksum: 1, count: 1, priority: 1, subscriptions: [{ sub: 1 }] }, + { bucket: 'stream["default"]', checksum: 1, count: 1, priority: 3, subscriptions: [{ default: 0 }] } + ], + last_op_id: '1', + write_checkpoint: undefined, + streams: [{ name: 'stream', is_default: true, errors: [] }] + } + }); + }); + + test('overlap between custom subscriptions', async () => { + const state = checksumState({ + syncRequest: { + streams: { + subscriptions: [ + { stream: 'stream', parameters: { ids: '["a", "b"]' }, override_priority: null }, + { stream: 'stream', parameters: { ids: '["b"]' }, override_priority: 1 } + ] + } + } + }); + + const line = await state.buildNextCheckpointLine({ + base: storage.makeCheckpoint(1n), + writeCheckpoint: null, + update: CHECKPOINT_INVALIDATE_ALL + })!; + line?.advance(); + expect(line?.checkpointLine).toEqual({ + checkpoint: { + buckets: [ + { bucket: 'stream["a"]', checksum: 1, count: 1, priority: 3, subscriptions: [{ sub: 0 }] }, + { bucket: 'stream["b"]', checksum: 1, count: 1, priority: 1, subscriptions: [{ sub: 0 }, { sub: 1 }] } + ], + last_op_id: '1', + write_checkpoint: undefined, + streams: [{ name: 'stream', is_default: false, errors: [] }] + } + }); + }); + + test('overlap between default 
and custom subscription', async () => { + source.subscribedToByDefault = true; + const state = checksumState({ + syncRequest: { + streams: { + subscriptions: [{ stream: 'stream', parameters: { ids: '["a", "default"]' }, override_priority: 1 }] + } + } + }); + + const line = await state.buildNextCheckpointLine({ + base: storage.makeCheckpoint(1n), + writeCheckpoint: null, + update: CHECKPOINT_INVALIDATE_ALL + })!; + line?.advance(); + expect(line?.checkpointLine).toEqual({ + checkpoint: { + buckets: [ + { bucket: 'stream["a"]', checksum: 1, count: 1, priority: 1, subscriptions: [{ sub: 0 }] }, + { + bucket: 'stream["default"]', + checksum: 1, + count: 1, + priority: 1, + subscriptions: [{ sub: 0 }, { default: 0 }] + } + ], + last_op_id: '1', + write_checkpoint: undefined, + streams: [{ name: 'stream', is_default: true, errors: [] }] + } + }); + }); + + test('reports errors', async () => { + source.subscribedToByDefault = true; + + const state = checksumState({ + syncRequest: { + streams: { + subscriptions: [ + { stream: 'stream', parameters: { ids: '["a", "b"]' }, override_priority: 1 }, + { stream: 'stream', parameters: { ids: 'invalid json' }, override_priority: null } + ] + } + } + }); + + const line = await state.buildNextCheckpointLine({ + base: storage.makeCheckpoint(1n), + writeCheckpoint: null, + update: CHECKPOINT_INVALIDATE_ALL + })!; + line?.advance(); + expect(line?.checkpointLine).toEqual({ + checkpoint: { + buckets: [ + { bucket: 'stream["a"]', checksum: 1, count: 1, priority: 1, subscriptions: [{ sub: 0 }] }, + { bucket: 'stream["b"]', checksum: 1, count: 1, priority: 1, subscriptions: [{ sub: 0 }] }, + { + bucket: 'stream["default"]', + checksum: 1, + count: 1, + priority: 3, + subscriptions: [{ default: 0 }] + } + ], + last_op_id: '1', + write_checkpoint: undefined, + streams: [ + { + name: 'stream', + is_default: true, + errors: [ + { + message: 'Error evaluating bucket ids: Unexpected token \'i\', "invalid json" is not valid JSON', + subscription: 1 + } + ] + } + ] + } + }); + }); + }); }); class MockBucketChecksumStateStorage implements BucketChecksumStateStorage { diff --git a/packages/sync-rules/src/BucketDescription.ts b/packages/sync-rules/src/BucketDescription.ts index dbaec0da6..8dd732f34 100644 --- a/packages/sync-rules/src/BucketDescription.ts +++ b/packages/sync-rules/src/BucketDescription.ts @@ -30,3 +30,19 @@ export interface BucketDescription { */ priority: BucketPriority; } + +/** + * A bucket that was resolved to a specific request including stream subscriptions. + * + * This includes information on why the bucket has been included in a checkpoint subset + * shown to clients. + */ +export interface ResolvedBucket extends BucketDescription { + /** + * The name of the sync rule or stream definition from which the bucket is derived. 
+ */ + definition: string; + inclusion_reasons: BucketInclusionReason[]; +} + +export type BucketInclusionReason = 'default' | { subscription: number }; diff --git a/packages/sync-rules/src/BucketParameterQuerier.ts b/packages/sync-rules/src/BucketParameterQuerier.ts index 8a21ac4be..8842c6d7c 100644 --- a/packages/sync-rules/src/BucketParameterQuerier.ts +++ b/packages/sync-rules/src/BucketParameterQuerier.ts @@ -1,4 +1,5 @@ -import { BucketDescription } from './BucketDescription.js'; +import { BucketDescription, ResolvedBucket } from './BucketDescription.js'; +import { RequestedStream } from './SqlSyncRules.js'; import { RequestParameters, SqliteJsonRow, SqliteJsonValue } from './types.js'; import { normalizeParameterValue } from './utils.js'; @@ -14,7 +15,7 @@ export interface BucketParameterQuerier { * select request.user_id() as user_id() * select value as project_id from json_each(request.jwt() -> 'project_ids') */ - readonly staticBuckets: BucketDescription[]; + readonly staticBuckets: ResolvedBucket[]; /** * True if there are dynamic buckets, meaning queryDynamicBucketDescriptions() should be used. @@ -36,7 +37,24 @@ export interface BucketParameterQuerier { * * select id as user_id from users where users.id = request.user_id() */ - queryDynamicBucketDescriptions(source: ParameterLookupSource): Promise<BucketDescription[]>; + queryDynamicBucketDescriptions(source: ParameterLookupSource): Promise<ResolvedBucket[]>; +} + +/** + * An error that occurred while trying to resolve the bucket ids a request should have access to. + * + * A common scenario that could cause this to happen is when parameters need to have a certain structure. For instance, + * `... WHERE id IN stream.parameters -> 'ids'` is unresolvable when `ids` is not set to a JSON array. + */ +export interface QuerierError { + descriptor: string; + subscription?: RequestedStream; + message: string; +} + +export interface PendingQueriers { + queriers: BucketParameterQuerier[]; + errors: QuerierError[]; } export interface ParameterLookupSource { @@ -54,7 +72,7 @@ export function mergeBucketParameterQueriers(queriers: BucketParameterQuerier[]) hasDynamicBuckets: parameterQueryLookups.length > 0, parameterQueryLookups: parameterQueryLookups, async queryDynamicBucketDescriptions(source: ParameterLookupSource) { - let results: BucketDescription[] = []; + let results: ResolvedBucket[] = []; for (let q of queriers) { if (q.hasDynamicBuckets) { results.push(...(await q.queryDynamicBucketDescriptions(source))); diff --git a/packages/sync-rules/src/BucketSource.ts b/packages/sync-rules/src/BucketSource.ts new file mode 100644 index 000000000..6131d78d7 --- /dev/null +++ b/packages/sync-rules/src/BucketSource.ts @@ -0,0 +1,81 @@ +import { BucketParameterQuerier, ParameterLookup, PendingQueriers } from './BucketParameterQuerier.js'; +import { ColumnDefinition } from './ExpressionType.js'; +import { SourceTableInterface } from './SourceTableInterface.js'; +import { GetQuerierOptions } from './SqlSyncRules.js'; +import { TablePattern } from './TablePattern.js'; +import { EvaluatedParametersResult, EvaluateRowOptions, EvaluationResult, SourceSchema, SqliteRow } from './types.js'; + +/** + * An interface declaring + * + * - which buckets the sync service should create when processing change streams from the database. + * - how data in source tables maps to data in buckets (e.g. when we're not selecting all columns). + * - which buckets a given connection has access to.
+ * + * There are two ways to define bucket sources: via sync rules made up of parameter and data queries, and via stream + * definitions that only consist of a single query. + */ +export interface BucketSource { + readonly name: string; + readonly type: BucketSourceType; + + readonly subscribedToByDefault: boolean; + + /** + * Given a row in a source table that affects sync parameters, returns a structure to index which buckets rows should + * be associated with. + * + * The returned {@link ParameterLookup} can be referenced by {@link pushBucketParameterQueriers} to allow the storage + * system to find buckets. + */ + evaluateParameterRow(sourceTable: SourceTableInterface, row: SqliteRow): EvaluatedParametersResult[]; + + /** + * Given a row as it appears in a table that affects sync data, return buckets, logical table names and transformed + * data for rows to add to buckets. + */ + evaluateRow(options: EvaluateRowOptions): EvaluationResult[]; + + /** + * Reports {@link BucketParameterQuerier}s resolving buckets that a specific stream request should have access to. + * + * @param result The collector to push queriers and errors into. + * @param options Options, including parameters that may affect the buckets loaded by this source. + */ + pushBucketParameterQueriers(result: PendingQueriers, options: GetQuerierOptions): void; + + /** + * Whether {@link pushBucketParameterQueriers} may include a querier where + * {@link BucketParameterQuerier.hasDynamicBuckets} is true. + * + * This is mostly used for testing. + */ + hasDynamicBucketQueries(): boolean; + + getSourceTables(): Set<TablePattern>; + + /** Whether the table possibly affects the buckets resolved by this source. */ + tableSyncsParameters(table: SourceTableInterface): boolean; + + /** Whether the table possibly affects the contents of buckets resolved by this source. */ + tableSyncsData(table: SourceTableInterface): boolean; + + /** + * Given a static schema, infer all logical tables and associated columns that appear in buckets defined by this + * source. + * + * This is used to generate the client-side schema.
+ */ + resolveResultSets(schema: SourceSchema, tables: Record<string, Record<string, ColumnDefinition>>): void; + + debugWriteOutputTables(result: Record<string, any[]>): void; + + debugRepresentation(): any; +} + +export enum BucketSourceType { + SYNC_RULE, + SYNC_STREAM +} + +export type ResultSetDescription = { name: string; columns: ColumnDefinition[] }; diff --git a/packages/sync-rules/src/SqlBucketDescriptor.ts b/packages/sync-rules/src/SqlBucketDescriptor.ts index 95b2b4a10..66ae8440c 100644 --- a/packages/sync-rules/src/SqlBucketDescriptor.ts +++ b/packages/sync-rules/src/SqlBucketDescriptor.ts @@ -1,10 +1,12 @@ -import { BucketDescription } from './BucketDescription.js'; -import { BucketParameterQuerier, mergeBucketParameterQueriers } from './BucketParameterQuerier.js'; +import { BucketInclusionReason, ResolvedBucket } from './BucketDescription.js'; +import { BucketParameterQuerier, mergeBucketParameterQueriers, PendingQueriers } from './BucketParameterQuerier.js'; +import { BucketSource, BucketSourceType, ResultSetDescription } from './BucketSource.js'; +import { ColumnDefinition } from './ExpressionType.js'; import { IdSequence } from './IdSequence.js'; import { SourceTableInterface } from './SourceTableInterface.js'; import { SqlDataQuery } from './SqlDataQuery.js'; import { SqlParameterQuery } from './SqlParameterQuery.js'; -import { SyncRulesOptions } from './SqlSyncRules.js'; +import { GetQuerierOptions, SyncRulesOptions } from './SqlSyncRules.js'; import { StaticSqlParameterQuery } from './StaticSqlParameterQuery.js'; import { TablePattern } from './TablePattern.js'; import { TableValuedFunctionSqlParameterQuery } from './TableValuedFunctionSqlParameterQuery.js'; @@ -15,6 +17,7 @@ import { EvaluationResult, QueryParseOptions, RequestParameters, + SourceSchema, SqliteRow } from './types.js'; @@ -27,7 +30,7 @@ export interface QueryParseResult { errors: SqlRuleError[]; } -export class SqlBucketDescriptor { +export class SqlBucketDescriptor implements BucketSource { name: string; bucketParameters?: string[]; @@ -35,6 +38,14 @@ export class SqlBucketDescriptor { this.name = name; } + get type(): BucketSourceType { + return BucketSourceType.SYNC_RULE; + } + + public get subscribedToByDefault(): boolean { + return true; + } + /** * source table -> queries */ @@ -103,27 +114,47 @@ export class SqlBucketDescriptor { return results; } - getBucketParameterQuerier(parameters: RequestParameters): BucketParameterQuerier { - const staticBuckets = this.getStaticBucketDescriptions(parameters); + /** + * @deprecated Use `pushBucketParameterQueriers` instead and merge at the top-level.
+ */ + getBucketParameterQuerier(options: GetQuerierOptions): BucketParameterQuerier { + const queriers: BucketParameterQuerier[] = []; + this.pushBucketParameterQueriers({ queriers, errors: [] }, options); + + return mergeBucketParameterQueriers(queriers); + } + + pushBucketParameterQueriers(result: PendingQueriers, options: GetQuerierOptions) { + const reasons = [this.bucketInclusionReason()]; + const staticBuckets = this.getStaticBucketDescriptions(options.globalParameters, reasons); const staticQuerier = { staticBuckets, hasDynamicBuckets: false, parameterQueryLookups: [], queryDynamicBucketDescriptions: async () => [] } satisfies BucketParameterQuerier; + result.queriers.push(staticQuerier); if (this.parameterQueries.length == 0) { - return staticQuerier; + return; } - const dynamicQueriers = this.parameterQueries.map((query) => query.getBucketParameterQuerier(parameters)); - return mergeBucketParameterQueriers([staticQuerier, ...dynamicQueriers]); + const dynamicQueriers = this.parameterQueries.map((query) => + query.getBucketParameterQuerier(options.globalParameters, reasons) + ); + result.queriers.push(...dynamicQueriers); } - getStaticBucketDescriptions(parameters: RequestParameters): BucketDescription[] { - let results: BucketDescription[] = []; + getStaticBucketDescriptions(parameters: RequestParameters, reasons: BucketInclusionReason[]): ResolvedBucket[] { + let results: ResolvedBucket[] = []; for (let query of this.globalParameterQueries) { - results.push(...query.getStaticBucketDescriptions(parameters)); + for (const desc of query.getStaticBucketDescriptions(parameters)) { + results.push({ + ...desc, + definition: this.name, + inclusion_reasons: reasons + }); + } } return results; } @@ -146,6 +177,10 @@ export class SqlBucketDescriptor { return result; } + private bucketInclusionReason(): BucketInclusionReason { + return 'default'; + } + tableSyncsData(table: SourceTableInterface): boolean { for (let query of this.dataQueries) { if (query.applies(table)) { @@ -163,4 +198,58 @@ export class SqlBucketDescriptor { } return false; } + + resolveResultSets(schema: SourceSchema, tables: Record<string, Record<string, ColumnDefinition>>) { + for (let query of this.dataQueries) { + const outTables = query.getColumnOutputs(schema); + for (let table of outTables) { + tables[table.name] ??= {}; + for (let column of table.columns) { + if (column.name != 'id') { + tables[table.name][column.name] ??= column; + } + } + } + } + } + + debugWriteOutputTables(result: Record<string, any[]>): void { + for (let q of this.dataQueries) { + result[q.table!]
??= []; + const r = { + query: q.sql + }; + + result[q.table!].push(r); + } + } + + debugRepresentation() { + let all_parameter_queries = [...this.parameterQueries.values()].flat(); + let all_data_queries = [...this.dataQueries.values()].flat(); + return { + name: this.name, + type: this.type.toString(), + bucket_parameters: this.bucketParameters, + global_parameter_queries: this.globalParameterQueries.map((q) => { + return { + sql: q.sql + }; + }), + parameter_queries: all_parameter_queries.map((q) => { + return { + sql: q.sql, + table: q.sourceTable, + input_parameters: q.inputParameters + }; + }), + data_queries: all_data_queries.map((q) => { + return { + sql: q.sql, + table: q.sourceTable, + columns: q.columnOutputNames() + }; + }) + }; + } } diff --git a/packages/sync-rules/src/SqlParameterQuery.ts b/packages/sync-rules/src/SqlParameterQuery.ts index 85b6dfefc..37bf7e11a 100644 --- a/packages/sync-rules/src/SqlParameterQuery.ts +++ b/packages/sync-rules/src/SqlParameterQuery.ts @@ -1,5 +1,10 @@ import { parse, SelectedColumn } from 'pgsql-ast-parser'; -import { BucketDescription, BucketPriority, DEFAULT_BUCKET_PRIORITY } from './BucketDescription.js'; +import { + BucketDescription, + BucketInclusionReason, + BucketPriority, + DEFAULT_BUCKET_PRIORITY +} from './BucketDescription.js'; import { BucketParameterQuerier, ParameterLookup, ParameterLookupSource } from './BucketParameterQuerier.js'; import { SqlRuleError } from './errors.js'; import { SourceTableInterface } from './SourceTableInterface.js'; @@ -450,7 +455,10 @@ export class SqlParameterQuery { } } - getBucketParameterQuerier(requestParameters: RequestParameters): BucketParameterQuerier { + getBucketParameterQuerier( + requestParameters: RequestParameters, + reasons: BucketInclusionReason[] + ): BucketParameterQuerier { const lookups = this.getLookups(requestParameters); if (lookups.length == 0) { // This typically happens when the query is pre-filtered using a where clause @@ -469,7 +477,11 @@ export class SqlParameterQuery { parameterQueryLookups: lookups, queryDynamicBucketDescriptions: async (source: ParameterLookupSource) => { const bucketParameters = await source.getParameterSets(lookups); - return this.resolveBucketDescriptions(bucketParameters, requestParameters); + return this.resolveBucketDescriptions(bucketParameters, requestParameters).map((bucket) => ({ + ...bucket, + definition: this.descriptorName, + inclusion_reasons: reasons + })); } }; } diff --git a/packages/sync-rules/src/SqlSyncRules.ts b/packages/sync-rules/src/SqlSyncRules.ts index a77933630..fdf9cc5cf 100644 --- a/packages/sync-rules/src/SqlSyncRules.ts +++ b/packages/sync-rules/src/SqlSyncRules.ts @@ -1,9 +1,8 @@ import { isScalar, LineCounter, parseDocument, Scalar, YAMLMap, YAMLSeq } from 'yaml'; -import { BucketPriority, isValidPriority } from './BucketDescription.js'; -import { BucketParameterQuerier, mergeBucketParameterQueriers } from './BucketParameterQuerier.js'; +import { isValidPriority } from './BucketDescription.js'; +import { BucketParameterQuerier, mergeBucketParameterQueriers, QuerierError } from './BucketParameterQuerier.js'; import { SqlRuleError, SyncRulesErrors, YamlError } from './errors.js'; import { SqlEventDescriptor } from './events/SqlEventDescriptor.js'; -import { IdSequence } from './IdSequence.js'; import { validateSyncRulesSchema } from './json_schema.js'; import { SourceTableInterface } from './SourceTableInterface.js'; import { QueryParseResult, SqlBucketDescriptor } from './SqlBucketDescriptor.js'; @@ -21,9 +20,11 @@ 
import { QueryParseOptions, RequestParameters, SourceSchema, + SqliteJsonRow, SqliteRow, SyncRules } from './types.js'; +import { BucketSource } from './BucketSource.js'; const ACCEPT_POTENTIALLY_DANGEROUS_QUERIES = Symbol('ACCEPT_POTENTIALLY_DANGEROUS_QUERIES'); @@ -39,8 +40,47 @@ export interface SyncRulesOptions { throwOnError?: boolean; } +export interface RequestedStream { + /** + * The parameters for the explicit stream subscription. + * + * Unlike {@link GetQuerierOptions.globalParameters}, these parameters are only applied to the particular stream. + */ + parameters: SqliteJsonRow | null; + + /** + * An opaque id of the stream subscription, used to associate buckets with the stream subscriptions that have caused + * them to be included. + */ + opaque_id: number; +} + +export interface GetQuerierOptions { + globalParameters: RequestParameters; + /** + * Whether the client is subscribing to default query streams. + * + * Clients do this by default, but can disable the behavior if needed. + */ + hasDefaultStreams: boolean; + /** + * The streams explicitly opened by the client. + * + * Keys are stream names as they appear in the sync rule definitions; each value lists the client's subscriptions + * to that stream, including their parameters. + */ + streams: Record<string, RequestedStream[]>; +} + +export interface GetBucketParameterQuerierResult { + querier: BucketParameterQuerier; + errors: QuerierError[]; +} + export class SqlSyncRules implements SyncRules { - bucketDescriptors: SqlBucketDescriptor[] = []; + bucketSources: BucketSource[] = []; eventDescriptors: SqlEventDescriptor[] = []; content: string; @@ -95,7 +135,19 @@ export class SqlSyncRules implements SyncRules { return rules; } + // Bucket definitions using explicit parameter and data queries. const bucketMap = parsed.get('bucket_definitions') as YAMLMap; + const definitionNames = new Set<string>(); + const checkUniqueName = (name: string, literal: Scalar) => { + if (definitionNames.has(name)) { + rules.errors.push(this.tokenError(literal, 'Duplicate stream or bucket definition.')); + return false; + } + + definitionNames.add(name); + return true; + }; + if (bucketMap == null) { rules.errors.push(new YamlError(new Error(`'bucket_definitions' is required`))); @@ -105,9 +157,12 @@ export class SqlSyncRules implements SyncRules { return rules; } - for (let entry of bucketMap.items) { + for (let entry of bucketMap?.items ??
[]) { const { key: keyScalar, value } = entry as { key: Scalar; value: YAMLMap }; const key = keyScalar.toString(); + if (!checkUniqueName(key, keyScalar)) { + continue; + } if (value == null || !(value instanceof YAMLMap)) { rules.errors.push(this.tokenError(keyScalar, `'${key}' bucket definition must be an object`)); @@ -116,17 +171,7 @@ export class SqlSyncRules implements SyncRules { const accept_potentially_dangerous_queries = value.get('accept_potentially_dangerous_queries', true)?.value == true; - let parseOptionPriority: BucketPriority | undefined = undefined; - if (value.has('priority')) { - const priorityValue = value.get('priority', true)!; - if (typeof priorityValue.value != 'number' || !isValidPriority(priorityValue.value)) { - rules.errors.push( - this.tokenError(priorityValue, 'Invalid priority, expected a number between 0 and 3 (inclusive).') - ); - } else { - parseOptionPriority = priorityValue.value; - } - } + const parseOptionPriority = rules.parsePriority(value); const queryOptions: QueryParseOptions = { ...options, @@ -161,7 +206,7 @@ export class SqlSyncRules implements SyncRules { return descriptor.addDataQuery(q, queryOptions); }); } - rules.bucketDescriptors.push(descriptor); + rules.bucketSources.push(descriptor); } const eventMap = parsed.get('event_definitions') as YAMLMap; @@ -280,8 +325,8 @@ export class SqlSyncRules implements SyncRules { evaluateRowWithErrors(options: EvaluateRowOptions): { results: EvaluatedRow[]; errors: EvaluationError[] } { let rawResults: EvaluationResult[] = []; - for (let query of this.bucketDescriptors) { - rawResults.push(...query.evaluateRow(options)); + for (let source of this.bucketSources) { + rawResults.push(...source.evaluateRow(options)); } const results = rawResults.filter(isEvaluatedRow) as EvaluatedRow[]; @@ -306,8 +351,8 @@ export class SqlSyncRules implements SyncRules { row: SqliteRow ): { results: EvaluatedParameters[]; errors: EvaluationError[] } { let rawResults: EvaluatedParametersResult[] = []; - for (let query of this.bucketDescriptors) { - rawResults.push(...query.evaluateParameterRow(table, row)); + for (let source of this.bucketSources) { + rawResults.push(...source.evaluateParameterRow(table, row)); } const results = rawResults.filter(isEvaluatedParameters) as EvaluatedParameters[]; @@ -315,18 +360,28 @@ export class SqlSyncRules implements SyncRules { return { results, errors }; } - getBucketParameterQuerier(parameters: RequestParameters): BucketParameterQuerier { - const queriers = this.bucketDescriptors.map((query) => query.getBucketParameterQuerier(parameters)); - return mergeBucketParameterQueriers(queriers); + getBucketParameterQuerier(options: GetQuerierOptions): GetBucketParameterQuerierResult { + const queriers: BucketParameterQuerier[] = []; + const errors: QuerierError[] = []; + const pending = { queriers, errors }; + + for (const source of this.bucketSources) { + if ((source.subscribedToByDefault && options.hasDefaultStreams) || source.name in options.streams) { + source.pushBucketParameterQueriers(pending, options); + } + } + + const querier = mergeBucketParameterQueriers(queriers); + return { querier, errors }; } hasDynamicBucketQueries() { - return this.bucketDescriptors.some((query) => query.hasDynamicBucketQueries()); + return this.bucketSources.some((s) => s.hasDynamicBucketQueries()); } getSourceTables(): TablePattern[] { const sourceTables = new Map(); - for (const bucket of this.bucketDescriptors) { + for (const bucket of this.bucketSources) { for (const r of bucket.getSourceTables()) { 
const key = `${r.connectionTag}.${r.schema}.${r.tablePattern}`; sourceTables.set(key, r); @@ -360,35 +415,31 @@ export class SqlSyncRules implements SyncRules { } tableSyncsData(table: SourceTableInterface): boolean { - for (const bucket of this.bucketDescriptors) { - if (bucket.tableSyncsData(table)) { - return true; - } - } - return false; + return this.bucketSources.some((b) => b.tableSyncsData(table)); } tableSyncsParameters(table: SourceTableInterface): boolean { - for (let bucket of this.bucketDescriptors) { - if (bucket.tableSyncsParameters(table)) { - return true; - } - } - return false; + return this.bucketSources.some((b) => b.tableSyncsParameters(table)); } debugGetOutputTables() { let result: Record = {}; - for (let bucket of this.bucketDescriptors) { - for (let q of bucket.dataQueries) { - result[q.table!] ??= []; - const r = { - query: q.sql - }; + for (let bucket of this.bucketSources) { + bucket.debugWriteOutputTables(result); + } + return result; + } - result[q.table!].push(r); + private parsePriority(value: YAMLMap) { + if (value.has('priority')) { + const priorityValue = value.get('priority', true)!; + if (typeof priorityValue.value != 'number' || !isValidPriority(priorityValue.value)) { + this.errors.push( + SqlSyncRules.tokenError(priorityValue, 'Invalid priority, expected a number between 0 and 3 (inclusive).') + ); + } else { + return priorityValue.value; } } - return result; } } diff --git a/packages/sync-rules/src/TableValuedFunctionSqlParameterQuery.ts b/packages/sync-rules/src/TableValuedFunctionSqlParameterQuery.ts index 57a5451a9..a19930af3 100644 --- a/packages/sync-rules/src/TableValuedFunctionSqlParameterQuery.ts +++ b/packages/sync-rules/src/TableValuedFunctionSqlParameterQuery.ts @@ -222,9 +222,7 @@ export class TableValuedFunctionSqlParameterQuery { private getIndividualBucketDescription(row: SqliteRow, parameters: RequestParameters): BucketDescription | null { const mergedParams: ParameterValueSet = { - rawTokenPayload: parameters.rawTokenPayload, - rawUserParameters: parameters.rawUserParameters, - userId: parameters.userId, + ...parameters, lookup: (table, column) => { if (table == this.callTableName) { return row[column]!; diff --git a/packages/sync-rules/src/index.ts b/packages/sync-rules/src/index.ts index 79e3220d9..cac81ace5 100644 --- a/packages/sync-rules/src/index.ts +++ b/packages/sync-rules/src/index.ts @@ -1,5 +1,6 @@ export * from './BucketDescription.js'; export * from './BucketParameterQuerier.js'; +export * from './BucketSource.js'; export * from './errors.js'; export * from './events/SqlEventDescriptor.js'; export * from './events/SqlEventSourceQuery.js'; diff --git a/packages/sync-rules/src/schema-generators/SchemaGenerator.ts b/packages/sync-rules/src/schema-generators/SchemaGenerator.ts index 4b6541ba9..cde9585b2 100644 --- a/packages/sync-rules/src/schema-generators/SchemaGenerator.ts +++ b/packages/sync-rules/src/schema-generators/SchemaGenerator.ts @@ -10,18 +10,8 @@ export abstract class SchemaGenerator { protected getAllTables(source: SqlSyncRules, schema: SourceSchema) { let tables: Record> = {}; - for (let descriptor of source.bucketDescriptors) { - for (let query of descriptor.dataQueries) { - const outTables = query.getColumnOutputs(schema); - for (let table of outTables) { - tables[table.name] ??= {}; - for (let column of table.columns) { - if (column.name != 'id') { - tables[table.name][column.name] ??= column; - } - } - } - } + for (let descriptor of source.bucketSources) { + descriptor.resolveResultSets(schema, tables); 
} return Object.entries(tables).map(([name, columns]) => { diff --git a/packages/sync-rules/src/types.ts b/packages/sync-rules/src/types.ts index ac6d9035a..91b23a265 100644 --- a/packages/sync-rules/src/types.ts +++ b/packages/sync-rules/src/types.ts @@ -81,6 +81,11 @@ export interface ParameterValueSet { */ rawUserParameters: string; + /** + * For streams, the raw JSON string of stream parameters. + */ + rawStreamParameters: string | null; + /** * JSON string of raw request parameters. */ @@ -98,6 +103,8 @@ export class RequestParameters implements ParameterValueSet { */ rawUserParameters: string; + rawStreamParameters: string | null; + /** * JSON string of raw request parameters. */ @@ -121,6 +128,7 @@ export class RequestParameters implements ParameterValueSet { this.rawUserParameters = JSONBig.stringify(clientParameters); this.userParameters = toSyncRulesParameters(clientParameters); + this.rawStreamParameters = null; } lookup(table: string, column: string): SqliteJsonValue { @@ -131,6 +139,13 @@ export class RequestParameters implements ParameterValueSet { } throw new Error(`Unknown table: ${table}`); } + + withAddedStreamParameters(params: Record): RequestParameters { + const clone = structuredClone(this); + clone.rawStreamParameters = JSONBig.stringify(params); + + return clone; + } } /** diff --git a/packages/sync-rules/test/src/sync_rules.test.ts b/packages/sync-rules/test/src/sync_rules.test.ts index 91f3357ac..eb92f85a8 100644 --- a/packages/sync-rules/test/src/sync_rules.test.ts +++ b/packages/sync-rules/test/src/sync_rules.test.ts @@ -1,12 +1,21 @@ import { describe, expect, test } from 'vitest'; import { ParameterLookup, SqlSyncRules } from '../../src/index.js'; -import { ASSETS, BASIC_SCHEMA, PARSE_OPTIONS, TestSourceTable, USERS, normalizeTokenParameters } from './util.js'; +import { + ASSETS, + BASIC_SCHEMA, + PARSE_OPTIONS, + TestSourceTable, + USERS, + normalizeQuerierOptions, + normalizeTokenParameters +} from './util.js'; +import { SqlBucketDescriptor } from '../../src/SqlBucketDescriptor.js'; describe('sync rules', () => { test('parse empty sync rules', () => { const rules = SqlSyncRules.fromYaml('bucket_definitions: {}', PARSE_OPTIONS); - expect(rules.bucketDescriptors).toEqual([]); + expect(rules.bucketSources).toEqual([]); }); test('parse global sync rules', () => { @@ -19,7 +28,7 @@ bucket_definitions: `, PARSE_OPTIONS ); - const bucket = rules.bucketDescriptors[0]; + const bucket = rules.bucketSources[0] as SqlBucketDescriptor; expect(bucket.name).toEqual('mybucket'); expect(bucket.bucketParameters).toEqual([]); const dataQuery = bucket.dataQueries[0]; @@ -37,7 +46,7 @@ bucket_definitions: } ]); expect(rules.hasDynamicBucketQueries()).toBe(false); - expect(rules.getBucketParameterQuerier(normalizeTokenParameters({}))).toMatchObject({ + expect(rules.getBucketParameterQuerier(normalizeQuerierOptions({})).querier).toMatchObject({ staticBuckets: [{ bucket: 'mybucket[]', priority: 3 }], hasDynamicBuckets: false }); @@ -53,7 +62,7 @@ bucket_definitions: `, PARSE_OPTIONS ); - const bucket = rules.bucketDescriptors[0]; + const bucket = rules.bucketSources[0] as SqlBucketDescriptor; expect(bucket.bucketParameters).toEqual([]); const param_query = bucket.globalParameterQueries[0]; @@ -61,15 +70,15 @@ bucket_definitions: expect(param_query.filter!.lookupParameterValue(normalizeTokenParameters({ is_admin: 1n }))).toEqual(1n); expect(param_query.filter!.lookupParameterValue(normalizeTokenParameters({ is_admin: 0n }))).toEqual(0n); - 
expect(rules.getBucketParameterQuerier(normalizeTokenParameters({ is_admin: true }))).toMatchObject({ + expect(rules.getBucketParameterQuerier(normalizeQuerierOptions({ is_admin: true })).querier).toMatchObject({ staticBuckets: [{ bucket: 'mybucket[]', priority: 3 }], hasDynamicBuckets: false }); - expect(rules.getBucketParameterQuerier(normalizeTokenParameters({ is_admin: false }))).toMatchObject({ + expect(rules.getBucketParameterQuerier(normalizeQuerierOptions({ is_admin: false })).querier).toMatchObject({ staticBuckets: [], hasDynamicBuckets: false }); - expect(rules.getBucketParameterQuerier(normalizeTokenParameters({}))).toMatchObject({ + expect(rules.getBucketParameterQuerier(normalizeQuerierOptions({})).querier).toMatchObject({ staticBuckets: [], hasDynamicBuckets: false }); @@ -85,7 +94,7 @@ bucket_definitions: `, PARSE_OPTIONS ); - const bucket = rules.bucketDescriptors[0]; + const bucket = rules.bucketSources[0] as SqlBucketDescriptor; expect(bucket.bucketParameters).toEqual([]); const param_query = bucket.parameterQueries[0]; expect(param_query.bucketParameters).toEqual([]); @@ -109,14 +118,16 @@ bucket_definitions: `, PARSE_OPTIONS ); - const bucket = rules.bucketDescriptors[0]; + const bucket = rules.bucketSources[0] as SqlBucketDescriptor; expect(bucket.bucketParameters).toEqual(['user_id', 'device_id']); const param_query = bucket.globalParameterQueries[0]; expect(param_query.bucketParameters).toEqual(['user_id', 'device_id']); expect( - rules.getBucketParameterQuerier(normalizeTokenParameters({ user_id: 'user1' }, { device_id: 'device1' })) + rules.getBucketParameterQuerier(normalizeQuerierOptions({ user_id: 'user1' }, { device_id: 'device1' })).querier .staticBuckets - ).toEqual([{ bucket: 'mybucket["user1","device1"]', priority: 3 }]); + ).toEqual([ + { bucket: 'mybucket["user1","device1"]', definition: 'mybucket', inclusion_reasons: ['default'], priority: 3 } + ]); const data_query = bucket.dataQueries[0]; expect(data_query.bucketParameters).toEqual(['user_id', 'device_id']); @@ -155,13 +166,13 @@ bucket_definitions: `, PARSE_OPTIONS ); - const bucket = rules.bucketDescriptors[0]; + const bucket = rules.bucketSources[0] as SqlBucketDescriptor; expect(bucket.bucketParameters).toEqual(['user_id']); const param_query = bucket.globalParameterQueries[0]; expect(param_query.bucketParameters).toEqual(['user_id']); - expect(rules.getBucketParameterQuerier(normalizeTokenParameters({ user_id: 'user1' })).staticBuckets).toEqual([ - { bucket: 'mybucket["user1"]', priority: 3 } - ]); + expect( + rules.getBucketParameterQuerier(normalizeQuerierOptions({ user_id: 'user1' })).querier.staticBuckets + ).toEqual([{ bucket: 'mybucket["user1"]', definition: 'mybucket', inclusion_reasons: ['default'], priority: 3 }]); const data_query = bucket.dataQueries[0]; expect(data_query.bucketParameters).toEqual(['user_id']); @@ -299,9 +310,9 @@ bucket_definitions: `, PARSE_OPTIONS ); - const bucket = rules.bucketDescriptors[0]; + const bucket = rules.bucketSources[0] as SqlBucketDescriptor; expect(bucket.bucketParameters).toEqual(['user_id']); - expect(rules.getBucketParameterQuerier(normalizeTokenParameters({ user_id: 'user1' }))).toMatchObject({ + expect(rules.getBucketParameterQuerier(normalizeQuerierOptions({ user_id: 'user1' })).querier).toMatchObject({ staticBuckets: [{ bucket: 'mybucket["USER1"]', priority: 3 }], hasDynamicBuckets: false }); @@ -336,9 +347,9 @@ bucket_definitions: `, PARSE_OPTIONS ); - const bucket = rules.bucketDescriptors[0]; + const bucket = rules.bucketSources[0] as 
SqlBucketDescriptor; expect(bucket.bucketParameters).toEqual(['user_id']); - expect(rules.getBucketParameterQuerier(normalizeTokenParameters({ user_id: 'user1' }))).toMatchObject({ + expect(rules.getBucketParameterQuerier(normalizeQuerierOptions({ user_id: 'user1' })).querier).toMatchObject({ staticBuckets: [{ bucket: 'mybucket["USER1"]', priority: 3 }], hasDynamicBuckets: false }); @@ -503,8 +514,8 @@ bucket_definitions: } ]); - expect(rules.getBucketParameterQuerier(normalizeTokenParameters({ is_admin: true })).staticBuckets).toEqual([ - { bucket: 'mybucket[1]', priority: 3 } + expect(rules.getBucketParameterQuerier(normalizeQuerierOptions({ is_admin: true })).querier.staticBuckets).toEqual([ + { bucket: 'mybucket[1]', definition: 'mybucket', inclusion_reasons: ['default'], priority: 3 } ]); }); @@ -546,7 +557,7 @@ bucket_definitions: PARSE_OPTIONS ); expect( - rules.getBucketParameterQuerier(normalizeTokenParameters({ int1: 314, float1: 3.14, float2: 314 })) + rules.getBucketParameterQuerier(normalizeQuerierOptions({ int1: 314, float1: 3.14, float2: 314 })).querier ).toMatchObject({ staticBuckets: [{ bucket: 'mybucket[314,3.14,314]', priority: 3 }] }); expect( @@ -574,7 +585,7 @@ bucket_definitions: PARSE_OPTIONS ); expect(rules.errors).toEqual([]); - expect(rules.getBucketParameterQuerier(normalizeTokenParameters({ user_id: 'test' }))).toMatchObject({ + expect(rules.getBucketParameterQuerier(normalizeQuerierOptions({ user_id: 'test' })).querier).toMatchObject({ staticBuckets: [{ bucket: 'mybucket["TEST"]', priority: 3 }], hasDynamicBuckets: false }); @@ -827,7 +838,7 @@ bucket_definitions: expect(rules.errors).toEqual([]); - expect(rules.getBucketParameterQuerier(normalizeTokenParameters({}))).toMatchObject({ + expect(rules.getBucketParameterQuerier(normalizeQuerierOptions({})).querier).toMatchObject({ staticBuckets: [ { bucket: 'highprio[]', priority: 0 }, { bucket: 'defaultprio[]', priority: 3 } @@ -852,7 +863,7 @@ bucket_definitions: expect(rules.errors).toEqual([]); - expect(rules.getBucketParameterQuerier(normalizeTokenParameters({}))).toMatchObject({ + expect(rules.getBucketParameterQuerier(normalizeQuerierOptions({})).querier).toMatchObject({ staticBuckets: [ { bucket: 'highprio[]', priority: 0 }, { bucket: 'defaultprio[]', priority: 3 } @@ -913,11 +924,11 @@ bucket_definitions: `, PARSE_OPTIONS ); - const bucket = rules.bucketDescriptors[0]; + const bucket = rules.bucketSources[0] as SqlBucketDescriptor; expect(bucket.bucketParameters).toEqual(['user_id']); expect(rules.hasDynamicBucketQueries()).toBe(true); - expect(rules.getBucketParameterQuerier(normalizeTokenParameters({ user_id: 'user1' }))).toMatchObject({ + expect(rules.getBucketParameterQuerier(normalizeQuerierOptions({ user_id: 'user1' })).querier).toMatchObject({ hasDynamicBuckets: true, parameterQueryLookups: [ ParameterLookup.normalized('mybucket', '2', ['user1']), diff --git a/packages/sync-rules/test/src/util.ts b/packages/sync-rules/test/src/util.ts index b780cd0c2..3c098a9b0 100644 --- a/packages/sync-rules/test/src/util.ts +++ b/packages/sync-rules/test/src/util.ts @@ -1,5 +1,6 @@ import { DEFAULT_TAG, + GetQuerierOptions, RequestJwtPayload, RequestParameters, SourceTableInterface, @@ -60,3 +61,15 @@ export function normalizeTokenParameters( delete tokenPayload.parameters.user_id; return new RequestParameters(tokenPayload, user_parameters ?? 
{}); } + +export function normalizeQuerierOptions( + token_parameters: Record<string, any>, + user_parameters?: Record<string, any> +): GetQuerierOptions { + const globalParameters = normalizeTokenParameters(token_parameters, user_parameters); + return { + globalParameters, + hasDefaultStreams: true, + streams: {} + }; +}
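
Taken together, the pieces above change how the service resolves buckets for a connection. A rough usage sketch, not part of the patch: the caller assembles GetQuerierOptions from the request and gets back both a merged querier and any per-subscription errors. The 'by_project' stream name and the package import path are assumptions for illustration.

import { GetQuerierOptions, RequestParameters, SqlSyncRules } from '@powersync/service-sync-rules';

function resolveBucketsForRequest(rules: SqlSyncRules, globalParameters: RequestParameters) {
  const options: GetQuerierOptions = {
    globalParameters,
    // Clients subscribe to default streams unless they explicitly opt out.
    hasDefaultStreams: true,
    // One explicit subscription to a hypothetical 'by_project' stream.
    streams: {
      by_project: [{ parameters: { project_id: 'p1' }, opaque_id: 0 }]
    }
  };

  const { querier, errors } = rules.getBucketParameterQuerier(options);
  // Static buckets are known synchronously; dynamic buckets still require
  // parameter lookups through a ParameterLookupSource.
  return { buckets: querier.staticBuckets, hasDynamicBuckets: querier.hasDynamicBuckets, errors };
}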
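
For BucketSource implementers, the smallest useful querier is fully static, mirroring the staticQuerier built in SqlBucketDescriptor.pushBucketParameterQueriers above. A minimal sketch under the same interface:

import { BucketParameterQuerier, PendingQueriers, ResolvedBucket } from '@powersync/service-sync-rules';

function pushStaticQuerier(result: PendingQueriers, buckets: ResolvedBucket[]) {
  const querier: BucketParameterQuerier = {
    staticBuckets: buckets,
    hasDynamicBuckets: false,
    parameterQueryLookups: [],
    // No dynamic buckets, so there is never anything to look up.
    queryDynamicBucketDescriptions: async () => []
  };
  result.queriers.push(querier);
}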
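
Subscription parameters that cannot be interpreted should surface as QuerierError entries rather than exceptions, which is what the mocked stream source in the tests above does; the checkpoint line then reports them under the stream's errors. A sketch of that convention:

import { PendingQueriers, RequestedStream } from '@powersync/service-sync-rules';

function reportBadParameters(result: PendingQueriers, subscription: RequestedStream, cause: Error) {
  result.errors.push({
    // Name of the stream or bucket definition that failed to resolve.
    descriptor: 'stream',
    subscription,
    message: `Error evaluating bucket ids: ${cause.message}`
  });
}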
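
The snapshot updates at the top of this diff annotate each bucket with a subscriptions array holding entries like { default: 0 } or { sub: 1 }. Assuming default indexes into the checkpoint's streams list and sub echoes a subscription's opaque_id, the mapping from BucketInclusionReason is roughly:

import { BucketInclusionReason } from '@powersync/service-sync-rules';

type WireInclusionReason = { default: number } | { sub: number };

function toWireReasons(reasons: BucketInclusionReason[], defaultStreamIndex: number): WireInclusionReason[] {
  return reasons.map((reason) =>
    reason === 'default'
      ? // The bucket is synced because its stream is subscribed to by default.
        { default: defaultStreamIndex }
      : // The bucket was requested by an explicit stream subscription.
        { sub: reason.subscription }
  );
}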
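
Finally, stream subscriptions carry parameters of their own next to the shared token and user parameters; the types.ts change adds rawStreamParameters together with a copy-on-write helper to layer them per subscription. A small illustration, with a hypothetical token payload:

import { RequestParameters } from '@powersync/service-sync-rules';

const base = new RequestParameters({ sub: 'u1' }, {});
// Layers subscription-specific parameters without mutating the shared instance.
const scoped = base.withAddedStreamParameters({ ids: '["a", "b"]' });

console.log(base.rawStreamParameters); // null
console.log(scoped.rawStreamParameters); // '{"ids":"[\"a\", \"b\"]"}'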