Skip to content

Commit 7026a0f

Browse files
committed
fix: handle corner cases
1 parent 4e80dc6 commit 7026a0f

File tree

2 files changed

+64
-28
lines changed

2 files changed

+64
-28
lines changed

packages/client/lib/client/enterprise-maintenance-manager.ts

Lines changed: 58 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import { RedisArgument } from "../..";
66
import { isIP } from "net";
77
import { lookup } from "dns/promises";
88
import assert from "node:assert";
9+
import { setTimeout } from 'node:timers/promises'
910

1011
export const MAINTENANCE_EVENTS = {
1112
PAUSE_WRITING: "pause-writing",
@@ -26,13 +27,6 @@ export interface SocketTimeoutUpdate {
2627
timeout?: number;
2728
}
2829

29-
const DEFAULT_OPTIONS = {
30-
maintPushNotifications: "auto",
31-
maintMovingEndpointType: "auto",
32-
maintRelaxedCommandTimeout: 1000,
33-
maintRelaxedSocketTimeout: 1000,
34-
} as const;
35-
3630
export const dbgMaintenance = (...args: any[]) => {
3731
if (!process.env.DEBUG_MAINTENANCE) return;
3832
return console.log("[MNT]", ...args);
@@ -43,6 +37,22 @@ export default class EnterpriseMaintenanceManager extends EventEmitter {
4337
#options: RedisClientOptions;
4438
#isMaintenance = 0;
4539

40+
static setupDefaultMaintOptions(options: RedisClientOptions) {
41+
if (options.maintPushNotifications === undefined) {
42+
options.maintPushNotifications =
43+
options?.RESP === 3 ? "auto" : "disabled";
44+
}
45+
if (options.maintMovingEndpointType === undefined) {
46+
options.maintMovingEndpointType = "auto";
47+
}
48+
if (options.maintRelaxedSocketTimeout === undefined) {
49+
options.maintRelaxedSocketTimeout = 10000;
50+
}
51+
if (options.maintRelaxedCommandTimeout === undefined) {
52+
options.maintRelaxedCommandTimeout = 10000;
53+
}
54+
}
55+
4656
static async getHandshakeCommand(
4757
tls: boolean,
4858
host: string,
@@ -63,8 +73,8 @@ export default class EnterpriseMaintenanceManager extends EventEmitter {
6373
movingEndpointType,
6474
],
6575
errorHandler: (error: Error) => {
66-
dbgMaintenance('handshake failed:', error);
67-
if(options.maintPushNotifications === 'enabled') {
76+
dbgMaintenance("handshake failed:", error);
77+
if (options.maintPushNotifications === "enabled") {
6878
throw error;
6979
}
7080
},
@@ -74,22 +84,21 @@ export default class EnterpriseMaintenanceManager extends EventEmitter {
7484
constructor(commandsQueue: RedisCommandsQueue, options: RedisClientOptions) {
7585
super();
7686
this.#commandsQueue = commandsQueue;
77-
this.#options = { ...DEFAULT_OPTIONS, ...options };
87+
this.#options = options;
7888

7989
this.#commandsQueue.addPushHandler(this.#onPush);
8090
}
8191

8292
#onPush = (push: Array<any>): boolean => {
83-
dbgMaintenance(push.map((item) => item.toString()));
93+
dbgMaintenance(push);
8494
switch (push[0].toString()) {
8595
case PN.MOVING: {
8696
// [ 'MOVING', '17', '15', '54.78.247.156:12075' ]
8797
// ^seq ^after ^new ip
88-
const afterMs = push[2];
89-
const url = push[3];
90-
const [host, port] = url.toString().split(":");
91-
dbgMaintenance("Received MOVING:", afterMs, host, Number(port));
92-
this.#onMoving(afterMs, host, Number(port));
98+
const afterSeconds = push[2];
99+
const url: string | null = push[3];
100+
dbgMaintenance("Received MOVING:", afterSeconds, url);
101+
this.#onMoving(afterSeconds, url);
93102
return true;
94103
}
95104
case PN.MIGRATING:
@@ -121,13 +130,37 @@ export default class EnterpriseMaintenanceManager extends EventEmitter {
121130
// 5. [ACTION] Destroy old socket
122131
// 6. [ACTION] Resume writing -> we are going to write to the new socket from now on
123132
#onMoving = async (
124-
_afterMs: number,
125-
host: string,
126-
port: number,
133+
afterSeconds: number,
134+
url: string | null
127135
): Promise<void> => {
128136
// 1 [EVENT] MOVING PN received
129137
this.#onMigrating();
130138

139+
let host: string
140+
let port: number
141+
142+
// The special value `none` indicates that the `MOVING` message doesn’t need
143+
// to contain an endpoint. Instead it contains the value `null` then. In
144+
// such a corner case, the client is expected to schedule a graceful
145+
// reconnect to its currently configured endpoint after half of the grace
146+
// period that was communicated by the server is over.
147+
if(url === null) {
148+
assert(this.#options.maintMovingEndpointType === 'none');
149+
assert(this.#options.socket !== undefined)
150+
assert('host' in this.#options.socket)
151+
assert(typeof this.#options.socket.host === 'string')
152+
host = this.#options.socket.host
153+
assert(typeof this.#options.socket.port === 'number')
154+
port = this.#options.socket.port
155+
const waitTime = afterSeconds * 1000 / 2;
156+
dbgMaintenance(`Wait for ${waitTime}ms`);
157+
await setTimeout(waitTime);
158+
} else {
159+
const split = url.split(':');
160+
host = split[0];
161+
port = Number(split[1]);
162+
}
163+
131164
// 2 [ACTION] Pause writing
132165
dbgMaintenance("Pausing writing of new commands to old socket");
133166
this.emit(MAINTENANCE_EVENTS.PAUSE_WRITING);
@@ -225,11 +258,13 @@ function isPrivateIP(ip: string): boolean {
225258
async function determineEndpoint(
226259
tlsEnabled: boolean,
227260
host: string,
228-
options: RedisClientOptions
261+
options: RedisClientOptions,
229262
): Promise<MovingEndpointType> {
230-
231263
assert(options.maintMovingEndpointType !== undefined);
232-
if (options.maintMovingEndpointType !== 'auto') return options.maintMovingEndpointType;
264+
if (options.maintMovingEndpointType !== "auto") {
265+
dbgMaintenance(`Determine endpoint type: ${options.maintMovingEndpointType}`);
266+
return options.maintMovingEndpointType;
267+
}
233268

234269
const ip = isIP(host) ? host : (await lookup(host, { family: 0 })).address;
235270

@@ -242,6 +277,6 @@ async function determineEndpoint(
242277
result = isPrivate ? "internal-ip" : "external-ip";
243278
}
244279

245-
dbgMaintenance(`Determine endpoint format: ${result}`);
280+
dbgMaintenance(`Determine endpoint type: ${result}`);
246281
return result;
247282
}

packages/client/lib/client/index.ts

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -550,7 +550,7 @@ export default class RedisClient<
550550
throw new Error('Client Side Caching is only supported with RESP3');
551551
}
552552

553-
if (options?.maintPushNotifications !== 'disabled' && options?.RESP !== 3) {
553+
if (options?.maintPushNotifications && options?.maintPushNotifications !== 'disabled' && options?.RESP !== 3) {
554554
throw new Error('Graceful Maintenance is only supported with RESP3');
555555
}
556556

@@ -578,12 +578,13 @@ export default class RedisClient<
578578
this._commandOptions = options.commandOptions;
579579
}
580580

581-
if (options && options.maintPushNotifications === undefined) {
582-
options.maintPushNotifications =
583-
options?.RESP === 3 ? "auto" : "disabled";
584-
}
581+
585582

586583
if (options) {
584+
if(options.maintPushNotifications !== 'disabled') {
585+
EnterpriseMaintenanceManager.setupDefaultMaintOptions(options)
586+
}
587+
587588
return RedisClient.parseOptions(options);
588589
}
589590

0 commit comments

Comments
 (0)