Merged

Commits (28):
2b024ff Introduce getBucketParameterQuerier(). (rkistner, Feb 5, 2025)
0124b29 Extract BucketStorage interfaces into separate source files. (rkistner, Feb 10, 2025)
5df24a0 Add initial test for BucketChecksumState. (rkistner, Feb 10, 2025)
fdd0b84 Move BucketChecksumState to a separate file. (rkistner, Feb 11, 2025)
f60c705 Fix sending data after a partial checkpoint. (rkistner, Feb 11, 2025)
912fd9e Expand comments. (rkistner, Feb 12, 2025)
6258202 Filter checkpoints according to modified buckets. (rkistner, Feb 12, 2025)
c65dbd0 Track bucket parameter changes. (rkistner, Feb 12, 2025)
6af9389 Redo checkpoint change tracking. (rkistner, Feb 12, 2025)
dde29e3 Revert changestream changes. (rkistner, Feb 12, 2025)
ca2f83a Fixes. (rkistner, Feb 12, 2025)
68fe248 Revert incremental change lookup. (rkistner, Feb 12, 2025)
c71df58 Fix storage cache, fixing a major performance regression. (rkistner, Feb 13, 2025)
b57fb63 Add concurrent connection testing to test-client. (rkistner, Feb 17, 2025)
5e7c523 Merge branch 'feat/bucket-priorities' into optimize-bucket-lookups (rkistner, Feb 17, 2025)
409b57d Fix lockfile. (rkistner, Feb 17, 2025)
436eee6 Add changeset. (rkistner, Feb 17, 2025)
a2f5c4e Fix tests. (rkistner, Feb 17, 2025)
e8892c3 Fix test for interrupting checkpoints. (rkistner, Feb 17, 2025)
b5038d6 Fix unhandled AbortErrors. (rkistner, Feb 17, 2025)
9707f9d Improve concurrent-connections script. (rkistner, Feb 17, 2025)
9d4b166 Fix cache in postgres storage. (rkistner, Feb 18, 2025)
b08a8de Remove DisposableObserver. (rkistner, Feb 18, 2025)
6c034a6 Merge remote-tracking branch 'origin/feat/bucket-priorities' into opt… (rkistner, Feb 18, 2025)
ff8c3ce Post-merge conflict fixes. (rkistner, Feb 18, 2025)
0b3d7a3 Expand test. (rkistner, Feb 18, 2025)
bc5326c Merge branch 'main' into optimize-bucket-lookups (rkistner, Feb 18, 2025)
ea0c570 Merge branch 'main' into optimize-bucket-lookups (rkistner, Feb 18, 2025)
13 changes: 13 additions & 0 deletions .changeset/chilled-mirrors-marry.md
@@ -0,0 +1,13 @@
---
'@powersync/service-module-postgres-storage': minor
'@powersync/service-module-mongodb-storage': minor
'@powersync/service-core-tests': minor
'@powersync/service-module-postgres': minor
'@powersync/service-module-mongodb': minor
'@powersync/service-core': minor
'@powersync/service-module-mysql': minor
'@powersync/service-sync-rules': minor
'@powersync/service-errors': patch
---

Minor optimizations to new checkpoint calculations.
214 changes: 17 additions & 197 deletions modules/module-mongodb-storage/src/storage/MongoBucketStorage.ts
@@ -1,9 +1,7 @@
import { SqlSyncRules } from '@powersync/service-sync-rules';
import { wrapWithAbort } from 'ix/asynciterable/operators/withabort.js';
import { LRUCache } from 'lru-cache/min';
import * as timers from 'timers/promises';

import { storage, sync, utils } from '@powersync/service-core';
import { storage } from '@powersync/service-core';

import { DisposableObserver, ErrorCode, logger, ServiceError } from '@powersync/lib-services-framework';
import { v4 as uuid } from 'uuid';
@@ -26,26 +24,7 @@ export class MongoBucketStorage
// TODO: This is still Postgres specific and needs to be reworked
public readonly slot_name_prefix: string;

private readonly storageCache = new LRUCache<number, MongoSyncBucketStorage>({
max: 3,
fetchMethod: async (id) => {
const doc2 = await this.db.sync_rules.findOne(
{
_id: id
},
{ limit: 1 }
);
if (doc2 == null) {
// Deleted in the meantime?
return undefined;
}
const rules = new MongoPersistedSyncRulesContent(this.db, doc2);
return this.getInstance(rules);
},
dispose: (storage) => {
storage[Symbol.dispose]();
}
});
private activeStorageCache: MongoSyncBucketStorage | undefined;

public readonly db: PowerSyncMongo;

@@ -289,19 +268,22 @@ export class MongoBucketStorage
});
}

async getActiveCheckpoint(): Promise<storage.ActiveCheckpoint> {
const doc = await this.db.sync_rules.findOne(
{
state: storage.SyncRuleState.ACTIVE
},
{
sort: { _id: -1 },
limit: 1,
projection: { _id: 1, last_checkpoint: 1, last_checkpoint_lsn: 1 }
}
);
async getActiveStorage(): Promise<MongoSyncBucketStorage | null> {
const content = await this.getActiveSyncRulesContent();
if (content == null) {
return null;
}

return this.makeActiveCheckpoint(doc);
// It is important that this instance is cached.
// Not for the instance construction itself, but to ensure that internal caches on the instance
// are re-used properly.
if (this.activeStorageCache?.group_id == content.id) {
return this.activeStorageCache;
} else {
const instance = this.getInstance(content);
this.activeStorageCache = instance;
return instance;
}
}
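
The single-entry `activeStorageCache` above replaces the `LRUCache` removed at the top of this file: only one sync rules instance is active at a time, so caching just the latest instance is enough to keep its internal caches warm across lookups. A minimal sketch of the pattern, with `ActiveInstance` and `create` as hypothetical stand-ins for `MongoSyncBucketStorage` and `getInstance`:

```ts
// Hypothetical stand-in for the relevant part of MongoSyncBucketStorage.
interface ActiveInstance {
  group_id: number;
}

// Single-entry cache: keep only the latest active instance, keyed by its
// group_id, so the instance's own internal caches survive repeated lookups.
class SingleInstanceCache<T extends ActiveInstance> {
  private cached: T | undefined;

  constructor(private create: (id: number) => T) {}

  get(activeId: number): T {
    const cached = this.cached;
    if (cached != null && cached.group_id === activeId) {
      return cached;
    }
    const instance = this.create(activeId);
    this.cached = instance;
    return instance;
  }
}
```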

async getStorageMetrics(): Promise<storage.StorageMetrics> {
@@ -387,166 +369,4 @@

return instance!._id;
}

private makeActiveCheckpoint(doc: SyncRuleDocument | null) {
return {
checkpoint: utils.timestampToOpId(doc?.last_checkpoint ?? 0n),
lsn: doc?.last_checkpoint_lsn ?? null,
hasSyncRules() {
return doc != null;
},
getBucketStorage: async () => {
if (doc == null) {
return null;
}
return (await this.storageCache.fetch(doc._id)) ?? null;
}
} satisfies storage.ActiveCheckpoint;
}

/**
* Instance-wide watch on the latest available checkpoint (op_id + lsn).
*/
private async *watchActiveCheckpoint(signal: AbortSignal): AsyncIterable<storage.ActiveCheckpoint> {
const pipeline: mongo.Document[] = [
{
$match: {
'fullDocument.state': 'ACTIVE',
operationType: { $in: ['insert', 'update'] }
}
},
{
$project: {
operationType: 1,
'fullDocument._id': 1,
'fullDocument.last_checkpoint': 1,
'fullDocument.last_checkpoint_lsn': 1
}
}
];

// Use this form instead of (doc: SyncRuleDocument | null = null),
// otherwise we get weird "doc: never" issues.
let doc = null as SyncRuleDocument | null;
let clusterTime = null as mongo.Timestamp | null;

await this.client.withSession(async (session) => {
doc = await this.db.sync_rules.findOne(
{
state: storage.SyncRuleState.ACTIVE
},
{
session,
sort: { _id: -1 },
limit: 1,
projection: {
_id: 1,
last_checkpoint: 1,
last_checkpoint_lsn: 1
}
}
);
const time = session.clusterTime?.clusterTime ?? null;
clusterTime = time;
});
if (clusterTime == null) {
throw new ServiceError(ErrorCode.PSYNC_S2401, 'Could not get clusterTime');
}

if (signal.aborted) {
return;
}

if (doc) {
yield this.makeActiveCheckpoint(doc);
}

const stream = this.db.sync_rules.watch(pipeline, {
fullDocument: 'updateLookup',
// Start at the cluster time where we got the initial doc, to make sure
// we don't skip any updates.
// This may result in the first operation being a duplicate, but we filter
// it out anyway.
startAtOperationTime: clusterTime
});

signal.addEventListener(
'abort',
() => {
stream.close();
},
{ once: true }
);

let lastOp: storage.ActiveCheckpoint | null = null;

for await (const update of stream.stream()) {
if (signal.aborted) {
break;
}
if (update.operationType != 'insert' && update.operationType != 'update') {
continue;
}
const doc = update.fullDocument!;
if (doc == null) {
continue;
}

const op = this.makeActiveCheckpoint(doc);
// Check for LSN / checkpoint changes - ignore other metadata changes
if (lastOp == null || op.lsn != lastOp.lsn || op.checkpoint != lastOp.checkpoint) {
lastOp = op;
yield op;
}
}
}
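
The session/clusterTime handling above (removed in this PR along with the rest of the instance-wide watch) is a general change-stream technique: read the current state inside a session, capture the session's cluster time, then open the change stream with `startAtOperationTime` at that time, so no update between the read and the watch can be missed. A self-contained sketch of the same idea, with placeholder database and collection names:

```ts
import { MongoClient, Timestamp, type Document } from 'mongodb';

// Sketch: read current state, then tail changes with no gap between the
// read and the watch. Database and collection names are placeholders.
async function* watchWithoutGaps(client: MongoClient): AsyncGenerator<Document> {
  const collection = client.db('demo').collection('state');

  let initial: Document | null = null;
  let clusterTime: Timestamp | null = null;
  await client.withSession(async (session) => {
    // Reading inside a session lets us capture the cluster time of the read.
    initial = await collection.findOne({}, { session });
    clusterTime = session.clusterTime?.clusterTime ?? null;
  });
  if (clusterTime == null) {
    throw new Error('Could not get clusterTime');
  }
  const startTime: Timestamp = clusterTime;

  if (initial != null) {
    yield initial;
  }

  // Starting at the read's cluster time means the first event may duplicate
  // the initial read; consumers should de-duplicate on the fields they use.
  const stream = collection.watch([], { startAtOperationTime: startTime });
  for await (const change of stream) {
    yield change;
  }
}
```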

// Nothing is done here until a subscriber starts to iterate
private readonly sharedIter = new sync.BroadcastIterable((signal) => {
return this.watchActiveCheckpoint(signal);
});
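
Judging by its use here and the comment above, `BroadcastIterable` (from `@powersync/service-core`) starts the wrapped source lazily on the first subscriber and multicasts its values to all subscribers. A hypothetical, stripped-down version of that idea; error propagation, late subscribers, and source cleanup are omitted:

```ts
// Lazy multicast over an abortable async source. Nothing runs until the
// first subscriber begins iterating; every subscriber sees each value.
class SimpleBroadcast<T> implements AsyncIterable<T> {
  private sinks = new Set<(result: IteratorResult<T>) => void>();
  private started = false;
  private abort = new AbortController();

  constructor(private source: (signal: AbortSignal) => AsyncIterable<T>) {}

  private async run() {
    for await (const value of this.source(this.abort.signal)) {
      for (const sink of this.sinks) sink({ value, done: false });
    }
    for (const sink of this.sinks) sink({ value: undefined, done: true });
  }

  async *[Symbol.asyncIterator](): AsyncGenerator<T> {
    const queue: IteratorResult<T>[] = [];
    let notify: (() => void) | null = null;
    const sink = (result: IteratorResult<T>) => {
      queue.push(result);
      notify?.();
      notify = null;
    };
    this.sinks.add(sink);
    // Lazy start: the source only begins once the first subscriber gets here.
    if (!this.started) {
      this.started = true;
      void this.run();
    }
    try {
      while (true) {
        while (queue.length == 0) {
          await new Promise<void>((resolve) => {
            notify = () => resolve();
          });
        }
        const next = queue.shift()!;
        if (next.done) return;
        yield next.value;
      }
    } finally {
      this.sinks.delete(sink);
    }
  }
}
```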

/**
* User-specific watch on the latest checkpoint and/or write checkpoint.
*/
async *watchWriteCheckpoint(user_id: string, signal: AbortSignal): AsyncIterable<storage.WriteCheckpoint> {
let lastCheckpoint: utils.OpId | null = null;
let lastWriteCheckpoint: bigint | null = null;

const iter = wrapWithAbort(this.sharedIter, signal);
for await (const cp of iter) {
const { checkpoint, lsn } = cp;

// lsn changes are not important by themselves.
// What is important is:
// 1. checkpoint (op_id) changes.
// 2. write checkpoint changes for the specific user
const bucketStorage = await cp.getBucketStorage();
if (!bucketStorage) {
continue;
}

const lsnFilters: Record<string, string> = lsn ? { 1: lsn } : {};

const currentWriteCheckpoint = await bucketStorage.lastWriteCheckpoint({
user_id,
heads: {
...lsnFilters
}
});

if (currentWriteCheckpoint == lastWriteCheckpoint && checkpoint == lastCheckpoint) {
// No change - wait for next one
// In some cases, many LSNs may be produced in a short time.
// Add a delay to throttle the write checkpoint lookup a bit.
await timers.setTimeout(20 + 10 * Math.random());
continue;
}

lastWriteCheckpoint = currentWriteCheckpoint;
lastCheckpoint = checkpoint;

yield { base: cp, writeCheckpoint: currentWriteCheckpoint };
}
}
}
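
The 20-30 ms jittered delay in `watchWriteCheckpoint` above acts as a small throttle: when replication produces many LSNs in a burst, it bounds the rate of write-checkpoint lookups, and the random component keeps concurrent connections from polling in lockstep. The same idea as a standalone sketch; `pollOnce` is a hypothetical expensive lookup:

```ts
import { setTimeout as delay } from 'timers/promises';

// Throttled polling loop: back off with a small jittered delay whenever a
// poll observes no change, so a burst of upstream events does not turn
// into one expensive lookup per event.
async function pollWithJitter(
  pollOnce: () => Promise<boolean>,
  signal: AbortSignal
): Promise<void> {
  while (!signal.aborted) {
    const changed = await pollOnce();
    if (!changed) {
      // 20-30 ms pause; the jitter de-synchronizes concurrent clients.
      await delay(20 + 10 * Math.random());
    }
  }
}
```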