Skip to content

Commit be0ea66

Browse files
committed
Add basic write checkpoint tests.
1 parent 0b1867b commit be0ea66

File tree

1 file changed

+70
-0
lines changed

1 file changed

+70
-0
lines changed
Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
import { PostgresRouteAPIAdapter } from '@module/api/PostgresRouteAPIAdapter.js';
2+
import { checkpointUserId, createWriteCheckpoint } from '@powersync/service-core';
3+
import { describe, test } from 'vitest';
4+
import { INITIALIZED_MONGO_STORAGE_FACTORY } from './util.js';
5+
import { WalStreamTestContext } from './wal_stream_utils.js';
6+
7+
import timers from 'node:timers/promises';
8+
9+
const BASIC_SYNC_RULES = `bucket_definitions:
10+
global:
11+
data:
12+
- SELECT id, description, other FROM "test_data"`;
13+
14+
describe('checkpoint tests', () => {
15+
test('write checkpoints', { timeout: 30_000 }, async () => {
16+
const factory = INITIALIZED_MONGO_STORAGE_FACTORY;
17+
await using context = await WalStreamTestContext.open(factory);
18+
19+
await context.updateSyncRules(BASIC_SYNC_RULES);
20+
const { pool } = context;
21+
const api = new PostgresRouteAPIAdapter(pool);
22+
23+
await pool.query(`CREATE TABLE test_data(id text primary key, description text, other text)`);
24+
25+
await context.replicateSnapshot();
26+
27+
context.startStreaming();
28+
29+
const controller = new AbortController();
30+
try {
31+
const stream = context.factory.watchWriteCheckpoint(
32+
checkpointUserId('test_user', 'test_client'),
33+
controller.signal
34+
);
35+
36+
let lastWriteCheckpoint: bigint | null = null;
37+
38+
(async () => {
39+
try {
40+
for await (const cp of stream) {
41+
lastWriteCheckpoint = cp.writeCheckpoint;
42+
}
43+
} catch (e) {
44+
if (e.name != 'AbortError') {
45+
throw e;
46+
}
47+
}
48+
})();
49+
50+
for (let i = 0; i < 10; i++) {
51+
const cp = await createWriteCheckpoint({
52+
userId: 'test_user',
53+
clientId: 'test_client',
54+
api,
55+
storage: context.factory
56+
});
57+
58+
const start = Date.now();
59+
while (lastWriteCheckpoint == null || lastWriteCheckpoint < BigInt(cp.writeCheckpoint)) {
60+
if (Date.now() - start > 2_000) {
61+
throw new Error(`Timeout while waiting for checkpoint`);
62+
}
63+
await timers.setTimeout(0, undefined, { signal: controller.signal });
64+
}
65+
}
66+
} finally {
67+
controller.abort();
68+
}
69+
});
70+
});

0 commit comments

Comments
 (0)