Skip to content

Commit 7689ddb

Browse files
committed
feat(commands-queue): Introduce maintenance mode support for commands-queuefd
- Added `#inMaintenance` property and `set inMaintenance` setter to track maintenance mode state. - Introduced `#maintenanceCommandTimeout` and `setMaintenanceCommandTimeout` method to dynamically adjust command timeouts during maintenance. - Refactored timeout logic to prioritize `#maintenanceCommandTimeout` over individual command timeouts. - `CommandTimeoutDuringMaintananceError` is used when in maintenance mode.
1 parent 26e5994 commit 7689ddb

File tree

1 file changed

+57
-9
lines changed

1 file changed

+57
-9
lines changed

packages/client/lib/client/commands-queue.ts

Lines changed: 57 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,9 @@
1-
import { SinglyLinkedList, DoublyLinkedNode, DoublyLinkedList } from './linked-list';
1+
import { DoublyLinkedNode, DoublyLinkedList, EmptyAwareSinglyLinkedList } from './linked-list';
22
import encodeCommand from '../RESP/encoder';
33
import { Decoder, PUSH_TYPE_MAPPING, RESP_TYPES } from '../RESP/decoder';
44
import { TypeMapping, ReplyUnion, RespVersions, RedisArgument } from '../RESP/types';
55
import { ChannelListeners, PubSub, PubSubCommand, PubSubListener, PubSubType, PubSubTypeListeners } from './pub-sub';
6-
import { AbortError, ErrorReply, TimeoutError } from '../errors';
6+
import { AbortError, ErrorReply, CommandTimeoutDuringMaintananceError, TimeoutError } from '../errors';
77
import { MonitorCallback } from '.';
88

99
export interface CommandOptions<T = TypeMapping> {
@@ -30,6 +30,7 @@ export interface CommandToWrite extends CommandWaitingForReply {
3030
timeout: {
3131
signal: AbortSignal;
3232
listener: () => unknown;
33+
originalTimeout: number | undefined;
3334
} | undefined;
3435
}
3536

@@ -61,19 +62,62 @@ export default class RedisCommandsQueue {
6162
readonly #respVersion;
6263
readonly #maxLength;
6364
readonly #toWrite = new DoublyLinkedList<CommandToWrite>();
64-
readonly #waitingForReply = new SinglyLinkedList<CommandWaitingForReply>();
65+
readonly #waitingForReply = new EmptyAwareSinglyLinkedList<CommandWaitingForReply>();
6566
readonly #onShardedChannelMoved;
6667
#chainInExecution: symbol | undefined;
6768
readonly decoder;
6869
readonly #pubSub = new PubSub();
6970

7071
#pushHandlers: PushHandler[] = [this.#onPush.bind(this)];
72+
73+
#inMaintenance = false;
74+
75+
set inMaintenance(value: boolean) {
76+
this.#inMaintenance = value;
77+
}
78+
79+
#maintenanceCommandTimeout: number | undefined
80+
81+
setMaintenanceCommandTimeout(ms: number | undefined) {
82+
// Prevent possible api misuse
83+
if (this.#maintenanceCommandTimeout === ms) return;
84+
85+
this.#maintenanceCommandTimeout = ms;
86+
87+
let counter = 0;
88+
89+
// Overwrite timeouts of all eligible toWrite commands
90+
for(const node of this.#toWrite.nodes()) {
91+
const command = node.value;
92+
93+
// Remove timeout listener if it exists
94+
RedisCommandsQueue.#removeTimeoutListener(command)
95+
96+
// Determine newTimeout
97+
const newTimeout = this.#maintenanceCommandTimeout ?? command.timeout?.originalTimeout;
98+
// if no timeout is given and the command didnt have any timeout before, skip
99+
if (!newTimeout) return;
100+
101+
counter++;
102+
103+
// Overwrite the command's timeout
104+
const signal = AbortSignal.timeout(newTimeout);
105+
command.timeout = {
106+
signal,
107+
listener: () => {
108+
this.#toWrite.remove(node);
109+
command.reject(this.#inMaintenance ? new CommandTimeoutDuringMaintananceError(newTimeout) : new TimeoutError());
110+
},
111+
originalTimeout: command.timeout?.originalTimeout
112+
};
113+
signal.addEventListener('abort', command.timeout.listener, { once: true });
114+
};
115+
}
116+
71117
get isPubSubActive() {
72118
return this.#pubSub.isActive;
73119
}
74120

75-
#invalidateCallback?: (key: RedisArgument | null) => unknown;
76-
77121
constructor(
78122
respVersion: RespVersions,
79123
maxLength: number | null | undefined,
@@ -174,15 +218,19 @@ export default class RedisCommandsQueue {
174218
typeMapping: options?.typeMapping
175219
};
176220

177-
const timeout = options?.timeout;
221+
// If #maintenanceCommandTimeout was explicitly set, we should
222+
// use it instead of the timeout provided by the command
223+
const timeout = this.#maintenanceCommandTimeout || options?.timeout
178224
if (timeout) {
225+
179226
const signal = AbortSignal.timeout(timeout);
180227
value.timeout = {
181228
signal,
182229
listener: () => {
183230
this.#toWrite.remove(node);
184-
value.reject(new TimeoutError());
185-
}
231+
value.reject(this.#inMaintenance ? new CommandTimeoutDuringMaintananceError(timeout) : new TimeoutError());
232+
},
233+
originalTimeout: options?.timeout
186234
};
187235
signal.addEventListener('abort', value.timeout.listener, { once: true });
188236
}
@@ -438,7 +486,7 @@ export default class RedisCommandsQueue {
438486
}
439487

440488
static #removeTimeoutListener(command: CommandToWrite) {
441-
command.timeout!.signal.removeEventListener('abort', command.timeout!.listener);
489+
command.timeout?.signal.removeEventListener('abort', command.timeout!.listener);
442490
}
443491

444492
static #flushToWrite(toBeSent: CommandToWrite, err: Error) {

0 commit comments

Comments
 (0)