Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/wet-games-rush.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'@powersync/service-sync-rules': patch
---

Warn when a sync stream with auto-subscribe uses stream parameters.
2 changes: 1 addition & 1 deletion packages/service-core/test/src/routes/stream.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import { logger, RouterResponse, ServiceError } from '@powersync/lib-services-fr
import { SqlSyncRules } from '@powersync/service-sync-rules';
import { Readable, Writable } from 'stream';
import { pipeline } from 'stream/promises';
import { beforeEach, describe, expect, it, vi } from 'vitest';
import { describe, expect, it } from 'vitest';
import { syncStreamed } from '../../../src/routes/endpoints/sync-stream.js';
import { mockServiceContext } from './mocks.js';

Expand Down
32 changes: 16 additions & 16 deletions packages/sync-rules/src/request_functions.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,16 @@ export interface SqlParameterFunction {
call: (parameters: ParameterValueSet, ...args: SqliteValue[]) => SqliteValue;
getReturnType(): ExpressionType;
parameterCount: number;
/** request.user_id(), request.jwt(), token_parameters.* */
usesAuthenticatedRequestParameters: boolean;
/** request.parameters(), user_parameters.* */
usesUnauthenticatedRequestParameters: boolean;
/**
* Whether this function returns data derived from usage parameters.
*
* This can be:
*
* 1. `subscription`, for unauthenticated subscription parameters like `subscription.parameters()`.
* 2. `authenticated`, for parameters authenticated by a trusted backend (like `request.user_id()`).
* 3. `unauthenticated`, for global unauthenticated request parameters like (like `request.parameters()`).
*/
parameterUsage: 'subscription' | 'authenticated' | 'unauthenticated' | null;
detail: string;
documentation: string;
}
Expand All @@ -31,8 +37,7 @@ export function parameterFunctions(options: {
extractJsonParsed: (v: ParameterValueSet) => any;
sourceDescription: string;
sourceDocumentation: string;
usesAuthenticatedRequestParameters: boolean;
usesUnauthenticatedRequestParameters: boolean;
parameterUsage: SqlParameterFunction['parameterUsage'];
}) {
const allParameters: SqlParameterFunction = {
debugName: `${options.schema}.parameters`,
Expand All @@ -45,8 +50,7 @@ export function parameterFunctions(options: {
},
detail: options.sourceDescription,
documentation: `Returns ${options.sourceDocumentation}`,
usesAuthenticatedRequestParameters: options.usesAuthenticatedRequestParameters,
usesUnauthenticatedRequestParameters: options.usesUnauthenticatedRequestParameters
parameterUsage: options.parameterUsage
};

const extractParameter: SqlParameterFunction = {
Expand All @@ -68,8 +72,7 @@ export function parameterFunctions(options: {
},
detail: `Extract value from ${options.sourceDescription}`,
documentation: `Returns an extracted value (via the key as the second argument) from ${options.sourceDocumentation}`,
usesAuthenticatedRequestParameters: options.usesAuthenticatedRequestParameters,
usesUnauthenticatedRequestParameters: options.usesUnauthenticatedRequestParameters
parameterUsage: options.parameterUsage
};

return { parameters: allParameters, parameter: extractParameter };
Expand All @@ -87,8 +90,7 @@ export function globalRequestParameterFunctions(schema: string) {
sourceDescription: 'Unauthenticated request parameters as JSON',
sourceDocumentation:
'parameters passed by the client as a JSON string. These parameters are not authenticated - any value can be passed in by the client.',
usesAuthenticatedRequestParameters: false,
usesUnauthenticatedRequestParameters: true
parameterUsage: 'unauthenticated'
});
}

Expand All @@ -103,8 +105,7 @@ export const request_jwt: SqlParameterFunction = {
},
detail: 'JWT payload as JSON',
documentation: 'The JWT payload as a JSON string. This is always validated against trusted keys.',
usesAuthenticatedRequestParameters: true,
usesUnauthenticatedRequestParameters: false
parameterUsage: 'authenticated'
};

export function generateUserIdFunction(debugName: string, sameAsDesc: string): SqlParameterFunction {
Expand All @@ -119,8 +120,7 @@ export function generateUserIdFunction(debugName: string, sameAsDesc: string): S
},
detail: 'Authenticated user id',
documentation: `The id of the authenticated user.\nSame as \`${sameAsDesc} ->> 'sub'\`.`,
usesAuthenticatedRequestParameters: true,
usesUnauthenticatedRequestParameters: false
parameterUsage: 'authenticated'
};
}

