Skip to content

Commit 27c7577

Browse files
committed
Track last_op for each bucket.
1 parent 6ebefa9 commit 27c7577

File tree

5 files changed

+143
-31
lines changed

5 files changed

+143
-31
lines changed
Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
import { migrations } from '@powersync/service-core';
2+
import * as storage from '../../../storage/storage-index.js';
3+
import { MongoStorageConfig } from '../../../types/types.js';
4+
5+
const INDEX_NAME = 'bucket_updates';
6+
7+
export const up: migrations.PowerSyncMigrationFunction = async (context) => {
8+
const {
9+
service_context: { configuration }
10+
} = context;
11+
const db = storage.createPowerSyncMongo(configuration.storage as MongoStorageConfig);
12+
13+
try {
14+
await db.bucket_state.createIndex(
15+
{
16+
'_id.g': 1,
17+
last_op: 1
18+
},
19+
{ name: INDEX_NAME, unique: true }
20+
);
21+
} finally {
22+
await db.client.close();
23+
}
24+
};
25+
26+
export const down: migrations.PowerSyncMigrationFunction = async (context) => {
27+
const {
28+
service_context: { configuration }
29+
} = context;
30+
31+
const db = storage.createPowerSyncMongo(configuration.storage as MongoStorageConfig);
32+
33+
try {
34+
if (await db.bucket_state.indexExists(INDEX_NAME)) {
35+
await db.bucket_state.dropIndex(INDEX_NAME);
36+
}
37+
} finally {
38+
await db.client.close();
39+
}
40+
};

modules/module-mongodb-storage/src/storage/implementation/MongoSyncBucketStorage.ts

Lines changed: 34 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ import { PowerSyncMongo } from './db.js';
3232
import {
3333
BucketDataDocument,
3434
BucketDataKey,
35+
BucketStateDocument,
3536
SourceKey,
3637
SourceTableDocument,
3738
SyncRuleCheckpointState,
@@ -588,6 +589,13 @@ export class MongoSyncBucketStorage
588589
{ maxTimeMS: lib_mongo.db.MONGO_CLEAR_OPERATION_TIMEOUT_MS }
589590
);
590591

