Skip to content

Commit f804b09

Browse files
committed
use asap fro pubsub resubscribe
1 parent d8cb5de commit f804b09

File tree

2 files changed

+9
-14
lines changed

2 files changed

+9
-14
lines changed

packages/client/lib/client/commands-queue.ts

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -223,7 +223,7 @@ export default class RedisCommandsQueue {
223223
listener: PubSubListener<T>,
224224
returnBuffers?: T
225225
) {
226-
return this._pushPubSubCommand(
226+
return this._addPubSubCommand(
227227
this._pubSub.subscribe(type, channels, listener, returnBuffers)
228228
);
229229
}
@@ -234,7 +234,7 @@ export default class RedisCommandsQueue {
234234
listener?: PubSubListener<T>,
235235
returnBuffers?: T
236236
) {
237-
return this._pushPubSubCommand(
237+
return this._addPubSubCommand(
238238
this._pubSub.unsubscribe(type, channels, listener, returnBuffers)
239239
);
240240
}
@@ -244,7 +244,7 @@ export default class RedisCommandsQueue {
244244
if (!commands.length) return;
245245

246246
return Promise.all(
247-
commands.map(command => this._pushPubSubCommand(command))
247+
commands.map(command => this._addPubSubCommand(command, true))
248248
);
249249
}
250250

@@ -253,13 +253,13 @@ export default class RedisCommandsQueue {
253253
channel: string,
254254
listeners: ChannelListeners
255255
) {
256-
return this._pushPubSubCommand(
256+
return this._addPubSubCommand(
257257
this._pubSub.extendChannelListeners(type, channel, listeners)
258258
);
259259
}
260260

261261
extendPubSubListeners(type: PubSubType, listeners: PubSubTypeListeners) {
262-
return this._pushPubSubCommand(
262+
return this._addPubSubCommand(
263263
this._pubSub.extendTypeListeners(type, listeners)
264264
);
265265
}
@@ -268,11 +268,11 @@ export default class RedisCommandsQueue {
268268
return this._pubSub.getTypeListeners(type);
269269
}
270270

271-
private _pushPubSubCommand(command: PubSubCommand) {
271+
private _addPubSubCommand(command: PubSubCommand, asap = false) {
272272
if (command === undefined) return;
273273

274274
return new Promise<void>((resolve, reject) => {
275-
this._toWrite.push({
275+
(asap ? this._toWrite.unshift : this._toWrite.push)({
276276
args: command.args,
277277
chainId: undefined,
278278
abort: undefined,
@@ -287,7 +287,7 @@ export default class RedisCommandsQueue {
287287
},
288288
channelsCounter: command.channelsCounter,
289289
typeMapping: PUSH_TYPE_MAPPING
290-
});
290+
});
291291
});
292292
}
293293

packages/client/lib/client/index.ts

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -335,7 +335,7 @@ export default class RedisClient<
335335

336336
private _initiateSocket(): RedisSocket {
337337
const socketInitiator = async (): Promise<void> => {
338-
const promises = [];
338+
const promises = [this._queue.resubscribe()];
339339

340340
if (this._monitorCallback) {
341341
promises.push(
@@ -408,11 +408,6 @@ export default class RedisClient<
408408
}
409409
}
410410

411-
const resubscribePromise = this._queue.resubscribe();
412-
if (resubscribePromise) {
413-
promises.push(resubscribePromise);
414-
}
415-
416411
if (promises.length) {
417412
this._write();
418413
await Promise.all(promises);

0 commit comments

Comments
 (0)