Skip to content

Commit ccc1f9c

Browse files
committed
feat: add handshake command
1 parent ad2b563 commit ccc1f9c

File tree

2 files changed

+76
-7
lines changed

2 files changed

+76
-7
lines changed

packages/client/lib/client/enterprise-maintenance-manager.ts

Lines changed: 60 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,10 @@
11
import EventEmitter from "events";
22
import { RedisClientOptions } from ".";
33
import RedisCommandsQueue from "./commands-queue";
4-
import RedisSocket from "./socket";
4+
import RedisSocket, { RedisSocketOptions, RedisTcpSocketOptions } from "./socket";
5+
import { RedisArgument } from "../..";
6+
import { isIP } from "net";
7+
import { lookup } from "dns/promises";
58

69
export const MAINTENANCE_EVENTS = {
710
PAUSE_WRITING: "pause-writing",
@@ -18,13 +21,19 @@ const PN = {
1821
};
1922

2023
export interface SocketTimeoutUpdate {
21-
inMaintenance: boolean,
22-
timeout?: number
24+
inMaintenance: boolean;
25+
timeout?: number;
2326
}
2427

2528
export default class EnterpriseMaintenanceManager extends EventEmitter {
2629
#commandsQueue: RedisCommandsQueue;
2730
#options: RedisClientOptions;
31+
32+
static async getHandshakeCommand(tls: boolean, host: string): Promise<Array<RedisArgument>> {
33+
const movingEndpointType = await determineEndpoint(tls, host);
34+
return ["CLIENT", "MAINT_NOTIFICATIONS", "ON", "moving-endpoint-type", movingEndpointType];
35+
}
36+
2837
constructor(commandsQueue: RedisCommandsQueue, options: RedisClientOptions) {
2938
super();
3039
this.#commandsQueue = commandsQueue;
@@ -103,7 +112,7 @@ export default class EnterpriseMaintenanceManager extends EventEmitter {
103112

104113
this.emit(MAINTENANCE_EVENTS.TIMEOUTS_UPDATE, {
105114
inMaintenance: true,
106-
timeout: this.#options.gracefulMaintenance?.relaxedSocketTimeout
115+
timeout: this.#options.gracefulMaintenance?.relaxedSocketTimeout,
107116
} satisfies SocketTimeoutUpdate);
108117
};
109118

@@ -113,8 +122,53 @@ export default class EnterpriseMaintenanceManager extends EventEmitter {
113122

114123
this.emit(MAINTENANCE_EVENTS.TIMEOUTS_UPDATE, {
115124
inMaintenance: false,
116-
timeout: undefined
125+
timeout: undefined,
117126
} satisfies SocketTimeoutUpdate);
118127
};
128+
}
119129

120-
};
130+
type MovingEndpointType =
131+
| "internal-ip"
132+
| "internal-fqdn"
133+
| "external-ip"
134+
| "external-fqdn"
135+
| "none";
136+
137+
function isPrivateIP(ip: string): boolean {
138+
const version = isIP(ip);
139+
if (version === 4) {
140+
const octets = ip.split(".").map(Number);
141+
return (
142+
octets[0] === 10 ||
143+
(octets[0] === 172 && octets[1] >= 16 && octets[1] <= 31) ||
144+
(octets[0] === 192 && octets[1] === 168)
145+
);
146+
}
147+
if (version === 6) {
148+
return (
149+
ip.startsWith("fc") || // Unique local
150+
ip.startsWith("fd") || // Unique local
151+
ip === "::1" || // Loopback
152+
ip.startsWith("fe80") // Link-local unicast
153+
);
154+
}
155+
return false;
156+
}
157+
158+
async function determineEndpoint(
159+
tlsEnabled: boolean,
160+
host: string,
161+
): Promise<MovingEndpointType> {
162+
163+
const ip = isIP(host)
164+
? host
165+
: (await lookup(host, {family: 0})).address
166+
167+
const isPrivate = isPrivateIP(ip);
168+
169+
if (tlsEnabled) {
170+
return isPrivate ? "internal-fqdn" : "external-fqdn";
171+
} else {
172+
return isPrivate ? "internal-ip" : "external-ip";
173+
}
174+
}

packages/client/lib/client/index.ts

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
import COMMANDS from '../commands';
2-
import RedisSocket, { RedisSocketOptions } from './socket';
2+
import RedisSocket, { RedisSocketOptions, RedisTcpSocketOptions } from './socket';
33
import { BasicAuth, CredentialsError, CredentialsProvider, StreamingCredentialsProvider, UnableToObtainNewCredentialsError, Disposable } from '../authx';
44
import RedisCommandsQueue, { CommandOptions } from './commands-queue';
55
import { EventEmitter } from 'node:events';
@@ -21,6 +21,7 @@ import { BasicCommandParser, CommandParser } from './parser';
2121
import SingleEntryCache from '../single-entry-cache';
2222
import { version } from '../../package.json'
2323
import EnterpriseMaintenanceManager, { MAINTENANCE_EVENTS, SocketTimeoutUpdate } from './enterprise-maintenance-manager';
24+
import assert from 'node:assert';
2425

2526
export interface RedisClientOptions<
2627
M extends RedisModules = RedisModules,
@@ -761,6 +762,20 @@ export default class RedisClient<
761762
commands.push({cmd: this.#clientSideCache.trackingOn()});
762763
}
763764

765+
if(this.#options?.gracefulMaintenance) {
766+
const socket = this.#options.socket;
767+
assert(socket !== undefined);
768+
const { tls, host } = socket as RedisTcpSocketOptions;
769+
assert(tls !== undefined);
770+
assert(host !== undefined);
771+
commands.push({
772+
cmd: await EnterpriseMaintenanceManager.getHandshakeCommand(tls, host),
773+
errorHandler: (err: Error) => {
774+
console.log("Maintenance handshake failed: ", err);
775+
}
776+
});
777+
}
778+
764779
return commands;
765780
}
766781

0 commit comments

Comments
 (0)