Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/stupid-lizards-vanish.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'@powersync/service-sync-rules': patch
---

Sync streams: Fix `auth.parameter()` to use top-level parameters instead of the nested `parameters` object that the legacy `token_parameters` table uses.
2 changes: 1 addition & 1 deletion packages/sync-rules/src/streams/functions.ts
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ export const STREAM_FUNCTIONS: Record<string, Record<string, SqlParameterFunctio
return v.rawTokenPayload;
},
extractJsonParsed: function (v: ParameterValueSet) {
return v.tokenParameters;
return v.parsedTokenPayload;
},
sourceDescription: 'JWT payload as JSON',
sourceDocumentation: 'JWT payload as a JSON string. This is always validated against trusted keys',
Expand Down
17 changes: 12 additions & 5 deletions packages/sync-rules/src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -99,13 +99,15 @@ export interface ParameterValueSet {
* JSON string of raw request parameters.
*/
rawTokenPayload: string;
tokenParameters: SqliteJsonRow;
parsedTokenPayload: SqliteJsonRow;
legacyTokenParameters: SqliteJsonRow;

userId: string;
}

export class RequestParameters implements ParameterValueSet {
tokenParameters: SqliteJsonRow;
parsedTokenPayload: SqliteJsonRow;
legacyTokenParameters: SqliteJsonRow;
userParameters: SqliteJsonRow;

/**
Expand All @@ -128,7 +130,8 @@ export class RequestParameters implements ParameterValueSet {

constructor(tokenPayload: RequestJwtPayload | RequestParameters, clientParameters?: Record<string, any>) {
if (tokenPayload instanceof RequestParameters) {
this.tokenParameters = tokenPayload.tokenParameters;
this.parsedTokenPayload = tokenPayload.parsedTokenPayload;
this.legacyTokenParameters = tokenPayload.legacyTokenParameters;
this.userParameters = tokenPayload.userParameters;
this.rawUserParameters = tokenPayload.rawUserParameters;
this.rawTokenPayload = tokenPayload.rawTokenPayload;
Expand All @@ -149,7 +152,11 @@ export class RequestParameters implements ParameterValueSet {

// Client and token parameters don't contain DateTime values or other custom types, so we don't need to consider
// compatibility.
this.tokenParameters = toSyncRulesParameters(tokenParameters, CompatibilityContext.FULL_BACKWARDS_COMPATIBILITY);
this.parsedTokenPayload = tokenPayload;
this.legacyTokenParameters = toSyncRulesParameters(
tokenParameters,
CompatibilityContext.FULL_BACKWARDS_COMPATIBILITY
);
this.userId = tokenPayload.sub;
this.rawTokenPayload = JSONBig.stringify(tokenPayload);

Expand All @@ -161,7 +168,7 @@ export class RequestParameters implements ParameterValueSet {

lookup(table: string, column: string): SqliteJsonValue {
if (table == 'token_parameters') {
return this.tokenParameters[column];
return this.legacyTokenParameters[column];
} else if (table == 'user_parameters') {
return this.userParameters[column];
} else if (table == 'subscription_parameters' && this.streamParameters != null) {
Expand Down
112 changes: 75 additions & 37 deletions packages/sync-rules/test/src/streams.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,11 @@ import {
CompatibilityEdition,
DEFAULT_TAG,
GetBucketParameterQuerierResult,
GetQuerierOptions,
mergeBucketParameterQueriers,
ParameterLookup,
QuerierError,
RequestParameters,
SourceTableInterface,
SqliteJsonRow,
SqliteRow,
Expand Down Expand Up @@ -63,6 +65,36 @@ describe('streams', () => {
]);
});

test('legacy token parameter', async () => {
const desc = parseStream(`SELECT * FROM issues WHERE owner_id = auth.parameter('$.parameters.test')`);

const queriers: BucketParameterQuerier[] = [];
const errors: QuerierError[] = [];
const pending = { queriers, errors };
desc.pushBucketParameterQueriers(
pending,
normalizeQuerierOptions(
{ test: 'foo' },
{},
{ stream: [{ opaque_id: 0, parameters: null }] },
bucketIdTransformer
)
);

expect(mergeBucketParameterQueriers(queriers).staticBuckets).toEqual([
{
bucket: '1#stream|0["foo"]',
definition: 'stream',
inclusion_reasons: [
{
subscription: 0
}
],
priority: 3
}
]);
});

describe('or', () => {
test('parameter match or request condition', async () => {
const desc = parseStream("SELECT * FROM issues WHERE owner_id = auth.user_id() OR auth.parameter('is_admin')");
Expand All @@ -74,17 +106,17 @@ describe('streams', () => {

expect(
await queryBucketIds(desc, {
token_parameters: {
user_id: 'u1',
token: {
sub: 'u1',
is_admin: false
}
})
).toStrictEqual(['1#stream|0["u1"]']);

expect(
await queryBucketIds(desc, {
token_parameters: {
user_id: 'u1',
token: {
sub: 'u1',
is_admin: true
}
})
Expand All @@ -103,8 +135,8 @@ describe('streams', () => {

expect(
await queryBucketIds(desc, {
token_parameters: {
user_id: 'u1'
token: {
sub: 'u1'
}
})
).toStrictEqual(['1#stream|0["u1"]', '1#stream|1[]']);
Expand All @@ -121,14 +153,14 @@ describe('streams', () => {

expect(
await queryBucketIds(desc, {
token_parameters: {
token: {
is_admin: false
}
})
).toStrictEqual(['1#stream|0[]']);
expect(
await queryBucketIds(desc, {
token_parameters: {
token: {
is_admin: true
}
})
Expand Down Expand Up @@ -162,15 +194,15 @@ describe('streams', () => {
expect(evaluateBucketIds(desc, COMMENTS, { id: 'foo', content: 'whatever]' })).toStrictEqual(['1#stream|0[]']);
expect(
await queryBucketIds(desc, {
token_parameters: {
token: {
a: false,
b: false
}
})
).toStrictEqual([]);
expect(
await queryBucketIds(desc, {
token_parameters: {
token: {
a: true,
b: false
}
Expand Down Expand Up @@ -204,12 +236,13 @@ describe('streams', () => {

return [{ result: 'i1' }];
}
expect(
await queryBucketIds(desc, { token_parameters: { user_id: 'u1', is_admin: false }, getParameterSets })
).toStrictEqual(['1#stream|0["i1"]']);
expect(
await queryBucketIds(desc, { token_parameters: { user_id: 'u1', is_admin: true }, getParameterSets })
).toStrictEqual(['1#stream|1[]', '1#stream|0["i1"]']);
expect(await queryBucketIds(desc, { token: { sub: 'u1', is_admin: false }, getParameterSets })).toStrictEqual([
'1#stream|0["i1"]'
]);
expect(await queryBucketIds(desc, { token: { sub: 'u1', is_admin: true }, getParameterSets })).toStrictEqual([
'1#stream|1[]',
'1#stream|0["i1"]'
]);
});
});

Expand All @@ -236,7 +269,7 @@ describe('streams', () => {

expect(
await queryBucketIds(desc, {
token_parameters: { user_id: 'user1' },
token: { sub: 'user1' },
getParameterSets(lookups) {
expect(lookups).toStrictEqual([ParameterLookup.normalized('stream', '0', ['user1'])]);

Expand Down Expand Up @@ -267,7 +300,7 @@ describe('streams', () => {
// Should return bucket id for admin users
expect(
await queryBucketIds(desc, {
token_parameters: { user_id: 'u' },
token: { sub: 'u' },
getParameterSets: (lookups: ParameterLookup[]) => {
expect(lookups).toStrictEqual([ParameterLookup.normalized('stream', '0', ['u'])]);
return [{ result: 'u' }];
Expand All @@ -278,7 +311,7 @@ describe('streams', () => {
// And not for others
expect(
await queryBucketIds(desc, {
token_parameters: { user_id: 'u2' },
token: { sub: 'u2' },
getParameterSets: (lookups: ParameterLookup[]) => {
expect(lookups).toStrictEqual([ParameterLookup.normalized('stream', '0', ['u2'])]);
return [];
Expand Down Expand Up @@ -331,7 +364,7 @@ describe('streams', () => {

expect(
await queryBucketIds(desc, {
token_parameters: { user_id: 'a' },
token: { sub: 'a' },
getParameterSets
})
).toStrictEqual(['1#stream|1["b"]']);
Expand All @@ -343,7 +376,7 @@ describe('streams', () => {
expect(evaluateBucketIds(desc, COMMENTS, { id: 'a', issue_id: 'i' })).toStrictEqual(['1#stream|0["i"]']);
expect(
await queryBucketIds(desc, {
token_parameters: { user_id: 'a' },
token: { sub: 'a' },
parameters: { issue_id: ['i1', 'i2'] }
})
).toStrictEqual(['1#stream|0["i1"]', '1#stream|0["i2"]']);
Expand All @@ -359,7 +392,7 @@ describe('streams', () => {
]);
expect(
await queryBucketIds(desc, {
token_parameters: { user_id: 'a' },
token: { sub: 'a' },
parameters: { labels: ['l1', 'l2'] },
getParameterSets(lookups) {
expect(lookups).toHaveLength(1);
Expand Down Expand Up @@ -401,7 +434,7 @@ describe('streams', () => {

expect(
await queryBucketIds(desc, {
token_parameters: { user_id: 'user1' },
token: { sub: 'user1' },
getParameterSets(lookups) {
expect(lookups).toStrictEqual([ParameterLookup.normalized('stream', '0', ['user1'])]);

Expand Down Expand Up @@ -504,7 +537,7 @@ describe('streams', () => {

expect(
await queryBucketIds(desc, {
token_parameters: { user_id: 'user1' },
token: { sub: 'user1' },
getParameterSets(lookups) {
expect(lookups).toStrictEqual([ParameterLookup.normalized('stream', '0', ['user1'])]);

Expand Down Expand Up @@ -559,7 +592,7 @@ describe('streams', () => {

expect(
await queryBucketIds(desc, {
token_parameters: { user_id: 'user1' },
token: { sub: 'user1' },
getParameterSets(lookups) {
expect(lookups).toStrictEqual([ParameterLookup.normalized('stream', '0', ['user1'])]);

Expand All @@ -569,7 +602,7 @@ describe('streams', () => {
).toStrictEqual(['1#stream|0["issue_id"]']);
expect(
await queryBucketIds(desc, {
token_parameters: { user_id: 'user1', is_admin: true },
token: { sub: 'user1', is_admin: true },
getParameterSets(lookups) {
expect(lookups).toStrictEqual([ParameterLookup.normalized('stream', '0', ['user1'])]);

Expand Down Expand Up @@ -654,7 +687,7 @@ function evaluateBucketIds(stream: SyncStream, sourceTable: SourceTableInterface
async function createQueriers(
stream: SyncStream,
options?: {
token_parameters?: Record<string, any>;
token?: Record<string, any>;
parameters?: Record<string, any>;
getParameterSets?: (lookups: ParameterLookup[]) => SqliteJsonRow[];
}
Expand All @@ -663,23 +696,28 @@ async function createQueriers(
const errors: QuerierError[] = [];
const pending = { queriers, errors };

stream.pushBucketParameterQueriers(
pending,
normalizeQuerierOptions(
options?.token_parameters ?? {},
{},
{ stream: [{ opaque_id: 0, parameters: options?.parameters ?? null }] },
bucketIdTransformer
)
);
const querierOptions: GetQuerierOptions = {
hasDefaultStreams: true,
globalParameters: new RequestParameters(
{
sub: 'test-user',
...options?.token
},
{}
),
streams: { stream: [{ opaque_id: 0, parameters: options?.parameters ?? null }] },
bucketIdTransformer
};

stream.pushBucketParameterQueriers(pending, querierOptions);

return { querier: mergeBucketParameterQueriers(queriers), errors };
}

async function queryBucketIds(
stream: SyncStream,
options?: {
token_parameters?: Record<string, any>;
token?: Record<string, any>;
parameters?: Record<string, any>;
getParameterSets?: (lookups: ParameterLookup[]) => SqliteJsonRow[];
}
Expand Down