Skip to content

Node: add dynamic PubSub support#5295

Open
affonsov wants to merge 9 commits intomainfrom
affonsov/node/dynamic-pubsub
Open

Node: add dynamic PubSub support#5295
affonsov wants to merge 9 commits intomainfrom
affonsov/node/dynamic-pubsub

Conversation

@affonsov
Copy link
Collaborator

@affonsov affonsov commented Feb 3, 2026

Summary

This PR adds dynamic PubSub support to the Node.js client, allowing runtime subscription management without pre-configuration. It introduces new methods for subscribing/unsubscribing to channels and patterns.

A fix to the glide-core was added together with this change. The unsubscribe commands were not working. Eg.: subscription was happening on 127.0.0.1:10102 and the unsubscribe was happening to the localhost:10102. Since the core tracks the synchronization using the address as key and the key wasn't matching the unsubscribe was not happening.

Issue link

This Pull Request is linked to issue (URL): [#5099 ]

Features / Behaviour Changes

  • Added dynamic subscription methods: subscribe(), psubscribe(), unsubscribe(), punsubscribe()
  • Added sharded subscription methods for cluster mode: ssubscribe(), sunsubscribe()
  • Added getSubscriptions() method to query current subscription state (desired vs actual)
  • Added subscription telemetry metrics: subscription_out_of_sync_count and subscription_last_sync_timestamp
  • Added optional pubsubReconciliationIntervalMs configuration parameter for controlling subscription reconciliation frequency
  • Updated validation logic for getPubSubMessage() and tryGetPubSubMessage() to match Java client behavior (only throw error if both config exists AND callback exists)

Implementation

Areas for reviewer attention:

  • Validation logic changes in getPubSubMessage() and tryGetPubSubMessage() - ensure this matches Java behavior correctly
  • Socket address handling in Rust core to avoid DNS lookups
  • Timeout handling in blocking subscription variants
  • Subscription state parsing logic in parseGetSubscriptionsResponse()

Limitations

  • It was noted that node is taking more time to unsubscribe than the other clients

Testing

  • 10 new test cases covering both standalone and cluster modes
  • Tests for dynamic subscription/unsubscription (exact, pattern, and sharded channels)
  • Tests for pre-configured subscription management
  • Tests for subscription metrics and telemetry
  • Tests verify message delivery and non-delivery after unsubscribe operations

Checklist

Before submitting the PR make sure the following are checked:

  • This Pull Request is related to one issue.
  • Commit message has a detailed description of what changed and why.
  • Tests are added or updated.
  • CHANGELOG.md and documentation files are updated.
  • Linters have been run (make *-lint targets) and Prettier has been run (make prettier-fix).
  • Destination branch is correct - main or release
  • Create merge commit if merging release branch into main, squash otherwise.

- Add subscribe() and psubscribe() methods to BaseClient for channel and pattern subscriptions
- Add unsubscribe() and punsubscribe() methods with optional channel/pattern filtering
- Add pubsubReconciliationIntervalMs configuration option to AdvancedBaseClientConfiguration
- Expose subscription telemetry metrics: subscription_out_of_sync_count and subscription_last_sync_timestamp
- Import new PubSub command creators (subscribe, psubscribe, unsubscribe, punsubscribe variants)
- Add JSDoc documentation with examples for all new subscription methods
- Update Rust FFI bindings to expose subscription reconciliation interval configuration
- Add test coverage for cluster client subscription functionality

Signed-off-by: affonsov <67347924+affonsov@users.noreply.github.com>
Signed-off-by: affonsov <67347924+affonsov@users.noreply.github.com>
Signed-off-by: affonsov <67347924+affonsov@users.noreply.github.com>
- Replace socket_addr.to_string() calls with explicit format!() to prevent reverse DNS lookups
- Add socket_addr parameter to MultiplexedConnection::new_with_stream() for accurate address tracking
- Update address_string construction in PushManager initialization to use actual socket address when available
- Pass socket_addr through client connection flow to cluster async module

Signed-off-by: affonsov <67347924+affonsov@users.noreply.github.com>
…ing or encoding

Co-authored-by: Copilot Autofix powered by AI <62310815+github-advanced-security[bot]@users.noreply.github.com>
Signed-off-by: affonsov <67347924+affonsov@users.noreply.github.com>
glide_connection_options.push_sender,
glide_connection_options.pubsub_synchronizer,
Some(connection_info.addr.to_string()),
Some(address_string),
Copy link
Collaborator Author

@affonsov affonsov Feb 12, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Other way to handle this is create a new type Address that will handle both IP and hostName for comparison. Maybe it could be a better approach and we can do this in a follow-up PR. Please let me know what you think.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you know if this is fixed upstream in redis-rs? Seems like something that others would run into...

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This part of the code doesn't match with redis repository. I suspect that our core implementation of pubsub is different

Signed-off-by: affonsov <67347924+affonsov@users.noreply.github.com>
@affonsov affonsov changed the title (WIP) Node: add dynamic PubSub support Node: add dynamic PubSub support Feb 12, 2026
@affonsov affonsov marked this pull request as ready for review February 12, 2026 23:17
…alkey-glide into affonsov/node/dynamic-pubsub
Copy link
Collaborator

@currantw currantw left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reviewed pub/sub changes only.

Comment on lines 9532 to 9536
public async subscribe(
channels: Set<GlideString>,
options?: { timeout?: number } & DecoderOption,
): Promise<void> {
const channelsArray = Array.from(channels);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In C#, I just made the change from ISet<string> to IEnumerable<string> for the channels argument (and the patterns and shard channel methods as well in the corresponding methods). Similarly, would it make sense to use Iterable<GlideString> here instead of Set?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is that what we want here?
Using the Set it makes a strong type interface instead of using Iterable interface.

If the users passes an Array, it will allows the user set repeated names.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think a more generic interface is a bit more user-friendly? Saves them from having to convert different data types into sets themselves.

In C#, we convert the IEnumerable into an ISet before creating the command (with ToHashSet), to eliminate any duplicates, before passing to the Rust core.

Any thoughts @jduo? I don't have much Node experience so happy to defer.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see the clients kind all over the place
java and python are using Set, go is using array of strings

We should align on this. I personally don't like using IEnumerable or Iterable that are so generic.

In C#, we convert the IEnumerable into an ISet before creating the command (with ToHashSet), to eliminate any duplicates, before passing to the Rust core.

  • This is not user friendly IMO, because you have to do the conversion later and then throw the error if cannot covert. If you had Iset in the method signature you would force the user only use objects that implements Iset. In C# using IEnumerable they could use something like arrays/lists/sets and the method will allow it. You need to cover all the bases in the tests to make sure you will have the expected result, which results in much more code to test all possible failing points or successful conversions.

We should always try to use strong typed api interfaces as much as we can

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is not user friendly IMO, because you have to do the conversion later and then throw the error if cannot covert.

I'm not sure I understand? You should always be able to do this conversion, so I'm not sure under what circumstance you would throw an error?

In C# using IEnumerable they could use something like arrays/lists/sets and the method will allow it. You need to cover all the bases in the tests to make sure you will have the expected result, which results in much more code to test all possible failing points or successful conversions.

I guess that is a bit of a question of testing philosophy, but I don't think that you need to test every implementation of IEnumerable? Any type that implements that interface should suffice.

In C#, we convert the IEnumerable into an ISet before creating the command (with ToHashSet), to eliminate any duplicates, before passing to the Rust core.

All that being said, I am re-thinking this implementation. I think it would be preferable to just pass the collection directly to the Rust core:

  • I can't imagine that Valkey/Redis doesn't already handle duplicate channel names itself. I don't think we need to duplicate this. And even if we wanted and needed to, it seems like the appropriate place to do so would be in the Rust core, rather than separately in each client?
  • It's a bit slower and uses a bit more memory, obviously, to eliminate duplicates and/or create a new collection. So probably something we should avoid if we can.
  • In the case of creating an (unordered) set from a collection (like we are doing in C# right now), this makes the command non-deterministic. I think it is reasonable, if the user uses an unordered collection, that the command we pass to Valkey would be non-deterministic; but if the user passes an ordered collection, I think we should preserve that order.

Let me know what you think.

Comment on lines 2085 to 2101
const channelsArray = channels ? Array.from(channels) : undefined;

if (options?.timeout !== undefined) {
// For blocking sunsubscribe, we need to provide channels (empty array if none)
return this.createWritePromise(
createSUnsubscribeBlocking(
channelsArray ?? [],
options.timeout,
),
options,
);
} else {
return this.createWritePromise(
createSUnsubscribe(channelsArray),
options,
);
}
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit. Could we always pass an empty array? Then we could also make createSUnsubscribe just accept an non-nullable array?

Suggested change
const channelsArray = channels ? Array.from(channels) : undefined;
if (options?.timeout !== undefined) {
// For blocking sunsubscribe, we need to provide channels (empty array if none)
return this.createWritePromise(
createSUnsubscribeBlocking(
channelsArray ?? [],
options.timeout,
),
options,
);
} else {
return this.createWritePromise(
createSUnsubscribe(channelsArray),
options,
);
}
const channelsArray = channels ? Array.from(channels) : [];
if (options?.timeout !== undefined) {
return this.createWritePromise(
createSUnsubscribeBlocking(
channelsArray,
options.timeout,
),
options,
);
} else {
return this.createWritePromise(
createSUnsubscribe(channelsArray),
options,
);
}

export function createPUnsubscribe(
patterns?: GlideString[],
): command_request.Command {
return createCommand(RequestType.PUnsubscribe, patterns ? patterns : []);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit. I don't know Node well, but can we use the nullish coalescing operator here instead? 🤷

Suggested change
return createCommand(RequestType.PUnsubscribe, patterns ? patterns : []);
return createCommand(RequestType.PUnsubscribe, patterns ?? []);

…zy for non-blocking operations

- Rename psubscribe/punsubscribe methods to psubscribeLazy/punsubscribeLazy for non-blocking pattern operations
- Add blocking variants (subscribe, psubscribe, unsubscribe, punsubscribe) that accept timeoutMs parameter
- Introduce ALL_CHANNELS and ALL_PATTERNS constants for unsubscribing from all subscriptions
- Update method signatures to clarify blocking vs non-blocking behavior with explicit timeout parameters
- Simplify API by removing optional timeout from options object in favor of explicit timeoutMs parameter
- Update PubSub tests to use new lazy and blocking method variants

Signed-off-by: affonsov <67347924+affonsov@users.noreply.github.com>
* @param clusterMode - Indicates if the test should be run in cluster mode.
*/
it.each([true, false])(
"dynamic_psubscribe_lazy_%p",
Copy link
Collaborator

@jduo jduo Feb 14, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We will want to parameterize these tests based on the subscription method like we do in Python and Java (preconfigured, blocking subscribe, and lazy subscribe). We want to apply this to most previous pubsub test cases.

Copy link
Collaborator

@jduo jduo left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Some comments throughout the code.

…azy methods

- Remove protobuf regeneration note from pubsubReconciliationIntervalMs configuration
- Remove redundant "Create clients WITHOUT subscription configuration" comments from PubSub tests

Signed-off-by: affonsov <67347924+affonsov@users.noreply.github.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants