Skip to content

Commit 4321aca

Browse files
committed
Start with new unified syntax
1 parent 04acfef commit 4321aca

File tree

8 files changed

+548
-48
lines changed

8 files changed

+548
-48
lines changed

packages/sync-rules/src/SqlBucketDescriptor.ts

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import { SqlDataQuery } from './SqlDataQuery.js';
66
import { SqlParameterQuery } from './SqlParameterQuery.js';
77
import { SyncRulesOptions } from './SqlSyncRules.js';
88
import { StaticSqlParameterQuery } from './StaticSqlParameterQuery.js';
9+
import { StreamQuery } from './StreamQuery.js';
910
import { TablePattern } from './TablePattern.js';
1011
import { TableValuedFunctionSqlParameterQuery } from './TableValuedFunctionSqlParameterQuery.js';
1112
import { SqlRuleError } from './errors.js';
@@ -15,7 +16,8 @@ import {
1516
EvaluationResult,
1617
QueryParseOptions,
1718
RequestParameters,
18-
SqliteRow
19+
SqliteRow,
20+
StreamParseOptions
1921
} from './types.js';
2022

2123
export interface QueryParseResult {
@@ -81,6 +83,23 @@ export class SqlBucketDescriptor {
8183
};
8284
}
8385

86+
addUnifiedStreamQuery(sql: string, options: StreamParseOptions): QueryParseResult {
87+
const [query, errors] = StreamQuery.fromSql(this.name, sql, options);
88+
for (const parameterQuery of query.inferredParameters) {
89+
if (parameterQuery instanceof StaticSqlParameterQuery) {
90+
this.globalParameterQueries.push(parameterQuery);
91+
} else {
92+
this.parameterQueries.push(parameterQuery);
93+
}
94+
}
95+
this.dataQueries.push(query.data);
96+
97+
return {
98+
parsed: true,
99+
errors
100+
};
101+
}
102+
84103
evaluateRow(options: EvaluateRowOptions): EvaluationResult[] {
85104
let results: EvaluationResult[] = [];
86105
for (let query of this.dataQueries) {

packages/sync-rules/src/SqlSyncRules.ts

Lines changed: 67 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ import {
2222
RequestParameters,
2323
SourceSchema,
2424
SqliteRow,
25+
StreamParseOptions,
2526
SyncRules
2627
} from './types.js';
2728

@@ -95,19 +96,36 @@ export class SqlSyncRules implements SyncRules {
9596
return rules;
9697
}
9798

99+
// Bucket definitions using explicit parameter and data queries.
98100
const bucketMap = parsed.get('bucket_definitions') as YAMLMap;
99-
if (bucketMap == null) {
100-
rules.errors.push(new YamlError(new Error(`'bucket_definitions' is required`)));
101+
// Streams (which also map to buckets internally) with a new syntax and options.
102+
const streamMap = parsed.get('streams') as YAMLMap;
103+
const definitionNames = new Set<string>();
104+
const checkUniqueName = (name: string, literal: Scalar) => {
105+
if (definitionNames.has(name)) {
106+
rules.errors.push(this.tokenError(literal, 'Duplicate stream or bucket definition.'));
107+
return false;
108+
}
109+
110+
definitionNames.add(name);
111+
return true;
112+
};
113+
114+
if (bucketMap == null && streamMap == null) {
115+
rules.errors.push(new YamlError(new Error(`Either 'bucket_definitions' or 'streams' are required`)));
101116

102117
if (throwOnError) {
103118
rules.throwOnError();
104119
}
105120
return rules;
106121
}
107122

108-
for (let entry of bucketMap.items) {
123+
for (let entry of bucketMap?.items ?? []) {
109124
const { key: keyScalar, value } = entry as { key: Scalar; value: YAMLMap };
110125
const key = keyScalar.toString();
126+
if (!checkUniqueName(key, keyScalar)) {
127+
continue;
128+
}
111129

112130
if (value == null || !(value instanceof YAMLMap)) {
113131
rules.errors.push(this.tokenError(keyScalar, `'${key}' bucket definition must be an object`));
@@ -116,17 +134,7 @@ export class SqlSyncRules implements SyncRules {
116134

117135
const accept_potentially_dangerous_queries =
118136
value.get('accept_potentially_dangerous_queries', true)?.value == true;
119-
let parseOptionPriority: BucketPriority | undefined = undefined;
120-
if (value.has('priority')) {
121-
const priorityValue = value.get('priority', true)!;
122-
if (typeof priorityValue.value != 'number' || !isValidPriority(priorityValue.value)) {
123-
rules.errors.push(
124-
this.tokenError(priorityValue, 'Invalid priority, expected a number between 0 and 3 (inclusive).')
125-
);
126-
} else {
127-
parseOptionPriority = priorityValue.value;
128-
}
129-
}
137+
const parseOptionPriority = rules.parsePriority(value);
130138

131139
const queryOptions: QueryParseOptions = {
132140
...options,
@@ -164,6 +172,38 @@ export class SqlSyncRules implements SyncRules {
164172
rules.bucketDescriptors.push(descriptor);
165173
}
166174

175+
for (const entry of streamMap?.items ?? []) {
176+
const { key: keyScalar, value } = entry as { key: Scalar; value: YAMLMap };
177+
const key = keyScalar.toString();
178+
if (!checkUniqueName(key, keyScalar)) {
179+
continue;
180+
}
181+
182+
const descriptor = new SqlBucketDescriptor(key);
183+
184+
const accept_potentially_dangerous_queries =
185+
value.get('accept_potentially_dangerous_queries', true)?.value == true;
186+
187+
const queryOptions: StreamParseOptions = {
188+
...options,
189+
accept_potentially_dangerous_queries,
190+
priority: rules.parsePriority(value),
191+
default: value.get('default', true)?.value == true
192+
};
193+
194+
const data = value.get('query', true) as unknown;
195+
if (data instanceof Scalar) {
196+
rules.withScalar(data, (q) => {
197+
return descriptor.addUnifiedStreamQuery(q, queryOptions);
198+
});
199+
} else {
200+
rules.errors.push(this.tokenError(data, 'Must be a string.'));
201+
continue;
202+
}
203+
204+
rules.bucketDescriptors.push(descriptor);
205+
}
206+
167207
const eventMap = parsed.get('event_definitions') as YAMLMap;
168208
for (const event of eventMap?.items ?? []) {
169209
const { key, value } = event as { key: Scalar; value: YAMLSeq };
@@ -391,4 +431,17 @@ export class SqlSyncRules implements SyncRules {
391431
}
392432
return result;
393433
}
434+
435+
private parsePriority(value: YAMLMap) {
436+
if (value.has('priority')) {
437+
const priorityValue = value.get('priority', true)!;
438+
if (typeof priorityValue.value != 'number' || !isValidPriority(priorityValue.value)) {
439+
this.errors.push(
440+
SqlSyncRules.tokenError(priorityValue, 'Invalid priority, expected a number between 0 and 3 (inclusive).')
441+
);
442+
} else {
443+
return priorityValue.value;
444+
}
445+
}
446+
}
394447
}
Lines changed: 191 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,191 @@
1+
import { parse } from 'pgsql-ast-parser';
2+
import { ParameterValueClause, QuerySchema, StreamParseOptions } from './types.js';
3+
import { SqlRuleError } from './errors.js';
4+
import { isSelectStatement } from './utils.js';
5+
import { checkUnsupportedFeatures, isClauseError } from './sql_support.js';
6+
import { SqlDataQuery, SqlDataQueryOptions } from './SqlDataQuery.js';
7+
import { RowValueExtractor } from './BaseSqlDataQuery.js';
8+
import { TablePattern } from './TablePattern.js';
9+
import { TableQuerySchema } from './TableQuerySchema.js';
10+
import { SqlTools } from './sql_filters.js';
11+
import { ExpressionType } from './ExpressionType.js';
12+
import { SqlParameterQuery } from './SqlParameterQuery.js';
13+
import { StaticSqlParameterQuery } from './StaticSqlParameterQuery.js';
14+
import { DEFAULT_BUCKET_PRIORITY } from './BucketDescription.js';
15+
16+
/**
17+
* Represents a query backing a stream definition.
18+
*
19+
* Streams are a new way to define sync rules that don't require separate data and
20+
* parameter queries. However, since most of the sync service is built around that
21+
* distiction at the moment, stream queries are implemented by desugaring a unified
22+
* query into its individual components.
23+
*/
24+
export class StreamQuery {
25+
inferredParameters: (SqlParameterQuery | StaticSqlParameterQuery)[];
26+
data: SqlDataQuery;
27+
28+
static fromSql(descriptorName: string, sql: string, options: StreamParseOptions): [StreamQuery, SqlRuleError[]] {
29+
const [query, ...illegalRest] = parse(sql, { locationTracking: true });
30+
const schema = options.schema;
31+
const parameters: (SqlParameterQuery | StaticSqlParameterQuery)[] = [];
32+
const errors: SqlRuleError[] = [];
33+
34+
// TODO: Share more of this code with SqlDataQuery
35+
if (illegalRest.length > 0) {
36+
throw new SqlRuleError('Only a single SELECT statement is supported', sql, illegalRest[0]?._location);
37+
}
38+
39+
if (!isSelectStatement(query)) {
40+
throw new SqlRuleError('Only SELECT statements are supported', sql, query._location);
41+
}
42+
43+
if (query.from == null || query.from.length != 1 || query.from[0].type != 'table') {
44+
throw new SqlRuleError('Must SELECT from a single table', sql, query.from?.[0]._location);
45+
}
46+
47+
errors.push(...checkUnsupportedFeatures(sql, query));
48+
49+
const tableRef = query.from?.[0].name;
50+
if (tableRef?.name == null) {
51+
throw new SqlRuleError('Must SELECT from a single table', sql, query.from?.[0]._location);
52+
}
53+
const alias: string = tableRef.alias ?? tableRef.name;
54+
55+
const sourceTable = new TablePattern(tableRef.schema ?? options.defaultSchema, tableRef.name);
56+
let querySchema: QuerySchema | undefined = undefined;
57+
if (schema) {
58+
const tables = schema.getTables(sourceTable);
59+
if (tables.length == 0) {
60+
const e = new SqlRuleError(
61+
`Table ${sourceTable.schema}.${sourceTable.tablePattern} not found`,
62+
sql,
63+
query.from?.[0]?._location
64+
);
65+
e.type = 'warning';
66+
67+
errors.push(e);
68+
} else {
69+
querySchema = new TableQuerySchema(tables, alias);
70+
}
71+
}
72+
73+
const where = query.where;
74+
const tools = new SqlTools({
75+
table: alias,
76+
parameterTables: [],
77+
valueTables: [alias],
78+
sql,
79+
schema: querySchema,
80+
supportsStreamInputs: true,
81+
supportsParameterExpressions: true
82+
});
83+
tools.checkSpecificNameCase(tableRef);
84+
const filter = tools.compileWhereClause(where);
85+
const inputParameterNames = filter.inputParameters.map((p) => `bucket.${p.key}`);
86+
87+
// Build parameter queries based on inferred bucket parameters
88+
if (tools.inferredParameters.length) {
89+
const extractors: Record<string, ParameterValueClause> = {};
90+
for (const inferred of tools.inferredParameters) {
91+
extractors[inferred.name] = inferred.clause;
92+
}
93+
94+
parameters.push(
95+
new StaticSqlParameterQuery({
96+
sql,
97+
queryId: 'static',
98+
descriptorName,
99+
parameterExtractors: extractors,
100+
bucketParameters: tools.inferredParameters.map((p) => p.name),
101+
filter: undefined, // TODO
102+
priority: DEFAULT_BUCKET_PRIORITY // Ignored here
103+
})
104+
);
105+
}
106+
107+
let hasId = false;
108+
let hasWildcard = false;
109+
let extractors: RowValueExtractor[] = [];
110+
111+
for (let column of query.columns ?? []) {
112+
const name = tools.getOutputName(column);
113+
if (name != '*') {
114+
const clause = tools.compileRowValueExtractor(column.expr);
115+
if (isClauseError(clause)) {
116+
// Error logged already
117+
continue;
118+
}
119+
extractors.push({
120+
extract: (tables, output) => {
121+
output[name] = clause.evaluate(tables);
122+
},
123+
getTypes(schema, into) {
124+
const def = clause.getColumnDefinition(schema);
125+
126+
into[name] = { name, type: def?.type ?? ExpressionType.NONE, originalType: def?.originalType };
127+
}
128+
});
129+
} else {
130+
extractors.push({
131+
extract: (tables, output) => {
132+
const row = tables[alias];
133+
for (let key in row) {
134+
if (key.startsWith('_')) {
135+
continue;
136+
}
137+
output[key] ??= row[key];
138+
}
139+
},
140+
getTypes(schema, into) {
141+
for (let column of schema.getColumns(alias)) {
142+
into[column.name] ??= column;
143+
}
144+
}
145+
});
146+
}
147+
if (name == 'id') {
148+
hasId = true;
149+
} else if (name == '*') {
150+
hasWildcard = true;
151+
if (querySchema == null) {
152+
// Not performing schema-based validation - assume there is an id
153+
hasId = true;
154+
} else {
155+
const idType = querySchema.getColumn(alias, 'id')?.type ?? ExpressionType.NONE;
156+
if (!idType.isNone()) {
157+
hasId = true;
158+
}
159+
}
160+
}
161+
}
162+
if (!hasId) {
163+
const error = new SqlRuleError(`Query must return an "id" column`, sql, query.columns?.[0]._location);
164+
if (hasWildcard) {
165+
// Schema-based validations are always warnings
166+
error.type = 'warning';
167+
}
168+
errors.push(error);
169+
}
170+
171+
errors.push(...tools.errors);
172+
173+
const data: SqlDataQueryOptions = {
174+
sourceTable,
175+
table: alias,
176+
sql,
177+
filter,
178+
columns: query.columns ?? [],
179+
descriptorName,
180+
bucketParameters: inputParameterNames,
181+
tools,
182+
extractors
183+
};
184+
return [new StreamQuery(parameters, data), errors];
185+
}
186+
187+
private constructor(parameters: (SqlParameterQuery | StaticSqlParameterQuery)[], data: SqlDataQueryOptions) {
188+
this.inferredParameters = parameters;
189+
this.data = new SqlDataQuery(data);
190+
}
191+
}

0 commit comments

Comments
 (0)