Commit 346382e

Postgres Compact Fix (#217)
1 parent 88ab679 commit 346382e

File tree

9 files changed, +191 -37 lines

.changeset/rotten-items-explode.md

Lines changed: 5 additions & 0 deletions
@@ -0,0 +1,5 @@
+---
+'@powersync/service-module-postgres-storage': patch
+---
+
+Fix issue where compacting might fail with an "unexpected PUT operation" error.

.changeset/rotten-nails-cheat.md

Lines changed: 5 additions & 0 deletions
@@ -0,0 +1,5 @@
+---
+'@powersync/service-core-tests': minor
+---
+
+Added compacting test for interleaved bucket operations.

.changeset/six-dragons-whisper.md

Lines changed: 8 additions & 0 deletions
@@ -0,0 +1,8 @@
+---
+'@powersync/service-module-postgres-storage': patch
+'@powersync/service-module-mongodb-storage': patch
+'@powersync/service-core-tests': patch
+'@powersync/service-core': patch
+---
+
+Unified compacting options between storage providers.

modules/module-mongodb-storage/src/storage/implementation/MongoCompactor.ts

Lines changed: 1 addition & 8 deletions
@@ -32,14 +32,7 @@ interface CurrentBucketState {
 /**
  * Additional options, primarily for testing.
  */
-export interface MongoCompactOptions extends storage.CompactOptions {
-  /** Minimum of 2 */
-  clearBatchLimit?: number;
-  /** Minimum of 1 */
-  moveBatchLimit?: number;
-  /** Minimum of 1 */
-  moveBatchQueryLimit?: number;
-}
+export interface MongoCompactOptions extends storage.CompactOptions {}
 
 const DEFAULT_CLEAR_BATCH_LIMIT = 5000;
 const DEFAULT_MOVE_BATCH_LIMIT = 2000;
Lines changed: 1 addition & 7 deletions
@@ -1,11 +1,5 @@
-import { MongoCompactOptions } from '@module/storage/implementation/MongoCompactor.js';
 import { register } from '@powersync/service-core-tests';
 import { describe } from 'vitest';
 import { INITIALIZED_MONGO_STORAGE_FACTORY } from './util.js';
 
-describe('Mongo Sync Bucket Storage Compact', () =>
-  register.registerCompactTests<MongoCompactOptions>(INITIALIZED_MONGO_STORAGE_FACTORY, {
-    clearBatchLimit: 2,
-    moveBatchLimit: 1,
-    moveBatchQueryLimit: 1
-  }));
+describe('Mongo Sync Bucket Storage Compact', () => register.registerCompactTests(INITIALIZED_MONGO_STORAGE_FACTORY));

modules/module-postgres-storage/src/storage/PostgresCompactor.ts

Lines changed: 19 additions & 14 deletions
@@ -35,14 +35,7 @@ interface CurrentBucketState {
 /**
  * Additional options, primarily for testing.
  */
-export interface PostgresCompactOptions extends storage.CompactOptions {
-  /** Minimum of 2 */
-  clearBatchLimit?: number;
-  /** Minimum of 1 */
-  moveBatchLimit?: number;
-  /** Minimum of 1 */
-  moveBatchQueryLimit?: number;
-}
+export interface PostgresCompactOptions extends storage.CompactOptions {}
 
 const DEFAULT_CLEAR_BATCH_LIMIT = 5000;
 const DEFAULT_MOVE_BATCH_LIMIT = 2000;
@@ -99,15 +92,19 @@ export class PostgresCompactor {
 
     let bucketLower: string | null = null;
     let bucketUpper: string | null = null;
+    const MAX_CHAR = String.fromCodePoint(0xffff);
 
-    if (bucket?.includes('[')) {
+    if (bucket == null) {
+      bucketLower = '';
+      bucketUpper = MAX_CHAR;
+    } else if (bucket?.includes('[')) {
       // Exact bucket name
       bucketLower = bucket;
       bucketUpper = bucket;
     } else if (bucket) {
       // Bucket definition name
       bucketLower = `${bucket}[`;
-      bucketUpper = `${bucket}[\uFFFF`;
+      bucketUpper = `${bucket}[${MAX_CHAR}`;
     }
 
     let upperOpIdLimit = BIGINT_MAX;
@@ -126,10 +123,16 @@ export class PostgresCompactor {
         bucket_data
       WHERE
         group_id = ${{ type: 'int4', value: this.group_id }}
-        AND bucket_name LIKE COALESCE(${{ type: 'varchar', value: bucketLower }}, '%')
-        AND op_id < ${{ type: 'int8', value: upperOpIdLimit }}
+        AND bucket_name >= ${{ type: 'varchar', value: bucketLower }}
+        AND (
+          (
+            bucket_name = ${{ type: 'varchar', value: bucketUpper }}
+            AND op_id < ${{ type: 'int8', value: upperOpIdLimit }}
+          )
+          OR bucket_name < ${{ type: 'varchar', value: bucketUpper }} COLLATE "C" -- Use binary comparison
+        )
       ORDER BY
-        bucket_name,
+        bucket_name DESC,
         op_id DESC
       LIMIT
         ${{ type: 'int4', value: this.moveBatchQueryLimit }}
@@ -145,7 +148,9 @@ export class PostgresCompactor {
       }
 
       // Set upperBound for the next batch
-      upperOpIdLimit = batch[batch.length - 1].op_id;
+      const lastBatchItem = batch[batch.length - 1];
+      upperOpIdLimit = lastBatchItem.op_id;
+      bucketUpper = lastBatchItem.bucket_name;
 
       for (const doc of batch) {
         if (currentState == null || doc.bucket_name != currentState.bucket) {
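
These hunks are the heart of the fix. The old query bounded each batch with a single op_id limit while ordering by bucket_name ascending, so the cursor was only valid within one bucket. The new query pages backwards over the composite key (bucket_name, op_id), with COLLATE "C" forcing a byte-wise string comparison that matches the compactor's in-memory ordering. Below is a minimal sketch of that descending keyset pagination, using hypothetical Row and nextBatch names for illustration; it is not the service's actual code.

interface Row {
  bucket_name: string;
  op_id: bigint;
}

// Byte-wise comparison, standing in for COLLATE "C" in the query above.
// JavaScript's < and > compare strings by UTF-16 code unit.
function binaryCompare(a: string, b: string): number {
  return a < b ? -1 : a > b ? 1 : 0;
}

// Select rows strictly "before" the cursor (bucketUpper, upperOpIdLimit) in
// (bucket_name DESC, op_id DESC) order: either the same bucket with a smaller
// op_id, or any lexicographically smaller bucket.
function nextBatch(rows: Row[], bucketUpper: string, upperOpIdLimit: bigint, limit: number): Row[] {
  return rows
    .filter(
      (r) =>
        (r.bucket_name === bucketUpper && r.op_id < upperOpIdLimit) ||
        binaryCompare(r.bucket_name, bucketUpper) < 0
    )
    .sort((a, b) => binaryCompare(b.bucket_name, a.bucket_name) || Number(b.op_id - a.op_id))
    .slice(0, limit);
}

// After each batch, the cursor advances to the batch's last row
// (bucketUpper = last.bucket_name; upperOpIdLimit = last.op_id), so a batch
// boundary that falls inside one bucket can no longer skip rows in another.

Without the explicit collation, a locale-aware default could order bucket names differently from the byte-wise comparison, and the cursor would drift between batches.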

modules/module-postgres-storage/test/src/storage_compacting.test.ts

Lines changed: 1 addition & 1 deletion
@@ -2,4 +2,4 @@ import { register } from '@powersync/service-core-tests';
 import { describe } from 'vitest';
 import { POSTGRES_STORAGE_FACTORY } from './util.js';
 
-describe('Postgres Sync Bucket Storage Compact', () => register.registerCompactTests(POSTGRES_STORAGE_FACTORY, {}));
+describe('Postgres Sync Bucket Storage Compact', () => register.registerCompactTests(POSTGRES_STORAGE_FACTORY));

packages/service-core-tests/src/tests/register-compacting-tests.ts

Lines changed: 142 additions & 7 deletions
@@ -15,10 +15,7 @@ const TEST_TABLE = test_utils.makeTestTable('test', ['id']);
  * compactTests(() => new MongoStorageFactory(), { clearBatchLimit: 2, moveBatchLimit: 1, moveBatchQueryLimit: 1 }));
  * ```
  */
-export function registerCompactTests<CompactOptions extends storage.CompactOptions = storage.CompactOptions>(
-  generateStorageFactory: storage.TestStorageFactory,
-  compactOptions: CompactOptions
-) {
+export function registerCompactTests(generateStorageFactory: storage.TestStorageFactory) {
   test('compacting (1)', async () => {
     const sync_rules = test_utils.testRules(`
 bucket_definitions:
@@ -87,7 +84,11 @@ bucket_definitions:
       }
     ]);
 
-    await bucketStorage.compact(compactOptions);
+    await bucketStorage.compact({
+      clearBatchLimit: 2,
+      moveBatchLimit: 1,
+      moveBatchQueryLimit: 1
+    });
 
     const batchAfter = await test_utils.oneFromAsync(
       bucketStorage.getBucketDataBatch(checkpoint, new Map([['global[]', '0']]))
@@ -204,7 +205,11 @@ bucket_definitions:
       }
     ]);
 
-    await bucketStorage.compact(compactOptions);
+    await bucketStorage.compact({
+      clearBatchLimit: 2,
+      moveBatchLimit: 1,
+      moveBatchQueryLimit: 1
+    });
 
     const batchAfter = await test_utils.oneFromAsync(
       bucketStorage.getBucketDataBatch(checkpoint, new Map([['global[]', '0']]))
@@ -285,7 +290,11 @@ bucket_definitions:
     });
     const checkpoint2 = result2!.flushed_op;
 
-    await bucketStorage.compact(compactOptions);
+    await bucketStorage.compact({
+      clearBatchLimit: 2,
+      moveBatchLimit: 1,
+      moveBatchQueryLimit: 1
+    });
 
     const batchAfter = await test_utils.oneFromAsync(
       bucketStorage.getBucketDataBatch(checkpoint2, new Map([['global[]', '0']]))
@@ -307,4 +316,130 @@ bucket_definitions:
       checksum: 1874612650
     });
   });
+
+  test('compacting (4)', async () => {
+    const sync_rules = test_utils.testRules(/* yaml */
+      ` bucket_definitions:
+          grouped:
+            # The parameter query here is not important.
+            # We specifically don't want to create bucket_parameter records here,
+            # since the op_ids for bucket_data could vary between storage implementations.
+            parameters: select 'b' as b
+            data:
+              - select * from test where b = bucket.b`);
+
+    await using factory = await generateStorageFactory();
+    const bucketStorage = factory.getInstance(sync_rules);
+
+    const result = await bucketStorage.startBatch(test_utils.BATCH_OPTIONS, async (batch) => {
+      /**
+       * Repeatedly create operations which fall into different buckets.
+       * The bucket operations are purposely interleaved as the op_id increases.
+       * A large number of operations is created here.
+       * The configured window of compacting operations is 100, which means the
+       * initial window will contain operations from multiple buckets.
+       */
+      for (let count = 0; count < 100; count++) {
+        await batch.save({
+          sourceTable: TEST_TABLE,
+          tag: storage.SaveOperationTag.INSERT,
+          after: {
+            id: 't1',
+            b: 'b1',
+            value: 'start'
+          },
+          afterReplicaId: test_utils.rid('t1')
+        });
+
+        await batch.save({
+          sourceTable: TEST_TABLE,
+          tag: storage.SaveOperationTag.UPDATE,
+          after: {
+            id: 't1',
+            b: 'b1',
+            value: 'intermediate'
+          },
+          afterReplicaId: test_utils.rid('t1')
+        });
+
+        await batch.save({
+          sourceTable: TEST_TABLE,
+          tag: storage.SaveOperationTag.INSERT,
+          after: {
+            id: 't2',
+            b: 'b2',
+            value: 'start'
+          },
+          afterReplicaId: test_utils.rid('t2')
+        });
+
+        await batch.save({
+          sourceTable: TEST_TABLE,
+          tag: storage.SaveOperationTag.UPDATE,
+          after: {
+            id: 't1',
+            b: 'b1',
+            value: 'final'
+          },
+          afterReplicaId: test_utils.rid('t1')
+        });
+
+        await batch.save({
+          sourceTable: TEST_TABLE,
+          tag: storage.SaveOperationTag.UPDATE,
+          after: {
+            id: 't2',
+            b: 'b2',
+            value: 'final'
+          },
+          afterReplicaId: test_utils.rid('t2')
+        });
+      }
+    });
+
+    const checkpoint = result!.flushed_op;
+
+    await bucketStorage.compact({
+      clearBatchLimit: 100,
+      moveBatchLimit: 100,
+      moveBatchQueryLimit: 100 // Larger limit for a larger window of operations
+    });
+
+    const batchAfter = await test_utils.fromAsync(
+      bucketStorage.getBucketDataBatch(
+        checkpoint,
+        new Map([
+          ['grouped["b1"]', '0'],
+          ['grouped["b2"]', '0']
+        ])
+      )
+    );
+    const dataAfter = batchAfter.flatMap((b) => b.batch.data);
+
+    // The op_ids will vary between MongoDB and Postgres storage
+    expect(dataAfter).toMatchObject(
+      expect.arrayContaining([
+        { op_id: '497', op: 'CLEAR', checksum: -937074151 },
+        {
+          op_id: '499',
+          op: 'PUT',
+          object_type: 'test',
+          object_id: 't1',
+          checksum: 52221819,
+          subkey: '6544e3899293153fa7b38331/117ab485-4b42-58a2-ab32-0053a22c3423',
+          data: '{"id":"t1","b":"b1","value":"final"}'
+        },
+        { op_id: '498', op: 'CLEAR', checksum: -234380197 },
+        {
+          op_id: '500',
+          op: 'PUT',
+          object_type: 'test',
+          object_id: 't2',
+          checksum: 2126669493,
+          subkey: '6544e3899293153fa7b38331/ec27c691-b47a-5d92-927a-9944feb89eee',
+          data: '{"id":"t2","b":"b2","value":"final"}'
+        }
+      ])
+    );
+  });
 }
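
The interleaving in 'compacting (4)' reproduces the data shape that tripped the old Postgres compactor. As a self-contained illustration (hypothetical data, not the service code) of why a cursor that tracks only op_id loses rows once results are ordered primarily by bucket_name:

interface Op {
  bucket_name: string;
  op_id: bigint;
}

// Rows as the old query returned them: ORDER BY bucket_name, op_id DESC.
const rows: Op[] = [
  { bucket_name: 'grouped["b1"]', op_id: 4n },
  { bucket_name: 'grouped["b1"]', op_id: 1n },
  { bucket_name: 'grouped["b2"]', op_id: 3n }, // interleaved with b1's op_ids
  { bucket_name: 'grouped["b2"]', op_id: 2n }
];

// Batch 1 (query limit 2) returns only the b1 rows; the old cursor kept just
// the last op_id seen, here 1n.
const batch1 = rows.slice(0, 2);
const upperOpIdLimit = batch1[batch1.length - 1].op_id;

// Batch 2 filters on op_id alone and finds nothing: both b2 rows have
// op_id >= 1n, so they are never visited and per-bucket state goes
// inconsistent; plausibly this is what surfaced as the "unexpected PUT
// operation" error the changeset mentions.
const batch2 = rows.filter((r) => r.op_id < upperOpIdLimit);
console.log(batch2.length); // 0

Tracking both bucket_name and op_id in the cursor, with a matching DESC ordering in the query, closes this gap.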

packages/service-core/src/storage/SyncRulesBucketStorage.ts

Lines changed: 9 additions & 0 deletions
@@ -179,6 +179,15 @@ export interface CompactOptions {
    * These can be individual bucket names, or bucket definition names.
    */
   compactBuckets?: string[];
+
+  /** Minimum of 2 */
+  clearBatchLimit?: number;
+
+  /** Minimum of 1 */
+  moveBatchLimit?: number;
+
+  /** Minimum of 1 */
+  moveBatchQueryLimit?: number;
 }
 
 export interface TerminateOptions {
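
With the batch limits promoted to the shared CompactOptions, one options shape now drives either storage implementation. A short sketch of the call, assuming the storage namespace export of @powersync/service-core and a SyncRulesBucketStorage instance obtained elsewhere:

import { storage } from '@powersync/service-core';

// Minimal sketch: compact specific buckets with explicit batch limits.
// The same options object works for the MongoDB and Postgres compactors.
async function compactStorage(bucketStorage: storage.SyncRulesBucketStorage): Promise<void> {
  await bucketStorage.compact({
    compactBuckets: ['global[]'], // individual bucket names or definition names
    clearBatchLimit: 5000, // minimum of 2
    moveBatchLimit: 2000, // minimum of 1
    moveBatchQueryLimit: 2000 // minimum of 1
  });
}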
