diff --git a/lib/DataHandler.ts b/lib/DataHandler.ts index fb635944..09ce92a3 100644 --- a/lib/DataHandler.ts +++ b/lib/DataHandler.ts @@ -14,6 +14,10 @@ export interface Condition { select: number; auth?: string | [string, string]; subscriber: false | SubscriptionSet; + /** + * Whether the connection has issued a subscribe command during `connect` or `ready`. + */ + hasIssuedSubscribe: boolean; } export type FlushQueueOptions = { diff --git a/lib/Redis.ts b/lib/Redis.ts index 00176582..866f0e8a 100644 --- a/lib/Redis.ts +++ b/lib/Redis.ts @@ -195,6 +195,7 @@ class Redis extends Commander implements DataHandledable { ? [options.username, options.password] : options.password, subscriber: false, + hasIssuedSubscribe: false, }; const _this = this; @@ -429,6 +430,17 @@ class Redis extends Commander implements DataHandledable { command.reject(new Error(CONNECTION_CLOSED_ERROR_MSG)); return command.promise; } + + // Make sure know that a subscribe command is sent to the server + // In order to prevent race condition by sending another non-subscribe command + // before we've received the response of the previous subscribe command + if ( + (this.status === "connect" || this.status === "ready") && + Command.checkFlag("ENTER_SUBSCRIBER_MODE", command.name) + ) { + this.condition.hasIssuedSubscribe = true; + } + if ( this.condition?.subscriber && !Command.checkFlag("VALID_IN_SUBSCRIBER_MODE", command.name) diff --git a/lib/redis/event_handler.ts b/lib/redis/event_handler.ts index 9cb2c51f..1b01e6a8 100644 --- a/lib/redis/event_handler.ts +++ b/lib/redis/event_handler.ts @@ -77,14 +77,14 @@ export function connectHandler(self) { const clientCommandPromises = []; - if (self.options.connectionName) { + if (self.options.connectionName && !self.condition.hasIssuedSubscribe) { debug("set the connection name [%s]", self.options.connectionName); clientCommandPromises.push( self.client("setname", self.options.connectionName).catch(noop) ); } - if (!self.options.disableClientInfo) { + if (!self.options.disableClientInfo && !self.condition.hasIssuedSubscribe) { debug("set the client info"); clientCommandPromises.push( getPackageMeta() @@ -112,11 +112,16 @@ export function connectHandler(self) { Promise.all(clientCommandPromises) .catch(noop) .finally(() => { - if (!self.options.enableReadyCheck) { + // Ready check should not be performed after a subscribe command + // Because it might result in a race condition + // Additionally because we're using RESP2 another client should be used for normal commands + const shouldReadyCheck = self.options.enableReadyCheck && !self.condition.hasIssuedSubscribe; + + if (!shouldReadyCheck) { exports.readyHandler(self)(); } - if (self.options.enableReadyCheck) { + if (shouldReadyCheck) { self._readyCheck(function (err, info) { if (connectionEpoch !== self.connectionEpoch) { return; diff --git a/test/functional/pub_sub.ts b/test/functional/pub_sub.ts index b1147994..aff46a04 100644 --- a/test/functional/pub_sub.ts +++ b/test/functional/pub_sub.ts @@ -213,4 +213,22 @@ describe("pub/sub", function () { expect(await redis.set("foo", "bar")).to.eql("OK"); redis.disconnect(); }); + + it("should subscribe on connect without errors", (done) => { + const redis = new Redis(); + + redis.on("error", () => { + throw new Error("should not error"); + }); + + redis.on("connect", async () => { + await redis.subscribe("foo"); + redis.disconnect(); + + redis.on("end", () => { + // Make sure there's enough time for the error to be emitted + setTimeout(done, 500); + }); + }); + }); });