Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions modules/module-postgres/src/api/PostgresRouteAPIAdapter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -252,6 +252,10 @@ FROM pg_replication_slots WHERE slot_name = $1 LIMIT 1;`,

const r = await callback(currentLsn);

// Note: This may not reliably trigger a new replication message on Postgres 11 or 12,
// in which case there could be a delay in the client receiving the write checkpoint acknowledgement.
// Postgres 12 already reached EOL, and this is not a critical issue, so we're not fixing it.
// On postgres 13+, this works reliably.
await lib_postgres.retriedQuery(this.pool, KEEPALIVE_STATEMENT);

return r;
Expand Down
12 changes: 10 additions & 2 deletions modules/module-postgres/test/src/checkpoints.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,22 @@ const BASIC_SYNC_RULES = `bucket_definitions:
data:
- SELECT id, description, other FROM "test_data"`;

describe.skipIf(!(env.CI || env.SLOW_TESTS))('checkpoint tests', () => {
describe('checkpoint tests', () => {
test('write checkpoints', { timeout: 50_000 }, async () => {
const factory = INITIALIZED_MONGO_STORAGE_FACTORY;
await using context = await WalStreamTestContext.open(factory);

await context.updateSyncRules(BASIC_SYNC_RULES);
const { pool } = context;
const api = new PostgresRouteAPIAdapter(pool);
const serverVersion = await context.connectionManager.getServerVersion();
if (serverVersion!.compareMain('13.0.0') < 0) {
// The test is not stable on Postgres 11 or 12. See the notes on
// PostgresRouteAPIAdapter.createReplicationHead() for details.
// Postgres 12 is already EOL, so not worth finding a fix - just skip the tests.
console.log('Skipping write checkpoint test on Postgres < 13.0.0');
return;
}

await pool.query(`CREATE TABLE test_data(id text primary key, description text, other text)`);

Expand Down Expand Up @@ -59,7 +67,7 @@ describe.skipIf(!(env.CI || env.SLOW_TESTS))('checkpoint tests', () => {

const start = Date.now();
while (lastWriteCheckpoint == null || lastWriteCheckpoint < BigInt(cp.writeCheckpoint)) {
if (Date.now() - start > 5_000) {
if (Date.now() - start > 3_000) {
throw new Error(
`Timeout while waiting for checkpoint. last: ${lastWriteCheckpoint}, waiting for: ${cp.writeCheckpoint}`
);
Expand Down
7 changes: 5 additions & 2 deletions packages/service-core-tests/src/tests/register-sync-tests.ts
Original file line number Diff line number Diff line change
Expand Up @@ -520,8 +520,11 @@ bucket_definitions:
await batch.commit('0/2');
});

// pause for a bit to give the stream time to interrupt the checkpoint
await timers.setTimeout(500);
if (sentRows >= 1000 && sentRows <= 2001) {
// pause for a bit to give the stream time to process interruptions.
// This covers the data batch above and the next one.
await timers.setTimeout(50);
}
}
}
if ('checkpoint_complete' in next) {
Expand Down