Skip to content

Commit 6285e80

Browse files
authored
feat: Add support ssubscribe (redis#1690)
1 parent 7effb62 commit 6285e80

File tree

7 files changed

+303
-12
lines changed

7 files changed

+303
-12
lines changed

lib/Command.ts

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -42,15 +42,17 @@ export interface CommandNameFlags {
4242
"psubscribe",
4343
"unsubscribe",
4444
"punsubscribe",
45+
"ssubscribe",
46+
"sunsubscribe",
4547
"ping",
4648
"quit"
4749
];
4850
// Commands that are valid in monitor mode
4951
VALID_IN_MONITOR_MODE: ["monitor", "auth"];
5052
// Commands that will turn current connection into subscriber mode
51-
ENTER_SUBSCRIBER_MODE: ["subscribe", "psubscribe"];
53+
ENTER_SUBSCRIBER_MODE: ["subscribe", "psubscribe", "ssubscribe"];
5254
// Commands that may make current connection quit subscriber mode
53-
EXIT_SUBSCRIBER_MODE: ["unsubscribe", "punsubscribe"];
55+
EXIT_SUBSCRIBER_MODE: ["unsubscribe", "punsubscribe", "sunsubscribe"];
5456
// Commands that will make client disconnect from server TODO shutdown?
5557
WILL_DISCONNECT: ["quit"];
5658
}
@@ -84,12 +86,14 @@ export default class Command implements Respondable {
8486
"psubscribe",
8587
"unsubscribe",
8688
"punsubscribe",
89+
"ssubscribe",
90+
"sunsubscribe",
8791
"ping",
8892
"quit",
8993
],
9094
VALID_IN_MONITOR_MODE: ["monitor", "auth"],
91-
ENTER_SUBSCRIBER_MODE: ["subscribe", "psubscribe"],
92-
EXIT_SUBSCRIBER_MODE: ["unsubscribe", "punsubscribe"],
95+
ENTER_SUBSCRIBER_MODE: ["subscribe", "psubscribe", "ssubscribe"],
96+
EXIT_SUBSCRIBER_MODE: ["unsubscribe", "punsubscribe", "sunsubscribe"],
9397
WILL_DISCONNECT: ["quit"],
9498
};
9599

