[MongoDB Storage] Compact parameter lookups #315
Merged

Commits (16)
d7374e9  Refactor getParameterSets to take a ReplicationCheckpoint. (rkistner)
19e6dd5  Move getParameterSets to ReplicationCheckpoint. (rkistner)
c588665  Refactor internal MongoSyncBucketStorage checkpoint lookup. (rkistner)
8defa6e  Query parameters at the checkpoint's clusterTime. (rkistner)
440d32f  Add parameter compact action. (rkistner)
90db592  Test consistency of compact. (rkistner)
9cb618e  Fix snapshot implementation. (rkistner)
e8029e8  Add maxTimeMS on parameter queries. (rkistner)
4628f5d  Keep track of last 10k seen keys. (rkistner)
5e3dc27  Test that compacting actually has an effect. (rkistner)
feb644a  Add changeset. (rkistner)
d040b28  Add docs. (rkistner)
019b098  Remove deleted parameter lookup values. (rkistner)
10b14f5  Merge remote-tracking branch 'origin/main' into compact-parameters (rkistner)
fc32d7c  More tests and fixes. (rkistner)
d2bc507  Remove redundant check (rkistner)

rkistner File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
@@ -0,0 +1,9 @@
---
'@powersync/service-module-postgres-storage': minor
'@powersync/service-module-mongodb-storage': minor
'@powersync/service-core-tests': minor
'@powersync/service-core': minor
'@powersync/service-image': minor
---

[MongoDB Storage] Compact action now also compacts parameter lookup storage.

docs/parameters-lookups.md

@@ -0,0 +1,122 @@

# Parameter Lookup Implementation

Most of the other docs focus on bucket data, but parameter lookup data also contains some tricky bits.

## Basic requirements

The essence of what we do when syncing data is:

1. Get the latest checkpoint.
2. Evaluate all parameter queries _at the state of the checkpoint_.
3. Return bucket data for the checkpoint.

This doc focuses on point 2.

## Current lookup implementation

We effectively store an "index" for exact lookups on parameter query tables.

The format in MongoDB storage is:

    _id: OpId # auto-incrementing op-id, using the same sequence as checkpoints
    key: {g: <sync rules group id>, t: <table id>, k: RowReplicationId} # uniquely identifies the source row
    lookup: doc # lookup entry for this source row
    bucket_parameters: data # results returned to the parameter query

If one row evaluates to multiple lookups, those are each stored as a separate document with the same key.

When a row is deleted, we empty `bucket_parameters` for the same (key, lookup) combinations.

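As a rough illustration, the stored document shape can be expressed in TypeScript along these lines. This is a sketch only; the concrete field types are assumptions, not the definitions from the storage module's models:

```ts
// Sketch of the bucket_parameters document shape; field types are assumptions.
interface BucketParameterDocumentSketch {
  _id: bigint; // auto-incrementing op-id, same sequence as checkpoints
  key: {
    g: number; // sync rules group id
    t: unknown; // table id
    k: unknown; // RowReplicationId - uniquely identifies the source row
  };
  lookup: Record<string, unknown>; // lookup entry for this source row
  bucket_parameters: Record<string, unknown>[]; // parameter query results; emptied when the row is deleted
}
```
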
To query, we do the following (a sketch follows the list):

1. Filter by sync rules version: key.g.
2. Filter by lookup.
3. Filter by checkpoint: \_id <= checkpoint.
4. Return the last parameter data for each (key, lookup) combination (highest \_id).

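A minimal sketch of this lookup with the Node.js MongoDB driver, deduplicating by key in application code. This is an illustration of the steps above, not necessarily the production query; the document shape is the simplified sketch from earlier, and the dedup key is illustrative only:

```ts
import { Collection, Document } from 'mongodb';

// Simplified document shape for this sketch.
interface ParameterDoc {
  _id: bigint;
  key: { g: number; t: unknown; k: unknown };
  lookup: Document;
  bucket_parameters: Document[];
}

// Sketch: return the latest parameter data per key for one lookup value, as of the checkpoint.
async function lookupParameters(
  collection: Collection<ParameterDoc>,
  groupId: number,
  lookup: Document,
  checkpoint: bigint
): Promise<Document[]> {
  const docs = await collection
    .find(
      { 'key.g': groupId, lookup: lookup, _id: { $lte: checkpoint } }, // steps 1-3
      { sort: { _id: -1 }, projection: { key: 1, bucket_parameters: 1 } }
    )
    .toArray();

  const seen = new Set<string>();
  const results: Document[] = [];
  for (const doc of docs) {
    const dedupKey = JSON.stringify(doc.key); // illustrative dedup key for the sketch
    if (seen.has(dedupKey)) continue; // a higher _id for this (key, lookup) was already taken
    seen.add(dedupKey);
    results.push(...doc.bucket_parameters); // empty array for deleted rows
  }
  return results;
}
```
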
## Compacting

In many cases, parameter query tables are updated infrequently, and compacting is not important. However, there are cases where parameter query tables are updated regularly (in cron jobs, for example), and the resulting indefinite storage growth causes significant query overhead and other issues.

To handle this, we compact older data. For each (key.g, key, lookup) combination, we only need to keep the last copy (highest \_id). And if the last one is a remove operation (empty `bucket_parameters`), we can remove it completely.

One big consideration is that sync clients may still need some of that data. To cover for this, parameter lookup queries should specifically use a _snapshot_ query mode, querying at the same snapshot that was used for the checkpoint lookup. This is different from the "Future Option: Snapshot queries" point below: we're not using a snapshot at the time the checkpoint was created, but rather a snapshot at the time the checkpoint was read. This means we always use a fresh snapshot.

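A hedged sketch of what such a snapshot read could look like with a raw `find` command, assuming MongoDB 5.0+ where snapshot read concern is available outside transactions. The `clusterTime` argument is whatever timestamp was captured when the checkpoint was read; the option names come from the MongoDB command documentation, not necessarily the exact code in this PR:

```ts
import { Db, Timestamp, Document } from 'mongodb';

// Sketch: read parameter lookups at the same cluster time used for the checkpoint lookup.
async function snapshotParameterQuery(
  db: Db,
  groupId: number,
  lookup: Document,
  checkpoint: bigint,
  clusterTime: Timestamp
): Promise<Document[]> {
  const response = await db.command({
    find: 'bucket_parameters',
    filter: { 'key.g': groupId, lookup: lookup, _id: { $lte: checkpoint } },
    sort: { _id: -1 },
    readConcern: { level: 'snapshot', atClusterTime: clusterTime },
    maxTimeMS: 5_000 // bound the query; the PR also adds maxTimeMS to parameter queries
  });
  // First batch only, for illustration; a real implementation would iterate the cursor.
  return response.cursor.firstBatch as Document[];
}
```
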
# Alternatives

## Future option: Incremental compacting

Right now, compacting scans through the entire collection to compact data. It should be possible to make this more incremental, only scanning through documents added since the last compact.

## Future Option: Snapshot queries

If we could do a snapshot query with a snapshot matching the checkpoint, the rest of the implementation could become quite simple. We could "just" replicate the latest copy of parameter tables, and run arbitrary parameter queries on them.

Unfortunately, running snapshot queries for specific checkpoints is not that simple. Tricky parts include associating a snapshot with a specific checkpoint, and snapshots typically expiring after a short duration. Nonetheless, this remains an option to consider in the future.

To implement this with MongoDB:

1. Every time we `commit()` in the replication process, store the current clusterTime (we can use `$$CLUSTER_TIME` for this; a sketch follows below).
2. When we query for data, use that clusterTime.
3. _Make sure we commit at least once every 5 minutes_, ideally every minute.

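A hypothetical sketch of step 1, persisting the cluster time during `commit()` via a pipeline-style update so the server fills in `$$CLUSTER_TIME` itself. The collection and field names (`sync_rules`, `last_checkpoint_ts`) are assumptions for illustration:

```ts
import { Db } from 'mongodb';

// Hypothetical: record the cluster time of the latest commit on the sync rules document.
async function recordCommitClusterTime(db: Db, groupId: number): Promise<void> {
  await db.collection('sync_rules').updateOne(
    { _id: groupId },
    // Pipeline-style update (MongoDB 4.2+) so $$CLUSTER_TIME is evaluated server-side.
    [{ $set: { last_checkpoint_ts: '$$CLUSTER_TIME' } }]
  );
}
```
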
The last point means that replication issues could also turn into query issues:

1. The replication process being down for 5 minutes means queries stop working.
2. Being more than 5 minutes behind in replication is not an issue, as long as we keep doing new commits.
3. Taking longer than 5 minutes to complete replication for a _single transaction_ will cause API failures. This includes operations such as adding or removing tables.

In theory, we could take this even further and run parameter queries directly on the _source_ database, without replicating.

## Compacting - Implementation alternatives

Instead of snapshot queries, some other alternatives are listed below. These are not used; they are just listed here in case we ever need to re-evaluate the implementation.

### 1. Last active checkpoint

Compute a "last active" checkpoint - a checkpoint that started being active at least 5 minutes ago, meaning that we can clean up data only used for checkpoints older than that.

The issues here are:

1. We don't store older checkpoints, so it can be tricky to find an older checkpoint without waiting 5 minutes.
2. It is difficult to build in hard guarantees for parameter queries here, without relying on time-based heuristics.
3. Keeping track of checkpoints used in the API service can be quite tricky.

### 2. Merge / invalidate lookups

Instead of deleting older parameter lookup records, we can merge them.

Say we have two records with the same key and lookup, and \_id of A and B (A < B). The above approach would just delete A, if A < lastActiveCheckpoint.

What we can do instead is merge into:

    _id: A
    parameter_data: B.parameter_data
    not_valid_before: B

The key here is the `not_valid_before` field: when we query for parameter data, we filter by \_id as usual. But if `checkpoint < not_valid_before`, we need to discard that checkpoint.

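To illustrate the query-side check (a sketch only; this alternative is not implemented, and the field names follow the merged-record format above):

```ts
// Sketch: after filtering by _id <= checkpoint as usual, a merged record can force us
// to discard the checkpoint entirely and wait for a newer one.
interface MergedParameterRecord {
  _id: bigint;
  parameter_data: unknown;
  not_valid_before?: bigint; // set when older records were merged into this one
}

function isCheckpointValid(checkpoint: bigint, matched: MergedParameterRecord[]): boolean {
  // If any matched record was merged past this checkpoint, the checkpoint can no longer
  // be served consistently and must be discarded.
  return matched.every((doc) => doc.not_valid_before == null || checkpoint >= doc.not_valid_before);
}
```
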
Now we still need to try to avoid merging recent parameter lookup records, otherwise we may keep on invalidating checkpoints as fast as we generate them. But this could function as a final safety check, giving us proper consistency guarantees.

This roughly matches the design of `target_op` in MOVE operations.

This still does not cover deleted data: with this approach alone, we can never fully remove records after the source row was deleted, since we need that `not_valid_before` field. So this is not a complete solution.

### 3. Globally invalidate checkpoints

Another alternative is to globally invalidate checkpoints when compacting. So:

1. We pick a `lastActiveCheckpoint`.
2. Persist `noCheckpointBefore: lastActiveCheckpoint` in the sync_rules collection.
3. At some point between doing the parameter lookups and sending a `checkpoint_complete` message, we look up the `noCheckpointBefore` checkpoint, and invalidate the checkpoint if required.

This allows us to cleanly delete older checkpoints, at the expense of needing to run another query.

This could also replace the current logic we have for `target_op` in MOVE operations.

To do the lookup very efficiently, we can apply some workarounds (see the sketch below):

1. For each parameter query (and data query?), store the clusterTime of the results.
2. Right before sending checkpointComplete, query for the noCheckpointBefore value, using `afterClusterTime`.
3. _We can cache those results_, re-using them for other clients. As long as the `afterClusterTime` condition is satisfied, we can use the cached value.

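A hedged sketch of that lookup, using `afterClusterTime` so the read reflects at least the state observed by the parameter queries. The collection and field names (`sync_rules`, `no_checkpoint_before`) are assumptions, and this alternative is not the implemented approach:

```ts
import { Db, Timestamp } from 'mongodb';

// Sketch: right before sending checkpoint_complete, verify the checkpoint has not been
// invalidated by a concurrent compact. Results could be cached per cluster time.
async function checkpointStillValid(
  db: Db,
  groupId: number,
  checkpoint: bigint,
  resultsClusterTime: Timestamp
): Promise<boolean> {
  const response = await db.command({
    find: 'sync_rules',
    filter: { _id: groupId },
    projection: { no_checkpoint_before: 1 },
    readConcern: { level: 'local', afterClusterTime: resultsClusterTime },
    limit: 1
  });
  const doc = response.cursor.firstBatch[0];
  const noCheckpointBefore = doc?.no_checkpoint_before as bigint | undefined;
  return noCheckpointBefore == null || checkpoint >= noCheckpointBefore;
}
```
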
modules/module-mongodb-storage/src/storage/implementation/MongoParameterCompactor.ts
108 changes: 108 additions & 0 deletions

@@ -0,0 +1,108 @@

import { logger } from '@powersync/lib-services-framework';
import { bson, CompactOptions, InternalOpId } from '@powersync/service-core';
import { LRUCache } from 'lru-cache';
import { PowerSyncMongo } from './db.js';
import { mongo } from '@powersync/lib-service-mongodb';
import { BucketParameterDocument } from './models.js';

/**
 * Compacts parameter lookup data (the bucket_parameters collection).
 *
 * This scans through the entire collection to find data to compact.
 *
 * For background, see the `/docs/parameters-lookups.md` file.
 */
export class MongoParameterCompactor {
  constructor(
    private db: PowerSyncMongo,
    private group_id: number,
    private checkpoint: InternalOpId,
    private options: CompactOptions
  ) {}

  async compact() {
    logger.info(`Compacting parameters for group ${this.group_id} up to checkpoint ${this.checkpoint}`);
    // This is the currently-active checkpoint.
    // We do not remove any data that may be used by this checkpoint.
    // Snapshot queries ensure that if any clients are still using older checkpoints, they would
    // not be affected by this compaction.
    const checkpoint = await this.checkpoint;
    if (checkpoint == null) {
      return;
    }

    // Index on {'key.g': 1, lookup: 1, _id: 1}
    // In theory, we could let MongoDB do more of the work here, by grouping by (key, lookup)
    // in MongoDB already. However, that risks running into cases where MongoDB needs to process
    // very large amounts of data before returning results, which could lead to timeouts.
    const cursor = this.db.bucket_parameters.find(
      {
        'key.g': this.group_id
      },
      {
        sort: { lookup: 1, _id: 1 },
        batchSize: 10_000,
        projection: { _id: 1, key: 1, lookup: 1, bucket_parameters: 1 }
      }
    );

    // The index doesn't cover sorting by key, so we keep our own cache of the last seen key.
    let lastByKey = new LRUCache<string, InternalOpId>({
      max: this.options.compactParameterCacheLimit ?? 10_000
    });
    let removeIds: InternalOpId[] = [];
    let removeDeleted: mongo.AnyBulkWriteOperation<BucketParameterDocument>[] = [];

    const flush = async (force: boolean) => {
      if (removeIds.length >= 1000 || (force && removeIds.length > 0)) {
        const results = await this.db.bucket_parameters.deleteMany({ _id: { $in: removeIds } });
        logger.info(`Removed ${results.deletedCount} (${removeIds.length}) superseded parameter entries`);
        removeIds = [];
      }

      if (removeDeleted.length > 10 || (force && removeDeleted.length > 0)) {
        const results = await this.db.bucket_parameters.bulkWrite(removeDeleted);
        logger.info(`Removed ${results.deletedCount} (${removeDeleted.length}) deleted parameter entries`);
        removeDeleted = [];
      }
    };

    while (await cursor.hasNext()) {
      const batch = cursor.readBufferedDocuments();
      for (let doc of batch) {
        if (doc._id >= checkpoint) {
          continue;
        }
        const uniqueKey = (
          bson.serialize({
            k: doc.key,
            l: doc.lookup
          }) as Buffer
        ).toString('base64');
        const previous = lastByKey.get(uniqueKey);
        if (previous != null && previous < doc._id) {
          // We have a newer entry for the same key, so we can remove the old one.
          removeIds.push(previous);
        }
        lastByKey.set(uniqueKey, doc._id);

        if (doc.bucket_parameters?.length == 0) {
          // This is a delete operation, so we can remove it completely.
          // For this we cannot remove the operation itself only: There is a possibility that
          // there is still an earlier operation with the same key and lookup, that we don't have
          // in the cache due to cache size limits. So we need to explicitly remove all earlier operations.
          removeDeleted.push({
            deleteMany: {
              filter: { 'key.g': doc.key.g, lookup: doc.lookup, _id: { $lte: doc._id }, key: doc.key }
            }
          });
        }
      }

      await flush(false);
    }

    await flush(true);
    logger.info('Parameter compaction completed');
  }
}
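
For reference, a minimal usage sketch. How the compactor is wired into the module's compact action is not part of this file, so the call below is illustrative only, and assumes the remaining `CompactOptions` fields are optional:

```ts
// Illustrative only: construct and run the compactor directly.
// db: PowerSyncMongo, groupId: number, checkpoint: InternalOpId from the active sync rules.
const compactor = new MongoParameterCompactor(db, groupId, checkpoint, {
  compactParameterCacheLimit: 10_000 // optional; the compactor defaults to 10_000 when unset
});
await compactor.compact();
```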