Skip to content

Commit f7f51b9

Browse files
committed
fix pubsub handler
1 parent dded3de commit f7f51b9

File tree

1 file changed

+19
-25
lines changed

1 file changed

+19
-25
lines changed

packages/client/lib/client/commands-queue.ts

Lines changed: 19 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -174,30 +174,25 @@ export default class RedisCommandsQueue {
174174
});
175175
}
176176

177-
#setupPubSubHandler(command: Exclude<PubSubCommand, undefined>) {
177+
#setupPubSubHandler() {
178178
// RESP3 uses `onPush` to handle PubSub, so no need to modify `onReply`
179179
if (this.#respVersion !== 2) return;
180180

181-
// overriding `resolve` instead of using `.then` to make sure it'll be called before processing the next reply
182-
const { resolve } = command;
183-
command.resolve = () => {
184-
this.decoder.onReply = (reply => {
185-
if (Array.isArray(reply)) {
186-
if (this.#onPush(reply)) return;
187-
188-
if (PONG.equals(reply[0] as Buffer)) {
189-
const { resolve, typeMapping } = this.#waitingForReply.shift()!,
190-
buffer = ((reply[1] as Buffer).length === 0 ? reply[0] : reply[1]) as Buffer;
191-
resolve(typeMapping?.[RESP_TYPES.SIMPLE_STRING] === Buffer ? buffer : buffer.toString());
192-
return;
193-
}
181+
this.decoder.onReply = (reply => {
182+
if (Array.isArray(reply)) {
183+
if (this.#onPush(reply)) return;
184+
185+
if (PONG.equals(reply[0] as Buffer)) {
186+
const { resolve, typeMapping } = this.#waitingForReply.shift()!,
187+
buffer = ((reply[1] as Buffer).length === 0 ? reply[0] : reply[1]) as Buffer;
188+
resolve(typeMapping?.[RESP_TYPES.SIMPLE_STRING] === Buffer ? buffer : buffer.toString());
189+
return;
194190
}
195-
196-
return this.#onReply(reply);
197-
}) as Decoder['onReply'];
198-
this.decoder.getTypeMapping = () => RESP2_PUSH_TYPE_MAPPING;
199-
resolve();
200-
};
191+
}
192+
193+
return this.#onReply(reply);
194+
}) as Decoder['onReply'];
195+
this.decoder.getTypeMapping = () => RESP2_PUSH_TYPE_MAPPING;
201196
}
202197

203198
subscribe<T extends boolean>(
@@ -209,7 +204,7 @@ export default class RedisCommandsQueue {
209204
const command = this.#pubSub.subscribe(type, channels, listener, returnBuffers);
210205
if (!command) return;
211206

212-
this.#setupPubSubHandler(command);
207+
this.#setupPubSubHandler();
213208
return this.#addPubSubCommand(command);
214209
}
215210

@@ -246,8 +241,7 @@ export default class RedisCommandsQueue {
246241
const commands = this.#pubSub.resubscribe();
247242
if (!commands.length) return;
248243

249-
// using last command becasue of asap
250-
this.#setupPubSubHandler(commands[commands.length - 1]);
244+
this.#setupPubSubHandler();
251245
return Promise.all(
252246
commands.map(command => this.#addPubSubCommand(command, true))
253247
);
@@ -261,15 +255,15 @@ export default class RedisCommandsQueue {
261255
const command = this.#pubSub.extendChannelListeners(type, channel, listeners);
262256
if (!command) return;
263257

264-
this.#setupPubSubHandler(command);
258+
this.#setupPubSubHandler();
265259
return this.#addPubSubCommand(command);
266260
}
267261

268262
extendPubSubListeners(type: PubSubType, listeners: PubSubTypeListeners) {
269263
const command = this.#pubSub.extendTypeListeners(type, listeners);
270264
if (!command) return;
271265

272-
this.#setupPubSubHandler(command);
266+
this.#setupPubSubHandler();
273267
return this.#addPubSubCommand(command);
274268
}
275269

0 commit comments

Comments
 (0)