Skip to content

Commit fae0e41

Browse files
committed
Node: add dynamic PubSub support
- Add subscribe() and psubscribe() methods to BaseClient for channel and pattern subscriptions - Add unsubscribe() and punsubscribe() methods with optional channel/pattern filtering - Add pubsubReconciliationIntervalMs configuration option to AdvancedBaseClientConfiguration - Expose subscription telemetry metrics: subscription_out_of_sync_count and subscription_last_sync_timestamp - Import new PubSub command creators (subscribe, psubscribe, unsubscribe, punsubscribe variants) - Add JSDoc documentation with examples for all new subscription methods - Update Rust FFI bindings to expose subscription reconciliation interval configuration - Add test coverage for cluster client subscription functionality Signed-off-by: affonsov <67347924+affonsov@users.noreply.github.com>
1 parent ed079d8 commit fae0e41

File tree

6 files changed

+602
-1
lines changed

6 files changed

+602
-1
lines changed

node/rust-client/src/lib.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -703,6 +703,8 @@ pub fn get_statistics<'a>(env: &'a Env) -> Result<Object<'a>> {
703703
let total_bytes_compressed = Telemetry::total_bytes_compressed().to_string();
704704
let total_bytes_decompressed = Telemetry::total_bytes_decompressed().to_string();
705705
let compression_skipped_count = Telemetry::compression_skipped_count().to_string();
706+
let subscription_out_of_sync_count = Telemetry::subscription_out_of_sync_count().to_string();
707+
let subscription_last_sync_timestamp = Telemetry::subscription_last_sync_timestamp().to_string();
706708

707709
let mut stats = Object::new(env)?;
708710
stats.set_named_property("total_connections", total_connections)?;
@@ -713,6 +715,8 @@ pub fn get_statistics<'a>(env: &'a Env) -> Result<Object<'a>> {
713715
stats.set_named_property("total_bytes_compressed", total_bytes_compressed)?;
714716
stats.set_named_property("total_bytes_decompressed", total_bytes_decompressed)?;
715717
stats.set_named_property("compression_skipped_count", compression_skipped_count)?;
718+
stats.set_named_property("subscription_out_of_sync_count", subscription_out_of_sync_count)?;
719+
stats.set_named_property("subscription_last_sync_timestamp", subscription_last_sync_timestamp)?;
716720

717721
Ok(stats)
718722
}

node/src/BaseClient.ts

