Skip to content

Commit 8e99e24

Browse files
committed
Add failing tests.
1 parent 725daa1 commit 8e99e24

File tree

5 files changed

+143
-15
lines changed

5 files changed

+143
-15
lines changed

modules/module-mongodb-storage/src/migrations/db/migrations/1741697235857-bucket-state-index.ts

Lines changed: 1 addition & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -11,13 +11,7 @@ export const up: migrations.PowerSyncMigrationFunction = async (context) => {
1111
const db = storage.createPowerSyncMongo(configuration.storage as MongoStorageConfig);
1212

1313
try {
14-
await db.bucket_state.createIndex(
15-
{
16-
'_id.g': 1,
17-
last_op: 1
18-
},
19-
{ name: INDEX_NAME, unique: true }
20-
);
14+
await db.createBucketStateIndex();
2115
} finally {
2216
await db.client.close();
2317
}

modules/module-mongodb-storage/src/storage/implementation/MongoTestStorageFactoryGenerator.ts

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,13 +18,14 @@ export const MongoTestStorageFactoryGenerator = (factoryOptions: MongoTestStorag
1818
await db.db.createCollection('bucket_parameters');
1919
}
2020

21-
// Full migrations are not currently run for tests, so we manually create this
22-
await db.createCheckpointEventsCollection();
23-
2421
if (!options?.doNotClear) {
2522
await db.clear();
2623
}
2724

25+
// Full migrations are not currently run for tests, so we manually create the important ones
26+
await db.createCheckpointEventsCollection();
27+
await db.createBucketStateIndex();
28+
2829
return new MongoBucketStorage(db, { slot_name_prefix: 'test_' }, factoryOptions.internalOptions);
2930
};
3031
};

modules/module-mongodb-storage/src/storage/implementation/db.ts

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -127,6 +127,20 @@ export class PowerSyncMongo {
127127
max: 50 // max number of documents
128128
});
129129
}
130+
131+
/**
132+
* Only use in migrations and tests.
133+
*/
134+
async createBucketStateIndex() {
135+
// TODO: Implement a better mechanism to use migrations in tests
136+
await this.bucket_state.createIndex(
137+
{
138+
'_id.g': 1,
139+
last_op: 1
140+
},
141+
{ name: 'bucket_updates', unique: true }
142+
);
143+
}
130144
}
131145

