Skip to content

Commit c3523c2

Browse files
Copilotrcottinet
authored andcommitted
feat: add postgres transport with NOTIFY/LISTEN support
1 parent 8130489 commit c3523c2

File tree

7 files changed

+576
-14
lines changed

7 files changed

+576
-14
lines changed

README.md

Lines changed: 23 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,8 @@ Currently, it supports the following transports:
2121
<p>
2222
👉 <strong>Memory:</strong> A simple in-memory transport for testing purposes.<br />
2323
👉 <strong>Redis:</strong> A Redis transport for production usage.<br />
24-
👉 <strong>Mqtt:</strong> A Mqtt transport for production usage.
24+
👉 <strong>Mqtt:</strong> A Mqtt transport for production usage.<br />
25+
👉 <strong>Postgres:</strong> A PostgreSQL transport using NOTIFY/LISTEN for production usage.
2526
</p>
2627

2728
## Table of Contents
@@ -49,6 +50,7 @@ The module exposes a manager that can be used to register buses.
4950
import { BusManager } from '@boringnode/bus'
5051
import { redis } from '@boringnode/bus/transports/redis'
5152
import { mqtt } from '@boringnode/bus/transports/mqtt'
53+
import { postgres } from '@boringnode/bus/transports/postgres'
5254
import { memory } from '@boringnode/bus/transports/memory'
5355

5456
const manager = new BusManager({
@@ -69,7 +71,16 @@ const manager = new BusManager({
6971
port: 1883,
7072
}),
7173
},
72-
}
74+
postgres: {
75+
transport: postgres({
76+
host: 'localhost',
77+
port: 5432,
78+
database: 'mydb',
79+
user: 'postgres',
80+
password: 'password',
81+
}),
82+
},
83+
},
7384
})
7485
```
7586

@@ -88,6 +99,7 @@ By default, the bus will use the `default` transport. You can specify different
8899
```typescript
89100
manager.use('redis').publish('channel', 'Hello world')
90101
manager.use('mqtt').publish('channel', 'Hello world')
102+
manager.use('postgres').publish('channel', 'Hello world')
91103
```
92104

93105
### Without the manager
@@ -105,8 +117,8 @@ const transport = new RedisTransport({
105117

106118
const bus = new Bus(transport, {
107119
retryQueue: {
108-
retryInterval: '100ms'
109-
}
120+
retryInterval: '100ms',
121+
},
110122
})
111123
```
112124

@@ -126,10 +138,10 @@ const manager = new BusManager({
126138
port: 6379,
127139
}),
128140
retryQueue: {
129-
retryInterval: '100ms'
130-
}
141+
retryInterval: '100ms',
142+
},
131143
},
132-
}
144+
},
133145
})
134146

135147
manager.use('redis').publish('channel', 'Hello World')
@@ -143,13 +155,13 @@ You have multiple options to configure the retry queue.
143155
export interface RetryQueueOptions {
144156
// Enable the retry queue (default: true)
145157
enabled?: boolean
146-
158+
147159
// Defines if we allow duplicates messages in the retry queue (default: true)
148160
removeDuplicates?: boolean
149-
161+
150162
// The maximum size of the retry queue (default: null)
151163
maxSize?: number | null
152-
164+
153165
// The interval between each retry (default: false)
154166
retryInterval?: Duration | false
155167
}
@@ -169,7 +181,7 @@ const buggyTransport = new ChaosTransport(new MemoryTransport())
169181
const bus = new Bus(buggyTransport)
170182

171183
/**
172-
* Now, every time you will try to publish a message, the transport
184+
* Now, every time you will try to publish a message, the transport
173185
* will throw an error.
174186
*/
175187
buggyTransport.alwaysThrow()