Expand Down
55 changes: 38 additions & 17 deletions packages/sync-rules/src/streams/from_sql.ts
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ import {
} from 'pgsql-ast-parser';
import { STREAM_FUNCTIONS } from './functions.js';
import { CompatibilityEdition } from '../compatibility.js';
import { DetectRequestParameters } from '../validators.js';

export function syncStreamFromSql(
descriptorName: string,
Expand All @@ -56,6 +57,7 @@ class SyncStreamCompiler {
descriptorName: string;
sql: string;
options: StreamParseOptions;
parameterDetector: DetectRequestParameters = new DetectRequestParameters();

errors: SqlRuleError[];

Expand Down Expand Up @@ -108,6 +110,19 @@ class SyncStreamCompiler {
}

this.errors.push(...tools.errors);
if (this.parameterDetector.usesStreamParameters && stream.subscribedToByDefault) {
const error = new SqlRuleError(
'Clients subscribe to this stream by default, but it uses subscription parameters. Default subscriptions use ' +
'null for all parameters, which can lead to unintentional results. Try removing the parameter or not ' +
'marking the stream as auto-subscribe.',
tools.sql,
undefined
);
error.type = 'warning';

this.errors.push(error);
}

return stream;
}

Expand Down Expand Up @@ -259,7 +274,8 @@ class SyncStreamCompiler {
}

const regularClause = tools.compileClause(clause);
return compiledClauseToFilter(tools, clause?._location ?? null, regularClause);
this.parameterDetector.accept(regularClause);
return this.compiledClauseToFilter(tools, clause?._location ?? null, regularClause);
}

