Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions .changeset/chilly-flowers-fix.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
---
'@powersync/service-module-postgres': minor
'@powersync/service-core': minor
'@powersync/service-image': minor
---

[Postgres] Only flush once per replicated chunk, increasing transaction replication throughput.
2 changes: 1 addition & 1 deletion .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ jobs:
strategy:
fail-fast: false
matrix:
postgres-version: [11, 12, 13, 14, 15, 16]
postgres-version: [11, 12, 13, 14, 15, 16, 17]

steps:
- uses: actions/checkout@v4
Expand Down
23 changes: 16 additions & 7 deletions modules/module-postgres/src/replication/WalStream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -673,8 +673,10 @@ WHERE oid = $1::regclass`,
await this.storage.startBatch(
{ zeroLSN: ZERO_LSN, defaultSchema: POSTGRES_DEFAULT_SCHEMA, storeCurrentData: true, skipExistingRows: false },
async (batch) => {
// Replication never starts in the middle of a transaction
let inTx = false;
// While a transaction is in progress, plain keepalive messages are ignored;
// the transaction's commit LSN is used to advance the replication position instead.
// Replication never starts in the middle of a transaction, so this starts as false.
let skipKeepalive = false;
let count = 0;

for await (const chunk of replicationStream.pgoutputDecode()) {
Expand All @@ -695,17 +697,24 @@ WHERE oid = $1::regclass`,
*/
const assumeKeepAlive = !exposesLogicalMessages;
let keepAliveDetected = false;
const lastCommit = messages.findLast((msg) => msg.tag == 'commit');

for (const msg of messages) {
if (msg.tag == 'relation') {
await this.handleRelation(batch, getPgOutputRelation(msg), true);
} else if (msg.tag == 'begin') {
inTx = true;
// A single chunk may contain multiple transactions, and a transaction may span chunks,
// so this flag stays set until the last commit in the chunk is processed.
skipKeepalive = true;
} else if (msg.tag == 'commit') {
Metrics.getInstance().transactions_replicated_total.add(1);
inTx = false;
await batch.commit(msg.lsn!, { createEmptyCheckpoints });
await this.ack(msg.lsn!, replicationStream);
if (msg == lastCommit) {
// Only commit if this is the last commit in the chunk.
// This effectively lets us batch multiple transactions within the same chunk
// into a single flush, increasing throughput for many small transactions.
skipKeepalive = false;
await batch.commit(msg.lsn!, { createEmptyCheckpoints });
await this.ack(msg.lsn!, replicationStream);
}
} else {
if (count % 100 == 0) {
logger.info(`${this.slot_name} replicating op ${count} ${msg.lsn}`);
Expand All @@ -726,7 +735,7 @@ WHERE oid = $1::regclass`,
}
}

if (!inTx) {
if (!skipKeepalive) {
if (assumeKeepAlive || keepAliveDetected) {
// Reset the detection flag.
keepAliveDetected = false;
Expand Down