From 8edfc0b0b973763269e94df4f4d79e3297ffcc8d Mon Sep 17 00:00:00 2001 From: Ralf Kistner Date: Thu, 13 Mar 2025 10:23:22 +0200 Subject: [PATCH 1/3] Only do a single commit per chunk, increasing transaction throughput. --- .../src/replication/WalStream.ts | 23 +++++++++++++------ 1 file changed, 16 insertions(+), 7 deletions(-) diff --git a/modules/module-postgres/src/replication/WalStream.ts b/modules/module-postgres/src/replication/WalStream.ts index 83c83baf4..2388bab34 100644 --- a/modules/module-postgres/src/replication/WalStream.ts +++ b/modules/module-postgres/src/replication/WalStream.ts @@ -673,8 +673,10 @@ WHERE oid = $1::regclass`, await this.storage.startBatch( { zeroLSN: ZERO_LSN, defaultSchema: POSTGRES_DEFAULT_SCHEMA, storeCurrentData: true, skipExistingRows: false }, async (batch) => { - // Replication never starts in the middle of a transaction - let inTx = false; + // We don't handle any plain keepalive messages while we have transactions. + // While we have transactions, we use that to advance the position. + // Replication never starts in the middle of a transaction, so this starts as false. + let skipKeepalive = false; let count = 0; for await (const chunk of replicationStream.pgoutputDecode()) { @@ -695,17 +697,24 @@ WHERE oid = $1::regclass`, */ const assumeKeepAlive = !exposesLogicalMessages; let keepAliveDetected = false; + const lastCommit = messages.findLast((msg) => msg.tag == 'commit'); for (const msg of messages) { if (msg.tag == 'relation') { await this.handleRelation(batch, getPgOutputRelation(msg), true); } else if (msg.tag == 'begin') { - inTx = true; + // This may span multiple transactions in the same chunk, or even across chunks. + skipKeepalive = true; } else if (msg.tag == 'commit') { Metrics.getInstance().transactions_replicated_total.add(1); - inTx = false; - await batch.commit(msg.lsn!, { createEmptyCheckpoints }); - await this.ack(msg.lsn!, replicationStream); + if (msg == lastCommit) { + // Only commit if this is the last commit in the chunk. + // This effectively lets us batch multiple transactions within the same chunk + // into a single flush, increasing throughput for many small transactions. + skipKeepalive = false; + await batch.commit(msg.lsn!, { createEmptyCheckpoints }); + await this.ack(msg.lsn!, replicationStream); + } } else { if (count % 100 == 0) { logger.info(`${this.slot_name} replicating op ${count} ${msg.lsn}`); @@ -726,7 +735,7 @@ WHERE oid = $1::regclass`, } } - if (!inTx) { + if (!skipKeepalive) { if (assumeKeepAlive || keepAliveDetected) { // Reset the detection flag. keepAliveDetected = false; From 7c2e36a20fcb7843a4a333b57331a812911e84df Mon Sep 17 00:00:00 2001 From: Ralf Kistner Date: Thu, 13 Mar 2025 10:24:54 +0200 Subject: [PATCH 2/3] Changeset. --- .changeset/chilly-flowers-fix.md | 7 +++++++ 1 file changed, 7 insertions(+) create mode 100644 .changeset/chilly-flowers-fix.md diff --git a/.changeset/chilly-flowers-fix.md b/.changeset/chilly-flowers-fix.md new file mode 100644 index 000000000..348c7acd2 --- /dev/null +++ b/.changeset/chilly-flowers-fix.md @@ -0,0 +1,7 @@ +--- +'@powersync/service-module-postgres': minor +'@powersync/service-core': minor +'@powersync/service-image': minor +--- + +[Postgres] Only flush once per replicated chunk, increasing transaction replication throughput. From 2856a480476888fe5145663c3d03dd808c9661ed Mon Sep 17 00:00:00 2001 From: Ralf Kistner Date: Thu, 13 Mar 2025 10:30:38 +0200 Subject: [PATCH 3/3] Test Postgres 17. --- .github/workflows/test.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 4b52eb67c..a96232cc2 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -86,7 +86,7 @@ jobs: strategy: fail-fast: false matrix: - postgres-version: [11, 12, 13, 14, 15, 16] + postgres-version: [11, 12, 13, 14, 15, 16, 17] steps: - uses: actions/checkout@v4