Lines changed: 239 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -185,6 +185,14 @@ import {
185185
createPubSubChannels,
186186
createPubSubNumPat,
187187
createPubSubNumSub,
188+
createPSubscribe,
189+
createPSubscribeBlocking,
190+
createPUnsubscribe,
191+
createPUnsubscribeBlocking,
192+
createSubscribe,
193+
createSubscribeBlocking,
194+
createUnsubscribe,
195+
createUnsubscribeBlocking,
188196
createRPop,
189197
createRPush,
190198
createRPushX,
@@ -936,6 +944,33 @@ export interface AdvancedBaseClientConfiguration {
936944
* - If not explicitly set, a default value of `true` will be used by the Rust core.
937945
*/
938946
tcpNoDelay?: boolean;
947+
948+
/**
949+
* The interval in milliseconds between PubSub subscription reconciliation attempts.
950+
*
951+
* The reconciliation process ensures that the client's desired subscriptions match
952+
* the actual subscriptions on the server. This is useful when subscriptions may have
953+
* been lost due to network issues or server restarts.
954+
*
955+
* If not explicitly set, the Rust core will use its default reconciliation interval.
956+
*
957+
* @remarks
958+
* - Must be a positive integer representing milliseconds.
959+
* - The reconciliation process runs automatically in the background.
960+
* - A lower interval provides faster recovery from subscription issues but increases overhead.
961+
* - A higher interval reduces overhead but may delay recovery from subscription issues.
962+
*
963+
* @example
964+
* ```typescript
965+
* const config: GlideClientConfiguration = {
966+
* addresses: [{ host: "localhost", port: 6379 }],
967+
* advancedConfiguration: {
968+
* pubsubReconciliationIntervalMs: 5000 // Reconcile every 5 seconds
969+
* }
970+
* };
971+
* ```
972+
*/
973+
pubsubReconciliationIntervalMs?: number;
939974
}
940975

941976
/**
@@ -9230,6 +9265,14 @@ export class BaseClient {
92309265
request.tcpNodelay = options.tcpNoDelay;
92319266
}
92329267

9268+
// Set PubSub reconciliation interval if explicitly configured
9269+
// Note: This requires the protobuf files to be regenerated with:
9270+
// npm run build-protobuf (from the node directory)
9271+
if (options.pubsubReconciliationIntervalMs !== undefined) {
9272+
request.pubsubReconciliationIntervalMs =
9273+
options.pubsubReconciliationIntervalMs;
9274+
}
9275+
92339276
// Apply TLS configuration if present
92349277
if (options.tlsAdvancedConfiguration) {
92359278
// request.tlsMode is either SecureTls or InsecureTls here
@@ -9470,6 +9513,202 @@ export class BaseClient {
94709513
return response; // "OK"
94719514
}
94729515

9516+
/**
9517+
* Subscribes the client to the specified channels.
9518+
*
9519+
* @see {@link https://valkey.io/commands/subscribe/|valkey.io} for details.
9520+
*
9521+
* @param channels - A set of channel names to subscribe to.
9522+
* @param options - (Optional) Additional parameters:
9523+
* - timeout: Maximum time in milliseconds to wait for subscription confirmation.
9524+
* - decoder: See {@link DecoderOption}.
9525+
* @returns A promise that resolves when the subscription is complete.
9526+
*
9527+
* @example
9528+
* ```typescript
9529+
* await client.subscribe(new Set(["news", "updates"]));
9530+
* // With timeout
9531+
* await client.subscribe(new Set(["news"]), { timeout: 5000 });
9532+
* ```
9533+
*/
9534+
public async subscribe(
9535+
channels: Set<GlideString>,
9536+
options?: { timeout?: number } & DecoderOption,
9537+
): Promise<void> {
9538+
const channelsArray = Array.from(channels);
9539+
9540+
if (options?.timeout !== undefined) {
9541+
return this.createWritePromise(
9542+
createSubscribeBlocking(channelsArray, options.timeout),
9543+
options,
9544+
);
9545+
} else {
9546+
return this.createWritePromise(
9547+
createSubscribe(channelsArray),
9548+
options,
9549+
);
9550+
}
9551+
}
9552+
9553+
/**
9554+
* Subscribes the client to the specified patterns.
9555+
*
9556+
* @see {@link https://valkey.io/commands/psubscribe/|valkey.io} for details.
9557+
*
9558+
* @param patterns - A set of glob-style patterns to subscribe to.
9559+
* @param options - (Optional) Additional parameters:
9560+
* - timeout: Maximum time in milliseconds to wait for subscription confirmation.
9561+
* - decoder: See {@link DecoderOption}.
9562+
* @returns A promise that resolves when the subscription is complete.
9563+
*
9564+
* @example
9565+
* ```typescript
9566+
* await client.psubscribe(new Set(["news.*", "updates.*"]));
9567+
* ```
9568+
*/
9569+
public async psubscribe(
9570+
patterns: Set<GlideString>,
9571+
options?: { timeout?: number } & DecoderOption,
9572+
): Promise<void> {
9573+
const patternsArray = Array.from(patterns);
9574+
9575+
if (options?.timeout !== undefined) {
9576+
return this.createWritePromise(
9577+
createPSubscribeBlocking(patternsArray, options.timeout),
9578+
options,
9579+
);
9580+
} else {
9581+
return this.createWritePromise(
9582+
createPSubscribe(patternsArray),
9583+
options,
9584+
);
9585+
}
9586+
}
9587+
9588+
/**
9589+
* Unsubscribes the client from the specified channels.
9590+
* If no channels are provided, unsubscribes from all exact channels.
9591+
*
9592+
* @see {@link https://valkey.io/commands/unsubscribe/|valkey.io} for details.
9593+
*
9594+
* @param channels - (Optional) A set of channel names to unsubscribe from.
9595+
* @param options - (Optional) Additional parameters:
9596+
* - timeout: Maximum time in milliseconds to wait for unsubscription confirmation.
9597+
* - decoder: See {@link DecoderOption}.
9598+
* @returns A promise that resolves when the unsubscription is complete.
9599+
*
9600+
* @example
9601+
* ```typescript
9602+
* await client.unsubscribe(new Set(["news"]));
9603+
* // Unsubscribe from all channels
9604+
* await client.unsubscribe();
9605+
* ```
9606+
*/
9607+
public async unsubscribe(
9608+
channels?: Set<GlideString>,
9609+
options?: { timeout?: number } & DecoderOption,
9610+
): Promise<void> {
9611+
const channelsArray = channels ? Array.from(channels) : undefined;
9612+
9613+
if (options?.timeout !== undefined) {
9614+
// For blocking unsubscribe, we need to provide channels (empty array if none)
9615+
return this.createWritePromise(
9616+
createUnsubscribeBlocking(channelsArray ?? [], options.timeout),
9617+
options,
9618+
);
9619+
} else {
9620+
return this.createWritePromise(
9621+
createUnsubscribe(channelsArray),
9622+
options,
9623+
);
9624+
}
9625+
}
9626+
9627+
/**
9628+
* Unsubscribes the client from the specified patterns.
9629+
* If no patterns are provided, unsubscribes from all patterns.
9630+
*
9631+
* @see {@link https://valkey.io/commands/punsubscribe/|valkey.io} for details.
9632+
*
9633+
* @param patterns - (Optional) A set of patterns to unsubscribe from.
9634+
* @param options - (Optional) Additional parameters:
9635+
* - timeout: Maximum time in milliseconds to wait for unsubscription confirmation.
9636+
* - decoder: See {@link DecoderOption}.
9637+
* @returns A promise that resolves when the unsubscription is complete.
9638+
*
9639+
* @example
9640+
* ```typescript
9641+
* await client.punsubscribe(new Set(["news.*"]));
9642+
* // Unsubscribe from all patterns
9643+
* await client.punsubscribe();
9644+
* ```
9645+
*/
9646+
public async punsubscribe(
9647+
patterns?: Set<GlideString>,
9648+
options?: { timeout?: number } & DecoderOption,
9649+
): Promise<void> {
9650+
const patternsArray = patterns ? Array.from(patterns) : undefined;
9651+
9652+
if (options?.timeout !== undefined) {
9653+
// For blocking punsubscribe, we need to provide patterns (empty array if none)
9654+
return this.createWritePromise(
9655+
createPUnsubscribeBlocking(
9656+
patternsArray ?? [],
9657+
options.timeout,
9658+
),
9659+
options,
9660+
);
9661+
} else {
9662+
return this.createWritePromise(
9663+
createPUnsubscribe(patternsArray),
9664+
options,
9665+
);
9666+
}
9667+
}
9668+
9669+
/**
9670+
* @internal
9671+
* Helper method to parse GetSubscriptions response from Rust core.
9672+
* Converts array response to structured object with desired and actual subscriptions.
9673+
*
9674+
* @param response - The response array from Rust core with format:
9675+
* ["desired", { mode: [channels...] }, "actual", { mode: [channels...] }]
9676+
* @returns Parsed subscription state with desired and actual subscriptions
9677+
*/
9678+
protected parseGetSubscriptionsResponse<T extends number>(
9679+
response: unknown[],
9680+
): {
9681+
desiredSubscriptions: Partial<Record<T, Set<GlideString>>>;
9682+
actualSubscriptions: Partial<Record<T, Set<GlideString>>>;
9683+
} {
9684+
// Response format: ["desired", {...}, "actual", {...}]
9685+
if (!Array.isArray(response) || response.length !== 4) {
9686+
throw new Error(
9687+
`Invalid GetSubscriptions response format: expected array of length 4, got ${response}`,
9688+
);
9689+
}
9690+
9691+
const desiredData = response[1] as Record<string, GlideString[]>;
9692+
const actualData = response[3] as Record<string, GlideString[]>;
9693+
9694+
const desiredSubscriptions: Partial<Record<T, Set<GlideString>>> = {};
9695+
const actualSubscriptions: Partial<Record<T, Set<GlideString>>> = {};
9696+
9697+
// Parse desired subscriptions
9698+
for (const [mode, channels] of Object.entries(desiredData)) {
9699+
const modeKey = parseInt(mode) as T;
9700+
desiredSubscriptions[modeKey] = new Set(channels);
9701+
}
9702+
9703+
// Parse actual subscriptions
9704+
for (const [mode, channels] of Object.entries(actualData)) {
9705+
const modeKey = parseInt(mode) as T;
9706+
actualSubscriptions[modeKey] = new Set(channels);
9707+
}
9708+
9709+
return { desiredSubscriptions, actualSubscriptions };
9710+
}
9711+
94739712
/**
94749713
* Return a statistics
94759714
*

0 commit comments

Comments
 (0)