Skip to content

Commit 963a395

Browse files
feat: Sharded pub/sub support via dedicated subscribers (#1956)
* Added sharded pub/sub support by implementing a cluster subscriber group * Rewrote the resubscribe logic for sharded PubSub by making the sharded subscriber group aware of the channels. * Fixed potentially leaking connections when calling disconnect on the cluster object * Added and extended the integration test cases in regards to sharded pubsub. * Added a Github action that allows running tests manually * Provided documentation about how to use sharded pub/sub --------- Co-authored-by: Tihomir Krasimirov Mateev <[email protected]>
1 parent 65aed15 commit 963a395

File tree

12 files changed

+728
-64
lines changed

12 files changed

+728
-64
lines changed

.github/workflows/release.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ concurrency:
88

99
jobs:
1010
test:
11-
uses: ./.github/workflows/test.yml
11+
uses: ./.github/workflows/test_with_cov.yml
1212
release:
1313
runs-on: ubuntu-latest
1414
needs: test

.github/workflows/test.yml

Lines changed: 1 addition & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ jobs:
1111
strategy:
1212
fail-fast: false
1313
matrix:
14-
node: [12.x, 14.x, 16.x, 18.x]
14+
node: [12.x, 14.x, 16.x, 18.x, 20.x]
1515
steps:
1616
- name: Git checkout
1717
uses: actions/checkout@v2
@@ -33,27 +33,3 @@ jobs:
3333
- run: npm run build
3434
- run: npm run test:tsd
3535
- run: npm run test:cov || npm run test:cov || npm run test:cov
36-
- name: Coveralls
37-
if: matrix.node == '18.x'
38-
uses: coverallsapp/github-action@master
39-
with:
40-
github-token: ${{ secrets.GITHUB_TOKEN }}
41-
flag-name: node-${{matrix.node}}
42-
parallel: true
43-
44-
# test-cluster:
45-
# runs-on: ubuntu-latest
46-
# steps:
47-
# - uses: actions/checkout@v2
48-
# - name: Build and test cluster
49-
# run: bash test/cluster/docker/main.sh
50-
51-
code-coverage:
52-
needs: test
53-
runs-on: ubuntu-latest
54-
steps:
55-
- name: Coveralls
56-
uses: coverallsapp/github-action@master
57-
with:
58-
github-token: ${{ secrets.GITHUB_TOKEN }}
59-
parallel-finished: true

.github/workflows/test_with_cov.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ jobs:
77
strategy:
88
fail-fast: false
99
matrix:
10-
node: [12.x, 14.x, 16.x, 18.x]
10+
node: [12.x, 14.x, 16.x, 18.x, 20.x]
1111
steps:
1212
- name: Git checkout
1313
uses: actions/checkout@v2

README.md

Lines changed: 33 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ used in the world's biggest online commerce company [Alibaba](http://www.alibaba
4646
| Version | Branch | Node.js Version | Redis Version |
4747
| -------------- | ------ | --------------- | --------------- |
4848
| 5.x.x (latest) | main | >= 12 | 2.6.12 ~ latest |
49-
| 4.x.x | v4 | >= 6 | 2.6.12 ~ 7 |
49+
| 4.x.x | v4 | >= 8 | 2.6.12 ~ 7 |
5050

5151
Refer to [CHANGELOG.md](CHANGELOG.md) for features and bug fixes introduced in v5.
5252

@@ -1196,6 +1196,38 @@ sub.subscribe("news", () => {
11961196
});
11971197
```
11981198

1199+
### Sharded Pub/Sub
1200+
1201+
For sharded Pub/Sub, use the `spublish` and `ssubscribe` commands instead of the traditional `publish` and `subscribe`. With the old commands, the Redis cluster handles message propagation behind the scenes, allowing you to publish or subscribe to any node without considering sharding. However, this approach has scalability limitations that are addressed with sharded Pub/Sub. Here’s what you need to know:
1202+
1203+
1. Instead of a single subscriber connection, there is now one subscriber connection per shard. Because of the potential overhead, you can enable or disable the use of the cluster subscriber group with the `shardedSubscribers` option. By default, this option is set to `false`, meaning sharded subscriptions are disabled. You should enable this option when establishing your cluster connection before using `ssubscribe`.
1204+
2. All channel names that you pass to a single `ssubscribe` need to map to the same hash slot. You can call `ssubscribe` multiple times on the same cluster client instance to subscribe to channels across slots. The cluster's subscriber group takes care of forwarding the `ssubscribe` command to the shard that is responsible for the channels.
1205+
1206+
The following basic example shows you how to use sharded Pub/Sub:
1207+
1208+
```javascript
1209+
const cluster: Cluster = new Cluster([{host: host, port: port}], {shardedSubscribers: true});
1210+
1211+
//Register the callback
1212+
cluster.on("smessage", (channel, message) => {
1213+
console.log(message);
1214+
});
1215+
1216+
1217+
//Subscribe to the channels on the same slot
1218+
cluster.ssubscribe("channel{my}:1", "channel{my}:2").then( ( count: number ) => {
1219+
console.log(count);
1220+
}).catch( (err) => {
1221+
console.log(err);
1222+
});
1223+
1224+
//Publish a message
1225+
cluster.spublish("channel{my}:1", "This is a test message to my first channel.").then((value: number) => {
1226+
console.log("Published a message to channel{my}:1");
1227+
});
1228+
```
1229+
1230+
11991231
### Events
12001232

12011233
| Event | Description |

lib/cluster/ClusterOptions.ts

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -121,6 +121,17 @@ export interface ClusterOptions extends CommanderOptions {
121121
*/
122122
slotsRefreshInterval?: number;
123123

124+
125+
/**
126+
* Use sharded subscribers instead of a single subscriber.
127+
*
128+
* If sharded subscribers are used, then one additional subscriber connection per master node
129+
* is established. If you don't plan to use SPUBLISH/SSUBSCRIBE, then this should be disabled.
130+
*
131+
* @default false
132+
*/
133+
shardedSubscribers?: boolean;
134+
124135
/**
125136
* Passed to the constructor of `Redis`
126137
*
@@ -216,4 +227,5 @@ export const DEFAULT_CLUSTER_OPTIONS: ClusterOptions = {
216227
dnsLookup: lookup,
217228
enableAutoPipelining: false,
218229
autoPipeliningIgnoredCommands: [],
230+
shardedSubscribers: false,
219231
};

lib/cluster/ClusterSubscriber.ts

Lines changed: 45 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -8,12 +8,18 @@ const debug = Debug("cluster:subscriber");
88

99
export default class ClusterSubscriber {
1010
private started = false;
11+
12+
//There is only one connection for the entire pool
1113
private subscriber: any = null;
1214
private lastActiveSubscriber: any;
1315

16+
//The slot range for which this subscriber is responsible
17+
private slotRange: number[] = []
18+
1419
constructor(
1520
private connectionPool: ConnectionPool,
16-
private emitter: EventEmitter
21+
private emitter: EventEmitter,
22+
private isSharded : boolean = false
1723
) {
1824
// If the current node we're using as the subscriber disappears
1925
// from the node pool for some reason, we will select a new one
@@ -47,6 +53,22 @@ export default class ClusterSubscriber {
4753
return this.subscriber;
4854
}
4955

56+
/**
57+
* Associate this subscriber to a specific slot range.
58+
*
59+
* Returns the range or an empty array if the slot range couldn't be associated.
60+
*
61+
* BTW: This is more for debugging and testing purposes.
62+
*
63+
* @param range
64+
*/
65+
associateSlotRange(range: number[]): number[] {
66+
if (this.isSharded) {
67+
this.slotRange = range;
68+
}
69+
return this.slotRange;
70+
}
71+
5072
start(): void {
5173
this.started = true;
5274
this.selectSubscriber();
@@ -59,9 +81,13 @@ export default class ClusterSubscriber {
5981
this.subscriber.disconnect();
6082
this.subscriber = null;
6183
}
62-
debug("stopped");
6384
}
6485

86+
isStarted(): boolean {
87+
return this.started;
88+
}
89+
90+
6591
private onSubscriberEnd = () => {
6692
if (!this.started) {
6793
debug(
@@ -112,13 +138,17 @@ export default class ClusterSubscriber {
112138
* provided for the subscriber is correct, and if not, the current subscriber
113139
* will be disconnected and a new subscriber will be selected.
114140
*/
141+
let connectionPrefix = "subscriber";
142+
if (this.isSharded)
143+
connectionPrefix = "ssubscriber";
144+
115145
this.subscriber = new Redis({
116146
port: options.port,
117147
host: options.host,
118148
username: options.username,
119149
password: options.password,
120150
enableReadyCheck: true,
121-
connectionName: getConnectionName("subscriber", options.connectionName),
151+
connectionName: getConnectionName(connectionPrefix, options.connectionName),
122152
lazyConnect: true,
123153
tls: options.tls,
124154
// Don't try to reconnect the subscriber connection. If the connection fails
@@ -179,17 +209,27 @@ export default class ClusterSubscriber {
179209
for (const event of [
180210
"message",
181211
"messageBuffer",
182-
"smessage",
183-
"smessageBuffer",
184212
]) {
185213
this.subscriber.on(event, (arg1, arg2) => {
186214
this.emitter.emit(event, arg1, arg2);
187215
});
188216
}
217+
189218
for (const event of ["pmessage", "pmessageBuffer"]) {
190219
this.subscriber.on(event, (arg1, arg2, arg3) => {
191220
this.emitter.emit(event, arg1, arg2, arg3);
192221
});
193222
}
223+
224+
if (this.isSharded == true) {
225+
for (const event of [
226+
"smessage",
227+
"smessageBuffer",
228+
]) {
229+
this.subscriber.on(event, (arg1, arg2) => {
230+
this.emitter.emit(event, arg1, arg2);
231+
});
232+
}
233+
}
194234
}
195235
}

0 commit comments

Comments
 (0)