Skip to content

Commit f87ee2e

Browse files
committed
Support overlap operator
1 parent 3ad57ba commit f87ee2e

File tree

5 files changed

+261
-32
lines changed

5 files changed

+261
-32
lines changed

packages/sync-rules/src/sql_filters.ts

Lines changed: 93 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ import {
1313
OPERATOR_JSON_EXTRACT_JSON,
1414
OPERATOR_JSON_EXTRACT_SQL,
1515
OPERATOR_NOT,
16+
OPERATOR_OVERLAP,
1617
SQL_FUNCTIONS,
1718
SqlFunction,
1819
castOperator,
@@ -466,6 +467,98 @@ export class SqlTools {
466467
}
467468
}
468469

470+
compileOverlapClause(
471+
left: Expr,
472+
leftFilter: CompiledClause,
473+
right: Expr,
474+
rightFilter: CompiledClause
475+
): CompiledClause {
476+
// Special cases:
477+
// parameterValue IN rowValue
478+
// rowValue IN parameterValue
479+
// All others are handled by standard function composition
480+
481+
const composeType = this.getComposeType(OPERATOR_OVERLAP, [leftFilter, rightFilter], [left, right]);
482+
if (composeType.errorClause != null) {
483+
return composeType.errorClause;
484+
} else if (composeType.argsType != null) {
485+
// This is a standard supported configuration, takes precedence over
486+
// the special cases below.
487+
return this.composeFunction(OPERATOR_OVERLAP, [leftFilter, rightFilter], [left, right]);
488+
} else if (isParameterValueClause(leftFilter) && isRowValueClause(rightFilter)) {
489+
// token_parameters.value IN table.some_array
490+
// bucket.param IN table.some_array
491+
const inputParam = this.basicInputParameter(leftFilter);
492+
493+
return {
494+
error: false,
495+
inputParameters: [inputParam],
496+
unbounded: true,
497+
filterRow(tables: QueryParameters): TrueIfParametersMatch {
498+
const aValue = rightFilter.evaluate(tables);
499+
if (aValue == null) {
500+
return MATCH_CONST_FALSE;
501+
}
502+
const values = JSON.parse(aValue as string);
503+
if (!Array.isArray(values)) {
504+
throw new Error('Not an array');
505+
}
506+
return values.map((value) => {
507+
return { [inputParam.key]: value };
508+
});
509+
},
510+
usesAuthenticatedRequestParameters: leftFilter.usesAuthenticatedRequestParameters,
511+
usesUnauthenticatedRequestParameters: leftFilter.usesUnauthenticatedRequestParameters
512+
} satisfies ParameterMatchClause;
513+
} else if (
514+
this.supportsExpandingParameters &&
515+
isRowValueClause(leftFilter) &&
516+
isParameterValueClause(rightFilter)
517+
) {
518+
// table.some_value && token_parameters.some_array
519+
// This expands into "OR(table_some_value = <value>)" for each value of both arrays.
520+
// We only support one such filter per query
521+
const key = `${rightFilter.key}[*]`;
522+
523+
const inputParam: InputParameter = {
524+
key: key,
525+
expands: true,
526+
filteredRowToLookupValue: (filterParameters) => {
527+
return filterParameters[key];
528+
},
529+
parametersToLookupValue: (parameters) => {
530+
return rightFilter.lookupParameterValue(parameters);
531+
}
532+
};
533+
534+
return {
535+
error: false,
536+
inputParameters: [inputParam],
537+
unbounded: false,
538+
filterRow(tables: QueryParameters): TrueIfParametersMatch {
539+
const value = leftFilter.evaluate(tables);
540+
if (!isJsonValue(value)) {
541+
// Cannot persist, e.g. BLOB
542+
return MATCH_CONST_FALSE;
543+
}
544+
545+
const values = JSON.parse(value as string);
546+
if (!Array.isArray(values)) {
547+
throw new Error('Not an array');
548+
}
549+
return values.map((value) => {
550+
return { [inputParam.key]: value };
551+
});
552+
},
553+
usesAuthenticatedRequestParameters: rightFilter.usesAuthenticatedRequestParameters,
554+
usesUnauthenticatedRequestParameters: rightFilter.usesUnauthenticatedRequestParameters
555+
} satisfies ParameterMatchClause;
556+
} else {
557+
// Not supported, return the error previously computed
558+
return this.error(composeType.error!, composeType.errorExpr);
559+
}
560+
}
561+
469562
parameterMatchClause(staticFilter: RowValueClause, otherFilter: ParameterValueClause) {
470563
const inputParam = this.basicInputParameter(otherFilter);
471564

packages/sync-rules/src/sql_functions.ts

Lines changed: 34 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -743,23 +743,47 @@ export function evaluateOperator(op: string, a: SqliteValue, b: SqliteValue): Sq
743743
return sqliteBool(sqliteBool(a) && sqliteBool(b));
744744
case 'OR':
745745
return sqliteBool(sqliteBool(a) || sqliteBool(b));
746-
case 'IN':
746+
case 'IN': {
747747
if (a == null || b == null) {
748748
return null;
749749
}
750-
if (typeof b != 'string') {
751-
throw new Error('IN is only supported on JSON arrays');
750+
const bParsed = checkJsonArray(b, 'IN is only supported on JSON arrays');
751+
return sqliteBool(bParsed.includes(a));
752+
}
753+
case '&&': {
754+
// a && b evaluates to true iff they're both arrays and have a non-empty intersection.
755+
if (a == null || b == null) {
756+
return null;
752757
}
753-
const bParsed = JSON.parse(b);
754-
if (!Array.isArray(bParsed)) {
755-
throw new Error('IN is only supported on JSON arrays');
758+
759+
const aParsed = checkJsonArray(a, '&& is only supported on JSON arrays');
760+
const bParsed = checkJsonArray(a, '&& is only supported on JSON arrays');
761+
762+
for (const elementInA in aParsed) {
763+
if (bParsed.includes(elementInA)) {
764+
return sqliteBool(true);
765+
}
756766
}
757-
return sqliteBool(bParsed.includes(a));
767+
return sqliteBool(false);
768+
}
758769
default:
759770
throw new Error(`Operator not supported: ${op}`);
760771
}
761772
}
762773

774+
export function checkJsonArray(value: SqliteValue, errorMessage: string): any[] {
775+
if (typeof value != 'string') {
776+
throw new Error(errorMessage);
777+
}
778+
779+
const parsed = JSON.parse(value);
780+
if (!Array.isArray(parsed)) {
781+
throw new Error(value);
782+
}
783+
784+
return parsed;
785+
}
786+
763787
export function getOperatorReturnType(op: string, left: ExpressionType, right: ExpressionType) {
764788
switch (op) {
765789
case '=':
@@ -799,6 +823,7 @@ export function getOperatorReturnType(op: string, left: ExpressionType, right: E
799823
case 'OR':
800824
return ExpressionType.INTEGER;
801825
case 'IN':
826+
case '&&':
802827
return ExpressionType.INTEGER;
803828
default:
804829
return ExpressionType.NONE;
@@ -928,6 +953,8 @@ export const OPERATOR_NOT: SqlFunction = {
928953

929954
export const OPERATOR_IN = getOperatorFunction('IN');
930955

956+
export const OPERATOR_OVERLAP = getOperatorFunction('&&');
957+
931958
export function castOperator(castTo: string | undefined): SqlFunction | null {
932959
if (castTo == null || !CAST_TYPES.has(castTo)) {
933960
return null;

packages/sync-rules/src/streams/filter.ts

Lines changed: 62 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ import { TablePattern } from '../TablePattern.js';
44
import { ParameterMatchClause, ParameterValueClause, RowValueClause, SqliteJsonValue } from '../types.js';
55
import { isJsonValue, normalizeParameterValue } from '../utils.js';
66
import { SqlTools } from '../sql_filters.js';
7-
import { OPERATOR_NOT } from '../sql_functions.js';
7+
import { checkJsonArray, OPERATOR_NOT } from '../sql_functions.js';
88
import { ParameterLookup } from '../BucketParameterQuerier.js';
99

1010
import { StreamVariant } from './variant.js';
@@ -107,6 +107,10 @@ export class Subquery {
107107
const id = context.queryCounter.toString();
108108
context.queryCounter++;
109109

110+
if (variant.parameters.length == 0) {
111+
throw new Error('Unsupported subquery without parameter, must depend on request parameters');
112+
}
113+
110114
return [variant, id] satisfies [StreamVariant, string];
111115
});
112116

@@ -175,38 +179,36 @@ export class InOperator extends FilterOperator {
175179

176180
compile(context: BucketCompilationContext): void {
177181
const subqueryEvaluator = this.right.compileEvaluator(context);
178-
179182
const filter = this.left;
180-
// Something like `SELECT * FROM comments WHERE issue_id IN (SELECT id FROM issue WHERE owner_id = request.user())`
181-
// This groups rows into buckets identified by comments.issue_id, which happens in filterRow.
182-
// When a user connects, we need to resolve all the issue ids they own. This happens with an indirection:
183-
// 1. In the subquery evaluator, we create an index from owner_id to issue ids.
184-
// 2. When we have users, we use that index to find issue ids dynamically, with which we can build the buckets
185-
// to sync.
186-
context.currentVariant.parameters.push({
187-
lookup: {
188-
type: 'in',
189-
subquery: subqueryEvaluator
190-
},
191-
filterRow(options) {
192-
const tables = { [options.sourceTable.table]: options.record };
193-
const value = filter.evaluate(tables);
194-
if (isJsonValue(value)) {
195-
return [normalizeParameterValue(value)];
196-
} else {
197-
return [];
198-
}
199-
}
200-
});
201183

202184
if (isRowValueClause(this.left)) {
185+
// Something like `SELECT * FROM comments WHERE issue_id IN (SELECT id FROM issue WHERE owner_id = request.user())`
186+
// This groups rows into buckets identified by comments.issue_id, which happens in filterRow.
187+
// When a user connects, we need to resolve all the issue ids they own. This happens with an indirection:
188+
// 1. In the subquery evaluator, we create an index from owner_id to issue ids.
189+
// 2. When we have users, we use that index to find issue ids dynamically, with which we can build the buckets
190+
// to sync.
191+
context.currentVariant.parameters.push({
192+
lookup: {
193+
type: 'in',
194+
subquery: subqueryEvaluator
195+
},
196+
filterRow(options) {
197+
const tables = { [options.sourceTable.table]: options.record };
198+
const value = filter.evaluate(tables);
199+
if (isJsonValue(value)) {
200+
return [normalizeParameterValue(value)];
201+
} else {
202+
return [];
203+
}
204+
}
205+
});
203206
} else if (isParameterValueClause(this.left)) {
204207
// Something like `SELECT * FROM comments WHERE request.user_id() IN (SELECT id FROM users WHERE is_admin)`.
205208
// This doesn't introduce a bucket parameter, but we still need to create a parameter lookup to determine whether
206209
// a given user should have access to the bucket or not. Because lookups can only be exact (we can't evaluate
207210
// `SELECT id FROM users WHERE is_admin` into a single ParameterLookup), we need to push the filter down into
208211
// the subquery, e.g. `WHERE EXISTS(SELECT id FROM users WHERE is_admin AND id = request.user_id())`.
209-
const left = this.left;
210212
const subqueryEvaluator = this.right.compileEvaluator(context);
211213

212214
context.currentVariant.requestFilters.push({
@@ -222,6 +224,42 @@ export class InOperator extends FilterOperator {
222224
}
223225
}
224226

227+
/**
228+
* An operator of the form `<left> && <right>`, where `right` is a subqery.
229+
*/
230+
export class OverlapOperator extends FilterOperator {
231+
private left: RowValueClause;
232+
private right: Subquery;
233+
234+
constructor(left: RowValueClause, right: Subquery) {
235+
super();
236+
this.left = left;
237+
this.right = right;
238+
}
239+
240+
compile(context: BucketCompilationContext): void {
241+
const subqueryEvaluator = this.right.compileEvaluator(context);
242+
const filter = this.left;
243+
244+
context.currentVariant.parameters.push({
245+
lookup: {
246+
type: 'overlap',
247+
subquery: subqueryEvaluator
248+
},
249+
filterRow(options) {
250+
const tables = { [options.sourceTable.table]: options.record };
251+
const value = filter.evaluate(tables);
252+
if (value == null) {
253+
return [];
254+
}
255+
256+
const parsed = checkJsonArray(value, 'Left side of && must evaluate to an array');
257+
return parsed.map(normalizeParameterValue);
258+
}
259+
});
260+
}
261+
}
262+
225263
/**
226264
* A filter that matches when _something_ exists in a subquery.
227265
*

packages/sync-rules/src/streams/from_sql.ts

Lines changed: 30 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ import {
2626
InOperator,
2727
Not,
2828
Or,
29+
OverlapOperator,
2930
ScalarExpression,
3031
Subquery
3132
} from './filter.js';
@@ -297,7 +298,35 @@ class SyncStreamCompiler {
297298
}
298299

299300
private compileOverlapOperator(tools: SqlTools, clause: ExprBinary): FilterOperator {
300-
throw 'todo';
301+
const left = tools.compileClause(clause.left);
302+
if (isClauseError(left)) {
303+
return recoverErrorClause(tools);
304+
}
305+
306+
if (clause.right.type == 'select') {
307+
if (!isRowValueClause(left)) {
308+
if (!isClauseError(left)) {
309+
tools.error('The left-hand side of an && operator must be derived from the row to sync..', clause.left);
310+
}
311+
312+
return recoverErrorClause(tools);
313+
}
314+
315+
const subqueryResult = this.compileSubquery(clause.right);
316+
if (!subqueryResult) {
317+
return recoverErrorClause(tools);
318+
}
319+
const [subquery] = subqueryResult;
320+
return new OverlapOperator(left, subquery);
321+
}
322+
323+
const right = tools.compileClause(clause.right);
324+
325+
// For cases 3-5, we can actually uses SqlTools.compileClause. Case 3 and 4 are handled specially in there and return
326+
// a ParameterMatchClause, which we can translate via CompareRowValueWithStreamParameter. Case 5 is either a row-value
327+
// or a parameter-value clause which we can wrap in EvaluateSimpleCondition.
328+
const combined = tools.compileOverlapClause(clause.left, left, clause.right, right);
329+
return compiledClauseToFilter(tools, combined);
301330
}
302331

303332
private compileSubquery(stmt: SelectStatement): [Subquery, SqlTools] | undefined {

0 commit comments

Comments
 (0)