132146
export function createPowerSyncMongo(config: MongoStorageConfig, options?: lib_mongo.MongoConnectionOptions) {

modules/module-mongodb-storage/src/storage/implementation/models.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -97,6 +97,10 @@ export interface BucketStateDocument {
9797
g: number;
9898
b: string;
9999
};
100+
/**
101+
* Important: There is an unique index on {'_id.g': 1, last_op: 1}.
102+
* That means the last_op must match an actual op in the bucket, and not the commit checkpoint.
103+
*/
100104
last_op: bigint;
101105
/**
102106
* If set, this can be treated as "cache" of a checksum at a specific point.
Lines changed: 120 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,122 @@
1-
import { register } from '@powersync/service-core-tests';
2-
import { describe } from 'vitest';
1+
import { register, TEST_TABLE, test_utils } from '@powersync/service-core-tests';
2+
import { describe, expect, test } from 'vitest';
33
import { INITIALIZED_MONGO_STORAGE_FACTORY } from './util.js';
4+
import { storage, SyncRulesBucketStorage } from '@powersync/service-core';
45

5-
describe('Mongo Sync Bucket Storage Compact', () => register.registerCompactTests(INITIALIZED_MONGO_STORAGE_FACTORY));
6-
describe('Mongo Sync Parameter Storage Compact', () =>
7-
register.registerParameterCompactTests(INITIALIZED_MONGO_STORAGE_FACTORY));
6+
describe('Mongo Sync Bucket Storage Compact', () => {
7+
register.registerCompactTests(INITIALIZED_MONGO_STORAGE_FACTORY);
8+
9+
describe('with blank bucket_state', () => {
10+
// This can happen when migrating from older service versions, that did not populate bucket_state yet.
11+
const populate = async (bucketStorage: SyncRulesBucketStorage) => {
12+
await bucketStorage.startBatch(test_utils.BATCH_OPTIONS, async (batch) => {
13+
await batch.save({
14+
sourceTable: TEST_TABLE,
15+
tag: storage.SaveOperationTag.INSERT,
16+
after: {
17+
id: 't1',
18+
owner_id: 'u1'
19+
},
20+
afterReplicaId: test_utils.rid('t1')
21+
});
22+
23+
await batch.save({
24+
sourceTable: TEST_TABLE,
25+
tag: storage.SaveOperationTag.INSERT,
26+
after: {
27+
id: 't2',
28+
owner_id: 'u2'
29+
},
30+
afterReplicaId: test_utils.rid('t2')
31+
});
32+
33+
await batch.commit('1/1');
34+
});
35+
36+
return bucketStorage.getCheckpoint();
37+
};
38+
39+
const setup = async () => {
40+
await using factory = await INITIALIZED_MONGO_STORAGE_FACTORY();
41+
const syncRules = await factory.updateSyncRules({
42+
content: `
43+
bucket_definitions:
44+
by_user:
45+
parameters: select request.user_id() as user_id
46+
data: [select * from test where owner_id = bucket.user_id]
47+
`
48+
});
49+
const bucketStorage = factory.getInstance(syncRules);
50+
const { checkpoint } = await populate(bucketStorage);
51+
52+
return { bucketStorage, checkpoint, factory };
53+
};
54+
55+
test('full compact', async () => {
56+
const { bucketStorage, checkpoint, factory } = await setup();
57+
58+
// Simulate bucket_state from old version not being available
59+
await factory.db.bucket_state.deleteMany({});
60+
61+
await bucketStorage.compact({
62+
clearBatchLimit: 200,
63+
moveBatchLimit: 10,
64+
moveBatchQueryLimit: 10,
65+
maxOpId: checkpoint,
66+
signal: null as any
67+
});
68+
69+
const checksumAfter = await bucketStorage.getChecksums(checkpoint, ['by_user["u1"]', 'by_user["u2"]']);
70+
expect(checksumAfter.get('by_user["u1"]')).toEqual({
71+
bucket: 'by_user["u1"]',
72+
checksum: -659469718,
73+
count: 1
74+
});
75+
expect(checksumAfter.get('by_user["u2"]')).toEqual({
76+
bucket: 'by_user["u2"]',
77+
checksum: 430217650,
78+
count: 1
79+
});
80+
});
81+
82+
test('populatePersistentChecksumCache', async () => {
83+
// Populate old sync rules version
84+
const { factory } = await setup();
85+
86+
// Not populate another version (bucket definition name changed)
87+
const syncRules = await factory.updateSyncRules({
88+
content: `
89+
bucket_definitions:
90+
by_user2:
91+
parameters: select request.user_id() as user_id
92+
data: [select * from test where owner_id = bucket.user_id]
93+
`
94+
});
95+
const bucketStorage = factory.getInstance(syncRules);
96+
97+
await populate(bucketStorage);
98+
const { checkpoint } = await bucketStorage.getCheckpoint();
99+
100+
await bucketStorage.populatePersistentChecksumCache({
101+
maxOpId: checkpoint,
102+
signal: new AbortController().signal
103+
});
104+
105+
const checksumAfter = await bucketStorage.getChecksums(checkpoint, ['by_user2["u1"]', 'by_user2["u2"]']);
106+
expect(checksumAfter.get('by_user2["u1"]')).toEqual({
107+
bucket: 'by_user2["u1"]',
108+
checksum: -659469718,
109+
count: 1
110+
});
111+
expect(checksumAfter.get('by_user2["u2"]')).toEqual({
112+
bucket: 'by_user2["u2"]',
113+
checksum: 430217650,
114+
count: 1
115+
});
116+
});
117+
});
118+
});
119+
120+
describe('Mongo Sync Parameter Storage Compact', () => {
121+
register.registerParameterCompactTests(INITIALIZED_MONGO_STORAGE_FACTORY);
122+
});

0 commit comments

Comments
 (0)