Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion LICENSE.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
MIT License

Copyright (c) 2017 Kibae Shin ([email protected]), [Contributors](https://github.com/kibae/pg-logical-replication/graphs/contributors)
Copyright (c) 2025 Kibae Shin ([email protected]), [Contributors](https://github.com/kibae/pg-logical-replication/graphs/contributors)

Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
Expand Down
45 changes: 40 additions & 5 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@

## 1. Install

- **pg-logical-replication** depends on [pq(node-postgres) >= 6.2.2](https://github.com/brianc/node-postgres)
- **pg-logical-replication** depends on [pg(node-postgres) >= 6.2.2](https://github.com/brianc/node-postgres)
and [eventemitter2](https://www.npmjs.com/package/eventemitter2)

```sh
Expand Down Expand Up @@ -71,7 +71,7 @@ const plugin = new Wal2JsonPlugin({

/**
* Wal2Json.Output
* https://github.com/kibae/pg-logical-replication/blob/ts-main/src/output-plugins/wal2json/wal2json-plugin-output.type.ts
* https://github.com/kibae/pg-logical-replication/blob/main/src/output-plugins/wal2json/wal2json-plugin-output.type.ts
*/
service.on('data', (lsn: string, log: Wal2Json.Output) => {
// Do something what you want.
Expand Down Expand Up @@ -142,6 +142,14 @@ config ? : Partial<{
*/
timeoutSeconds: 0 | 10 | number;
};
flowControl?: {
/**
* If true, pause the stream until the data handler completes.
* This enables backpressure support for async handlers.
* Default: false
*/
enabled: boolean;
};
}>
)
```
Expand All @@ -161,10 +169,37 @@ config ? : Partial<{
- Usually this is done **automatically**.
- Manually use only when `new LogicalReplicationService({}, {acknowledge: {auto: false}})`.

### 3-4. Event
### 3-4. Flow Control (Backpressure)

When processing messages takes longer than the rate at which PostgreSQL sends them, the internal buffer can grow
indefinitely, leading to memory issues (OOM). The `flowControl` option enables backpressure support to prevent this.

```typescript
const service = new LogicalReplicationService(clientConfig, {
acknowledge: { auto: true, timeoutSeconds: 10 },
flowControl: { enabled: true } // Enable backpressure support
});

// Now async handlers are fully supported - the stream pauses until processing completes
service.on('data', async (lsn: string, log: Pgoutput.Message) => {
await someSlowAsyncOperation(log); // Safe: next message waits for this to complete
});
```

**How it works:**
- When `flowControl.enabled` is `true`, the stream is paused while processing each message
- Messages are queued and processed sequentially
- The stream resumes only after the handler (including async operations) completes
- This prevents memory overflow when handlers are slower than the incoming message rate

**Default behavior:**
- `flowControl.enabled` defaults to `false` for backward compatibility
- When disabled, messages are emitted immediately without waiting for handler completion

### 3-5. Event

- `on(event: 'start', listener: () => Promise<void> | void)`
- Emitted when start replication.
- Emitted when replication starts.
- `on(event: 'data', listener: (lsn: string, log: any) => Promise<void> | void)`
- Emitted when PostgreSQL data changes. The log value type varies depending on the plugin.
- `on(event: 'error', listener: (err: Error) => void)`
Expand All @@ -173,7 +208,7 @@ config ? : Partial<{
- `on(event: 'heartbeat', listener: (lsn: string, timestamp: number, shouldRespond: boolean) => Promise<void> | void)`
- A heartbeat check signal has been received from the server. You may need to run `service.acknowledge()`.

### 3-5. Misc. method
### 3-6. Misc. method

- `stop(): Promise<this>`
- Terminate the server's connection and stop replication.
Expand Down
26 changes: 13 additions & 13 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "pg-logical-replication",
"version": "2.2.1",
"version": "2.3.0",
"description": "PostgreSQL Location Replication client - logical WAL replication streaming",
"main": "dist/index.js",
"types": "dist/index.d.ts",
Expand Down Expand Up @@ -80,7 +80,7 @@
"@types/jest": "^30.0.0",
"@types/node": "^24.0.3",
"@types/pg": "^8.6.5",
"jest": "^30.0.0",
"jest": "^30.2.0",
"jest-junit": "^16.0.0",
"prettier": "^3.4.2",
"protobufjs": "^7.0.0",
Expand Down
73 changes: 71 additions & 2 deletions src/logical-replication-service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,19 @@ export interface LogicalReplicationConfig {
*/
timeoutSeconds: 0 | 10 | number;
};
/**
* Flow control (backpressure) configuration.
* When enabled, the stream will be paused until the data handler completes,
* preventing memory overflow when processing is slower than the incoming message rate.
*/
flowControl?: {
/**
* If true, pause the stream until the data handler completes.
* This enables backpressure support for async handlers.
* Default: false
*/
enabled: boolean;
};
}

export interface LogicalReplicationService {
Expand All @@ -42,6 +55,10 @@ export class LogicalReplicationService extends EventEmitter2 implements LogicalR
timeoutSeconds: 10,
...(config?.acknowledge || {}),
},
flowControl: {
enabled: false,
...(config?.flowControl || {}),
},
};
}

Expand Down Expand Up @@ -74,9 +91,17 @@ export class LogicalReplicationService extends EventEmitter2 implements LogicalR
return this._stop;
}

// Flow control (backpressure) queue
private _messageQueue: Array<{ lsn: string; data: any }> = [];
private _processing: boolean = false;

public async stop(): Promise<this> {
this._stop = true;

// Clear flow control queue
this._messageQueue = [];
this._processing = false;

this._connection?.removeAllListeners();
this._connection = null;

Expand Down Expand Up @@ -119,8 +144,15 @@ export class LogicalReplicationService extends EventEmitter2 implements LogicalR

if (buffer[0] == 0x77) {
// XLogData
this.emit('data', lsn, plugin.parse(buffer.subarray(25)));
this._acknowledge(lsn);
if (this.config.flowControl!.enabled) {
// Flow control enabled: queue the message and process sequentially
this._messageQueue.push({ lsn, data: plugin.parse(buffer.subarray(25)) });
this._processQueue();
} else {
// Original behavior: emit immediately
this.emit('data', lsn, plugin.parse(buffer.subarray(25)));
this._acknowledge(lsn);
}
} else if (buffer[0] == 0x6b) {
// Primary keepalive message
const timestamp = Math.floor(
Expand All @@ -147,6 +179,43 @@ export class LogicalReplicationService extends EventEmitter2 implements LogicalR
await this.acknowledge(lsn);
}

/**
* Process messages in the queue sequentially with backpressure support.
* Pauses the stream while processing and resumes when the queue is empty.
*/
private _processQueue(): void {
if (this._processing || this._stop) return;
this._processing = true;

// Pause the stream to prevent buffer overflow
// @ts-ignore - accessing internal stream property
this._connection?.stream?.pause?.();

const processNext = async (): Promise<void> => {
while (this._messageQueue.length > 0 && !this._stop) {
const message = this._messageQueue.shift()!;

try {
// Wait for all listeners to complete (supports async handlers)
await this.emitAsync('data', message.lsn, message.data);
await this._acknowledge(message.lsn);
} catch (e) {
this.emit('error', e);
}
}

this._processing = false;

// Resume the stream when queue is empty
if (!this._stop) {
// @ts-ignore - accessing internal stream property
this._connection?.stream?.resume?.();
}
};

processNext();
}

private lastStandbyStatusUpdatedTime = 0;
private checkStandbyStatusTimer: NodeJS.Timeout | null = null;
private checkStandbyStatus(enable: boolean) {
Expand Down
Loading
Loading