package.json

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,14 +37,17 @@
3737
"@japa/runner": "^5.0.0",
3838
"@swc/core": "^1.15.8",
3939
"@testcontainers/hivemq": "^11.11.0",
40+
"@testcontainers/postgresql": "^11.11.0",
4041
"@testcontainers/redis": "^11.11.0",
4142
"@types/node": "^20.17.19",
4243
"@types/object-hash": "^3.0.6",
44+
"@types/pg": "^8.11.10",
4345
"c8": "^10.1.3",
4446
"del-cli": "^7.0.0",
4547
"eslint": "^9.39.2",
4648
"ioredis": "^5.9.0",
4749
"mqtt": "^5.14.1",
50+
"pg": "^8.18.0",
4851
"prettier": "^3.7.4",
4952
"release-it": "^19.2.3",
5053
"testcontainers": "^11.11.0",
@@ -58,11 +61,15 @@
5861
"object-hash": "^3.0.0"
5962
},
6063
"peerDependencies": {
61-
"ioredis": "^5.0.0"
64+
"ioredis": "^5.0.0",
65+
"pg": "^8.0.0"
6266
},
6367
"peerDependenciesMeta": {
6468
"ioredis": {
6569
"optional": true
70+
},
71+
"pg": {
72+
"optional": true
6673
}
6774
},
6875
"author": "Romain Lanz <romain.lanz@pm.me>",

src/transports/postgres.ts

