From cd3aefa446cc2992bc08ac598e8181d4b06dbf55 Mon Sep 17 00:00:00 2001 From: affonsov <67347924+affonsov@users.noreply.github.com> Date: Tue, 3 Feb 2026 00:51:05 -0800 Subject: [PATCH 1/8] Node: add dynamic PubSub support - Add subscribe() and psubscribe() methods to BaseClient for channel and pattern subscriptions - Add unsubscribe() and punsubscribe() methods with optional channel/pattern filtering - Add pubsubReconciliationIntervalMs configuration option to AdvancedBaseClientConfiguration - Expose subscription telemetry metrics: subscription_out_of_sync_count and subscription_last_sync_timestamp - Import new PubSub command creators (subscribe, psubscribe, unsubscribe, punsubscribe variants) - Add JSDoc documentation with examples for all new subscription methods - Update Rust FFI bindings to expose subscription reconciliation interval configuration - Add test coverage for cluster client subscription functionality Signed-off-by: affonsov <67347924+affonsov@users.noreply.github.com> --- node/rust-client/src/lib.rs | 4 + node/src/BaseClient.ts | 239 ++++++++++++++++++++++++++ node/src/Commands.ts | 139 +++++++++++++++ node/src/GlideClient.ts | 62 +++++++ node/src/GlideClusterClient.ts | 153 +++++++++++++++++ node/tests/GlideClusterClient.test.ts | 6 +- 6 files changed, 602 insertions(+), 1 deletion(-) diff --git a/node/rust-client/src/lib.rs b/node/rust-client/src/lib.rs index 36b5cb14a20..a6cd08c2713 100644 --- a/node/rust-client/src/lib.rs +++ b/node/rust-client/src/lib.rs @@ -703,6 +703,8 @@ pub fn get_statistics<'a>(env: &'a Env) -> Result> { let total_bytes_compressed = Telemetry::total_bytes_compressed().to_string(); let total_bytes_decompressed = Telemetry::total_bytes_decompressed().to_string(); let compression_skipped_count = Telemetry::compression_skipped_count().to_string(); + let subscription_out_of_sync_count = Telemetry::subscription_out_of_sync_count().to_string(); + let subscription_last_sync_timestamp = Telemetry::subscription_last_sync_timestamp().to_string(); let mut stats = Object::new(env)?; stats.set_named_property("total_connections", total_connections)?; @@ -713,6 +715,8 @@ pub fn get_statistics<'a>(env: &'a Env) -> Result> { stats.set_named_property("total_bytes_compressed", total_bytes_compressed)?; stats.set_named_property("total_bytes_decompressed", total_bytes_decompressed)?; stats.set_named_property("compression_skipped_count", compression_skipped_count)?; + stats.set_named_property("subscription_out_of_sync_count", subscription_out_of_sync_count)?; + stats.set_named_property("subscription_last_sync_timestamp", subscription_last_sync_timestamp)?; Ok(stats) } diff --git a/node/src/BaseClient.ts b/node/src/BaseClient.ts index 5df60637730..320dc246f83 100644 --- a/node/src/BaseClient.ts +++ b/node/src/BaseClient.ts @@ -185,6 +185,14 @@ import { createPubSubChannels, createPubSubNumPat, createPubSubNumSub, + createPSubscribe, + createPSubscribeBlocking, + createPUnsubscribe, + createPUnsubscribeBlocking, + createSubscribe, + createSubscribeBlocking, + createUnsubscribe, + createUnsubscribeBlocking, createRPop, createRPush, createRPushX, @@ -936,6 +944,33 @@ export interface AdvancedBaseClientConfiguration { * - If not explicitly set, a default value of `true` will be used by the Rust core. */ tcpNoDelay?: boolean; + + /** + * The interval in milliseconds between PubSub subscription reconciliation attempts. + * + * The reconciliation process ensures that the client's desired subscriptions match + * the actual subscriptions on the server. This is useful when subscriptions may have + * been lost due to network issues or server restarts. + * + * If not explicitly set, the Rust core will use its default reconciliation interval. + * + * @remarks + * - Must be a positive integer representing milliseconds. + * - The reconciliation process runs automatically in the background. + * - A lower interval provides faster recovery from subscription issues but increases overhead. + * - A higher interval reduces overhead but may delay recovery from subscription issues. + * + * @example + * ```typescript + * const config: GlideClientConfiguration = { + * addresses: [{ host: "localhost", port: 6379 }], + * advancedConfiguration: { + * pubsubReconciliationIntervalMs: 5000 // Reconcile every 5 seconds + * } + * }; + * ``` + */ + pubsubReconciliationIntervalMs?: number; } /** @@ -9230,6 +9265,14 @@ export class BaseClient { request.tcpNodelay = options.tcpNoDelay; } + // Set PubSub reconciliation interval if explicitly configured + // Note: This requires the protobuf files to be regenerated with: + // npm run build-protobuf (from the node directory) + if (options.pubsubReconciliationIntervalMs !== undefined) { + request.pubsubReconciliationIntervalMs = + options.pubsubReconciliationIntervalMs; + } + // Apply TLS configuration if present if (options.tlsAdvancedConfiguration) { // request.tlsMode is either SecureTls or InsecureTls here @@ -9470,6 +9513,202 @@ export class BaseClient { return response; // "OK" } + /** + * Subscribes the client to the specified channels. + * + * @see {@link https://valkey.io/commands/subscribe/|valkey.io} for details. + * + * @param channels - A set of channel names to subscribe to. + * @param options - (Optional) Additional parameters: + * - timeout: Maximum time in milliseconds to wait for subscription confirmation. + * - decoder: See {@link DecoderOption}. + * @returns A promise that resolves when the subscription is complete. + * + * @example + * ```typescript + * await client.subscribe(new Set(["news", "updates"])); + * // With timeout + * await client.subscribe(new Set(["news"]), { timeout: 5000 }); + * ``` + */ + public async subscribe( + channels: Set, + options?: { timeout?: number } & DecoderOption, + ): Promise { + const channelsArray = Array.from(channels); + + if (options?.timeout !== undefined) { + return this.createWritePromise( + createSubscribeBlocking(channelsArray, options.timeout), + options, + ); + } else { + return this.createWritePromise( + createSubscribe(channelsArray), + options, + ); + } + } + + /** + * Subscribes the client to the specified patterns. + * + * @see {@link https://valkey.io/commands/psubscribe/|valkey.io} for details. + * + * @param patterns - A set of glob-style patterns to subscribe to. + * @param options - (Optional) Additional parameters: + * - timeout: Maximum time in milliseconds to wait for subscription confirmation. + * - decoder: See {@link DecoderOption}. + * @returns A promise that resolves when the subscription is complete. + * + * @example + * ```typescript + * await client.psubscribe(new Set(["news.*", "updates.*"])); + * ``` + */ + public async psubscribe( + patterns: Set, + options?: { timeout?: number } & DecoderOption, + ): Promise { + const patternsArray = Array.from(patterns); + + if (options?.timeout !== undefined) { + return this.createWritePromise( + createPSubscribeBlocking(patternsArray, options.timeout), + options, + ); + } else { + return this.createWritePromise( + createPSubscribe(patternsArray), + options, + ); + } + } + + /** + * Unsubscribes the client from the specified channels. + * If no channels are provided, unsubscribes from all exact channels. + * + * @see {@link https://valkey.io/commands/unsubscribe/|valkey.io} for details. + * + * @param channels - (Optional) A set of channel names to unsubscribe from. + * @param options - (Optional) Additional parameters: + * - timeout: Maximum time in milliseconds to wait for unsubscription confirmation. + * - decoder: See {@link DecoderOption}. + * @returns A promise that resolves when the unsubscription is complete. + * + * @example + * ```typescript + * await client.unsubscribe(new Set(["news"])); + * // Unsubscribe from all channels + * await client.unsubscribe(); + * ``` + */ + public async unsubscribe( + channels?: Set, + options?: { timeout?: number } & DecoderOption, + ): Promise { + const channelsArray = channels ? Array.from(channels) : undefined; + + if (options?.timeout !== undefined) { + // For blocking unsubscribe, we need to provide channels (empty array if none) + return this.createWritePromise( + createUnsubscribeBlocking(channelsArray ?? [], options.timeout), + options, + ); + } else { + return this.createWritePromise( + createUnsubscribe(channelsArray), + options, + ); + } + } + + /** + * Unsubscribes the client from the specified patterns. + * If no patterns are provided, unsubscribes from all patterns. + * + * @see {@link https://valkey.io/commands/punsubscribe/|valkey.io} for details. + * + * @param patterns - (Optional) A set of patterns to unsubscribe from. + * @param options - (Optional) Additional parameters: + * - timeout: Maximum time in milliseconds to wait for unsubscription confirmation. + * - decoder: See {@link DecoderOption}. + * @returns A promise that resolves when the unsubscription is complete. + * + * @example + * ```typescript + * await client.punsubscribe(new Set(["news.*"])); + * // Unsubscribe from all patterns + * await client.punsubscribe(); + * ``` + */ + public async punsubscribe( + patterns?: Set, + options?: { timeout?: number } & DecoderOption, + ): Promise { + const patternsArray = patterns ? Array.from(patterns) : undefined; + + if (options?.timeout !== undefined) { + // For blocking punsubscribe, we need to provide patterns (empty array if none) + return this.createWritePromise( + createPUnsubscribeBlocking( + patternsArray ?? [], + options.timeout, + ), + options, + ); + } else { + return this.createWritePromise( + createPUnsubscribe(patternsArray), + options, + ); + } + } + + /** + * @internal + * Helper method to parse GetSubscriptions response from Rust core. + * Converts array response to structured object with desired and actual subscriptions. + * + * @param response - The response array from Rust core with format: + * ["desired", { mode: [channels...] }, "actual", { mode: [channels...] }] + * @returns Parsed subscription state with desired and actual subscriptions + */ + protected parseGetSubscriptionsResponse( + response: unknown[], + ): { + desiredSubscriptions: Partial>>; + actualSubscriptions: Partial>>; + } { + // Response format: ["desired", {...}, "actual", {...}] + if (!Array.isArray(response) || response.length !== 4) { + throw new Error( + `Invalid GetSubscriptions response format: expected array of length 4, got ${response}`, + ); + } + + const desiredData = response[1] as Record; + const actualData = response[3] as Record; + + const desiredSubscriptions: Partial>> = {}; + const actualSubscriptions: Partial>> = {}; + + // Parse desired subscriptions + for (const [mode, channels] of Object.entries(desiredData)) { + const modeKey = parseInt(mode) as T; + desiredSubscriptions[modeKey] = new Set(channels); + } + + // Parse actual subscriptions + for (const [mode, channels] of Object.entries(actualData)) { + const modeKey = parseInt(mode) as T; + actualSubscriptions[modeKey] = new Set(channels); + } + + return { desiredSubscriptions, actualSubscriptions }; + } + /** * Return a statistics * diff --git a/node/src/Commands.ts b/node/src/Commands.ts index 0935fc4c156..62d971d789d 100644 --- a/node/src/Commands.ts +++ b/node/src/Commands.ts @@ -4543,6 +4543,145 @@ export function createPubSubShardNumSub( ); } +/** + * @internal + */ +export function createSubscribe( + channels: GlideString[], +): command_request.Command { + return createCommand(RequestType.Subscribe, channels); +} + +/** + * @internal + */ +export function createSubscribeBlocking( + channels: GlideString[], + timeout: number, +): command_request.Command { + return createCommand(RequestType.SubscribeBlocking, [ + ...channels, + timeout.toString(), + ]); +} + +/** + * @internal + */ +export function createPSubscribe( + patterns: GlideString[], +): command_request.Command { + return createCommand(RequestType.PSubscribe, patterns); +} + +/** + * @internal + */ +export function createPSubscribeBlocking( + patterns: GlideString[], + timeout: number, +): command_request.Command { + return createCommand(RequestType.PSubscribeBlocking, [ + ...patterns, + timeout.toString(), + ]); +} + +/** + * @internal + */ +export function createUnsubscribe( + channels?: GlideString[], +): command_request.Command { + return createCommand(RequestType.Unsubscribe, channels ? channels : []); +} + +/** + * @internal + */ +export function createUnsubscribeBlocking( + channels: GlideString[], + timeout: number, +): command_request.Command { + return createCommand(RequestType.UnsubscribeBlocking, [ + ...channels, + timeout.toString(), + ]); +} + +/** + * @internal + */ +export function createPUnsubscribe( + patterns?: GlideString[], +): command_request.Command { + return createCommand(RequestType.PUnsubscribe, patterns ? patterns : []); +} + +/** + * @internal + */ +export function createPUnsubscribeBlocking( + patterns: GlideString[], + timeout: number, +): command_request.Command { + return createCommand(RequestType.PUnsubscribeBlocking, [ + ...patterns, + timeout.toString(), + ]); +} + +/** + * @internal + */ +export function createSSubscribe( + channels: GlideString[], +): command_request.Command { + return createCommand(RequestType.SSubscribe, channels); +} + +/** + * @internal + */ +export function createSSubscribeBlocking( + channels: GlideString[], + timeout: number, +): command_request.Command { + return createCommand(RequestType.SSubscribeBlocking, [ + ...channels, + timeout.toString(), + ]); +} + +/** + * @internal + */ +export function createSUnsubscribe( + channels?: GlideString[], +): command_request.Command { + return createCommand(RequestType.SUnsubscribe, channels ? channels : []); +} + +/** + * @internal + */ +export function createSUnsubscribeBlocking( + channels: GlideString[], + timeout: number, +): command_request.Command { + return createCommand(RequestType.SUnsubscribeBlocking, [ + ...channels, + timeout.toString(), + ]); +} + +/** + * @internal + */ +export function createGetSubscriptions(): command_request.Command { + return createCommand(RequestType.GetSubscriptions, []); +} + /** * @internal */ diff --git a/node/src/GlideClient.ts b/node/src/GlideClient.ts index 49dba16fa7d..bf613a81a48 100644 --- a/node/src/GlideClient.ts +++ b/node/src/GlideClient.ts @@ -38,6 +38,7 @@ import { createFunctionLoad, createFunctionRestore, createFunctionStats, + createGetSubscriptions, createInfo, createLastSave, createLolwut, @@ -98,6 +99,43 @@ export namespace GlideClientConfiguration { } } +/** + * Represents the subscription state for a standalone client. + * + * @remarks + * This interface provides information about the current PubSub subscriptions for a standalone client. + * It includes both the desired subscriptions (what the client wants to maintain) and the actual + * subscriptions (what is currently established on the server). + * + * The subscriptions are organized by channel mode: + * - {@link GlideClientConfiguration.PubSubChannelModes.Exact | Exact}: Exact channel names + * - {@link GlideClientConfiguration.PubSubChannelModes.Pattern | Pattern}: Channel patterns using glob-style matching + * + * @example + * ```typescript + * const state = await client.getSubscriptions(); + * console.log("Desired exact channels:", state.desiredSubscriptions[GlideClientConfiguration.PubSubChannelModes.Exact]); + * console.log("Actual exact channels:", state.actualSubscriptions[GlideClientConfiguration.PubSubChannelModes.Exact]); + * ``` + */ +export interface StandalonePubSubState { + /** + * Desired subscriptions organized by channel mode. + * These are the subscriptions the client wants to maintain. + */ + desiredSubscriptions: Partial< + Record> + >; + + /** + * Actual subscriptions currently active on the server. + * These are the subscriptions that are actually established. + */ + actualSubscriptions: Partial< + Record> + >; +} + /** * Configuration options for creating a {@link GlideClient | GlideClient}. * @@ -1067,4 +1105,28 @@ export class GlideClient extends BaseClient { ): Promise<[GlideString, GlideString[]]> { return this.createWritePromise(createScan(cursor, options), options); } + + /** + * Returns the current subscription state for the client. + * + * @see {@link https://valkey.io/commands/pubsub/|valkey.io} for details. + * + * @returns A promise that resolves to the subscription state containing + * desired and actual subscriptions organized by channel mode. + * + * @example + * ```typescript + * const state = await client.getSubscriptions(); + * console.log("Desired exact channels:", state.desiredSubscriptions[GlideClientConfiguration.PubSubChannelModes.Exact]); + * console.log("Actual exact channels:", state.actualSubscriptions[GlideClientConfiguration.PubSubChannelModes.Exact]); + * ``` + */ + public async getSubscriptions(): Promise { + const response = await this.createWritePromise( + createGetSubscriptions(), + ); + return this.parseGetSubscriptionsResponse( + response, + ); + } } diff --git a/node/src/GlideClusterClient.ts b/node/src/GlideClusterClient.ts index 61c42fa4bf7..665b22e2658 100644 --- a/node/src/GlideClusterClient.ts +++ b/node/src/GlideClusterClient.ts @@ -53,6 +53,7 @@ import { createFunctionLoad, createFunctionRestore, createFunctionStats, + createGetSubscriptions, createInfo, createLastSave, createLolwut, @@ -64,6 +65,10 @@ import { createScriptExists, createScriptFlush, createScriptKill, + createSSubscribe, + createSSubscribeBlocking, + createSUnsubscribe, + createSUnsubscribeBlocking, createTime, createUnWatch, } from "./Commands"; @@ -147,6 +152,51 @@ export namespace GlideClusterClientConfiguration { context?: any; } } + +/** + * Represents the subscription state for a cluster client. + * + * @remarks + * This interface provides information about the current PubSub subscriptions for a cluster client. + * It includes both the desired subscriptions (what the client wants to maintain) and the actual + * subscriptions (what is currently established on the server). + * + * The subscriptions are organized by channel mode: + * - {@link GlideClusterClientConfiguration.PubSubChannelModes.Exact | Exact}: Exact channel names + * - {@link GlideClusterClientConfiguration.PubSubChannelModes.Pattern | Pattern}: Channel patterns using glob-style matching + * - {@link GlideClusterClientConfiguration.PubSubChannelModes.Sharded | Sharded}: Sharded channels (available since Valkey 7.0) + * + * @example + * ```typescript + * const state = await clusterClient.getSubscriptions(); + * console.log("Desired exact channels:", state.desiredSubscriptions[GlideClusterClientConfiguration.PubSubChannelModes.Exact]); + * console.log("Actual sharded channels:", state.actualSubscriptions[GlideClusterClientConfiguration.PubSubChannelModes.Sharded]); + * ``` + */ +export interface ClusterPubSubState { + /** + * Desired subscriptions organized by channel mode. + * These are the subscriptions the client wants to maintain. + */ + desiredSubscriptions: Partial< + Record< + GlideClusterClientConfiguration.PubSubChannelModes, + Set + > + >; + + /** + * Actual subscriptions currently active on the server. + * These are the subscriptions that are actually established. + */ + actualSubscriptions: Partial< + Record< + GlideClusterClientConfiguration.PubSubChannelModes, + Set + > + >; +} + /** * Configuration options for creating a {@link GlideClusterClient | GlideClusterClient}. * @@ -1971,4 +2021,107 @@ export class GlideClusterClient extends BaseClient { ...options, }); } + + /** + * Subscribes the client to the specified sharded channels. + * Available since Valkey 7.0. + * + * @see {@link https://valkey.io/commands/ssubscribe/|valkey.io} for details. + * + * @param channels - A set of sharded channel names to subscribe to. + * @param options - (Optional) Additional parameters: + * - timeout: Maximum time in milliseconds to wait for subscription confirmation. + * - decoder: See {@link DecoderOption}. + * @returns A promise that resolves when the subscription is complete. + * + * @example + * ```typescript + * await clusterClient.ssubscribe(new Set(["shard-channel-1"])); + * ``` + */ + public async ssubscribe( + channels: Set, + options?: { timeout?: number } & DecoderOption, + ): Promise { + const channelsArray = Array.from(channels); + + if (options?.timeout !== undefined) { + return this.createWritePromise( + createSSubscribeBlocking(channelsArray, options.timeout), + options, + ); + } else { + return this.createWritePromise( + createSSubscribe(channelsArray), + options, + ); + } + } + + /** + * Unsubscribes the client from the specified sharded channels. + * If no channels are provided, unsubscribes from all sharded channels. + * Available since Valkey 7.0. + * + * @see {@link https://valkey.io/commands/sunsubscribe/|valkey.io} for details. + * + * @param channels - (Optional) A set of sharded channel names to unsubscribe from. + * @param options - (Optional) Additional parameters: + * - timeout: Maximum time in milliseconds to wait for unsubscription confirmation. + * - decoder: See {@link DecoderOption}. + * @returns A promise that resolves when the unsubscription is complete. + * + * @example + * ```typescript + * await clusterClient.sunsubscribe(new Set(["shard-channel-1"])); + * // Unsubscribe from all sharded channels + * await clusterClient.sunsubscribe(); + * ``` + */ + public async sunsubscribe( + channels?: Set, + options?: { timeout?: number } & DecoderOption, + ): Promise { + const channelsArray = channels ? Array.from(channels) : undefined; + + if (options?.timeout !== undefined) { + // For blocking sunsubscribe, we need to provide channels (empty array if none) + return this.createWritePromise( + createSUnsubscribeBlocking( + channelsArray ?? [], + options.timeout, + ), + options, + ); + } else { + return this.createWritePromise( + createSUnsubscribe(channelsArray), + options, + ); + } + } + + /** + * Returns the current subscription state for the cluster client. + * + * @see {@link https://valkey.io/commands/pubsub/|valkey.io} for details. + * + * @returns A promise that resolves to the subscription state containing + * desired and actual subscriptions organized by channel mode. + * + * @example + * ```typescript + * const state = await clusterClient.getSubscriptions(); + * console.log("Desired exact channels:", state.desiredSubscriptions[GlideClusterClientConfiguration.PubSubChannelModes.Exact]); + * console.log("Actual sharded channels:", state.actualSubscriptions[GlideClusterClientConfiguration.PubSubChannelModes.Sharded]); + * ``` + */ + public async getSubscriptions(): Promise { + const response = await this.createWritePromise( + createGetSubscriptions(), + ); + return this.parseGetSubscriptionsResponse( + response, + ); + } } diff --git a/node/tests/GlideClusterClient.test.ts b/node/tests/GlideClusterClient.test.ts index 00ac9bcf34a..be6db4c720d 100644 --- a/node/tests/GlideClusterClient.test.ts +++ b/node/tests/GlideClusterClient.test.ts @@ -2511,7 +2511,11 @@ describe("GlideClusterClient", () => { expect(stats).toHaveProperty("total_bytes_compressed"); expect(stats).toHaveProperty("total_bytes_decompressed"); expect(stats).toHaveProperty("compression_skipped_count"); - expect(Object.keys(stats)).toHaveLength(8); + expect(stats).toHaveProperty("subscription_out_of_sync_count"); + expect(stats).toHaveProperty( + "subscription_last_sync_timestamp", + ); + expect(Object.keys(stats)).toHaveLength(10); } finally { // Ensure the client is properly closed glideClientForTesting?.close(); From c9c63d88df9553f3bc199197911f9c8ea80164a9 Mon Sep 17 00:00:00 2001 From: affonsov <67347924+affonsov@users.noreply.github.com> Date: Wed, 11 Feb 2026 17:07:48 -0800 Subject: [PATCH 2/8] adding integration tests Signed-off-by: affonsov <67347924+affonsov@users.noreply.github.com> --- node/src/BaseClient.ts | 26 +- node/tests/PubSub.test.ts | 667 ++++++++++++++++++++++++++++++++++++++ 2 files changed, 679 insertions(+), 14 deletions(-) diff --git a/node/src/BaseClient.ts b/node/src/BaseClient.ts index 320dc246f83..b91b13f6672 100644 --- a/node/src/BaseClient.ts +++ b/node/src/BaseClient.ts @@ -1652,13 +1652,12 @@ export class BaseClient { ); } - if (!this.isPubsubConfigured(this.config!)) { - throw new ConfigurationError( - "The operation will never complete since there was no pubsbub subscriptions applied to the client.", - ); - } - - if (this.getPubsubCallbackAndContext(this.config!)[0]) { + // Match Java's validation: only throw error if BOTH config exists AND callback exists + // Java: if (subscriptionConfiguration.isPresent() && subscriptionConfiguration.get().getCallback().isPresent()) + if ( + this.isPubsubConfigured(this.config!) && + this.getPubsubCallbackAndContext(this.config!)[0] + ) { throw new ConfigurationError( "The operation will never complete since messages will be passed to the configured callback.", ); @@ -1677,13 +1676,12 @@ export class BaseClient { ); } - if (!this.isPubsubConfigured(this.config!)) { - throw new ConfigurationError( - "The operation will never complete since there was no pubsbub subscriptions applied to the client.", - ); - } - - if (this.getPubsubCallbackAndContext(this.config!)[0]) { + // Match Java's validation: only throw error if BOTH config exists AND callback exists + // Java: if (subscriptionConfiguration.isPresent() && subscriptionConfiguration.get().getCallback().isPresent()) + if ( + this.isPubsubConfigured(this.config!) && + this.getPubsubCallbackAndContext(this.config!)[0] + ) { throw new ConfigurationError( "The operation will never complete since messages will be passed to the configured callback.", ); diff --git a/node/tests/PubSub.test.ts b/node/tests/PubSub.test.ts index f9823ad61ce..ab103b9ccce 100644 --- a/node/tests/PubSub.test.ts +++ b/node/tests/PubSub.test.ts @@ -4189,4 +4189,671 @@ describe("PubSub", () => { }, TIMEOUT, ); + + /** + * Tests dynamic subscription without pre-configuration. + * + * This test verifies that a client can dynamically subscribe to a channel + * at runtime without pre-configuring subscriptions during client creation. + * + * @param clusterMode - Indicates if the test should be run in cluster mode. + */ + it.each([true, false])( + "dynamic_subscribe_lazy_%p", + async (clusterMode) => { + let listener: TGlideClient | null = null; + let sender: TGlideClient | null = null; + + try { + const channel = getRandomKey(); + const message = getRandomKey(); + + // Create clients WITHOUT any subscription configuration + if (clusterMode) { + listener = await GlideClusterClient.createClient( + getOptions(clusterMode), + ); + sender = await GlideClusterClient.createClient( + getOptions(clusterMode), + ); + } else { + listener = await GlideClient.createClient( + getOptions(clusterMode), + ); + sender = await GlideClient.createClient( + getOptions(clusterMode), + ); + } + + // Dynamically subscribe to channel + await listener.subscribe(new Set([channel])); + + // Allow time for subscription to propagate + await new Promise((resolve) => setTimeout(resolve, 1000)); + + // Publish message + expect(await sender.publish(message, channel)).toBeGreaterThan( + 0, + ); + + // Allow time for message delivery + await new Promise((resolve) => setTimeout(resolve, 1000)); + + // Verify message received + const pubsubMsg = await listener.getPubSubMessage(); + expect(pubsubMsg.message).toEqual(message); + expect(pubsubMsg.channel).toEqual(channel); + expect(pubsubMsg.pattern).toBeNull(); + } finally { + if (listener) { + listener.close(); + } + + if (sender) { + sender.close(); + } + } + }, + TIMEOUT, + ); + + /** + * Tests dynamic pattern subscription without pre-configuration. + * + * This test verifies that a client can dynamically subscribe to a pattern + * at runtime and receive messages from channels matching that pattern. + * + * @param clusterMode - Indicates if the test should be run in cluster mode. + */ + it.each([true, false])( + "dynamic_psubscribe_lazy_%p", + async (clusterMode) => { + let listener: TGlideClient | null = null; + let sender: TGlideClient | null = null; + + try { + const pattern = `${getRandomKey()}*`; + const channel = pattern.replace("*", getRandomKey()); + const message = getRandomKey(); + + // Create clients WITHOUT any subscription configuration + if (clusterMode) { + listener = await GlideClusterClient.createClient( + getOptions(clusterMode), + ); + sender = await GlideClusterClient.createClient( + getOptions(clusterMode), + ); + } else { + listener = await GlideClient.createClient( + getOptions(clusterMode), + ); + sender = await GlideClient.createClient( + getOptions(clusterMode), + ); + } + + // Dynamically subscribe to pattern + await listener.psubscribe(new Set([pattern])); + + // Allow time for subscription to propagate + await new Promise((resolve) => setTimeout(resolve, 1000)); + + // Publish message to matching channel + await sender.publish(message, channel); + + // Allow time for message delivery + await new Promise((resolve) => setTimeout(resolve, 1000)); + + // Verify message received with pattern + const pubsubMsg = await listener.getPubSubMessage(); + expect(pubsubMsg.message).toEqual(message); + expect(pubsubMsg.channel).toEqual(channel); + expect(pubsubMsg.pattern).toEqual(pattern); + } finally { + if (listener) { + listener.close(); + } + + if (sender) { + sender.close(); + } + } + }, + TIMEOUT, + ); + + /** + * Tests dynamic unsubscription. + * + * This test verifies that a client can dynamically unsubscribe from a channel + * and stop receiving messages from that channel. + * + * @param clusterMode - Indicates if the test should be run in cluster mode. + */ + it.each([true, false])( + "dynamic_unsubscribe_%p", + async (clusterMode) => { + let listener: TGlideClient | null = null; + let sender: TGlideClient | null = null; + + try { + const channel = "test-channel-" + getRandomKey(); + const message = "test-message"; + + // Create clients WITHOUT any subscription configuration (like Java) + if (clusterMode) { + listener = await GlideClusterClient.createClient( + getOptions(clusterMode), + ); + sender = await GlideClusterClient.createClient( + getOptions(clusterMode), + ); + } else { + listener = await GlideClient.createClient( + getOptions(clusterMode), + ); + sender = await GlideClient.createClient( + getOptions(clusterMode), + ); + } + + // Subscribe + const channels = new Set([channel]); + await listener.subscribe(channels); + + // Wait for subscription to be established + await new Promise((resolve) => setTimeout(resolve, 2000)); + + // Unsubscribe (blocking - waits for reconciliation) + await listener.unsubscribe(channels, { timeout: 30000 }); + + // Wait longer for unsubscribe to fully propagate across all cluster nodes + // In a 3-primary cluster, this can take time + await new Promise((resolve) => setTimeout(resolve, 5000)); + + // Verify no message in queue before publishing + const msgBefore = listener.tryGetPubSubMessage(); + expect(msgBefore).toBeNull(); + + // Publish message AFTER unsubscribe + await sender.publish(message, channel); + await new Promise((resolve) => setTimeout(resolve, 2000)); + + // Should not receive message + const msg = listener.tryGetPubSubMessage(); + expect(msg).toBeNull(); + } finally { + if (listener) { + listener.close(); + } + + if (sender) { + sender.close(); + } + } + }, + TIMEOUT, + ); + + /** + * Tests dynamic unsubscription using SHARDED pubsub (cluster only). + * + * Sharded pubsub works differently - it's tied to specific slots/shards, + * so it might not have the same reconciliation issues as regular pubsub. + * + * @param clusterMode - Must be true (cluster mode only). + */ + it.each([true])( + "dynamic_sunsubscribe_simple_%p", + async (clusterMode) => { + const minVersion = "7.0.0"; + + if (cmeCluster.checkIfServerVersionLessThan(minVersion)) { + return; + } + + let listener: TGlideClient | null = null; + let sender: TGlideClient | null = null; + + try { + const channel = "test-channel-" + getRandomKey(); + const message = "test-message"; + + // Create clients WITHOUT any subscription configuration + listener = await GlideClusterClient.createClient( + getOptions(clusterMode), + ); + sender = await GlideClusterClient.createClient( + getOptions(clusterMode), + ); + + // Subscribe to sharded channel + const channels = new Set([channel]); + await (listener as GlideClusterClient).ssubscribe(channels); + + // Wait for subscription to be established + await new Promise((resolve) => setTimeout(resolve, 2000)); + + // Unsubscribe from sharded channel (blocking) + await (listener as GlideClusterClient).sunsubscribe(channels, { + timeout: 30000, + }); + + // Wait for unsubscribe to propagate + await new Promise((resolve) => setTimeout(resolve, 5000)); + + // Verify no message in queue before publishing + const msgBefore = listener.tryGetPubSubMessage(); + expect(msgBefore).toBeNull(); + + // Publish message AFTER unsubscribe (sharded publish) + await (sender as GlideClusterClient).publish( + message, + channel, + true, + ); + await new Promise((resolve) => setTimeout(resolve, 2000)); + + // Should not receive message + const msg = listener.tryGetPubSubMessage(); + expect(msg).toBeNull(); + } finally { + if (listener) { + listener.close(); + } + + if (sender) { + sender.close(); + } + } + }, + TIMEOUT, + ); + + /** + * Tests dynamic sharded subscription without pre-configuration (cluster only). + * + * This test verifies that a cluster client can dynamically subscribe to a sharded + * channel at runtime. Requires Valkey 7.0+. + * + * @param clusterMode - Must be true (cluster mode only). + */ + it.each([true])( + "dynamic_ssubscribe_lazy_%p", + async (clusterMode) => { + const minVersion = "7.0.0"; + + if (cmeCluster.checkIfServerVersionLessThan(minVersion)) { + return; + } + + let listener: TGlideClient | null = null; + let sender: TGlideClient | null = null; + + try { + const channel = getRandomKey(); + const message = getRandomKey(); + + // Create clients WITHOUT any subscription configuration + listener = await GlideClusterClient.createClient( + getOptions(clusterMode), + ); + sender = await GlideClusterClient.createClient( + getOptions(clusterMode), + ); + + // Dynamically subscribe to sharded channel + await (listener as GlideClusterClient).ssubscribe( + new Set([channel]), + ); + + // Allow time for subscription to propagate + await new Promise((resolve) => setTimeout(resolve, 1000)); + + // Publish message to sharded channel + expect( + await (sender as GlideClusterClient).publish( + message, + channel, + true, + ), + ).toBeGreaterThan(0); + + // Allow time for message delivery + await new Promise((resolve) => setTimeout(resolve, 1000)); + + // Verify message received + const pubsubMsg = await listener.getPubSubMessage(); + expect(pubsubMsg.message).toEqual(message); + expect(pubsubMsg.channel).toEqual(channel); + expect(pubsubMsg.pattern).toBeNull(); + } finally { + if (listener) { + listener.close(); + } + + if (sender) { + sender.close(); + } + } + }, + TIMEOUT, + ); + + /** + * Tests dynamic sharded unsubscription (cluster only). + * + * This test verifies that a cluster client can dynamically unsubscribe from a + * sharded channel and stop receiving messages. Requires Valkey 7.0+. + * + * @param clusterMode - Must be true (cluster mode only). + */ + it.each([true])( + "dynamic_sunsubscribe_%p", + async (clusterMode) => { + const minVersion = "7.0.0"; + + if (cmeCluster.checkIfServerVersionLessThan(minVersion)) { + return; + } + + let listener: TGlideClient | null = null; + let sender: TGlideClient | null = null; + + try { + const channel = getRandomKey(); + const message1 = getRandomKey(); + const message2 = getRandomKey(); + + // Create clients WITHOUT any subscription configuration + listener = await GlideClusterClient.createClient( + getOptions(clusterMode), + ); + sender = await GlideClusterClient.createClient( + getOptions(clusterMode), + ); + + // Dynamically subscribe to sharded channel + await (listener as GlideClusterClient).ssubscribe( + new Set([channel]), + ); + + // Allow time for subscription to propagate + await new Promise((resolve) => setTimeout(resolve, 1000)); + + // Publish first message + expect( + await (sender as GlideClusterClient).publish( + message1, + channel, + true, + ), + ).toBeGreaterThan(0); + + // Allow time for message delivery + await new Promise((resolve) => setTimeout(resolve, 1000)); + + // Verify first message received + const pubsubMsg1 = await listener.getPubSubMessage(); + expect(pubsubMsg1.message).toEqual(message1); + expect(pubsubMsg1.channel).toEqual(channel); + + // Dynamically unsubscribe from sharded channel + await (listener as GlideClusterClient).sunsubscribe( + new Set([channel]), + ); + + // Drain any pending messages that were published before unsubscribe + while (listener.tryGetPubSubMessage() !== null) { + // Keep draining + } + + // Publish second message + await (sender as GlideClusterClient).publish( + message2, + channel, + true, + ); + + // Allow time for potential message delivery + await new Promise((resolve) => setTimeout(resolve, 2000)); + + // Verify no message received after unsubscribe + const pubsubMsg2 = listener.tryGetPubSubMessage(); + expect(pubsubMsg2).toBeNull(); + } finally { + if (listener) { + listener.close(); + } + + if (sender) { + sender.close(); + } + } + }, + TIMEOUT, + ); + + /** + * Tests unsubscribing from pre-configured subscriptions. + * + * This test verifies that a client can dynamically unsubscribe from channels + * that were pre-configured during client creation. + * + * @param clusterMode - Indicates if the test should be run in cluster mode. + */ + it.each([true, false])( + "dynamic_unsubscribe_from_preconfigured_%p", + async (clusterMode) => { + let listener: TGlideClient | null = null; + let sender: TGlideClient | null = null; + + try { + const channel = getRandomKey(); + const message1 = getRandomKey(); + const message2 = getRandomKey(); + + // Create client with pre-configured subscription + const pubSub = createPubSubSubscription( + clusterMode, + { + [GlideClusterClientConfiguration.PubSubChannelModes + .Exact]: new Set([channel as string]), + }, + { + [GlideClientConfiguration.PubSubChannelModes.Exact]: + new Set([channel as string]), + }, + ); + + if (clusterMode) { + listener = await GlideClusterClient.createClient({ + pubsubSubscriptions: pubSub, + ...getOptions(clusterMode), + }); + sender = await GlideClusterClient.createClient( + getOptions(clusterMode), + ); + } else { + listener = await GlideClient.createClient({ + pubsubSubscriptions: pubSub, + ...getOptions(clusterMode), + }); + sender = await GlideClient.createClient( + getOptions(clusterMode), + ); + } + + // Allow time for subscription to propagate + await new Promise((resolve) => setTimeout(resolve, 1000)); + + // Publish first message + expect(await sender.publish(message1, channel)).toBeGreaterThan( + 0, + ); + + // Allow time for message delivery + await new Promise((resolve) => setTimeout(resolve, 1000)); + + // Verify first message received + const pubsubMsg1 = await listener.getPubSubMessage(); + expect(pubsubMsg1.message).toEqual(message1); + expect(pubsubMsg1.channel).toEqual(channel); + + // Dynamically unsubscribe from pre-configured channel + await listener.unsubscribe(new Set([channel])); + + // Drain any pending messages that were published before unsubscribe + while (listener.tryGetPubSubMessage() !== null) { + // Keep draining + } + + // Publish second message + await sender.publish(message2, channel); + + // Allow time for potential message delivery + await new Promise((resolve) => setTimeout(resolve, 2000)); + + // Verify no message received after unsubscribe + const pubsubMsg2 = listener.tryGetPubSubMessage(); + expect(pubsubMsg2).toBeNull(); + } finally { + if (listener) { + listener.close(); + } + + if (sender) { + sender.close(); + } + } + }, + TIMEOUT, + ); + + /** + * Tests that subscription metrics exist in statistics. + * + * This test verifies that the getStatistics method returns subscription-related + * metrics including out-of-sync count and last sync timestamp. + * + * @param clusterMode - Indicates if the test should be run in cluster mode. + */ + it.each([true, false])( + "test_subscription_metrics_in_statistics_%p", + async (clusterMode) => { + let client: TGlideClient | null = null; + + try { + const channel = getRandomKey(); + + // Create client WITHOUT any subscription configuration + if (clusterMode) { + client = await GlideClusterClient.createClient( + getOptions(clusterMode), + ); + } else { + client = await GlideClient.createClient( + getOptions(clusterMode), + ); + } + + // Dynamically subscribe to channel + await client.subscribe(new Set([channel])); + + // Allow time for subscription to propagate + await new Promise((resolve) => setTimeout(resolve, 1000)); + + // Get statistics + const stats = (await client.getStatistics()) as Record< + string, + string + >; + + // Verify subscription metrics exist + expect(stats).toHaveProperty("subscription_out_of_sync_count"); + expect(stats).toHaveProperty( + "subscription_last_sync_timestamp", + ); + + // Verify metrics are valid (non-negative numbers) + const outOfSyncCount = parseInt( + stats["subscription_out_of_sync_count"], + ); + const lastSyncTimestamp = parseInt( + stats["subscription_last_sync_timestamp"], + ); + + expect(outOfSyncCount).toBeGreaterThanOrEqual(0); + expect(lastSyncTimestamp).toBeGreaterThanOrEqual(0); + } finally { + if (client) { + client.close(); + } + } + }, + TIMEOUT, + ); + + /** + * Tests that subscription timestamp updates after subscribe. + * + * This test verifies that the subscription_last_sync_timestamp metric + * is updated after a subscription operation. + * + * @param clusterMode - Indicates if the test should be run in cluster mode. + */ + it.each([true, false])( + "test_subscription_timestamp_updates_after_subscribe_%p", + async (clusterMode) => { + let client: TGlideClient | null = null; + + try { + const channel = getRandomKey(); + + // Create client WITHOUT any subscription configuration + if (clusterMode) { + client = await GlideClusterClient.createClient( + getOptions(clusterMode), + ); + } else { + client = await GlideClient.createClient( + getOptions(clusterMode), + ); + } + + // Get initial statistics + const statsBefore = (await client.getStatistics()) as Record< + string, + string + >; + const timestampBefore = parseInt( + statsBefore["subscription_last_sync_timestamp"], + ); + + // Dynamically subscribe to channel + await client.subscribe(new Set([channel])); + + // Allow time for subscription to propagate + await new Promise((resolve) => setTimeout(resolve, 1000)); + + // Get updated statistics + const statsAfter = (await client.getStatistics()) as Record< + string, + string + >; + const timestampAfter = parseInt( + statsAfter["subscription_last_sync_timestamp"], + ); + + // Verify timestamp was updated (increased or stayed the same) + expect(timestampAfter).toBeGreaterThanOrEqual(timestampBefore); + } finally { + if (client) { + client.close(); + } + } + }, + TIMEOUT, + ); }); From e7973b722eb65e286a7fb5ecd9f92a6525da415e Mon Sep 17 00:00:00 2001 From: affonsov <67347924+affonsov@users.noreply.github.com> Date: Wed, 11 Feb 2026 17:10:41 -0800 Subject: [PATCH 3/8] lint fix Signed-off-by: affonsov <67347924+affonsov@users.noreply.github.com> --- node/rust-client/src/lib.rs | 13 ++++++++++--- 1 file changed, 10 insertions(+), 3 deletions(-) diff --git a/node/rust-client/src/lib.rs b/node/rust-client/src/lib.rs index a6cd08c2713..05b144f0929 100644 --- a/node/rust-client/src/lib.rs +++ b/node/rust-client/src/lib.rs @@ -704,7 +704,8 @@ pub fn get_statistics<'a>(env: &'a Env) -> Result> { let total_bytes_decompressed = Telemetry::total_bytes_decompressed().to_string(); let compression_skipped_count = Telemetry::compression_skipped_count().to_string(); let subscription_out_of_sync_count = Telemetry::subscription_out_of_sync_count().to_string(); - let subscription_last_sync_timestamp = Telemetry::subscription_last_sync_timestamp().to_string(); + let subscription_last_sync_timestamp = + Telemetry::subscription_last_sync_timestamp().to_string(); let mut stats = Object::new(env)?; stats.set_named_property("total_connections", total_connections)?; @@ -715,8 +716,14 @@ pub fn get_statistics<'a>(env: &'a Env) -> Result> { stats.set_named_property("total_bytes_compressed", total_bytes_compressed)?; stats.set_named_property("total_bytes_decompressed", total_bytes_decompressed)?; stats.set_named_property("compression_skipped_count", compression_skipped_count)?; - stats.set_named_property("subscription_out_of_sync_count", subscription_out_of_sync_count)?; - stats.set_named_property("subscription_last_sync_timestamp", subscription_last_sync_timestamp)?; + stats.set_named_property( + "subscription_out_of_sync_count", + subscription_out_of_sync_count, + )?; + stats.set_named_property( + "subscription_last_sync_timestamp", + subscription_last_sync_timestamp, + )?; Ok(stats) } From 514066f3e676f10562a5f0ed8b78eb3bc93edd44 Mon Sep 17 00:00:00 2001 From: affonsov <67347924+affonsov@users.noreply.github.com> Date: Wed, 11 Feb 2026 17:11:12 -0800 Subject: [PATCH 4/8] fix(core): avoid reverse DNS lookup in socket address formatting - Replace socket_addr.to_string() calls with explicit format!() to prevent reverse DNS lookups - Add socket_addr parameter to MultiplexedConnection::new_with_stream() for accurate address tracking - Update address_string construction in PushManager initialization to use actual socket address when available - Pass socket_addr through client connection flow to cluster async module Signed-off-by: affonsov <67347924+affonsov@users.noreply.github.com> --- .../redis-rs/redis/src/aio/multiplexed_connection.rs | 12 +++++++++++- glide-core/redis-rs/redis/src/client.rs | 1 + glide-core/redis-rs/redis/src/cluster_async/mod.rs | 11 +++++++++-- 3 files changed, 21 insertions(+), 3 deletions(-) diff --git a/glide-core/redis-rs/redis/src/aio/multiplexed_connection.rs b/glide-core/redis-rs/redis/src/aio/multiplexed_connection.rs index 130b8491f11..1f3b3027dcb 100644 --- a/glide-core/redis-rs/redis/src/aio/multiplexed_connection.rs +++ b/glide-core/redis-rs/redis/src/aio/multiplexed_connection.rs @@ -25,6 +25,7 @@ use pin_project_lite::pin_project; use std::collections::VecDeque; use std::fmt; use std::fmt::Debug; +use std::net::SocketAddr; use std::pin::Pin; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; @@ -605,6 +606,7 @@ impl MultiplexedConnection { stream, std::time::Duration::MAX, glide_connection_options, + None, // No socket_addr available in this path ) .await } @@ -616,6 +618,7 @@ impl MultiplexedConnection { stream: C, response_timeout: std::time::Duration, glide_connection_options: GlideConnectionOptions, + socket_addr: Option, ) -> RedisResult<(Self, impl Future)> where C: Unpin + AsyncRead + AsyncWrite + Send + 'static, @@ -626,10 +629,17 @@ impl MultiplexedConnection { let (mut pipeline, driver) = Pipeline::new(codec, glide_connection_options.disconnect_notifier); let driver = Box::pin(driver); + + // Use the actual socket address if available, otherwise fall back to connection_info.addr + let address_string = if let Some(socket_addr) = socket_addr { + format!("{}:{}", socket_addr.ip(), socket_addr.port()) + } else { + connection_info.addr.to_string() + }; let pm = PushManager::new( glide_connection_options.push_sender, glide_connection_options.pubsub_synchronizer, - Some(connection_info.addr.to_string()), + Some(address_string), ); pipeline.set_push_manager(pm.clone()); diff --git a/glide-core/redis-rs/redis/src/client.rs b/glide-core/redis-rs/redis/src/client.rs index 4fb1e0aeb71..1064d4c050d 100644 --- a/glide-core/redis-rs/redis/src/client.rs +++ b/glide-core/redis-rs/redis/src/client.rs @@ -445,6 +445,7 @@ impl Client { con, response_timeout, glide_connection_options, + socket_addr, ) .await .map(|res| (res.0, res.1, ip)) diff --git a/glide-core/redis-rs/redis/src/cluster_async/mod.rs b/glide-core/redis-rs/redis/src/cluster_async/mod.rs index 5670a1c30d1..8060a2c699a 100644 --- a/glide-core/redis-rs/redis/src/cluster_async/mod.rs +++ b/glide-core/redis-rs/redis/src/cluster_async/mod.rs @@ -1290,7 +1290,9 @@ where .await .get_node(); let node_address = if let Some(socket_addr) = socket_addr { - socket_addr.to_string() + // Use format! to avoid reverse DNS lookup that socket_addr.to_string() performs + let addr = format!("{}:{}", socket_addr.ip(), socket_addr.port()); + addr } else { node_addr }; @@ -2250,7 +2252,12 @@ where let conn_lock = inner.conn_lock.read().expect(MUTEX_READ_ERR); socket_addresses.find_map(|socket_addr| { - conn_lock.node_for_address(&socket_addr.to_string()) + let addr_str = format!( + "{}:{}", + socket_addr.ip(), + socket_addr.port() + ); + conn_lock.node_for_address(&addr_str) }) }, ); From c1e0a5dabbf21fa050753733599a79e2df831af2 Mon Sep 17 00:00:00 2001 From: affonsov <67347924+affonsov@users.noreply.github.com> Date: Wed, 11 Feb 2026 17:48:35 -0800 Subject: [PATCH 5/8] Potential fix for code scanning alert no. 65: Incomplete string escaping or encoding Co-authored-by: Copilot Autofix powered by AI <62310815+github-advanced-security[bot]@users.noreply.github.com> Signed-off-by: affonsov <67347924+affonsov@users.noreply.github.com> --- node/tests/PubSub.test.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/node/tests/PubSub.test.ts b/node/tests/PubSub.test.ts index ab103b9ccce..528bde609b1 100644 --- a/node/tests/PubSub.test.ts +++ b/node/tests/PubSub.test.ts @@ -4273,7 +4273,7 @@ describe("PubSub", () => { try { const pattern = `${getRandomKey()}*`; - const channel = pattern.replace("*", getRandomKey()); + const channel = pattern.replace(/\*/g, getRandomKey()); const message = getRandomKey(); // Create clients WITHOUT any subscription configuration From 417cc544bbbf08afa9c614bb88f91387a4cc455c Mon Sep 17 00:00:00 2001 From: affonsov <67347924+affonsov@users.noreply.github.com> Date: Thu, 12 Feb 2026 15:16:43 -0800 Subject: [PATCH 6/8] Added changelog Signed-off-by: affonsov <67347924+affonsov@users.noreply.github.com> --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 2827443b7a5..c839a088c6f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,6 +6,7 @@ * Node: Drop support for Node.js 16.x and 18.x. Minimum supported version is now Node.js 20.x. #### Changes +* Node: add dynamic PubSub support ([#5295](https://github.com/valkey-io/valkey-glide/pull/5295)) * JAVA: Add EVAL_RO, EVALSHA_RO, and SCRIPT DEBUG commands ([#5125](https://github.com/valkey-io/valkey-glide/pull/5125)) * CORE: Add client certificate and private key support for mTLS ([#5092](https://github.com/valkey-io/valkey-glide/issues/5092)) * Python: Add client certificate and private key support for mTLS ([5123](https://github.com/valkey-io/valkey-glide/issues/5123)) From 03acfac9ddf8041a1725583d0ffeaec7e2bf98cc Mon Sep 17 00:00:00 2001 From: affonsov <67347924+affonsov@users.noreply.github.com> Date: Fri, 13 Feb 2026 14:30:08 -0800 Subject: [PATCH 7/8] - Rename subscribe/unsubscribe methods to subscribeLazy/unsubscribeLazy for non-blocking operations - Rename psubscribe/punsubscribe methods to psubscribeLazy/punsubscribeLazy for non-blocking pattern operations - Add blocking variants (subscribe, psubscribe, unsubscribe, punsubscribe) that accept timeoutMs parameter - Introduce ALL_CHANNELS and ALL_PATTERNS constants for unsubscribing from all subscriptions - Update method signatures to clarify blocking vs non-blocking behavior with explicit timeout parameters - Simplify API by removing optional timeout from options object in favor of explicit timeoutMs parameter - Update PubSub tests to use new lazy and blocking method variants Signed-off-by: affonsov <67347924+affonsov@users.noreply.github.com> --- node/src/BaseClient.ts | 284 ++++++++++++++++++++++----------- node/src/Commands.ts | 24 +-- node/src/GlideClusterClient.ts | 148 +++++++++++------ node/tests/PubSub.test.ts | 49 +++--- 4 files changed, 327 insertions(+), 178 deletions(-) diff --git a/node/src/BaseClient.ts b/node/src/BaseClient.ts index b91b13f6672..d0e5eeb3fd4 100644 --- a/node/src/BaseClient.ts +++ b/node/src/BaseClient.ts @@ -186,13 +186,13 @@ import { createPubSubNumPat, createPubSubNumSub, createPSubscribe, - createPSubscribeBlocking, + createPSubscribeLazy, createPUnsubscribe, - createPUnsubscribeBlocking, + createPUnsubscribeLazy, createSubscribe, - createSubscribeBlocking, + createSubscribeLazy, createUnsubscribe, - createUnsubscribeBlocking, + createUnsubscribeLazy, createRPop, createRPush, createRPushX, @@ -398,6 +398,28 @@ export type StreamEntryDataType = Record; */ export type Score = number | "+inf" | "-inf"; +/** + * Constant representing "all channels" for unsubscribe operations. + * Use this to unsubscribe from all exact channels at once. + * + * @example + * ```typescript + * await client.unsubscribeLazy(ALL_CHANNELS); + * ``` + */ +export const ALL_CHANNELS = null; + +/** + * Constant representing "all patterns" for punsubscribe operations. + * Use this to unsubscribe from all pattern subscriptions at once. + * + * @example + * ```typescript + * await client.punsubscribeLazy(ALL_PATTERNS); + * ``` + */ +export const ALL_PATTERNS = null; + /** * Data type which represents sorted sets data for input parameter of ZADD command, * including element and its respective score. @@ -9512,156 +9534,230 @@ export class BaseClient { } /** - * Subscribes the client to the specified channels. + * Subscribes the client to the specified channels (non-blocking). + * Returns immediately without waiting for subscription confirmation. * * @see {@link https://valkey.io/commands/subscribe/|valkey.io} for details. * * @param channels - A set of channel names to subscribe to. - * @param options - (Optional) Additional parameters: - * - timeout: Maximum time in milliseconds to wait for subscription confirmation. - * - decoder: See {@link DecoderOption}. - * @returns A promise that resolves when the subscription is complete. + * @param options - (Optional) See {@link DecoderOption}. + * @returns A promise that resolves immediately. + * + * @example + * ```typescript + * await client.subscribeLazy(new Set(["news", "updates"])); + * ``` + */ + public async subscribeLazy( + channels: Set, + options?: DecoderOption, + ): Promise { + const channelsArray = Array.from(channels); + return this.createWritePromise( + createSubscribeLazy(channelsArray), + options, + ); + } + + /** + * Subscribes the client to the specified channels (blocking). + * Waits for subscription confirmation or until timeout. + * + * @see {@link https://valkey.io/commands/subscribe/|valkey.io} for details. + * + * @param channels - A set of channel names to subscribe to. + * @param timeoutMs - Maximum time in milliseconds to wait. Use 0 for indefinite wait. + * @param options - (Optional) See {@link DecoderOption}. + * @returns A promise that resolves when subscription is confirmed or timeout occurs. * * @example * ```typescript - * await client.subscribe(new Set(["news", "updates"])); - * // With timeout - * await client.subscribe(new Set(["news"]), { timeout: 5000 }); + * // Wait up to 5 seconds + * await client.subscribe(new Set(["news"]), 5000); + * // Wait indefinitely + * await client.subscribe(new Set(["news"]), 0); * ``` */ public async subscribe( channels: Set, - options?: { timeout?: number } & DecoderOption, + timeoutMs: number, + options?: DecoderOption, ): Promise { const channelsArray = Array.from(channels); + return this.createWritePromise( + createSubscribe(channelsArray, timeoutMs), + options, + ); + } - if (options?.timeout !== undefined) { - return this.createWritePromise( - createSubscribeBlocking(channelsArray, options.timeout), - options, - ); - } else { - return this.createWritePromise( - createSubscribe(channelsArray), - options, - ); - } + /** + * Subscribes the client to the specified patterns (non-blocking). + * Returns immediately without waiting for subscription confirmation. + * + * @see {@link https://valkey.io/commands/psubscribe/|valkey.io} for details. + * + * @param patterns - A set of glob-style patterns to subscribe to. + * @param options - (Optional) See {@link DecoderOption}. + * @returns A promise that resolves immediately. + * + * @example + * ```typescript + * await client.psubscribeLazy(new Set(["news.*", "updates.*"])); + * ``` + */ + public async psubscribeLazy( + patterns: Set, + options?: DecoderOption, + ): Promise { + const patternsArray = Array.from(patterns); + return this.createWritePromise( + createPSubscribeLazy(patternsArray), + options, + ); } /** - * Subscribes the client to the specified patterns. + * Subscribes the client to the specified patterns (blocking). + * Waits for subscription confirmation or until timeout. * * @see {@link https://valkey.io/commands/psubscribe/|valkey.io} for details. * * @param patterns - A set of glob-style patterns to subscribe to. - * @param options - (Optional) Additional parameters: - * - timeout: Maximum time in milliseconds to wait for subscription confirmation. - * - decoder: See {@link DecoderOption}. - * @returns A promise that resolves when the subscription is complete. + * @param timeoutMs - Maximum time in milliseconds to wait. Use 0 for indefinite wait. + * @param options - (Optional) See {@link DecoderOption}. + * @returns A promise that resolves when subscription is confirmed or timeout occurs. * * @example * ```typescript - * await client.psubscribe(new Set(["news.*", "updates.*"])); + * await client.psubscribe(new Set(["news.*"]), 5000); * ``` */ public async psubscribe( patterns: Set, - options?: { timeout?: number } & DecoderOption, + timeoutMs: number, + options?: DecoderOption, ): Promise { const patternsArray = Array.from(patterns); - - if (options?.timeout !== undefined) { - return this.createWritePromise( - createPSubscribeBlocking(patternsArray, options.timeout), - options, - ); - } else { - return this.createWritePromise( - createPSubscribe(patternsArray), - options, - ); - } + return this.createWritePromise( + createPSubscribe(patternsArray, timeoutMs), + options, + ); } /** - * Unsubscribes the client from the specified channels. - * If no channels are provided, unsubscribes from all exact channels. + * Unsubscribes the client from the specified channels (non-blocking). + * Pass null to unsubscribe from all exact channels. * * @see {@link https://valkey.io/commands/unsubscribe/|valkey.io} for details. * - * @param channels - (Optional) A set of channel names to unsubscribe from. - * @param options - (Optional) Additional parameters: - * - timeout: Maximum time in milliseconds to wait for unsubscription confirmation. - * - decoder: See {@link DecoderOption}. - * @returns A promise that resolves when the unsubscription is complete. + * @param channels - Channel names to unsubscribe from, or null for all channels. + * @param options - (Optional) See {@link DecoderOption}. + * @returns A promise that resolves immediately. * * @example * ```typescript - * await client.unsubscribe(new Set(["news"])); + * await client.unsubscribeLazy(new Set(["news"])); * // Unsubscribe from all channels - * await client.unsubscribe(); + * await client.unsubscribeLazy(ALL_CHANNELS); * ``` */ - public async unsubscribe( - channels?: Set, - options?: { timeout?: number } & DecoderOption, + public async unsubscribeLazy( + channels?: Set | null, + options?: DecoderOption, ): Promise { const channelsArray = channels ? Array.from(channels) : undefined; + return this.createWritePromise( + createUnsubscribeLazy(channelsArray), + options, + ); + } - if (options?.timeout !== undefined) { - // For blocking unsubscribe, we need to provide channels (empty array if none) - return this.createWritePromise( - createUnsubscribeBlocking(channelsArray ?? [], options.timeout), - options, - ); - } else { - return this.createWritePromise( - createUnsubscribe(channelsArray), - options, - ); - } + /** + * Unsubscribes the client from the specified channels (blocking). + * Pass null to unsubscribe from all exact channels. + * + * @see {@link https://valkey.io/commands/unsubscribe/|valkey.io} for details. + * + * @param channels - Channel names to unsubscribe from, or null for all channels. + * @param timeoutMs - Maximum time in milliseconds to wait. Use 0 for indefinite wait. + * @param options - (Optional) See {@link DecoderOption}. + * @returns A promise that resolves when unsubscription is confirmed or timeout occurs. + * + * @example + * ```typescript + * await client.unsubscribe(new Set(["news"]), 5000); + * // Unsubscribe from all channels with timeout + * await client.unsubscribe(ALL_CHANNELS, 5000); + * ``` + */ + public async unsubscribe( + channels: Set | null, + timeoutMs: number, + options?: DecoderOption, + ): Promise { + const channelsArray = channels ? Array.from(channels) : undefined; + return this.createWritePromise( + createUnsubscribe(channelsArray ?? [], timeoutMs), + options, + ); } /** - * Unsubscribes the client from the specified patterns. - * If no patterns are provided, unsubscribes from all patterns. + * Unsubscribes the client from the specified patterns (non-blocking). + * Pass null to unsubscribe from all patterns. * * @see {@link https://valkey.io/commands/punsubscribe/|valkey.io} for details. * - * @param patterns - (Optional) A set of patterns to unsubscribe from. - * @param options - (Optional) Additional parameters: - * - timeout: Maximum time in milliseconds to wait for unsubscription confirmation. - * - decoder: See {@link DecoderOption}. - * @returns A promise that resolves when the unsubscription is complete. + * @param patterns - Pattern names to unsubscribe from, or null for all patterns. + * @param options - (Optional) See {@link DecoderOption}. + * @returns A promise that resolves immediately. * * @example * ```typescript - * await client.punsubscribe(new Set(["news.*"])); + * await client.punsubscribeLazy(new Set(["news.*"])); * // Unsubscribe from all patterns - * await client.punsubscribe(); + * await client.punsubscribeLazy(ALL_PATTERNS); * ``` */ - public async punsubscribe( - patterns?: Set, - options?: { timeout?: number } & DecoderOption, + public async punsubscribeLazy( + patterns?: Set | null, + options?: DecoderOption, ): Promise { const patternsArray = patterns ? Array.from(patterns) : undefined; + return this.createWritePromise( + createPUnsubscribeLazy(patternsArray), + options, + ); + } - if (options?.timeout !== undefined) { - // For blocking punsubscribe, we need to provide patterns (empty array if none) - return this.createWritePromise( - createPUnsubscribeBlocking( - patternsArray ?? [], - options.timeout, - ), - options, - ); - } else { - return this.createWritePromise( - createPUnsubscribe(patternsArray), - options, - ); - } + /** + * Unsubscribes the client from the specified patterns (blocking). + * Pass null to unsubscribe from all patterns. + * + * @see {@link https://valkey.io/commands/punsubscribe/|valkey.io} for details. + * + * @param patterns - Pattern names to unsubscribe from, or null for all patterns. + * @param timeoutMs - Maximum time in milliseconds to wait. Use 0 for indefinite wait. + * @param options - (Optional) See {@link DecoderOption}. + * @returns A promise that resolves when unsubscription is confirmed or timeout occurs. + * + * @example + * ```typescript + * await client.punsubscribe(new Set(["news.*"]), 5000); + * // Unsubscribe from all patterns with timeout + * await client.punsubscribe(ALL_PATTERNS, 5000); + * ``` + */ + public async punsubscribe( + patterns: Set | null, + timeoutMs: number, + options?: DecoderOption, + ): Promise { + const patternsArray = patterns ? Array.from(patterns) : undefined; + return this.createWritePromise( + createPUnsubscribe(patternsArray ?? [], timeoutMs), + options, + ); } /** diff --git a/node/src/Commands.ts b/node/src/Commands.ts index 62d971d789d..d60137d21dd 100644 --- a/node/src/Commands.ts +++ b/node/src/Commands.ts @@ -4546,7 +4546,7 @@ export function createPubSubShardNumSub( /** * @internal */ -export function createSubscribe( +export function createSubscribeLazy( channels: GlideString[], ): command_request.Command { return createCommand(RequestType.Subscribe, channels); @@ -4555,7 +4555,7 @@ export function createSubscribe( /** * @internal */ -export function createSubscribeBlocking( +export function createSubscribe( channels: GlideString[], timeout: number, ): command_request.Command { @@ -4568,7 +4568,7 @@ export function createSubscribeBlocking( /** * @internal */ -export function createPSubscribe( +export function createPSubscribeLazy( patterns: GlideString[], ): command_request.Command { return createCommand(RequestType.PSubscribe, patterns); @@ -4577,7 +4577,7 @@ export function createPSubscribe( /** * @internal */ -export function createPSubscribeBlocking( +export function createPSubscribe( patterns: GlideString[], timeout: number, ): command_request.Command { @@ -4590,7 +4590,7 @@ export function createPSubscribeBlocking( /** * @internal */ -export function createUnsubscribe( +export function createUnsubscribeLazy( channels?: GlideString[], ): command_request.Command { return createCommand(RequestType.Unsubscribe, channels ? channels : []); @@ -4599,7 +4599,7 @@ export function createUnsubscribe( /** * @internal */ -export function createUnsubscribeBlocking( +export function createUnsubscribe( channels: GlideString[], timeout: number, ): command_request.Command { @@ -4612,7 +4612,7 @@ export function createUnsubscribeBlocking( /** * @internal */ -export function createPUnsubscribe( +export function createPUnsubscribeLazy( patterns?: GlideString[], ): command_request.Command { return createCommand(RequestType.PUnsubscribe, patterns ? patterns : []); @@ -4621,7 +4621,7 @@ export function createPUnsubscribe( /** * @internal */ -export function createPUnsubscribeBlocking( +export function createPUnsubscribe( patterns: GlideString[], timeout: number, ): command_request.Command { @@ -4634,7 +4634,7 @@ export function createPUnsubscribeBlocking( /** * @internal */ -export function createSSubscribe( +export function createSSubscribeLazy( channels: GlideString[], ): command_request.Command { return createCommand(RequestType.SSubscribe, channels); @@ -4643,7 +4643,7 @@ export function createSSubscribe( /** * @internal */ -export function createSSubscribeBlocking( +export function createSSubscribe( channels: GlideString[], timeout: number, ): command_request.Command { @@ -4656,7 +4656,7 @@ export function createSSubscribeBlocking( /** * @internal */ -export function createSUnsubscribe( +export function createSUnsubscribeLazy( channels?: GlideString[], ): command_request.Command { return createCommand(RequestType.SUnsubscribe, channels ? channels : []); @@ -4665,7 +4665,7 @@ export function createSUnsubscribe( /** * @internal */ -export function createSUnsubscribeBlocking( +export function createSUnsubscribe( channels: GlideString[], timeout: number, ): command_request.Command { diff --git a/node/src/GlideClusterClient.ts b/node/src/GlideClusterClient.ts index 665b22e2658..ee2c2acbdc3 100644 --- a/node/src/GlideClusterClient.ts +++ b/node/src/GlideClusterClient.ts @@ -65,13 +65,25 @@ import { createScriptExists, createScriptFlush, createScriptKill, + createSSubscribeLazy, createSSubscribe, - createSSubscribeBlocking, + createSUnsubscribeLazy, createSUnsubscribe, - createSUnsubscribeBlocking, createTime, createUnWatch, } from "./Commands"; + +/** + * Constant representing all sharded channels. + * Use this with unsubscribe methods to unsubscribe from all sharded channels. + * + * @example + * ```typescript + * await client.sunsubscribeLazy(ALL_SHARDED_CHANNELS); + * ``` + */ +export const ALL_SHARDED_CHANNELS = null; + /** An extension to command option types with {@link Routes}. */ export interface RouteOption { /** @@ -2023,82 +2035,122 @@ export class GlideClusterClient extends BaseClient { } /** - * Subscribes the client to the specified sharded channels. + * Subscribes the client to the specified sharded channels (non-blocking). + * Returns immediately without waiting for subscription confirmation. * Available since Valkey 7.0. * * @see {@link https://valkey.io/commands/ssubscribe/|valkey.io} for details. * * @param channels - A set of sharded channel names to subscribe to. - * @param options - (Optional) Additional parameters: - * - timeout: Maximum time in milliseconds to wait for subscription confirmation. - * - decoder: See {@link DecoderOption}. - * @returns A promise that resolves when the subscription is complete. + * @param options - (Optional) See {@link DecoderOption}. + * @returns A promise that resolves immediately. * * @example * ```typescript - * await clusterClient.ssubscribe(new Set(["shard-channel-1"])); + * await clusterClient.ssubscribeLazy(new Set(["shard-channel-1"])); * ``` */ - public async ssubscribe( + public async ssubscribeLazy( channels: Set, - options?: { timeout?: number } & DecoderOption, + options?: DecoderOption, ): Promise { const channelsArray = Array.from(channels); + return this.createWritePromise( + createSSubscribeLazy(channelsArray), + options, + ); + } - if (options?.timeout !== undefined) { - return this.createWritePromise( - createSSubscribeBlocking(channelsArray, options.timeout), - options, - ); - } else { - return this.createWritePromise( - createSSubscribe(channelsArray), - options, - ); - } + /** + * Subscribes the client to the specified sharded channels (blocking). + * Waits for subscription confirmation or until timeout. + * Available since Valkey 7.0. + * + * @see {@link https://valkey.io/commands/ssubscribe/|valkey.io} for details. + * + * @param channels - A set of sharded channel names to subscribe to. + * @param timeoutMs - Maximum time in milliseconds to wait. Use 0 for indefinite wait. + * @param options - (Optional) See {@link DecoderOption}. + * @returns A promise that resolves when subscription is confirmed or timeout occurs. + * + * @example + * ```typescript + * // Wait up to 5 seconds + * await clusterClient.ssubscribe(new Set(["shard-channel-1"]), 5000); + * // Wait indefinitely + * await clusterClient.ssubscribe(new Set(["shard-channel-1"]), 0); + * ``` + */ + public async ssubscribe( + channels: Set, + timeoutMs: number, + options?: DecoderOption, + ): Promise { + const channelsArray = Array.from(channels); + return this.createWritePromise( + createSSubscribe(channelsArray, timeoutMs), + options, + ); } /** - * Unsubscribes the client from the specified sharded channels. - * If no channels are provided, unsubscribes from all sharded channels. + * Unsubscribes the client from the specified sharded channels (non-blocking). + * Pass null to unsubscribe from all sharded channels. * Available since Valkey 7.0. * * @see {@link https://valkey.io/commands/sunsubscribe/|valkey.io} for details. * - * @param channels - (Optional) A set of sharded channel names to unsubscribe from. - * @param options - (Optional) Additional parameters: - * - timeout: Maximum time in milliseconds to wait for unsubscription confirmation. - * - decoder: See {@link DecoderOption}. - * @returns A promise that resolves when the unsubscription is complete. + * @param channels - Sharded channel names to unsubscribe from, or null for all channels. + * @param options - (Optional) See {@link DecoderOption}. + * @returns A promise that resolves immediately. * * @example * ```typescript - * await clusterClient.sunsubscribe(new Set(["shard-channel-1"])); + * await clusterClient.sunsubscribeLazy(new Set(["shard-channel-1"])); * // Unsubscribe from all sharded channels - * await clusterClient.sunsubscribe(); + * await clusterClient.sunsubscribeLazy(ALL_SHARDED_CHANNELS); * ``` */ - public async sunsubscribe( - channels?: Set, - options?: { timeout?: number } & DecoderOption, + public async sunsubscribeLazy( + channels?: Set | null, + options?: DecoderOption, ): Promise { const channelsArray = channels ? Array.from(channels) : undefined; + return this.createWritePromise( + createSUnsubscribeLazy(channelsArray), + options, + ); + } - if (options?.timeout !== undefined) { - // For blocking sunsubscribe, we need to provide channels (empty array if none) - return this.createWritePromise( - createSUnsubscribeBlocking( - channelsArray ?? [], - options.timeout, - ), - options, - ); - } else { - return this.createWritePromise( - createSUnsubscribe(channelsArray), - options, - ); - } + /** + * Unsubscribes the client from the specified sharded channels (blocking). + * Pass null to unsubscribe from all sharded channels. + * Available since Valkey 7.0. + * + * @see {@link https://valkey.io/commands/sunsubscribe/|valkey.io} for details. + * + * @param channels - Sharded channel names to unsubscribe from, or null for all channels. + * @param timeoutMs - Maximum time in milliseconds to wait. Use 0 for indefinite wait. + * @param options - (Optional) See {@link DecoderOption}. + * @returns A promise that resolves when unsubscription is confirmed or timeout occurs. + * + * @example + * ```typescript + * await clusterClient.sunsubscribe(new Set(["shard-channel-1"]), 5000); + * // Unsubscribe from all sharded channels with timeout + * await clusterClient.sunsubscribe(ALL_SHARDED_CHANNELS, 5000); + * ``` + */ + public async sunsubscribe( + channels: Set | null, + timeoutMs: number, + options?: DecoderOption, + ): Promise { + const channelsArray = channels ? Array.from(channels) : undefined; + return this.createWritePromise( + createSUnsubscribe(channelsArray ?? [], timeoutMs), + options, + ); } /** diff --git a/node/tests/PubSub.test.ts b/node/tests/PubSub.test.ts index 528bde609b1..0e71bd105a4 100644 --- a/node/tests/PubSub.test.ts +++ b/node/tests/PubSub.test.ts @@ -4225,8 +4225,8 @@ describe("PubSub", () => { ); } - // Dynamically subscribe to channel - await listener.subscribe(new Set([channel])); + // Dynamically subscribe to channel (non-blocking) + await listener.subscribeLazy(new Set([channel])); // Allow time for subscription to propagate await new Promise((resolve) => setTimeout(resolve, 1000)); @@ -4293,8 +4293,8 @@ describe("PubSub", () => { ); } - // Dynamically subscribe to pattern - await listener.psubscribe(new Set([pattern])); + // Dynamically subscribe to pattern (non-blocking) + await listener.psubscribeLazy(new Set([pattern])); // Allow time for subscription to propagate await new Promise((resolve) => setTimeout(resolve, 1000)); @@ -4358,15 +4358,15 @@ describe("PubSub", () => { ); } - // Subscribe + // Subscribe (non-blocking) const channels = new Set([channel]); - await listener.subscribe(channels); + await listener.subscribeLazy(channels); // Wait for subscription to be established await new Promise((resolve) => setTimeout(resolve, 2000)); // Unsubscribe (blocking - waits for reconciliation) - await listener.unsubscribe(channels, { timeout: 30000 }); + await listener.unsubscribe(channels, 30000); // Wait longer for unsubscribe to fully propagate across all cluster nodes // In a 3-primary cluster, this can take time @@ -4428,17 +4428,18 @@ describe("PubSub", () => { getOptions(clusterMode), ); - // Subscribe to sharded channel + // Subscribe to sharded channel (non-blocking) const channels = new Set([channel]); - await (listener as GlideClusterClient).ssubscribe(channels); + await (listener as GlideClusterClient).ssubscribeLazy(channels); // Wait for subscription to be established await new Promise((resolve) => setTimeout(resolve, 2000)); // Unsubscribe from sharded channel (blocking) - await (listener as GlideClusterClient).sunsubscribe(channels, { - timeout: 30000, - }); + await (listener as GlideClusterClient).sunsubscribe( + channels, + 30000, + ); // Wait for unsubscribe to propagate await new Promise((resolve) => setTimeout(resolve, 5000)); @@ -4503,8 +4504,8 @@ describe("PubSub", () => { getOptions(clusterMode), ); - // Dynamically subscribe to sharded channel - await (listener as GlideClusterClient).ssubscribe( + // Dynamically subscribe to sharded channel (non-blocking) + await (listener as GlideClusterClient).ssubscribeLazy( new Set([channel]), ); @@ -4574,8 +4575,8 @@ describe("PubSub", () => { getOptions(clusterMode), ); - // Dynamically subscribe to sharded channel - await (listener as GlideClusterClient).ssubscribe( + // Dynamically subscribe to sharded channel (non-blocking) + await (listener as GlideClusterClient).ssubscribeLazy( new Set([channel]), ); @@ -4599,8 +4600,8 @@ describe("PubSub", () => { expect(pubsubMsg1.message).toEqual(message1); expect(pubsubMsg1.channel).toEqual(channel); - // Dynamically unsubscribe from sharded channel - await (listener as GlideClusterClient).sunsubscribe( + // Dynamically unsubscribe from sharded channel (non-blocking) + await (listener as GlideClusterClient).sunsubscribeLazy( new Set([channel]), ); @@ -4701,8 +4702,8 @@ describe("PubSub", () => { expect(pubsubMsg1.message).toEqual(message1); expect(pubsubMsg1.channel).toEqual(channel); - // Dynamically unsubscribe from pre-configured channel - await listener.unsubscribe(new Set([channel])); + // Dynamically unsubscribe from pre-configured channel (non-blocking) + await listener.unsubscribeLazy(new Set([channel])); // Drain any pending messages that were published before unsubscribe while (listener.tryGetPubSubMessage() !== null) { @@ -4758,8 +4759,8 @@ describe("PubSub", () => { ); } - // Dynamically subscribe to channel - await client.subscribe(new Set([channel])); + // Dynamically subscribe to channel (non-blocking) + await client.subscribeLazy(new Set([channel])); // Allow time for subscription to propagate await new Promise((resolve) => setTimeout(resolve, 1000)); @@ -4831,8 +4832,8 @@ describe("PubSub", () => { statsBefore["subscription_last_sync_timestamp"], ); - // Dynamically subscribe to channel - await client.subscribe(new Set([channel])); + // Dynamically subscribe to channel (non-blocking) + await client.subscribeLazy(new Set([channel])); // Allow time for subscription to propagate await new Promise((resolve) => setTimeout(resolve, 1000)); From 98ea10f6cc97065743ea531ca0ed18c9db4e8489 Mon Sep 17 00:00:00 2001 From: affonsov <67347924+affonsov@users.noreply.github.com> Date: Fri, 13 Feb 2026 18:39:37 -0800 Subject: [PATCH 8/8] - Remove duplicate Java validation comments in subscribe/unsubscribeLazy methods - Remove protobuf regeneration note from pubsubReconciliationIntervalMs configuration - Remove redundant "Create clients WITHOUT subscription configuration" comments from PubSub tests Signed-off-by: affonsov <67347924+affonsov@users.noreply.github.com> --- node/src/BaseClient.ts | 4 ---- node/tests/PubSub.test.ts | 6 ------ 2 files changed, 10 deletions(-) diff --git a/node/src/BaseClient.ts b/node/src/BaseClient.ts index d0e5eeb3fd4..0b599024fb3 100644 --- a/node/src/BaseClient.ts +++ b/node/src/BaseClient.ts @@ -1674,7 +1674,6 @@ export class BaseClient { ); } - // Match Java's validation: only throw error if BOTH config exists AND callback exists // Java: if (subscriptionConfiguration.isPresent() && subscriptionConfiguration.get().getCallback().isPresent()) if ( this.isPubsubConfigured(this.config!) && @@ -1698,7 +1697,6 @@ export class BaseClient { ); } - // Match Java's validation: only throw error if BOTH config exists AND callback exists // Java: if (subscriptionConfiguration.isPresent() && subscriptionConfiguration.get().getCallback().isPresent()) if ( this.isPubsubConfigured(this.config!) && @@ -9286,8 +9284,6 @@ export class BaseClient { } // Set PubSub reconciliation interval if explicitly configured - // Note: This requires the protobuf files to be regenerated with: - // npm run build-protobuf (from the node directory) if (options.pubsubReconciliationIntervalMs !== undefined) { request.pubsubReconciliationIntervalMs = options.pubsubReconciliationIntervalMs; diff --git a/node/tests/PubSub.test.ts b/node/tests/PubSub.test.ts index 0e71bd105a4..88a419747eb 100644 --- a/node/tests/PubSub.test.ts +++ b/node/tests/PubSub.test.ts @@ -4208,7 +4208,6 @@ describe("PubSub", () => { const channel = getRandomKey(); const message = getRandomKey(); - // Create clients WITHOUT any subscription configuration if (clusterMode) { listener = await GlideClusterClient.createClient( getOptions(clusterMode), @@ -4276,7 +4275,6 @@ describe("PubSub", () => { const channel = pattern.replace(/\*/g, getRandomKey()); const message = getRandomKey(); - // Create clients WITHOUT any subscription configuration if (clusterMode) { listener = await GlideClusterClient.createClient( getOptions(clusterMode), @@ -4341,7 +4339,6 @@ describe("PubSub", () => { const channel = "test-channel-" + getRandomKey(); const message = "test-message"; - // Create clients WITHOUT any subscription configuration (like Java) if (clusterMode) { listener = await GlideClusterClient.createClient( getOptions(clusterMode), @@ -4420,7 +4417,6 @@ describe("PubSub", () => { const channel = "test-channel-" + getRandomKey(); const message = "test-message"; - // Create clients WITHOUT any subscription configuration listener = await GlideClusterClient.createClient( getOptions(clusterMode), ); @@ -4496,7 +4492,6 @@ describe("PubSub", () => { const channel = getRandomKey(); const message = getRandomKey(); - // Create clients WITHOUT any subscription configuration listener = await GlideClusterClient.createClient( getOptions(clusterMode), ); @@ -4567,7 +4562,6 @@ describe("PubSub", () => { const message1 = getRandomKey(); const message2 = getRandomKey(); - // Create clients WITHOUT any subscription configuration listener = await GlideClusterClient.createClient( getOptions(clusterMode), );