diff --git a/.changeset/stupid-lizards-vanish.md b/.changeset/stupid-lizards-vanish.md new file mode 100644 index 000000000..694d1b722 --- /dev/null +++ b/.changeset/stupid-lizards-vanish.md @@ -0,0 +1,5 @@ +--- +'@powersync/service-sync-rules': patch +--- + +Sync streams: Fix `auth.parameter()` to use top-level parameters instead of the nested `parameters` object that the legacy `token_parameters` table uses. diff --git a/packages/sync-rules/src/streams/functions.ts b/packages/sync-rules/src/streams/functions.ts index cc81dc1de..3c06232da 100644 --- a/packages/sync-rules/src/streams/functions.ts +++ b/packages/sync-rules/src/streams/functions.ts @@ -34,7 +34,7 @@ export const STREAM_FUNCTIONS: Record) { if (tokenPayload instanceof RequestParameters) { - this.tokenParameters = tokenPayload.tokenParameters; + this.parsedTokenPayload = tokenPayload.parsedTokenPayload; + this.legacyTokenParameters = tokenPayload.legacyTokenParameters; this.userParameters = tokenPayload.userParameters; this.rawUserParameters = tokenPayload.rawUserParameters; this.rawTokenPayload = tokenPayload.rawTokenPayload; @@ -149,7 +152,11 @@ export class RequestParameters implements ParameterValueSet { // Client and token parameters don't contain DateTime values or other custom types, so we don't need to consider // compatibility. - this.tokenParameters = toSyncRulesParameters(tokenParameters, CompatibilityContext.FULL_BACKWARDS_COMPATIBILITY); + this.parsedTokenPayload = tokenPayload; + this.legacyTokenParameters = toSyncRulesParameters( + tokenParameters, + CompatibilityContext.FULL_BACKWARDS_COMPATIBILITY + ); this.userId = tokenPayload.sub; this.rawTokenPayload = JSONBig.stringify(tokenPayload); @@ -161,7 +168,7 @@ export class RequestParameters implements ParameterValueSet { lookup(table: string, column: string): SqliteJsonValue { if (table == 'token_parameters') { - return this.tokenParameters[column]; + return this.legacyTokenParameters[column]; } else if (table == 'user_parameters') { return this.userParameters[column]; } else if (table == 'subscription_parameters' && this.streamParameters != null) { diff --git a/packages/sync-rules/test/src/streams.test.ts b/packages/sync-rules/test/src/streams.test.ts index cad559439..ec30f6585 100644 --- a/packages/sync-rules/test/src/streams.test.ts +++ b/packages/sync-rules/test/src/streams.test.ts @@ -6,9 +6,11 @@ import { CompatibilityEdition, DEFAULT_TAG, GetBucketParameterQuerierResult, + GetQuerierOptions, mergeBucketParameterQueriers, ParameterLookup, QuerierError, + RequestParameters, SourceTableInterface, SqliteJsonRow, SqliteRow, @@ -63,6 +65,36 @@ describe('streams', () => { ]); }); + test('legacy token parameter', async () => { + const desc = parseStream(`SELECT * FROM issues WHERE owner_id = auth.parameter('$.parameters.test')`); + + const queriers: BucketParameterQuerier[] = []; + const errors: QuerierError[] = []; + const pending = { queriers, errors }; + desc.pushBucketParameterQueriers( + pending, + normalizeQuerierOptions( + { test: 'foo' }, + {}, + { stream: [{ opaque_id: 0, parameters: null }] }, + bucketIdTransformer + ) + ); + + expect(mergeBucketParameterQueriers(queriers).staticBuckets).toEqual([ + { + bucket: '1#stream|0["foo"]', + definition: 'stream', + inclusion_reasons: [ + { + subscription: 0 + } + ], + priority: 3 + } + ]); + }); + describe('or', () => { test('parameter match or request condition', async () => { const desc = parseStream("SELECT * FROM issues WHERE owner_id = auth.user_id() OR auth.parameter('is_admin')"); @@ -74,8 +106,8 @@ describe('streams', () => { expect( await queryBucketIds(desc, { - token_parameters: { - user_id: 'u1', + token: { + sub: 'u1', is_admin: false } }) @@ -83,8 +115,8 @@ describe('streams', () => { expect( await queryBucketIds(desc, { - token_parameters: { - user_id: 'u1', + token: { + sub: 'u1', is_admin: true } }) @@ -103,8 +135,8 @@ describe('streams', () => { expect( await queryBucketIds(desc, { - token_parameters: { - user_id: 'u1' + token: { + sub: 'u1' } }) ).toStrictEqual(['1#stream|0["u1"]', '1#stream|1[]']); @@ -121,14 +153,14 @@ describe('streams', () => { expect( await queryBucketIds(desc, { - token_parameters: { + token: { is_admin: false } }) ).toStrictEqual(['1#stream|0[]']); expect( await queryBucketIds(desc, { - token_parameters: { + token: { is_admin: true } }) @@ -162,7 +194,7 @@ describe('streams', () => { expect(evaluateBucketIds(desc, COMMENTS, { id: 'foo', content: 'whatever]' })).toStrictEqual(['1#stream|0[]']); expect( await queryBucketIds(desc, { - token_parameters: { + token: { a: false, b: false } @@ -170,7 +202,7 @@ describe('streams', () => { ).toStrictEqual([]); expect( await queryBucketIds(desc, { - token_parameters: { + token: { a: true, b: false } @@ -204,12 +236,13 @@ describe('streams', () => { return [{ result: 'i1' }]; } - expect( - await queryBucketIds(desc, { token_parameters: { user_id: 'u1', is_admin: false }, getParameterSets }) - ).toStrictEqual(['1#stream|0["i1"]']); - expect( - await queryBucketIds(desc, { token_parameters: { user_id: 'u1', is_admin: true }, getParameterSets }) - ).toStrictEqual(['1#stream|1[]', '1#stream|0["i1"]']); + expect(await queryBucketIds(desc, { token: { sub: 'u1', is_admin: false }, getParameterSets })).toStrictEqual([ + '1#stream|0["i1"]' + ]); + expect(await queryBucketIds(desc, { token: { sub: 'u1', is_admin: true }, getParameterSets })).toStrictEqual([ + '1#stream|1[]', + '1#stream|0["i1"]' + ]); }); }); @@ -236,7 +269,7 @@ describe('streams', () => { expect( await queryBucketIds(desc, { - token_parameters: { user_id: 'user1' }, + token: { sub: 'user1' }, getParameterSets(lookups) { expect(lookups).toStrictEqual([ParameterLookup.normalized('stream', '0', ['user1'])]); @@ -267,7 +300,7 @@ describe('streams', () => { // Should return bucket id for admin users expect( await queryBucketIds(desc, { - token_parameters: { user_id: 'u' }, + token: { sub: 'u' }, getParameterSets: (lookups: ParameterLookup[]) => { expect(lookups).toStrictEqual([ParameterLookup.normalized('stream', '0', ['u'])]); return [{ result: 'u' }]; @@ -278,7 +311,7 @@ describe('streams', () => { // And not for others expect( await queryBucketIds(desc, { - token_parameters: { user_id: 'u2' }, + token: { sub: 'u2' }, getParameterSets: (lookups: ParameterLookup[]) => { expect(lookups).toStrictEqual([ParameterLookup.normalized('stream', '0', ['u2'])]); return []; @@ -331,7 +364,7 @@ describe('streams', () => { expect( await queryBucketIds(desc, { - token_parameters: { user_id: 'a' }, + token: { sub: 'a' }, getParameterSets }) ).toStrictEqual(['1#stream|1["b"]']); @@ -343,7 +376,7 @@ describe('streams', () => { expect(evaluateBucketIds(desc, COMMENTS, { id: 'a', issue_id: 'i' })).toStrictEqual(['1#stream|0["i"]']); expect( await queryBucketIds(desc, { - token_parameters: { user_id: 'a' }, + token: { sub: 'a' }, parameters: { issue_id: ['i1', 'i2'] } }) ).toStrictEqual(['1#stream|0["i1"]', '1#stream|0["i2"]']); @@ -359,7 +392,7 @@ describe('streams', () => { ]); expect( await queryBucketIds(desc, { - token_parameters: { user_id: 'a' }, + token: { sub: 'a' }, parameters: { labels: ['l1', 'l2'] }, getParameterSets(lookups) { expect(lookups).toHaveLength(1); @@ -401,7 +434,7 @@ describe('streams', () => { expect( await queryBucketIds(desc, { - token_parameters: { user_id: 'user1' }, + token: { sub: 'user1' }, getParameterSets(lookups) { expect(lookups).toStrictEqual([ParameterLookup.normalized('stream', '0', ['user1'])]); @@ -504,7 +537,7 @@ describe('streams', () => { expect( await queryBucketIds(desc, { - token_parameters: { user_id: 'user1' }, + token: { sub: 'user1' }, getParameterSets(lookups) { expect(lookups).toStrictEqual([ParameterLookup.normalized('stream', '0', ['user1'])]); @@ -559,7 +592,7 @@ describe('streams', () => { expect( await queryBucketIds(desc, { - token_parameters: { user_id: 'user1' }, + token: { sub: 'user1' }, getParameterSets(lookups) { expect(lookups).toStrictEqual([ParameterLookup.normalized('stream', '0', ['user1'])]); @@ -569,7 +602,7 @@ describe('streams', () => { ).toStrictEqual(['1#stream|0["issue_id"]']); expect( await queryBucketIds(desc, { - token_parameters: { user_id: 'user1', is_admin: true }, + token: { sub: 'user1', is_admin: true }, getParameterSets(lookups) { expect(lookups).toStrictEqual([ParameterLookup.normalized('stream', '0', ['user1'])]); @@ -654,7 +687,7 @@ function evaluateBucketIds(stream: SyncStream, sourceTable: SourceTableInterface async function createQueriers( stream: SyncStream, options?: { - token_parameters?: Record; + token?: Record; parameters?: Record; getParameterSets?: (lookups: ParameterLookup[]) => SqliteJsonRow[]; } @@ -663,15 +696,20 @@ async function createQueriers( const errors: QuerierError[] = []; const pending = { queriers, errors }; - stream.pushBucketParameterQueriers( - pending, - normalizeQuerierOptions( - options?.token_parameters ?? {}, - {}, - { stream: [{ opaque_id: 0, parameters: options?.parameters ?? null }] }, - bucketIdTransformer - ) - ); + const querierOptions: GetQuerierOptions = { + hasDefaultStreams: true, + globalParameters: new RequestParameters( + { + sub: 'test-user', + ...options?.token + }, + {} + ), + streams: { stream: [{ opaque_id: 0, parameters: options?.parameters ?? null }] }, + bucketIdTransformer + }; + + stream.pushBucketParameterQueriers(pending, querierOptions); return { querier: mergeBucketParameterQueriers(queriers), errors }; } @@ -679,7 +717,7 @@ async function createQueriers( async function queryBucketIds( stream: SyncStream, options?: { - token_parameters?: Record; + token?: Record; parameters?: Record; getParameterSets?: (lookups: ParameterLookup[]) => SqliteJsonRow[]; }