592+
await this.db.bucket_state.deleteMany(
593+
{
594+
_id: idPrefixFilter<BucketStateDocument['_id']>({ g: this.group_id }, ['b'])
595+
},
596+
{ maxTimeMS: lib_mongo.db.MONGO_CLEAR_OPERATION_TIMEOUT_MS }
597+
);
598+
591599
await this.db.source_tables.deleteMany(
592600
{
593601
group_id: this.group_id
@@ -870,40 +878,36 @@ export class MongoSyncBucketStorage
870878
private async getDataBucketChanges(
871879
options: GetCheckpointChangesOptions
872880
): Promise<Pick<CheckpointChanges, 'updatedDataBuckets' | 'invalidateDataBuckets'>> {
873-
// The query below can be slow, since we don't have an index on _id.o.
874-
// We could try to query the oplog for these, but that is risky.
875-
// Or we could store updated buckets in a separate collection, and query those.
876-
// For now, we ignore this optimization
877-
878-
// const dataBucketDocuments = await this.db.bucket_data
879-
// .find(
880-
// {
881-
// '_id.g': this.group_id,
882-
// '_id.o': { $gt: BigInt(options.lastCheckpoint), $lte: BigInt(options.nextCheckpoint) }
883-
// },
884-
// {
885-
// projection: {
886-
// '_id.b': 1
887-
// },
888-
// limit: 1001,
889-
// batchSize: 1001,
890-
// singleBatch: true
891-
// }
892-
// )
893-
// .toArray();
894-
895-
// const buckets = dataBucketDocuments.map((doc) => doc._id.b);
896-
// const invalidateDataBuckets = buckets.length > 1000;
881+
const bucketStateUpdates = await this.db.bucket_state
882+
.find(
883+
{
884+
// We have an index on (_id.g, last_op).
885+
'_id.g': this.group_id,
886+
last_op: { $gt: BigInt(options.lastCheckpoint) }
887+
},
888+
{
889+
projection: {
890+
'_id.b': 1
891+
},
892+
limit: 1001,
893+
batchSize: 1001,
894+
singleBatch: true
895+
}
896+
)
897+
.toArray();
897898

898-
// return {
899-
// invalidateDataBuckets: invalidateDataBuckets,
900-
// updatedDataBuckets: invalidateDataBuckets ? [] : buckets
901-
// };
899+
const buckets = bucketStateUpdates.map((doc) => doc._id.b);
900+
const invalidateDataBuckets = buckets.length > 1000;
902901

903902
return {
904-
invalidateDataBuckets: true,
905-
updatedDataBuckets: []
903+
invalidateDataBuckets: invalidateDataBuckets,
904+
updatedDataBuckets: invalidateDataBuckets ? [] : buckets
906905
};
906+
907+
// return {
908+
// invalidateDataBuckets: true,
909+
// updatedDataBuckets: []
910+
// };
907911
}
908912

909913
private async getParameterBucketChanges(

modules/module-mongodb-storage/src/storage/implementation/PersistedBatch.ts

Lines changed: 56 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,13 +4,14 @@ import { EvaluatedParameters, EvaluatedRow } from '@powersync/service-sync-rules
44
import * as bson from 'bson';
55

66
import { logger } from '@powersync/lib-services-framework';
7-
import { InternalOpId, storage, utils } from '@powersync/service-core';
7+
import { BucketState, InternalOpId, storage, utils } from '@powersync/service-core';
88
import { currentBucketKey } from './MongoBucketBatch.js';
99
import { MongoIdSequence } from './MongoIdSequence.js';
1010
import { PowerSyncMongo } from './db.js';
1111
import {
1212
BucketDataDocument,
1313
BucketParameterDocument,
14+
BucketStateDocument,
1415
CurrentBucket,
1516
CurrentDataDocument,
1617
SourceKey
@@ -48,6 +49,7 @@ export class PersistedBatch {
4849
bucketData: mongo.AnyBulkWriteOperation<BucketDataDocument>[] = [];
4950
bucketParameters: mongo.AnyBulkWriteOperation<BucketParameterDocument>[] = [];
5051
currentData: mongo.AnyBulkWriteOperation<CurrentDataDocument>[] = [];
52+
bucketStates: Map<string, BucketStateUpdate> = new Map();
5153

5254
/**
5355
* For debug logging only.
@@ -66,6 +68,19 @@ export class PersistedBatch {
6668
this.currentSize = writtenSize;
6769
}
6870

71+
private incrementBucket(bucket: string, op_id: InternalOpId) {
72+
let existingState = this.bucketStates.get(bucket);
73+
if (existingState) {
74+
existingState.lastOp = op_id;
75+
existingState.incrementCount += 1;
76+
} else {
77+
this.bucketStates.set(bucket, {
78+
lastOp: op_id,
79+
incrementCount: 1
80+
});
81+
}
82+
}
83+
6984
saveBucketData(options: {
7085
op_seq: MongoIdSequence;
7186
sourceKey: storage.ReplicaId;
@@ -111,6 +126,7 @@ export class PersistedBatch {
111126
}
112127
}
113128
});
129+
this.incrementBucket(k.bucket, op_id);
114130
}
115131

116132
for (let bd of remaining_buckets.values()) {
@@ -138,6 +154,7 @@ export class PersistedBatch {
138154
}
139155
});
140156
this.currentSize += 200;
157+
this.incrementBucket(bd.bucket, op_id);
141158
}
142159
}
143160

@@ -268,6 +285,14 @@ export class PersistedBatch {
268285
});
269286
}
270287

288+
if (this.bucketStates.size > 0) {
289+
await db.bucket_state.bulkWrite(this.getBucketStateUpdates(), {
290+
session,
291+
// Per-bucket operation - order doesn't matter
292+
ordered: false
293+
});
294+
}
295+
271296
const duration = performance.now() - startAt;
272297
logger.info(
273298
`powersync_${this.group_id} Flushed ${this.bucketData.length} + ${this.bucketParameters.length} + ${
@@ -278,7 +303,37 @@ export class PersistedBatch {
278303
this.bucketData = [];
279304
this.bucketParameters = [];
280305
this.currentData = [];
306+
this.bucketStates.clear();
281307
this.currentSize = 0;
282308
this.debugLastOpId = null;
283309
}
310+
311+
private getBucketStateUpdates(): mongo.AnyBulkWriteOperation<BucketStateDocument>[] {
312+
return Array.from(this.bucketStates.entries()).map(([bucket, state]) => {
313+
return {
314+
updateOne: {
315+
filter: {
316+
_id: {
317+
g: this.group_id,
318+
b: bucket
319+
}
320+
},
321+
update: {
322+
$set: {
323+
last_op: state.lastOp
324+
},
325+
$inc: {
326+
op_count: state.incrementCount
327+
}
328+
},
329+
upsert: true
330+
}
331+
} satisfies mongo.AnyBulkWriteOperation<BucketStateDocument>;
332+
});
333+
}
334+
}
335+
336+
interface BucketStateUpdate {
337+
lastOp: InternalOpId;
338+
incrementCount: number;
284339
}

modules/module-mongodb-storage/src/storage/implementation/db.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import { MongoStorageConfig } from '../../types/types.js';
66
import {
77
BucketDataDocument,
88
BucketParameterDocument,
9+
BucketStateDocument,
910
CurrentDataDocument,
1011
CustomWriteCheckpointDocument,
1112
IdSequenceDocument,
@@ -33,6 +34,7 @@ export class PowerSyncMongo {
3334
readonly write_checkpoints: mongo.Collection<WriteCheckpointDocument>;
3435
readonly instance: mongo.Collection<InstanceDocument>;
3536
readonly locks: mongo.Collection<lib_mongo.locks.Lock>;
37+
readonly bucket_state: mongo.Collection<BucketStateDocument>;
3638

3739
readonly client: mongo.MongoClient;
3840
readonly db: mongo.Db;
@@ -55,6 +57,7 @@ export class PowerSyncMongo {
5557
this.write_checkpoints = db.collection('write_checkpoints');
5658
this.instance = db.collection('instance');
5759
this.locks = this.db.collection('locks');
60+
this.bucket_state = this.db.collection('bucket_state');
5861
}
5962

6063
/**
@@ -70,6 +73,7 @@ export class PowerSyncMongo {
7073
await this.write_checkpoints.deleteMany({});
7174
await this.instance.deleteOne({});
7275
await this.locks.deleteMany({});
76+
await this.bucket_state.deleteMany({});
7377
}
7478

7579
/**

modules/module-mongodb-storage/src/storage/implementation/models.ts

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,15 @@ export interface SourceTableDocument {
7575
snapshot_done: boolean | undefined;
7676
}
7777

78+
export interface BucketStateDocument {
79+
_id: {
80+
g: number;
81+
b: string;
82+
};
83+
last_op: bigint;
84+
op_count: number;
85+
}
86+
7887
export interface IdSequenceDocument {
7988
_id: string;
8089
op_id: bigint;

0 commit comments

Comments
 (0)