lib/DataHandler.ts

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -143,6 +143,18 @@ export default class DataHandler {
143143
this.redis.emit("pmessageBuffer", pattern, reply[2], reply[3]);
144144
break;
145145
}
146+
case "smessage": {
147+
if (this.redis.listeners("smessage").length > 0) {
148+
this.redis.emit(
149+
"smessage",
150+
reply[1].toString(),
151+
reply[2] ? reply[2].toString() : ""
152+
);
153+
}
154+
this.redis.emit("smessageBuffer", reply[1], reply[2]);
155+
break;
156+
}
157+
case "ssubscribe":
146158
case "subscribe":
147159
case "psubscribe": {
148160
const channel = reply[1].toString();
@@ -156,6 +168,7 @@ export default class DataHandler {
156168
}
157169
break;
158170
}
171+
case "sunsubscribe":
159172
case "unsubscribe":
160173
case "punsubscribe": {
161174
const channel = reply[1] ? reply[1].toString() : null;

lib/SubscriptionSet.ts

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ export default class SubscriptionSet {
1010
private set: { [key: string]: { [channel: string]: boolean } } = {
1111
subscribe: {},
1212
psubscribe: {},
13+
ssubscribe: {},
1314
};
1415

1516
add(set: AddSet, channel: string) {
@@ -27,7 +28,8 @@ export default class SubscriptionSet {
2728
isEmpty(): boolean {
2829
return (
2930
this.channels("subscribe").length === 0 &&
30-
this.channels("psubscribe").length === 0
31+
this.channels("psubscribe").length === 0 &&
32+
this.channels("ssubscribe").length === 0
3133
);
3234
}
3335
}
@@ -39,5 +41,8 @@ function mapSet(set: AddSet | DelSet): AddSet {
3941
if (set === "punsubscribe") {
4042
return "psubscribe";
4143
}
44+
if (set === "sunsubscribe") {
45+
return "ssubscribe";
46+
}
4247
return set;
4348
}

lib/cluster/ClusterSubscriber.ts

Lines changed: 17 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -64,15 +64,17 @@ export default class ClusterSubscriber {
6464

6565
private onSubscriberEnd = () => {
6666
if (!this.started) {
67-
debug("subscriber has disconnected, but ClusterSubscriber is not started, so not reconnecting.");
67+
debug(
68+
"subscriber has disconnected, but ClusterSubscriber is not started, so not reconnecting."
69+
);
6870
return;
6971
}
7072
// If the subscriber closes whilst it's still the active connection,
7173
// we might as well try to connecting to a new node if possible to
7274
// minimise the number of missed publishes.
7375
debug("subscriber has disconnected, selecting a new one...");
7476
this.selectSubscriber();
75-
}
77+
};
7678

7779
private selectSubscriber() {
7880
const lastActiveSubscriber = this.lastActiveSubscriber;
@@ -122,7 +124,7 @@ export default class ClusterSubscriber {
122124
// Don't try to reconnect the subscriber connection. If the connection fails
123125
// we will get an end event (handled below), at which point we'll pick a new
124126
// node from the pool and try to connect to that as the subscriber connection.
125-
retryStrategy: null
127+
retryStrategy: null,
126128
});
127129

128130
// Ignore the errors since they're handled in the connection pool.
@@ -136,22 +138,25 @@ export default class ClusterSubscriber {
136138
this.subscriber.once("end", this.onSubscriberEnd);
137139

138140
// Re-subscribe previous channels
139-
const previousChannels = { subscribe: [], psubscribe: [] };
141+
const previousChannels = { subscribe: [], psubscribe: [], ssubscribe: [] };
140142
if (lastActiveSubscriber) {
141143
const condition =
142144
lastActiveSubscriber.condition || lastActiveSubscriber.prevCondition;
143145
if (condition && condition.subscriber) {
144146
previousChannels.subscribe = condition.subscriber.channels("subscribe");
145147
previousChannels.psubscribe =
146148
condition.subscriber.channels("psubscribe");
149+
previousChannels.ssubscribe =
150+
condition.subscriber.channels("ssubscribe");
147151
}
148152
}
149153
if (
150154
previousChannels.subscribe.length ||
151-
previousChannels.psubscribe.length
155+
previousChannels.psubscribe.length ||
156+
previousChannels.ssubscribe.length
152157
) {
153158
let pending = 0;
154-
for (const type of ["subscribe", "psubscribe"]) {
159+
for (const type of ["subscribe", "psubscribe", "ssubscribe"]) {
155160
const channels = previousChannels[type];
156161
if (channels.length) {
157162
pending += 1;
@@ -171,7 +176,12 @@ export default class ClusterSubscriber {
171176
} else {
172177
this.lastActiveSubscriber = this.subscriber;
173178
}
174-
for (const event of ["message", "messageBuffer"]) {
179+
for (const event of [
180+
"message",
181+
"messageBuffer",
182+
"smessage",
183+
"smessageBuffer",
184+
]) {
175185
this.subscriber.on(event, (arg1, arg2) => {
176186
this.emitter.emit(event, arg1, arg2);
177187
});

lib/redis/event_handler.ts

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -283,6 +283,11 @@ export function readyHandler(self) {
283283
debug("psubscribe %d channels", psubscribeChannels.length);
284284
self.psubscribe(psubscribeChannels);
285285
}
286+
const ssubscribeChannels = condition.subscriber.channels("ssubscribe");
287+
if (ssubscribeChannels.length) {
288+
debug("ssubscribe %d channels", ssubscribeChannels.length);
289+
self.ssubscribe(ssubscribeChannels);
290+
}
286291
}
287292
}
288293

Lines changed: 108 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,108 @@
1+
import MockServer, { getConnectionName } from "../../helpers/mock_server";
2+
import { expect } from "chai";
3+
import { Cluster } from "../../../lib";
4+
import * as sinon from "sinon";
5+
import Redis from "../../../lib/Redis";
6+
import { noop } from "../../../lib/utils";
7+
8+
describe("cluster:spub/ssub", function () {
9+
it("should receive messages", (done) => {
10+
const handler = function (argv) {
11+
if (argv[0] === "cluster" && argv[1] === "SLOTS") {
12+
return [
13+
[0, 1, ["127.0.0.1", 30001]],
14+
[2, 16383, ["127.0.0.1", 30002]],
15+
];
16+
}
17+
};
18+
const node1 = new MockServer(30001, handler);
19+
new MockServer(30002, handler);
20+
21+
const options = [{ host: "127.0.0.1", port: "30001" }];
22+
const ssub = new Cluster(options);
23+
24+
ssub.ssubscribe("test cluster", function () {
25+
node1.write(node1.findClientByName("ioredis-cluster(subscriber)"), [
26+
"smessage",
27+
"test shard channel",
28+
"hi",
29+
]);
30+
});
31+
ssub.on("smessage", function (channel, message) {
32+
expect(channel).to.eql("test shard channel");
33+
expect(message).to.eql("hi");
34+
ssub.disconnect();
35+
done();
36+
});
37+
});
38+
39+
it("should works when sending regular commands", (done) => {
40+
const handler = function (argv) {
41+
if (argv[0] === "cluster" && argv[1] === "SLOTS") {
42+
return [[0, 16383, ["127.0.0.1", 30001]]];
43+
}
44+
};
45+
new MockServer(30001, handler);
46+
47+
const ssub = new Cluster([{ port: "30001" }]);
48+
49+
ssub.ssubscribe("test cluster", function () {
50+
ssub.set("foo", "bar").then((res) => {
51+
expect(res).to.eql("OK");
52+
ssub.disconnect();
53+
done();
54+
});
55+
});
56+
});
57+
58+
it("supports password", (done) => {
59+
const handler = function (argv, c) {
60+
if (argv[0] === "auth") {
61+
c.password = argv[1];
62+
return;
63+
}
64+
if (argv[0] === "ssubscribe") {
65+
expect(c.password).to.eql("abc");
66+
expect(getConnectionName(c)).to.eql("ioredis-cluster(subscriber)");
67+
}
68+
if (argv[0] === "cluster" && argv[1] === "SLOTS") {
69+
return [[0, 16383, ["127.0.0.1", 30001]]];
70+
}
71+
};
72+
new MockServer(30001, handler);
73+
74+
const ssub = new Redis.Cluster([{ port: "30001", password: "abc" }]);
75+
76+
ssub.ssubscribe("test cluster", function () {
77+
ssub.disconnect();
78+
done();
79+
});
80+
});
81+
82+
it("should re-ssubscribe after reconnection", (done) => {
83+
new MockServer(30001, function (argv) {
84+
if (argv[0] === "cluster" && argv[1] === "SLOTS") {
85+
return [[0, 16383, ["127.0.0.1", 30001]]];
86+
} else if (argv[0] === "ssubscribe" || argv[0] === "psubscribe") {
87+
return [argv[0], argv[1]];
88+
}
89+
});
90+
const client = new Cluster([{ host: "127.0.0.1", port: "30001" }]);
91+
92+
client.ssubscribe("test cluster", function () {
93+
const stub = sinon
94+
.stub(Redis.prototype, "ssubscribe")
95+
.callsFake((channels) => {
96+
expect(channels).to.eql(["test cluster"]);
97+
stub.restore();
98+
client.disconnect();
99+
done();
100+
return Redis.prototype.ssubscribe.apply(this, arguments);
101+
});
102+
client.once("end", function () {
103+
client.connect().catch(noop);
104+
});
105+
client.disconnect();
106+
});
107+
});
108+
});

0 commit comments

Comments
 (0)