diff --git a/CHANGELOG.md b/CHANGELOG.md index 2827443b7a5..c839a088c6f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,6 +6,7 @@ * Node: Drop support for Node.js 16.x and 18.x. Minimum supported version is now Node.js 20.x. #### Changes +* Node: add dynamic PubSub support ([#5295](https://github.com/valkey-io/valkey-glide/pull/5295)) * JAVA: Add EVAL_RO, EVALSHA_RO, and SCRIPT DEBUG commands ([#5125](https://github.com/valkey-io/valkey-glide/pull/5125)) * CORE: Add client certificate and private key support for mTLS ([#5092](https://github.com/valkey-io/valkey-glide/issues/5092)) * Python: Add client certificate and private key support for mTLS ([5123](https://github.com/valkey-io/valkey-glide/issues/5123)) diff --git a/glide-core/redis-rs/redis/src/aio/multiplexed_connection.rs b/glide-core/redis-rs/redis/src/aio/multiplexed_connection.rs index 130b8491f11..1f3b3027dcb 100644 --- a/glide-core/redis-rs/redis/src/aio/multiplexed_connection.rs +++ b/glide-core/redis-rs/redis/src/aio/multiplexed_connection.rs @@ -25,6 +25,7 @@ use pin_project_lite::pin_project; use std::collections::VecDeque; use std::fmt; use std::fmt::Debug; +use std::net::SocketAddr; use std::pin::Pin; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; @@ -605,6 +606,7 @@ impl MultiplexedConnection { stream, std::time::Duration::MAX, glide_connection_options, + None, // No socket_addr available in this path ) .await } @@ -616,6 +618,7 @@ impl MultiplexedConnection { stream: C, response_timeout: std::time::Duration, glide_connection_options: GlideConnectionOptions, + socket_addr: Option, ) -> RedisResult<(Self, impl Future)> where C: Unpin + AsyncRead + AsyncWrite + Send + 'static, @@ -626,10 +629,17 @@ impl MultiplexedConnection { let (mut pipeline, driver) = Pipeline::new(codec, glide_connection_options.disconnect_notifier); let driver = Box::pin(driver); + + // Use the actual socket address if available, otherwise fall back to connection_info.addr + let address_string = if let Some(socket_addr) = socket_addr { + format!("{}:{}", socket_addr.ip(), socket_addr.port()) + } else { + connection_info.addr.to_string() + }; let pm = PushManager::new( glide_connection_options.push_sender, glide_connection_options.pubsub_synchronizer, - Some(connection_info.addr.to_string()), + Some(address_string), ); pipeline.set_push_manager(pm.clone()); diff --git a/glide-core/redis-rs/redis/src/client.rs b/glide-core/redis-rs/redis/src/client.rs index 4fb1e0aeb71..1064d4c050d 100644 --- a/glide-core/redis-rs/redis/src/client.rs +++ b/glide-core/redis-rs/redis/src/client.rs @@ -445,6 +445,7 @@ impl Client { con, response_timeout, glide_connection_options, + socket_addr, ) .await .map(|res| (res.0, res.1, ip)) diff --git a/glide-core/redis-rs/redis/src/cluster_async/mod.rs b/glide-core/redis-rs/redis/src/cluster_async/mod.rs index 5670a1c30d1..8060a2c699a 100644 --- a/glide-core/redis-rs/redis/src/cluster_async/mod.rs +++ b/glide-core/redis-rs/redis/src/cluster_async/mod.rs @@ -1290,7 +1290,9 @@ where .await .get_node(); let node_address = if let Some(socket_addr) = socket_addr { - socket_addr.to_string() + // Use format! to avoid reverse DNS lookup that socket_addr.to_string() performs + let addr = format!("{}:{}", socket_addr.ip(), socket_addr.port()); + addr } else { node_addr }; @@ -2250,7 +2252,12 @@ where let conn_lock = inner.conn_lock.read().expect(MUTEX_READ_ERR); socket_addresses.find_map(|socket_addr| { - conn_lock.node_for_address(&socket_addr.to_string()) + let addr_str = format!( + "{}:{}", + socket_addr.ip(), + socket_addr.port() + ); + conn_lock.node_for_address(&addr_str) }) }, ); diff --git a/node/rust-client/src/lib.rs b/node/rust-client/src/lib.rs index 36b5cb14a20..05b144f0929 100644 --- a/node/rust-client/src/lib.rs +++ b/node/rust-client/src/lib.rs @@ -703,6 +703,9 @@ pub fn get_statistics<'a>(env: &'a Env) -> Result> { let total_bytes_compressed = Telemetry::total_bytes_compressed().to_string(); let total_bytes_decompressed = Telemetry::total_bytes_decompressed().to_string(); let compression_skipped_count = Telemetry::compression_skipped_count().to_string(); + let subscription_out_of_sync_count = Telemetry::subscription_out_of_sync_count().to_string(); + let subscription_last_sync_timestamp = + Telemetry::subscription_last_sync_timestamp().to_string(); let mut stats = Object::new(env)?; stats.set_named_property("total_connections", total_connections)?; @@ -713,6 +716,14 @@ pub fn get_statistics<'a>(env: &'a Env) -> Result> { stats.set_named_property("total_bytes_compressed", total_bytes_compressed)?; stats.set_named_property("total_bytes_decompressed", total_bytes_decompressed)?; stats.set_named_property("compression_skipped_count", compression_skipped_count)?; + stats.set_named_property( + "subscription_out_of_sync_count", + subscription_out_of_sync_count, + )?; + stats.set_named_property( + "subscription_last_sync_timestamp", + subscription_last_sync_timestamp, + )?; Ok(stats) } diff --git a/node/src/BaseClient.ts b/node/src/BaseClient.ts index 5df60637730..0b599024fb3 100644 --- a/node/src/BaseClient.ts +++ b/node/src/BaseClient.ts @@ -185,6 +185,14 @@ import { createPubSubChannels, createPubSubNumPat, createPubSubNumSub, + createPSubscribe, + createPSubscribeLazy, + createPUnsubscribe, + createPUnsubscribeLazy, + createSubscribe, + createSubscribeLazy, + createUnsubscribe, + createUnsubscribeLazy, createRPop, createRPush, createRPushX, @@ -390,6 +398,28 @@ export type StreamEntryDataType = Record; */ export type Score = number | "+inf" | "-inf"; +/** + * Constant representing "all channels" for unsubscribe operations. + * Use this to unsubscribe from all exact channels at once. + * + * @example + * ```typescript + * await client.unsubscribeLazy(ALL_CHANNELS); + * ``` + */ +export const ALL_CHANNELS = null; + +/** + * Constant representing "all patterns" for punsubscribe operations. + * Use this to unsubscribe from all pattern subscriptions at once. + * + * @example + * ```typescript + * await client.punsubscribeLazy(ALL_PATTERNS); + * ``` + */ +export const ALL_PATTERNS = null; + /** * Data type which represents sorted sets data for input parameter of ZADD command, * including element and its respective score. @@ -936,6 +966,33 @@ export interface AdvancedBaseClientConfiguration { * - If not explicitly set, a default value of `true` will be used by the Rust core. */ tcpNoDelay?: boolean; + + /** + * The interval in milliseconds between PubSub subscription reconciliation attempts. + * + * The reconciliation process ensures that the client's desired subscriptions match + * the actual subscriptions on the server. This is useful when subscriptions may have + * been lost due to network issues or server restarts. + * + * If not explicitly set, the Rust core will use its default reconciliation interval. + * + * @remarks + * - Must be a positive integer representing milliseconds. + * - The reconciliation process runs automatically in the background. + * - A lower interval provides faster recovery from subscription issues but increases overhead. + * - A higher interval reduces overhead but may delay recovery from subscription issues. + * + * @example + * ```typescript + * const config: GlideClientConfiguration = { + * addresses: [{ host: "localhost", port: 6379 }], + * advancedConfiguration: { + * pubsubReconciliationIntervalMs: 5000 // Reconcile every 5 seconds + * } + * }; + * ``` + */ + pubsubReconciliationIntervalMs?: number; } /** @@ -1617,13 +1674,11 @@ export class BaseClient { ); } - if (!this.isPubsubConfigured(this.config!)) { - throw new ConfigurationError( - "The operation will never complete since there was no pubsbub subscriptions applied to the client.", - ); - } - - if (this.getPubsubCallbackAndContext(this.config!)[0]) { + // Java: if (subscriptionConfiguration.isPresent() && subscriptionConfiguration.get().getCallback().isPresent()) + if ( + this.isPubsubConfigured(this.config!) && + this.getPubsubCallbackAndContext(this.config!)[0] + ) { throw new ConfigurationError( "The operation will never complete since messages will be passed to the configured callback.", ); @@ -1642,13 +1697,11 @@ export class BaseClient { ); } - if (!this.isPubsubConfigured(this.config!)) { - throw new ConfigurationError( - "The operation will never complete since there was no pubsbub subscriptions applied to the client.", - ); - } - - if (this.getPubsubCallbackAndContext(this.config!)[0]) { + // Java: if (subscriptionConfiguration.isPresent() && subscriptionConfiguration.get().getCallback().isPresent()) + if ( + this.isPubsubConfigured(this.config!) && + this.getPubsubCallbackAndContext(this.config!)[0] + ) { throw new ConfigurationError( "The operation will never complete since messages will be passed to the configured callback.", ); @@ -9230,6 +9283,12 @@ export class BaseClient { request.tcpNodelay = options.tcpNoDelay; } + // Set PubSub reconciliation interval if explicitly configured + if (options.pubsubReconciliationIntervalMs !== undefined) { + request.pubsubReconciliationIntervalMs = + options.pubsubReconciliationIntervalMs; + } + // Apply TLS configuration if present if (options.tlsAdvancedConfiguration) { // request.tlsMode is either SecureTls or InsecureTls here @@ -9470,6 +9529,276 @@ export class BaseClient { return response; // "OK" } + /** + * Subscribes the client to the specified channels (non-blocking). + * Returns immediately without waiting for subscription confirmation. + * + * @see {@link https://valkey.io/commands/subscribe/|valkey.io} for details. + * + * @param channels - A set of channel names to subscribe to. + * @param options - (Optional) See {@link DecoderOption}. + * @returns A promise that resolves immediately. + * + * @example + * ```typescript + * await client.subscribeLazy(new Set(["news", "updates"])); + * ``` + */ + public async subscribeLazy( + channels: Set, + options?: DecoderOption, + ): Promise { + const channelsArray = Array.from(channels); + return this.createWritePromise( + createSubscribeLazy(channelsArray), + options, + ); + } + + /** + * Subscribes the client to the specified channels (blocking). + * Waits for subscription confirmation or until timeout. + * + * @see {@link https://valkey.io/commands/subscribe/|valkey.io} for details. + * + * @param channels - A set of channel names to subscribe to. + * @param timeoutMs - Maximum time in milliseconds to wait. Use 0 for indefinite wait. + * @param options - (Optional) See {@link DecoderOption}. + * @returns A promise that resolves when subscription is confirmed or timeout occurs. + * + * @example + * ```typescript + * // Wait up to 5 seconds + * await client.subscribe(new Set(["news"]), 5000); + * // Wait indefinitely + * await client.subscribe(new Set(["news"]), 0); + * ``` + */ + public async subscribe( + channels: Set, + timeoutMs: number, + options?: DecoderOption, + ): Promise { + const channelsArray = Array.from(channels); + return this.createWritePromise( + createSubscribe(channelsArray, timeoutMs), + options, + ); + } + + /** + * Subscribes the client to the specified patterns (non-blocking). + * Returns immediately without waiting for subscription confirmation. + * + * @see {@link https://valkey.io/commands/psubscribe/|valkey.io} for details. + * + * @param patterns - A set of glob-style patterns to subscribe to. + * @param options - (Optional) See {@link DecoderOption}. + * @returns A promise that resolves immediately. + * + * @example + * ```typescript + * await client.psubscribeLazy(new Set(["news.*", "updates.*"])); + * ``` + */ + public async psubscribeLazy( + patterns: Set, + options?: DecoderOption, + ): Promise { + const patternsArray = Array.from(patterns); + return this.createWritePromise( + createPSubscribeLazy(patternsArray), + options, + ); + } + + /** + * Subscribes the client to the specified patterns (blocking). + * Waits for subscription confirmation or until timeout. + * + * @see {@link https://valkey.io/commands/psubscribe/|valkey.io} for details. + * + * @param patterns - A set of glob-style patterns to subscribe to. + * @param timeoutMs - Maximum time in milliseconds to wait. Use 0 for indefinite wait. + * @param options - (Optional) See {@link DecoderOption}. + * @returns A promise that resolves when subscription is confirmed or timeout occurs. + * + * @example + * ```typescript + * await client.psubscribe(new Set(["news.*"]), 5000); + * ``` + */ + public async psubscribe( + patterns: Set, + timeoutMs: number, + options?: DecoderOption, + ): Promise { + const patternsArray = Array.from(patterns); + return this.createWritePromise( + createPSubscribe(patternsArray, timeoutMs), + options, + ); + } + + /** + * Unsubscribes the client from the specified channels (non-blocking). + * Pass null to unsubscribe from all exact channels. + * + * @see {@link https://valkey.io/commands/unsubscribe/|valkey.io} for details. + * + * @param channels - Channel names to unsubscribe from, or null for all channels. + * @param options - (Optional) See {@link DecoderOption}. + * @returns A promise that resolves immediately. + * + * @example + * ```typescript + * await client.unsubscribeLazy(new Set(["news"])); + * // Unsubscribe from all channels + * await client.unsubscribeLazy(ALL_CHANNELS); + * ``` + */ + public async unsubscribeLazy( + channels?: Set | null, + options?: DecoderOption, + ): Promise { + const channelsArray = channels ? Array.from(channels) : undefined; + return this.createWritePromise( + createUnsubscribeLazy(channelsArray), + options, + ); + } + + /** + * Unsubscribes the client from the specified channels (blocking). + * Pass null to unsubscribe from all exact channels. + * + * @see {@link https://valkey.io/commands/unsubscribe/|valkey.io} for details. + * + * @param channels - Channel names to unsubscribe from, or null for all channels. + * @param timeoutMs - Maximum time in milliseconds to wait. Use 0 for indefinite wait. + * @param options - (Optional) See {@link DecoderOption}. + * @returns A promise that resolves when unsubscription is confirmed or timeout occurs. + * + * @example + * ```typescript + * await client.unsubscribe(new Set(["news"]), 5000); + * // Unsubscribe from all channels with timeout + * await client.unsubscribe(ALL_CHANNELS, 5000); + * ``` + */ + public async unsubscribe( + channels: Set | null, + timeoutMs: number, + options?: DecoderOption, + ): Promise { + const channelsArray = channels ? Array.from(channels) : undefined; + return this.createWritePromise( + createUnsubscribe(channelsArray ?? [], timeoutMs), + options, + ); + } + + /** + * Unsubscribes the client from the specified patterns (non-blocking). + * Pass null to unsubscribe from all patterns. + * + * @see {@link https://valkey.io/commands/punsubscribe/|valkey.io} for details. + * + * @param patterns - Pattern names to unsubscribe from, or null for all patterns. + * @param options - (Optional) See {@link DecoderOption}. + * @returns A promise that resolves immediately. + * + * @example + * ```typescript + * await client.punsubscribeLazy(new Set(["news.*"])); + * // Unsubscribe from all patterns + * await client.punsubscribeLazy(ALL_PATTERNS); + * ``` + */ + public async punsubscribeLazy( + patterns?: Set | null, + options?: DecoderOption, + ): Promise { + const patternsArray = patterns ? Array.from(patterns) : undefined; + return this.createWritePromise( + createPUnsubscribeLazy(patternsArray), + options, + ); + } + + /** + * Unsubscribes the client from the specified patterns (blocking). + * Pass null to unsubscribe from all patterns. + * + * @see {@link https://valkey.io/commands/punsubscribe/|valkey.io} for details. + * + * @param patterns - Pattern names to unsubscribe from, or null for all patterns. + * @param timeoutMs - Maximum time in milliseconds to wait. Use 0 for indefinite wait. + * @param options - (Optional) See {@link DecoderOption}. + * @returns A promise that resolves when unsubscription is confirmed or timeout occurs. + * + * @example + * ```typescript + * await client.punsubscribe(new Set(["news.*"]), 5000); + * // Unsubscribe from all patterns with timeout + * await client.punsubscribe(ALL_PATTERNS, 5000); + * ``` + */ + public async punsubscribe( + patterns: Set | null, + timeoutMs: number, + options?: DecoderOption, + ): Promise { + const patternsArray = patterns ? Array.from(patterns) : undefined; + return this.createWritePromise( + createPUnsubscribe(patternsArray ?? [], timeoutMs), + options, + ); + } + + /** + * @internal + * Helper method to parse GetSubscriptions response from Rust core. + * Converts array response to structured object with desired and actual subscriptions. + * + * @param response - The response array from Rust core with format: + * ["desired", { mode: [channels...] }, "actual", { mode: [channels...] }] + * @returns Parsed subscription state with desired and actual subscriptions + */ + protected parseGetSubscriptionsResponse( + response: unknown[], + ): { + desiredSubscriptions: Partial>>; + actualSubscriptions: Partial>>; + } { + // Response format: ["desired", {...}, "actual", {...}] + if (!Array.isArray(response) || response.length !== 4) { + throw new Error( + `Invalid GetSubscriptions response format: expected array of length 4, got ${response}`, + ); + } + + const desiredData = response[1] as Record; + const actualData = response[3] as Record; + + const desiredSubscriptions: Partial>> = {}; + const actualSubscriptions: Partial>> = {}; + + // Parse desired subscriptions + for (const [mode, channels] of Object.entries(desiredData)) { + const modeKey = parseInt(mode) as T; + desiredSubscriptions[modeKey] = new Set(channels); + } + + // Parse actual subscriptions + for (const [mode, channels] of Object.entries(actualData)) { + const modeKey = parseInt(mode) as T; + actualSubscriptions[modeKey] = new Set(channels); + } + + return { desiredSubscriptions, actualSubscriptions }; + } + /** * Return a statistics * diff --git a/node/src/Commands.ts b/node/src/Commands.ts index 0935fc4c156..d60137d21dd 100644 --- a/node/src/Commands.ts +++ b/node/src/Commands.ts @@ -4543,6 +4543,145 @@ export function createPubSubShardNumSub( ); } +/** + * @internal + */ +export function createSubscribeLazy( + channels: GlideString[], +): command_request.Command { + return createCommand(RequestType.Subscribe, channels); +} + +/** + * @internal + */ +export function createSubscribe( + channels: GlideString[], + timeout: number, +): command_request.Command { + return createCommand(RequestType.SubscribeBlocking, [ + ...channels, + timeout.toString(), + ]); +} + +/** + * @internal + */ +export function createPSubscribeLazy( + patterns: GlideString[], +): command_request.Command { + return createCommand(RequestType.PSubscribe, patterns); +} + +/** + * @internal + */ +export function createPSubscribe( + patterns: GlideString[], + timeout: number, +): command_request.Command { + return createCommand(RequestType.PSubscribeBlocking, [ + ...patterns, + timeout.toString(), + ]); +} + +/** + * @internal + */ +export function createUnsubscribeLazy( + channels?: GlideString[], +): command_request.Command { + return createCommand(RequestType.Unsubscribe, channels ? channels : []); +} + +/** + * @internal + */ +export function createUnsubscribe( + channels: GlideString[], + timeout: number, +): command_request.Command { + return createCommand(RequestType.UnsubscribeBlocking, [ + ...channels, + timeout.toString(), + ]); +} + +/** + * @internal + */ +export function createPUnsubscribeLazy( + patterns?: GlideString[], +): command_request.Command { + return createCommand(RequestType.PUnsubscribe, patterns ? patterns : []); +} + +/** + * @internal + */ +export function createPUnsubscribe( + patterns: GlideString[], + timeout: number, +): command_request.Command { + return createCommand(RequestType.PUnsubscribeBlocking, [ + ...patterns, + timeout.toString(), + ]); +} + +/** + * @internal + */ +export function createSSubscribeLazy( + channels: GlideString[], +): command_request.Command { + return createCommand(RequestType.SSubscribe, channels); +} + +/** + * @internal + */ +export function createSSubscribe( + channels: GlideString[], + timeout: number, +): command_request.Command { + return createCommand(RequestType.SSubscribeBlocking, [ + ...channels, + timeout.toString(), + ]); +} + +/** + * @internal + */ +export function createSUnsubscribeLazy( + channels?: GlideString[], +): command_request.Command { + return createCommand(RequestType.SUnsubscribe, channels ? channels : []); +} + +/** + * @internal + */ +export function createSUnsubscribe( + channels: GlideString[], + timeout: number, +): command_request.Command { + return createCommand(RequestType.SUnsubscribeBlocking, [ + ...channels, + timeout.toString(), + ]); +} + +/** + * @internal + */ +export function createGetSubscriptions(): command_request.Command { + return createCommand(RequestType.GetSubscriptions, []); +} + /** * @internal */ diff --git a/node/src/GlideClient.ts b/node/src/GlideClient.ts index 49dba16fa7d..bf613a81a48 100644 --- a/node/src/GlideClient.ts +++ b/node/src/GlideClient.ts @@ -38,6 +38,7 @@ import { createFunctionLoad, createFunctionRestore, createFunctionStats, + createGetSubscriptions, createInfo, createLastSave, createLolwut, @@ -98,6 +99,43 @@ export namespace GlideClientConfiguration { } } +/** + * Represents the subscription state for a standalone client. + * + * @remarks + * This interface provides information about the current PubSub subscriptions for a standalone client. + * It includes both the desired subscriptions (what the client wants to maintain) and the actual + * subscriptions (what is currently established on the server). + * + * The subscriptions are organized by channel mode: + * - {@link GlideClientConfiguration.PubSubChannelModes.Exact | Exact}: Exact channel names + * - {@link GlideClientConfiguration.PubSubChannelModes.Pattern | Pattern}: Channel patterns using glob-style matching + * + * @example + * ```typescript + * const state = await client.getSubscriptions(); + * console.log("Desired exact channels:", state.desiredSubscriptions[GlideClientConfiguration.PubSubChannelModes.Exact]); + * console.log("Actual exact channels:", state.actualSubscriptions[GlideClientConfiguration.PubSubChannelModes.Exact]); + * ``` + */ +export interface StandalonePubSubState { + /** + * Desired subscriptions organized by channel mode. + * These are the subscriptions the client wants to maintain. + */ + desiredSubscriptions: Partial< + Record> + >; + + /** + * Actual subscriptions currently active on the server. + * These are the subscriptions that are actually established. + */ + actualSubscriptions: Partial< + Record> + >; +} + /** * Configuration options for creating a {@link GlideClient | GlideClient}. * @@ -1067,4 +1105,28 @@ export class GlideClient extends BaseClient { ): Promise<[GlideString, GlideString[]]> { return this.createWritePromise(createScan(cursor, options), options); } + + /** + * Returns the current subscription state for the client. + * + * @see {@link https://valkey.io/commands/pubsub/|valkey.io} for details. + * + * @returns A promise that resolves to the subscription state containing + * desired and actual subscriptions organized by channel mode. + * + * @example + * ```typescript + * const state = await client.getSubscriptions(); + * console.log("Desired exact channels:", state.desiredSubscriptions[GlideClientConfiguration.PubSubChannelModes.Exact]); + * console.log("Actual exact channels:", state.actualSubscriptions[GlideClientConfiguration.PubSubChannelModes.Exact]); + * ``` + */ + public async getSubscriptions(): Promise { + const response = await this.createWritePromise( + createGetSubscriptions(), + ); + return this.parseGetSubscriptionsResponse( + response, + ); + } } diff --git a/node/src/GlideClusterClient.ts b/node/src/GlideClusterClient.ts index 61c42fa4bf7..ee2c2acbdc3 100644 --- a/node/src/GlideClusterClient.ts +++ b/node/src/GlideClusterClient.ts @@ -53,6 +53,7 @@ import { createFunctionLoad, createFunctionRestore, createFunctionStats, + createGetSubscriptions, createInfo, createLastSave, createLolwut, @@ -64,9 +65,25 @@ import { createScriptExists, createScriptFlush, createScriptKill, + createSSubscribeLazy, + createSSubscribe, + createSUnsubscribeLazy, + createSUnsubscribe, createTime, createUnWatch, } from "./Commands"; + +/** + * Constant representing all sharded channels. + * Use this with unsubscribe methods to unsubscribe from all sharded channels. + * + * @example + * ```typescript + * await client.sunsubscribeLazy(ALL_SHARDED_CHANNELS); + * ``` + */ +export const ALL_SHARDED_CHANNELS = null; + /** An extension to command option types with {@link Routes}. */ export interface RouteOption { /** @@ -147,6 +164,51 @@ export namespace GlideClusterClientConfiguration { context?: any; } } + +/** + * Represents the subscription state for a cluster client. + * + * @remarks + * This interface provides information about the current PubSub subscriptions for a cluster client. + * It includes both the desired subscriptions (what the client wants to maintain) and the actual + * subscriptions (what is currently established on the server). + * + * The subscriptions are organized by channel mode: + * - {@link GlideClusterClientConfiguration.PubSubChannelModes.Exact | Exact}: Exact channel names + * - {@link GlideClusterClientConfiguration.PubSubChannelModes.Pattern | Pattern}: Channel patterns using glob-style matching + * - {@link GlideClusterClientConfiguration.PubSubChannelModes.Sharded | Sharded}: Sharded channels (available since Valkey 7.0) + * + * @example + * ```typescript + * const state = await clusterClient.getSubscriptions(); + * console.log("Desired exact channels:", state.desiredSubscriptions[GlideClusterClientConfiguration.PubSubChannelModes.Exact]); + * console.log("Actual sharded channels:", state.actualSubscriptions[GlideClusterClientConfiguration.PubSubChannelModes.Sharded]); + * ``` + */ +export interface ClusterPubSubState { + /** + * Desired subscriptions organized by channel mode. + * These are the subscriptions the client wants to maintain. + */ + desiredSubscriptions: Partial< + Record< + GlideClusterClientConfiguration.PubSubChannelModes, + Set + > + >; + + /** + * Actual subscriptions currently active on the server. + * These are the subscriptions that are actually established. + */ + actualSubscriptions: Partial< + Record< + GlideClusterClientConfiguration.PubSubChannelModes, + Set + > + >; +} + /** * Configuration options for creating a {@link GlideClusterClient | GlideClusterClient}. * @@ -1971,4 +2033,147 @@ export class GlideClusterClient extends BaseClient { ...options, }); } + + /** + * Subscribes the client to the specified sharded channels (non-blocking). + * Returns immediately without waiting for subscription confirmation. + * Available since Valkey 7.0. + * + * @see {@link https://valkey.io/commands/ssubscribe/|valkey.io} for details. + * + * @param channels - A set of sharded channel names to subscribe to. + * @param options - (Optional) See {@link DecoderOption}. + * @returns A promise that resolves immediately. + * + * @example + * ```typescript + * await clusterClient.ssubscribeLazy(new Set(["shard-channel-1"])); + * ``` + */ + public async ssubscribeLazy( + channels: Set, + options?: DecoderOption, + ): Promise { + const channelsArray = Array.from(channels); + return this.createWritePromise( + createSSubscribeLazy(channelsArray), + options, + ); + } + + /** + * Subscribes the client to the specified sharded channels (blocking). + * Waits for subscription confirmation or until timeout. + * Available since Valkey 7.0. + * + * @see {@link https://valkey.io/commands/ssubscribe/|valkey.io} for details. + * + * @param channels - A set of sharded channel names to subscribe to. + * @param timeoutMs - Maximum time in milliseconds to wait. Use 0 for indefinite wait. + * @param options - (Optional) See {@link DecoderOption}. + * @returns A promise that resolves when subscription is confirmed or timeout occurs. + * + * @example + * ```typescript + * // Wait up to 5 seconds + * await clusterClient.ssubscribe(new Set(["shard-channel-1"]), 5000); + * // Wait indefinitely + * await clusterClient.ssubscribe(new Set(["shard-channel-1"]), 0); + * ``` + */ + public async ssubscribe( + channels: Set, + timeoutMs: number, + options?: DecoderOption, + ): Promise { + const channelsArray = Array.from(channels); + return this.createWritePromise( + createSSubscribe(channelsArray, timeoutMs), + options, + ); + } + + /** + * Unsubscribes the client from the specified sharded channels (non-blocking). + * Pass null to unsubscribe from all sharded channels. + * Available since Valkey 7.0. + * + * @see {@link https://valkey.io/commands/sunsubscribe/|valkey.io} for details. + * + * @param channels - Sharded channel names to unsubscribe from, or null for all channels. + * @param options - (Optional) See {@link DecoderOption}. + * @returns A promise that resolves immediately. + * + * @example + * ```typescript + * await clusterClient.sunsubscribeLazy(new Set(["shard-channel-1"])); + * // Unsubscribe from all sharded channels + * await clusterClient.sunsubscribeLazy(ALL_SHARDED_CHANNELS); + * ``` + */ + public async sunsubscribeLazy( + channels?: Set | null, + options?: DecoderOption, + ): Promise { + const channelsArray = channels ? Array.from(channels) : undefined; + return this.createWritePromise( + createSUnsubscribeLazy(channelsArray), + options, + ); + } + + /** + * Unsubscribes the client from the specified sharded channels (blocking). + * Pass null to unsubscribe from all sharded channels. + * Available since Valkey 7.0. + * + * @see {@link https://valkey.io/commands/sunsubscribe/|valkey.io} for details. + * + * @param channels - Sharded channel names to unsubscribe from, or null for all channels. + * @param timeoutMs - Maximum time in milliseconds to wait. Use 0 for indefinite wait. + * @param options - (Optional) See {@link DecoderOption}. + * @returns A promise that resolves when unsubscription is confirmed or timeout occurs. + * + * @example + * ```typescript + * await clusterClient.sunsubscribe(new Set(["shard-channel-1"]), 5000); + * // Unsubscribe from all sharded channels with timeout + * await clusterClient.sunsubscribe(ALL_SHARDED_CHANNELS, 5000); + * ``` + */ + public async sunsubscribe( + channels: Set | null, + timeoutMs: number, + options?: DecoderOption, + ): Promise { + const channelsArray = channels ? Array.from(channels) : undefined; + return this.createWritePromise( + createSUnsubscribe(channelsArray ?? [], timeoutMs), + options, + ); + } + + /** + * Returns the current subscription state for the cluster client. + * + * @see {@link https://valkey.io/commands/pubsub/|valkey.io} for details. + * + * @returns A promise that resolves to the subscription state containing + * desired and actual subscriptions organized by channel mode. + * + * @example + * ```typescript + * const state = await clusterClient.getSubscriptions(); + * console.log("Desired exact channels:", state.desiredSubscriptions[GlideClusterClientConfiguration.PubSubChannelModes.Exact]); + * console.log("Actual sharded channels:", state.actualSubscriptions[GlideClusterClientConfiguration.PubSubChannelModes.Sharded]); + * ``` + */ + public async getSubscriptions(): Promise { + const response = await this.createWritePromise( + createGetSubscriptions(), + ); + return this.parseGetSubscriptionsResponse( + response, + ); + } } diff --git a/node/tests/GlideClusterClient.test.ts b/node/tests/GlideClusterClient.test.ts index 00ac9bcf34a..be6db4c720d 100644 --- a/node/tests/GlideClusterClient.test.ts +++ b/node/tests/GlideClusterClient.test.ts @@ -2511,7 +2511,11 @@ describe("GlideClusterClient", () => { expect(stats).toHaveProperty("total_bytes_compressed"); expect(stats).toHaveProperty("total_bytes_decompressed"); expect(stats).toHaveProperty("compression_skipped_count"); - expect(Object.keys(stats)).toHaveLength(8); + expect(stats).toHaveProperty("subscription_out_of_sync_count"); + expect(stats).toHaveProperty( + "subscription_last_sync_timestamp", + ); + expect(Object.keys(stats)).toHaveLength(10); } finally { // Ensure the client is properly closed glideClientForTesting?.close(); diff --git a/node/tests/PubSub.test.ts b/node/tests/PubSub.test.ts index f9823ad61ce..88a419747eb 100644 --- a/node/tests/PubSub.test.ts +++ b/node/tests/PubSub.test.ts @@ -4189,4 +4189,666 @@ describe("PubSub", () => { }, TIMEOUT, ); + + /** + * Tests dynamic subscription without pre-configuration. + * + * This test verifies that a client can dynamically subscribe to a channel + * at runtime without pre-configuring subscriptions during client creation. + * + * @param clusterMode - Indicates if the test should be run in cluster mode. + */ + it.each([true, false])( + "dynamic_subscribe_lazy_%p", + async (clusterMode) => { + let listener: TGlideClient | null = null; + let sender: TGlideClient | null = null; + + try { + const channel = getRandomKey(); + const message = getRandomKey(); + + if (clusterMode) { + listener = await GlideClusterClient.createClient( + getOptions(clusterMode), + ); + sender = await GlideClusterClient.createClient( + getOptions(clusterMode), + ); + } else { + listener = await GlideClient.createClient( + getOptions(clusterMode), + ); + sender = await GlideClient.createClient( + getOptions(clusterMode), + ); + } + + // Dynamically subscribe to channel (non-blocking) + await listener.subscribeLazy(new Set([channel])); + + // Allow time for subscription to propagate + await new Promise((resolve) => setTimeout(resolve, 1000)); + + // Publish message + expect(await sender.publish(message, channel)).toBeGreaterThan( + 0, + ); + + // Allow time for message delivery + await new Promise((resolve) => setTimeout(resolve, 1000)); + + // Verify message received + const pubsubMsg = await listener.getPubSubMessage(); + expect(pubsubMsg.message).toEqual(message); + expect(pubsubMsg.channel).toEqual(channel); + expect(pubsubMsg.pattern).toBeNull(); + } finally { + if (listener) { + listener.close(); + } + + if (sender) { + sender.close(); + } + } + }, + TIMEOUT, + ); + + /** + * Tests dynamic pattern subscription without pre-configuration. + * + * This test verifies that a client can dynamically subscribe to a pattern + * at runtime and receive messages from channels matching that pattern. + * + * @param clusterMode - Indicates if the test should be run in cluster mode. + */ + it.each([true, false])( + "dynamic_psubscribe_lazy_%p", + async (clusterMode) => { + let listener: TGlideClient | null = null; + let sender: TGlideClient | null = null; + + try { + const pattern = `${getRandomKey()}*`; + const channel = pattern.replace(/\*/g, getRandomKey()); + const message = getRandomKey(); + + if (clusterMode) { + listener = await GlideClusterClient.createClient( + getOptions(clusterMode), + ); + sender = await GlideClusterClient.createClient( + getOptions(clusterMode), + ); + } else { + listener = await GlideClient.createClient( + getOptions(clusterMode), + ); + sender = await GlideClient.createClient( + getOptions(clusterMode), + ); + } + + // Dynamically subscribe to pattern (non-blocking) + await listener.psubscribeLazy(new Set([pattern])); + + // Allow time for subscription to propagate + await new Promise((resolve) => setTimeout(resolve, 1000)); + + // Publish message to matching channel + await sender.publish(message, channel); + + // Allow time for message delivery + await new Promise((resolve) => setTimeout(resolve, 1000)); + + // Verify message received with pattern + const pubsubMsg = await listener.getPubSubMessage(); + expect(pubsubMsg.message).toEqual(message); + expect(pubsubMsg.channel).toEqual(channel); + expect(pubsubMsg.pattern).toEqual(pattern); + } finally { + if (listener) { + listener.close(); + } + + if (sender) { + sender.close(); + } + } + }, + TIMEOUT, + ); + + /** + * Tests dynamic unsubscription. + * + * This test verifies that a client can dynamically unsubscribe from a channel + * and stop receiving messages from that channel. + * + * @param clusterMode - Indicates if the test should be run in cluster mode. + */ + it.each([true, false])( + "dynamic_unsubscribe_%p", + async (clusterMode) => { + let listener: TGlideClient | null = null; + let sender: TGlideClient | null = null; + + try { + const channel = "test-channel-" + getRandomKey(); + const message = "test-message"; + + if (clusterMode) { + listener = await GlideClusterClient.createClient( + getOptions(clusterMode), + ); + sender = await GlideClusterClient.createClient( + getOptions(clusterMode), + ); + } else { + listener = await GlideClient.createClient( + getOptions(clusterMode), + ); + sender = await GlideClient.createClient( + getOptions(clusterMode), + ); + } + + // Subscribe (non-blocking) + const channels = new Set([channel]); + await listener.subscribeLazy(channels); + + // Wait for subscription to be established + await new Promise((resolve) => setTimeout(resolve, 2000)); + + // Unsubscribe (blocking - waits for reconciliation) + await listener.unsubscribe(channels, 30000); + + // Wait longer for unsubscribe to fully propagate across all cluster nodes + // In a 3-primary cluster, this can take time + await new Promise((resolve) => setTimeout(resolve, 5000)); + + // Verify no message in queue before publishing + const msgBefore = listener.tryGetPubSubMessage(); + expect(msgBefore).toBeNull(); + + // Publish message AFTER unsubscribe + await sender.publish(message, channel); + await new Promise((resolve) => setTimeout(resolve, 2000)); + + // Should not receive message + const msg = listener.tryGetPubSubMessage(); + expect(msg).toBeNull(); + } finally { + if (listener) { + listener.close(); + } + + if (sender) { + sender.close(); + } + } + }, + TIMEOUT, + ); + + /** + * Tests dynamic unsubscription using SHARDED pubsub (cluster only). + * + * Sharded pubsub works differently - it's tied to specific slots/shards, + * so it might not have the same reconciliation issues as regular pubsub. + * + * @param clusterMode - Must be true (cluster mode only). + */ + it.each([true])( + "dynamic_sunsubscribe_simple_%p", + async (clusterMode) => { + const minVersion = "7.0.0"; + + if (cmeCluster.checkIfServerVersionLessThan(minVersion)) { + return; + } + + let listener: TGlideClient | null = null; + let sender: TGlideClient | null = null; + + try { + const channel = "test-channel-" + getRandomKey(); + const message = "test-message"; + + listener = await GlideClusterClient.createClient( + getOptions(clusterMode), + ); + sender = await GlideClusterClient.createClient( + getOptions(clusterMode), + ); + + // Subscribe to sharded channel (non-blocking) + const channels = new Set([channel]); + await (listener as GlideClusterClient).ssubscribeLazy(channels); + + // Wait for subscription to be established + await new Promise((resolve) => setTimeout(resolve, 2000)); + + // Unsubscribe from sharded channel (blocking) + await (listener as GlideClusterClient).sunsubscribe( + channels, + 30000, + ); + + // Wait for unsubscribe to propagate + await new Promise((resolve) => setTimeout(resolve, 5000)); + + // Verify no message in queue before publishing + const msgBefore = listener.tryGetPubSubMessage(); + expect(msgBefore).toBeNull(); + + // Publish message AFTER unsubscribe (sharded publish) + await (sender as GlideClusterClient).publish( + message, + channel, + true, + ); + await new Promise((resolve) => setTimeout(resolve, 2000)); + + // Should not receive message + const msg = listener.tryGetPubSubMessage(); + expect(msg).toBeNull(); + } finally { + if (listener) { + listener.close(); + } + + if (sender) { + sender.close(); + } + } + }, + TIMEOUT, + ); + + /** + * Tests dynamic sharded subscription without pre-configuration (cluster only). + * + * This test verifies that a cluster client can dynamically subscribe to a sharded + * channel at runtime. Requires Valkey 7.0+. + * + * @param clusterMode - Must be true (cluster mode only). + */ + it.each([true])( + "dynamic_ssubscribe_lazy_%p", + async (clusterMode) => { + const minVersion = "7.0.0"; + + if (cmeCluster.checkIfServerVersionLessThan(minVersion)) { + return; + } + + let listener: TGlideClient | null = null; + let sender: TGlideClient | null = null; + + try { + const channel = getRandomKey(); + const message = getRandomKey(); + + listener = await GlideClusterClient.createClient( + getOptions(clusterMode), + ); + sender = await GlideClusterClient.createClient( + getOptions(clusterMode), + ); + + // Dynamically subscribe to sharded channel (non-blocking) + await (listener as GlideClusterClient).ssubscribeLazy( + new Set([channel]), + ); + + // Allow time for subscription to propagate + await new Promise((resolve) => setTimeout(resolve, 1000)); + + // Publish message to sharded channel + expect( + await (sender as GlideClusterClient).publish( + message, + channel, + true, + ), + ).toBeGreaterThan(0); + + // Allow time for message delivery + await new Promise((resolve) => setTimeout(resolve, 1000)); + + // Verify message received + const pubsubMsg = await listener.getPubSubMessage(); + expect(pubsubMsg.message).toEqual(message); + expect(pubsubMsg.channel).toEqual(channel); + expect(pubsubMsg.pattern).toBeNull(); + } finally { + if (listener) { + listener.close(); + } + + if (sender) { + sender.close(); + } + } + }, + TIMEOUT, + ); + + /** + * Tests dynamic sharded unsubscription (cluster only). + * + * This test verifies that a cluster client can dynamically unsubscribe from a + * sharded channel and stop receiving messages. Requires Valkey 7.0+. + * + * @param clusterMode - Must be true (cluster mode only). + */ + it.each([true])( + "dynamic_sunsubscribe_%p", + async (clusterMode) => { + const minVersion = "7.0.0"; + + if (cmeCluster.checkIfServerVersionLessThan(minVersion)) { + return; + } + + let listener: TGlideClient | null = null; + let sender: TGlideClient | null = null; + + try { + const channel = getRandomKey(); + const message1 = getRandomKey(); + const message2 = getRandomKey(); + + listener = await GlideClusterClient.createClient( + getOptions(clusterMode), + ); + sender = await GlideClusterClient.createClient( + getOptions(clusterMode), + ); + + // Dynamically subscribe to sharded channel (non-blocking) + await (listener as GlideClusterClient).ssubscribeLazy( + new Set([channel]), + ); + + // Allow time for subscription to propagate + await new Promise((resolve) => setTimeout(resolve, 1000)); + + // Publish first message + expect( + await (sender as GlideClusterClient).publish( + message1, + channel, + true, + ), + ).toBeGreaterThan(0); + + // Allow time for message delivery + await new Promise((resolve) => setTimeout(resolve, 1000)); + + // Verify first message received + const pubsubMsg1 = await listener.getPubSubMessage(); + expect(pubsubMsg1.message).toEqual(message1); + expect(pubsubMsg1.channel).toEqual(channel); + + // Dynamically unsubscribe from sharded channel (non-blocking) + await (listener as GlideClusterClient).sunsubscribeLazy( + new Set([channel]), + ); + + // Drain any pending messages that were published before unsubscribe + while (listener.tryGetPubSubMessage() !== null) { + // Keep draining + } + + // Publish second message + await (sender as GlideClusterClient).publish( + message2, + channel, + true, + ); + + // Allow time for potential message delivery + await new Promise((resolve) => setTimeout(resolve, 2000)); + + // Verify no message received after unsubscribe + const pubsubMsg2 = listener.tryGetPubSubMessage(); + expect(pubsubMsg2).toBeNull(); + } finally { + if (listener) { + listener.close(); + } + + if (sender) { + sender.close(); + } + } + }, + TIMEOUT, + ); + + /** + * Tests unsubscribing from pre-configured subscriptions. + * + * This test verifies that a client can dynamically unsubscribe from channels + * that were pre-configured during client creation. + * + * @param clusterMode - Indicates if the test should be run in cluster mode. + */ + it.each([true, false])( + "dynamic_unsubscribe_from_preconfigured_%p", + async (clusterMode) => { + let listener: TGlideClient | null = null; + let sender: TGlideClient | null = null; + + try { + const channel = getRandomKey(); + const message1 = getRandomKey(); + const message2 = getRandomKey(); + + // Create client with pre-configured subscription + const pubSub = createPubSubSubscription( + clusterMode, + { + [GlideClusterClientConfiguration.PubSubChannelModes + .Exact]: new Set([channel as string]), + }, + { + [GlideClientConfiguration.PubSubChannelModes.Exact]: + new Set([channel as string]), + }, + ); + + if (clusterMode) { + listener = await GlideClusterClient.createClient({ + pubsubSubscriptions: pubSub, + ...getOptions(clusterMode), + }); + sender = await GlideClusterClient.createClient( + getOptions(clusterMode), + ); + } else { + listener = await GlideClient.createClient({ + pubsubSubscriptions: pubSub, + ...getOptions(clusterMode), + }); + sender = await GlideClient.createClient( + getOptions(clusterMode), + ); + } + + // Allow time for subscription to propagate + await new Promise((resolve) => setTimeout(resolve, 1000)); + + // Publish first message + expect(await sender.publish(message1, channel)).toBeGreaterThan( + 0, + ); + + // Allow time for message delivery + await new Promise((resolve) => setTimeout(resolve, 1000)); + + // Verify first message received + const pubsubMsg1 = await listener.getPubSubMessage(); + expect(pubsubMsg1.message).toEqual(message1); + expect(pubsubMsg1.channel).toEqual(channel); + + // Dynamically unsubscribe from pre-configured channel (non-blocking) + await listener.unsubscribeLazy(new Set([channel])); + + // Drain any pending messages that were published before unsubscribe + while (listener.tryGetPubSubMessage() !== null) { + // Keep draining + } + + // Publish second message + await sender.publish(message2, channel); + + // Allow time for potential message delivery + await new Promise((resolve) => setTimeout(resolve, 2000)); + + // Verify no message received after unsubscribe + const pubsubMsg2 = listener.tryGetPubSubMessage(); + expect(pubsubMsg2).toBeNull(); + } finally { + if (listener) { + listener.close(); + } + + if (sender) { + sender.close(); + } + } + }, + TIMEOUT, + ); + + /** + * Tests that subscription metrics exist in statistics. + * + * This test verifies that the getStatistics method returns subscription-related + * metrics including out-of-sync count and last sync timestamp. + * + * @param clusterMode - Indicates if the test should be run in cluster mode. + */ + it.each([true, false])( + "test_subscription_metrics_in_statistics_%p", + async (clusterMode) => { + let client: TGlideClient | null = null; + + try { + const channel = getRandomKey(); + + // Create client WITHOUT any subscription configuration + if (clusterMode) { + client = await GlideClusterClient.createClient( + getOptions(clusterMode), + ); + } else { + client = await GlideClient.createClient( + getOptions(clusterMode), + ); + } + + // Dynamically subscribe to channel (non-blocking) + await client.subscribeLazy(new Set([channel])); + + // Allow time for subscription to propagate + await new Promise((resolve) => setTimeout(resolve, 1000)); + + // Get statistics + const stats = (await client.getStatistics()) as Record< + string, + string + >; + + // Verify subscription metrics exist + expect(stats).toHaveProperty("subscription_out_of_sync_count"); + expect(stats).toHaveProperty( + "subscription_last_sync_timestamp", + ); + + // Verify metrics are valid (non-negative numbers) + const outOfSyncCount = parseInt( + stats["subscription_out_of_sync_count"], + ); + const lastSyncTimestamp = parseInt( + stats["subscription_last_sync_timestamp"], + ); + + expect(outOfSyncCount).toBeGreaterThanOrEqual(0); + expect(lastSyncTimestamp).toBeGreaterThanOrEqual(0); + } finally { + if (client) { + client.close(); + } + } + }, + TIMEOUT, + ); + + /** + * Tests that subscription timestamp updates after subscribe. + * + * This test verifies that the subscription_last_sync_timestamp metric + * is updated after a subscription operation. + * + * @param clusterMode - Indicates if the test should be run in cluster mode. + */ + it.each([true, false])( + "test_subscription_timestamp_updates_after_subscribe_%p", + async (clusterMode) => { + let client: TGlideClient | null = null; + + try { + const channel = getRandomKey(); + + // Create client WITHOUT any subscription configuration + if (clusterMode) { + client = await GlideClusterClient.createClient( + getOptions(clusterMode), + ); + } else { + client = await GlideClient.createClient( + getOptions(clusterMode), + ); + } + + // Get initial statistics + const statsBefore = (await client.getStatistics()) as Record< + string, + string + >; + const timestampBefore = parseInt( + statsBefore["subscription_last_sync_timestamp"], + ); + + // Dynamically subscribe to channel (non-blocking) + await client.subscribeLazy(new Set([channel])); + + // Allow time for subscription to propagate + await new Promise((resolve) => setTimeout(resolve, 1000)); + + // Get updated statistics + const statsAfter = (await client.getStatistics()) as Record< + string, + string + >; + const timestampAfter = parseInt( + statsAfter["subscription_last_sync_timestamp"], + ); + + // Verify timestamp was updated (increased or stayed the same) + expect(timestampAfter).toBeGreaterThanOrEqual(timestampBefore); + } finally { + if (client) { + client.close(); + } + } + }, + TIMEOUT, + ); });