Skip to content

Commit f244fc8

Browse files
committed
Use versioned bucket ids
1 parent 2a3f669 commit f244fc8

File tree

14 files changed

+152
-26
lines changed

14 files changed

+152
-26
lines changed

modules/module-mongodb-storage/src/storage/implementation/MongoBucketBatch.ts

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ const replicationMutex = new utils.Mutex();
4545
export interface MongoBucketBatchOptions {
4646
db: PowerSyncMongo;
4747
syncRules: SqlSyncRules;
48+
syncRulesId: string;
4849
groupId: number;
4950
slotName: string;
5051
lastCheckpointLsn: string | null;
@@ -72,6 +73,7 @@ export class MongoBucketBatch
7273
public readonly db: PowerSyncMongo;
7374
public readonly session: mongo.ClientSession;
7475
private readonly sync_rules: SqlSyncRules;
76+
private readonly syncRulesId: string;
7577

7678
private readonly group_id: number;
7779

@@ -127,6 +129,7 @@ export class MongoBucketBatch
127129
this.session = this.client.startSession();
128130
this.slot_name = options.slotName;
129131
this.sync_rules = options.syncRules;
132+
this.syncRulesId = options.syncRulesId;
130133
this.storeCurrentData = options.storeCurrentData;
131134
this.skipExistingRows = options.skipExistingRows;
132135
this.markRecordUnavailable = options.markRecordUnavailable;
@@ -461,7 +464,8 @@ export class MongoBucketBatch
461464
if (sourceTable.syncData) {
462465
const { results: evaluated, errors: syncErrors } = this.sync_rules.evaluateRowWithErrors({
463466
record: after,
464-
sourceTable
467+
sourceTable,
468+
bucketIdTransformer: SqlSyncRules.versionedBucketIdTransformer(this.syncRulesId)
465469
});
466470

467471
for (let error of syncErrors) {

modules/module-mongodb-storage/src/storage/implementation/MongoSyncBucketStorage.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -154,6 +154,7 @@ export class MongoSyncBucketStorage
154154
logger: options.logger,
155155
db: this.db,
156156
syncRules: this.sync_rules.parsed(options).sync_rules,
157+
syncRulesId: `${this.sync_rules.id}`,
157158
groupId: this.group_id,
158159
slotName: this.slot_name,
159160
lastCheckpointLsn: checkpoint_lsn,

modules/module-postgres-storage/src/storage/PostgresSyncRulesStorage.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -330,6 +330,7 @@ export class PostgresSyncRulesStorage
330330
logger: options.logger ?? framework.logger,
331331
db: this.db,
332332
sync_rules: this.sync_rules.parsed(options).sync_rules,
333+
syncRulesId: `${this.sync_rules.id}`,
333334
group_id: this.group_id,
334335
slot_name: this.slot_name,
335336
last_checkpoint_lsn: checkpoint_lsn,

modules/module-postgres-storage/src/storage/batch/PostgresBucketBatch.ts

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ export interface PostgresBucketBatchOptions {
2525
logger: Logger;
2626
db: lib_postgres.DatabaseClient;
2727
sync_rules: sync_rules.SqlSyncRules;
28+
syncRulesId: string;
2829
group_id: number;
2930
slot_name: string;
3031
last_checkpoint_lsn: string | null;
@@ -73,6 +74,7 @@ export class PostgresBucketBatch
7374

7475
protected write_checkpoint_batch: storage.CustomWriteCheckpointOptions[];
7576
protected readonly sync_rules: sync_rules.SqlSyncRules;
77+
private readonly syncRulesId: string;
7678
protected batch: OperationBatch | null;
7779
private lastWaitingLogThrottled = 0;
7880
private markRecordUnavailable: BucketStorageMarkRecordUnavailable | undefined;
@@ -88,6 +90,7 @@ export class PostgresBucketBatch
8890
this.resumeFromLsn = options.resumeFromLsn;
8991
this.write_checkpoint_batch = [];
9092
this.sync_rules = options.sync_rules;
93+
this.syncRulesId = options.syncRulesId;
9194
this.markRecordUnavailable = options.markRecordUnavailable;
9295
this.batch = null;
9396
this.persisted_op = null;
@@ -825,7 +828,8 @@ export class PostgresBucketBatch
825828
if (sourceTable.syncData) {
826829
const { results: evaluated, errors: syncErrors } = this.sync_rules.evaluateRowWithErrors({
827830
record: after,
828-
sourceTable
831+
sourceTable,
832+
bucketIdTransformer: sync_rules.SqlSyncRules.versionedBucketIdTransformer(this.syncRulesId)
829833
});
830834

831835
for (const error of syncErrors) {

packages/sync-rules/src/BaseSqlDataQuery.ts

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import { SourceTableInterface } from './SourceTableInterface.js';
55
import { SqlTools } from './sql_filters.js';
66
import { TablePattern } from './TablePattern.js';
77
import {
8+
BucketIdTransformer,
89
EvaluationResult,
910
QueryParameters,
1011
QuerySchema,
@@ -25,6 +26,7 @@ export interface EvaluateRowOptions {
2526
table: SourceTableInterface;
2627
row: SqliteRow;
2728
bucketIds: (params: QueryParameters) => string[];
29+
bucketIdTransformer: BucketIdTransformer | null;
2830
}
2931

3032
export interface BaseSqlDataQueryOptions {
@@ -177,7 +179,7 @@ export class BaseSqlDataQuery {
177179

178180
evaluateRowWithOptions(options: EvaluateRowOptions): EvaluationResult[] {
179181
try {
180-
const { table, row, bucketIds } = options;
182+
const { table, row, bucketIds, bucketIdTransformer } = options;
181183

182184
const tables = { [this.table]: this.addSpecialParameters(table, row) };
183185
const resolvedBucketIds = bucketIds(tables);
@@ -197,7 +199,7 @@ export class BaseSqlDataQuery {
197199

198200
return resolvedBucketIds.map((bucketId) => {
199201
return {
200-
bucket: bucketId,
202+
bucket: bucketIdTransformer ? bucketIdTransformer(bucketId) : bucketId,
201203
table: outputTable,
202204
id: id,
203205
data

packages/sync-rules/src/SqlBucketDescriptor.ts

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -104,7 +104,13 @@ export class SqlBucketDescriptor implements BucketSource {
104104
continue;
105105
}
106106

107-
results.push(...query.evaluateRow(options.sourceTable, applyRowContext(options.record, this.compatibility)));
107+
results.push(
108+
...query.evaluateRow(
109+
options.sourceTable,
110+
applyRowContext(options.record, this.compatibility),
111+
options.bucketIdTransformer
112+
)
113+
);
108114
}
109115
return results;
110116
}

packages/sync-rules/src/SqlDataQuery.ts

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ import { checkUnsupportedFeatures, isClauseError } from './sql_support.js';
1010
import { SyncRulesOptions } from './SqlSyncRules.js';
1111
import { TablePattern } from './TablePattern.js';
1212
import { TableQuerySchema } from './TableQuerySchema.js';
13-
import { EvaluationResult, ParameterMatchClause, QuerySchema, SqliteRow } from './types.js';
13+
import { BucketIdTransformer, EvaluationResult, ParameterMatchClause, QuerySchema, SqliteRow } from './types.js';
1414
import { getBucketId, isSelectStatement } from './utils.js';
1515

1616
export interface SqlDataQueryOptions extends BaseSqlDataQueryOptions {
@@ -185,10 +185,15 @@ export class SqlDataQuery extends BaseSqlDataQuery {
185185
this.filter = options.filter;
186186
}
187187

188-
evaluateRow(table: SourceTableInterface, row: SqliteRow): EvaluationResult[] {
188+
evaluateRow(
189+
table: SourceTableInterface,
190+
row: SqliteRow,
191+
bucketIdTransformer: BucketIdTransformer | null
192+
): EvaluationResult[] {
189193
return this.evaluateRowWithOptions({
190194
table,
191195
row,
196+
bucketIdTransformer,
192197
bucketIds: (tables) => {
193198
const bucketParameters = this.filter.filterRow(tables);
194199
return bucketParameters.map((params) => getBucketId(this.descriptorName, this.bucketParameters, params));

packages/sync-rules/src/SqlSyncRules.ts

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,7 @@ export interface GetBucketParameterQuerierResult {
8585
export class SqlSyncRules implements SyncRules {
8686
bucketSources: BucketSource[] = [];
8787
eventDescriptors: SqlEventDescriptor[] = [];
88+
compatibility: CompatibilityContext = CompatibilityContext.FULL_BACKWARDS_COMPATIBILITY;
8889

8990
content: string;
9091

@@ -157,6 +158,7 @@ export class SqlSyncRules implements SyncRules {
157158
}
158159

159160
compatibility = new CompatibilityContext(edition, options);
161+
rules.compatibility = compatibility;
160162
}
161163

162164
// Bucket definitions using explicit parameter and data queries.
@@ -384,9 +386,17 @@ export class SqlSyncRules implements SyncRules {
384386
}
385387

386388
evaluateRowWithErrors(options: EvaluateRowOptions): { results: EvaluatedRow[]; errors: EvaluationError[] } {
389+
const resolvedOptions = this.compatibility.isEnabled(CompatibilityOption.versionedBucketIds)
390+
? options
391+
: {
392+
...options,
393+
// Disable bucket id transformer when the option is unused.
394+
transformBucketIds: null
395+
};
396+
387397
let rawResults: EvaluationResult[] = [];
388398
for (let source of this.bucketSources) {
389-
rawResults.push(...source.evaluateRow(options));
399+
rawResults.push(...source.evaluateRow(resolvedOptions));
390400
}
391401

392402
const results = rawResults.filter(isEvaluatedRow) as EvaluatedRow[];
@@ -502,4 +512,8 @@ export class SqlSyncRules implements SyncRules {
502512
}
503513
}
504514
}
515+
516+
static versionedBucketIdTransformer(version: string) {
517+
return (bucketId: string) => `${version}#${bucketId}`;
518+
}
505519
}

packages/sync-rules/src/compatibility.ts

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,8 +22,15 @@ export class CompatibilityOption {
2222
CompatibilityEdition.SYNC_STREAMS
2323
);
2424

25+
static versionedBucketIds = new CompatibilityOption(
26+
'versioned_bucket_ids',
27+
'Encode the version of sync rules in generated bucket ids, which avoids clients downloading data twice and improves client-side progress estimates.',
28+
CompatibilityEdition.SYNC_STREAMS
29+
);
30+
2531
static byName: Record<string, CompatibilityOption> = Object.freeze({
26-
timestamps_iso8601: this.timestampsIso8601
32+
timestamps_iso8601: this.timestampsIso8601,
33+
versioned_bucket_ids: this.versionedBucketIds
2734
});
2835
}
2936

packages/sync-rules/src/streams/stream.ts

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -143,8 +143,6 @@ export class SyncStream implements BucketSource {
143143
columns: this.data.columnOutputNames()
144144
}
145145
};
146-
147-
throw new Error('Method not implemented.');
148146
}
149147

150148
debugWriteOutputTables(result: Record<string, { query: string }[]>): void {
@@ -181,6 +179,7 @@ export class SyncStream implements BucketSource {
181179
return this.data.evaluateRowWithOptions({
182180
table: options.sourceTable,
183181
row: applyRowContext(options.record, this.compatibility),
182+
bucketIdTransformer: options.bucketIdTransformer,
184183
bucketIds() {
185184
const bucketIds: string[] = [];
186185
for (const variant of stream.variants) {

0 commit comments

Comments
 (0)