5 changes: 5 additions & 0 deletions .changeset/rotten-items-explode.md
@@ -0,0 +1,5 @@
---
'@powersync/service-module-postgres-storage': patch
---

Fix issue where compacting might fail with an "unexpected PUT operation" error.
5 changes: 5 additions & 0 deletions .changeset/rotten-nails-cheat.md
@@ -0,0 +1,5 @@
---
'@powersync/service-core-tests': minor
---

Added compacting test for interleaved bucket operations.
8 changes: 8 additions & 0 deletions .changeset/six-dragons-whisper.md
@@ -0,0 +1,8 @@
---
'@powersync/service-module-postgres-storage': patch
'@powersync/service-module-mongodb-storage': patch
'@powersync/service-core-tests': patch
'@powersync/service-core': patch
---

Unified compacting options between storage providers.
@@ -32,14 +32,7 @@ interface CurrentBucketState {
/**
* Additional options, primarily for testing.
*/
export interface MongoCompactOptions extends storage.CompactOptions {
/** Minimum of 2 */
clearBatchLimit?: number;
/** Minimum of 1 */
moveBatchLimit?: number;
/** Minimum of 1 */
moveBatchQueryLimit?: number;
}
export interface MongoCompactOptions extends storage.CompactOptions {}

const DEFAULT_CLEAR_BATCH_LIMIT = 5000;
const DEFAULT_MOVE_BATCH_LIMIT = 2000;
@@ -1,11 +1,5 @@
import { MongoCompactOptions } from '@module/storage/implementation/MongoCompactor.js';
import { register } from '@powersync/service-core-tests';
import { describe } from 'vitest';
import { INITIALIZED_MONGO_STORAGE_FACTORY } from './util.js';

describe('Mongo Sync Bucket Storage Compact', () =>
register.registerCompactTests<MongoCompactOptions>(INITIALIZED_MONGO_STORAGE_FACTORY, {
clearBatchLimit: 2,
moveBatchLimit: 1,
moveBatchQueryLimit: 1
}));
describe('Mongo Sync Bucket Storage Compact', () => register.registerCompactTests(INITIALIZED_MONGO_STORAGE_FACTORY));
37 changes: 22 additions & 15 deletions modules/module-postgres-storage/src/storage/PostgresCompactor.ts
@@ -35,14 +35,7 @@ interface CurrentBucketState {
/**
* Additional options, primarily for testing.
*/
export interface PostgresCompactOptions extends storage.CompactOptions {
/** Minimum of 2 */
clearBatchLimit?: number;
/** Minimum of 1 */
moveBatchLimit?: number;
/** Minimum of 1 */
moveBatchQueryLimit?: number;
}
export interface PostgresCompactOptions extends storage.CompactOptions {}

const DEFAULT_CLEAR_BATCH_LIMIT = 5000;
const DEFAULT_MOVE_BATCH_LIMIT = 2000;
@@ -99,15 +92,21 @@ export class PostgresCompactor {

let bucketLower: string | null = null;
let bucketUpper: string | null = null;

if (bucket?.includes('[')) {
// 0xFFFF fails to compare correctly. Querying with bucket_name < `\uffff` returns no results.
// This seems to be due to 0xFFFF being a `Noncharacter` in Unicode.
const MAX_CHAR = String.fromCodePoint(0xfffd);

if (bucket == null) {
bucketLower = '';
bucketUpper = MAX_CHAR;
} else if (bucket?.includes('[')) {
// Exact bucket name
bucketLower = bucket;
bucketUpper = bucket;
} else if (bucket) {
// Bucket definition name
bucketLower = `${bucket}[`;
bucketUpper = `${bucket}[\uFFFF`;
bucketUpper = `${bucket}[${MAX_CHAR}`;
}

let upperOpIdLimit = BIGINT_MAX;
@@ -126,10 +125,16 @@
bucket_data
WHERE
group_id = ${{ type: 'int4', value: this.group_id }}
AND bucket_name LIKE COALESCE(${{ type: 'varchar', value: bucketLower }}, '%')
AND op_id < ${{ type: 'int8', value: upperOpIdLimit }}
AND bucket_name >= ${{ type: 'varchar', value: bucketLower }}
AND (
(
bucket_name = ${{ type: 'varchar', value: bucketUpper }}
AND op_id < ${{ type: 'int8', value: upperOpIdLimit }}
)
OR bucket_name < ${{ type: 'varchar', value: bucketUpper }}
)
ORDER BY
bucket_name,
bucket_name DESC,
op_id DESC
LIMIT
${{ type: 'int4', value: this.moveBatchQueryLimit }}
@@ -145,7 +150,9 @@
}

// Set upperBound for the next batch
upperOpIdLimit = batch[batch.length - 1].op_id;
const lastBatchItem = batch[batch.length - 1];
upperOpIdLimit = lastBatchItem.op_id;
bucketUpper = lastBatchItem.bucket_name;

for (const doc of batch) {
if (currentState == null || doc.bucket_name != currentState.bucket) {
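For reference, a minimal TypeScript sketch (hypothetical names, not part of this PR) of the keyset cursor that the query above encodes: rows are scanned in descending (bucket_name, op_id) order, and each batch resumes strictly before the last row returned by the previous batch, which is what the bucketUpper / upperOpIdLimit assignments above achieve.

```ts
// Sketch only: mirrors the WHERE clause used by the compactor query above.
// `bucketLower` is the inclusive lower bound of the bucket range being compacted.
interface KeysetCursor {
  bucketUpper: string; // bucket_name of the last row seen
  upperOpIdLimit: bigint; // op_id of the last row seen
}

function stillToScan(
  row: { bucket_name: string; op_id: bigint },
  cursor: KeysetCursor,
  bucketLower: string
): boolean {
  if (row.bucket_name < bucketLower) {
    // Outside the requested bucket range.
    return false;
  }
  if (row.bucket_name === cursor.bucketUpper) {
    // Same bucket as the previous batch's last row: only strictly older
    // operations remain unprocessed.
    return row.op_id < cursor.upperOpIdLimit;
  }
  // The scan moves through bucket_name in descending order, so buckets that
  // sort before the cursor bucket have not been visited yet.
  return row.bucket_name < cursor.bucketUpper;
}
```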
Expand Up @@ -2,4 +2,4 @@ import { register } from '@powersync/service-core-tests';
import { describe } from 'vitest';
import { POSTGRES_STORAGE_FACTORY } from './util.js';

describe('Postgres Sync Bucket Storage Compact', () => register.registerCompactTests(POSTGRES_STORAGE_FACTORY, {}));
describe('Postgres Sync Bucket Storage Compact', () => register.registerCompactTests(POSTGRES_STORAGE_FACTORY));
149 changes: 142 additions & 7 deletions packages/service-core-tests/src/tests/register-compacting-tests.ts
Original file line number Diff line number Diff line change
@@ -15,10 +15,7 @@ const TEST_TABLE = test_utils.makeTestTable('test', ['id']);
* compactTests(() => new MongoStorageFactory(), { clearBatchLimit: 2, moveBatchLimit: 1, moveBatchQueryLimit: 1 }));
* ```
*/
export function registerCompactTests<CompactOptions extends storage.CompactOptions = storage.CompactOptions>(
generateStorageFactory: storage.TestStorageFactory,
compactOptions: CompactOptions
) {
export function registerCompactTests(generateStorageFactory: storage.TestStorageFactory) {
test('compacting (1)', async () => {
const sync_rules = test_utils.testRules(`
bucket_definitions:
@@ -87,7 +84,11 @@ bucket_definitions:
}
]);

await bucketStorage.compact(compactOptions);
await bucketStorage.compact({
clearBatchLimit: 2,
moveBatchLimit: 1,
moveBatchQueryLimit: 1
});

const batchAfter = await test_utils.oneFromAsync(
bucketStorage.getBucketDataBatch(checkpoint, new Map([['global[]', '0']]))
@@ -204,7 +205,11 @@ bucket_definitions:
}
]);

await bucketStorage.compact(compactOptions);
await bucketStorage.compact({
clearBatchLimit: 2,
moveBatchLimit: 1,
moveBatchQueryLimit: 1
});

const batchAfter = await test_utils.oneFromAsync(
bucketStorage.getBucketDataBatch(checkpoint, new Map([['global[]', '0']]))
@@ -285,7 +290,11 @@ bucket_definitions:
});
const checkpoint2 = result2!.flushed_op;

await bucketStorage.compact(compactOptions);
await bucketStorage.compact({
clearBatchLimit: 2,
moveBatchLimit: 1,
moveBatchQueryLimit: 1
});

const batchAfter = await test_utils.oneFromAsync(
bucketStorage.getBucketDataBatch(checkpoint2, new Map([['global[]', '0']]))
@@ -307,4 +316,130 @@
checksum: 1874612650
});
});

test('compacting (3)', async () => {
const sync_rules = test_utils.testRules(/* yaml */
` bucket_definitions:
grouped:
# The parameter query here is not important
# We specifically don't want to create bucket_parameter records here
# since the op_ids for bucket_data could vary between storage implementations.
parameters: select 'b' as b
data:
- select * from test where b = bucket.b`);

await using factory = await generateStorageFactory();
const bucketStorage = factory.getInstance(sync_rules);

const result = await bucketStorage.startBatch(test_utils.BATCH_OPTIONS, async (batch) => {
/**
* Repeatedly create operations which fall into different buckets.
* The bucket operations are purposely interleaved as the op_id increases.
* A large amount of operations are created here.
* The configured window of compacting operations is 100. This means the initial window will
* contain operations from multiple buckets.
*/
for (let count = 0; count < 100; count++) {
await batch.save({
sourceTable: TEST_TABLE,
tag: storage.SaveOperationTag.INSERT,
after: {
id: 't1',
b: 'b1',
value: 'start'
},
afterReplicaId: test_utils.rid('t1')
});

await batch.save({
sourceTable: TEST_TABLE,
tag: storage.SaveOperationTag.UPDATE,
after: {
id: 't1',
b: 'b1',
value: 'intermediate'
},
afterReplicaId: test_utils.rid('t1')
});

await batch.save({
sourceTable: TEST_TABLE,
tag: storage.SaveOperationTag.INSERT,
after: {
id: 't2',
b: 'b2',
value: 'start'
},
afterReplicaId: test_utils.rid('t2')
});

await batch.save({
sourceTable: TEST_TABLE,
tag: storage.SaveOperationTag.UPDATE,
after: {
id: 't1',
b: 'b1',
value: 'final'
},
afterReplicaId: test_utils.rid('t1')
});

await batch.save({
sourceTable: TEST_TABLE,
tag: storage.SaveOperationTag.UPDATE,
after: {
id: 't2',
b: 'b2',
value: 'final'
},
afterReplicaId: test_utils.rid('t2')
});
}
});

const checkpoint = result!.flushed_op;

await bucketStorage.compact({
clearBatchLimit: 2,
moveBatchLimit: 1,
moveBatchQueryLimit: 100 // Larger limit for a larger window of operations
});

const batchAfter = await test_utils.fromAsync(
bucketStorage.getBucketDataBatch(
checkpoint,
new Map([
['grouped["b1"]', '0'],
['grouped["b2"]', '0']
])
)
);
const dataAfter = batchAfter.flatMap((b) => b.batch.data);

// The op_ids will vary between MongoDB and Postgres storage
expect(dataAfter).toMatchObject(
expect.arrayContaining([
{ op_id: '497', op: 'CLEAR', checksum: -937074151 },
{
op_id: '499',
op: 'PUT',
object_type: 'test',
object_id: 't1',
checksum: 52221819,
subkey: '6544e3899293153fa7b38331/117ab485-4b42-58a2-ab32-0053a22c3423',
data: '{"id":"t1","b":"b1","value":"final"}'
},
{ op_id: '498', op: 'CLEAR', checksum: -234380197 },
{
op_id: '500',
op: 'PUT',
object_type: 'test',
object_id: 't2',
checksum: 2126669493,
subkey: '6544e3899293153fa7b38331/ec27c691-b47a-5d92-927a-9944feb89eee',
data: '{"id":"t2","b":"b2","value":"final"}'
}
])
);
});
}
9 changes: 9 additions & 0 deletions packages/service-core/src/storage/SyncRulesBucketStorage.ts
@@ -179,6 +179,15 @@ export interface CompactOptions {
* These can be individual bucket names, or bucket definition names.
*/
compactBuckets?: string[];

/** Minimum of 2 */
clearBatchLimit?: number;

/** Minimum of 1 */
moveBatchLimit?: number;

/** Minimum of 1 */
moveBatchQueryLimit?: number;
}

export interface TerminateOptions {
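With these fields lifted onto the shared CompactOptions, compaction can be tuned the same way regardless of the storage module. A minimal usage sketch, using a hypothetical structural type rather than the real interface, with option values taken from the compacting tests in this PR:

```ts
// Sketch only: any storage implementation exposing the shared compact(options)
// method accepts the same tuning options after this change.
interface CompactableStorage {
  compact(options: {
    clearBatchLimit?: number;
    moveBatchLimit?: number;
    moveBatchQueryLimit?: number;
  }): Promise<void>;
}

async function compactWithSmallBatches(bucketStorage: CompactableStorage) {
  await bucketStorage.compact({
    clearBatchLimit: 2, // minimum of 2
    moveBatchLimit: 1, // minimum of 1
    moveBatchQueryLimit: 100 // a larger value widens the compacting window
  });
}
```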