Skip to content

Commit 20ab341

Browse files
committed
feat(commands-queue): Introduce maintenance mode support for commands-queue
- Added `#inMaintenance` property and `set inMaintenance` setter to track maintenance mode state.d `#maintenanceCommandTimeout` and `setMaintenanceCommandTimeout` method to dynamically adjust command timeouts during maintenance.mmandTimeout` over individual command timeouts.DuringMaintananceError` is used when in maintenance mode.
1 parent 160a0c6 commit 20ab341

File tree

1 file changed

+56
-6
lines changed

1 file changed

+56
-6
lines changed

packages/client/lib/client/commands-queue.ts

Lines changed: 56 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,9 @@
1-
import { SinglyLinkedList, DoublyLinkedNode, DoublyLinkedList } from './linked-list';
1+
import { DoublyLinkedNode, DoublyLinkedList, EmptyAwareSinglyLinkedList } from './linked-list';
22
import encodeCommand from '../RESP/encoder';
33
import { Decoder, PUSH_TYPE_MAPPING, RESP_TYPES } from '../RESP/decoder';
44
import { TypeMapping, ReplyUnion, RespVersions, RedisArgument } from '../RESP/types';
55
import { ChannelListeners, PubSub, PubSubCommand, PubSubListener, PubSubType, PubSubTypeListeners } from './pub-sub';
6-
import { AbortError, ErrorReply, TimeoutError } from '../errors';
6+
import { AbortError, ErrorReply, CommandTimeoutDuringMaintananceError, TimeoutError } from '../errors';
77
import { MonitorCallback } from '.';
88

99
export interface CommandOptions<T = TypeMapping> {
@@ -30,6 +30,7 @@ export interface CommandToWrite extends CommandWaitingForReply {
3030
timeout: {
3131
signal: AbortSignal;
3232
listener: () => unknown;
33+
originalTimeout: number | undefined;
3334
} | undefined;
3435
}
3536

@@ -61,13 +62,58 @@ export default class RedisCommandsQueue {
6162
readonly #respVersion;
6263
readonly #maxLength;
6364
readonly #toWrite = new DoublyLinkedList<CommandToWrite>();
64-
readonly #waitingForReply = new SinglyLinkedList<CommandWaitingForReply>();
65+
readonly #waitingForReply = new EmptyAwareSinglyLinkedList<CommandWaitingForReply>();
6566
readonly #onShardedChannelMoved;
6667
#chainInExecution: symbol | undefined;
6768
readonly decoder;
6869
readonly #pubSub = new PubSub();
6970

7071
#pushHandlers: PushHandler[] = [this.#onPush.bind(this)];
72+
73+
#inMaintenance = false;
74+
75+
set inMaintenance(value: boolean) {
76+
this.#inMaintenance = value;
77+
}
78+
79+
#maintenanceCommandTimeout: number | undefined
80+
81+
setMaintenanceCommandTimeout(ms: number | undefined) {
82+
// Prevent possible api misuse
83+
if (this.#maintenanceCommandTimeout === ms) return;
84+
85+
this.#maintenanceCommandTimeout = ms;
86+
87+
let counter = 0;
88+
89+
// Overwrite timeouts of all eligible toWrite commands
90+
for(const node of this.#toWrite.nodes()) {
91+
const command = node.value;
92+
93+
// Remove timeout listener if it exists
94+
RedisCommandsQueue.#removeTimeoutListener(command)
95+
96+
// Determine newTimeout
97+
const newTimeout = this.#maintenanceCommandTimeout ?? command.timeout?.originalTimeout;
98+
// if no timeout is given and the command didnt have any timeout before, skip
99+
if (!newTimeout) return;
100+
101+
counter++;
102+
103+
// Overwrite the command's timeout
104+
const signal = AbortSignal.timeout(newTimeout);
105+
command.timeout = {
106+
signal,
107+
listener: () => {
108+
this.#toWrite.remove(node);
109+
command.reject(this.#inMaintenance ? new CommandTimeoutDuringMaintananceError(newTimeout) : new TimeoutError());
110+
},
111+
originalTimeout: command.timeout?.originalTimeout
112+
};
113+
signal.addEventListener('abort', command.timeout.listener, { once: true });
114+
};
115+
}
116+
71117
get isPubSubActive() {
72118
return this.#pubSub.isActive;
73119
}
@@ -172,15 +218,19 @@ export default class RedisCommandsQueue {
172218
typeMapping: options?.typeMapping
173219
};
174220

175-
const timeout = options?.timeout;
221+
// If #maintenanceCommandTimeout was explicitly set, we should
222+
// use it instead of the timeout provided by the command
223+
const timeout = this.#maintenanceCommandTimeout || options?.timeout
176224
if (timeout) {
225+
177226
const signal = AbortSignal.timeout(timeout);
178227
value.timeout = {
179228
signal,
180229
listener: () => {
181230
this.#toWrite.remove(node);
182-
value.reject(new TimeoutError());
183-
}
231+
value.reject(this.#inMaintenance ? new CommandTimeoutDuringMaintananceError(timeout) : new TimeoutError());
232+
},
233+
originalTimeout: options?.timeout
184234
};
185235
signal.addEventListener('abort', value.timeout.listener, { once: true });
186236
}

0 commit comments

Comments
 (0)