From 208a0d250f98f7690a4981ee39d0013bef999aec Mon Sep 17 00:00:00 2001 From: Nikolay Karadzhov Date: Wed, 10 Sep 2025 11:05:37 +0300 Subject: [PATCH 01/14] Hitless upgrades (#3021) * feat(errors): Add specialized timeout error types for maintenance scenarios - Added `SocketTimeoutDuringMaintenanceError`, a subclass of `TimeoutError`, to handle socket timeouts during maintenance. - Added `CommandTimeoutDuringMaintenanceError`, another subclass of `TimeoutError`, to address command write timeouts during maintenance. * feat(linked-list): Add EmptyAwareSinglyLinkedList and enhance DoublyLinkedList functionality - Introduced `EmptyAwareSinglyLinkedList`, a subclass of `SinglyLinkedList` that emits an `empty` event when the list becomes empty due to `reset`, `shift`, or `remove` operations. - Added `nodes()` iterator method to `DoublyLinkedList` for iterating over nodes directly. - Enhanced unit tests for `DoublyLinkedList` and `SinglyLinkedList` to cover edge cases and new functionality. - Added comprehensive tests for `EmptyAwareSinglyLinkedList` to validate `empty` event emission under various scenarios. - Improved code formatting and consistency. * refactor(commands-queue): Improve push notification handling - Replaced `setInvalidateCallback` with a more flexible `addPushHandler` method, allowing multiple handlers for push notifications. - Introduced the `PushHandler` type to standardize push notification processing. - Refactored `RedisCommandsQueue` to use a `#pushHandlers` array, enabling dynamic and modular handling of push notifications. - Updated `RedisClient` to leverage the new handler mechanism for `invalidate` push notifications, simplifying and decoupling logic. * feat(commands-queue): Add method to wait for in-flight commands to complete - Introduced `waitForInflightCommandsToComplete` method to asynchronously wait for all in-flight commands to finish processing. - Utilized the `empty` event from `#waitingForReply` to signal when all commands have been completed. * feat(commands-queue): Introduce maintenance mode support for commands-queue - Added `#maintenanceCommandTimeout` and `setMaintenanceCommandTimeout` method to dynamically adjust command timeouts during maintenance. * refactor(client): Extract socket event listener setup into helper method * refactor(socket): Add maintenance mode support and dynamic timeout handling - Added `#maintenanceTimeout` and `setMaintenanceTimeout` method to dynamically adjust socket timeouts during maintenance. * feat(client): Add Redis Enterprise maintenance configuration options - Added `maintPushNotifications` option to control how the client handles Redis Enterprise maintenance push notifications (`disabled`, `enabled`, `auto`). - Added `maintMovingEndpointType` option to specify the endpoint type for reconnecting during a MOVING notification (`auto`, `internal-ip`, `external-ip`, etc.). - Added `maintRelaxedCommandTimeout` option to define a relaxed timeout for commands during maintenance. - Added `maintRelaxedSocketTimeout` option to define a relaxed timeout for the socket during maintenance. - Enforced RESP3 requirement for maintenance-related features (`maintPushNotifications`). * feat(client): Add socket helpers and pause mechanism - Introduced `#paused` flag with corresponding `_pause` and `_unpause` methods to temporarily halt writing commands to the socket during maintenance windows. - Updated `#write` method to respect the `#paused` flag, preventing new commands from being written during maintenance.
- Added `_ejectSocket` method to safely detach from and return the current socket - Added `_insertSocket` method to receive and start using a new socket * feat(client): Add Redis Enterprise maintenance handling capabilities - Introduced `EnterpriseMaintenanceManager` to manage Redis Enterprise maintenance events and push notifications. - Integrated `EnterpriseMaintenanceManager` into `RedisClient` to handle maintenance push notifications and manage socket transitions. - Implemented graceful handling of MOVING, MIGRATING, and FAILOVER push notifications, including socket replacement and timeout adjustments. * test: add E2E test infrastructure for Redis maintenance scenarios * test: add E2E tests for Redis Enterprise maintenance timeout handling (#3) * test: add connection handoff test --------- Co-authored-by: Pavel Pashov Co-authored-by: Pavel Pashov <60297174+PavelPashov@users.noreply.github.com> --- package-lock.json | 16 - packages/bloom/package.json | 2 +- packages/client/lib/client/commands-queue.ts | 111 ++++-- .../client/enterprise-maintenance-manager.ts | 348 ++++++++++++++++++ packages/client/lib/client/index.ts | 196 ++++++++-- .../client/lib/client/linked-list.spec.ts | 111 ++++-- packages/client/lib/client/linked-list.ts | 41 ++- packages/client/lib/client/socket.ts | 26 +- packages/client/lib/errors.ts | 12 + .../test-scenario/connection-handoff.e2e.ts | 126 +++++++ .../test-scenario/fault-injector-client.ts | 187 ++++++++++ .../test-scenario/push-notification.e2e.ts | 94 +++++ .../test-scenario/test-command-runner.ts | 108 ++++++ .../tests/test-scenario/test-scenario.util.ts | 197 ++++++++++ .../timeout-during-notifications.e2e.ts | 159 ++++++++ packages/entraid/package.json | 2 +- packages/json/package.json | 2 +- packages/redis/package.json | 10 +- packages/search/package.json | 2 +- packages/time-series/package.json | 2 +- 20 files changed, 1635 insertions(+), 117 deletions(-) create mode 100644 packages/client/lib/client/enterprise-maintenance-manager.ts create mode 100644 packages/client/lib/tests/test-scenario/connection-handoff.e2e.ts create mode 100644 packages/client/lib/tests/test-scenario/fault-injector-client.ts create mode 100644 packages/client/lib/tests/test-scenario/push-notification.e2e.ts create mode 100644 packages/client/lib/tests/test-scenario/test-command-runner.ts create mode 100644 packages/client/lib/tests/test-scenario/test-scenario.util.ts create mode 100644 packages/client/lib/tests/test-scenario/timeout-during-notifications.e2e.ts diff --git a/package-lock.json b/package-lock.json index 736abe70a46..288e109c979 100644 --- a/package-lock.json +++ b/package-lock.json @@ -7319,22 +7319,6 @@ "url": "https://github.com/sponsors/sindresorhus" } }, - "packages/authx": { - "name": "@redis/authx", - "version": "5.0.0-next.5", - "extraneous": true, - "license": "MIT", - "dependencies": { - "@azure/msal-node": "^2.16.1" - }, - "devDependencies": {}, - "engines": { - "node": ">= 18" - }, - "peerDependencies": { - "@redis/client": "^5.0.0-next.5" - } - }, "packages/bloom": { "name": "@redis/bloom", "version": "5.8.2", diff --git a/packages/bloom/package.json b/packages/bloom/package.json index e2ff5a8b42d..4f46bce4ad7 100644 --- a/packages/bloom/package.json +++ b/packages/bloom/package.json @@ -13,7 +13,7 @@ "release": "release-it" }, "peerDependencies": { - "@redis/client": "^5.8.2" + "@redis/client": "^5.8.2 || ^5.9.0-0" }, "devDependencies": { "@redis/test-utils": "*" diff --git a/packages/client/lib/client/commands-queue.ts 
b/packages/client/lib/client/commands-queue.ts index 52a07a7e3b5..ae67ca28cd6 100644 --- a/packages/client/lib/client/commands-queue.ts +++ b/packages/client/lib/client/commands-queue.ts @@ -1,10 +1,11 @@ -import { SinglyLinkedList, DoublyLinkedNode, DoublyLinkedList } from './linked-list'; +import { DoublyLinkedNode, DoublyLinkedList, EmptyAwareSinglyLinkedList } from './linked-list'; import encodeCommand from '../RESP/encoder'; import { Decoder, PUSH_TYPE_MAPPING, RESP_TYPES } from '../RESP/decoder'; import { TypeMapping, ReplyUnion, RespVersions, RedisArgument } from '../RESP/types'; import { ChannelListeners, PubSub, PubSubCommand, PubSubListener, PubSubType, PubSubTypeListeners } from './pub-sub'; -import { AbortError, ErrorReply, TimeoutError } from '../errors'; +import { AbortError, ErrorReply, CommandTimeoutDuringMaintenanceError, TimeoutError } from '../errors'; import { MonitorCallback } from '.'; +import { dbgMaintenance } from './enterprise-maintenance-manager'; export interface CommandOptions { chainId?: symbol; @@ -30,6 +31,7 @@ export interface CommandToWrite extends CommandWaitingForReply { timeout: { signal: AbortSignal; listener: () => unknown; + originalTimeout: number | undefined; } | undefined; } @@ -50,22 +52,74 @@ const RESP2_PUSH_TYPE_MAPPING = { [RESP_TYPES.SIMPLE_STRING]: Buffer }; +// Try to handle a push notification. Return whether you +// successfully consumed the notification or not. This is +// important in order for the queue to be able to pass the +// notification to another handler if the current one did not +// succeed. +type PushHandler = (pushItems: Array) => boolean; + export default class RedisCommandsQueue { readonly #respVersion; readonly #maxLength; readonly #toWrite = new DoublyLinkedList(); - readonly #waitingForReply = new SinglyLinkedList(); + readonly #waitingForReply = new EmptyAwareSinglyLinkedList(); readonly #onShardedChannelMoved; #chainInExecution: symbol | undefined; readonly decoder; readonly #pubSub = new PubSub(); + #pushHandlers: PushHandler[] = [this.#onPush.bind(this)]; + + #maintenanceCommandTimeout: number | undefined + + setMaintenanceCommandTimeout(ms: number | undefined) { + // Prevent possible api misuse + if (this.#maintenanceCommandTimeout === ms) { + dbgMaintenance(`Queue already set maintenanceCommandTimeout to ${ms}, skipping`); + return; + }; + + dbgMaintenance(`Setting maintenance command timeout to ${ms}`); + this.#maintenanceCommandTimeout = ms; + + if(this.#maintenanceCommandTimeout === undefined) { + dbgMaintenance(`Queue will keep maintenanceCommandTimeout for exisitng commands, just to be on the safe side. 
New commands will receive normal timeouts`); + return; + } + + let counter = 0; + const total = this.#toWrite.length; + + // Overwrite timeouts of all eligible toWrite commands + for(const node of this.#toWrite.nodes()) { + const command = node.value; + + // Remove timeout listener if it exists + RedisCommandsQueue.#removeTimeoutListener(command) + + counter++; + const newTimeout = this.#maintenanceCommandTimeout; + + // Overwrite the command's timeout + const signal = AbortSignal.timeout(newTimeout); + command.timeout = { + signal, + listener: () => { + this.#toWrite.remove(node); + command.reject(new CommandTimeoutDuringMaintenanceError(newTimeout)); + }, + originalTimeout: command.timeout?.originalTimeout + }; + signal.addEventListener('abort', command.timeout.listener, { once: true }); + }; + dbgMaintenance(`Total of ${counter} of ${total} timeouts reset to ${ms}`); + } + get isPubSubActive() { return this.#pubSub.isActive; } - #invalidateCallback?: (key: RedisArgument | null) => unknown; - constructor( respVersion: RespVersions, maxLength: number | null | undefined, @@ -107,6 +161,7 @@ export default class RedisCommandsQueue { } return true; } + return false } #getTypeMapping() { @@ -119,30 +174,27 @@ export default class RedisCommandsQueue { onErrorReply: err => this.#onErrorReply(err), //TODO: we can shave off a few cycles by not adding onPush handler at all if CSC is not used onPush: push => { - if (!this.#onPush(push)) { - // currently only supporting "invalidate" over RESP3 push messages - switch (push[0].toString()) { - case "invalidate": { - if (this.#invalidateCallback) { - if (push[1] !== null) { - for (const key of push[1]) { - this.#invalidateCallback(key); - } - } else { - this.#invalidateCallback(null); - } - } - break; - } - } + for(const pushHandler of this.#pushHandlers) { + if(pushHandler(push)) return } }, getTypeMapping: () => this.#getTypeMapping() }); } - setInvalidateCallback(callback?: (key: RedisArgument | null) => unknown) { - this.#invalidateCallback = callback; + addPushHandler(handler: PushHandler): void { + this.#pushHandlers.push(handler); + } + + async waitForInflightCommandsToComplete(): Promise { + // In-flight commands already completed + if(this.#waitingForReply.length === 0) { + return + }; + // Otherwise wait for in-flight commands to fire `empty` event + return new Promise(resolve => { + this.#waitingForReply.events.on('empty', resolve) + }); } addCommand( @@ -168,15 +220,20 @@ export default class RedisCommandsQueue { typeMapping: options?.typeMapping }; - const timeout = options?.timeout; + // If #maintenanceCommandTimeout was explicitly set, we should + // use it instead of the timeout provided by the command + const timeout = this.#maintenanceCommandTimeout ?? options?.timeout; + const wasInMaintenance = this.#maintenanceCommandTimeout !== undefined; if (timeout) { + const signal = AbortSignal.timeout(timeout); value.timeout = { signal, listener: () => { this.#toWrite.remove(node); - value.reject(new TimeoutError()); - } + value.reject(wasInMaintenance ? 
new CommandTimeoutDuringMaintenanceError(timeout) : new TimeoutError()); + }, + originalTimeout: options?.timeout }; signal.addEventListener('abort', value.timeout.listener, { once: true }); } @@ -432,7 +489,7 @@ export default class RedisCommandsQueue { } static #removeTimeoutListener(command: CommandToWrite) { - command.timeout!.signal.removeEventListener('abort', command.timeout!.listener); + command.timeout?.signal.removeEventListener('abort', command.timeout!.listener); } static #flushToWrite(toBeSent: CommandToWrite, err: Error) { diff --git a/packages/client/lib/client/enterprise-maintenance-manager.ts b/packages/client/lib/client/enterprise-maintenance-manager.ts new file mode 100644 index 00000000000..d4766d9e533 --- /dev/null +++ b/packages/client/lib/client/enterprise-maintenance-manager.ts @@ -0,0 +1,348 @@ +import { RedisClientOptions } from "."; +import RedisCommandsQueue from "./commands-queue"; +import { RedisArgument } from "../.."; +import { isIP } from "net"; +import { lookup } from "dns/promises"; +import assert from "node:assert"; +import { setTimeout } from "node:timers/promises"; +import RedisSocket from "./socket"; +import diagnostics_channel from "node:diagnostics_channel"; + +export const MAINTENANCE_EVENTS = { + PAUSE_WRITING: "pause-writing", + RESUME_WRITING: "resume-writing", + TIMEOUTS_UPDATE: "timeouts-update", +} as const; + +const PN = { + MOVING: "MOVING", + MIGRATING: "MIGRATING", + MIGRATED: "MIGRATED", + FAILING_OVER: "FAILING_OVER", + FAILED_OVER: "FAILED_OVER", +}; + +export type DiagnosticsEvent = { + type: string; + timestamp: number; + data?: Object; +}; + +export const dbgMaintenance = (...args: any[]) => { + if (!process.env.REDIS_DEBUG_MAINTENANCE) return; + return console.log("[MNT]", ...args); +}; + +export const emitDiagnostics = (event: DiagnosticsEvent) => { + if (!process.env.REDIS_EMIT_DIAGNOSTICS) return; + + const channel = diagnostics_channel.channel("redis.maintenance"); + channel.publish(event); +}; + +export interface MaintenanceUpdate { + relaxedCommandTimeout?: number; + relaxedSocketTimeout?: number; +} + +interface Client { + _ejectSocket: () => RedisSocket; + _insertSocket: (socket: RedisSocket) => void; + _pause: () => void; + _unpause: () => void; + _maintenanceUpdate: (update: MaintenanceUpdate) => void; + duplicate: (options: RedisClientOptions) => Client; + connect: () => Promise; + destroy: () => void; +} + +export default class EnterpriseMaintenanceManager { + #commandsQueue: RedisCommandsQueue; + #options: RedisClientOptions; + #isMaintenance = 0; + #client: Client; + + static setupDefaultMaintOptions(options: RedisClientOptions) { + if (options.maintPushNotifications === undefined) { + options.maintPushNotifications = + options?.RESP === 3 ? 
"auto" : "disabled"; + } + if (options.maintMovingEndpointType === undefined) { + options.maintMovingEndpointType = "auto"; + } + if (options.maintRelaxedSocketTimeout === undefined) { + options.maintRelaxedSocketTimeout = 10000; + } + if (options.maintRelaxedCommandTimeout === undefined) { + options.maintRelaxedCommandTimeout = 10000; + } + } + + static async getHandshakeCommand( + tls: boolean, + host: string, + options: RedisClientOptions, + ): Promise< + | { cmd: Array; errorHandler: (error: Error) => void } + | undefined + > { + if (options.maintPushNotifications === "disabled") return; + + const movingEndpointType = await determineEndpoint(tls, host, options); + return { + cmd: [ + "CLIENT", + "MAINT_NOTIFICATIONS", + "ON", + "moving-endpoint-type", + movingEndpointType, + ], + errorHandler: (error: Error) => { + dbgMaintenance("handshake failed:", error); + if (options.maintPushNotifications === "enabled") { + throw error; + } + }, + }; + } + + constructor( + commandsQueue: RedisCommandsQueue, + client: Client, + options: RedisClientOptions, + ) { + this.#commandsQueue = commandsQueue; + this.#options = options; + this.#client = client; + + this.#commandsQueue.addPushHandler(this.#onPush); + } + + #onPush = (push: Array): boolean => { + dbgMaintenance("ONPUSH:", push.map(String)); + + if (!Array.isArray(push) || !["MOVING", "MIGRATING", "MIGRATED", "FAILING_OVER", "FAILED_OVER"].includes(String(push[0]))) { + return false; + } + + const type = String(push[0]); + + emitDiagnostics({ + type, + timestamp: Date.now(), + data: { + push: push.map(String), + }, + }); + switch (type) { + case PN.MOVING: { + // [ 'MOVING', '17', '15', '54.78.247.156:12075' ] + // ^seq ^after ^new ip + const afterSeconds = push[2]; + const url: string | null = push[3] ? String(push[3]) : null; + dbgMaintenance("Received MOVING:", afterSeconds, url); + this.#onMoving(afterSeconds, url); + return true; + } + case PN.MIGRATING: + case PN.FAILING_OVER: { + dbgMaintenance("Received MIGRATING|FAILING_OVER"); + this.#onMigrating(); + return true; + } + case PN.MIGRATED: + case PN.FAILED_OVER: { + dbgMaintenance("Received MIGRATED|FAILED_OVER"); + this.#onMigrated(); + return true; + } + } + return false; + }; + + // Queue: + // toWrite [ C D E ] + // waitingForReply [ A B ] - aka In-flight commands + // + // time: ---1-2---3-4-5-6--------------------------- + // + // 1. [EVENT] MOVING PN received + // 2. [ACTION] Pause writing ( we need to wait for new socket to connect and for all in-flight commands to complete ) + // 3. [EVENT] New socket connected + // 4. [EVENT] In-flight commands completed + // 5. [ACTION] Destroy old socket + // 6. [ACTION] Resume writing -> we are going to write to the new socket from now on + #onMoving = async ( + afterSeconds: number, + url: string | null, + ): Promise => { + // 1 [EVENT] MOVING PN received + this.#onMigrating(); + + let host: string; + let port: number; + + // The special value `none` indicates that the `MOVING` message doesn’t need + // to contain an endpoint. Instead it contains the value `null` then. In + // such a corner case, the client is expected to schedule a graceful + // reconnect to its currently configured endpoint after half of the grace + // period that was communicated by the server is over. 
+ if (url === null) { + assert(this.#options.maintMovingEndpointType === "none"); + assert(this.#options.socket !== undefined); + assert("host" in this.#options.socket); + assert(typeof this.#options.socket.host === "string"); + host = this.#options.socket.host; + assert(typeof this.#options.socket.port === "number"); + port = this.#options.socket.port; + const waitTime = (afterSeconds * 1000) / 2; + dbgMaintenance(`Wait for ${waitTime}ms`); + await setTimeout(waitTime); + } else { + const split = url.split(":"); + host = split[0]; + port = Number(split[1]); + } + + // 2 [ACTION] Pause writing + dbgMaintenance("Pausing writing of new commands to old socket"); + this.#client._pause(); + + dbgMaintenance("Creating new tmp client"); + let start = performance.now(); + + const tmpOptions = this.#options; + // If the URL is provided, it takes precedense + if(tmpOptions.url) { + const u = new URL(tmpOptions.url); + u.hostname = host; + u.port = String(port); + tmpOptions.url = u.toString(); + } else { + tmpOptions.socket = { + ...tmpOptions.socket, + host, + port + } + } + const tmpClient = this.#client.duplicate(tmpOptions); + dbgMaintenance(`Tmp client created in ${( performance.now() - start ).toFixed(2)}ms`); + dbgMaintenance( + `Set timeout for tmp client to ${this.#options.maintRelaxedSocketTimeout}`, + ); + tmpClient._maintenanceUpdate({ + relaxedCommandTimeout: this.#options.maintRelaxedCommandTimeout, + relaxedSocketTimeout: this.#options.maintRelaxedSocketTimeout, + }); + dbgMaintenance(`Connecting tmp client: ${host}:${port}`); + start = performance.now(); + await tmpClient.connect(); + dbgMaintenance(`Connected to tmp client in ${(performance.now() - start).toFixed(2)}ms`); + // 3 [EVENT] New socket connected + + dbgMaintenance(`Wait for all in-flight commands to complete`); + await this.#commandsQueue.waitForInflightCommandsToComplete(); + dbgMaintenance(`In-flight commands completed`); + // 4 [EVENT] In-flight commands completed + + dbgMaintenance("Swap client sockets..."); + const oldSocket = this.#client._ejectSocket(); + const newSocket = tmpClient._ejectSocket(); + this.#client._insertSocket(newSocket); + tmpClient._insertSocket(oldSocket); + tmpClient.destroy(); + dbgMaintenance("Swap client sockets done."); + // 5 + 6 + dbgMaintenance("Resume writing"); + this.#client._unpause(); + this.#onMigrated(); + }; + + #onMigrating = () => { + this.#isMaintenance++; + if (this.#isMaintenance > 1) { + dbgMaintenance(`Timeout relaxation already done`); + return; + } + + const update: MaintenanceUpdate = { + relaxedCommandTimeout: this.#options.maintRelaxedCommandTimeout, + relaxedSocketTimeout: this.#options.maintRelaxedSocketTimeout, + }; + + this.#client._maintenanceUpdate(update); + }; + + #onMigrated = () => { + //ensure that #isMaintenance doesnt go under 0 + this.#isMaintenance = Math.max(this.#isMaintenance - 1, 0); + if (this.#isMaintenance > 0) { + dbgMaintenance(`Not ready to unrelax timeouts yet`); + return; + } + + const update: MaintenanceUpdate = { + relaxedCommandTimeout: undefined, + relaxedSocketTimeout: undefined + }; + + this.#client._maintenanceUpdate(update); + }; +} + +export type MovingEndpointType = + | "auto" + | "internal-ip" + | "internal-fqdn" + | "external-ip" + | "external-fqdn" + | "none"; + +function isPrivateIP(ip: string): boolean { + const version = isIP(ip); + if (version === 4) { + const octets = ip.split(".").map(Number); + return ( + octets[0] === 10 || + (octets[0] === 172 && octets[1] >= 16 && octets[1] <= 31) || + (octets[0] === 192 && octets[1] 
=== 168) + ); + } + if (version === 6) { + return ( + ip.startsWith("fc") || // Unique local + ip.startsWith("fd") || // Unique local + ip === "::1" || // Loopback + ip.startsWith("fe80") // Link-local unicast + ); + } + return false; +} + +async function determineEndpoint( + tlsEnabled: boolean, + host: string, + options: RedisClientOptions, +): Promise { + assert(options.maintMovingEndpointType !== undefined); + if (options.maintMovingEndpointType !== "auto") { + dbgMaintenance( + `Determine endpoint type: ${options.maintMovingEndpointType}`, + ); + return options.maintMovingEndpointType; + } + + const ip = isIP(host) ? host : (await lookup(host, { family: 0 })).address; + + const isPrivate = isPrivateIP(ip); + + let result: MovingEndpointType; + if (tlsEnabled) { + result = isPrivate ? "internal-fqdn" : "external-fqdn"; + } else { + result = isPrivate ? "internal-ip" : "external-ip"; + } + + dbgMaintenance(`Determine endpoint type: ${result}`); + return result; +} diff --git a/packages/client/lib/client/index.ts b/packages/client/lib/client/index.ts index 57b12316708..cf5763357a6 100644 --- a/packages/client/lib/client/index.ts +++ b/packages/client/lib/client/index.ts @@ -1,5 +1,5 @@ import COMMANDS from '../commands'; -import RedisSocket, { RedisSocketOptions } from './socket'; +import RedisSocket, { RedisSocketOptions, RedisTcpSocketOptions } from './socket'; import { BasicAuth, CredentialsError, CredentialsProvider, StreamingCredentialsProvider, UnableToObtainNewCredentialsError, Disposable } from '../authx'; import RedisCommandsQueue, { CommandOptions } from './commands-queue'; import { EventEmitter } from 'node:events'; @@ -20,6 +20,7 @@ import { BasicClientSideCache, ClientSideCacheConfig, ClientSideCacheProvider } import { BasicCommandParser, CommandParser } from './parser'; import SingleEntryCache from '../single-entry-cache'; import { version } from '../../package.json' +import EnterpriseMaintenanceManager, { MaintenanceUpdate, MovingEndpointType } from './enterprise-maintenance-manager'; export interface RedisClientOptions< M extends RedisModules = RedisModules, @@ -144,7 +145,46 @@ export interface RedisClientOptions< * Tag to append to library name that is sent to the Redis server */ clientInfoTag?: string; -} + /** + * Controls how the client handles Redis Enterprise maintenance push notifications. + * + * - `disabled`: The feature is not used by the client. + * - `enabled`: The client attempts to enable the feature on the server. If the server responds with an error, the connection is interrupted. + * - `auto`: The client attempts to enable the feature on the server. If the server returns an error, the client disables the feature and continues. + * + * The default is `auto`. + */ + maintPushNotifications?: 'disabled' | 'enabled' | 'auto'; + /** + * Controls how the client requests the endpoint to reconnect to during a MOVING notification in Redis Enterprise maintenance. + * + * - `auto`: If the connection is opened to a name or IP address that is from/resolves to a reserved private IP range, request an internal endpoint (e.g., internal-ip), otherwise an external one. If TLS is enabled, then request a FQDN. + * - `internal-ip`: Enforce requesting the internal IP. + * - `internal-fqdn`: Enforce requesting the internal FQDN. + * - `external-ip`: Enforce requesting the external IP address. + * - `external-fqdn`: Enforce requesting the external FQDN. 
+ * - `none`: Used to request a null endpoint, which tells the client to reconnect based on its current config + + * The default is `auto`. + */ + maintMovingEndpointType?: MovingEndpointType; + /** + * Specifies a more relaxed timeout (in milliseconds) for commands during a maintenance window. + * This helps minimize command timeouts during maintenance. If not provided, the `commandOptions.timeout` + * will be used instead. Timeouts during maintenance period result in a `CommandTimeoutDuringMaintenance` error. + * + * The default is 10000 + */ + maintRelaxedCommandTimeout?: number; + /** + * Specifies a more relaxed timeout (in milliseconds) for the socket during a maintenance window. + * This helps minimize socket timeouts during maintenance. If not provided, the `socket.timeout` + * will be used instead. Timeouts during maintenance period result in a `SocketTimeoutDuringMaintenance` error. + * + * The default is 10000 + */ + maintRelaxedSocketTimeout?: number; +}; export type WithCommands< RESP extends RespVersions, @@ -390,7 +430,7 @@ export default class RedisClient< } readonly #options?: RedisClientOptions; - readonly #socket: RedisSocket; + #socket: RedisSocket; readonly #queue: RedisCommandsQueue; #selectedDB = 0; #monitorCallback?: MonitorCallback; @@ -403,11 +443,16 @@ export default class RedisClient< #watchEpoch?: number; #clientSideCache?: ClientSideCacheProvider; #credentialsSubscription: Disposable | null = null; + // Flag used to pause writing to the socket during maintenance windows. + // When true, prevents new commands from being written while waiting for: + // 1. New socket to be ready after maintenance redirect + // 2. In-flight commands on the old socket to complete + #paused = false; + get clientSideCache() { return this._self.#clientSideCache; } - get options(): RedisClientOptions | undefined { return this._self.#options; } @@ -457,6 +502,11 @@ export default class RedisClient< this.#queue = this.#initiateQueue(); this.#socket = this.#initiateSocket(); + + if(options?.maintPushNotifications !== 'disabled') { + new EnterpriseMaintenanceManager(this.#queue, this, this.#options!); + }; + if (options?.clientSideCache) { if (options.clientSideCache instanceof ClientSideCacheProvider) { this.#clientSideCache = options.clientSideCache; @@ -464,7 +514,19 @@ export default class RedisClient< const cscConfig = options.clientSideCache; this.#clientSideCache = new BasicClientSideCache(cscConfig); } - this.#queue.setInvalidateCallback(this.#clientSideCache.invalidate.bind(this.#clientSideCache)); + this.#queue.addPushHandler((push: Array): boolean => { + if (push[0].toString() !== 'invalidate') return false; + + if (push[1] !== null) { + for (const key of push[1]) { + this.#clientSideCache?.invalidate(key) + } + } else { + this.#clientSideCache?.invalidate(null) + } + + return true + }); } } @@ -473,7 +535,12 @@ export default class RedisClient< throw new Error('Client Side Caching is only supported with RESP3'); } + if (options?.maintPushNotifications && options?.maintPushNotifications !== 'disabled' && options?.RESP !== 3) { + throw new Error('Graceful Maintenance is only supported with RESP3'); + } + } + #initiateOptions(options?: RedisClientOptions): RedisClientOptions | undefined { // Convert username/password to credentialsProvider if no credentialsProvider is already in place @@ -496,13 +563,15 @@ export default class RedisClient< this._commandOptions = options.commandOptions; } + if(options?.maintPushNotifications !== 'disabled') { + 
EnterpriseMaintenanceManager.setupDefaultMaintOptions(options!); + } + if (options?.url) { const parsedOptions = RedisClient.parseOptions(options); - if (parsedOptions?.database) { this._self.#selectedDB = parsedOptions.database; } - return parsedOptions; } @@ -679,9 +748,44 @@ export default class RedisClient< commands.push({cmd: this.#clientSideCache.trackingOn()}); } + const { tls, host } = this.#options!.socket as RedisTcpSocketOptions; + const maintenanceHandshakeCmd = await EnterpriseMaintenanceManager.getHandshakeCommand(!!tls, host!, this.#options!); + if(maintenanceHandshakeCmd) { + commands.push(maintenanceHandshakeCmd); + }; + return commands; } + #attachListeners(socket: RedisSocket) { + socket.on('data', chunk => { + try { + this.#queue.decoder.write(chunk); + } catch (err) { + this.#queue.resetDecoder(); + this.emit('error', err); + } + }) + .on('error', err => { + this.emit('error', err); + this.#clientSideCache?.onError(); + if (this.#socket.isOpen && !this.#options?.disableOfflineQueue) { + this.#queue.flushWaitingForReply(err); + } else { + this.#queue.flushAll(err); + } + }) + .on('connect', () => this.emit('connect')) + .on('ready', () => { + this.emit('ready'); + this.#setPingTimer(); + this.#maybeScheduleWrite(); + }) + .on('reconnecting', () => this.emit('reconnecting')) + .on('drain', () => this.#maybeScheduleWrite()) + .on('end', () => this.emit('end')); + } + #initiateSocket(): RedisSocket { const socketInitiator = async () => { const promises = [], @@ -713,33 +817,9 @@ export default class RedisClient< } }; - return new RedisSocket(socketInitiator, this.#options?.socket) - .on('data', chunk => { - try { - this.#queue.decoder.write(chunk); - } catch (err) { - this.#queue.resetDecoder(); - this.emit('error', err); - } - }) - .on('error', err => { - this.emit('error', err); - this.#clientSideCache?.onError(); - if (this.#socket.isOpen && !this.#options?.disableOfflineQueue) { - this.#queue.flushWaitingForReply(err); - } else { - this.#queue.flushAll(err); - } - }) - .on('connect', () => this.emit('connect')) - .on('ready', () => { - this.emit('ready'); - this.#setPingTimer(); - this.#maybeScheduleWrite(); - }) - .on('reconnecting', () => this.emit('reconnecting')) - .on('drain', () => this.#maybeScheduleWrite()) - .on('end', () => this.emit('end')); + const socket = new RedisSocket(socketInitiator, this.#options?.socket); + this.#attachListeners(socket); + return socket; } #pingTimer?: NodeJS.Timeout; @@ -851,6 +931,51 @@ export default class RedisClient< return this as unknown as RedisClientType; } + /** + * @internal + */ + _ejectSocket(): RedisSocket { + const socket = this._self.#socket; + // @ts-ignore + this._self.#socket = null; + socket.removeAllListeners(); + return socket; + } + + /** + * @internal + */ + _insertSocket(socket: RedisSocket) { + if(this._self.#socket) { + this._self._ejectSocket().destroy(); + } + this._self.#socket = socket; + this._self.#attachListeners(this._self.#socket); + } + + /** + * @internal + */ + _maintenanceUpdate(update: MaintenanceUpdate) { + this._self.#socket.setMaintenanceTimeout(update.relaxedSocketTimeout); + this._self.#queue.setMaintenanceCommandTimeout(update.relaxedCommandTimeout); + } + + /** + * @internal + */ + _pause() { + this._self.#paused = true; + } + + /** + * @internal + */ + _unpause() { + this._self.#paused = false; + this._self.#maybeScheduleWrite(); + } + /** * @internal */ @@ -1080,6 +1205,9 @@ export default class RedisClient< } #write() { + if(this.#paused) { + return + } 
this.#socket.write(this.#queue.commandsToWrite()); } diff --git a/packages/client/lib/client/linked-list.spec.ts b/packages/client/lib/client/linked-list.spec.ts index 9547fb81c7c..c791d21900d 100644 --- a/packages/client/lib/client/linked-list.spec.ts +++ b/packages/client/lib/client/linked-list.spec.ts @@ -1,138 +1,197 @@ -import { SinglyLinkedList, DoublyLinkedList } from './linked-list'; -import { equal, deepEqual } from 'assert/strict'; - -describe('DoublyLinkedList', () => { +import { + SinglyLinkedList, + DoublyLinkedList, + EmptyAwareSinglyLinkedList, +} from "./linked-list"; +import { equal, deepEqual } from "assert/strict"; + +describe("DoublyLinkedList", () => { const list = new DoublyLinkedList(); - it('should start empty', () => { + it("should start empty", () => { equal(list.length, 0); equal(list.head, undefined); equal(list.tail, undefined); deepEqual(Array.from(list), []); }); - it('shift empty', () => { + it("shift empty", () => { equal(list.shift(), undefined); equal(list.length, 0); deepEqual(Array.from(list), []); }); - it('push 1', () => { + it("push 1", () => { list.push(1); equal(list.length, 1); deepEqual(Array.from(list), [1]); }); - it('push 2', () => { + it("push 2", () => { list.push(2); equal(list.length, 2); deepEqual(Array.from(list), [1, 2]); }); - it('unshift 0', () => { + it("unshift 0", () => { list.unshift(0); equal(list.length, 3); deepEqual(Array.from(list), [0, 1, 2]); }); - it('remove middle node', () => { + it("remove middle node", () => { list.remove(list.head!.next!); equal(list.length, 2); deepEqual(Array.from(list), [0, 2]); }); - it('remove head', () => { + it("remove head", () => { list.remove(list.head!); equal(list.length, 1); deepEqual(Array.from(list), [2]); }); - it('remove tail', () => { + it("remove tail", () => { list.remove(list.tail!); equal(list.length, 0); deepEqual(Array.from(list), []); }); - it('unshift empty queue', () => { + it("unshift empty queue", () => { list.unshift(0); equal(list.length, 1); deepEqual(Array.from(list), [0]); }); - it('push 1', () => { + it("push 1", () => { list.push(1); equal(list.length, 2); deepEqual(Array.from(list), [0, 1]); }); - it('shift', () => { + it("shift", () => { equal(list.shift(), 0); equal(list.length, 1); deepEqual(Array.from(list), [1]); }); - it('shift last element', () => { + it("shift last element", () => { equal(list.shift(), 1); equal(list.length, 0); deepEqual(Array.from(list), []); }); + + it("provide forEach for nodes", () => { + list.reset(); + list.push(1); + list.push(2); + list.push(3); + let count = 0; + for(const _ of list.nodes()) { + count++; + } + equal(count, 3); + for(const _ of list.nodes()) { + count++; + } + equal(count, 6); + }); }); -describe('SinglyLinkedList', () => { +describe("SinglyLinkedList", () => { const list = new SinglyLinkedList(); - it('should start empty', () => { + it("should start empty", () => { equal(list.length, 0); equal(list.head, undefined); equal(list.tail, undefined); deepEqual(Array.from(list), []); }); - it('shift empty', () => { + it("shift empty", () => { equal(list.shift(), undefined); equal(list.length, 0); deepEqual(Array.from(list), []); }); - it('push 1', () => { + it("push 1", () => { list.push(1); equal(list.length, 1); deepEqual(Array.from(list), [1]); }); - it('push 2', () => { + it("push 2", () => { list.push(2); equal(list.length, 2); deepEqual(Array.from(list), [1, 2]); }); - it('push 3', () => { + it("push 3", () => { list.push(3); equal(list.length, 3); deepEqual(Array.from(list), [1, 2, 3]); }); - it('shift 1', () => { 
+ it("shift 1", () => { equal(list.shift(), 1); equal(list.length, 2); deepEqual(Array.from(list), [2, 3]); }); - it('shift 2', () => { + it("shift 2", () => { equal(list.shift(), 2); equal(list.length, 1); deepEqual(Array.from(list), [3]); }); - it('shift 3', () => { + it("shift 3", () => { equal(list.shift(), 3); equal(list.length, 0); deepEqual(Array.from(list), []); }); - it('should be empty', () => { + it("should be empty", () => { equal(list.length, 0); equal(list.head, undefined); equal(list.tail, undefined); }); }); + +describe("EmptyAwareSinglyLinkedList", () => { + it("should emit 'empty' event when reset", () => { + const list = new EmptyAwareSinglyLinkedList(); + let count = 0; + list.events.on("empty", () => count++); + list.push(1); + list.reset(); + equal(count, 1); + list.reset(); + equal(count, 1); + }); + + it("should emit 'empty' event when shift makes the list empty", () => { + const list = new EmptyAwareSinglyLinkedList(); + let count = 0; + list.events.on("empty", () => count++); + list.push(1); + list.push(2); + list.shift(); + equal(count, 0); + list.shift(); + equal(count, 1); + list.shift(); + equal(count, 1); + }); + + it("should emit 'empty' event when remove makes the list empty", () => { + const list = new EmptyAwareSinglyLinkedList(); + let count = 0; + list.events.on("empty", () => count++); + const node1 = list.push(1); + const node2 = list.push(2); + list.remove(node1, undefined); + equal(count, 0); + list.remove(node2, undefined); + equal(count, 1); + }); +}); diff --git a/packages/client/lib/client/linked-list.ts b/packages/client/lib/client/linked-list.ts index 29678f027b5..461f1d40827 100644 --- a/packages/client/lib/client/linked-list.ts +++ b/packages/client/lib/client/linked-list.ts @@ -1,3 +1,5 @@ +import EventEmitter from "events"; + export interface DoublyLinkedNode { value: T; previous: DoublyLinkedNode | undefined; @@ -32,7 +34,7 @@ export class DoublyLinkedList { next: undefined, value }; - } + } return this.#tail = this.#tail.next = { previous: this.#tail, @@ -93,7 +95,7 @@ export class DoublyLinkedList { node.previous!.next = node.next; node.previous = undefined; } - + node.next = undefined; } @@ -109,6 +111,14 @@ export class DoublyLinkedList { node = node.next; } } + + *nodes() { + let node = this.#head; + while(node) { + yield node; + node = node.next; + } + } } export interface SinglyLinkedNode { @@ -201,3 +211,30 @@ export class SinglyLinkedList { } } } + +export class EmptyAwareSinglyLinkedList extends SinglyLinkedList { + readonly events = new EventEmitter(); + reset() { + const old = this.length; + super.reset(); + if(old !== this.length && this.length === 0) { + this.events.emit('empty'); + } + } + shift(): T | undefined { + const old = this.length; + const ret = super.shift(); + if(old !== this.length && this.length === 0) { + this.events.emit('empty'); + } + return ret; + } + remove(node: SinglyLinkedNode, parent: SinglyLinkedNode | undefined) { + const old = this.length; + super.remove(node, parent); + if(old !== this.length && this.length === 0) { + this.events.emit('empty'); + } + } + +} diff --git a/packages/client/lib/client/socket.ts b/packages/client/lib/client/socket.ts index 5f0bcc44929..c5569e86547 100644 --- a/packages/client/lib/client/socket.ts +++ b/packages/client/lib/client/socket.ts @@ -1,9 +1,10 @@ import { EventEmitter, once } from 'node:events'; import net from 'node:net'; import tls from 'node:tls'; -import { ConnectionTimeoutError, ClientClosedError, SocketClosedUnexpectedlyError, ReconnectStrategyError, 
SocketTimeoutError } from '../errors'; +import { ConnectionTimeoutError, ClientClosedError, SocketClosedUnexpectedlyError, ReconnectStrategyError, SocketTimeoutError, SocketTimeoutDuringMaintenanceError } from '../errors'; import { setTimeout } from 'node:timers/promises'; import { RedisArgument } from '../RESP/types'; +import { dbgMaintenance } from './enterprise-maintenance-manager'; type NetOptions = { tls?: false; @@ -60,6 +61,8 @@ export default class RedisSocket extends EventEmitter { readonly #socketFactory; readonly #socketTimeout; + #maintenanceTimeout: number | undefined; + #socket?: net.Socket | tls.TLSSocket; #isOpen = false; @@ -238,6 +241,22 @@ export default class RedisSocket extends EventEmitter { } while (this.#isOpen && !this.#isReady); } + setMaintenanceTimeout(ms?: number) { + dbgMaintenance(`Set socket timeout to ${ms}`); + if (this.#maintenanceTimeout === ms) { + dbgMaintenance(`Socket already set maintenanceCommandTimeout to ${ms}, skipping`); + return; + }; + + this.#maintenanceTimeout = ms; + + if(ms !== undefined) { + this.#socket?.setTimeout(ms); + } else { + this.#socket?.setTimeout(this.#socketTimeout ?? 0); + } + } + async #createSocket(): Promise { const socket = this.#socketFactory.create(); @@ -260,7 +279,10 @@ export default class RedisSocket extends EventEmitter { if (this.#socketTimeout) { socket.once('timeout', () => { - socket.destroy(new SocketTimeoutError(this.#socketTimeout!)); + const error = this.#maintenanceTimeout + ? new SocketTimeoutDuringMaintenanceError(this.#maintenanceTimeout) + : new SocketTimeoutError(this.#socketTimeout!) + socket.destroy(error); }); socket.setTimeout(this.#socketTimeout); } diff --git a/packages/client/lib/errors.ts b/packages/client/lib/errors.ts index 5cb9166df02..4d9ddf7f2b1 100644 --- a/packages/client/lib/errors.ts +++ b/packages/client/lib/errors.ts @@ -71,6 +71,18 @@ export class BlobError extends ErrorReply {} export class TimeoutError extends Error {} +export class SocketTimeoutDuringMaintenanceError extends TimeoutError { + constructor(timeout: number) { + super(`Socket timeout during maintenance. Expecting data, but didn't receive any in ${timeout}ms.`); + } +} + +export class CommandTimeoutDuringMaintenanceError extends TimeoutError { + constructor(timeout: number) { + super(`Command timeout during maintenance. 
Waited to write command for more than ${timeout}ms.`); + } +} + export class MultiErrorReply extends ErrorReply { replies: Array; errorIndexes: Array; diff --git a/packages/client/lib/tests/test-scenario/connection-handoff.e2e.ts b/packages/client/lib/tests/test-scenario/connection-handoff.e2e.ts new file mode 100644 index 00000000000..c9207d1d5eb --- /dev/null +++ b/packages/client/lib/tests/test-scenario/connection-handoff.e2e.ts @@ -0,0 +1,126 @@ +import diagnostics_channel from "node:diagnostics_channel"; +import { FaultInjectorClient } from "./fault-injector-client"; +import { + getDatabaseConfig, + getDatabaseConfigFromEnv, + getEnvConfig, + RedisConnectionConfig, +} from "./test-scenario.util"; +import { createClient } from "../../.."; +import { DiagnosticsEvent } from "../../client/enterprise-maintenance-manager"; +import { before } from "mocha"; +import { spy } from "sinon"; +import assert from "node:assert"; +import { TestCommandRunner } from "./test-command-runner"; +import net from "node:net"; + +describe("Connection Handoff", () => { + const diagnosticsLog: DiagnosticsEvent[] = []; + + const onMessageHandler = (message: unknown) => { + diagnosticsLog.push(message as DiagnosticsEvent); + }; + + let clientConfig: RedisConnectionConfig; + let client: ReturnType>; + let faultInjectorClient: FaultInjectorClient; + let connectSpy = spy(net, "createConnection"); + + before(() => { + const envConfig = getEnvConfig(); + const redisConfig = getDatabaseConfigFromEnv( + envConfig.redisEndpointsConfigPath, + ); + + faultInjectorClient = new FaultInjectorClient(envConfig.faultInjectorUrl); + clientConfig = getDatabaseConfig(redisConfig); + }); + + beforeEach(async () => { + diagnosticsLog.length = 0; + diagnostics_channel.subscribe("redis.maintenance", onMessageHandler); + + connectSpy.resetHistory(); + + client = createClient({ + socket: { + host: clientConfig.host, + port: clientConfig.port, + ...(clientConfig.tls === true ? 
{ tls: true } : {}), + }, + password: clientConfig.password, + username: clientConfig.username, + RESP: 3, + maintPushNotifications: "auto", + maintMovingEndpointType: "external-ip", + maintRelaxedCommandTimeout: 10000, + maintRelaxedSocketTimeout: 10000, + }); + + client.on("error", (err: Error) => { + throw new Error(`Client error: ${err.message}`); + }); + + await client.connect(); + await client.flushAll(); + }); + + afterEach(() => { + diagnostics_channel.unsubscribe("redis.maintenance", onMessageHandler); + client.destroy(); + }); + + describe("New Connection Establishment", () => { + it("should establish new connection", async () => { + assert.equal(connectSpy.callCount, 1); + + const { action_id: lowTimeoutBindAndMigrateActionId } = + await faultInjectorClient.migrateAndBindAction({ + bdbId: clientConfig.bdbId, + clusterIndex: 0, + }); + + const lowTimeoutWaitPromise = faultInjectorClient.waitForAction( + lowTimeoutBindAndMigrateActionId, + ); + + await lowTimeoutWaitPromise; + assert.equal(connectSpy.callCount, 2); + }); + }); + + describe("TLS Connection Handoff", () => { + it("TODO receiveMessagesWithTLSEnabledTest", async () => { + // + }); + it("TODO connectionHandoffWithStaticInternalNameTest", async () => { + // + }); + it("TODO connectionHandoffWithStaticExternalNameTest", async () => { + // + }); + }); + + describe("Traffic Resumption", () => { + it("Traffic resumed after handoff", async () => { + const { action_id } = await faultInjectorClient.migrateAndBindAction({ + bdbId: clientConfig.bdbId, + clusterIndex: 0, + }); + + const workloadPromise = faultInjectorClient.waitForAction(action_id); + + const commandPromises = + await TestCommandRunner.fireCommandsUntilStopSignal( + client, + workloadPromise, + ); + + const rejected = ( + await Promise.all(commandPromises.commandPromises) + ).filter((result) => result.status === "rejected"); + + assert.ok(rejected.length === 0); + }); + }); +}); diff --git a/packages/client/lib/tests/test-scenario/fault-injector-client.ts b/packages/client/lib/tests/test-scenario/fault-injector-client.ts new file mode 100644 index 00000000000..d6635ac42e6 --- /dev/null +++ b/packages/client/lib/tests/test-scenario/fault-injector-client.ts @@ -0,0 +1,187 @@ +import { setTimeout } from "node:timers/promises"; + +export type ActionType = + | "dmc_restart" + | "failover" + | "reshard" + | "sequence_of_actions" + | "network_failure" + | "execute_rlutil_command" + | "execute_rladmin_command" + | "migrate" + | "bind"; + +export interface ActionRequest { + type: ActionType; + parameters?: { + bdb_id?: string; + [key: string]: unknown; + }; +} + +export interface ActionStatus { + status: string; + error: unknown; + output: string; +} + +export class FaultInjectorClient { + private baseUrl: string; + #fetch: typeof fetch; + + constructor(baseUrl: string, fetchImpl: typeof fetch = fetch) { + this.baseUrl = baseUrl.replace(/\/+$/, ""); // trim trailing slash + this.#fetch = fetchImpl; + } + + /** + * Lists all available actions. + * @throws {Error} When the HTTP request fails or response cannot be parsed as JSON + */ + public listActions(): Promise { + return this.#request("GET", "/action"); + } + + /** + * Triggers a specific action. + * @param action The action request to trigger + * @throws {Error} When the HTTP request fails or response cannot be parsed as JSON + */ + public triggerAction(action: ActionRequest): Promise { + return this.#request("POST", "/action", action); + } + + /** + * Gets the status of a specific action. 
+ * @param actionId The ID of the action to check + * @throws {Error} When the HTTP request fails or response cannot be parsed as JSON + */ + public getActionStatus(actionId: string): Promise { + return this.#request("GET", `/action/${actionId}`); + } + + /** + * Executes an rladmin command. + * @param command The rladmin command to execute + * @param bdbId Optional database ID to target + * @throws {Error} When the HTTP request fails or response cannot be parsed as JSON + */ + public executeRladminCommand( + command: string, + bdbId?: string + ): Promise { + const cmd = bdbId ? `rladmin -b ${bdbId} ${command}` : `rladmin ${command}`; + return this.#request("POST", "/rladmin", cmd); + } + + /** + * Waits for an action to complete. + * @param actionId The ID of the action to wait for + * @param options Optional timeout and max wait time + * @throws {Error} When the action does not complete within the max wait time + */ + public async waitForAction( + actionId: string, + { + timeoutMs, + maxWaitTimeMs, + }: { + timeoutMs?: number; + maxWaitTimeMs?: number; + } = {} + ): Promise { + const timeout = timeoutMs || 1000; + const maxWaitTime = maxWaitTimeMs || 60000; + + const startTime = Date.now(); + + while (Date.now() - startTime < maxWaitTime) { + const action = await this.getActionStatus(actionId); + + if (["finished", "failed", "success"].includes(action.status)) { + return action; + } + + await setTimeout(timeout); + } + + throw new Error(`Timeout waiting for action ${actionId}`); + } + + async migrateAndBindAction({ + bdbId, + clusterIndex, + }: { + bdbId: string | number; + clusterIndex: string | number; + }) { + const bdbIdStr = bdbId.toString(); + const clusterIndexStr = clusterIndex.toString(); + + return this.triggerAction<{ + action_id: string; + }>({ + type: "sequence_of_actions", + parameters: { + bdbId: bdbIdStr, + actions: [ + { + type: "migrate", + params: { + cluster_index: clusterIndexStr, + }, + }, + { + type: "bind", + params: { + cluster_index: clusterIndexStr, + bdb_id: bdbIdStr, + }, + }, + ], + }, + }); + } + + async #request( + method: string, + path: string, + body?: Object | string + ): Promise { + const url = `${this.baseUrl}${path}`; + const headers: Record = { + "Content-Type": "application/json", + }; + + let payload: string | undefined; + + if (body) { + if (typeof body === "string") { + headers["Content-Type"] = "text/plain"; + payload = body; + } else { + headers["Content-Type"] = "application/json"; + payload = JSON.stringify(body); + } + } + + const response = await this.#fetch(url, { method, headers, body: payload }); + + if (!response.ok) { + try { + const text = await response.text(); + throw new Error(`HTTP ${response.status} - ${text}`); + } catch { + throw new Error(`HTTP ${response.status}`); + } + } + + try { + return (await response.json()) as T; + } catch { + throw new Error( + `HTTP ${response.status} - Unable to parse response as JSON` + ); + } + } +} diff --git a/packages/client/lib/tests/test-scenario/push-notification.e2e.ts b/packages/client/lib/tests/test-scenario/push-notification.e2e.ts new file mode 100644 index 00000000000..3408931728e --- /dev/null +++ b/packages/client/lib/tests/test-scenario/push-notification.e2e.ts @@ -0,0 +1,94 @@ +import assert from "node:assert"; +import diagnostics_channel from "node:diagnostics_channel"; +import { FaultInjectorClient } from "./fault-injector-client"; +import { + getDatabaseConfig, + getDatabaseConfigFromEnv, + getEnvConfig, + RedisConnectionConfig, +} from "./test-scenario.util"; +import { 
createClient } from "../../.."; +import { DiagnosticsEvent } from "../../client/enterprise-maintenance-manager"; +import { before } from "mocha"; + +describe("Push Notifications", () => { + const diagnosticsLog: DiagnosticsEvent[] = []; + + const onMessageHandler = (message: unknown) => { + diagnosticsLog.push(message as DiagnosticsEvent); + }; + + let clientConfig: RedisConnectionConfig; + let client: ReturnType>; + let faultInjectorClient: FaultInjectorClient; + + before(() => { + const envConfig = getEnvConfig(); + const redisConfig = getDatabaseConfigFromEnv( + envConfig.redisEndpointsConfigPath + ); + + faultInjectorClient = new FaultInjectorClient(envConfig.faultInjectorUrl); + clientConfig = getDatabaseConfig(redisConfig); + }); + + beforeEach(async () => { + diagnosticsLog.length = 0; + diagnostics_channel.subscribe("redis.maintenance", onMessageHandler); + + client = createClient({ + socket: { + host: clientConfig.host, + port: clientConfig.port, + ...(clientConfig.tls === true ? { tls: true } : {}), + }, + password: clientConfig.password, + username: clientConfig.username, + RESP: 3, + maintPushNotifications: "auto", + maintMovingEndpointType: "external-ip", + maintRelaxedCommandTimeout: 10000, + maintRelaxedSocketTimeout: 10000, + }); + + client.on("error", (err: Error) => { + throw new Error(`Client error: ${err.message}`); + }); + + await client.connect(); + }); + + afterEach(() => { + diagnostics_channel.unsubscribe("redis.maintenance", onMessageHandler); + client.destroy(); + }); + + it("should receive MOVING, MIGRATING, and MIGRATED push notifications", async () => { + const { action_id: migrateActionId } = + await faultInjectorClient.triggerAction<{ action_id: string }>({ + type: "migrate", + parameters: { + cluster_index: "0", + }, + }); + + await faultInjectorClient.waitForAction(migrateActionId); + + const { action_id: bindActionId } = + await faultInjectorClient.triggerAction<{ action_id: string }>({ + type: "bind", + parameters: { + cluster_index: "0", + bdb_id: `${clientConfig.bdbId}`, + }, + }); + + await faultInjectorClient.waitForAction(bindActionId); + + const pushNotificationLogs = diagnosticsLog.filter((log) => { + return ["MOVING", "MIGRATING", "MIGRATED"].includes(log?.type); + }); + + assert.strictEqual(pushNotificationLogs.length, 3); + }); +}); diff --git a/packages/client/lib/tests/test-scenario/test-command-runner.ts b/packages/client/lib/tests/test-scenario/test-command-runner.ts new file mode 100644 index 00000000000..9e1acc3a8ab --- /dev/null +++ b/packages/client/lib/tests/test-scenario/test-command-runner.ts @@ -0,0 +1,108 @@ +import { randomUUID } from "node:crypto"; +import { setTimeout } from "node:timers/promises"; +import { createClient } from "../../.."; + +/** + * Options for the `fireCommandsUntilStopSignal` method + */ +type FireCommandsUntilStopSignalOptions = { + /** + * Number of commands to fire in each batch + */ + batchSize: number; + /** + * Timeout between batches in milliseconds + */ + timeoutMs: number; + /** + * Function that creates the commands to be executed + */ + createCommands: ( + client: ReturnType> + ) => Array<() => Promise>; +}; + +/** + * Utility class for running test commands until a stop signal is received + */ +export class TestCommandRunner { + private static readonly defaultOptions: FireCommandsUntilStopSignalOptions = { + batchSize: 60, + timeoutMs: 10, + createCommands: ( + client: ReturnType> + ) => [ + () => client.set(randomUUID(), Date.now()), + () => client.get(randomUUID()), + ], + }; + + static 
#toSettled(p: Promise) { + return p + .then((value) => ({ status: "fulfilled" as const, value, error: null })) + .catch((reason) => ({ + status: "rejected" as const, + value: null, + error: reason, + })); + } + + static async #racePromises({ + timeout, + stopper, + }: { + timeout: Promise; + stopper: Promise; + }) { + return Promise.race([ + TestCommandRunner.#toSettled(timeout).then((result) => ({ + ...result, + stop: false, + })), + TestCommandRunner.#toSettled(stopper).then((result) => ({ + ...result, + stop: true, + })), + ]); + } + + /** + * Fires a batch of test commands until a stop signal is received + * @param client - The Redis client to use + * @param stopSignalPromise - Promise that resolves when the execution should stop + * @param options - Options for the command execution + * @returns An object containing the promises of all executed commands and the result of the stop signal + */ + static async fireCommandsUntilStopSignal( + client: ReturnType>, + stopSignalPromise: Promise, + options?: Partial + ) { + const executeOptions = { + ...TestCommandRunner.defaultOptions, + ...options, + }; + + const commandPromises = []; + + while (true) { + for (let i = 0; i < executeOptions.batchSize; i++) { + for (const command of executeOptions.createCommands(client)) { + commandPromises.push(TestCommandRunner.#toSettled(command())); + } + } + + const result = await TestCommandRunner.#racePromises({ + timeout: setTimeout(executeOptions.timeoutMs), + stopper: stopSignalPromise, + }); + + if (result.stop) { + return { + commandPromises, + stopResult: result, + }; + } + } + } +} diff --git a/packages/client/lib/tests/test-scenario/test-scenario.util.ts b/packages/client/lib/tests/test-scenario/test-scenario.util.ts new file mode 100644 index 00000000000..b130cdc5386 --- /dev/null +++ b/packages/client/lib/tests/test-scenario/test-scenario.util.ts @@ -0,0 +1,197 @@ +import { readFileSync } from "fs"; +import { createClient, RedisClientOptions } from "../../.."; +import { stub } from "sinon"; + +type DatabaseEndpoint = { + addr: string[]; + addr_type: string; + dns_name: string; + oss_cluster_api_preferred_endpoint_type: string; + oss_cluster_api_preferred_ip_type: string; + port: number; + proxy_policy: string; + uid: string; +}; + +type DatabaseConfig = { + bdb_id: number; + username: string; + password: string; + tls: boolean; + raw_endpoints: DatabaseEndpoint[]; + endpoints: string[]; +}; + +type DatabasesConfig = { + [databaseName: string]: DatabaseConfig; +}; + +type EnvConfig = { + redisEndpointsConfigPath: string; + faultInjectorUrl: string; +}; + +/** + * Reads environment variables required for the test scenario + * @returns Environment configuration object + * @throws Error if required environment variables are not set + */ +export function getEnvConfig(): EnvConfig { + if (!process.env.REDIS_ENDPOINTS_CONFIG_PATH) { + throw new Error( + "REDIS_ENDPOINTS_CONFIG_PATH environment variable must be set" + ); + } + + if (!process.env.FAULT_INJECTION_API_URL) { + throw new Error("FAULT_INJECTION_API_URL environment variable must be set"); + } + + return { + redisEndpointsConfigPath: process.env.REDIS_ENDPOINTS_CONFIG_PATH, + faultInjectorUrl: process.env.FAULT_INJECTION_API_URL, + }; +} + +/** + * Reads database configuration from a file + * @param filePath - The path to the database configuration file + * @returns Parsed database configuration object + * @throws Error if file doesn't exist or JSON is invalid + */ +export function getDatabaseConfigFromEnv(filePath: string): DatabasesConfig { 
+ try { + const fileContent = readFileSync(filePath, "utf8"); + return JSON.parse(fileContent) as DatabasesConfig; + } catch (error) { + throw new Error(`Failed to read or parse database config from ${filePath}`); + } +} + +export interface RedisConnectionConfig { + host: string; + port: number; + username: string; + password: string; + tls: boolean; + bdbId: number; +} + +/** + * Gets Redis connection parameters for a specific database + * @param databasesConfig - The parsed database configuration object + * @param databaseName - Optional name of the database to retrieve (defaults to the first one) + * @returns Redis connection configuration with host, port, username, password, and tls + * @throws Error if the specified database is not found in the configuration + */ +export function getDatabaseConfig( + databasesConfig: DatabasesConfig, + databaseName?: string +): RedisConnectionConfig { + const dbConfig = databaseName + ? databasesConfig[databaseName] + : Object.values(databasesConfig)[0]; + + if (!dbConfig) { + throw new Error( + `Database ${databaseName ? databaseName : ""} not found in configuration` + ); + } + + const endpoint = dbConfig.raw_endpoints[0]; // Use the first endpoint + + return { + host: endpoint.dns_name, + port: endpoint.port, + username: dbConfig.username, + password: dbConfig.password, + tls: dbConfig.tls, + bdbId: dbConfig.bdb_id, + }; +} + +// TODO this should be moved in the tests utils package +export async function blockSetImmediate(fn: () => Promise) { + let setImmediateStub: any; + + try { + setImmediateStub = stub(global, "setImmediate"); + setImmediateStub.callsFake(() => { + //Dont call the callback, effectively blocking execution + }); + await fn(); + } finally { + if (setImmediateStub) { + setImmediateStub.restore(); + } + } +} + +/** + * Factory class for creating and managing Redis clients + */ +export class ClientFactory { + private readonly clients = new Map< + string, + ReturnType> + >(); + + constructor(private readonly config: RedisConnectionConfig) {} + + /** + * Creates a new client with the specified options and connects it to the database + * @param key - The key to store the client under + * @param options - Optional client options + * @returns The created and connected client + */ + async create(key: string, options: Partial = {}) { + const client = createClient({ + socket: { + host: this.config.host, + port: this.config.port, + ...(this.config.tls === true ? 
{ tls: true } : {}), + }, + password: this.config.password, + username: this.config.username, + RESP: 3, + maintPushNotifications: "auto", + maintMovingEndpointType: "auto", + ...options, + }); + + client.on("error", (err: Error) => { + throw new Error(`Client error: ${err.message}`); + }); + + await client.connect(); + + this.clients.set(key, client); + + return client; + } + + /** + * Gets an existing client by key or the first one if no key is provided + * @param key - The key of the client to retrieve + * @returns The client if found, undefined otherwise + */ + get(key?: string) { + if (key) { + return this.clients.get(key); + } + + // Get the first one if no key is provided + return this.clients.values().next().value; + } + + /** + * Destroys all created clients + */ + destroyAll() { + this.clients.forEach((client) => { + if (client && client.isOpen) { + client.destroy(); + } + }); + } +} diff --git a/packages/client/lib/tests/test-scenario/timeout-during-notifications.e2e.ts b/packages/client/lib/tests/test-scenario/timeout-during-notifications.e2e.ts new file mode 100644 index 00000000000..7bdf23fcb15 --- /dev/null +++ b/packages/client/lib/tests/test-scenario/timeout-during-notifications.e2e.ts @@ -0,0 +1,159 @@ +import assert from "node:assert"; + +import { FaultInjectorClient } from "./fault-injector-client"; +import { + ClientFactory, + getDatabaseConfig, + getDatabaseConfigFromEnv, + getEnvConfig, + RedisConnectionConfig, + blockSetImmediate +} from "./test-scenario.util"; +import { createClient } from "../../.."; +import { before } from "mocha"; +import { TestCommandRunner } from "./test-command-runner"; + +describe("Timeout Handling During Notifications", () => { + let clientConfig: RedisConnectionConfig; + let clientFactory: ClientFactory; + let faultInjectorClient: FaultInjectorClient; + let defaultClient: ReturnType>; + + before(() => { + const envConfig = getEnvConfig(); + const redisConfig = getDatabaseConfigFromEnv( + envConfig.redisEndpointsConfigPath + ); + + clientConfig = getDatabaseConfig(redisConfig); + faultInjectorClient = new FaultInjectorClient(envConfig.faultInjectorUrl); + clientFactory = new ClientFactory(clientConfig); + }); + + beforeEach(async () => { + defaultClient = await clientFactory.create("default"); + + await defaultClient.flushAll(); + }); + + afterEach(async () => { + clientFactory.destroyAll(); + }); + + it("should relax command timeout on MOVING, MIGRATING, and MIGRATED", async () => { + // PART 1 + // Set very low timeout to trigger errors + const lowTimeoutClient = await clientFactory.create("lowTimeout", { + maintRelaxedCommandTimeout: 50, + }); + + const { action_id: lowTimeoutBindAndMigrateActionId } = + await faultInjectorClient.migrateAndBindAction({ + bdbId: clientConfig.bdbId, + clusterIndex: 0, + }); + + const lowTimeoutWaitPromise = faultInjectorClient.waitForAction( + lowTimeoutBindAndMigrateActionId + ); + + const lowTimeoutCommandPromises = + await TestCommandRunner.fireCommandsUntilStopSignal( + lowTimeoutClient, + lowTimeoutWaitPromise + ); + + const lowTimeoutRejectedCommands = ( + await Promise.all(lowTimeoutCommandPromises.commandPromises) + ).filter((result) => result.status === "rejected"); + + assert.ok(lowTimeoutRejectedCommands.length > 0); + assert.strictEqual( + lowTimeoutRejectedCommands.filter((rejected) => { + return ( + // TODO instanceof doesn't work for some reason + rejected.error.constructor.name === + "CommandTimeoutDuringMaintananceError" + ); + }).length, + lowTimeoutRejectedCommands.length + ); + + // 
PART 2 + // Set high timeout to avoid errors + const highTimeoutClient = await clientFactory.create("highTimeout", { + maintRelaxedCommandTimeout: 10000, + }); + + const { action_id: highTimeoutBindAndMigrateActionId } = + await faultInjectorClient.migrateAndBindAction({ + bdbId: clientConfig.bdbId, + clusterIndex: 0, + }); + + const highTimeoutWaitPromise = faultInjectorClient.waitForAction( + highTimeoutBindAndMigrateActionId + ); + + const highTimeoutCommandPromises = + await TestCommandRunner.fireCommandsUntilStopSignal( + highTimeoutClient, + highTimeoutWaitPromise + ); + + const highTimeoutRejectedCommands = ( + await Promise.all(highTimeoutCommandPromises.commandPromises) + ).filter((result) => result.status === "rejected"); + + assert.strictEqual(highTimeoutRejectedCommands.length, 0); + }); + + it("should unrelax command timeout after MAINTENANCE", async () => { + const clientWithCommandTimeout = await clientFactory.create( + "clientWithCommandTimeout", + { + commandOptions: { + timeout: 100, + }, + } + ); + + const { action_id: bindAndMigrateActionId } = + await faultInjectorClient.migrateAndBindAction({ + bdbId: clientConfig.bdbId, + clusterIndex: 0, + }); + + const lowTimeoutWaitPromise = faultInjectorClient.waitForAction( + bindAndMigrateActionId + ); + + const relaxedTimeoutCommandPromises = + await TestCommandRunner.fireCommandsUntilStopSignal( + clientWithCommandTimeout, + lowTimeoutWaitPromise + ); + + const relaxedTimeoutRejectedCommands = ( + await Promise.all(relaxedTimeoutCommandPromises.commandPromises) + ).filter((result) => result.status === "rejected"); + + assert.ok(relaxedTimeoutRejectedCommands.length === 0); + + const start = performance.now(); + + let error: any; + await blockSetImmediate(async () => { + try { + await clientWithCommandTimeout.set("key", "value"); + } catch (err: any) { + error = err; + } + }); + + // Make sure it took less than 1sec to fail + assert.ok(performance.now() - start < 1000); + assert.ok(error instanceof Error); + assert.ok(error.constructor.name === "TimeoutError"); + }); +}); diff --git a/packages/entraid/package.json b/packages/entraid/package.json index 9991fa3fb89..272747d1a89 100644 --- a/packages/entraid/package.json +++ b/packages/entraid/package.json @@ -22,7 +22,7 @@ "@azure/msal-node": "^2.16.1" }, "peerDependencies": { - "@redis/client": "^5.8.2" + "@redis/client": "^5.8.2 || ^5.9.0-0" }, "devDependencies": { "@types/express": "^4.17.21", diff --git a/packages/json/package.json b/packages/json/package.json index ff689dd17ee..a1db4e44b8d 100644 --- a/packages/json/package.json +++ b/packages/json/package.json @@ -13,7 +13,7 @@ "release": "release-it" }, "peerDependencies": { - "@redis/client": "^5.8.2" + "@redis/client": "^5.8.2 || ^5.9.0-0" }, "devDependencies": { "@redis/test-utils": "*" diff --git a/packages/redis/package.json b/packages/redis/package.json index 583a6606817..ed2715d06f7 100644 --- a/packages/redis/package.json +++ b/packages/redis/package.json @@ -13,11 +13,11 @@ "release": "release-it" }, "dependencies": { - "@redis/bloom": "5.8.2", - "@redis/client": "5.8.2", - "@redis/json": "5.8.2", - "@redis/search": "5.8.2", - "@redis/time-series": "5.8.2" + "@redis/bloom": "^5.8.2 || ^5.9.0-0", + "@redis/client": "^5.8.2 || ^5.9.0-0", + "@redis/json": "^5.8.2 || ^5.9.0-0", + "@redis/search": "^5.8.2 || ^5.9.0-0", + "@redis/time-series": "^5.8.2 || ^5.9.0-0" }, "engines": { "node": ">= 18" diff --git a/packages/search/package.json b/packages/search/package.json index 40238080e8b..20fe27aad60 100644 --- 
a/packages/search/package.json +++ b/packages/search/package.json @@ -14,7 +14,7 @@ "release": "release-it" }, "peerDependencies": { - "@redis/client": "^5.8.2" + "@redis/client": "^5.8.2 || ^5.9.0-0" }, "devDependencies": { "@redis/test-utils": "*" diff --git a/packages/time-series/package.json b/packages/time-series/package.json index 46ea5b16fef..0e13ac1a11c 100644 --- a/packages/time-series/package.json +++ b/packages/time-series/package.json @@ -13,7 +13,7 @@ "release": "release-it" }, "peerDependencies": { - "@redis/client": "^5.8.2" + "@redis/client": "^5.8.2 || ^5.9.0-0" }, "devDependencies": { "@redis/test-utils": "*" From 550767e3aec49ad83653bde531d5e97c470a6ce6 Mon Sep 17 00:00:00 2001 From: GitHub Action Date: Wed, 10 Sep 2025 09:04:19 +0000 Subject: [PATCH 02/14] Release client@5.9.0-beta.0 --- package-lock.json | 22 +++++++++++----------- packages/client/package.json | 2 +- 2 files changed, 12 insertions(+), 12 deletions(-) diff --git a/package-lock.json b/package-lock.json index 288e109c979..77f386ddf23 100644 --- a/package-lock.json +++ b/package-lock.json @@ -7330,12 +7330,12 @@ "node": ">= 18" }, "peerDependencies": { - "@redis/client": "^5.8.2" + "@redis/client": "^5.8.2 || ^5.9.0-0" } }, "packages/client": { "name": "@redis/client", - "version": "5.8.2", + "version": "5.9.0-beta.0", "license": "MIT", "dependencies": { "cluster-key-slot": "1.1.2" @@ -7370,7 +7370,7 @@ "node": ">= 18" }, "peerDependencies": { - "@redis/client": "^5.8.2" + "@redis/client": "^5.8.2 || ^5.9.0-0" } }, "packages/entraid/node_modules/@types/node": { @@ -7416,18 +7416,18 @@ "node": ">= 18" }, "peerDependencies": { - "@redis/client": "^5.8.2" + "@redis/client": "^5.8.2 || ^5.9.0-0" } }, "packages/redis": { "version": "5.8.2", "license": "MIT", "dependencies": { - "@redis/bloom": "5.8.2", - "@redis/client": "5.8.2", - "@redis/json": "5.8.2", - "@redis/search": "5.8.2", - "@redis/time-series": "5.8.2" + "@redis/bloom": "^5.8.2 || ^5.9.0-0", + "@redis/client": "^5.8.2 || ^5.9.0-0", + "@redis/json": "^5.8.2 || ^5.9.0-0", + "@redis/search": "^5.8.2 || ^5.9.0-0", + "@redis/time-series": "^5.8.2 || ^5.9.0-0" }, "engines": { "node": ">= 18" @@ -7444,7 +7444,7 @@ "node": ">= 18" }, "peerDependencies": { - "@redis/client": "^5.8.2" + "@redis/client": "^5.8.2 || ^5.9.0-0" } }, "packages/test-utils": { @@ -7522,7 +7522,7 @@ "node": ">= 18" }, "peerDependencies": { - "@redis/client": "^5.8.2" + "@redis/client": "^5.8.2 || ^5.9.0-0" } } } diff --git a/packages/client/package.json b/packages/client/package.json index 1332083bf18..2b8ad41610a 100644 --- a/packages/client/package.json +++ b/packages/client/package.json @@ -1,6 +1,6 @@ { "name": "@redis/client", - "version": "5.8.2", + "version": "5.9.0-beta.0", "license": "MIT", "main": "./dist/index.js", "types": "./dist/index.d.ts", From ceeca035d017cd379763a6f54cb9465d4aa1e24d Mon Sep 17 00:00:00 2001 From: GitHub Action Date: Wed, 10 Sep 2025 09:04:24 +0000 Subject: [PATCH 03/14] Release bloom@5.9.0-beta.0 --- package-lock.json | 4 ++-- packages/bloom/package.json | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/package-lock.json b/package-lock.json index 77f386ddf23..dc0ed7fa66d 100644 --- a/package-lock.json +++ b/package-lock.json @@ -7321,7 +7321,7 @@ }, "packages/bloom": { "name": "@redis/bloom", - "version": "5.8.2", + "version": "5.9.0-beta.0", "license": "MIT", "devDependencies": { "@redis/test-utils": "*" @@ -7330,7 +7330,7 @@ "node": ">= 18" }, "peerDependencies": { - "@redis/client": "^5.8.2 || ^5.9.0-0" + "@redis/client": 
"^5.9.0-beta.0" } }, "packages/client": { diff --git a/packages/bloom/package.json b/packages/bloom/package.json index 4f46bce4ad7..6947934fc4b 100644 --- a/packages/bloom/package.json +++ b/packages/bloom/package.json @@ -1,6 +1,6 @@ { "name": "@redis/bloom", - "version": "5.8.2", + "version": "5.9.0-beta.0", "license": "MIT", "main": "./dist/lib/index.js", "types": "./dist/lib/index.d.ts", @@ -13,7 +13,7 @@ "release": "release-it" }, "peerDependencies": { - "@redis/client": "^5.8.2 || ^5.9.0-0" + "@redis/client": "^5.9.0-beta.0" }, "devDependencies": { "@redis/test-utils": "*" From 24ec2606b054d627325efc030ccf78ea00eb1ec8 Mon Sep 17 00:00:00 2001 From: GitHub Action Date: Wed, 10 Sep 2025 09:04:30 +0000 Subject: [PATCH 04/14] Release json@5.9.0-beta.0 --- package-lock.json | 4 ++-- packages/json/package.json | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/package-lock.json b/package-lock.json index dc0ed7fa66d..13e15fe14bd 100644 --- a/package-lock.json +++ b/package-lock.json @@ -7407,7 +7407,7 @@ }, "packages/json": { "name": "@redis/json", - "version": "5.8.2", + "version": "5.9.0-beta.0", "license": "MIT", "devDependencies": { "@redis/test-utils": "*" @@ -7416,7 +7416,7 @@ "node": ">= 18" }, "peerDependencies": { - "@redis/client": "^5.8.2 || ^5.9.0-0" + "@redis/client": "^5.9.0-beta.0" } }, "packages/redis": { diff --git a/packages/json/package.json b/packages/json/package.json index a1db4e44b8d..552d67ac39b 100644 --- a/packages/json/package.json +++ b/packages/json/package.json @@ -1,6 +1,6 @@ { "name": "@redis/json", - "version": "5.8.2", + "version": "5.9.0-beta.0", "license": "MIT", "main": "./dist/lib/index.js", "types": "./dist/lib/index.d.ts", @@ -13,7 +13,7 @@ "release": "release-it" }, "peerDependencies": { - "@redis/client": "^5.8.2 || ^5.9.0-0" + "@redis/client": "^5.9.0-beta.0" }, "devDependencies": { "@redis/test-utils": "*" From 35cf4844621ef4e88f38f44e03096a4a759fca47 Mon Sep 17 00:00:00 2001 From: GitHub Action Date: Wed, 10 Sep 2025 09:04:35 +0000 Subject: [PATCH 05/14] Release search@5.9.0-beta.0 --- package-lock.json | 4 ++-- packages/search/package.json | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/package-lock.json b/package-lock.json index 13e15fe14bd..0f0721fc098 100644 --- a/package-lock.json +++ b/package-lock.json @@ -7435,7 +7435,7 @@ }, "packages/search": { "name": "@redis/search", - "version": "5.8.2", + "version": "5.9.0-beta.0", "license": "MIT", "devDependencies": { "@redis/test-utils": "*" @@ -7444,7 +7444,7 @@ "node": ">= 18" }, "peerDependencies": { - "@redis/client": "^5.8.2 || ^5.9.0-0" + "@redis/client": "^5.9.0-beta.0" } }, "packages/test-utils": { diff --git a/packages/search/package.json b/packages/search/package.json index 20fe27aad60..0ca44b22737 100644 --- a/packages/search/package.json +++ b/packages/search/package.json @@ -1,6 +1,6 @@ { "name": "@redis/search", - "version": "5.8.2", + "version": "5.9.0-beta.0", "license": "MIT", "main": "./dist/lib/index.js", "types": "./dist/lib/index.d.ts", @@ -14,7 +14,7 @@ "release": "release-it" }, "peerDependencies": { - "@redis/client": "^5.8.2 || ^5.9.0-0" + "@redis/client": "^5.9.0-beta.0" }, "devDependencies": { "@redis/test-utils": "*" From aee6f058535e4eafddd04194901817ffb4afe1a8 Mon Sep 17 00:00:00 2001 From: GitHub Action Date: Wed, 10 Sep 2025 09:04:41 +0000 Subject: [PATCH 06/14] Release time-series@5.9.0-beta.0 --- package-lock.json | 4 ++-- packages/time-series/package.json | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff 
--git a/package-lock.json b/package-lock.json index 0f0721fc098..605458b1c1e 100644 --- a/package-lock.json +++ b/package-lock.json @@ -7513,7 +7513,7 @@ }, "packages/time-series": { "name": "@redis/time-series", - "version": "5.8.2", + "version": "5.9.0-beta.0", "license": "MIT", "devDependencies": { "@redis/test-utils": "*" @@ -7522,7 +7522,7 @@ "node": ">= 18" }, "peerDependencies": { - "@redis/client": "^5.8.2 || ^5.9.0-0" + "@redis/client": "^5.9.0-beta.0" } } } diff --git a/packages/time-series/package.json b/packages/time-series/package.json index 0e13ac1a11c..81e170b467a 100644 --- a/packages/time-series/package.json +++ b/packages/time-series/package.json @@ -1,6 +1,6 @@ { "name": "@redis/time-series", - "version": "5.8.2", + "version": "5.9.0-beta.0", "license": "MIT", "main": "./dist/lib/index.js", "types": "./dist/lib/index.d.ts", @@ -13,7 +13,7 @@ "release": "release-it" }, "peerDependencies": { - "@redis/client": "^5.8.2 || ^5.9.0-0" + "@redis/client": "^5.9.0-beta.0" }, "devDependencies": { "@redis/test-utils": "*" From 1df34a4b115e354ea38b9ba696822b1196f2bc40 Mon Sep 17 00:00:00 2001 From: GitHub Action Date: Wed, 10 Sep 2025 09:04:46 +0000 Subject: [PATCH 07/14] Release entraid@5.9.0-beta.0 --- package-lock.json | 4 ++-- packages/entraid/package.json | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/package-lock.json b/package-lock.json index 605458b1c1e..662a6e2cbcd 100644 --- a/package-lock.json +++ b/package-lock.json @@ -7351,7 +7351,7 @@ }, "packages/entraid": { "name": "@redis/entraid", - "version": "5.8.2", + "version": "5.9.0-beta.0", "license": "MIT", "dependencies": { "@azure/identity": "^4.7.0", @@ -7370,7 +7370,7 @@ "node": ">= 18" }, "peerDependencies": { - "@redis/client": "^5.8.2 || ^5.9.0-0" + "@redis/client": "^5.9.0-beta.0" } }, "packages/entraid/node_modules/@types/node": { diff --git a/packages/entraid/package.json b/packages/entraid/package.json index 272747d1a89..b02fdc7ea5e 100644 --- a/packages/entraid/package.json +++ b/packages/entraid/package.json @@ -1,6 +1,6 @@ { "name": "@redis/entraid", - "version": "5.8.2", + "version": "5.9.0-beta.0", "license": "MIT", "main": "./dist/index.js", "types": "./dist/index.d.ts", @@ -22,7 +22,7 @@ "@azure/msal-node": "^2.16.1" }, "peerDependencies": { - "@redis/client": "^5.8.2 || ^5.9.0-0" + "@redis/client": "^5.9.0-beta.0" }, "devDependencies": { "@types/express": "^4.17.21", From e2702b63f2f7317f1745216ceebe3d1686c17dd0 Mon Sep 17 00:00:00 2001 From: GitHub Action Date: Wed, 10 Sep 2025 09:04:52 +0000 Subject: [PATCH 08/14] Release redis@5.9.0-beta.0 --- package-lock.json | 12 ++++++------ packages/redis/package.json | 12 ++++++------ 2 files changed, 12 insertions(+), 12 deletions(-) diff --git a/package-lock.json b/package-lock.json index 662a6e2cbcd..664dbb68ceb 100644 --- a/package-lock.json +++ b/package-lock.json @@ -7420,14 +7420,14 @@ } }, "packages/redis": { - "version": "5.8.2", + "version": "5.9.0-beta.0", "license": "MIT", "dependencies": { - "@redis/bloom": "^5.8.2 || ^5.9.0-0", - "@redis/client": "^5.8.2 || ^5.9.0-0", - "@redis/json": "^5.8.2 || ^5.9.0-0", - "@redis/search": "^5.8.2 || ^5.9.0-0", - "@redis/time-series": "^5.8.2 || ^5.9.0-0" + "@redis/bloom": "5.9.0-beta.0", + "@redis/client": "5.9.0-beta.0", + "@redis/json": "5.9.0-beta.0", + "@redis/search": "5.9.0-beta.0", + "@redis/time-series": "5.9.0-beta.0" }, "engines": { "node": ">= 18" diff --git a/packages/redis/package.json b/packages/redis/package.json index ed2715d06f7..06b76cc5cbd 100644 --- 
a/packages/redis/package.json +++ b/packages/redis/package.json @@ -1,7 +1,7 @@ { "name": "redis", "description": "A modern, high performance Redis client", - "version": "5.8.2", + "version": "5.9.0-beta.0", "license": "MIT", "main": "./dist/index.js", "types": "./dist/index.d.ts", @@ -13,11 +13,11 @@ "release": "release-it" }, "dependencies": { - "@redis/bloom": "^5.8.2 || ^5.9.0-0", - "@redis/client": "^5.8.2 || ^5.9.0-0", - "@redis/json": "^5.8.2 || ^5.9.0-0", - "@redis/search": "^5.8.2 || ^5.9.0-0", - "@redis/time-series": "^5.8.2 || ^5.9.0-0" + "@redis/bloom": "5.9.0-beta.0", + "@redis/client": "5.9.0-beta.0", + "@redis/json": "5.9.0-beta.0", + "@redis/search": "5.9.0-beta.0", + "@redis/time-series": "5.9.0-beta.0" }, "engines": { "node": ">= 18" From 98da3e5307d80e9d8c126632fa4dde5e43e66250 Mon Sep 17 00:00:00 2001 From: Pavel Pashov Date: Wed, 10 Sep 2025 17:40:51 +0300 Subject: [PATCH 09/14] refactor(test): improve test scenario reliability and maintainability --- .../test-scenario/connection-handoff.e2e.ts | 63 +--- .../test-scenario/fault-injector-client.ts | 16 +- .../test-scenario/push-notification.e2e.ts | 137 +++++--- .../test-scenario/test-command-runner.ts | 108 ------ .../tests/test-scenario/test-scenario.util.ts | 117 +++---- .../timeout-during-notifications.e2e.ts | 313 +++++++++++++----- packages/client/tsconfig.json | 3 +- 7 files changed, 377 insertions(+), 380 deletions(-) delete mode 100644 packages/client/lib/tests/test-scenario/test-command-runner.ts diff --git a/packages/client/lib/tests/test-scenario/connection-handoff.e2e.ts b/packages/client/lib/tests/test-scenario/connection-handoff.e2e.ts index c9207d1d5eb..27e79756911 100644 --- a/packages/client/lib/tests/test-scenario/connection-handoff.e2e.ts +++ b/packages/client/lib/tests/test-scenario/connection-handoff.e2e.ts @@ -1,35 +1,27 @@ -import diagnostics_channel from "node:diagnostics_channel"; import { FaultInjectorClient } from "./fault-injector-client"; import { + createTestClient, getDatabaseConfig, getDatabaseConfigFromEnv, getEnvConfig, RedisConnectionConfig, } from "./test-scenario.util"; import { createClient } from "../../.."; -import { DiagnosticsEvent } from "../../client/enterprise-maintenance-manager"; import { before } from "mocha"; import { spy } from "sinon"; import assert from "node:assert"; -import { TestCommandRunner } from "./test-command-runner"; import net from "node:net"; describe("Connection Handoff", () => { - const diagnosticsLog: DiagnosticsEvent[] = []; - - const onMessageHandler = (message: unknown) => { - diagnosticsLog.push(message as DiagnosticsEvent); - }; - let clientConfig: RedisConnectionConfig; - let client: ReturnType>; + let client: ReturnType>; let faultInjectorClient: FaultInjectorClient; let connectSpy = spy(net, "createConnection"); before(() => { const envConfig = getEnvConfig(); const redisConfig = getDatabaseConfigFromEnv( - envConfig.redisEndpointsConfigPath, + envConfig.redisEndpointsConfigPath ); faultInjectorClient = new FaultInjectorClient(envConfig.faultInjectorUrl); @@ -37,37 +29,17 @@ describe("Connection Handoff", () => { }); beforeEach(async () => { - diagnosticsLog.length = 0; - diagnostics_channel.subscribe("redis.maintenance", onMessageHandler); - connectSpy.resetHistory(); - client = createClient({ - socket: { - host: clientConfig.host, - port: clientConfig.port, - ...(clientConfig.tls === true ? 
{ tls: true } : {}), - }, - password: clientConfig.password, - username: clientConfig.username, - RESP: 3, - maintPushNotifications: "auto", - maintMovingEndpointType: "external-ip", - maintRelaxedCommandTimeout: 10000, - maintRelaxedSocketTimeout: 10000, - }); + client = await createTestClient(clientConfig); - client.on("error", (err: Error) => { - throw new Error(`Client error: ${err.message}`); - }); - - await client.connect(); await client.flushAll(); }); afterEach(() => { - diagnostics_channel.unsubscribe("redis.maintenance", onMessageHandler); - client.destroy(); + if (client && client.isOpen) { + client.destroy(); + } }); describe("New Connection Establishment", () => { @@ -80,11 +52,8 @@ describe("Connection Handoff", () => { clusterIndex: 0, }); - const lowTimeoutWaitPromise = faultInjectorClient.waitForAction( - lowTimeoutBindAndMigrateActionId, - ); + await faultInjectorClient.waitForAction(lowTimeoutBindAndMigrateActionId); - await lowTimeoutWaitPromise; assert.equal(connectSpy.callCount, 2); }); }); @@ -108,19 +77,13 @@ describe("Connection Handoff", () => { clusterIndex: 0, }); - const workloadPromise = faultInjectorClient.waitForAction(action_id); - - const commandPromises = - await TestCommandRunner.fireCommandsUntilStopSignal( - client, - workloadPromise, - ); + await faultInjectorClient.waitForAction(action_id); - const rejected = ( - await Promise.all(commandPromises.commandPromises) - ).filter((result) => result.status === "rejected"); + const currentTime = Date.now().toString(); + await client.set("key", currentTime); + const result = await client.get("key"); - assert.ok(rejected.length === 0); + assert.strictEqual(result, currentTime); }); }); }); diff --git a/packages/client/lib/tests/test-scenario/fault-injector-client.ts b/packages/client/lib/tests/test-scenario/fault-injector-client.ts index d6635ac42e6..22f67211820 100644 --- a/packages/client/lib/tests/test-scenario/fault-injector-client.ts +++ b/packages/client/lib/tests/test-scenario/fault-injector-client.ts @@ -47,7 +47,7 @@ export class FaultInjectorClient { * @param action The action request to trigger * @throws {Error} When the HTTP request fails or response cannot be parsed as JSON */ - public triggerAction(action: ActionRequest): Promise { + public triggerAction(action: ActionRequest): Promise { return this.#request("POST", "/action", action); } @@ -60,20 +60,6 @@ export class FaultInjectorClient { return this.#request("GET", `/action/${actionId}`); } - /** - * Executes an rladmin command. - * @param command The rladmin command to execute - * @param bdbId Optional database ID to target - * @throws {Error} When the HTTP request fails or response cannot be parsed as JSON - */ - public executeRladminCommand( - command: string, - bdbId?: string - ): Promise { - const cmd = bdbId ? `rladmin -b ${bdbId} ${command}` : `rladmin ${command}`; - return this.#request("POST", "/rladmin", cmd); - } - /** * Waits for an action to complete. 
* @param actionId The ID of the action to wait for diff --git a/packages/client/lib/tests/test-scenario/push-notification.e2e.ts b/packages/client/lib/tests/test-scenario/push-notification.e2e.ts index 3408931728e..cfe714dbd22 100644 --- a/packages/client/lib/tests/test-scenario/push-notification.e2e.ts +++ b/packages/client/lib/tests/test-scenario/push-notification.e2e.ts @@ -2,6 +2,7 @@ import assert from "node:assert"; import diagnostics_channel from "node:diagnostics_channel"; import { FaultInjectorClient } from "./fault-injector-client"; import { + createTestClient, getDatabaseConfig, getDatabaseConfigFromEnv, getEnvConfig, @@ -12,14 +13,21 @@ import { DiagnosticsEvent } from "../../client/enterprise-maintenance-manager"; import { before } from "mocha"; describe("Push Notifications", () => { - const diagnosticsLog: DiagnosticsEvent[] = []; - - const onMessageHandler = (message: unknown) => { - diagnosticsLog.push(message as DiagnosticsEvent); + const createNotificationMessageHandler = ( + result: Record, + notifications: Array + ) => { + return (message: unknown) => { + if (notifications.includes((message as DiagnosticsEvent).type)) { + const event = message as DiagnosticsEvent; + result[event.type] = (result[event.type] ?? 0) + 1; + } + }; }; + let onMessageHandler: ReturnType; let clientConfig: RedisConnectionConfig; - let client: ReturnType>; + let client: ReturnType>; let faultInjectorClient: FaultInjectorClient; before(() => { @@ -33,62 +41,97 @@ describe("Push Notifications", () => { }); beforeEach(async () => { - diagnosticsLog.length = 0; - diagnostics_channel.subscribe("redis.maintenance", onMessageHandler); + client = await createTestClient(clientConfig); - client = createClient({ - socket: { - host: clientConfig.host, - port: clientConfig.port, - ...(clientConfig.tls === true ? { tls: true } : {}), - }, - password: clientConfig.password, - username: clientConfig.username, - RESP: 3, - maintPushNotifications: "auto", - maintMovingEndpointType: "external-ip", - maintRelaxedCommandTimeout: 10000, - maintRelaxedSocketTimeout: 10000, - }); - - client.on("error", (err: Error) => { - throw new Error(`Client error: ${err.message}`); - }); - - await client.connect(); + await client.flushAll(); }); afterEach(() => { - diagnostics_channel.unsubscribe("redis.maintenance", onMessageHandler); - client.destroy(); + if (onMessageHandler!) 
{ + diagnostics_channel.unsubscribe("redis.maintenance", onMessageHandler); + } + + if (client && client.isOpen) { + client.destroy(); + } }); it("should receive MOVING, MIGRATING, and MIGRATED push notifications", async () => { - const { action_id: migrateActionId } = - await faultInjectorClient.triggerAction<{ action_id: string }>({ - type: "migrate", - parameters: { - cluster_index: "0", - }, + const notifications: Array = [ + "MOVING", + "MIGRATING", + "MIGRATED", + ]; + + const diagnosticsMap: Record = {}; + + onMessageHandler = createNotificationMessageHandler( + diagnosticsMap, + notifications + ); + + diagnostics_channel.subscribe("redis.maintenance", onMessageHandler); + + const { action_id: bindAndMigrateActionId } = + await faultInjectorClient.migrateAndBindAction({ + bdbId: clientConfig.bdbId, + clusterIndex: 0, }); - await faultInjectorClient.waitForAction(migrateActionId); + await faultInjectorClient.waitForAction(bindAndMigrateActionId); - const { action_id: bindActionId } = - await faultInjectorClient.triggerAction<{ action_id: string }>({ - type: "bind", + assert.strictEqual( + diagnosticsMap.MOVING, + 1, + "Should have received exactly one MOVING notification" + ); + assert.strictEqual( + diagnosticsMap.MIGRATING, + 1, + "Should have received exactly one MIGRATING notification" + ); + assert.strictEqual( + diagnosticsMap.MIGRATED, + 1, + "Should have received exactly one MIGRATED notification" + ); + }); + + it("should receive FAILING_OVER and FAILED_OVER push notifications", async () => { + const notifications: Array = [ + "FAILING_OVER", + "FAILED_OVER", + ]; + + const diagnosticsMap: Record = {}; + + onMessageHandler = createNotificationMessageHandler( + diagnosticsMap, + notifications + ); + + diagnostics_channel.subscribe("redis.maintenance", onMessageHandler); + + const { action_id: failoverActionId } = + await faultInjectorClient.triggerAction({ + type: "failover", parameters: { - cluster_index: "0", - bdb_id: `${clientConfig.bdbId}`, + bdb_id: clientConfig.bdbId.toString(), + cluster_index: 0, }, }); - await faultInjectorClient.waitForAction(bindActionId); + await faultInjectorClient.waitForAction(failoverActionId); - const pushNotificationLogs = diagnosticsLog.filter((log) => { - return ["MOVING", "MIGRATING", "MIGRATED"].includes(log?.type); - }); - - assert.strictEqual(pushNotificationLogs.length, 3); + assert.strictEqual( + diagnosticsMap.FAILING_OVER, + 1, + "Should have received exactly one FAILING_OVER notification" + ); + assert.strictEqual( + diagnosticsMap.FAILED_OVER, + 1, + "Should have received exactly one FAILED_OVER notification" + ); }); }); diff --git a/packages/client/lib/tests/test-scenario/test-command-runner.ts b/packages/client/lib/tests/test-scenario/test-command-runner.ts deleted file mode 100644 index 9e1acc3a8ab..00000000000 --- a/packages/client/lib/tests/test-scenario/test-command-runner.ts +++ /dev/null @@ -1,108 +0,0 @@ -import { randomUUID } from "node:crypto"; -import { setTimeout } from "node:timers/promises"; -import { createClient } from "../../.."; - -/** - * Options for the `fireCommandsUntilStopSignal` method - */ -type FireCommandsUntilStopSignalOptions = { - /** - * Number of commands to fire in each batch - */ - batchSize: number; - /** - * Timeout between batches in milliseconds - */ - timeoutMs: number; - /** - * Function that creates the commands to be executed - */ - createCommands: ( - client: ReturnType> - ) => Array<() => Promise>; -}; - -/** - * Utility class for running test commands until a stop signal is received 
- */ -export class TestCommandRunner { - private static readonly defaultOptions: FireCommandsUntilStopSignalOptions = { - batchSize: 60, - timeoutMs: 10, - createCommands: ( - client: ReturnType> - ) => [ - () => client.set(randomUUID(), Date.now()), - () => client.get(randomUUID()), - ], - }; - - static #toSettled(p: Promise) { - return p - .then((value) => ({ status: "fulfilled" as const, value, error: null })) - .catch((reason) => ({ - status: "rejected" as const, - value: null, - error: reason, - })); - } - - static async #racePromises({ - timeout, - stopper, - }: { - timeout: Promise; - stopper: Promise; - }) { - return Promise.race([ - TestCommandRunner.#toSettled(timeout).then((result) => ({ - ...result, - stop: false, - })), - TestCommandRunner.#toSettled(stopper).then((result) => ({ - ...result, - stop: true, - })), - ]); - } - - /** - * Fires a batch of test commands until a stop signal is received - * @param client - The Redis client to use - * @param stopSignalPromise - Promise that resolves when the execution should stop - * @param options - Options for the command execution - * @returns An object containing the promises of all executed commands and the result of the stop signal - */ - static async fireCommandsUntilStopSignal( - client: ReturnType>, - stopSignalPromise: Promise, - options?: Partial - ) { - const executeOptions = { - ...TestCommandRunner.defaultOptions, - ...options, - }; - - const commandPromises = []; - - while (true) { - for (let i = 0; i < executeOptions.batchSize; i++) { - for (const command of executeOptions.createCommands(client)) { - commandPromises.push(TestCommandRunner.#toSettled(command())); - } - } - - const result = await TestCommandRunner.#racePromises({ - timeout: setTimeout(executeOptions.timeoutMs), - stopper: stopSignalPromise, - }); - - if (result.stop) { - return { - commandPromises, - stopResult: result, - }; - } - } - } -} diff --git a/packages/client/lib/tests/test-scenario/test-scenario.util.ts b/packages/client/lib/tests/test-scenario/test-scenario.util.ts index b130cdc5386..9a8ef7c6e47 100644 --- a/packages/client/lib/tests/test-scenario/test-scenario.util.ts +++ b/packages/client/lib/tests/test-scenario/test-scenario.util.ts @@ -110,8 +110,18 @@ export function getDatabaseConfig( }; } -// TODO this should be moved in the tests utils package -export async function blockSetImmediate(fn: () => Promise) { +/** + * Executes the provided function in a context where setImmediate is stubbed to not do anything. 
+ * This blocks setImmediate callbacks from executing + * + * @param command - The command to execute + * @returns The error and duration of the command execution + */ +export async function blockCommand(command: () => Promise) { + let error: any; + + const start = performance.now(); + let setImmediateStub: any; try { @@ -119,79 +129,50 @@ export async function blockSetImmediate(fn: () => Promise) { setImmediateStub.callsFake(() => { //Dont call the callback, effectively blocking execution }); - await fn(); + await command(); + } catch (err: any) { + error = err; } finally { if (setImmediateStub) { setImmediateStub.restore(); } } + + return { + error, + duration: performance.now() - start, + }; } /** - * Factory class for creating and managing Redis clients + * Creates a test client with the provided configuration, connects it and attaches an error handler listener + * @param clientConfig - The Redis connection configuration + * @param options - Optional client options + * @returns The created Redis client */ -export class ClientFactory { - private readonly clients = new Map< - string, - ReturnType> - >(); - - constructor(private readonly config: RedisConnectionConfig) {} - - /** - * Creates a new client with the specified options and connects it to the database - * @param key - The key to store the client under - * @param options - Optional client options - * @returns The created and connected client - */ - async create(key: string, options: Partial = {}) { - const client = createClient({ - socket: { - host: this.config.host, - port: this.config.port, - ...(this.config.tls === true ? { tls: true } : {}), - }, - password: this.config.password, - username: this.config.username, - RESP: 3, - maintPushNotifications: "auto", - maintMovingEndpointType: "auto", - ...options, - }); - - client.on("error", (err: Error) => { - throw new Error(`Client error: ${err.message}`); - }); - - await client.connect(); - - this.clients.set(key, client); - - return client; - } - - /** - * Gets an existing client by key or the first one if no key is provided - * @param key - The key of the client to retrieve - * @returns The client if found, undefined otherwise - */ - get(key?: string) { - if (key) { - return this.clients.get(key); - } - - // Get the first one if no key is provided - return this.clients.values().next().value; - } - - /** - * Destroys all created clients - */ - destroyAll() { - this.clients.forEach((client) => { - if (client && client.isOpen) { - client.destroy(); - } - }); - } +export async function createTestClient( + clientConfig: RedisConnectionConfig, + options: Partial = {} +) { + const client = createClient({ + socket: { + host: clientConfig.host, + port: clientConfig.port, + ...(clientConfig.tls === true ? 
{ tls: true } : {}), + }, + password: clientConfig.password, + username: clientConfig.username, + RESP: 3, + maintPushNotifications: "auto", + maintMovingEndpointType: "auto", + ...options, + }); + + client.on("error", (err: Error) => { + throw new Error(`Client error: ${err.message}`); + }); + + await client.connect(); + + return client; } diff --git a/packages/client/lib/tests/test-scenario/timeout-during-notifications.e2e.ts b/packages/client/lib/tests/test-scenario/timeout-during-notifications.e2e.ts index 7bdf23fcb15..a60aacb703c 100644 --- a/packages/client/lib/tests/test-scenario/timeout-during-notifications.e2e.ts +++ b/packages/client/lib/tests/test-scenario/timeout-during-notifications.e2e.ts @@ -2,22 +2,48 @@ import assert from "node:assert"; import { FaultInjectorClient } from "./fault-injector-client"; import { - ClientFactory, getDatabaseConfig, getDatabaseConfigFromEnv, getEnvConfig, RedisConnectionConfig, - blockSetImmediate + blockCommand, + createTestClient, } from "./test-scenario.util"; import { createClient } from "../../.."; import { before } from "mocha"; -import { TestCommandRunner } from "./test-command-runner"; +import diagnostics_channel from "node:diagnostics_channel"; +import { DiagnosticsEvent } from "../../client/enterprise-maintenance-manager"; describe("Timeout Handling During Notifications", () => { let clientConfig: RedisConnectionConfig; - let clientFactory: ClientFactory; let faultInjectorClient: FaultInjectorClient; - let defaultClient: ReturnType>; + let client: ReturnType>; + + const NORMAL_COMMAND_TIMEOUT = 50; + const RELAXED_COMMAND_TIMEOUT = 2000; + + /** + * Creates a handler for the `redis.maintenance` channel that will execute and block a command on the client + * when a notification is received and save the result in the `result` object. + * This is used to test that the command timeout is relaxed during notifications. 
+ */ + const createNotificationMessageHandler = ( + client: ReturnType>, + result: Record, + notifications: Array + ) => { + return (message: unknown) => { + if (notifications.includes((message as DiagnosticsEvent).type)) { + setImmediate(async () => { + result[(message as DiagnosticsEvent).type] = await blockCommand( + async () => { + await client.set("key", "value"); + } + ); + }); + } + }; + }; before(() => { const envConfig = getEnvConfig(); @@ -27,133 +53,238 @@ describe("Timeout Handling During Notifications", () => { clientConfig = getDatabaseConfig(redisConfig); faultInjectorClient = new FaultInjectorClient(envConfig.faultInjectorUrl); - clientFactory = new ClientFactory(clientConfig); }); beforeEach(async () => { - defaultClient = await clientFactory.create("default"); + client = await createTestClient(clientConfig, { + commandOptions: { timeout: NORMAL_COMMAND_TIMEOUT }, + maintRelaxedCommandTimeout: RELAXED_COMMAND_TIMEOUT, + }); - await defaultClient.flushAll(); + await client.flushAll(); }); - afterEach(async () => { - clientFactory.destroyAll(); + afterEach(() => { + if (client && client.isOpen) { + client.destroy(); + } }); - it("should relax command timeout on MOVING, MIGRATING, and MIGRATED", async () => { + it("should relax command timeout on MOVING, MIGRATING", async () => { // PART 1 - // Set very low timeout to trigger errors - const lowTimeoutClient = await clientFactory.create("lowTimeout", { - maintRelaxedCommandTimeout: 50, + // Normal command timeout + const { error, duration } = await blockCommand(async () => { + await client.set("key", "value"); }); - const { action_id: lowTimeoutBindAndMigrateActionId } = - await faultInjectorClient.migrateAndBindAction({ - bdbId: clientConfig.bdbId, - clusterIndex: 0, - }); - - const lowTimeoutWaitPromise = faultInjectorClient.waitForAction( - lowTimeoutBindAndMigrateActionId + assert.ok( + error instanceof Error, + "Command Timeout error should be instanceof Error" + ); + assert.ok( + duration > NORMAL_COMMAND_TIMEOUT && + duration < NORMAL_COMMAND_TIMEOUT * 1.1, + `Normal command should timeout within normal timeout ms` + ); + assert.strictEqual( + error?.constructor?.name, + "TimeoutError", + "Command Timeout error should be TimeoutError" ); - const lowTimeoutCommandPromises = - await TestCommandRunner.fireCommandsUntilStopSignal( - lowTimeoutClient, - lowTimeoutWaitPromise - ); + // PART 2 + // Command timeout during maintenance + const notifications: Array = [ + "MOVING", + "MIGRATING", + ]; - const lowTimeoutRejectedCommands = ( - await Promise.all(lowTimeoutCommandPromises.commandPromises) - ).filter((result) => result.status === "rejected"); + const result: Record< + DiagnosticsEvent["type"], + { error: any; duration: number } + > = {}; - assert.ok(lowTimeoutRejectedCommands.length > 0); - assert.strictEqual( - lowTimeoutRejectedCommands.filter((rejected) => { - return ( - // TODO instanceof doesn't work for some reason - rejected.error.constructor.name === - "CommandTimeoutDuringMaintananceError" - ); - }).length, - lowTimeoutRejectedCommands.length + const onMessageHandler = createNotificationMessageHandler( + client, + result, + notifications ); - // PART 2 - // Set high timeout to avoid errors - const highTimeoutClient = await clientFactory.create("highTimeout", { - maintRelaxedCommandTimeout: 10000, - }); + diagnostics_channel.subscribe("redis.maintenance", onMessageHandler); - const { action_id: highTimeoutBindAndMigrateActionId } = + const { action_id: bindAndMigrateActionId } = await 
faultInjectorClient.migrateAndBindAction({ bdbId: clientConfig.bdbId, clusterIndex: 0, }); - const highTimeoutWaitPromise = faultInjectorClient.waitForAction( - highTimeoutBindAndMigrateActionId - ); + await faultInjectorClient.waitForAction(bindAndMigrateActionId); + + diagnostics_channel.unsubscribe("redis.maintenance", onMessageHandler); - const highTimeoutCommandPromises = - await TestCommandRunner.fireCommandsUntilStopSignal( - highTimeoutClient, - highTimeoutWaitPromise + notifications.forEach((notification) => { + assert.ok( + result[notification]?.error instanceof Error, + `${notification} notification error should be instanceof Error` + ); + assert.ok( + result[notification]?.duration > RELAXED_COMMAND_TIMEOUT && + result[notification]?.duration < RELAXED_COMMAND_TIMEOUT * 1.1, + `${notification} notification should timeout within relaxed timeout` ); + assert.strictEqual( + result[notification]?.error?.constructor?.name, + "CommandTimeoutDuringMaintenanceError", + `${notification} notification error should be CommandTimeoutDuringMaintenanceError` + ); + }); + }); - const highTimeoutRejectedCommands = ( - await Promise.all(highTimeoutCommandPromises.commandPromises) - ).filter((result) => result.status === "rejected"); + it("should unrelax command timeout after MIGRATED and MOVING", async () => { + const { action_id: migrateActionId } = + await faultInjectorClient.triggerAction({ + type: "migrate", + parameters: { + cluster_index: 0, + }, + }); - assert.strictEqual(highTimeoutRejectedCommands.length, 0); - }); + await faultInjectorClient.waitForAction(migrateActionId); + + // PART 1 + // After migration + const { error: errorMigrate, duration: durationMigrate } = + await blockCommand(async () => { + await client.set("key", "value"); + }); + + assert.ok( + errorMigrate instanceof Error, + "Command Timeout error should be instanceof Error" + ); + assert.ok( + durationMigrate > NORMAL_COMMAND_TIMEOUT && + durationMigrate < NORMAL_COMMAND_TIMEOUT * 1.1, + `Normal command should timeout within normal timeout ms` + ); + assert.strictEqual( + errorMigrate?.constructor?.name, + "TimeoutError", + "Command Timeout error should be TimeoutError" + ); - it("should unrelax command timeout after MAINTENANCE", async () => { - const clientWithCommandTimeout = await clientFactory.create( - "clientWithCommandTimeout", + const { action_id: bindActionId } = await faultInjectorClient.triggerAction( { - commandOptions: { - timeout: 100, + type: "bind", + parameters: { + bdb_id: clientConfig.bdbId.toString(), + cluster_index: 0, }, } ); - const { action_id: bindAndMigrateActionId } = - await faultInjectorClient.migrateAndBindAction({ - bdbId: clientConfig.bdbId, - clusterIndex: 0, - }); + await faultInjectorClient.waitForAction(bindActionId); - const lowTimeoutWaitPromise = faultInjectorClient.waitForAction( - bindAndMigrateActionId + // PART 2 + // After bind + const { error: errorBind, duration: durationBind } = await blockCommand( + async () => { + await client.set("key", "value"); + } ); - const relaxedTimeoutCommandPromises = - await TestCommandRunner.fireCommandsUntilStopSignal( - clientWithCommandTimeout, - lowTimeoutWaitPromise - ); + assert.ok( + errorBind instanceof Error, + "Command Timeout error should be instanceof Error" + ); + assert.ok( + durationBind > NORMAL_COMMAND_TIMEOUT && + durationBind < NORMAL_COMMAND_TIMEOUT * 1.1, + `Normal command should timeout within normal timeout ms` + ); + assert.strictEqual( + errorBind?.constructor?.name, + "TimeoutError", + "Command Timeout error should 
be TimeoutError" + ); + }); - const relaxedTimeoutRejectedCommands = ( - await Promise.all(relaxedTimeoutCommandPromises.commandPromises) - ).filter((result) => result.status === "rejected"); + it("should relax command timeout on FAILING_OVER", async () => { + const notifications: Array = ["FAILING_OVER"]; - assert.ok(relaxedTimeoutRejectedCommands.length === 0); + const result: Record< + DiagnosticsEvent["type"], + { error: any; duration: number } + > = {}; - const start = performance.now(); + const onMessageHandler = createNotificationMessageHandler( + client, + result, + notifications + ); - let error: any; - await blockSetImmediate(async () => { - try { - await clientWithCommandTimeout.set("key", "value"); - } catch (err: any) { - error = err; - } + diagnostics_channel.subscribe("redis.maintenance", onMessageHandler); + + const { action_id: failoverActionId } = + await faultInjectorClient.triggerAction({ + type: "failover", + parameters: { + bdb_id: clientConfig.bdbId.toString(), + cluster_index: 0, + }, + }); + + await faultInjectorClient.waitForAction(failoverActionId); + + diagnostics_channel.unsubscribe("redis.maintenance", onMessageHandler); + + notifications.forEach((notification) => { + assert.ok( + result[notification]?.error instanceof Error, + `${notification} notification error should be instanceof Error` + ); + assert.ok( + result[notification]?.duration > RELAXED_COMMAND_TIMEOUT && + result[notification]?.duration < RELAXED_COMMAND_TIMEOUT * 1.1, + `${notification} notification should timeout within relaxed timeout` + ); + assert.strictEqual( + result[notification]?.error?.constructor?.name, + "CommandTimeoutDuringMaintenanceError", + `${notification} notification error should be CommandTimeoutDuringMaintenanceError` + ); }); + }); - // Make sure it took less than 1sec to fail - assert.ok(performance.now() - start < 1000); - assert.ok(error instanceof Error); - assert.ok(error.constructor.name === "TimeoutError"); + it("should unrelax command timeout after FAILED_OVER", async () => { + const { action_id: failoverActionId } = + await faultInjectorClient.triggerAction({ + type: "failover", + parameters: { + bdb_id: clientConfig.bdbId.toString(), + cluster_index: 0, + }, + }); + + await faultInjectorClient.waitForAction(failoverActionId); + + const { error, duration } = await blockCommand(async () => { + await client.set("key", "value"); + }); + + assert.ok( + error instanceof Error, + "Command Timeout error should be instanceof Error" + ); + assert.ok( + duration > NORMAL_COMMAND_TIMEOUT && + duration < NORMAL_COMMAND_TIMEOUT * 1.1, + `Normal command should timeout within normal timeout ms` + ); + assert.strictEqual( + error?.constructor?.name, + "TimeoutError", + "Command Timeout error should be TimeoutError" + ); }); }); diff --git a/packages/client/tsconfig.json b/packages/client/tsconfig.json index b1f7b44d915..f87c7d4f533 100644 --- a/packages/client/tsconfig.json +++ b/packages/client/tsconfig.json @@ -11,7 +11,8 @@ "exclude": [ "./lib/test-utils.ts", "./lib/**/*.spec.ts", - "./lib/sentinel/test-util.ts" + "./lib/sentinel/test-util.ts", + "./lib/tests/**/*.ts" ], "typedocOptions": { "entryPoints": [ From 953fb9a33d9a69dc627ef21421882c1043896c07 Mon Sep 17 00:00:00 2001 From: Nikolay Karadzhov Date: Thu, 11 Sep 2025 12:41:09 +0300 Subject: [PATCH 10/14] tests: add resp3 check test (#1) --- .../lib/tests/test-scenario/negative-tests.e2e.ts | 15 +++++++++++++++ 1 file changed, 15 insertions(+) create mode 100644 
packages/client/lib/tests/test-scenario/negative-tests.e2e.ts

diff --git a/packages/client/lib/tests/test-scenario/negative-tests.e2e.ts b/packages/client/lib/tests/test-scenario/negative-tests.e2e.ts
new file mode 100644
index 00000000000..9e90b80c502
--- /dev/null
+++ b/packages/client/lib/tests/test-scenario/negative-tests.e2e.ts
@@ -0,0 +1,15 @@
+import assert from "assert";
+import { createClient } from "../../..";
+
+describe("Negative tests", () => {
+  it("should only be enabled with RESP3", () => {
+    assert.throws(
+      () =>
+        createClient({
+          RESP: 2,
+          maintPushNotifications: "enabled",
+        }),
+      "Error: Graceful Maintenance is only supported with RESP3",
+    );
+  });
+});

From 0fb62bd5b4cdba44c907587a4d3408d4ff925a05 Mon Sep 17 00:00:00 2001
From: Pavel Pashov <60297174+PavelPashov@users.noreply.github.com>
Date: Thu, 11 Sep 2025 15:21:09 +0300
Subject: [PATCH 11/14] test: refactor connection handoff tests with enhanced spy utility (#2)

---
 .../test-scenario/connection-handoff.e2e.ts | 169 +++++++++++++-----
 1 file changed, 128 insertions(+), 41 deletions(-)

diff --git a/packages/client/lib/tests/test-scenario/connection-handoff.e2e.ts b/packages/client/lib/tests/test-scenario/connection-handoff.e2e.ts
index 27e79756911..3c71dff7ec4 100644
--- a/packages/client/lib/tests/test-scenario/connection-handoff.e2e.ts
+++ b/packages/client/lib/tests/test-scenario/connection-handoff.e2e.ts
@@ -6,17 +6,56 @@ import {
   getEnvConfig,
   RedisConnectionConfig,
 } from "./test-scenario.util";
-import { createClient } from "../../..";
+import { createClient, RedisClientOptions } from "../../..";
 import { before } from "mocha";
-import { spy } from "sinon";
+import Sinon, { SinonSpy, spy, stub } from "sinon";
 import assert from "node:assert";
-import net from "node:net";
+
+/**
+ * Creates a spy on a duplicated client method
+ * @param client - The Redis client instance
+ * @param methodName - The name of the method to spy on
+ * @returns Object containing the promise that resolves with the spy and restore function
+ */
+const spyOnTemporaryClientInstanceMethod = (
+  client: ReturnType<typeof createClient>,
+  methodName: string
+) => {
+  const { promise, resolve } = (
+    Promise as typeof Promise & {
+      withResolvers: () => {
+        promise: Promise<{ spy: SinonSpy; restore: () => void }>;
+        resolve: (value: any) => void;
+      };
+    }
+  ).withResolvers();
+
+  const originalDuplicate = client.duplicate.bind(client);
+
+  const duplicateStub: Sinon.SinonStub = stub(
+    // Temporary clients (in the context of hitless upgrade)
+    // are created by calling the duplicate method on the client.
+ Object.getPrototypeOf(client), + "duplicate" + ).callsFake((opts) => { + const tmpClient = originalDuplicate(opts); + resolve({ + spy: spy(tmpClient, methodName), + restore: duplicateStub.restore, + }); + + return tmpClient; + }); + + return { + getSpy: () => promise, + }; +}; describe("Connection Handoff", () => { let clientConfig: RedisConnectionConfig; let client: ReturnType>; let faultInjectorClient: FaultInjectorClient; - let connectSpy = spy(net, "createConnection"); before(() => { const envConfig = getEnvConfig(); @@ -28,62 +67,110 @@ describe("Connection Handoff", () => { clientConfig = getDatabaseConfig(redisConfig); }); - beforeEach(async () => { - connectSpy.resetHistory(); - - client = await createTestClient(clientConfig); - - await client.flushAll(); - }); - - afterEach(() => { + afterEach(async () => { if (client && client.isOpen) { + await client.flushAll(); client.destroy(); } }); - describe("New Connection Establishment", () => { - it("should establish new connection", async () => { - assert.equal(connectSpy.callCount, 1); - - const { action_id: lowTimeoutBindAndMigrateActionId } = - await faultInjectorClient.migrateAndBindAction({ - bdbId: clientConfig.bdbId, - clusterIndex: 0, - }); - - await faultInjectorClient.waitForAction(lowTimeoutBindAndMigrateActionId); - - assert.equal(connectSpy.callCount, 2); - }); + describe("New Connection Establishment & Traffic Resumption", () => { + const cases: Array<{ + name: string; + clientOptions: Partial; + }> = [ + { + name: "default options", + clientOptions: {}, + }, + { + name: "external-ip", + clientOptions: { + maintMovingEndpointType: "external-ip", + }, + }, + { + name: "external-fqdn", + clientOptions: { + maintMovingEndpointType: "external-fqdn", + }, + }, + { + name: "auto", + clientOptions: { + maintMovingEndpointType: "auto", + }, + }, + { + name: "none", + clientOptions: { + maintMovingEndpointType: "none", + }, + }, + ]; + + for (const { name, clientOptions } of cases) { + it.only(`should establish new connection and resume traffic afterwards - ${name}`, async () => { + client = await createTestClient(clientConfig, clientOptions); + + const spyObject = spyOnTemporaryClientInstanceMethod(client, "connect"); + + // PART 1 Establish initial connection + const { action_id: lowTimeoutBindAndMigrateActionId } = + await faultInjectorClient.migrateAndBindAction({ + bdbId: clientConfig.bdbId, + clusterIndex: 0, + }); + + await faultInjectorClient.waitForAction( + lowTimeoutBindAndMigrateActionId + ); + + const spyResult = await spyObject.getSpy(); + + assert.strictEqual(spyResult.spy.callCount, 1); + + // PART 2 Verify traffic resumption + const currentTime = Date.now().toString(); + await client.set("key", currentTime); + const result = await client.get("key"); + + assert.strictEqual(result, currentTime); + + spyResult.restore(); + }); + } }); describe("TLS Connection Handoff", () => { - it("TODO receiveMessagesWithTLSEnabledTest", async () => { + it.skip("TODO receiveMessagesWithTLSEnabledTest", async () => { // }); - it("TODO connectionHandoffWithStaticInternalNameTest", async () => { + it.skip("TODO connectionHandoffWithStaticInternalNameTest", async () => { // }); - it("TODO connectionHandoffWithStaticExternalNameTest", async () => { + it.skip("TODO connectionHandoffWithStaticExternalNameTest", async () => { // }); }); - describe("Traffic Resumption", () => { - it("Traffic resumed after handoff", async () => { - const { action_id } = await faultInjectorClient.migrateAndBindAction({ - bdbId: clientConfig.bdbId, - 
clusterIndex: 0, - }); + describe("Connection Cleanup", () => { + it("should shut down old connection", async () => { + const spyObject = spyOnTemporaryClientInstanceMethod(client, "destroy"); + + const { action_id: lowTimeoutBindAndMigrateActionId } = + await faultInjectorClient.migrateAndBindAction({ + bdbId: clientConfig.bdbId, + clusterIndex: 0, + }); + + await faultInjectorClient.waitForAction(lowTimeoutBindAndMigrateActionId); - await faultInjectorClient.waitForAction(action_id); + const spyResult = await spyObject.getSpy(); - const currentTime = Date.now().toString(); - await client.set("key", currentTime); - const result = await client.get("key"); + assert.equal(spyResult.spy.callCount, 1); - assert.strictEqual(result, currentTime); + spyResult.restore(); }); }); }); From 8d4d531de89ac7e508dbcf7d3f8526fc6e94bb6f Mon Sep 17 00:00:00 2001 From: Pavel Pashov <60297174+PavelPashov@users.noreply.github.com> Date: Fri, 12 Sep 2025 11:57:57 +0300 Subject: [PATCH 12/14] test: add comprehensive push notification disabled scenarios (#3) --- .../test-scenario/connection-handoff.e2e.ts | 2 +- .../test-scenario/fault-injector-client.ts | 7 +- .../test-scenario/push-notification.e2e.ts | 348 ++++++++++++++---- .../tests/test-scenario/test-scenario.util.ts | 4 - 4 files changed, 286 insertions(+), 75 deletions(-) diff --git a/packages/client/lib/tests/test-scenario/connection-handoff.e2e.ts b/packages/client/lib/tests/test-scenario/connection-handoff.e2e.ts index 3c71dff7ec4..3fbf5e38d40 100644 --- a/packages/client/lib/tests/test-scenario/connection-handoff.e2e.ts +++ b/packages/client/lib/tests/test-scenario/connection-handoff.e2e.ts @@ -110,7 +110,7 @@ describe("Connection Handoff", () => { ]; for (const { name, clientOptions } of cases) { - it.only(`should establish new connection and resume traffic afterwards - ${name}`, async () => { + it(`should establish new connection and resume traffic afterwards - ${name}`, async () => { client = await createTestClient(clientConfig, clientOptions); const spyObject = spyOnTemporaryClientInstanceMethod(client, "connect"); diff --git a/packages/client/lib/tests/test-scenario/fault-injector-client.ts b/packages/client/lib/tests/test-scenario/fault-injector-client.ts index 22f67211820..13c81412b18 100644 --- a/packages/client/lib/tests/test-scenario/fault-injector-client.ts +++ b/packages/client/lib/tests/test-scenario/fault-injector-client.ts @@ -9,7 +9,8 @@ export type ActionType = | "execute_rlutil_command" | "execute_rladmin_command" | "migrate" - | "bind"; + | "bind" + | "update_cluster_config"; export interface ActionRequest { type: ActionType; @@ -47,7 +48,9 @@ export class FaultInjectorClient { * @param action The action request to trigger * @throws {Error} When the HTTP request fails or response cannot be parsed as JSON */ - public triggerAction(action: ActionRequest): Promise { + public triggerAction( + action: ActionRequest + ): Promise { return this.#request("POST", "/action", action); } diff --git a/packages/client/lib/tests/test-scenario/push-notification.e2e.ts b/packages/client/lib/tests/test-scenario/push-notification.e2e.ts index cfe714dbd22..9962d0a02dc 100644 --- a/packages/client/lib/tests/test-scenario/push-notification.e2e.ts +++ b/packages/client/lib/tests/test-scenario/push-notification.e2e.ts @@ -40,12 +40,6 @@ describe("Push Notifications", () => { clientConfig = getDatabaseConfig(redisConfig); }); - beforeEach(async () => { - client = await createTestClient(clientConfig); - - await client.flushAll(); - }); - afterEach(() => { if 
(onMessageHandler!) { diagnostics_channel.unsubscribe("redis.maintenance", onMessageHandler); @@ -56,82 +50,300 @@ describe("Push Notifications", () => { } }); - it("should receive MOVING, MIGRATING, and MIGRATED push notifications", async () => { - const notifications: Array = [ - "MOVING", - "MIGRATING", - "MIGRATED", - ]; + describe("Push Notifications Enabled", () => { + beforeEach(async () => { + client = await createTestClient(clientConfig); - const diagnosticsMap: Record = {}; + await client.flushAll(); + }); - onMessageHandler = createNotificationMessageHandler( - diagnosticsMap, - notifications - ); + it("should receive MOVING, MIGRATING, and MIGRATED push notifications", async () => { + const notifications: Array = [ + "MOVING", + "MIGRATING", + "MIGRATED", + ]; - diagnostics_channel.subscribe("redis.maintenance", onMessageHandler); + const diagnosticsMap: Record = {}; - const { action_id: bindAndMigrateActionId } = - await faultInjectorClient.migrateAndBindAction({ - bdbId: clientConfig.bdbId, - clusterIndex: 0, - }); + onMessageHandler = createNotificationMessageHandler( + diagnosticsMap, + notifications + ); - await faultInjectorClient.waitForAction(bindAndMigrateActionId); + diagnostics_channel.subscribe("redis.maintenance", onMessageHandler); - assert.strictEqual( - diagnosticsMap.MOVING, - 1, - "Should have received exactly one MOVING notification" - ); - assert.strictEqual( - diagnosticsMap.MIGRATING, - 1, - "Should have received exactly one MIGRATING notification" - ); - assert.strictEqual( - diagnosticsMap.MIGRATED, - 1, - "Should have received exactly one MIGRATED notification" - ); + const { action_id: bindAndMigrateActionId } = + await faultInjectorClient.migrateAndBindAction({ + bdbId: clientConfig.bdbId, + clusterIndex: 0, + }); + + await faultInjectorClient.waitForAction(bindAndMigrateActionId); + + assert.strictEqual( + diagnosticsMap.MOVING, + 1, + "Should have received exactly one MOVING notification" + ); + assert.strictEqual( + diagnosticsMap.MIGRATING, + 1, + "Should have received exactly one MIGRATING notification" + ); + assert.strictEqual( + diagnosticsMap.MIGRATED, + 1, + "Should have received exactly one MIGRATED notification" + ); + }); + + it("should receive FAILING_OVER and FAILED_OVER push notifications", async () => { + const notifications: Array = [ + "FAILING_OVER", + "FAILED_OVER", + ]; + + const diagnosticsMap: Record = {}; + + onMessageHandler = createNotificationMessageHandler( + diagnosticsMap, + notifications + ); + + diagnostics_channel.subscribe("redis.maintenance", onMessageHandler); + + const { action_id: failoverActionId } = + await faultInjectorClient.triggerAction({ + type: "failover", + parameters: { + bdb_id: clientConfig.bdbId.toString(), + cluster_index: 0, + }, + }); + + await faultInjectorClient.waitForAction(failoverActionId); + + assert.strictEqual( + diagnosticsMap.FAILING_OVER, + 1, + "Should have received exactly one FAILING_OVER notification" + ); + assert.strictEqual( + diagnosticsMap.FAILED_OVER, + 1, + "Should have received exactly one FAILED_OVER notification" + ); + }); }); - it("should receive FAILING_OVER and FAILED_OVER push notifications", async () => { - const notifications: Array = [ - "FAILING_OVER", - "FAILED_OVER", - ]; + describe("Push Notifications Disabled - Client", () => { + beforeEach(async () => { + client = await createTestClient(clientConfig, { + maintPushNotifications: "disabled", + }); - const diagnosticsMap: Record = {}; + client.on("error", (_err) => { + // Expect the socket to be closed + // 
Ignore errors + }); - onMessageHandler = createNotificationMessageHandler( - diagnosticsMap, - notifications - ); + await client.flushAll(); + }); + + it("should NOT receive MOVING, MIGRATING, and MIGRATED push notifications", async () => { + const notifications: Array = [ + "MOVING", + "MIGRATING", + "MIGRATED", + ]; + + const diagnosticsMap: Record = {}; + + onMessageHandler = createNotificationMessageHandler( + diagnosticsMap, + notifications + ); + + diagnostics_channel.subscribe("redis.maintenance", onMessageHandler); + + const { action_id: bindAndMigrateActionId } = + await faultInjectorClient.migrateAndBindAction({ + bdbId: clientConfig.bdbId, + clusterIndex: 0, + }); + + await faultInjectorClient.waitForAction(bindAndMigrateActionId); + + assert.strictEqual( + diagnosticsMap.MOVING, + undefined, + "Should NOT have received a MOVING notification" + ); + assert.strictEqual( + diagnosticsMap.MIGRATING, + undefined, + "Should NOT have received a MIGRATING notification" + ); + assert.strictEqual( + diagnosticsMap.MIGRATED, + undefined, + "Should NOT have received a MIGRATED notification" + ); + }); - diagnostics_channel.subscribe("redis.maintenance", onMessageHandler); + it("should NOT receive FAILING_OVER and FAILED_OVER push notifications", async () => { + const notifications: Array = [ + "FAILING_OVER", + "FAILED_OVER", + ]; - const { action_id: failoverActionId } = - await faultInjectorClient.triggerAction({ - type: "failover", - parameters: { - bdb_id: clientConfig.bdbId.toString(), - cluster_index: 0, - }, + const diagnosticsMap: Record = {}; + + onMessageHandler = createNotificationMessageHandler( + diagnosticsMap, + notifications + ); + + diagnostics_channel.subscribe("redis.maintenance", onMessageHandler); + + const { action_id: failoverActionId } = + await faultInjectorClient.triggerAction({ + type: "failover", + parameters: { + bdb_id: clientConfig.bdbId.toString(), + cluster_index: 0, + }, + }); + + await faultInjectorClient.waitForAction(failoverActionId); + + assert.strictEqual( + diagnosticsMap.FAILING_OVER, + undefined, + "Should NOT have received a FAILING_OVER notification" + ); + assert.strictEqual( + diagnosticsMap.FAILED_OVER, + undefined, + "Should NOT have received a FAILED_OVER notification" + ); + }); + }); + + describe("Push Notifications Disabled - Server", () => { + beforeEach(async () => { + client = await createTestClient(clientConfig); + + client.on("error", (_err) => { + // Expect the socket to be closed + // Ignore errors }); - await faultInjectorClient.waitForAction(failoverActionId); + await client.flushAll(); + }); - assert.strictEqual( - diagnosticsMap.FAILING_OVER, - 1, - "Should have received exactly one FAILING_OVER notification" - ); - assert.strictEqual( - diagnosticsMap.FAILED_OVER, - 1, - "Should have received exactly one FAILED_OVER notification" - ); + before(async () => { + const { action_id: disablePushNotificationsActionId } = + await faultInjectorClient.triggerAction({ + type: "update_cluster_config", + parameters: { + config: { client_maint_notifications: false }, + }, + }); + + await faultInjectorClient.waitForAction(disablePushNotificationsActionId); + }); + + after(async () => { + const { action_id: enablePushNotificationsActionId } = + await faultInjectorClient.triggerAction({ + type: "update_cluster_config", + parameters: { + config: { client_maint_notifications: true }, + }, + }); + + await faultInjectorClient.waitForAction(enablePushNotificationsActionId); + }); + + it("should NOT receive 
MOVING, MIGRATING, and MIGRATED push notifications", async () => { + const notifications: Array = [ + "MOVING", + "MIGRATING", + "MIGRATED", + ]; + + const diagnosticsMap: Record = {}; + + onMessageHandler = createNotificationMessageHandler( + diagnosticsMap, + notifications + ); + + diagnostics_channel.subscribe("redis.maintenance", onMessageHandler); + + const { action_id: bindAndMigrateActionId } = + await faultInjectorClient.migrateAndBindAction({ + bdbId: clientConfig.bdbId, + clusterIndex: 0, + }); + + await faultInjectorClient.waitForAction(bindAndMigrateActionId); + + assert.strictEqual( + diagnosticsMap.MOVING, + undefined, + "Should NOT have received a MOVING notification" + ); + assert.strictEqual( + diagnosticsMap.MIGRATING, + undefined, + "Should NOT have received a MIGRATING notification" + ); + assert.strictEqual( + diagnosticsMap.MIGRATED, + undefined, + "Should NOT have received a MIGRATED notification" + ); + }); + + it("should NOT receive FAILING_OVER and FAILED_OVER push notifications", async () => { + const notifications: Array = [ + "FAILING_OVER", + "FAILED_OVER", + ]; + + const diagnosticsMap: Record = {}; + + onMessageHandler = createNotificationMessageHandler( + diagnosticsMap, + notifications + ); + + diagnostics_channel.subscribe("redis.maintenance", onMessageHandler); + + const { action_id: failoverActionId } = + await faultInjectorClient.triggerAction({ + type: "failover", + parameters: { + bdb_id: clientConfig.bdbId.toString(), + cluster_index: 0, + }, + }); + + await faultInjectorClient.waitForAction(failoverActionId); + + assert.strictEqual( + diagnosticsMap.FAILING_OVER, + undefined, + "Should NOT have received a FAILING_OVER notification" + ); + assert.strictEqual( + diagnosticsMap.FAILED_OVER, + undefined, + "Should NOT have received a FAILED_OVER notification" + ); + }); }); }); diff --git a/packages/client/lib/tests/test-scenario/test-scenario.util.ts b/packages/client/lib/tests/test-scenario/test-scenario.util.ts index 9a8ef7c6e47..c98ba90fe19 100644 --- a/packages/client/lib/tests/test-scenario/test-scenario.util.ts +++ b/packages/client/lib/tests/test-scenario/test-scenario.util.ts @@ -168,10 +168,6 @@ export async function createTestClient( ...options, }); - client.on("error", (err: Error) => { - throw new Error(`Client error: ${err.message}`); - }); - await client.connect(); return client; From 4e339e727d53faf5b82f1457cb92701ed78826df Mon Sep 17 00:00:00 2001 From: Nikolay Karadzhov Date: Mon, 15 Sep 2025 13:20:01 +0300 Subject: [PATCH 13/14] tests: add params config tests (#4) --- .../client/enterprise-maintenance-manager.ts | 21 ++- .../tests/test-scenario/configuration.e2e.ts | 155 ++++++++++++++++++ 2 files changed, 168 insertions(+), 8 deletions(-) create mode 100644 packages/client/lib/tests/test-scenario/configuration.e2e.ts diff --git a/packages/client/lib/client/enterprise-maintenance-manager.ts b/packages/client/lib/client/enterprise-maintenance-manager.ts index d4766d9e533..631fb1f7115 100644 --- a/packages/client/lib/client/enterprise-maintenance-manager.ts +++ b/packages/client/lib/client/enterprise-maintenance-manager.ts @@ -51,9 +51,10 @@ interface Client { _pause: () => void; _unpause: () => void; _maintenanceUpdate: (update: MaintenanceUpdate) => void; - duplicate: (options: RedisClientOptions) => Client; + duplicate: () => Client; connect: () => Promise; destroy: () => void; + on: (event: string, callback: (value: unknown) => void) => void; } export default class 
EnterpriseMaintenanceManager { @@ -211,21 +212,25 @@ export default class EnterpriseMaintenanceManager { dbgMaintenance("Creating new tmp client"); let start = performance.now(); - const tmpOptions = this.#options; // If the URL is provided, it takes precedence - if(tmpOptions.url) { - const u = new URL(tmpOptions.url); + // the options object could just be mutated + if(this.#options.url) { + const u = new URL(this.#options.url); u.hostname = host; u.port = String(port); - tmpOptions.url = u.toString(); + this.#options.url = u.toString(); } else { - tmpOptions.socket = { - ...tmpOptions.socket, + this.#options.socket = { + ...this.#options.socket, host, port } } - const tmpClient = this.#client.duplicate(tmpOptions); + const tmpClient = this.#client.duplicate(); + tmpClient.on('error', (error: unknown) => { + // We don't know how to handle tmp client errors + dbgMaintenance(`[ERR]`, error) + }); dbgMaintenance(`Tmp client created in ${( performance.now() - start ).toFixed(2)}ms`); dbgMaintenance( `Set timeout for tmp client to ${this.#options.maintRelaxedSocketTimeout}`, diff --git a/packages/client/lib/tests/test-scenario/configuration.e2e.ts b/packages/client/lib/tests/test-scenario/configuration.e2e.ts new file mode 100644 index 00000000000..1df8ae60094 --- /dev/null +++ b/packages/client/lib/tests/test-scenario/configuration.e2e.ts @@ -0,0 +1,155 @@ +import assert from "node:assert"; +import diagnostics_channel from "node:diagnostics_channel"; +import { DiagnosticsEvent } from "../../client/enterprise-maintenance-manager"; + +import { + RedisConnectionConfig, + createTestClient, + getDatabaseConfig, + getDatabaseConfigFromEnv, + getEnvConfig, +} from "./test-scenario.util"; +import { createClient } from "../../.."; +import { FaultInjectorClient } from "./fault-injector-client"; +import { MovingEndpointType } from "../../../dist/lib/client/enterprise-maintenance-manager"; +import { RedisTcpSocketOptions } from "../../client/socket"; + +describe("Parameter Configuration", () => { + describe("Handshake with endpoint type", () => { + let clientConfig: RedisConnectionConfig; + let client: ReturnType>; + let faultInjectorClient: FaultInjectorClient; + let log: DiagnosticsEvent[] = []; + + before(() => { + const envConfig = getEnvConfig(); + const redisConfig = getDatabaseConfigFromEnv( + envConfig.redisEndpointsConfigPath, + ); + + faultInjectorClient = new FaultInjectorClient(envConfig.faultInjectorUrl); + clientConfig = getDatabaseConfig(redisConfig); + + diagnostics_channel.subscribe("redis.maintenance", (event) => { + log.push(event as DiagnosticsEvent); + }); + }); + + beforeEach(() => { + log.length = 0; + }); + + afterEach(async () => { + if (client && client.isOpen) { + await client.flushAll(); + client.destroy(); + } + }); + + const endpoints: MovingEndpointType[] = [ + "auto", + // "internal-ip", + // "internal-fqdn", + "external-ip", + "external-fqdn", + "none", + ]; + + for (const endpointType of endpoints) { + it(`should request \`${endpointType}\` movingEndpointType and receive it`, async () => { + try { + client = await createTestClient(clientConfig, { + maintMovingEndpointType: endpointType, + }); + client.on('error', () => {}) + + //need to copy those because they will be mutated later + const oldOptions = JSON.parse(JSON.stringify(client.options)); + assert.ok(oldOptions); + + const { action_id } = await faultInjectorClient.migrateAndBindAction({ + bdbId: clientConfig.bdbId, + clusterIndex: 0, + }); + + await faultInjectorClient.waitForAction(action_id); + + const movingEvent 
= log.find((event) => event.type === "MOVING"); + assert(!!movingEvent, "Didn't receive MOVING PN"); + + let endpoint: string | undefined; + try { + //@ts-ignore + endpoint = movingEvent.data.push[3]; + } catch (err) { + assert( + false, + `couldn't get endpoint from event ${JSON.stringify(movingEvent)}`, + ); + } + + assert(endpoint !== undefined, "no endpoint"); + + const newOptions = client.options; + assert.ok(newOptions); + + if (oldOptions?.url) { + if (endpointType === "none") { + assert.equal( + newOptions!.url, + oldOptions.url, + "For movingEndpointType 'none', we expect old and new url to be the same", + ); + } else { + assert.equal( + newOptions.url, + endpoint, + "Expected what came through the wire to be set in the new client", + ); + assert.notEqual( + newOptions!.url, + oldOptions.url, + `For movingEndpointType ${endpointType}, we expect old and new url to be different`, + ); + } + } else { + const oldSocket = oldOptions.socket as RedisTcpSocketOptions; + const newSocket = newOptions.socket as RedisTcpSocketOptions; + assert.ok(oldSocket); + assert.ok(newSocket); + + if (endpointType === "none") { + assert.equal( + newSocket.host, + oldSocket.host, + "For movingEndpointType 'none', we expect old and new host to be the same", + ); + } else { + assert.equal( + newSocket.host + ":" + newSocket.port, + endpoint, + "Expected what came through the wire to be set in the new client", + ); + assert.notEqual( + newSocket.host, + oldSocket.host, + `For movingEndpointType ${endpointType}, we expect old and new host to be different`, + ); + } + } + } catch (error: any) { + console.log('endpointType', endpointType); + console.log('caught error', error); + if ( + endpointType === "internal-fqdn" || + endpointType === "internal-ip" + ) { + // errors are expected here, because we cannot connect to internal endpoints unless we are deployed in the same place as the server + } else { + assert(false, error); + } + } + }); + } + }); +}); From 74a2d340660b3216ab07dc37713fc8aa9af3cd31 Mon Sep 17 00:00:00 2001 From: Nikolay Karadzhov Date: Wed, 17 Sep 2025 16:03:28 +0300 Subject: [PATCH 14/14] tests: add feature enablement tests (#5) --- .../tests/test-scenario/configuration.e2e.ts | 110 +++++++++++++----- 1 file changed, 78 insertions(+), 32 deletions(-) diff --git a/packages/client/lib/tests/test-scenario/configuration.e2e.ts b/packages/client/lib/tests/test-scenario/configuration.e2e.ts index 1df8ae60094..a648375f6e4 100644 --- a/packages/client/lib/tests/test-scenario/configuration.e2e.ts +++ b/packages/client/lib/tests/test-scenario/configuration.e2e.ts @@ -14,38 +14,38 @@ import { FaultInjectorClient } from "./fault-injector-client"; import { MovingEndpointType } from "../../../dist/lib/client/enterprise-maintenance-manager"; import { RedisTcpSocketOptions } from "../../client/socket"; -describe("Parameter Configuration", () => { - describe("Handshake with endpoint type", () => { - let clientConfig: RedisConnectionConfig; - let client: ReturnType>; - let faultInjectorClient: FaultInjectorClient; - let log: DiagnosticsEvent[] = []; - - before(() => { - const envConfig = getEnvConfig(); - const redisConfig = getDatabaseConfigFromEnv( - envConfig.redisEndpointsConfigPath, - ); - - faultInjectorClient = new FaultInjectorClient(envConfig.faultInjectorUrl); - clientConfig = getDatabaseConfig(redisConfig); - - diagnostics_channel.subscribe("redis.maintenance", (event) => { - log.push(event as DiagnosticsEvent); - }); +describe("Client Configuration and Handshake", () => { + let clientConfig: 
RedisConnectionConfig; + let client: ReturnType>; + let faultInjectorClient: FaultInjectorClient; + let log: DiagnosticsEvent[] = []; + + before(() => { + const envConfig = getEnvConfig(); + const redisConfig = getDatabaseConfigFromEnv( + envConfig.redisEndpointsConfigPath, + ); + + faultInjectorClient = new FaultInjectorClient(envConfig.faultInjectorUrl); + clientConfig = getDatabaseConfig(redisConfig); + + diagnostics_channel.subscribe("redis.maintenance", (event) => { + log.push(event as DiagnosticsEvent); }); + }); - beforeEach(() => { - log.length = 0; - }); + beforeEach(() => { + log.length = 0; + }); - afterEach(async () => { - if (client && client.isOpen) { - await client.flushAll(); - client.destroy(); - } - }); + afterEach(async () => { + if (client && client.isOpen) { + await client.flushAll(); + client.destroy(); + } + }); + describe("Parameter Configuration", () => { const endpoints: MovingEndpointType[] = [ "auto", // "internal-ip", @@ -56,12 +56,12 @@ describe("Parameter Configuration", () => { ]; for (const endpointType of endpoints) { - it(`should request \`${endpointType}\` movingEndpointType and receive it`, async () => { + it(`clientHandshakeWithEndpointType '${endpointType}'`, async () => { try { client = await createTestClient(clientConfig, { maintMovingEndpointType: endpointType, }); - client.on('error', () => {}) + client.on("error", () => {}); //need to copy those because they will be mutated later const oldOptions = JSON.parse(JSON.stringify(client.options)); assert.ok(oldOptions); @@ -138,8 +138,6 @@ describe("Parameter Configuration", () => { } } } catch (error: any) { - console.log('endpointType', endpointType); - console.log('caught error', error); if ( endpointType === "internal-fqdn" || endpointType === "internal-ip" @@ -152,4 +150,52 @@ describe("Parameter Configuration", () => { } } }); } }); + + describe("Feature Enablement", () => { + it("connectionHandshakeIncludesEnablingNotifications", async () => { + client = await createTestClient(clientConfig, { + maintPushNotifications: "enabled", + }); + + const { action_id } = await faultInjectorClient.migrateAndBindAction({ + bdbId: clientConfig.bdbId, + clusterIndex: 0, + }); + + await faultInjectorClient.waitForAction(action_id); + + let movingEvent = false; + let migratingEvent = false; + let migratedEvent = false; + for (const event of log) { + if (event.type === "MOVING") movingEvent = true; + if (event.type === "MIGRATING") migratingEvent = true; + if (event.type === "MIGRATED") migratedEvent = true; + } + assert.ok(movingEvent, "didn't receive MOVING PN"); + assert.ok(migratingEvent, "didn't receive MIGRATING PN"); + assert.ok(migratedEvent, "didn't receive MIGRATED PN"); + }); + + it("disabledDontReceiveNotifications", async () => { + try { + client = await createTestClient(clientConfig, { + maintPushNotifications: "disabled", + socket: { + reconnectStrategy: false + } + }); + client.on('error', console.log.bind(console)) + + const { action_id } = await faultInjectorClient.migrateAndBindAction({ + bdbId: clientConfig.bdbId, + clusterIndex: 0, + }); + + await faultInjectorClient.waitForAction(action_id); + + assert.equal(log.length, 0, "received a PN while feature is disabled"); + } catch (error: any) { /* errors are expected here: reconnects are disabled and the connection may drop while the endpoint moves */ } + }); + }); });