Skip to content

Commit c65dbd0

Browse files
committed
Track bucket parameter changes.
1 parent 6258202 commit c65dbd0

File tree

9 files changed

+158
-52
lines changed

9 files changed

+158
-52
lines changed

modules/module-mongodb-storage/src/storage/implementation/MongoSyncBucketStorage.ts

Lines changed: 84 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -9,22 +9,30 @@ import {
99
} from '@powersync/lib-services-framework';
1010
import {
1111
BroadcastIterable,
12+
getLookupBucketDefinitionName,
1213
ReplicationCheckpoint,
1314
storage,
1415
utils,
1516
WatchWriteCheckpointOptions
1617
} from '@powersync/service-core';
1718
import { SqliteJsonRow, SqliteJsonValue, SqlSyncRules } from '@powersync/service-sync-rules';
1819
import * as bson from 'bson';
20+
import { wrapWithAbort } from 'ix/asynciterable/operators/withabort.js';
1921
import * as timers from 'timers/promises';
2022
import { MongoBucketStorage } from '../MongoBucketStorage.js';
2123
import { PowerSyncMongo } from './db.js';
22-
import { BucketDataDocument, BucketDataKey, SourceKey, SyncRuleCheckpointState, SyncRuleDocument } from './models.js';
24+
import {
25+
BucketDataDocument,
26+
BucketDataKey,
27+
BucketParameterDocument,
28+
SourceKey,
29+
SyncRuleCheckpointState,
30+
SyncRuleDocument
31+
} from './models.js';
2332
import { MongoBucketBatch } from './MongoBucketBatch.js';
2433
import { MongoCompactor } from './MongoCompactor.js';
2534
import { MongoWriteCheckpointAPI } from './MongoWriteCheckpointAPI.js';
2635
import { idPrefixFilter, mapOpEntry, readSingleBatch } from './util.js';
27-
import { wrapWithAbort } from 'ix/asynciterable/operators/withabort.js';
2836

