diff --git a/packages/client/lib/client/commands-queue.ts b/packages/client/lib/client/commands-queue.ts index 52a07a7e3b..68d76a66d1 100644 --- a/packages/client/lib/client/commands-queue.ts +++ b/packages/client/lib/client/commands-queue.ts @@ -27,10 +27,6 @@ export interface CommandToWrite extends CommandWaitingForReply { signal: AbortSignal; listener: () => unknown; } | undefined; - timeout: { - signal: AbortSignal; - listener: () => unknown; - } | undefined; } interface CommandWaitingForReply { @@ -38,6 +34,12 @@ interface CommandWaitingForReply { reject(err: unknown): void; channelsCounter: number | undefined; typeMapping: TypeMapping | undefined; + node?: DoublyLinkedNode + timeout: { + signal: AbortSignal; + toBeSent: boolean; + listener: () => unknown; + } | undefined; } export type OnShardedChannelMoved = (channel: string, listeners: ChannelListeners) => void; @@ -54,7 +56,7 @@ export default class RedisCommandsQueue { readonly #respVersion; readonly #maxLength; readonly #toWrite = new DoublyLinkedList(); - readonly #waitingForReply = new SinglyLinkedList(); + readonly #waitingForReply = new DoublyLinkedList(); readonly #onShardedChannelMoved; #chainInExecution: symbol | undefined; readonly decoder; @@ -78,11 +80,21 @@ export default class RedisCommandsQueue { } #onReply(reply: ReplyUnion) { - this.#waitingForReply.shift()!.resolve(reply); + const node = this.#waitingForReply.shift()!; + if (node.timeout) { + RedisCommandsQueue.#removeTimeoutListener(node) + node.timeout = undefined; + } + node.resolve(reply); } #onErrorReply(err: ErrorReply) { - this.#waitingForReply.shift()!.reject(err); + const node = this.#waitingForReply.shift()!; + if (node.timeout) { + RedisCommandsQueue.#removeTimeoutListener(node) + node.timeout = undefined; + } + node.reject(err); } #onPush(push: Array) { @@ -156,7 +168,6 @@ export default class RedisCommandsQueue { } return new Promise((resolve, reject) => { - let node: DoublyLinkedNode; const value: CommandToWrite = { args, chainId: options?.chainId, @@ -173,9 +184,16 @@ export default class RedisCommandsQueue { const signal = AbortSignal.timeout(timeout); value.timeout = { signal, + toBeSent: true, listener: () => { - this.#toWrite.remove(node); - value.reject(new TimeoutError()); + const reject = value.reject; + if (value.timeout!.toBeSent) { + this.#toWrite.remove(value.node as DoublyLinkedNode); + } else { + value.resolve = () => {}; + value.reject = () => {}; + } + reject(new TimeoutError()); } }; signal.addEventListener('abort', value.timeout.listener, { once: true }); @@ -186,14 +204,14 @@ export default class RedisCommandsQueue { value.abort = { signal, listener: () => { - this.#toWrite.remove(node); + this.#toWrite.remove(value.node as DoublyLinkedNode); value.reject(new AbortError()); } }; signal.addEventListener('abort', value.abort.listener, { once: true }); } - node = this.#toWrite.add(value, options?.asap); + value.node = this.#toWrite.add(value, options?.asap); }); } @@ -408,8 +426,7 @@ export default class RedisCommandsQueue { toSend.abort = undefined; } if (toSend.timeout) { - RedisCommandsQueue.#removeTimeoutListener(toSend); - toSend.timeout = undefined; + toSend.timeout.toBeSent = false; } this.#chainInExecution = toSend.chainId; toSend.chainId = undefined; @@ -431,7 +448,7 @@ export default class RedisCommandsQueue { command.abort!.signal.removeEventListener('abort', command.abort!.listener); } - static #removeTimeoutListener(command: CommandToWrite) { + static #removeTimeoutListener(command: CommandToWrite | CommandWaitingForReply) { command.timeout!.signal.removeEventListener('abort', command.timeout!.listener); }