Skip to content
Merged
Show file tree
Hide file tree
Changes from 18 commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
62d40f8
POC of incremental update lookups.
rkistner Feb 12, 2025
5837e14
Cache dynamic bucket lookups.
rkistner Mar 5, 2025
39e256a
Fix sizeCalculation.
rkistner Feb 12, 2025
4e52f86
Optimization: skip checking for bucket_parameters changes if there are
rkistner Feb 12, 2025
f8582c5
Fix tests.
rkistner Feb 13, 2025
c8e63be
Fix import.
rkistner Feb 25, 2025
74e66e3
Merge remote-tracking branch 'origin/main' into optimize-bucket-looku…
rkistner Mar 11, 2025
6ebefa9
Improve parameter query filtering.
rkistner Mar 11, 2025
27c7577
Track last_op for each bucket.
rkistner Mar 11, 2025
48846ad
Fix cache size calculation.
rkistner Mar 11, 2025
1f73456
Update bucket counts after compact.
rkistner Mar 11, 2025
357477f
Merge remote-tracking branch 'origin/main' into optimize-bucket-looku…
rkistner Mar 11, 2025
8f430db
Fix some tests.
rkistner Mar 12, 2025
dfbdf09
Use an explicit ParameterLookup class for better typing.
rkistner Mar 12, 2025
71309df
Fix sync-rules tests.
rkistner Mar 12, 2025
246c3b6
Fix another test.
rkistner Mar 12, 2025
b61b0ca
Add changeset.
rkistner Mar 12, 2025
95d98ee
Merge remote-tracking branch 'origin/main' into optimize-bucket-looku…
rkistner Mar 13, 2025
4dc78b8
Merge remote-tracking branch 'origin/main' into optimize-bucket-looku…
rkistner Mar 17, 2025
bdf9361
Remove op_count from bucket_state.
rkistner Mar 18, 2025
3cdf69d
Fix compact.
rkistner Mar 18, 2025
f28eaef
Cleanup and comments.
rkistner Mar 19, 2025
f1af3a1
Simplify type guard.
rkistner Mar 19, 2025
ec45ae4
Tweaks and tests for hasIntersection.
rkistner Mar 19, 2025
1a76b2c
Use set intersection.
rkistner Mar 19, 2025
66fed0f
Fix handling of checkpoints only containing a write checkpoint update.
rkistner Mar 19, 2025
7a6cff1
Use a Symbol instead of null for INVALIDATE_ALL_BUCKETS.
rkistner Mar 19, 2025
65889dd
Fix typo.
rkistner Mar 19, 2025
8f45ca5
Add tests; fix parameter query lookup issue.
rkistner Mar 19, 2025
80fbe0b
Update snapshots for postgres.
rkistner Mar 19, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions .changeset/swift-wolves-sleep.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
---
'@powersync/service-module-postgres-storage': minor
'@powersync/service-module-mongodb-storage': minor
'@powersync/service-core-tests': minor
'@powersync/service-core': minor
'@powersync/service-sync-rules': minor
---

Cache parameter queries and buckets to reduce incremental sync overhead
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
import { migrations } from '@powersync/service-core';
import * as storage from '../../../storage/storage-index.js';
import { MongoStorageConfig } from '../../../types/types.js';

const INDEX_NAME = 'bucket_updates';

export const up: migrations.PowerSyncMigrationFunction = async (context) => {
const {
service_context: { configuration }
} = context;
const db = storage.createPowerSyncMongo(configuration.storage as MongoStorageConfig);

try {
await db.bucket_state.createIndex(
{
'_id.g': 1,
last_op: 1
},
{ name: INDEX_NAME, unique: true }
);
} finally {
await db.client.close();
}
};

