Skip to content

Commit 6af9389

Browse files
committed
Redo checkpoint change tracking.
1 parent c65dbd0 commit 6af9389

File tree

5 files changed

+182
-127
lines changed

5 files changed

+182
-127
lines changed

modules/module-mongodb-storage/src/storage/implementation/MongoSyncBucketStorage.ts

Lines changed: 70 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,8 @@ import {
99
} from '@powersync/lib-services-framework';
1010
import {
1111
BroadcastIterable,
12+
CheckpointChanges,
13+
GetCheckpointChangesOptions,
1214
getLookupBucketDefinitionName,
1315
ReplicationCheckpoint,
1416
storage,
@@ -758,37 +760,13 @@ export class MongoSyncBucketStorage
758760
/**
759761
* User-specific watch on the latest checkpoint and/or write checkpoint.
760762
*/
761-
async *watchWriteCheckpoint(options: WatchWriteCheckpointOptions): AsyncIterable<storage.WriteCheckpoint> {
762-
const { user_id, signal, filter } = options;
763+
async *watchWriteCheckpoint(options: WatchWriteCheckpointOptions): AsyncIterable<storage.StorageCheckpointUpdate> {
764+
const { user_id, signal } = options;
763765
let lastCheckpoint: utils.OpId | null = null;
764766
let lastWriteCheckpoint: bigint | null = null;
765767

766768
const iter = wrapWithAbort(this.sharedIter, signal);
767-
let shouldUpdate = false;
768769
for await (const event of iter) {
769-
if (filter) {
770-
if (event.invalidate) {
771-
shouldUpdate =
772-
filter({
773-
invalidate: true
774-
}) || shouldUpdate;
775-
}
776-
777-
if (event.dataBucket) {
778-
shouldUpdate =
779-
filter({
780-
changedDataBucket: event.dataBucket
781-
}) || shouldUpdate;
782-
}
783-
784-
if (event.parameterBucketDefinition) {
785-
shouldUpdate =
786-
filter({
787-
changedParameterBucketDefinition: event.parameterBucketDefinition
788-
}) || shouldUpdate;
789-
}
790-
}
791-
792770
if (event.checkpoint) {
793771
const { checkpoint, lsn } = event.checkpoint;
794772

@@ -814,16 +792,27 @@ export class MongoSyncBucketStorage
814792
continue;
815793
}
816794

795+
const updates: CheckpointChanges =
796+
lastCheckpoint == null
797+
? {
798+
invalidateDataBuckets: true,
799+
invalidateParameterBuckets: true,
800+
updatedDataBuckets: [],
801+
updatedParameterBucketDefinitions: []
802+
}
803+
: await this.getCheckpointChanges({
804+
lastCheckpoint: lastCheckpoint,
805+
nextCheckpoint: checkpoint
806+
});
807+
817808
lastWriteCheckpoint = currentWriteCheckpoint;
818809
lastCheckpoint = checkpoint;
819810

820-
if (shouldUpdate || filter == null) {
821-
yield {
822-
base: event.checkpoint!,
823-
writeCheckpoint: currentWriteCheckpoint
824-
};
825-
shouldUpdate = false;
826-
}
811+
yield {
812+
base: event.checkpoint!,
813+
writeCheckpoint: currentWriteCheckpoint,
814+
update: updates
815+
};
827816
}
828817
}
829818
}
@@ -936,6 +925,54 @@ export class MongoSyncBucketStorage
936925
];
937926
return pipeline;
938927
}
928+
929+
async getCheckpointChanges(options: GetCheckpointChangesOptions): Promise<CheckpointChanges> {
930+
const dataBuckets = await this.db.bucket_data
931+
.find(
932+
{
933+
'_id.g': this.group_id,
934+
'_id.o': { $gt: BigInt(options.lastCheckpoint), $lte: BigInt(options.nextCheckpoint) }
935+
},
936+
{
937+
projection: {
938+
'_id.b': 1
939+
},
940+
limit: 1001,
941+
batchSize: 1001,
942+
singleBatch: true
943+
}
944+
)
945+
.toArray();
946+
const invalidateDataBuckets = dataBuckets.length > 1000;
947+
948+
const parameterUpdates = await this.db.bucket_parameters
949+
.find(
950+
{
951+
_id: { $gt: BigInt(options.lastCheckpoint), $lt: BigInt(options.nextCheckpoint) },
952+
'key.g': this.group_id
953+
},
954+
{
955+
projection: {
956+
lookup: 1
957+
},
958+
limit: 1001,
959+
batchSize: 1001,
960+
singleBatch: true
961+
}
962+
)
963+
.toArray();
964+
const invalidateParameterUpdates = parameterUpdates.length > 1000;
965+
966+
return {
967+
invalidateDataBuckets,
968+
updatedDataBuckets: invalidateDataBuckets ? [] : dataBuckets.map((b) => b._id.b),
969+
970+
invalidateParameterBuckets: invalidateParameterUpdates,
971+
updatedParameterBucketDefinitions: invalidateParameterUpdates
972+
? []
973+
: [...new Set<string>(parameterUpdates.map((p) => getLookupBucketDefinitionName(p.lookup)))]
974+
};
975+
}
939976
}
940977

