5 changes: 5 additions & 0 deletions .changeset/rotten-items-explode.md
@@ -0,0 +1,5 @@
---
'@powersync/service-module-postgres-storage': patch
---

Fix issue where compacting might fail with an "unexpected PUT operation" error.
5 changes: 5 additions & 0 deletions .changeset/rotten-nails-cheat.md
@@ -0,0 +1,5 @@
---
'@powersync/service-core-tests': minor
---

Added compacting test for interleaved bucket operations.
8 changes: 8 additions & 0 deletions .changeset/six-dragons-whisper.md
@@ -0,0 +1,8 @@
---
'@powersync/service-module-postgres-storage': patch
'@powersync/service-module-mongodb-storage': patch
'@powersync/service-core-tests': patch
'@powersync/service-core': patch
---

Unified compacting options between storage providers.
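As a usage sketch (not part of this diff): both `MongoCompactOptions` and `PostgresCompactOptions` now extend `storage.CompactOptions` without adding fields, so one options object can drive compaction on either backend. The import path and the loose structural type below are assumptions; the option fields and the `compact` call mirror the tests later in this diff.

```ts
// Sketch only. Assumes '@powersync/service-core' exposes CompactOptions (it is used via the
// `storage` namespace in the tests below); the structural type stands in for the real
// bucket storage interface.
import * as storage from '@powersync/service-core';

type CompactTarget = { compact(options?: storage.CompactOptions): Promise<void> };

async function compactWithSmallBatches(bucketStorage: CompactTarget) {
  await bucketStorage.compact({
    clearBatchLimit: 2, // documented minimum of 2
    moveBatchLimit: 1, // documented minimum of 1
    moveBatchQueryLimit: 1 // documented minimum of 1
  });
}
```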
@@ -32,14 +32,7 @@ interface CurrentBucketState {
/**
* Additional options, primarily for testing.
*/
export interface MongoCompactOptions extends storage.CompactOptions {
/** Minimum of 2 */
clearBatchLimit?: number;
/** Minimum of 1 */
moveBatchLimit?: number;
/** Minimum of 1 */
moveBatchQueryLimit?: number;
}
export interface MongoCompactOptions extends storage.CompactOptions {}

const DEFAULT_CLEAR_BATCH_LIMIT = 5000;
const DEFAULT_MOVE_BATCH_LIMIT = 2000;
@@ -1,11 +1,5 @@
import { MongoCompactOptions } from '@module/storage/implementation/MongoCompactor.js';
import { register } from '@powersync/service-core-tests';
import { describe } from 'vitest';
import { INITIALIZED_MONGO_STORAGE_FACTORY } from './util.js';

describe('Mongo Sync Bucket Storage Compact', () =>
register.registerCompactTests<MongoCompactOptions>(INITIALIZED_MONGO_STORAGE_FACTORY, {
clearBatchLimit: 2,
moveBatchLimit: 1,
moveBatchQueryLimit: 1
}));
describe('Mongo Sync Bucket Storage Compact', () => register.registerCompactTests(INITIALIZED_MONGO_STORAGE_FACTORY));
33 changes: 19 additions & 14 deletions modules/module-postgres-storage/src/storage/PostgresCompactor.ts
@@ -35,14 +35,7 @@ interface CurrentBucketState {
/**
* Additional options, primarily for testing.
*/
export interface PostgresCompactOptions extends storage.CompactOptions {
/** Minimum of 2 */
clearBatchLimit?: number;
/** Minimum of 1 */
moveBatchLimit?: number;
/** Minimum of 1 */
moveBatchQueryLimit?: number;
}
export interface PostgresCompactOptions extends storage.CompactOptions {}

const DEFAULT_CLEAR_BATCH_LIMIT = 5000;
const DEFAULT_MOVE_BATCH_LIMIT = 2000;
@@ -99,15 +92,19 @@ export class PostgresCompactor {

let bucketLower: string | null = null;
let bucketUpper: string | null = null;
const MAX_CHAR = String.fromCodePoint(0xffff);

if (bucket?.includes('[')) {
if (bucket == null) {
bucketLower = '';
bucketUpper = MAX_CHAR;
} else if (bucket?.includes('[')) {
// Exact bucket name
bucketLower = bucket;
bucketUpper = bucket;
} else if (bucket) {
// Bucket definition name
bucketLower = `${bucket}[`;
bucketUpper = `${bucket}[\uFFFF`;
bucketUpper = `${bucket}[${MAX_CHAR}`;
}

let upperOpIdLimit = BIGINT_MAX;
@@ -126,10 +123,16 @@ export class PostgresCompactor {
bucket_data
WHERE
group_id = ${{ type: 'int4', value: this.group_id }}
AND bucket_name LIKE COALESCE(${{ type: 'varchar', value: bucketLower }}, '%')
AND op_id < ${{ type: 'int8', value: upperOpIdLimit }}
AND bucket_name >= ${{ type: 'varchar', value: bucketLower }}
AND (
(
bucket_name = ${{ type: 'varchar', value: bucketUpper }}
AND op_id < ${{ type: 'int8', value: upperOpIdLimit }}
)
OR bucket_name < ${{ type: 'varchar', value: bucketUpper }} COLLATE "C" -- Use binary comparison
)
ORDER BY
bucket_name,
bucket_name DESC,
op_id DESC
LIMIT
${{ type: 'int4', value: this.moveBatchQueryLimit }}
@@ -145,7 +148,9 @@ export class PostgresCompactor {
}

// Set upperBound for the next batch
upperOpIdLimit = batch[batch.length - 1].op_id;
const lastBatchItem = batch[batch.length - 1];
upperOpIdLimit = lastBatchItem.op_id;
bucketUpper = lastBatchItem.bucket_name;

for (const doc of batch) {
if (currentState == null || doc.bucket_name != currentState.bucket) {
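A reading of the query change above, sketched outside the diff: the batch cursor now advances on both `bucket_name` and `op_id` (keyset pagination over `(bucket_name DESC, op_id DESC)`) instead of on `op_id` alone, so interleaved buckets cannot cause operations to be skipped or revisited between batches. The sketch below is illustrative TypeScript, not the PR's code; it only mirrors the predicate and ordering shown in the SQL.

```ts
// Illustrative keyset pagination over (bucket_name DESC, op_id DESC).
// JS string comparison approximates the binary ("C" collation) comparison used in the query.
interface OpRow {
  bucket_name: string;
  op_id: bigint;
}

function nextBatch(rows: OpRow[], bucketUpper: string, upperOpIdLimit: bigint, limit: number): OpRow[] {
  return rows
    .filter(
      (r) =>
        // Same bucket as the cursor: only strictly older ops.
        (r.bucket_name === bucketUpper && r.op_id < upperOpIdLimit) ||
        // Buckets that sort before the cursor bucket: all ops.
        r.bucket_name < bucketUpper
    )
    .sort((a, b) =>
      a.bucket_name === b.bucket_name
        ? (a.op_id === b.op_id ? 0 : a.op_id < b.op_id ? 1 : -1) // op_id DESC within a bucket
        : a.bucket_name < b.bucket_name
          ? 1 // bucket_name DESC
          : -1
    )
    .slice(0, limit);
}

// After each batch the cursor advances using *both* columns of the last row:
//   bucketUpper = last.bucket_name; upperOpIdLimit = last.op_id;
// which matches the updated PostgresCompactor loop above.
```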
@@ -2,4 +2,4 @@ import { register } from '@powersync/service-core-tests';
import { describe } from 'vitest';
import { POSTGRES_STORAGE_FACTORY } from './util.js';

describe('Postgres Sync Bucket Storage Compact', () => register.registerCompactTests(POSTGRES_STORAGE_FACTORY, {}));
describe('Postgres Sync Bucket Storage Compact', () => register.registerCompactTests(POSTGRES_STORAGE_FACTORY));
149 changes: 142 additions & 7 deletions packages/service-core-tests/src/tests/register-compacting-tests.ts
@@ -15,10 +15,7 @@ const TEST_TABLE = test_utils.makeTestTable('test', ['id']);
* compactTests(() => new MongoStorageFactory(), { clearBatchLimit: 2, moveBatchLimit: 1, moveBatchQueryLimit: 1 }));
* ```
*/
export function registerCompactTests<CompactOptions extends storage.CompactOptions = storage.CompactOptions>(
generateStorageFactory: storage.TestStorageFactory,
compactOptions: CompactOptions
) {
export function registerCompactTests(generateStorageFactory: storage.TestStorageFactory) {
test('compacting (1)', async () => {
const sync_rules = test_utils.testRules(`
bucket_definitions:
@@ -87,7 +84,11 @@ bucket_definitions:
}
]);

await bucketStorage.compact(compactOptions);
await bucketStorage.compact({
clearBatchLimit: 2,
moveBatchLimit: 1,
moveBatchQueryLimit: 1
});

const batchAfter = await test_utils.oneFromAsync(
bucketStorage.getBucketDataBatch(checkpoint, new Map([['global[]', '0']]))
@@ -204,7 +205,11 @@ bucket_definitions:
}
]);

await bucketStorage.compact(compactOptions);
await bucketStorage.compact({
clearBatchLimit: 2,
moveBatchLimit: 1,
moveBatchQueryLimit: 1
});

const batchAfter = await test_utils.oneFromAsync(
bucketStorage.getBucketDataBatch(checkpoint, new Map([['global[]', '0']]))
@@ -285,7 +290,11 @@ bucket_definitions:
});
const checkpoint2 = result2!.flushed_op;

await bucketStorage.compact(compactOptions);
await bucketStorage.compact({
clearBatchLimit: 2,
moveBatchLimit: 1,
moveBatchQueryLimit: 1
});

const batchAfter = await test_utils.oneFromAsync(
bucketStorage.getBucketDataBatch(checkpoint2, new Map([['global[]', '0']]))
@@ -307,4 +316,130 @@ bucket_definitions:
checksum: 1874612650
});
});

test('compacting (4)', async () => {
const sync_rules = test_utils.testRules(/* yaml */
` bucket_definitions:
grouped:
# The parameter query here is not important
# We specifically don't want to create bucket_parameter records here
# since the op_ids for bucket_data could vary between storage implementations.
parameters: select 'b' as b
data:
- select * from test where b = bucket.b`);

await using factory = await generateStorageFactory();
const bucketStorage = factory.getInstance(sync_rules);

const result = await bucketStorage.startBatch(test_utils.BATCH_OPTIONS, async (batch) => {
/**
* Repeatedly create operations which fall into different buckets.
* The bucket operations are purposely interleaved as the op_id increases.
* A large number of operations are created here.
* The configured window of compacting operations is 100. This means the initial window will
* contain operations from multiple buckets.
*/
for (let count = 0; count < 100; count++) {
await batch.save({
sourceTable: TEST_TABLE,
tag: storage.SaveOperationTag.INSERT,
after: {
id: 't1',
b: 'b1',
value: 'start'
},
afterReplicaId: test_utils.rid('t1')
});

await batch.save({
sourceTable: TEST_TABLE,
tag: storage.SaveOperationTag.UPDATE,
after: {
id: 't1',
b: 'b1',
value: 'intermediate'
},
afterReplicaId: test_utils.rid('t1')
});

await batch.save({
sourceTable: TEST_TABLE,
tag: storage.SaveOperationTag.INSERT,
after: {
id: 't2',
b: 'b2',
value: 'start'
},
afterReplicaId: test_utils.rid('t2')
});

await batch.save({
sourceTable: TEST_TABLE,
tag: storage.SaveOperationTag.UPDATE,
after: {
id: 't1',
b: 'b1',
value: 'final'
},
afterReplicaId: test_utils.rid('t1')
});

await batch.save({
sourceTable: TEST_TABLE,
tag: storage.SaveOperationTag.UPDATE,
after: {
id: 't2',
b: 'b2',
value: 'final'
},
afterReplicaId: test_utils.rid('t2')
});
}
});

const checkpoint = result!.flushed_op;

await bucketStorage.compact({
clearBatchLimit: 100,
moveBatchLimit: 100,
moveBatchQueryLimit: 100 // Larger limit for a larger window of operations
});

const batchAfter = await test_utils.fromAsync(
bucketStorage.getBucketDataBatch(
checkpoint,
new Map([
['grouped["b1"]', '0'],
['grouped["b2"]', '0']
])
)
);
const dataAfter = batchAfter.flatMap((b) => b.batch.data);

// The op_ids will vary between MongoDB and Postgres storage
expect(dataAfter).toMatchObject(
expect.arrayContaining([
{ op_id: '497', op: 'CLEAR', checksum: -937074151 },
{
op_id: '499',
op: 'PUT',
object_type: 'test',
object_id: 't1',
checksum: 52221819,
subkey: '6544e3899293153fa7b38331/117ab485-4b42-58a2-ab32-0053a22c3423',
data: '{"id":"t1","b":"b1","value":"final"}'
},
{ op_id: '498', op: 'CLEAR', checksum: -234380197 },
{
op_id: '500',
op: 'PUT',
object_type: 'test',
object_id: 't2',
checksum: 2126669493,
subkey: '6544e3899293153fa7b38331/ec27c691-b47a-5d92-927a-9944feb89eee',
data: '{"id":"t2","b":"b2","value":"final"}'
}
])
);
});
}
9 changes: 9 additions & 0 deletions packages/service-core/src/storage/SyncRulesBucketStorage.ts
@@ -179,6 +179,15 @@ export interface CompactOptions {
* These can be individual bucket names, or bucket definition names.
*/
compactBuckets?: string[];

/** Minimum of 2 */
clearBatchLimit?: number;

/** Minimum of 1 */
moveBatchLimit?: number;

/** Minimum of 1 */
moveBatchQueryLimit?: number;
}

export interface TerminateOptions {
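A hypothetical sketch of how an implementation might resolve these options against the defaults visible earlier in this diff (`DEFAULT_CLEAR_BATCH_LIMIT = 5000`, `DEFAULT_MOVE_BATCH_LIMIT = 2000`). The actual clamping and fallback logic is not shown in the diff; treat this purely as an illustration of the documented minimums.

```ts
// Hypothetical option resolution. The two DEFAULT_ constants above match constants shown in
// the compactor files earlier in this diff; the move-query default is an assumption.
const DEFAULT_CLEAR_BATCH_LIMIT = 5000;
const DEFAULT_MOVE_BATCH_LIMIT = 2000;
const DEFAULT_MOVE_BATCH_QUERY_LIMIT = 10_000; // assumption, not shown in this diff

interface ResolvedCompactBatchLimits {
  clearBatchLimit: number;
  moveBatchLimit: number;
  moveBatchQueryLimit: number;
}

function resolveBatchLimits(options: {
  clearBatchLimit?: number;
  moveBatchLimit?: number;
  moveBatchQueryLimit?: number;
}): ResolvedCompactBatchLimits {
  return {
    clearBatchLimit: Math.max(2, options.clearBatchLimit ?? DEFAULT_CLEAR_BATCH_LIMIT),
    moveBatchLimit: Math.max(1, options.moveBatchLimit ?? DEFAULT_MOVE_BATCH_LIMIT),
    moveBatchQueryLimit: Math.max(1, options.moveBatchQueryLimit ?? DEFAULT_MOVE_BATCH_QUERY_LIMIT)
  };
}
```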