Skip to content

Commit d784748

Browse files
authored
feat(sentinel): add sSubscribe/sUnsubscribe methods to Sentinel client (redis#3178)
Add sharded pub/sub methods (sSubscribe/sUnsubscribe) to the Sentinel client for API consistency with the standalone Redis client. Since Sentinel manages a single master with no sharding, these methods provide no sharding benefit - they simply pass through to the underlying Redis commands. This enables compatibility with libraries like @socket.io/redis-adapter that expect the sSubscribe method to be available. fixes redis#3177
1 parent ccf84f2 commit d784748

File tree

3 files changed

+84
-3
lines changed

3 files changed

+84
-3
lines changed

packages/client/lib/sentinel/index.spec.ts

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -241,6 +241,31 @@ describe('RedisSentinel', () => {
241241

242242
assert.equal(tester, false);
243243
}, testOptions)
244+
245+
testUtils.testWithClientSentinel('plain pubsub - sharded', async sentinel => {
246+
let pubSubResolve;
247+
const pubSubPromise = new Promise((res) => {
248+
pubSubResolve = res;
249+
});
250+
251+
let tester = false;
252+
await sentinel.sSubscribe('test', () => {
253+
tester = true;
254+
pubSubResolve && pubSubResolve(1);
255+
})
256+
257+
await sentinel.sPublish('test', 'hello world');
258+
await pubSubPromise;
259+
assert.equal(tester, true);
260+
261+
// now unsubscribe
262+
tester = false;
263+
await sentinel.sUnsubscribe('test')
264+
await sentinel.sPublish('test', 'hello world');
265+
await setTimeout(1000);
266+
267+
assert.equal(tester, false);
268+
}, testOptions);
244269
});
245270
});
246271

packages/client/lib/sentinel/index.ts

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -552,6 +552,26 @@ export default class RedisSentinel<
552552

553553
pUnsubscribe = this.PUNSUBSCRIBE;
554554

555+
async SSUBSCRIBE<T extends boolean = false>(
556+
channels: string | Array<string>,
557+
listener: PubSubListener<T>,
558+
bufferMode?: T
559+
) {
560+
return this._self.#internal.sSubscribe(channels, listener, bufferMode);
561+
}
562+
563+
sSubscribe = this.SSUBSCRIBE;
564+
565+
async SUNSUBSCRIBE<T extends boolean = false>(
566+
channels?: string | Array<string>,
567+
listener?: PubSubListener<T>,
568+
bufferMode?: T
569+
) {
570+
return this._self.#internal.sUnsubscribe(channels, listener, bufferMode);
571+
}
572+
573+
sUnsubscribe = this.SUNSUBSCRIBE;
574+
555575
/**
556576
* Acquires a master client lease for exclusive operations
557577
*
@@ -1066,6 +1086,22 @@ class RedisSentinelInternal<
10661086
return this.#pubSubProxy.pUnsubscribe(patterns, listener, bufferMode);
10671087
}
10681088

1089+
async sSubscribe<T extends boolean = false>(
1090+
channels: string | Array<string>,
1091+
listener: PubSubListener<T>,
1092+
bufferMode?: T
1093+
) {
1094+
return this.#pubSubProxy.sSubscribe(channels, listener, bufferMode);
1095+
}
1096+
1097+
async sUnsubscribe<T extends boolean = false>(
1098+
channels?: string | Array<string>,
1099+
listener?: PubSubListener<T>,
1100+
bufferMode?: T
1101+
) {
1102+
return this.#pubSubProxy.sUnsubscribe(channels, listener, bufferMode);
1103+
}
1104+
10691105
// observe/analyze/transform remediation functions
10701106
async observe() {
10711107
for (const node of this.#sentinelRootNodes) {

packages/client/lib/sentinel/pub-sub-proxy.ts

Lines changed: 23 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ type Client = RedisClient<
1414
>;
1515

1616
type Subscriptions = Record<
17-
PUBSUB_TYPE['CHANNELS'] | PUBSUB_TYPE['PATTERNS'],
17+
PUBSUB_TYPE['CHANNELS'] | PUBSUB_TYPE['PATTERNS'] | PUBSUB_TYPE['SHARDED'],
1818
PubSubTypeListeners
1919
>;
2020

@@ -71,7 +71,8 @@ export class PubSubProxy extends EventEmitter {
7171
if (withSubscriptions && this.#subscriptions) {
7272
await Promise.all([
7373
client.extendPubSubListeners(PUBSUB_TYPE.CHANNELS, this.#subscriptions[PUBSUB_TYPE.CHANNELS]),
74-
client.extendPubSubListeners(PUBSUB_TYPE.PATTERNS, this.#subscriptions[PUBSUB_TYPE.PATTERNS])
74+
client.extendPubSubListeners(PUBSUB_TYPE.PATTERNS, this.#subscriptions[PUBSUB_TYPE.PATTERNS]),
75+
client.extendPubSubListeners(PUBSUB_TYPE.SHARDED, this.#subscriptions[PUBSUB_TYPE.SHARDED])
7576
]);
7677
}
7778

@@ -116,7 +117,8 @@ export class PubSubProxy extends EventEmitter {
116117
if (this.#state.connectPromise === undefined) {
117118
this.#subscriptions = {
118119
[PUBSUB_TYPE.CHANNELS]: this.#state.client.getPubSubListeners(PUBSUB_TYPE.CHANNELS),
119-
[PUBSUB_TYPE.PATTERNS]: this.#state.client.getPubSubListeners(PUBSUB_TYPE.PATTERNS)
120+
[PUBSUB_TYPE.PATTERNS]: this.#state.client.getPubSubListeners(PUBSUB_TYPE.PATTERNS),
121+
[PUBSUB_TYPE.SHARDED]: this.#state.client.getPubSubListeners(PUBSUB_TYPE.SHARDED)
120122
};
121123

122124
this.#state.client.destroy();
@@ -195,6 +197,24 @@ export class PubSubProxy extends EventEmitter {
195197
return this.#unsubscribe(client => client.PUNSUBSCRIBE(patterns, listener, bufferMode));
196198
}
197199

200+
sSubscribe<T extends boolean = false>(
201+
channels: string | Array<string>,
202+
listener: PubSubListener<T>,
203+
bufferMode?: T
204+
) {
205+
return this.#executeCommand(
206+
client => client.SSUBSCRIBE(channels, listener, bufferMode)
207+
);
208+
}
209+
210+
async sUnsubscribe<T extends boolean = false>(
211+
channels?: string | Array<string>,
212+
listener?: PubSubListener<T>,
213+
bufferMode?: T
214+
) {
215+
return this.#unsubscribe(client => client.SUNSUBSCRIBE(channels, listener, bufferMode));
216+
}
217+
198218
destroy() {
199219
this.#subscriptions = undefined;
200220
if (this.#state === undefined) return;

0 commit comments

Comments
 (0)