Lines changed: 188 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,188 @@
1+
/**
2+
* @boringnode/bus
3+
*
4+
* @license MIT
5+
* @copyright BoringNode
6+
*/
7+
8+
import { Client } from 'pg'
9+
import { assert } from '@poppinss/utils/assert'
10+
11+
import debug from '../debug.js'
12+
import { JsonEncoder } from '../encoders/json_encoder.js'
13+
import type {
14+
Transport,
15+
TransportEncoder,
16+
TransportMessage,
17+
Serializable,
18+
SubscribeHandler,
19+
PostgresTransportConfig,
20+
} from '../types/main.js'
21+
22+
export function postgres(config: PostgresTransportConfig, encoder?: TransportEncoder) {
23+
return () => new PostgresTransport(config, encoder)
24+
}
25+
26+
export class PostgresTransport implements Transport {
27+
readonly #publisher: Client
28+
readonly #subscriber: Client
29+
readonly #encoder: TransportEncoder
30+
readonly #channelHandlers: Map<string, SubscribeHandler<any>> = new Map()
31+
#publisherConnected: boolean = false
32+
#subscriberConnected: boolean = false
33+
34+
#id: string | undefined
35+
36+
constructor(config: PostgresTransportConfig, encoder?: TransportEncoder)
37+
constructor(config: string, encoder?: TransportEncoder)
38+
constructor(options: PostgresTransportConfig | string, encoder?: TransportEncoder) {
39+
this.#encoder = encoder ?? new JsonEncoder()
40+
41+
/**
42+
* If a connection string is passed, use it for both publisher and subscriber
43+
*/
44+
if (typeof options === 'string') {
45+
this.#publisher = new Client({ connectionString: options })
46+
this.#subscriber = new Client({ connectionString: options })
47+
return
48+
}
49+
50+
/**
51+
* If a config object is passed, create both publisher and subscriber
52+
*/
53+
this.#publisher = new Client(options)
54+
this.#subscriber = new Client(options)
55+
}
56+
57+
setId(id: string): Transport {
58+
this.#id = id
59+
60+
return this
61+
}
62+
63+
async #ensureConnected(): Promise<void> {
64+
if (!this.#publisherConnected) {
65+
await this.#publisher.connect()
66+
this.#publisherConnected = true
67+
}
68+
if (!this.#subscriberConnected) {
69+
await this.#subscriber.connect()
70+
this.#subscriberConnected = true
71+
}
72+
}
73+
74+
async disconnect(): Promise<void> {
75+
this.#publisherConnected = false
76+
this.#subscriberConnected = false
77+
78+
const promises: Promise<void>[] = []
79+
80+
try {
81+
promises.push(this.#publisher.end())
82+
} catch (err) {
83+
// Ignore errors during disconnect
84+
}
85+
86+
try {
87+
promises.push(this.#subscriber.end())
88+
} catch (err) {
89+
// Ignore errors during disconnect
90+
}
91+
92+
await Promise.allSettled(promises)
93+
}
94+
95+
async publish(channel: string, message: Serializable): Promise<void> {
96+
assert(this.#id, 'You must set an id before publishing a message')
97+
98+
await this.#ensureConnected()
99+
100+
const encoded = this.#encoder.encode({ payload: message, busId: this.#id })
101+
const payloadString = typeof encoded === 'string' ? encoded : encoded.toString('base64')
102+
103+
// Use pg's built-in escaping methods to safely escape the identifiers and literals
104+
const escapedChannel = this.#publisher.escapeIdentifier(channel)
105+
const escapedPayload = this.#publisher.escapeLiteral(payloadString)
106+
107+
// Use NOTIFY to send the message
108+
await this.#publisher.query(`NOTIFY ${escapedChannel}, ${escapedPayload}`)
109+
}
110+
111+
async subscribe<T extends Serializable>(
112+
channel: string,
113+
handler: SubscribeHandler<T>
114+
): Promise<void> {
115+
await this.#ensureConnected()
116+
117+
// Store the handler for this channel
118+
this.#channelHandlers.set(channel, handler)
119+
120+
// Set up the notification listener if not already set
121+
if (this.#subscriber.listenerCount('notification') === 0) {
122+
this.#subscriber.on('notification', (msg) => {
123+
if (msg.channel) {
124+
const channelHandler = this.#channelHandlers.get(msg.channel)
125+
if (channelHandler && msg.payload) {
126+
debug('received message for channel "%s"', msg.channel)
127+
128+
try {
129+
const data = this.#encoder.decode<TransportMessage<T>>(msg.payload)
130+
131+
/**
132+
* Ignore messages published by this bus instance
133+
*/
134+
if (data.busId === this.#id) {
135+
debug('ignoring message published by the same bus instance')
136+
return
137+
}
138+
139+
channelHandler(data.payload)
140+
} catch (error) {
141+
debug('error decoding message: %o', error)
142+
}
143+
}
144+
}
145+
})
146+
}
147+
148+
// Subscribe to the channel using LISTEN
149+
const escapedChannel = this.#subscriber.escapeIdentifier(channel)
150+
await this.#subscriber.query(`LISTEN ${escapedChannel}`)
151+
}
152+
153+
onReconnect(callback: () => void): void {
154+
// PostgreSQL client doesn't have built-in reconnection events
155+
// We'll listen to connection errors and trigger callback on reconnect
156+
this.#subscriber.on('error', (err) => {
157+
debug('subscriber error: %o', err)
158+
})
159+
160+
this.#subscriber.on('end', () => {
161+
debug('subscriber connection ended')
162+
this.#subscriberConnected = false
163+
// Attempt to reconnect
164+
this.#subscriber
165+
.connect()
166+
.then(() => {
167+
this.#subscriberConnected = true
168+
callback()
169+
// Re-subscribe to all channels
170+
for (const channel of this.#channelHandlers.keys()) {
171+
const escapedChannel = this.#subscriber.escapeIdentifier(channel)
172+
this.#subscriber.query(`LISTEN ${escapedChannel}`).catch((err) => {
173+
debug('error re-subscribing to channel %s: %o', channel, err)
174+
})
175+
}
176+
})
177+
.catch((err) => {
178+
debug('error reconnecting: %o', err)
179+
})
180+
})
181+
}
182+
183+
async unsubscribe(channel: string): Promise<void> {
184+
this.#channelHandlers.delete(channel)
185+
const escapedChannel = this.#subscriber.escapeIdentifier(channel)
186+
await this.#subscriber.query(`UNLISTEN ${escapedChannel}`)
187+
}
188+
}

src/types/main.ts

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,10 @@
77

88
import type { RedisOptions } from 'ioredis'
99
import type { IClientOptions } from 'mqtt'
10+
import type { ClientConfig } from 'pg'
1011

1112
export type { Redis, Cluster } from 'ioredis'
13+
export type { Client } from 'pg'
1214
export type TransportFactory = () => Transport
1315

1416
/**
@@ -66,6 +68,14 @@ export interface MqttTransportConfig {
6668
options?: IClientOptions
6769
}
6870

71+
export interface PostgresTransportConfig extends ClientConfig {
72+
/**
73+
* Connection string for PostgreSQL. If provided, it will be used instead
74+
* of the individual connection properties.
75+
*/
76+
connectionString?: string
77+
}
78+
6979
export interface Transport {
7080
setId: (id: string) => Transport
7181
onReconnect: (callback: () => void) => void

0 commit comments

Comments
 (0)