private compileInOperator(tools: SqlTools, clause: ExprBinary): FilterOperator {
Expand Down Expand Up @@ -308,7 +324,7 @@ class SyncStreamCompiler {
// left clause doesn't depend on row data however, we can push it down into the subquery where it would be
// introduced as a parameter: `EXISTS (SELECT _ FROM users WHERE is_admin AND user_id = request.user_id())`.
const additionalClause = subqueryTools.parameterMatchClause(subquery.column, left);
subquery.addFilter(compiledClauseToFilter(subqueryTools, null, additionalClause));
subquery.addFilter(this.compiledClauseToFilter(subqueryTools, null, additionalClause));
return new ExistsOperator(location, subquery);
} else {
// Case 1
Expand All @@ -322,7 +338,7 @@ class SyncStreamCompiler {
// a ParameterMatchClause, which we can translate via CompareRowValueWithStreamParameter. Case 5 is either a row-value
// or a parameter-value clause which we can wrap in EvaluateSimpleCondition.
const combined = tools.compileInClause(clause.left, left, clause.right, right);
return compiledClauseToFilter(tools, location, combined);
return this.compiledClauseToFilter(tools, location, combined);
}

private compileOverlapOperator(tools: SqlTools, clause: ExprBinary): FilterOperator {
Expand Down Expand Up @@ -356,7 +372,7 @@ class SyncStreamCompiler {
// a ParameterMatchClause, which we can translate via CompareRowValueWithStreamParameter. Case 5 is either a row-value
// or a parameter-value clause which we can wrap in EvaluateSimpleCondition.
const combined = tools.compileOverlapClause(clause.left, left, clause.right, right);
return compiledClauseToFilter(tools, location, combined);
return this.compiledClauseToFilter(tools, location, combined);
}

private compileSubquery(stmt: SelectStatement): [Subquery, SqlTools] | undefined {
Expand Down Expand Up @@ -399,7 +415,10 @@ class SyncStreamCompiler {
const where = tools.compileClause(query.where);

this.errors.push(...tools.errors);
return [new Subquery(sourceTable, column, compiledClauseToFilter(tools, query.where?._location, where)), tools];
return [
new Subquery(sourceTable, column, this.compiledClauseToFilter(tools, query.where?._location, where)),
tools
];
}

private checkValidSelectStatement(stmt: Statement) {
Expand Down Expand Up @@ -446,6 +465,20 @@ class SyncStreamCompiler {
sourceTable
};
}

compiledClauseToFilter(tools: SqlTools, location: NodeLocation | nil, regularClause: CompiledClause) {
this.parameterDetector.accept(regularClause);

if (isScalarExpression(regularClause)) {
return new EvaluateSimpleCondition(location ?? null, regularClause);
} else if (isParameterMatchClause(regularClause)) {
return new CompareRowValueWithStreamParameter(location ?? null, regularClause);
} else if (isClauseError(regularClause)) {
return recoverErrorClause(tools);
} else {
throw new Error('Unknown clause type');
}
}
}

function isScalarExpression(clause: CompiledClause): clause is ScalarExpression {
Expand All @@ -456,15 +489,3 @@ function recoverErrorClause(tools: SqlTools): EvaluateSimpleCondition {
// An error has already been logged.
return new EvaluateSimpleCondition(null, tools.compileClause(null) as StaticValueClause);
}

function compiledClauseToFilter(tools: SqlTools, location: NodeLocation | nil, regularClause: CompiledClause) {
if (isScalarExpression(regularClause)) {
return new EvaluateSimpleCondition(location ?? null, regularClause);
} else if (isParameterMatchClause(regularClause)) {
return new CompareRowValueWithStreamParameter(location ?? null, regularClause);
} else if (isClauseError(regularClause)) {
return recoverErrorClause(tools);
} else {
throw new Error('Unknown clause type');
}
}
6 changes: 2 additions & 4 deletions packages/sync-rules/src/streams/functions.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,7 @@ export const STREAM_FUNCTIONS: Record<string, Record<string, SqlParameterFunctio
sourceDescription: 'Unauthenticated subscription parameters as JSON',
sourceDocumentation:
'parameters passed by the client for this stream as a JSON string. These parameters are not authenticated - any value can be passed in by the client.',
usesAuthenticatedRequestParameters: false,
usesUnauthenticatedRequestParameters: true
parameterUsage: 'subscription'
})
},
connection: {
Expand All @@ -38,8 +37,7 @@ export const STREAM_FUNCTIONS: Record<string, Record<string, SqlParameterFunctio
},
sourceDescription: 'JWT payload as JSON',
sourceDocumentation: 'JWT payload as a JSON string. This is always validated against trusted keys',
usesAuthenticatedRequestParameters: true,
usesUnauthenticatedRequestParameters: false
parameterUsage: 'authenticated'
})
}
};
8 changes: 6 additions & 2 deletions packages/sync-rules/src/validators.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ export class DetectRequestParameters {
usesAuthenticatedRequestParameters: boolean = false;
/** request.parameters(), user_parameters.* */
usesUnauthenticatedRequestParameters: boolean = false;
/** subscription.parameters(), subscription.parameters('a') */
usesStreamParameters: boolean = false;

accept(clause?: CompiledClause) {
if (clause == null) {
Expand All @@ -19,8 +21,10 @@ export class DetectRequestParameters {
if (isRequestFunctionCall(clause)) {
const f = clause.function;

this.usesAuthenticatedRequestParameters ||= f.usesAuthenticatedRequestParameters;
this.usesUnauthenticatedRequestParameters ||= f.usesUnauthenticatedRequestParameters;
this.usesAuthenticatedRequestParameters ||= f.parameterUsage == 'authenticated';
this.usesUnauthenticatedRequestParameters ||=
f.parameterUsage == 'unauthenticated' || f.parameterUsage == 'subscription';
this.usesStreamParameters ||= f.parameterUsage == 'subscription';
} else if (isLegacyParameterFromTableClause(clause)) {
const table = clause.table;
if (table == 'token_parameters') {
Expand Down
24 changes: 24 additions & 0 deletions packages/sync-rules/test/src/streams.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -513,6 +513,30 @@ describe('streams', () => {
expect.toBeSqlRuleError("Function 'request.user_id' is not defined", 'request.user_id()')
]);
});

describe('auto-subscribe with parameters', () => {
const optionsWithAutoSubscribe = { ...options, auto_subscribe: true };

function expectWarning(sql: string) {
const [_, errors] = syncStreamFromSql('s', sql, optionsWithAutoSubscribe);
expect(errors).toHaveLength(1);

const error = errors[0];
expect(error.message).toContain(
'Clients subscribe to this stream by default, but it uses subscription parameters'
);
}

test('in simple filter', () => {
expectWarning(`SELECT * FROM issues WHERE id = subscription.parameter('s')`);
});

test('in subquery', () => {
expectWarning(
`SELECT * FROM issues WHERE owner_id IN (SELECT id FROM "users" WHERE id = subscription.parameter('s'))`
);
});
});
});

describe('normalization', () => {
Expand Down