export const down: migrations.PowerSyncMigrationFunction = async (context) => {
const {
service_context: { configuration }
} = context;

const db = storage.createPowerSyncMongo(configuration.storage as MongoStorageConfig);

try {
if (await db.bucket_state.indexExists(INDEX_NAME)) {
await db.bucket_state.dropIndex(INDEX_NAME);
}
} finally {
await db.client.close();
}
};
Original file line number Diff line number Diff line change
Expand Up @@ -314,10 +314,12 @@ export class MongoCompactor {
let lastOpId: BucketDataKey | null = null;
let targetOp: bigint | null = null;
let gotAnOp = false;
let numberOfOpsToClear = 0;
for await (let op of query.stream()) {
if (op.op == 'MOVE' || op.op == 'REMOVE' || op.op == 'CLEAR') {
checksum = utils.addChecksums(checksum, op.checksum);
lastOpId = op._id;
numberOfOpsToClear += 1;
if (op.op != 'CLEAR') {
gotAnOp = true;
}
Expand All @@ -337,7 +339,7 @@ export class MongoCompactor {
return;
}

logger.info(`Flushing CLEAR at ${lastOpId?.o}`);
logger.info(`Flushing CLEAR for ${numberOfOpsToClear} ops at ${lastOpId?.o}`);
await this.db.bucket_data.deleteMany(
{
_id: {
Expand All @@ -362,6 +364,22 @@ export class MongoCompactor {
},
{ session }
);

// Note: This does not update anything if there is no existing state
await this.db.bucket_state.updateOne(
{
_id: {
g: this.group_id,
b: bucket
}
},
{
$inc: {
op_count: 1 - numberOfOpsToClear
}
},
{ session }
);
},
{
writeConcern: { w: 'majority' },
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,27 +9,29 @@ import {
} from '@powersync/lib-services-framework';
import {
BroadcastIterable,
CHECKPOINT_INVALIDATE_ALL,
CheckpointChanges,
GetCheckpointChangesOptions,
InternalOpId,
internalToExternalOpId,
ProtocolOpId,
ReplicationCheckpoint,
SourceTable,
storage,
utils,
WatchWriteCheckpointOptions
WatchWriteCheckpointOptions,
CHECKPOINT_INVALIDATE_ALL,
deserializeParameterLookup
} from '@powersync/service-core';
import { SqliteJsonRow, SqliteJsonValue, SqlSyncRules } from '@powersync/service-sync-rules';
import { SqliteJsonRow, ParameterLookup, SqlSyncRules } from '@powersync/service-sync-rules';
import * as bson from 'bson';
import { wrapWithAbort } from 'ix/asynciterable/operators/withabort.js';
import { LRUCache } from 'lru-cache';
import * as timers from 'timers/promises';
import { MongoBucketStorage } from '../MongoBucketStorage.js';
import { PowerSyncMongo } from './db.js';
import {
BucketDataDocument,
BucketDataKey,
BucketStateDocument,
SourceKey,
SourceTableDocument,
SyncRuleCheckpointState,
Expand All @@ -39,6 +41,7 @@ import { MongoBucketBatch } from './MongoBucketBatch.js';
import { MongoCompactor } from './MongoCompactor.js';
import { MongoWriteCheckpointAPI } from './MongoWriteCheckpointAPI.js';
import { idPrefixFilter, mapOpEntry, readSingleBatch } from './util.js';
import { JSONBig } from '@powersync/service-jsonbig';

export class MongoSyncBucketStorage
extends BaseObserver<storage.SyncRulesBucketStorageListener>
Expand Down Expand Up @@ -154,7 +157,7 @@ export class MongoSyncBucketStorage

await callback(batch);
await batch.flush();
if (batch.last_flushed_op) {
if (batch.last_flushed_op != null) {
return { flushed_op: batch.last_flushed_op };
} else {
return null;
Expand Down Expand Up @@ -252,7 +255,7 @@ export class MongoSyncBucketStorage
return result!;
}

async getParameterSets(checkpoint: utils.InternalOpId, lookups: SqliteJsonValue[][]): Promise<SqliteJsonRow[]> {
async getParameterSets(checkpoint: utils.InternalOpId, lookups: ParameterLookup[]): Promise<SqliteJsonRow[]> {
const lookupFilter = lookups.map((lookup) => {
return storage.serializeLookup(lookup);
});
Expand Down Expand Up @@ -585,6 +588,13 @@ export class MongoSyncBucketStorage
{ maxTimeMS: lib_mongo.db.MONGO_CLEAR_OPERATION_TIMEOUT_MS }
);

await this.db.bucket_state.deleteMany(
{
_id: idPrefixFilter<BucketStateDocument['_id']>({ g: this.group_id }, ['b'])
},
{ maxTimeMS: lib_mongo.db.MONGO_CLEAR_OPERATION_TIMEOUT_MS }
);

await this.db.source_tables.deleteMany(
{
group_id: this.group_id
Expand Down Expand Up @@ -795,12 +805,7 @@ export class MongoSyncBucketStorage

const updates: CheckpointChanges =
lastCheckpoint == null
? {
invalidateDataBuckets: true,
invalidateParameterBuckets: true,
updatedDataBuckets: [],
updatedParameterBucketDefinitions: []
}
? CHECKPOINT_INVALIDATE_ALL
: await this.getCheckpointChanges({
lastCheckpoint: lastCheckpoint,
nextCheckpoint: checkpoint
Expand Down Expand Up @@ -869,7 +874,119 @@ export class MongoSyncBucketStorage
return pipeline;
}

private async getDataBucketChanges(
options: GetCheckpointChangesOptions
): Promise<Pick<CheckpointChanges, 'updatedDataBuckets' | 'invalidateDataBuckets'>> {
const bucketStateUpdates = await this.db.bucket_state
.find(
{
// We have an index on (_id.g, last_op).
'_id.g': this.group_id,
last_op: { $gt: BigInt(options.lastCheckpoint) }
},
{
projection: {
'_id.b': 1
},
limit: 1001,
batchSize: 1001,
singleBatch: true
}
)
.toArray();

const buckets = bucketStateUpdates.map((doc) => doc._id.b);
const invalidateDataBuckets = buckets.length > 1000;

return {
invalidateDataBuckets: invalidateDataBuckets,
updatedDataBuckets: invalidateDataBuckets ? [] : buckets
};
}

private async getParameterBucketChanges(
options: GetCheckpointChangesOptions
): Promise<Pick<CheckpointChanges, 'updatedParameterLookups' | 'invalidateParameterBuckets'>> {
// TODO: limit max query running time
const parameterUpdates = await this.db.bucket_parameters
.find(
{
_id: { $gt: BigInt(options.lastCheckpoint), $lt: BigInt(options.nextCheckpoint) },
'key.g': this.group_id
},
{
projection: {
lookup: 1
},
limit: 1001,
batchSize: 1001,
singleBatch: true
}
)
.toArray();
const invalidateParameterUpdates = parameterUpdates.length > 1000;

return {
invalidateParameterBuckets: invalidateParameterUpdates,
updatedParameterLookups: invalidateParameterUpdates
? new Set<string>()
: new Set<string>(parameterUpdates.map((p) => JSONBig.stringify(deserializeParameterLookup(p.lookup))))
};
}

// TODO:
// We can optimize this by implementing it like ChecksumCache: We can use partial cache results to do
// more efficient lookups in some cases.
private checkpointChangesCache = new LRUCache<string, CheckpointChanges, { options: GetCheckpointChangesOptions }>({
max: 50,
maxSize: 10 * 1024 * 1024,
sizeCalculation: (value: CheckpointChanges) => {
const paramSize = [...value.updatedParameterLookups].reduce<number>((a, b) => a + b.length, 0);
const bucketSize = [...value.updatedDataBuckets].reduce<number>((a, b) => a + b.length, 0);
return 100 + paramSize + bucketSize;
},
fetchMethod: async (_key, _staleValue, options) => {
return this.getCheckpointChangesInternal(options.context.options);
}
});

private _hasDynamicBucketsCached: boolean | undefined = undefined;

private hasDynamicBucketQueries(): boolean {
if (this._hasDynamicBucketsCached != null) {
return this._hasDynamicBucketsCached;
}
const syncRules = this.getParsedSyncRules({
defaultSchema: 'default' // n/a
});
const hasDynamicBuckets = syncRules.hasDynamicBucketQueries();
this._hasDynamicBucketsCached = hasDynamicBuckets;
return hasDynamicBuckets;
}

async getCheckpointChanges(options: GetCheckpointChangesOptions): Promise<CheckpointChanges> {
return CHECKPOINT_INVALIDATE_ALL;
if (!this.hasDynamicBucketQueries()) {
// Special case when we have no dynamic parameter queries.
// In this case, we can avoid doing any queries.
return {
invalidateDataBuckets: true,
updatedDataBuckets: [],
invalidateParameterBuckets: false,
updatedParameterLookups: new Set<string>()
};
}
const key = `${options.lastCheckpoint}_${options.nextCheckpoint}`;
const result = await this.checkpointChangesCache.fetch(key, { context: { options } });
return result!;
}

private async getCheckpointChangesInternal(options: GetCheckpointChangesOptions): Promise<CheckpointChanges> {
const dataUpdates = await this.getDataBucketChanges(options);
const parameterUpdates = await this.getParameterBucketChanges(options);

return {
...dataUpdates,
...parameterUpdates
};
}
}
Loading