Skip to content

Commit ed3d53d

Browse files
committed
Start evaluating rows for sync streams
1 parent 81d2ddd commit ed3d53d

File tree

14 files changed

+575
-131
lines changed

14 files changed

+575
-131
lines changed

packages/service-core/src/routes/endpoints/sync-rules.ts

Lines changed: 31 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
import { ErrorCode, errors, router, schema } from '@powersync/lib-services-framework';
2-
import { SqlSyncRules, SyncRulesErrors } from '@powersync/service-sync-rules';
2+
import { SqlBucketDescriptor, SqlSyncRules, SyncRulesErrors } from '@powersync/service-sync-rules';
33
import type { FastifyPluginAsync } from 'fastify';
44
import * as t from 'ts-codec';
55

@@ -202,33 +202,36 @@ async function debugSyncRules(apiHandler: RouteAPI, sync_rules: string) {
202202

203203
return {
204204
valid: true,
205-
bucket_definitions: rules.bucketDescriptors.map((d) => {
206-
let all_parameter_queries = [...d.parameterQueries.values()].flat();
207-
let all_data_queries = [...d.dataQueries.values()].flat();
208-
return {
209-
name: d.name,
210-
bucket_parameters: d.bucketParameters,
211-
global_parameter_queries: d.globalParameterQueries.map((q) => {
212-
return {
213-
sql: q.sql
214-
};
215-
}),
216-
parameter_queries: all_parameter_queries.map((q) => {
217-
return {
218-
sql: q.sql,
219-
table: q.sourceTable,
220-
input_parameters: q.inputParameters
221-
};
222-
}),
223-
224-
data_queries: all_data_queries.map((q) => {
225-
return {
226-
sql: q.sql,
227-
table: q.sourceTable,
228-
columns: q.columnOutputNames()
229-
};
230-
})
231-
};
205+
bucket_definitions: rules.bucketSources.map((d) => {
206+
// TODO: Equivalent for streams
207+
if (d instanceof SqlBucketDescriptor) {
208+
let all_parameter_queries = [...d.parameterQueries.values()].flat();
209+
let all_data_queries = [...d.dataQueries.values()].flat();
210+
return {
211+
name: d.name,
212+
bucket_parameters: d.bucketParameters,
213+
global_parameter_queries: d.globalParameterQueries.map((q) => {
214+
return {
215+
sql: q.sql
216+
};
217+
}),
218+
parameter_queries: all_parameter_queries.map((q) => {
219+
return {
220+
sql: q.sql,
221+
table: q.sourceTable,
222+
input_parameters: q.inputParameters
223+
};
224+
}),
225+
226+
data_queries: all_data_queries.map((q) => {
227+
return {
228+
sql: q.sql,
229+
table: q.sourceTable,
230+
columns: q.columnOutputNames()
231+
};
232+
})
233+
};
234+
}
232235
}),
233236
source_tables: resolved_tables,
234237
data_tables: rules.debugGetOutputTables()

packages/service-core/src/sync/BucketChecksumState.ts

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,11 @@
11
import {
22
BucketDescription,
33
BucketPriority,
4-
isValidPriority,
54
RequestedStream,
65
RequestParameters,
76
ResolvedBucket,
8-
SqlSyncRules
7+
SqlSyncRules,
8+
SyncStream
99
} from '@powersync/service-sync-rules';
1010

1111
import * as storage from '../storage/storage-index.js';
@@ -234,11 +234,10 @@ export class BucketChecksumState {
234234
this.logger.info(message, { checkpoint: base.checkpoint, user_id: user_id, buckets: allBuckets.length });
235235
};
236236
bucketsToFetch = allBuckets;
237-
this.parameterState.syncRules.bucketDescriptors;
238237

239238
const subscriptions: util.StreamDescription[] = [];
240-
for (const desc of this.parameterState.syncRules.bucketDescriptors) {
241-
if (desc.type == SqlBucketDescriptorType.STREAM && this.parameterState.isSubscribedToStream(desc)) {
239+
for (const desc of this.parameterState.syncRules.bucketSources) {
240+
if (desc instanceof SyncStream && this.parameterState.isSubscribedToStream(desc)) {
242241
subscriptions.push({
243242
name: desc.name,
244243
is_default: desc.subscribedToByDefault
@@ -455,7 +454,7 @@ export class BucketParameterState {
455454
};
456455
}
457456

458-
isSubscribedToStream(desc: SqlBucketDescriptor): boolean {
457+
isSubscribedToStream(desc: SyncStream): boolean {
459458
return (desc.subscribedToByDefault && this.includeDefaultStreams) || this.subscribedStreamNames.has(desc.name);
460459
}
461460

packages/sync-rules/package.json

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,11 +24,13 @@
2424
"@powersync/service-jsonbig": "workspace:^",
2525
"@syncpoint/wkx": "^0.5.0",
2626
"ajv": "^8.12.0",
27+
"lodash": "^4.17.21",
2728
"pgsql-ast-parser": "^11.1.0",
2829
"uuid": "^11.1.0",
2930
"yaml": "^2.3.1"
3031
},
3132
"devDependencies": {
33+
"@types/lodash": "^4.17.5",
3234
"@types/node": "^22.16.2",
3335
"vitest": "^3.0.5"
3436
}

packages/sync-rules/src/BaseSqlDataQuery.ts

Lines changed: 49 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,14 +4,29 @@ import { ColumnDefinition } from './ExpressionType.js';
44
import { SourceTableInterface } from './SourceTableInterface.js';
55
import { SqlTools } from './sql_filters.js';
66
import { TablePattern } from './TablePattern.js';
7-
import { QueryParameters, QuerySchema, SourceSchema, SourceSchemaTable, SqliteJsonRow, SqliteRow } from './types.js';
7+
import {
8+
EvaluationResult,
9+
QueryParameters,
10+
QuerySchema,
11+
SourceSchema,
12+
SourceSchemaTable,
13+
SqliteJsonRow,
14+
SqliteRow
15+
} from './types.js';
816
import { filterJsonRow } from './utils.js';
17+
import { castAsText } from './sql_functions.js';
918

1019
export interface RowValueExtractor {
1120
extract(tables: QueryParameters, into: SqliteRow): void;
1221
getTypes(schema: QuerySchema, into: Record<string, ColumnDefinition>): void;
1322
}
1423

24+
export interface EvaluateRowOptions {
25+
table: SourceTableInterface;
26+
row: SqliteRow;
27+
bucketIds: (params: QueryParameters) => string[];
28+
}
29+
1530
export interface BaseSqlDataQueryOptions {
1631
sourceTable: TablePattern;
1732
table: string;
@@ -149,6 +164,39 @@ export class BaseSqlDataQuery {
149164
return result;
150165
}
151166

167+
evaluateRowWithOptions(options: EvaluateRowOptions): EvaluationResult[] {
168+
try {
169+
const { table, row, bucketIds } = options;
170+
171+
const tables = { [this.table]: this.addSpecialParameters(table, row) };
172+
const resolvedBucketIds = bucketIds(tables);
173+
174+
const data = this.transformRow(tables);
175+
let id = data.id;
176+
if (typeof id != 'string') {
177+
// While an explicit cast would be better, this covers against very common
178+
// issues when initially testing out sync, for example when the id column is an
179+
// auto-incrementing integer.
180+
// If there is no id column, we use a blank id. This will result in the user syncing
181+
// a single arbitrary row for this table - better than just not being able to sync
182+
// anything.
183+
id = castAsText(id) ?? '';
184+
}
185+
const outputTable = this.getOutputName(table.table);
186+
187+
return resolvedBucketIds.map((bucketId) => {
188+
return {
189+
bucket: bucketId,
190+
table: outputTable,
191+
id: id,
192+
data
193+
} as EvaluationResult;
194+
});
195+
} catch (e) {
196+
return [{ error: e.message ?? `Evaluating data query failed` }];
197+
}
198+
}
199+
152200
protected transformRow(tables: QueryParameters): SqliteJsonRow {
153201
let result: SqliteRow = {};
154202
for (let extractor of this.extractors) {
Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
import { BucketParameterQuerier, ParameterLookup } from './BucketParameterQuerier.js';
2+
import { ColumnDefinition } from './ExpressionType.js';
3+
import { SourceTableInterface } from './SourceTableInterface.js';
4+
import { GetQuerierOptions } from './SqlSyncRules.js';
5+
import { TablePattern } from './TablePattern.js';
6+
import { EvaluatedParametersResult, EvaluateRowOptions, EvaluationResult, SourceSchema, SqliteRow } from './types.js';
7+
8+
/**
9+
* An interface declaring
10+
*
11+
* - which buckets the sync service should create when processing change streams from the database.
12+
* - how data in source tables maps to data in buckets (e.g. when we're not selecting all columns).
13+
* - which buckets a given connection has access to.
14+
*
15+
* There are two ways to define bucket sources: Via sync rules made up of parameter and data queries, and via stream
16+
* definitions that only consist of a single query.
17+
*/
18+
export interface BucketSource {
19+
name: string;
20+
21+
/**
22+
* Given a row in a source table that affects sync parameters, returns a structure to index which buckets rows should
23+
* be associated with.
24+
*
25+
* The returned {@link ParameterLookup} can be referenced by {@link pushBucketParameterQueriers} to allow the storage
26+
* system to find buckets.
27+
*/
28+
evaluateParameterRow(sourceTable: SourceTableInterface, row: SqliteRow): EvaluatedParametersResult[];
29+
30+
/**
31+
* Given a row as it appears in a table that affects sync data, return buckets, logical table names and transformed
32+
* data for rows to add to buckets.
33+
*/
34+
evaluateRow(options: EvaluateRowOptions): EvaluationResult[];
35+
36+
/**
37+
* Reports {@link BucketParameterQuerier}s resolving buckets that a specific stream request should have access to.
38+
*
39+
* @param result The target array to insert queriers into.
40+
* @param options Options, including parameters that may affect the buckets loaded by this source.
41+
*/
42+
pushBucketParameterQueriers(result: BucketParameterQuerier[], options: GetQuerierOptions): void;
43+
44+
/**
45+
* Whether {@link pushBucketParameterQueriers} may include a querier where
46+
* {@link BucketParameterQuerier.hasDynamicBuckets} is true.
47+
*
48+
* This is mostly used for testing.
49+
*/
50+
hasDynamicBucketQueries(): boolean;
51+
52+
getSourceTables(): Set<TablePattern>;
53+
54+
/** Whether the table possibly affects the buckets resolved by this source. */
55+
tableSyncsParameters(table: SourceTableInterface): boolean;
56+
57+
/** Whether the table possibly affects the contents of buckets resolved by this source. */
58+
tableSyncsData(table: SourceTableInterface): boolean;
59+
60+
/**
61+
* Given a static schema, infer all logical tables and associated columns that appear in buckets defined by this
62+
* source.
63+
*
64+
 * This is used to generate the client-side schema.
65+
*/
66+
resolveResultSets(schema: SourceSchema): ResultSetDescription[];
67+
}
68+
69+
export type ResultSetDescription = { name: string; columns: ColumnDefinition[] };

packages/sync-rules/src/SqlBucketDescriptor.ts

Lines changed: 16 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import { BucketDescription, BucketInclusionReason, ResolvedBucket } from './BucketDescription.js';
22
import { BucketParameterQuerier, mergeBucketParameterQueriers } from './BucketParameterQuerier.js';
3+
import { BucketSource, ResultSetDescription } from './BucketSource.js';
34
import { IdSequence } from './IdSequence.js';
45
import { SourceTableInterface } from './SourceTableInterface.js';
56
import { SqlDataQuery } from './SqlDataQuery.js';
@@ -16,6 +17,7 @@ import {
1617
EvaluationResult,
1718
QueryParseOptions,
1819
RequestParameters,
20+
SourceSchema,
1921
SqliteRow,
2022
StreamParseOptions
2123
} from './types.js';
@@ -34,7 +36,7 @@ export enum SqlBucketDescriptorType {
3436
STREAM
3537
}
3638

37-
export class SqlBucketDescriptor {
39+
export class SqlBucketDescriptor implements BucketSource {
3840
name: string;
3941
bucketParameters?: string[];
4042
type: SqlBucketDescriptorType;
@@ -137,19 +139,16 @@ export class SqlBucketDescriptor {
137139
/**
138140
* @deprecated Use `pushBucketParameterQueriers` instead and merge at the top-level.
139141
*/
140-
getBucketParameterQuerier(options: GetQuerierOptions, parameters: RequestParameters): BucketParameterQuerier {
142+
getBucketParameterQuerier(options: GetQuerierOptions): BucketParameterQuerier {
141143
const queriers: BucketParameterQuerier[] = [];
142-
this.pushBucketParameterQueriers(queriers, options, parameters);
144+
this.pushBucketParameterQueriers(queriers, options);
143145

144146
return mergeBucketParameterQueriers(queriers);
145147
}
146148

147-
pushBucketParameterQueriers(
148-
result: BucketParameterQuerier[],
149-
options: GetQuerierOptions,
150-
parameters: RequestParameters
151-
) {
149+
pushBucketParameterQueriers(result: BucketParameterQuerier[], options: GetQuerierOptions) {
152150
const reasons = [this.bucketInclusionReason(options)];
151+
const parameters = options.globalParameters;
153152
const staticBuckets = this.getStaticBucketDescriptions(parameters, reasons);
154153
const staticQuerier = {
155154
staticBuckets,
@@ -223,4 +222,13 @@ export class SqlBucketDescriptor {
223222
}
224223
return false;
225224
}
225+
226+
resolveResultSets(schema: SourceSchema): ResultSetDescription[] {
227+
const descriptions: ResultSetDescription[] = [];
228+
for (const query of this.dataQueries) {
229+
descriptions.push(...query.getColumnOutputs(schema));
230+
}
231+
232+
return descriptions;
233+
}
226234
}

packages/sync-rules/src/SqlDataQuery.ts

Lines changed: 9 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -186,36 +186,15 @@ export class SqlDataQuery extends BaseSqlDataQuery {
186186
}
187187

188188
evaluateRow(table: SourceTableInterface, row: SqliteRow): EvaluationResult[] {
189-
try {
190-
const tables = { [this.table]: this.addSpecialParameters(table, row) };
191-
const bucketParameters = this.filter.filterRow(tables);
192-
const bucketIds = bucketParameters.map((params) =>
193-
getBucketId(this.descriptorName, this.bucketParameters, params)
194-
);
195-
196-
const data = this.transformRow(tables);
197-
let id = data.id;
198-
if (typeof id != 'string') {
199-
// While an explicit cast would be better, this covers against very common
200-
// issues when initially testing out sync, for example when the id column is an
201-
// auto-incrementing integer.
202-
// If there is no id column, we use a blank id. This will result in the user syncing
203-
// a single arbitrary row for this table - better than just not being able to sync
204-
// anything.
205-
id = castAsText(id) ?? '';
189+
const query = this;
190+
191+
return this.evaluateRowWithOptions({
192+
table,
193+
row,
194+
bucketIds(tables) {
195+
const bucketParameters = query.filter.filterRow(tables);
196+
return bucketParameters.map((params) => getBucketId(query.descriptorName, query.bucketParameters, params));
206197
}
207-
const outputTable = this.getOutputName(table.table);
208-
209-
return bucketIds.map((bucketId) => {
210-
return {
211-
bucket: bucketId,
212-
table: outputTable,
213-
id: id,
214-
data
215-
} as EvaluationResult;
216-
});
217-
} catch (e) {
218-
return [{ error: e.message ?? `Evaluating data query failed` }];
219-
}
198+
});
220199
}
221200
}

0 commit comments

Comments
 (0)