Skip to content

Commit fc32d7c

Browse files
committed
More tests and fixes.
1 parent 10b14f5 commit fc32d7c

File tree

6 files changed

+184
-90
lines changed

6 files changed

+184
-90
lines changed

modules/module-mongodb-storage/src/storage/implementation/MongoParameterCompactor.ts

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
import { logger } from '@powersync/lib-services-framework';
2-
import { bson, InternalOpId } from '@powersync/service-core';
2+
import { bson, CompactOptions, InternalOpId } from '@powersync/service-core';
33
import { LRUCache } from 'lru-cache';
44
import { PowerSyncMongo } from './db.js';
55
import { mongo } from '@powersync/lib-service-mongodb';
@@ -16,7 +16,8 @@ export class MongoParameterCompactor {
1616
constructor(
1717
private db: PowerSyncMongo,
1818
private group_id: number,
19-
private checkpoint: InternalOpId
19+
private checkpoint: InternalOpId,
20+
private options: CompactOptions
2021
) {}
2122

2223
async compact() {
@@ -47,7 +48,7 @@ export class MongoParameterCompactor {
4748

4849
// The index doesn't cover sorting by key, so we keep our own cache of the last seen key.
4950
let lastByKey = new LRUCache<string, InternalOpId>({
50-
max: 10_000
51+
max: this.options.compactParameterCacheLimit ?? 10_000
5152
});
5253
let removeIds: InternalOpId[] = [];
5354
let removeDeleted: mongo.AnyBulkWriteOperation<BucketParameterDocument>[] = [];
@@ -91,7 +92,7 @@ export class MongoParameterCompactor {
9192
// there is still an earlier operation with the same key and lookup, that we don't have
9293
// in the cache due to cache size limits. So we need to explicitly remove all earlier operations.
9394
removeDeleted.push({
94-
deleteOne: {
95+
deleteMany: {
9596
filter: { 'key.g': doc.key.g, lookup: doc.lookup, _id: { $lte: doc._id }, key: doc.key }
9697
}
9798
});

modules/module-mongodb-storage/src/storage/implementation/MongoSyncBucketStorage.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -704,7 +704,7 @@ export class MongoSyncBucketStorage
704704
const checkpoint = await this.getCheckpointInternal();
705705
await new MongoCompactor(this.db, this.group_id, options).compact();
706706
if (checkpoint != null && options?.compactParameterData) {
707-
await new MongoParameterCompactor(this.db, this.group_id, checkpoint.checkpoint).compact();
707+
await new MongoParameterCompactor(this.db, this.group_id, checkpoint.checkpoint, options).compact();
708708
}
709709
}
710710

packages/service-core-tests/src/tests/register-compacting-tests.ts

Lines changed: 0 additions & 85 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
import { storage } from '@powersync/service-core';
22
import { expect, test } from 'vitest';
33
import * as test_utils from '../test-utils/test-utils-index.js';
4-
import { ParameterLookup } from '@powersync/service-sync-rules';
54

65
const TEST_TABLE = test_utils.makeTestTable('test', ['id']);
76

@@ -433,87 +432,3 @@ bucket_definitions:
433432
);
434433
});
435434
}
436-
437-
export function registerParameterCompactTests(generateStorageFactory: storage.TestStorageFactory) {
438-
test('compacting parameters', async () => {
439-
await using factory = await generateStorageFactory();
440-
const syncRules = await factory.updateSyncRules({
441-
content: `
442-
bucket_definitions:
443-
test:
444-
parameters: select id from test where id = request.user_id()
445-
data: []
446-
`
447-
});
448-
const bucketStorage = factory.getInstance(syncRules);
449-
450-
await bucketStorage.startBatch(test_utils.BATCH_OPTIONS, async (batch) => {
451-
await batch.save({
452-
sourceTable: TEST_TABLE,
453-
tag: storage.SaveOperationTag.INSERT,
454-
after: {
455-
id: 't1'
456-
},
457-
afterReplicaId: 't1'
458-
});
459-
460-
await batch.save({
461-
sourceTable: TEST_TABLE,
462-
tag: storage.SaveOperationTag.INSERT,
463-
after: {
464-
id: 't2'
465-
},
466-
afterReplicaId: 't2'
467-
});
468-
469-
await batch.commit('1/1');
470-
});
471-
472-
const lookup = ParameterLookup.normalized('test', '1', ['t1']);
473-
474-
const checkpoint1 = await bucketStorage.getCheckpoint();
475-
const parameters1 = await checkpoint1.getParameterSets([lookup]);
476-
expect(parameters1).toEqual([{ id: 't1' }]);
477-
478-
await bucketStorage.startBatch(test_utils.BATCH_OPTIONS, async (batch) => {
479-
await batch.save({
480-
sourceTable: TEST_TABLE,
481-
tag: storage.SaveOperationTag.UPDATE,
482-
before: {
483-
id: 't1'
484-
},
485-
beforeReplicaId: 't1',
486-
after: {
487-
id: 't1'
488-
},
489-
afterReplicaId: 't1'
490-
});
491-
492-
await batch.save({
493-
sourceTable: TEST_TABLE,
494-
tag: storage.SaveOperationTag.DELETE,
495-
before: {
496-
id: 't1'
497-
},
498-
beforeReplicaId: 't1'
499-
});
500-
await batch.commit('1/2');
501-
});
502-
const checkpoint2 = await bucketStorage.getCheckpoint();
503-
const parameters2 = await checkpoint2.getParameterSets([lookup]);
504-
expect(parameters2).toEqual([]);
505-
506-
const statsBefore = await bucketStorage.factory.getStorageMetrics();
507-
await bucketStorage.compact({ compactParameterData: true });
508-
509-
// Check consistency
510-
const parameters1b = await checkpoint1.getParameterSets([lookup]);
511-
const parameters2b = await checkpoint2.getParameterSets([lookup]);
512-
expect(parameters1b).toEqual([{ id: 't1' }]);
513-
expect(parameters2b).toEqual([]);
514-
515-
// Check storage size
516-
const statsAfter = await bucketStorage.factory.getStorageMetrics();
517-
expect(statsAfter.parameters_size_bytes).toBeLessThan(statsBefore.parameters_size_bytes);
518-
});
519-
}
Lines changed: 172 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,172 @@
1+
import { storage } from '@powersync/service-core';
2+
import { ParameterLookup } from '@powersync/service-sync-rules';
3+
import { expect, test } from 'vitest';
4+
import * as test_utils from '../test-utils/test-utils-index.js';
5+
6+
const TEST_TABLE = test_utils.makeTestTable('test', ['id']);
7+
8+
export function registerParameterCompactTests(generateStorageFactory: storage.TestStorageFactory) {
9+
test('compacting parameters', async () => {
10+
await using factory = await generateStorageFactory();
11+
const syncRules = await factory.updateSyncRules({
12+
content: `
13+
bucket_definitions:
14+
test:
15+
parameters: select id from test where id = request.user_id()
16+
data: []
17+
`
18+
});
19+
const bucketStorage = factory.getInstance(syncRules);
20+
21+
await bucketStorage.startBatch(test_utils.BATCH_OPTIONS, async (batch) => {
22+
await batch.save({
23+
sourceTable: TEST_TABLE,
24+
tag: storage.SaveOperationTag.INSERT,
25+
after: {
26+
id: 't1'
27+
},
28+
afterReplicaId: 't1'
29+
});
30+
31+
await batch.save({
32+
sourceTable: TEST_TABLE,
33+
tag: storage.SaveOperationTag.INSERT,
34+
after: {
35+
id: 't2'
36+
},
37+
afterReplicaId: 't2'
38+
});
39+
40+
await batch.commit('1/1');
41+
});
42+
43+
const lookup = ParameterLookup.normalized('test', '1', ['t1']);
44+
45+
const checkpoint1 = await bucketStorage.getCheckpoint();
46+
const parameters1 = await checkpoint1.getParameterSets([lookup]);
47+
expect(parameters1).toEqual([{ id: 't1' }]);
48+
49+
await bucketStorage.startBatch(test_utils.BATCH_OPTIONS, async (batch) => {
50+
await batch.save({
51+
sourceTable: TEST_TABLE,
52+
tag: storage.SaveOperationTag.UPDATE,
53+
before: {
54+
id: 't1'
55+
},
56+
beforeReplicaId: 't1',
57+
after: {
58+
id: 't1'
59+
},
60+
afterReplicaId: 't1'
61+
});
62+
63+
await batch.save({
64+
sourceTable: TEST_TABLE,
65+
tag: storage.SaveOperationTag.DELETE,
66+
before: {
67+
id: 't1'
68+
},
69+
beforeReplicaId: 't1'
70+
});
71+
await batch.commit('1/2');
72+
});
73+
const checkpoint2 = await bucketStorage.getCheckpoint();
74+
const parameters2 = await checkpoint2.getParameterSets([lookup]);
75+
expect(parameters2).toEqual([]);
76+
77+
const statsBefore = await bucketStorage.factory.getStorageMetrics();
78+
await bucketStorage.compact({ compactParameterData: true });
79+
80+
// Check consistency
81+
const parameters1b = await checkpoint1.getParameterSets([lookup]);
82+
const parameters2b = await checkpoint2.getParameterSets([lookup]);
83+
expect(parameters1b).toEqual([{ id: 't1' }]);
84+
expect(parameters2b).toEqual([]);
85+
86+
// Check storage size
87+
const statsAfter = await bucketStorage.factory.getStorageMetrics();
88+
expect(statsAfter.parameters_size_bytes).toBeLessThan(statsBefore.parameters_size_bytes);
89+
});
90+
91+
for (let cacheLimit of [1, 10]) {
92+
test(`compacting deleted parameters with cache size ${cacheLimit}`, async () => {
93+
await using factory = await generateStorageFactory();
94+
const syncRules = await factory.updateSyncRules({
95+
content: `
96+
bucket_definitions:
97+
test:
98+
parameters: select id from test where uid = request.user_id()
99+
data: []
100+
`
101+
});
102+
const bucketStorage = factory.getInstance(syncRules);
103+
104+
await bucketStorage.startBatch(test_utils.BATCH_OPTIONS, async (batch) => {
105+
await batch.save({
106+
sourceTable: TEST_TABLE,
107+
tag: storage.SaveOperationTag.INSERT,
108+
after: {
109+
id: 't1',
110+
uid: 'u1'
111+
},
112+
afterReplicaId: 't1'
113+
});
114+
// Interleave with another operation, to evict the other cache entry when compacting.
115+
await batch.save({
116+
sourceTable: TEST_TABLE,
117+
tag: storage.SaveOperationTag.INSERT,
118+
after: {
119+
id: 't2',
120+
uid: 'u1'
121+
},
122+
afterReplicaId: 't2'
123+
});
124+
125+
await batch.commit('1/1');
126+
});
127+
128+
await bucketStorage.startBatch(test_utils.BATCH_OPTIONS, async (batch) => {
129+
await batch.save({
130+
sourceTable: TEST_TABLE,
131+
tag: storage.SaveOperationTag.DELETE,
132+
before: {
133+
id: 't1',
134+
uid: 'u1'
135+
},
136+
beforeReplicaId: 't1'
137+
});
138+
await batch.commit('2/1');
139+
});
140+
141+
await bucketStorage.startBatch(test_utils.BATCH_OPTIONS, async (batch) => {
142+
await batch.save({
143+
sourceTable: TEST_TABLE,
144+
tag: storage.SaveOperationTag.UPDATE,
145+
after: {
146+
id: 't2',
147+
uid: 'u2'
148+
},
149+
afterReplicaId: 't2'
150+
});
151+
await batch.commit('3/1');
152+
});
153+
154+
const lookup = ParameterLookup.normalized('test', '1', ['u1']);
155+
156+
const checkpoint1 = await bucketStorage.getCheckpoint();
157+
const parameters1 = await checkpoint1.getParameterSets([lookup]);
158+
expect(parameters1).toEqual([]);
159+
160+
const statsBefore = await bucketStorage.factory.getStorageMetrics();
161+
await bucketStorage.compact({ compactParameterData: true, compactParameterCacheLimit: cacheLimit });
162+
163+
// Check consistency
164+
const parameters1b = await checkpoint1.getParameterSets([lookup]);
165+
expect(parameters1b).toEqual([]);
166+
167+
// Check storage size
168+
const statsAfter = await bucketStorage.factory.getStorageMetrics();
169+
expect(statsAfter.parameters_size_bytes).toBeLessThan(statsBefore.parameters_size_bytes);
170+
});
171+
}
172+
}
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
export * from './register-bucket-validation-tests.js';
22
export * from './register-compacting-tests.js';
3+
export * from './register-parameter-compacting-tests.js';
34
export * from './register-data-storage-tests.js';
45
export * from './register-migration-tests.js';
56
export * from './register-sync-tests.js';

packages/service-core/src/storage/SyncRulesBucketStorage.ts

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -203,6 +203,11 @@ export interface CompactOptions {
203203

204204
/** Minimum of 1 */
205205
moveBatchQueryLimit?: number;
206+
207+
/**
208+
* Internal/testing use: Cache size for compacting parameters.
209+
*/
210+
compactParameterCacheLimit?: number;
206211
}
207212

208213
export interface ClearStorageOptions {

0 commit comments

Comments
 (0)