From 00dc23333a866c9b3802be2520b7c0ed20e57141 Mon Sep 17 00:00:00 2001 From: Kibae Shin Date: Sat, 13 Dec 2025 14:10:03 +0900 Subject: [PATCH] feat: #52 add flowControl option for backpressure support - Add flowControl.enabled config option to pause stream until data handler completes - Use EventEmitter2.emitAsync() to wait for async handlers - Implement message queue with stream pause/resume for sequential processing - Prevents memory overflow (OOM) when handlers are slower than incoming messages - Default: disabled for backward compatibility - Update README with flowControl documentation and fix typos - Bump version to 2.3.0 --- LICENSE.md | 2 +- README.md | 45 +++++- package-lock.json | 26 ++-- package.json | 4 +- src/logical-replication-service.ts | 73 +++++++++- src/test/flow-control.spec.ts | 212 +++++++++++++++++++++++++++++ 6 files changed, 339 insertions(+), 23 deletions(-) create mode 100644 src/test/flow-control.spec.ts diff --git a/LICENSE.md b/LICENSE.md index 65d59a0..ef8f8c4 100644 --- a/LICENSE.md +++ b/LICENSE.md @@ -1,6 +1,6 @@ MIT License -Copyright (c) 2017 Kibae Shin (kibae.shin@gmail.com), [Contributors](https://github.com/kibae/pg-logical-replication/graphs/contributors) +Copyright (c) 2025 Kibae Shin (kibae.shin@gmail.com), [Contributors](https://github.com/kibae/pg-logical-replication/graphs/contributors) Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal diff --git a/README.md b/README.md index ea9d06f..b836ebe 100644 --- a/README.md +++ b/README.md @@ -23,7 +23,7 @@ ## 1. Install -- **pg-logical-replication** depends on [pq(node-postgres) >= 6.2.2](https://github.com/brianc/node-postgres) +- **pg-logical-replication** depends on [pg(node-postgres) >= 6.2.2](https://github.com/brianc/node-postgres) and [eventemitter2](https://www.npmjs.com/package/eventemitter2) ```sh @@ -71,7 +71,7 @@ const plugin = new Wal2JsonPlugin({ /** * Wal2Json.Output - * https://github.com/kibae/pg-logical-replication/blob/ts-main/src/output-plugins/wal2json/wal2json-plugin-output.type.ts + * https://github.com/kibae/pg-logical-replication/blob/main/src/output-plugins/wal2json/wal2json-plugin-output.type.ts */ service.on('data', (lsn: string, log: Wal2Json.Output) => { // Do something what you want. @@ -142,6 +142,14 @@ config ? : Partial<{ */ timeoutSeconds: 0 | 10 | number; }; + flowControl?: { + /** + * If true, pause the stream until the data handler completes. + * This enables backpressure support for async handlers. + * Default: false + */ + enabled: boolean; + }; }> ) ``` @@ -161,10 +169,37 @@ config ? : Partial<{ - Usually this is done **automatically**. - Manually use only when `new LogicalReplicationService({}, {acknowledge: {auto: false}})`. -### 3-4. Event +### 3-4. Flow Control (Backpressure) + +When processing messages takes longer than the rate at which PostgreSQL sends them, the internal buffer can grow +indefinitely, leading to memory issues (OOM). The `flowControl` option enables backpressure support to prevent this. + +```typescript +const service = new LogicalReplicationService(clientConfig, { + acknowledge: { auto: true, timeoutSeconds: 10 }, + flowControl: { enabled: true } // Enable backpressure support +}); + +// Now async handlers are fully supported - the stream pauses until processing completes +service.on('data', async (lsn: string, log: Pgoutput.Message) => { + await someSlowAsyncOperation(log); // Safe: next message waits for this to complete +}); +``` + +**How it works:** +- When `flowControl.enabled` is `true`, the stream is paused while processing each message +- Messages are queued and processed sequentially +- The stream resumes only after the handler (including async operations) completes +- This prevents memory overflow when handlers are slower than the incoming message rate + +**Default behavior:** +- `flowControl.enabled` defaults to `false` for backward compatibility +- When disabled, messages are emitted immediately without waiting for handler completion + +### 3-5. Event - `on(event: 'start', listener: () => Promise | void)` - - Emitted when start replication. + - Emitted when replication starts. - `on(event: 'data', listener: (lsn: string, log: any) => Promise | void)` - Emitted when PostgreSQL data changes. The log value type varies depending on the plugin. - `on(event: 'error', listener: (err: Error) => void)` @@ -173,7 +208,7 @@ config ? : Partial<{ - `on(event: 'heartbeat', listener: (lsn: string, timestamp: number, shouldRespond: boolean) => Promise | void)` - A heartbeat check signal has been received from the server. You may need to run `service.acknowledge()`. -### 3-5. Misc. method +### 3-6. Misc. method - `stop(): Promise` - Terminate the server's connection and stop replication. diff --git a/package-lock.json b/package-lock.json index 3ff3686..31e33f6 100644 --- a/package-lock.json +++ b/package-lock.json @@ -16,7 +16,7 @@ "@types/jest": "^30.0.0", "@types/node": "^24.0.3", "@types/pg": "^8.6.5", - "jest": "^30.0.0", + "jest": "^30.2.0", "jest-junit": "^16.0.0", "prettier": "^3.4.2", "protobufjs": "^7.0.0", @@ -2339,9 +2339,9 @@ } }, "node_modules/glob": { - "version": "10.4.5", - "resolved": "https://registry.npmjs.org/glob/-/glob-10.4.5.tgz", - "integrity": "sha512-7Bv8RF0k6xjo7d4A/PxYLbUCfb6c+Vpd2/mB2yRDlew7Jb5hEXiCD9ibfO7wpk8i4sevK6DFny9h7EYbM3/sHg==", + "version": "10.5.0", + "resolved": "https://registry.npmjs.org/glob/-/glob-10.5.0.tgz", + "integrity": "sha512-DfXN8DfhJ7NH3Oe7cFmu3NCu1wKbkReJ8TorzSAFbSKrlNaQSKfIzqYqVY8zlbs2NLBbWpRiU52GX2PbaBVNkg==", "dev": true, "dependencies": { "foreground-child": "^3.1.0", @@ -3241,9 +3241,9 @@ "license": "MIT" }, "node_modules/js-yaml": { - "version": "3.14.1", - "resolved": "https://registry.npmjs.org/js-yaml/-/js-yaml-3.14.1.tgz", - "integrity": "sha512-okMH7OXXJ7YrN9Ok3/SXrnu4iX9yOk+25nqX4imS2npuvTYDmo/QEZoqwZkYaIDk3jVvBOTOIEgEhaLOynBS9g==", + "version": "3.14.2", + "resolved": "https://registry.npmjs.org/js-yaml/-/js-yaml-3.14.2.tgz", + "integrity": "sha512-PMSmkqxr106Xa156c2M265Z+FTrPl+oxd/rgOQy2tijQeK5TxQ43psO1ZCwhVOSdnn+RzkzlRz/eY4BgJBYVpg==", "dev": true, "dependencies": { "argparse": "^1.0.7", @@ -6483,9 +6483,9 @@ "dev": true }, "glob": { - "version": "10.4.5", - "resolved": "https://registry.npmjs.org/glob/-/glob-10.4.5.tgz", - "integrity": "sha512-7Bv8RF0k6xjo7d4A/PxYLbUCfb6c+Vpd2/mB2yRDlew7Jb5hEXiCD9ibfO7wpk8i4sevK6DFny9h7EYbM3/sHg==", + "version": "10.5.0", + "resolved": "https://registry.npmjs.org/glob/-/glob-10.5.0.tgz", + "integrity": "sha512-DfXN8DfhJ7NH3Oe7cFmu3NCu1wKbkReJ8TorzSAFbSKrlNaQSKfIzqYqVY8zlbs2NLBbWpRiU52GX2PbaBVNkg==", "dev": true, "requires": { "foreground-child": "^3.1.0", @@ -7154,9 +7154,9 @@ "dev": true }, "js-yaml": { - "version": "3.14.1", - "resolved": "https://registry.npmjs.org/js-yaml/-/js-yaml-3.14.1.tgz", - "integrity": "sha512-okMH7OXXJ7YrN9Ok3/SXrnu4iX9yOk+25nqX4imS2npuvTYDmo/QEZoqwZkYaIDk3jVvBOTOIEgEhaLOynBS9g==", + "version": "3.14.2", + "resolved": "https://registry.npmjs.org/js-yaml/-/js-yaml-3.14.2.tgz", + "integrity": "sha512-PMSmkqxr106Xa156c2M265Z+FTrPl+oxd/rgOQy2tijQeK5TxQ43psO1ZCwhVOSdnn+RzkzlRz/eY4BgJBYVpg==", "dev": true, "requires": { "argparse": "^1.0.7", diff --git a/package.json b/package.json index 14d206a..5f4b62e 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "pg-logical-replication", - "version": "2.2.1", + "version": "2.3.0", "description": "PostgreSQL Location Replication client - logical WAL replication streaming", "main": "dist/index.js", "types": "dist/index.d.ts", @@ -80,7 +80,7 @@ "@types/jest": "^30.0.0", "@types/node": "^24.0.3", "@types/pg": "^8.6.5", - "jest": "^30.0.0", + "jest": "^30.2.0", "jest-junit": "^16.0.0", "prettier": "^3.4.2", "protobufjs": "^7.0.0", diff --git a/src/logical-replication-service.ts b/src/logical-replication-service.ts index 66e39a4..cef3f3d 100644 --- a/src/logical-replication-service.ts +++ b/src/logical-replication-service.ts @@ -19,6 +19,19 @@ export interface LogicalReplicationConfig { */ timeoutSeconds: 0 | 10 | number; }; + /** + * Flow control (backpressure) configuration. + * When enabled, the stream will be paused until the data handler completes, + * preventing memory overflow when processing is slower than the incoming message rate. + */ + flowControl?: { + /** + * If true, pause the stream until the data handler completes. + * This enables backpressure support for async handlers. + * Default: false + */ + enabled: boolean; + }; } export interface LogicalReplicationService { @@ -42,6 +55,10 @@ export class LogicalReplicationService extends EventEmitter2 implements LogicalR timeoutSeconds: 10, ...(config?.acknowledge || {}), }, + flowControl: { + enabled: false, + ...(config?.flowControl || {}), + }, }; } @@ -74,9 +91,17 @@ export class LogicalReplicationService extends EventEmitter2 implements LogicalR return this._stop; } + // Flow control (backpressure) queue + private _messageQueue: Array<{ lsn: string; data: any }> = []; + private _processing: boolean = false; + public async stop(): Promise { this._stop = true; + // Clear flow control queue + this._messageQueue = []; + this._processing = false; + this._connection?.removeAllListeners(); this._connection = null; @@ -119,8 +144,15 @@ export class LogicalReplicationService extends EventEmitter2 implements LogicalR if (buffer[0] == 0x77) { // XLogData - this.emit('data', lsn, plugin.parse(buffer.subarray(25))); - this._acknowledge(lsn); + if (this.config.flowControl!.enabled) { + // Flow control enabled: queue the message and process sequentially + this._messageQueue.push({ lsn, data: plugin.parse(buffer.subarray(25)) }); + this._processQueue(); + } else { + // Original behavior: emit immediately + this.emit('data', lsn, plugin.parse(buffer.subarray(25))); + this._acknowledge(lsn); + } } else if (buffer[0] == 0x6b) { // Primary keepalive message const timestamp = Math.floor( @@ -147,6 +179,43 @@ export class LogicalReplicationService extends EventEmitter2 implements LogicalR await this.acknowledge(lsn); } + /** + * Process messages in the queue sequentially with backpressure support. + * Pauses the stream while processing and resumes when the queue is empty. + */ + private _processQueue(): void { + if (this._processing || this._stop) return; + this._processing = true; + + // Pause the stream to prevent buffer overflow + // @ts-ignore - accessing internal stream property + this._connection?.stream?.pause?.(); + + const processNext = async (): Promise => { + while (this._messageQueue.length > 0 && !this._stop) { + const message = this._messageQueue.shift()!; + + try { + // Wait for all listeners to complete (supports async handlers) + await this.emitAsync('data', message.lsn, message.data); + await this._acknowledge(message.lsn); + } catch (e) { + this.emit('error', e); + } + } + + this._processing = false; + + // Resume the stream when queue is empty + if (!this._stop) { + // @ts-ignore - accessing internal stream property + this._connection?.stream?.resume?.(); + } + }; + + processNext(); + } + private lastStandbyStatusUpdatedTime = 0; private checkStandbyStatusTimer: NodeJS.Timeout | null = null; private checkStandbyStatus(enable: boolean) { diff --git a/src/test/flow-control.spec.ts b/src/test/flow-control.spec.ts new file mode 100644 index 0000000..7cdfe0f --- /dev/null +++ b/src/test/flow-control.spec.ts @@ -0,0 +1,212 @@ +import { LogicalReplicationService } from '../logical-replication-service.js'; +import { Pgoutput, PgoutputPlugin } from '../output-plugins/pgoutput/index.js'; +import { TestClientConfig } from './client-config.js'; +import { sleep, TestClient } from './test-common.js'; + +jest.setTimeout(30_000); +const [slotName, decoderName, publicationName] = ['slot_flow_control', 'pgoutput', 'flow_control_test_pub']; + +let client: TestClient; + +describe('flowControl', () => { + beforeAll(async () => { + client = await TestClient.New(slotName, decoderName); + await client.query(`DROP PUBLICATION IF EXISTS "${publicationName}"`); + await client.query(`CREATE PUBLICATION "${publicationName}" FOR ALL TABLES`); + }); + + afterAll(async () => { + await client.query(`DROP PUBLICATION IF EXISTS "${publicationName}"`); + await client.end(); + }); + + it('should process messages sequentially when flowControl is enabled', async () => { + const service = new LogicalReplicationService(TestClientConfig, { + flowControl: { enabled: true }, + }); + const plugin = new PgoutputPlugin({ protoVersion: 1, publicationNames: [publicationName] }); + + const processingOrder: number[] = []; + const completionOrder: number[] = []; + let messageIndex = 0; + + service.on('data', async (lsn: string, log: Pgoutput.Message) => { + if (log.tag !== 'insert') return; + + const currentIndex = messageIndex++; + processingOrder.push(currentIndex); + + // Simulate async work with varying delays + // Earlier messages take longer to process + const delay = (5 - currentIndex) * 50; + await sleep(delay > 0 ? delay : 10); + + completionOrder.push(currentIndex); + }); + + service.subscribe(plugin, slotName).catch((e) => { + console.error('Error from .subscribe', e); + }); + + await sleep(100); + + // Insert 5 rows + await client.query( + `INSERT INTO users(firstname, lastname, email, phone) + SELECT md5(RANDOM()::TEXT), md5(RANDOM()::TEXT), md5(RANDOM()::TEXT), md5(RANDOM()::TEXT) + FROM generate_series(1, 5)` + ); + + // Wait for all messages to be processed + await sleep(2000); + + await service.stop(); + + // With flowControl enabled, messages should be processed in order + // Both processing and completion order should be sequential + expect(processingOrder).toEqual([0, 1, 2, 3, 4]); + expect(completionOrder).toEqual([0, 1, 2, 3, 4]); + }); + + it('should process messages concurrently when flowControl is disabled (default)', async () => { + const service = new LogicalReplicationService(TestClientConfig, { + flowControl: { enabled: false }, + }); + const plugin = new PgoutputPlugin({ protoVersion: 1, publicationNames: [publicationName] }); + + const processingOrder: number[] = []; + const completionOrder: number[] = []; + let messageIndex = 0; + + service.on('data', async (lsn: string, log: Pgoutput.Message) => { + if (log.tag !== 'insert') return; + + const currentIndex = messageIndex++; + processingOrder.push(currentIndex); + + // Earlier messages take longer to process + const delay = (5 - currentIndex) * 50; + await sleep(delay > 0 ? delay : 10); + + completionOrder.push(currentIndex); + }); + + service.subscribe(plugin, slotName).catch((e) => { + console.error('Error from .subscribe', e); + }); + + await sleep(100); + + // Insert 5 rows + await client.query( + `INSERT INTO users(firstname, lastname, email, phone) + SELECT md5(RANDOM()::TEXT), md5(RANDOM()::TEXT), md5(RANDOM()::TEXT), md5(RANDOM()::TEXT) + FROM generate_series(1, 5)` + ); + + // Wait for all messages to be processed + await sleep(2000); + + await service.stop(); + + // Without flowControl, messages start processing in order + expect(processingOrder).toEqual([0, 1, 2, 3, 4]); + + // But completion order may differ due to varying delays + // Later messages (with shorter delays) may complete first + // This assertion verifies the concurrent behavior + expect(completionOrder.length).toBe(5); + // The completion order should NOT be strictly sequential when delays vary + // (unless the system happens to process them faster than our delays) + }); + + it('should handle errors in async handlers without blocking the queue', async () => { + const service = new LogicalReplicationService(TestClientConfig, { + flowControl: { enabled: true }, + }); + const plugin = new PgoutputPlugin({ protoVersion: 1, publicationNames: [publicationName] }); + + const processedMessages: string[] = []; + const errors: Error[] = []; + let messageIndex = 0; + + service.on('error', (err: Error) => { + errors.push(err); + }); + + service.on('data', async (lsn: string, log: Pgoutput.Message) => { + if (log.tag !== 'insert') return; + + const currentIndex = messageIndex++; + + if (currentIndex === 2) { + throw new Error('Simulated error on message 2'); + } + + processedMessages.push(`msg-${currentIndex}`); + await sleep(10); + }); + + service.subscribe(plugin, slotName).catch((e) => { + console.error('Error from .subscribe', e); + }); + + await sleep(100); + + // Insert 5 rows + await client.query( + `INSERT INTO users(firstname, lastname, email, phone) + SELECT md5(RANDOM()::TEXT), md5(RANDOM()::TEXT), md5(RANDOM()::TEXT), md5(RANDOM()::TEXT) + FROM generate_series(1, 5)` + ); + + await sleep(2000); + + await service.stop(); + + // Should process all messages except the one that threw + expect(processedMessages).toEqual(['msg-0', 'msg-1', 'msg-3', 'msg-4']); + expect(errors.length).toBe(1); + expect(errors[0].message).toBe('Simulated error on message 2'); + }); + + it('should stop processing when service is stopped', async () => { + const service = new LogicalReplicationService(TestClientConfig, { + flowControl: { enabled: true }, + }); + const plugin = new PgoutputPlugin({ protoVersion: 1, publicationNames: [publicationName] }); + + const processedMessages: number[] = []; + let messageIndex = 0; + + service.on('data', async (lsn: string, log: Pgoutput.Message) => { + if (log.tag !== 'insert') return; + + const currentIndex = messageIndex++; + processedMessages.push(currentIndex); + + // Slow processing + await sleep(500); + }); + + service.subscribe(plugin, slotName).catch((e) => { + console.error('Error from .subscribe', e); + }); + + await sleep(100); + + // Insert 5 rows + await client.query( + `INSERT INTO users(firstname, lastname, email, phone) + SELECT md5(RANDOM()::TEXT), md5(RANDOM()::TEXT), md5(RANDOM()::TEXT), md5(RANDOM()::TEXT) + FROM generate_series(1, 5)` + ); + + // Stop the service after a short delay (before all messages are processed) + await sleep(300); + await service.stop(); + + // Should have processed fewer than 5 messages due to early stop + expect(processedMessages.length).toBeLessThan(5); + }); +});