diff --git a/modules/module-postgres/src/api/PostgresRouteAPIAdapter.ts b/modules/module-postgres/src/api/PostgresRouteAPIAdapter.ts index 320f1b2b6..19a8acb9e 100644 --- a/modules/module-postgres/src/api/PostgresRouteAPIAdapter.ts +++ b/modules/module-postgres/src/api/PostgresRouteAPIAdapter.ts @@ -252,6 +252,10 @@ FROM pg_replication_slots WHERE slot_name = $1 LIMIT 1;`, const r = await callback(currentLsn); + // Note: This may not reliably trigger a new replication message on Postgres 11 or 12, + // in which case there could be a delay in the client receiving the write checkpoint acknowledgement. + // Postgres 12 already reached EOL, and this is not a critical issue, so we're not fixing it. + // On postgres 13+, this works reliably. await lib_postgres.retriedQuery(this.pool, KEEPALIVE_STATEMENT); return r; diff --git a/modules/module-postgres/test/src/checkpoints.test.ts b/modules/module-postgres/test/src/checkpoints.test.ts index 844e2ee89..7ad247ea1 100644 --- a/modules/module-postgres/test/src/checkpoints.test.ts +++ b/modules/module-postgres/test/src/checkpoints.test.ts @@ -12,7 +12,7 @@ const BASIC_SYNC_RULES = `bucket_definitions: data: - SELECT id, description, other FROM "test_data"`; -describe.skipIf(!(env.CI || env.SLOW_TESTS))('checkpoint tests', () => { +describe('checkpoint tests', () => { test('write checkpoints', { timeout: 50_000 }, async () => { const factory = INITIALIZED_MONGO_STORAGE_FACTORY; await using context = await WalStreamTestContext.open(factory); @@ -20,6 +20,14 @@ describe.skipIf(!(env.CI || env.SLOW_TESTS))('checkpoint tests', () => { await context.updateSyncRules(BASIC_SYNC_RULES); const { pool } = context; const api = new PostgresRouteAPIAdapter(pool); + const serverVersion = await context.connectionManager.getServerVersion(); + if (serverVersion!.compareMain('13.0.0') < 0) { + // The test is not stable on Postgres 11 or 12. See the notes on + // PostgresRouteAPIAdapter.createReplicationHead() for details. + // Postgres 12 is already EOL, so not worth finding a fix - just skip the tests. + console.log('Skipping write checkpoint test on Postgres < 13.0.0'); + return; + } await pool.query(`CREATE TABLE test_data(id text primary key, description text, other text)`); @@ -59,7 +67,7 @@ describe.skipIf(!(env.CI || env.SLOW_TESTS))('checkpoint tests', () => { const start = Date.now(); while (lastWriteCheckpoint == null || lastWriteCheckpoint < BigInt(cp.writeCheckpoint)) { - if (Date.now() - start > 5_000) { + if (Date.now() - start > 3_000) { throw new Error( `Timeout while waiting for checkpoint. last: ${lastWriteCheckpoint}, waiting for: ${cp.writeCheckpoint}` ); diff --git a/packages/service-core-tests/src/tests/register-sync-tests.ts b/packages/service-core-tests/src/tests/register-sync-tests.ts index b9b5d965e..6bf9a2f5e 100644 --- a/packages/service-core-tests/src/tests/register-sync-tests.ts +++ b/packages/service-core-tests/src/tests/register-sync-tests.ts @@ -520,8 +520,11 @@ bucket_definitions: await batch.commit('0/2'); }); - // pause for a bit to give the stream time to interrupt the checkpoint - await timers.setTimeout(500); + if (sentRows >= 1000 && sentRows <= 2001) { + // pause for a bit to give the stream time to process interruptions. + // This covers the data batch above and the next one. + await timers.setTimeout(50); + } } } if ('checkpoint_complete' in next) {