2937
export class MongoSyncBucketStorage
3038
extends DisposableObserver<storage.SyncRulesBucketStorageListener>
@@ -734,7 +742,10 @@ export class MongoSyncBucketStorage
734742
}
735743
} else if (update.ns.coll == this.db.bucket_data.collectionName) {
736744
const bucket = (update.documentKey._id as unknown as BucketDataKey).b;
737-
yield { bucket: bucket };
745+
yield { dataBucket: bucket };
746+
} else if (update.ns.coll == this.db.bucket_parameters.collectionName && update.fullDocument != null) {
747+
const bucketDefinition = getLookupBucketDefinitionName((update.fullDocument as BucketParameterDocument).lookup);
748+
yield { parameterBucketDefinition: bucketDefinition };
738749
}
739750
}
740751
}
@@ -757,15 +768,24 @@ export class MongoSyncBucketStorage
757768
for await (const event of iter) {
758769
if (filter) {
759770
if (event.invalidate) {
760-
shouldUpdate ||= filter({
761-
invalidate: true
762-
});
771+
shouldUpdate =
772+
filter({
773+
invalidate: true
774+
}) || shouldUpdate;
763775
}
764776

765-
if (event.bucket) {
766-
shouldUpdate ||= filter({
767-
bucket: event.bucket
768-
});
777+
if (event.dataBucket) {
778+
shouldUpdate =
779+
filter({
780+
changedDataBucket: event.dataBucket
781+
}) || shouldUpdate;
782+
}
783+
784+
if (event.parameterBucketDefinition) {
785+
shouldUpdate =
786+
filter({
787+
changedParameterBucketDefinition: event.parameterBucketDefinition
788+
}) || shouldUpdate;
769789
}
770790
}
771791

@@ -836,23 +856,57 @@ export class MongoSyncBucketStorage
836856

837857
private getChangeStreamPipeline() {
838858
const syncRulesId = this.group_id;
859+
const parsedSyncRules = this.getParsedSyncRules({
860+
// Not applicable here
861+
defaultSchema: 'default'
862+
});
863+
const staticBucketDefinitions = parsedSyncRules.getStaticBucketDefinitionList();
864+
const staticBucketFilters = staticBucketDefinitions.map((name) => {
865+
return {
866+
// Check that the docuemnt starts with bucket name
867+
'documentKey._id.b': {
868+
$gte: name + '[',
869+
$lt: name + '[\uFFFF'
870+
}
871+
};
872+
});
873+
874+
let filters: mongo.Document[] = [
875+
// sync_rules events
876+
{
877+
'ns.coll': this.db.sync_rules.collectionName,
878+
'documentKey._id': syncRulesId,
879+
operationType: { $in: ['insert', 'update', 'replace'] }
880+
},
881+
// bucket_data events
882+
{
883+
'ns.coll': this.db.bucket_data.collectionName,
884+
'documentKey._id.g': syncRulesId,
885+
operationType: 'insert',
886+
$or: staticBucketFilters
887+
},
888+
// bucket_parameters events
889+
{
890+
'ns.coll': this.db.bucket_parameters.collectionName,
891+
'fullDocument.key.g': syncRulesId,
892+
operationType: 'insert'
893+
}
894+
];
895+
896+
if (staticBucketFilters.length > 0) {
897+
// bucket_data events, only if there are static bucket definitions
898+
filters.push({
899+
'ns.coll': this.db.bucket_data.collectionName,
900+
'documentKey._id.g': syncRulesId,
901+
operationType: 'insert',
902+
$or: staticBucketFilters
903+
});
904+
}
905+
839906
const pipeline: mongo.Document[] = [
840907
{
841908
$match: {
842-
$or: [
843-
// sync_rules events
844-
{
845-
'ns.coll': this.db.sync_rules.collectionName,
846-
'documentKey._id': syncRulesId,
847-
operationType: { $in: ['insert', 'update', 'replace'] }
848-
},
849-
// bucket_data events
850-
{
851-
'ns.coll': this.db.bucket_data.collectionName,
852-
'documentKey._id.g': syncRulesId,
853-
operationType: 'insert'
854-
}
855-
]
909+
$or: filters
856910
}
857911
},
858912
{
@@ -865,6 +919,10 @@ export class MongoSyncBucketStorage
865919
// For sync_rules, this contains the sync_rules id
866920
'documentKey._id': 1,
867921

922+
// For bucket_parameters
923+
'fullDocument.key': 1,
924+
'fullDocument.lookup': 1,
925+
868926
// For sync_rules events
869927
'updateDescription.updatedFields.state': 1,
870928
'updateDescription.updatedFields.last_checkpoint': 1,
@@ -882,6 +940,7 @@ export class MongoSyncBucketStorage
882940

883941
interface BucketCheckpointEvent {
884942
checkpoint?: ReplicationCheckpoint;
885-
bucket?: string;
943+
dataBucket?: string;
944+
parameterBucketDefinition?: string;
886945
invalidate?: boolean;
887946
}

packages/service-core/src/storage/SyncRulesBucketStorage.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -222,7 +222,8 @@ export interface WatchWriteCheckpointOptions {
222222
}
223223

224224
export interface WatchFilterEvent {
225-
bucket?: string;
225+
changedDataBucket?: string;
226+
changedParameterBucketDefinition?: string;
226227
invalidate?: boolean;
227228
}
228229

packages/service-core/src/storage/bson.ts

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,11 @@ export const serializeLookup = (lookup: SqliteJsonValue[]) => {
2929
return new bson.Binary(serializeLookupBuffer(lookup));
3030
};
3131

32+
export const getLookupBucketDefinitionName = (lookup: bson.Binary) => {
33+
const parsed = bson.deserialize(lookup.buffer, BSON_DESERIALIZE_OPTIONS).l as SqliteJsonValue[];
34+
return parsed[0] as string;
35+
};
36+
3237
/**
3338
* True if this is a bson.UUID.
3439
*

packages/service-core/src/sync/BucketChecksumState.ts

Lines changed: 24 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -280,27 +280,44 @@ export class BucketParameterState {
280280
this.invalidated = true;
281281
this.pendingBuckets.clear();
282282
return true;
283-
} else if (event.bucket != null) {
283+
} else if (event.changedDataBucket != null) {
284284
const querier = this.querier;
285285
const staticBuckets = this.staticBuckets;
286286
if (querier.hasDynamicBuckets) {
287-
// TODO: Check if the dynamic buckets may match this one
288-
this.invalidated = true;
289-
this.pendingBuckets.clear();
290-
return true;
287+
// Check for a maybe-match on dynamic buckets
288+
// We could expand this to check our last set of buckets
289+
for (let def of this.querier.dynamicBucketDefinitions) {
290+
if (event.changedDataBucket.startsWith(def)) {
291+
return this.invalidatePendingBuckets();
292+
}
293+
}
291294
}
292295

293-
if (staticBuckets.has(event.bucket)) {
294-
this.pendingBuckets.add(event.bucket);
296+
// For static buckets we can check for an exact match
297+
if (staticBuckets.has(event.changedDataBucket)) {
298+
this.pendingBuckets.add(event.changedDataBucket);
295299
return true;
296300
} else {
297301
return false;
298302
}
303+
} else if (event.changedParameterBucketDefinition) {
304+
if (this.querier.dynamicBucketDefinitions.has(event.changedParameterBucketDefinition)) {
305+
// Potential change in dynamic bucket list - invalidate
306+
return this.invalidatePendingBuckets();
307+
}
308+
// The parameter bucket definition is not relevant for this connection
309+
return false;
299310
} else {
300311
return false;
301312
}
302313
};
303314

315+
private invalidatePendingBuckets() {
316+
this.invalidated = true;
317+
this.pendingBuckets.clear();
318+
return true;
319+
}
320+
304321
async getCheckpointUpdate(checkpoint: util.OpId): Promise<CheckpointUpdate> {
305322
const querier = this.querier;
306323
let update: CheckpointUpdate;

packages/service-core/test/src/sync/BucketChecksumState.test.ts

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -252,7 +252,7 @@ bucket_definitions:
252252
// Invalidate the state for global[1] - will only re-check the single bucket.
253253
// This is essentially inconsistent state, but is the simplest way to test that
254254
// the filter is working.
255-
state.checkpointFilter({ bucket: 'global[1]' });
255+
state.checkpointFilter({ changedDataBucket: 'global[1]' });
256256

257257
const line2 = (await state.buildNextCheckpointLine({
258258
base: { checkpoint: '2', lsn: '2' },
@@ -460,8 +460,7 @@ bucket_definitions:
460460
state.updateBucketPosition({ bucket: 'by_project[1]', nextAfter: '1', hasMore: false });
461461
state.updateBucketPosition({ bucket: 'by_project[2]', nextAfter: '1', hasMore: false });
462462

463-
// Update bucket storage state
464-
state.checkpointFilter({ invalidate: true });
463+
state.checkpointFilter({ changedParameterBucketDefinition: 'by_project' });
465464

466465
storage.getParameterSets = async (checkpoint: OpId, lookups: SqliteJsonValue[][]): Promise<SqliteJsonRow[]> => {
467466
expect(checkpoint).toEqual('2');
@@ -494,7 +493,7 @@ class MockBucketChecksumStateStorage implements BucketChecksumStateStorage {
494493

495494
updateTestChecksum(checksum: BucketChecksum): void {
496495
this.state.set(checksum.bucket, checksum);
497-
this.filter?.({ bucket: checksum.bucket });
496+
this.filter?.({ changedDataBucket: checksum.bucket });
498497
}
499498

500499
invalidate() {

packages/sync-rules/src/BucketParameterQuerier.ts

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,10 +18,13 @@ export interface BucketParameterQuerier {
1818
/**
1919
* True if there are dynamic buckets, meaning queryDynamicBucketDescriptions() should be used.
2020
*
21-
* If this is false, queryDynamicBucketDescriptions() will always return an empty array.
21+
* If this is false, queryDynamicBucketDescriptions() will always return an empty array,
22+
* and dynamicBucketDefinitions.size == 0.
2223
*/
2324
readonly hasDynamicBuckets: boolean;
2425

26+
readonly dynamicBucketDefinitions: Set<string>;
27+
2528
/**
2629
* These buckets depend on parameter storage, and needs to be retrieved dynamically for each checkpoint.
2730
*
@@ -44,9 +47,11 @@ export interface QueryBucketDescriptorOptions extends ParameterLookupSource {
4447
}
4548

4649
export function mergeBucketParameterQueriers(queriers: BucketParameterQuerier[]): BucketParameterQuerier {
50+
const dynamicBucketDefinitions = new Set<string>(...queriers.flatMap((q) => [...q.dynamicBucketDefinitions]));
4751
return {
4852
staticBuckets: queriers.flatMap((q) => q.staticBuckets),
49-
hasDynamicBuckets: queriers.some((q) => q.hasDynamicBuckets),
53+
hasDynamicBuckets: dynamicBucketDefinitions.size > 0,
54+
dynamicBucketDefinitions,
5055
async queryDynamicBucketDescriptions(source: ParameterLookupSource) {
5156
let results: BucketDescription[] = [];
5257
for (let q of queriers) {

packages/sync-rules/src/SqlBucketDescriptor.ts

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,5 @@
11
import { BucketDescription } from './BucketDescription.js';
2-
import {
3-
BucketParameterQuerier,
4-
mergeBucketParameterQueriers,
5-
QueryBucketDescriptorOptions
6-
} from './BucketParameterQuerier.js';
2+
import { BucketParameterQuerier, mergeBucketParameterQueriers } from './BucketParameterQuerier.js';
73
import { IdSequence } from './IdSequence.js';
84
import { SourceTableInterface } from './SourceTableInterface.js';
95
import { SqlDataQuery } from './SqlDataQuery.js';
@@ -13,8 +9,8 @@ import { StaticSqlParameterQuery } from './StaticSqlParameterQuery.js';
139
import { TablePattern } from './TablePattern.js';
1410
import { SqlRuleError } from './errors.js';
1511
import {
16-
EvaluateRowOptions,
1712
EvaluatedParametersResult,
13+
EvaluateRowOptions,
1814
EvaluationResult,
1915
QueryParseOptions,
2016
RequestParameters,
@@ -117,6 +113,7 @@ export class SqlBucketDescriptor {
117113
const staticQuerier = {
118114
staticBuckets,
119115
hasDynamicBuckets: false,
116+
dynamicBucketDefinitions: new Set<string>(),
120117
queryDynamicBucketDescriptions: async () => []
121118
} satisfies BucketParameterQuerier;
122119

@@ -128,6 +125,13 @@ export class SqlBucketDescriptor {
128125
return mergeBucketParameterQueriers([staticQuerier, ...dynamicQueriers]);
129126
}
130127

128+
/**
129+
* Return bucket definition names for all static buckets.
130+
*/
131+
hasStaticBuckets(): boolean {
132+
return this.global_parameter_queries.length > 0;
133+
}
134+
131135
getStaticBucketDescriptions(parameters: RequestParameters): BucketDescription[] {
132136
let results: BucketDescription[] = [];
133137
for (let query of this.global_parameter_queries) {

packages/sync-rules/src/SqlParameterQuery.ts

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,14 @@
11
import { parse, SelectedColumn } from 'pgsql-ast-parser';
2+
import { BucketDescription, BucketPriority, defaultBucketPriority } from './BucketDescription.js';
3+
import { BucketParameterQuerier, ParameterLookupSource } from './BucketParameterQuerier.js';
24
import { SqlRuleError } from './errors.js';
35
import { SourceTableInterface } from './SourceTableInterface.js';
46
import { SqlTools } from './sql_filters.js';
57
import { checkUnsupportedFeatures, isClauseError, isParameterValueClause } from './sql_support.js';
68
import { StaticSqlParameterQuery } from './StaticSqlParameterQuery.js';
79
import { TablePattern } from './TablePattern.js';
810
import { TableQuerySchema } from './TableQuerySchema.js';
11+
import { TableValuedFunctionSqlParameterQuery } from './TableValuedFunctionSqlParameterQuery.js';
912
import {
1013
EvaluatedParameters,
1114
EvaluatedParametersResult,
@@ -21,13 +24,6 @@ import {
2124
SqliteRow
2225
} from './types.js';
2326
import { filterJsonRow, getBucketId, isJsonValue, isSelectStatement } from './utils.js';
24-
import { TableValuedFunctionSqlParameterQuery } from './TableValuedFunctionSqlParameterQuery.js';
25-
import { BucketDescription, BucketPriority, defaultBucketPriority } from './BucketDescription.js';
26-
import {
27-
BucketParameterQuerier,
28-
ParameterLookupSource,
29-
QueryBucketDescriptorOptions
30-
} from './BucketParameterQuerier.js';
3127

3228
/**
3329
* Represents a parameter query, such as:
@@ -193,6 +189,10 @@ export class SqlParameterQuery {
193189
priority?: BucketPriority;
194190

195191
filter?: ParameterMatchClause;
192+
193+
/**
194+
* Bucket definition name.
195+
*/
196196
descriptor_name?: string;
197197

198198
/** _Input_ token / user parameters */
@@ -375,13 +375,15 @@ export class SqlParameterQuery {
375375
return {
376376
staticBuckets: [],
377377
hasDynamicBuckets: false,
378+
dynamicBucketDefinitions: new Set<string>(),
378379
queryDynamicBucketDescriptions: async () => []
379380
};
380381
}
381382

382383
return {
383384
staticBuckets: [],
384385
hasDynamicBuckets: true,
386+
dynamicBucketDefinitions: new Set<string>(this.descriptor_name!),
385387
queryDynamicBucketDescriptions: async (source: ParameterLookupSource) => {
386388
const bucketParameters = await source.getParameterSets(lookups);
387389
return this.resolveBucketDescriptions(bucketParameters, requestParameters);

0 commit comments

Comments
 (0)