Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/good-moles-confess.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'@powersync/service-sync-rules': patch
---

Sync streams: Support table aliases in subqueries.
18 changes: 9 additions & 9 deletions packages/sync-rules/src/BaseSqlDataQuery.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import { SelectedColumn } from 'pgsql-ast-parser';
import { SqlRuleError } from './errors.js';
import { ColumnDefinition } from './ExpressionType.js';
import { SourceTableInterface } from './SourceTableInterface.js';
import { SqlTools } from './sql_filters.js';
import { AvailableTable, SqlTools } from './sql_filters.js';
import { TablePattern } from './TablePattern.js';
import {
BucketIdTransformer,
Expand Down Expand Up @@ -31,7 +31,7 @@ export interface EvaluateRowOptions {

export interface BaseSqlDataQueryOptions {
sourceTable: TablePattern;
table: string;
table: AvailableTable;
sql: string;
columns: SelectedColumn[];
extractors: RowValueExtractor[];
Expand All @@ -52,7 +52,7 @@ export class BaseSqlDataQuery {
*
* This is used for the output table name.
*/
readonly table: string;
readonly table: AvailableTable;

/**
* The source SQL query, for debugging purposes.
Expand Down Expand Up @@ -121,12 +121,12 @@ export class BaseSqlDataQuery {
// Wildcard without alias - use source
return sourceTable;
} else {
return this.table;
return this.table.sqlName;
}
}

isUnaliasedWildcard() {
return this.sourceTable.isWildcard && this.table == this.sourceTable.tablePattern;
return this.sourceTable.isWildcard && !this.table.isAliased;
}

columnOutputNames(): string[] {
Expand Down Expand Up @@ -157,7 +157,7 @@ export class BaseSqlDataQuery {
this.getColumnOutputsFor(schemaTable, output);
}
result.push({
name: this.table,
name: this.table.sqlName,
columns: Object.values(output)
});
}
Expand All @@ -181,7 +181,7 @@ export class BaseSqlDataQuery {
try {
const { table, row, bucketIds } = options;

const tables = { [this.table]: this.addSpecialParameters(table, row) };
const tables = { [this.table.schemaName]: this.addSpecialParameters(table, row) };
const resolvedBucketIds = bucketIds(tables);

const data = this.transformRow(tables);
Expand Down Expand Up @@ -221,15 +221,15 @@ export class BaseSqlDataQuery {
protected getColumnOutputsFor(schemaTable: SourceSchemaTable, output: Record<string, ColumnDefinition>) {
const querySchema: QuerySchema = {
getColumn: (table, column) => {
if (table == this.table) {
if (table == this.table.schemaName) {
return schemaTable.getColumn(column);
} else {
// TODO: bucket parameters?
return undefined;
}
},
getColumns: (table) => {
if (table == this.table) {
if (table == this.table.schemaName) {
return schemaTable.getColumns();
} else {
return [];
Expand Down
4 changes: 2 additions & 2 deletions packages/sync-rules/src/SqlBucketDescriptor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -217,12 +217,12 @@ export class SqlBucketDescriptor implements BucketSource {

debugWriteOutputTables(result: Record<string, { query: string }[]>): void {
for (let q of this.dataQueries) {
result[q.table!] ??= [];
result[q.table!.sqlName] ??= [];
const r = {
query: q.sql
};

result[q.table!].push(r);
result[q.table!.sqlName].push(r);
}
}

Expand Down
12 changes: 6 additions & 6 deletions packages/sync-rules/src/SqlDataQuery.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import { BaseSqlDataQuery, BaseSqlDataQueryOptions, RowValueExtractor } from './
import { SqlRuleError } from './errors.js';
import { ExpressionType } from './ExpressionType.js';
import { SourceTableInterface } from './SourceTableInterface.js';
import { SqlTools } from './sql_filters.js';
import { AvailableTable, SqlTools } from './sql_filters.js';
import { checkUnsupportedFeatures, isClauseError } from './sql_support.js';
import { SyncRulesOptions } from './SqlSyncRules.js';
import { TablePattern } from './TablePattern.js';
Expand Down Expand Up @@ -48,7 +48,7 @@ export class SqlDataQuery extends BaseSqlDataQuery {
if (tableRef?.name == null) {
throw new SqlRuleError('Must SELECT from a single table', sql, q.from?.[0]._location);
}
const alias: string = tableRef.alias ?? tableRef.name;
const alias = AvailableTable.fromAst(tableRef);

const sourceTable = new TablePattern(tableRef.schema ?? options.defaultSchema, tableRef.name);
let querySchema: QuerySchema | undefined = undefined;
Expand All @@ -71,7 +71,7 @@ export class SqlDataQuery extends BaseSqlDataQuery {
const where = q.where;
const tools = new SqlTools({
table: alias,
parameterTables: ['bucket'],
parameterTables: [new AvailableTable('bucket')],
valueTables: [alias],
compatibilityContext: compatibility,
sql,
Expand Down Expand Up @@ -123,7 +123,7 @@ export class SqlDataQuery extends BaseSqlDataQuery {
} else {
extractors.push({
extract: (tables, output) => {
const row = tables[alias];
const row = tables[alias.schemaName];
for (let key in row) {
if (key.startsWith('_')) {
continue;
Expand All @@ -132,7 +132,7 @@ export class SqlDataQuery extends BaseSqlDataQuery {
}
},
getTypes(schema, into) {
for (let column of schema.getColumns(alias)) {
for (let column of schema.getColumns(alias.schemaName)) {
into[column.name] ??= column;
}
}
Expand All @@ -146,7 +146,7 @@ export class SqlDataQuery extends BaseSqlDataQuery {
// Not performing schema-based validation - assume there is an id
hasId = true;
} else {
const idType = querySchema.getColumn(alias, 'id')?.type ?? ExpressionType.NONE;
const idType = querySchema.getColumn(alias.schemaName, 'id')?.type ?? ExpressionType.NONE;
if (!idType.isNone()) {
hasId = true;
}
Expand Down
16 changes: 8 additions & 8 deletions packages/sync-rules/src/SqlParameterQuery.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import {
import { BucketParameterQuerier, ParameterLookup, ParameterLookupSource } from './BucketParameterQuerier.js';
import { SqlRuleError } from './errors.js';
import { SourceTableInterface } from './SourceTableInterface.js';
import { SqlTools } from './sql_filters.js';
import { AvailableTable, SqlTools } from './sql_filters.js';
import { checkUnsupportedFeatures, isClauseError, isParameterValueClause } from './sql_support.js';
import { StaticSqlParameterQuery } from './StaticSqlParameterQuery.js';
import { TablePattern } from './TablePattern.js';
Expand All @@ -33,7 +33,7 @@ import { filterJsonRow, getBucketId, isJsonValue, isSelectStatement, normalizePa

export interface SqlParameterQueryOptions {
sourceTable: TablePattern;
table: string;
table: AvailableTable;
sql: string;
lookupExtractors: Record<string, RowValueClause>;
parameterExtractors: Record<string, ParameterValueClause>;
Expand Down Expand Up @@ -95,8 +95,8 @@ export class SqlParameterQuery {
if (tableRef?.name == null) {
throw new SqlRuleError('Must SELECT from a single table', sql, q.from?.[0]._location);
}
const alias: string = q.from?.[0].name.alias ?? tableRef.name;
if (tableRef.name != alias) {
const alias = new AvailableTable(tableRef.name, q.from?.[0].name.alias);
if (alias.isAliased) {
errors.push(new SqlRuleError('Table aliases not supported in parameter queries', sql, q.from?.[0]._location));
}
const sourceTable = new TablePattern(tableRef.schema ?? options.defaultSchema, tableRef.name);
Expand All @@ -119,7 +119,7 @@ export class SqlParameterQuery {

const tools = new SqlTools({
table: alias,
parameterTables: ['token_parameters', 'user_parameters'],
parameterTables: [new AvailableTable('token_parameters'), new AvailableTable('user_parameters')],
sql,
supportsExpandingParameters: true,
supportsParameterExpressions: true,
Expand Down Expand Up @@ -214,7 +214,7 @@ export class SqlParameterQuery {
*
* Currently, this always matches sourceTable.name.
*/
readonly table: string;
readonly table: AvailableTable;

/**
* The source SQL query, for debugging purposes.
Expand Down Expand Up @@ -308,7 +308,7 @@ export class SqlParameterQuery {
*/
evaluateParameterRow(row: SqliteRow): EvaluatedParametersResult[] {
const tables = {
[this.table]: row
[this.table.schemaName]: row
};
try {
const filterParameters = this.filter.filterRow(tables);
Expand Down Expand Up @@ -336,7 +336,7 @@ export class SqlParameterQuery {
}

private transformRows(row: SqliteRow): SqliteRow[] {
const tables = { [this.table]: row };
const tables = { [this.table.sqlName]: row };
let result: SqliteRow = {};
for (let key in this.lookupExtractors) {
const extractor = this.lookupExtractors[key];
Expand Down
4 changes: 2 additions & 2 deletions packages/sync-rules/src/StaticSqlParameterQuery.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { SelectedColumn, SelectFromStatement } from 'pgsql-ast-parser';
import { BucketDescription, BucketPriority, DEFAULT_BUCKET_PRIORITY } from './BucketDescription.js';
import { SqlRuleError } from './errors.js';
import { SqlTools } from './sql_filters.js';
import { AvailableTable, SqlTools } from './sql_filters.js';
import { checkUnsupportedFeatures, isClauseError, isParameterValueClause, sqliteBool } from './sql_support.js';
import {
BucketIdTransformer,
Expand Down Expand Up @@ -43,7 +43,7 @@ export class StaticSqlParameterQuery {

const tools = new SqlTools({
table: undefined,
parameterTables: ['token_parameters', 'user_parameters'],
parameterTables: [new AvailableTable('token_parameters'), new AvailableTable('user_parameters')],
supportsParameterExpressions: true,
compatibilityContext: options.compatibility,
sql
Expand Down
11 changes: 8 additions & 3 deletions packages/sync-rules/src/TableQuerySchema.ts
Original file line number Diff line number Diff line change
@@ -1,14 +1,19 @@
import { ColumnDefinition } from './ExpressionType.js';
import { AvailableTable } from './sql_filters.js';
import { QuerySchema, SourceSchemaTable } from './types.js';

/**
* Exposes a list of {@link SourceSchemaTable}s as a {@link QuerySchema} by only exposing the subset of the schema
* referenced in a `FROM` clause.
*/
export class TableQuerySchema implements QuerySchema {
constructor(
private tables: SourceSchemaTable[],
private alias: string
private alias: AvailableTable
) {}

getColumn(table: string, column: string): ColumnDefinition | undefined {
if (table != this.alias) {
if (table != this.alias.schemaName) {
return undefined;
}
for (let table of this.tables) {
Expand All @@ -21,7 +26,7 @@ export class TableQuerySchema implements QuerySchema {
}

getColumns(table: string): ColumnDefinition[] {
if (table != this.alias) {
if (table != this.alias.schemaName) {
return [];
}
let columns: Record<string, ColumnDefinition> = {};
Expand Down
16 changes: 8 additions & 8 deletions packages/sync-rules/src/TableValuedFunctionSqlParameterQuery.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { FromCall, SelectFromStatement } from 'pgsql-ast-parser';
import { SqlRuleError } from './errors.js';
import { SqlTools } from './sql_filters.js';
import { AvailableTable, SqlTools } from './sql_filters.js';
import { checkUnsupportedFeatures, isClauseError, isParameterValueClause, sqliteBool } from './sql_support.js';
import { generateTableValuedFunctions, TableValuedFunction } from './TableValuedFunctions.js';
import {
Expand All @@ -26,7 +26,7 @@ export interface TableValuedFunctionSqlParameterQueryOptions {
filter: ParameterValueClause | undefined;
callClause: ParameterValueClause | undefined;
function: TableValuedFunction;
callTableName: string;
callTable: AvailableTable;

errors: SqlRuleError[];
}
Expand Down Expand Up @@ -59,12 +59,12 @@ export class TableValuedFunctionSqlParameterQuery {
throw new SqlRuleError(`Table-valued function ${call.function.name} is not defined.`, sql, call);
}

const callTable = call.alias?.name ?? call.function.name;
const callTable = AvailableTable.fromCall(call);
const callExpression = call.args[0];

const tools = new SqlTools({
table: callTable,
parameterTables: ['token_parameters', 'user_parameters', callTable],
parameterTables: [new AvailableTable('token_parameters'), new AvailableTable('user_parameters'), callTable],
supportsParameterExpressions: true,
compatibilityContext: compatibility,
sql
Expand Down Expand Up @@ -108,7 +108,7 @@ export class TableValuedFunctionSqlParameterQuery {
filter: isClauseError(filter) ? undefined : filter,
callClause: isClauseError(callClause) ? undefined : callClause,
function: functionImpl,
callTableName: callTable,
callTable,
priority: priority ?? DEFAULT_BUCKET_PRIORITY,
queryId,
errors
Expand Down Expand Up @@ -186,7 +186,7 @@ export class TableValuedFunctionSqlParameterQuery {
*
* Only used internally.
*/
readonly callTableName: string;
readonly callTable: AvailableTable;

readonly errors: SqlRuleError[];

Expand All @@ -201,7 +201,7 @@ export class TableValuedFunctionSqlParameterQuery {
this.filter = options.filter;
this.callClause = options.callClause;
this.function = options.function;
this.callTableName = options.callTableName;
this.callTable = options.callTable;

this.errors = options.errors;
}
Expand Down Expand Up @@ -232,7 +232,7 @@ export class TableValuedFunctionSqlParameterQuery {
const mergedParams: ParameterValueSet = {
...parameters,
lookup: (table, column) => {
if (table == this.callTableName) {
if (table == this.callTable.schemaName) {
return row[column]!;
} else {
return parameters.lookup(table, column);
Expand Down
10 changes: 5 additions & 5 deletions packages/sync-rules/src/events/SqlEventSourceQuery.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import { BaseSqlDataQuery, BaseSqlDataQueryOptions, RowValueExtractor } from '..
import { SqlRuleError } from '../errors.js';
import { ExpressionType } from '../ExpressionType.js';
import { SourceTableInterface } from '../SourceTableInterface.js';
import { SqlTools } from '../sql_filters.js';
import { AvailableTable, SqlTools } from '../sql_filters.js';
import { checkUnsupportedFeatures, isClauseError } from '../sql_support.js';
import { SyncRulesOptions } from '../SqlSyncRules.js';
import { TablePattern } from '../TablePattern.js';
Expand Down Expand Up @@ -49,7 +49,7 @@ export class SqlEventSourceQuery extends BaseSqlDataQuery {
if (tableRef?.name == null) {
throw new SqlRuleError('Must SELECT from a single table', sql, q.from?.[0]._location);
}
const alias: string = tableRef.alias ?? tableRef.name;
const alias = AvailableTable.fromAst(tableRef);

const sourceTable = new TablePattern(tableRef.schema ?? options.defaultSchema, tableRef.name);
let querySchema: QuerySchema | undefined = undefined;
Expand Down Expand Up @@ -99,7 +99,7 @@ export class SqlEventSourceQuery extends BaseSqlDataQuery {
} else {
extractors.push({
extract: (tables, output) => {
const row = tables[alias];
const row = tables[alias.schemaName];
for (let key in row) {
if (key.startsWith('_')) {
continue;
Expand All @@ -108,7 +108,7 @@ export class SqlEventSourceQuery extends BaseSqlDataQuery {
}
},
getTypes(schema, into) {
for (let column of schema.getColumns(alias)) {
for (let column of schema.getColumns(alias.schemaName)) {
into[column.name] ??= column;
}
}
Expand Down Expand Up @@ -136,7 +136,7 @@ export class SqlEventSourceQuery extends BaseSqlDataQuery {

evaluateRowWithErrors(table: SourceTableInterface, row: SqliteRow): EvaluatedEventRowWithErrors {
try {
const tables = { [this.table!]: this.addSpecialParameters(table, row) };
const tables = { [this.table!.schemaName]: this.addSpecialParameters(table, row) };

const data = this.transformRow(tables);
return {
Expand Down
Loading