Skip to content
9 changes: 9 additions & 0 deletions .changeset/little-beans-carry.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
---
'@powersync/service-sync-rules': minor
'@powersync/service-module-postgres-storage': patch
'@powersync/service-module-mongodb-storage': patch
'@powersync/service-core-tests': patch
'@powersync/service-core': patch
---

Cleanup on internal sync rules implementation and APIs.
Original file line number Diff line number Diff line change
Expand Up @@ -895,7 +895,7 @@ export class MongoBucketBatch
* Gets relevant {@link SqlEventDescriptor}s for the given {@link SourceTable}
*/
protected getTableEvents(table: storage.SourceTable): SqlEventDescriptor[] {
return this.sync_rules.event_descriptors.filter((evt) =>
return this.sync_rules.eventDescriptors.filter((evt) =>
[...evt.getSourceTables()].some((sourceTable) => sourceTable.matches(table))
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,7 @@ export class PersistedBatch {
k: sourceKey
},
lookup: binLookup,
bucket_parameters: result.bucket_parameters
bucket_parameters: result.bucketParameters
}
}
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -861,7 +861,7 @@ export class PostgresBucketBatch
* TODO maybe share this with an abstract class
*/
protected getTableEvents(table: storage.SourceTable): sync_rules.SqlEventDescriptor[] {
return this.sync_rules.event_descriptors.filter((evt) =>
return this.sync_rules.eventDescriptors.filter((evt) =>
[...evt.getSourceTables()].some((sourceTable) => sourceTable.matches(table))
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ export class PostgresPersistedBatch {
const base64 = binLookup.toString('base64');
remaining_lookups.delete(base64);
const hexLookup = binLookup.toString('hex');
const serializedBucketParameters = JSONBig.stringify(result.bucket_parameters);
const serializedBucketParameters = JSONBig.stringify(result.bucketParameters);
this.parameterDataInserts.push({
group_id: this.group_id,
source_table: table.id,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -404,7 +404,7 @@ bucket_definitions:

const parameters = new RequestParameters({ sub: 'u1' }, {});

const q1 = sync_rules.bucket_descriptors[0].parameter_queries[0];
const q1 = sync_rules.bucketDescriptors[0].parameterQueries[0];

const lookups = q1.getLookups(parameters);
expect(lookups).toEqual([ParameterLookup.normalized('by_workspace', '1', ['u1'])]);
Expand Down Expand Up @@ -474,7 +474,7 @@ bucket_definitions:

const parameters = new RequestParameters({ sub: 'unknown' }, {});

const q1 = sync_rules.bucket_descriptors[0].parameter_queries[0];
const q1 = sync_rules.bucketDescriptors[0].parameterQueries[0];

const lookups = q1.getLookups(parameters);
expect(lookups).toEqual([ParameterLookup.normalized('by_public_workspace', '1', [])]);
Expand Down Expand Up @@ -564,15 +564,15 @@ bucket_definitions:
const parameters = new RequestParameters({ sub: 'u1' }, {});

// Test intermediate values - could be moved to sync_rules.test.ts
const q1 = sync_rules.bucket_descriptors[0].parameter_queries[0];
const q1 = sync_rules.bucketDescriptors[0].parameterQueries[0];
const lookups1 = q1.getLookups(parameters);
expect(lookups1).toEqual([ParameterLookup.normalized('by_workspace', '1', [])]);

const parameter_sets1 = await bucketStorage.getParameterSets(checkpoint, lookups1);
parameter_sets1.sort((a, b) => JSON.stringify(a).localeCompare(JSON.stringify(b)));
expect(parameter_sets1).toEqual([{ workspace_id: 'workspace1' }]);

const q2 = sync_rules.bucket_descriptors[0].parameter_queries[1];
const q2 = sync_rules.bucketDescriptors[0].parameterQueries[1];
const lookups2 = q2.getLookups(parameters);
expect(lookups2).toEqual([ParameterLookup.normalized('by_workspace', '2', ['u1'])]);

Expand Down
12 changes: 6 additions & 6 deletions packages/service-core/src/routes/endpoints/sync-rules.ts
Original file line number Diff line number Diff line change
Expand Up @@ -202,13 +202,13 @@ async function debugSyncRules(apiHandler: RouteAPI, sync_rules: string) {

return {
valid: true,
bucket_definitions: rules.bucket_descriptors.map((d) => {
let all_parameter_queries = [...d.parameter_queries.values()].flat();
let all_data_queries = [...d.data_queries.values()].flat();
bucket_definitions: rules.bucketDescriptors.map((d) => {
let all_parameter_queries = [...d.parameterQueries.values()].flat();
let all_data_queries = [...d.dataQueries.values()].flat();
return {
name: d.name,
bucket_parameters: d.bucket_parameters,
global_parameter_queries: d.global_parameter_queries.map((q) => {
bucket_parameters: d.bucketParameters,
global_parameter_queries: d.globalParameterQueries.map((q) => {
return {
sql: q.sql
};
Expand All @@ -217,7 +217,7 @@ async function debugSyncRules(apiHandler: RouteAPI, sync_rules: string) {
return {
sql: q.sql,
table: q.sourceTable,
input_parameters: q.input_parameters
input_parameters: q.inputParameters
};
}),

Expand Down
4 changes: 2 additions & 2 deletions packages/service-core/src/sync/BucketChecksumState.ts
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ export class BucketChecksumState {
*/
async buildNextCheckpointLine(next: storage.StorageCheckpointUpdate): Promise<CheckpointLine | null> {
const { writeCheckpoint, base } = next;
const user_id = this.parameterState.syncParams.user_id;
const user_id = this.parameterState.syncParams.userId;

const storage = this.bucketStorage;

Expand Down Expand Up @@ -378,7 +378,7 @@ export class BucketParameterState {
);
this.logger.error(error.message, {
checkpoint: checkpoint,
user_id: this.syncParams.user_id,
user_id: this.syncParams.userId,
buckets: update.buckets.length
});

Expand Down
4 changes: 2 additions & 2 deletions packages/service-core/src/sync/sync.ts
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ async function* streamResponseInner(
): AsyncGenerator<util.StreamingSyncLine | string | null> {
const { raw_data, binary_data } = params;

const checkpointUserId = util.checkpointUserId(syncParams.token_parameters.user_id as string, params.client_id);
const checkpointUserId = util.checkpointUserId(syncParams.tokenParameters.user_id as string, params.client_id);

const checksumState = new BucketChecksumState({
syncContext,
Expand Down Expand Up @@ -228,7 +228,7 @@ async function* streamResponseInner(
onRowsSent: markOperationsSent,
abort_connection: signal,
abort_batch: abortCheckpointSignal,
user_id: syncParams.user_id,
user_id: syncParams.userId,
// Passing null here will emit a full sync complete message at the end. If we pass a priority, we'll emit a partial
// sync complete message instead.
forPriority: !isLast ? priority : null,
Expand Down
109 changes: 83 additions & 26 deletions packages/sync-rules/src/BaseSqlDataQuery.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,31 +12,88 @@ export interface RowValueExtractor {
getTypes(schema: QuerySchema, into: Record<string, ColumnDefinition>): void;
}

export class BaseSqlDataQuery {
sourceTable?: TablePattern;
table?: string;
sql?: string;
columns?: SelectedColumn[];
extractors: RowValueExtractor[] = [];
descriptor_name?: string;
bucket_parameters?: string[];
tools?: SqlTools;

ruleId?: string;

errors: SqlRuleError[] = [];
export interface BaseSqlDataQueryOptions {
sourceTable: TablePattern;
table: string;
sql: string;
columns: SelectedColumn[];
extractors: RowValueExtractor[];
descriptorName: string;
bucketParameters: string[];
tools: SqlTools;

errors?: SqlRuleError[];
}

constructor() {}
export class BaseSqlDataQuery {
/**
* Source table or table pattern.
*/
readonly sourceTable: TablePattern;

/**
* The table name or alias used in the query.
*
* This is used for the output table name.
*/
readonly table: string;

/**
* The source SQL query, for debugging purposes.
*/
readonly sql: string;

/**
* Query columns, for debugging purposes.
*/
readonly columns: SelectedColumn[];

/**
* Extracts input row into output row. This is the column list in the SELECT part of the query.
*
* This may include plain column names, wildcards, and basic expressions.
*/
readonly extractors: RowValueExtractor[];

/**
* Bucket definition name.
*/
readonly descriptorName: string;
/**
* Bucket parameter names, without the `bucket.` prefix.
*
* These are received from the associated parameter query (if any), and must match the filters
* used in the data query.
*/
readonly bucketParameters: string[];
/**
* Used to generate debugging info.
*/
private readonly tools: SqlTools;

readonly errors: SqlRuleError[];

constructor(options: BaseSqlDataQueryOptions) {
this.sourceTable = options.sourceTable;
this.table = options.table;
this.sql = options.sql;
this.columns = options.columns;
this.extractors = options.extractors;
this.descriptorName = options.descriptorName;
this.bucketParameters = options.bucketParameters;
this.tools = options.tools;
this.errors = options.errors ?? [];
}

applies(table: SourceTableInterface) {
return this.sourceTable?.matches(table);
return this.sourceTable.matches(table);
}

addSpecialParameters(table: SourceTableInterface, row: SqliteRow) {
if (this.sourceTable!.isWildcard) {
if (this.sourceTable.isWildcard) {
return {
...row,
_table_suffix: this.sourceTable!.suffix(table.table)
_table_suffix: this.sourceTable.suffix(table.table)
};
} else {
return row;
Expand All @@ -48,17 +105,17 @@ export class BaseSqlDataQuery {
// Wildcard without alias - use source
return sourceTable;
} else {
return this.table!;
return this.table;
}
}

isUnaliasedWildcard() {
return this.sourceTable!.isWildcard && this.table == this.sourceTable!.tablePattern;
return this.sourceTable.isWildcard && this.table == this.sourceTable.tablePattern;
}

columnOutputNames(): string[] {
return this.columns!.map((c) => {
return this.tools!.getOutputName(c);
return this.columns.map((c) => {
return this.tools.getOutputName(c);
});
}

Expand All @@ -67,7 +124,7 @@ export class BaseSqlDataQuery {

if (this.isUnaliasedWildcard()) {
// Separate results
for (let schemaTable of schema.getTables(this.sourceTable!)) {
for (let schemaTable of schema.getTables(this.sourceTable)) {
let output: Record<string, ColumnDefinition> = {};

this.getColumnOutputsFor(schemaTable, output);
Expand All @@ -80,11 +137,11 @@ export class BaseSqlDataQuery {
} else {
// Merged results
let output: Record<string, ColumnDefinition> = {};
for (let schemaTable of schema.getTables(this.sourceTable!)) {
for (let schemaTable of schema.getTables(this.sourceTable)) {
this.getColumnOutputsFor(schemaTable, output);
}
result.push({
name: this.table!,
name: this.table,
columns: Object.values(output)
});
}
Expand All @@ -103,15 +160,15 @@ export class BaseSqlDataQuery {
protected getColumnOutputsFor(schemaTable: SourceSchemaTable, output: Record<string, ColumnDefinition>) {
const querySchema: QuerySchema = {
getColumn: (table, column) => {
if (table == this.table!) {
if (table == this.table) {
return schemaTable.getColumn(column);
} else {
// TODO: bucket parameters?
return undefined;
}
},
getColumns: (table) => {
if (table == this.table!) {
if (table == this.table) {
return schemaTable.getColumns();
} else {
return [];
Expand Down
2 changes: 1 addition & 1 deletion packages/sync-rules/src/BucketDescription.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
*/
export type BucketPriority = 0 | 1 | 2 | 3;

export const defaultBucketPriority: BucketPriority = 3;
export const DEFAULT_BUCKET_PRIORITY: BucketPriority = 3;

export const isValidPriority = (i: number): i is BucketPriority => {
return Number.isInteger(i) && i >= 0 && i <= 3;
Expand Down
Loading