941978
interface BucketCheckpointEvent {

modules/module-postgres-storage/src/storage/PostgresSyncRulesStorage.ts

Lines changed: 31 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,14 @@
11
import * as lib_postgres from '@powersync/lib-service-postgres';
22
import { DisposableObserver, ReplicationAssertionError } from '@powersync/lib-services-framework';
3-
import { BroadcastIterable, LastValueSink, storage, utils, WatchWriteCheckpointOptions } from '@powersync/service-core';
3+
import {
4+
BroadcastIterable,
5+
CheckpointChanges,
6+
GetCheckpointChangesOptions,
7+
LastValueSink,
8+
storage,
9+
utils,
10+
WatchWriteCheckpointOptions
11+
} from '@powersync/service-core';
412
import { JSONBig } from '@powersync/service-jsonbig';
513
import * as sync_rules from '@powersync/service-sync-rules';
614
import * as uuid from 'uuid';
@@ -690,11 +698,11 @@ export class PostgresSyncRulesStorage
690698
return this.makeActiveCheckpoint(activeCheckpoint);
691699
}
692700

693-
async *watchWriteCheckpoint(options: WatchWriteCheckpointOptions): AsyncIterable<storage.WriteCheckpoint> {
701+
async *watchWriteCheckpoint(options: WatchWriteCheckpointOptions): AsyncIterable<storage.StorageCheckpointUpdate> {
694702
let lastCheckpoint: utils.OpId | null = null;
695703
let lastWriteCheckpoint: bigint | null = null;
696704

697-
const { signal, user_id, filter } = options;
705+
const { signal, user_id } = options;
698706

699707
const iter = wrapWithAbort(this.sharedIterator, signal);
700708
for await (const cp of iter) {
@@ -724,12 +732,16 @@ export class PostgresSyncRulesStorage
724732
lastWriteCheckpoint = currentWriteCheckpoint;
725733
lastCheckpoint = checkpoint;
726734

727-
// We do not track individual bucket updates yet, always send an invalidation event.
728-
filter?.({
729-
invalidate: true
730-
});
731-
732-
yield { base: cp, writeCheckpoint: currentWriteCheckpoint };
735+
yield {
736+
base: cp,
737+
writeCheckpoint: currentWriteCheckpoint,
738+
update: {
739+
invalidateDataBuckets: true,
740+
invalidateParameterBuckets: true,
741+
updatedDataBuckets: [],
742+
updatedParameterBucketDefinitions: []
743+
}
744+
};
733745
}
734746
}
735747

@@ -791,6 +803,16 @@ export class PostgresSyncRulesStorage
791803
}
792804
}
793805

806+
async getCheckpointChanges(options: GetCheckpointChangesOptions): Promise<CheckpointChanges> {
807+
// We do not track individual changes yet
808+
return {
809+
invalidateDataBuckets: true,
810+
invalidateParameterBuckets: true,
811+
updatedDataBuckets: [],
812+
updatedParameterBucketDefinitions: []
813+
};
814+
}
815+
794816
private makeActiveCheckpoint(row: models.ActiveCheckpointDecoded | null) {
795817
return {
796818
checkpoint: utils.timestampToOpId(row?.last_checkpoint ?? 0n),

packages/service-core/src/storage/SyncRulesBucketStorage.ts

Lines changed: 19 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -73,12 +73,14 @@ export interface SyncRulesBucketStorage
7373
*/
7474
getParameterSets(checkpoint: util.OpId, lookups: SqliteJsonValue[][]): Promise<SqliteJsonRow[]>;
7575

76+
getCheckpointChanges(options: GetCheckpointChangesOptions): Promise<CheckpointChanges>;
77+
7678
/**
7779
* Yields the latest user write checkpoint whenever the sync checkpoint updates.
7880
*
7981
* The stream stops or errors if this is not the active sync rules (anymore).
8082
*/
81-
watchWriteCheckpoint(options: WatchWriteCheckpointOptions): AsyncIterable<WriteCheckpoint>;
83+
watchWriteCheckpoint(options: WatchWriteCheckpointOptions): AsyncIterable<StorageCheckpointUpdate>;
8284

8385
/**
8486
* Get a "batch" of data for a checkpoint.
@@ -217,8 +219,6 @@ export interface WatchWriteCheckpointOptions {
217219
user_id: string;
218220

219221
signal: AbortSignal;
220-
221-
filter?: (event: WatchFilterEvent) => boolean;
222222
}
223223

224224
export interface WatchFilterEvent {
@@ -231,3 +231,19 @@ export interface WriteCheckpoint {
231231
base: ReplicationCheckpoint;
232232
writeCheckpoint: bigint | null;
233233
}
234+
235+
export interface StorageCheckpointUpdate extends WriteCheckpoint {
236+
update: CheckpointChanges;
237+
}
238+
239+
export interface GetCheckpointChangesOptions {
240+
lastCheckpoint: util.OpId;
241+
nextCheckpoint: util.OpId;
242+
}
243+
244+
export interface CheckpointChanges {
245+
updatedDataBuckets: string[];
246+
invalidateDataBuckets: boolean;
247+
updatedParameterBucketDefinitions: string[];
248+
invalidateParameterBuckets: boolean;
249+
}

0 commit comments

Comments
 (0)