Skip to content

Commit dacc750

Browse files
committed
feat(client): Add Redis Enterprise maintenance handling capabilities
- Introduced `EnterpriseMaintenanceManager` to manage Redis Enterprise maintenance events and push notifications. - Integrated `EnterpriseMaintenanceManager` into `RedisClient` to handle maintenance push notifications and manage socket transitions. - Implemented graceful handling of MOVING, MIGRATING, and FAILOVER push notifications, including socket replacement and timeout adjustments.
1 parent 9ac1f12 commit dacc750

File tree

4 files changed

+316
-3
lines changed

4 files changed

+316
-3
lines changed

packages/client/lib/client/commands-queue.ts

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import { TypeMapping, ReplyUnion, RespVersions, RedisArgument } from '../RESP/ty
55
import { ChannelListeners, PubSub, PubSubCommand, PubSubListener, PubSubType, PubSubTypeListeners } from './pub-sub';
66
import { AbortError, ErrorReply, CommandTimeoutDuringMaintananceError, TimeoutError } from '../errors';
77
import { MonitorCallback } from '.';
8+
import { dbgMaintenance } from './enterprise-maintenance-manager';
89

910
export interface CommandOptions<T = TypeMapping> {
1011
chainId?: symbol;
@@ -79,6 +80,7 @@ export default class RedisCommandsQueue {
7980
#maintenanceCommandTimeout: number | undefined
8081

8182
setMaintenanceCommandTimeout(ms: number | undefined) {
83+
dbgMaintenance(`Setting maintenance command timeout to ${ms}`);
8284
// Prevent possible api misuse
8385
if (this.#maintenanceCommandTimeout === ms) return;
8486

@@ -112,6 +114,7 @@ export default class RedisCommandsQueue {
112114
};
113115
signal.addEventListener('abort', command.timeout.listener, { once: true });
114116
};
117+
dbgMaintenance(`Total of ${counter} timeouts reset to ${ms}`);
115118
}
116119

117120
get isPubSubActive() {
Lines changed: 284 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,284 @@
1+
import EventEmitter from "events";
2+
import { RedisClientOptions } from ".";
3+
import RedisCommandsQueue from "./commands-queue";
4+
import RedisSocket from "./socket";
5+
import { RedisArgument } from "../..";
6+
import { isIP } from "net";
7+
import { lookup } from "dns/promises";
8+
import assert from "node:assert";
9+
import { setTimeout } from "node:timers/promises";
10+
11+
export const MAINTENANCE_EVENTS = {
12+
PAUSE_WRITING: "pause-writing",
13+
RESUME_WRITING: "resume-writing",
14+
TIMEOUTS_UPDATE: "timeouts-update",
15+
} as const;
16+
17+
const PN = {
18+
MOVING: "MOVING",
19+
MIGRATING: "MIGRATING",
20+
MIGRATED: "MIGRATED",
21+
FAILING_OVER: "FAILING_OVER",
22+
FAILED_OVER: "FAILED_OVER",
23+
};
24+
25+
export interface SocketTimeoutUpdate {
26+
inMaintenance: boolean;
27+
timeout?: number;
28+
}
29+
30+
export const dbgMaintenance = (...args: any[]) => {
31+
if (!process.env.DEBUG_MAINTENANCE) return;
32+
return console.log("[MNT]", ...args);
33+
};
34+
35+
export default class EnterpriseMaintenanceManager extends EventEmitter {
36+
#commandsQueue: RedisCommandsQueue;
37+
#options: RedisClientOptions;
38+
#isMaintenance = 0;
39+
40+
static setupDefaultMaintOptions(options: RedisClientOptions) {
41+
if (options.maintPushNotifications === undefined) {
42+
options.maintPushNotifications =
43+
options?.RESP === 3 ? "auto" : "disabled";
44+
}
45+
if (options.maintMovingEndpointType === undefined) {
46+
options.maintMovingEndpointType = "auto";
47+
}
48+
if (options.maintRelaxedSocketTimeout === undefined) {
49+
options.maintRelaxedSocketTimeout = 10000;
50+
}
51+
if (options.maintRelaxedCommandTimeout === undefined) {
52+
options.maintRelaxedCommandTimeout = 10000;
53+
}
54+
}
55+
56+
static async getHandshakeCommand(
57+
tls: boolean,
58+
host: string,
59+
options: RedisClientOptions,
60+
): Promise<
61+
| { cmd: Array<RedisArgument>; errorHandler: (error: Error) => void }
62+
| undefined
63+
> {
64+
if (options.maintPushNotifications === "disabled") return;
65+
66+
const movingEndpointType = await determineEndpoint(tls, host, options);
67+
return {
68+
cmd: [
69+
"CLIENT",
70+
"MAINT_NOTIFICATIONS",
71+
"ON",
72+
"moving-endpoint-type",
73+
movingEndpointType,
74+
],
75+
errorHandler: (error: Error) => {
76+
dbgMaintenance("handshake failed:", error);
77+
if (options.maintPushNotifications === "enabled") {
78+
throw error;
79+
}
80+
},
81+
};
82+
}
83+
84+
constructor(commandsQueue: RedisCommandsQueue, options: RedisClientOptions) {
85+
super();
86+
this.#commandsQueue = commandsQueue;
87+
this.#options = options;
88+
89+
this.#commandsQueue.addPushHandler(this.#onPush);
90+
}
91+
92+
#onPush = (push: Array<any>): boolean => {
93+
dbgMaintenance("ONPUSH:", push.map(String));
94+
switch (push[0].toString()) {
95+
case PN.MOVING: {
96+
// [ 'MOVING', '17', '15', '54.78.247.156:12075' ]
97+
// ^seq ^after ^new ip
98+
const afterSeconds = push[2];
99+
const url: string | null = push[3] ? String(push[3]) : null;
100+
dbgMaintenance("Received MOVING:", afterSeconds, url);
101+
this.#onMoving(afterSeconds, url);
102+
return true;
103+
}
104+
case PN.MIGRATING:
105+
case PN.FAILING_OVER: {
106+
dbgMaintenance("Received MIGRATING|FAILING_OVER");
107+
this.#onMigrating();
108+
return true;
109+
}
110+
case PN.MIGRATED:
111+
case PN.FAILED_OVER: {
112+
dbgMaintenance("Received MIGRATED|FAILED_OVER");
113+
this.#onMigrated();
114+
return true;
115+
}
116+
}
117+
return false;
118+
};
119+
120+
// Queue:
121+
// toWrite [ C D E ]
122+
// waitingForReply [ A B ] - aka In-flight commands
123+
//
124+
// time: ---1-2---3-4-5-6---------------------------
125+
//
126+
// 1. [EVENT] MOVING PN received
127+
// 2. [ACTION] Pause writing ( we need to wait for new socket to connect and for all in-flight commands to complete )
128+
// 3. [EVENT] New socket connected
129+
// 4. [EVENT] In-flight commands completed
130+
// 5. [ACTION] Destroy old socket
131+
// 6. [ACTION] Resume writing -> we are going to write to the new socket from now on
132+
#onMoving = async (
133+
afterSeconds: number,
134+
url: string | null,
135+
): Promise<void> => {
136+
// 1 [EVENT] MOVING PN received
137+
this.#onMigrating();
138+
139+
let host: string;
140+
let port: number;
141+
142+
// The special value `none` indicates that the `MOVING` message doesn’t need
143+
// to contain an endpoint. Instead it contains the value `null` then. In
144+
// such a corner case, the client is expected to schedule a graceful
145+
// reconnect to its currently configured endpoint after half of the grace
146+
// period that was communicated by the server is over.
147+
if (url === null) {
148+
assert(this.#options.maintMovingEndpointType === "none");
149+
assert(this.#options.socket !== undefined);
150+
assert("host" in this.#options.socket);
151+
assert(typeof this.#options.socket.host === "string");
152+
host = this.#options.socket.host;
153+
assert(typeof this.#options.socket.port === "number");
154+
port = this.#options.socket.port;
155+
const waitTime = (afterSeconds * 1000) / 2;
156+
dbgMaintenance(`Wait for ${waitTime}ms`);
157+
await setTimeout(waitTime);
158+
} else {
159+
const split = url.split(":");
160+
host = split[0];
161+
port = Number(split[1]);
162+
}
163+
164+
// 2 [ACTION] Pause writing
165+
dbgMaintenance("Pausing writing of new commands to old socket");
166+
this.emit(MAINTENANCE_EVENTS.PAUSE_WRITING);
167+
168+
const newSocket = new RedisSocket({
169+
...this.#options.socket,
170+
host,
171+
port,
172+
});
173+
dbgMaintenance(
174+
`Set timeout for new socket to ${this.#options.maintRelaxedSocketTimeout}`,
175+
);
176+
newSocket.setMaintenanceTimeout(this.#options.maintRelaxedSocketTimeout);
177+
dbgMaintenance(`Connecting to new socket: ${host}:${port}`);
178+
await newSocket.connect();
179+
dbgMaintenance(`Connected to new socket`);
180+
// 3 [EVENT] New socket connected
181+
182+
dbgMaintenance(`Wait for all in-flight commands to complete`);
183+
await this.#commandsQueue.waitForInflightCommandsToComplete();
184+
dbgMaintenance(`In-flight commands completed`);
185+
// 4 [EVENT] In-flight commands completed
186+
187+
// 5 + 6
188+
dbgMaintenance("Resume writing");
189+
this.emit(MAINTENANCE_EVENTS.RESUME_WRITING, newSocket);
190+
this.#onMigrated();
191+
};
192+
193+
#onMigrating = async () => {
194+
this.#isMaintenance++;
195+
if (this.#isMaintenance > 1) {
196+
dbgMaintenance(`Timeout relaxation already done`);
197+
return;
198+
}
199+
200+
this.#commandsQueue.inMaintenance = true;
201+
this.#commandsQueue.setMaintenanceCommandTimeout(
202+
this.#options.maintRelaxedCommandTimeout,
203+
);
204+
205+
this.emit(MAINTENANCE_EVENTS.TIMEOUTS_UPDATE, {
206+
inMaintenance: true,
207+
timeout: this.#options.maintRelaxedSocketTimeout,
208+
} satisfies SocketTimeoutUpdate);
209+
};
210+
211+
#onMigrated = async () => {
212+
this.#isMaintenance--;
213+
assert(this.#isMaintenance >= 0);
214+
if (this.#isMaintenance > 0) {
215+
dbgMaintenance(`Not ready to unrelax timeouts yet`);
216+
return;
217+
}
218+
219+
this.#commandsQueue.inMaintenance = false;
220+
this.#commandsQueue.setMaintenanceCommandTimeout(undefined);
221+
222+
this.emit(MAINTENANCE_EVENTS.TIMEOUTS_UPDATE, {
223+
inMaintenance: false,
224+
timeout: undefined,
225+
} satisfies SocketTimeoutUpdate);
226+
};
227+
}
228+
229+
export type MovingEndpointType =
230+
| "auto"
231+
| "internal-ip"
232+
| "internal-fqdn"
233+
| "external-ip"
234+
| "external-fqdn"
235+
| "none";
236+
237+
function isPrivateIP(ip: string): boolean {
238+
const version = isIP(ip);
239+
if (version === 4) {
240+
const octets = ip.split(".").map(Number);
241+
return (
242+
octets[0] === 10 ||
243+
(octets[0] === 172 && octets[1] >= 16 && octets[1] <= 31) ||
244+
(octets[0] === 192 && octets[1] === 168)
245+
);
246+
}
247+
if (version === 6) {
248+
return (
249+
ip.startsWith("fc") || // Unique local
250+
ip.startsWith("fd") || // Unique local
251+
ip === "::1" || // Loopback
252+
ip.startsWith("fe80") // Link-local unicast
253+
);
254+
}
255+
return false;
256+
}
257+
258+
async function determineEndpoint(
259+
tlsEnabled: boolean,
260+
host: string,
261+
options: RedisClientOptions,
262+
): Promise<MovingEndpointType> {
263+
assert(options.maintMovingEndpointType !== undefined);
264+
if (options.maintMovingEndpointType !== "auto") {
265+
dbgMaintenance(
266+
`Determine endpoint type: ${options.maintMovingEndpointType}`,
267+
);
268+
return options.maintMovingEndpointType;
269+
}
270+
271+
const ip = isIP(host) ? host : (await lookup(host, { family: 0 })).address;
272+
273+
const isPrivate = isPrivateIP(ip);
274+
275+
let result: MovingEndpointType;
276+
if (tlsEnabled) {
277+
result = isPrivate ? "internal-fqdn" : "external-fqdn";
278+
} else {
279+
result = isPrivate ? "internal-ip" : "external-ip";
280+
}
281+
282+
dbgMaintenance(`Determine endpoint type: ${result}`);
283+
return result;
284+
}

packages/client/lib/client/index.ts

Lines changed: 27 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
import COMMANDS from '../commands';
2-
import RedisSocket, { RedisSocketOptions } from './socket';
2+
import RedisSocket, { RedisSocketOptions, RedisTcpSocketOptions } from './socket';
33
import { BasicAuth, CredentialsError, CredentialsProvider, StreamingCredentialsProvider, UnableToObtainNewCredentialsError, Disposable } from '../authx';
44
import RedisCommandsQueue, { CommandOptions } from './commands-queue';
55
import { EventEmitter } from 'node:events';
@@ -20,6 +20,8 @@ import { BasicClientSideCache, ClientSideCacheConfig, ClientSideCacheProvider }
2020
import { BasicCommandParser, CommandParser } from './parser';
2121
import SingleEntryCache from '../single-entry-cache';
2222
import { version } from '../../package.json'
23+
import EnterpriseMaintenanceManager, { MAINTENANCE_EVENTS, MovingEndpointType, SocketTimeoutUpdate } from './enterprise-maintenance-manager';
24+
import assert from 'node:assert';
2325

2426
export interface RedisClientOptions<
2527
M extends RedisModules = RedisModules,
@@ -429,7 +431,7 @@ export default class RedisClient<
429431
}
430432

431433
readonly #options?: RedisClientOptions<M, F, S, RESP, TYPE_MAPPING>;
432-
readonly #socket: RedisSocket;
434+
#socket: RedisSocket;
433435
readonly #queue: RedisCommandsQueue;
434436
#selectedDB = 0;
435437
#monitorCallback?: MonitorCallback<TYPE_MAPPING>;
@@ -452,7 +454,6 @@ export default class RedisClient<
452454
return this._self.#clientSideCache;
453455
}
454456

455-
456457
get options(): RedisClientOptions<M, F, S, RESP> | undefined {
457458
return this._self.#options;
458459
}
@@ -510,6 +511,16 @@ export default class RedisClient<
510511
this.#options = this.#initiateOptions(options);
511512
this.#queue = this.#initiateQueue();
512513
this.#socket = this.#createSocket(this.#options);
514+
515+
if(options?.maintPushNotifications !== 'disabled') {
516+
new EnterpriseMaintenanceManager(this.#queue, this.#options!)
517+
.on(MAINTENANCE_EVENTS.PAUSE_WRITING, () => this._self.#pausedForMaintenance = true)
518+
.on(MAINTENANCE_EVENTS.RESUME_WRITING, this.#resumeFromMaintenance.bind(this))
519+
.on(MAINTENANCE_EVENTS.TIMEOUTS_UPDATE, (value: SocketTimeoutUpdate) => {
520+
this._self.#socket.inMaintenance = value.inMaintenance;
521+
this._self.#socket.setMaintenanceTimeout(value.timeout);
522+
});
523+
};
513524

514525
if (options?.clientSideCache) {
515526
if (options.clientSideCache instanceof ClientSideCacheProvider) {
@@ -567,7 +578,13 @@ export default class RedisClient<
567578
this._commandOptions = options.commandOptions;
568579
}
569580

581+
582+
570583
if (options) {
584+
if(options.maintPushNotifications !== 'disabled') {
585+
EnterpriseMaintenanceManager.setupDefaultMaintOptions(options)
586+
}
587+
571588
return RedisClient.parseOptions(options);
572589
}
573590

@@ -744,6 +761,13 @@ export default class RedisClient<
744761
commands.push({cmd: this.#clientSideCache.trackingOn()});
745762
}
746763

764+
assert(this.#options?.socket !== undefined);
765+
const { tls, host } = this.#options?.socket as RedisTcpSocketOptions;
766+
const maintenanceHandshakeCmd = await EnterpriseMaintenanceManager.getHandshakeCommand(!!tls, host!, this.#options);
767+
if(maintenanceHandshakeCmd) {
768+
commands.push(maintenanceHandshakeCmd);
769+
};
770+
747771
return commands;
748772
}
749773

0 commit comments

Comments
 (0)