Skip to content

Commit d1819f1

Browse files
authored
Java: Add dynamic pubsub APIs and pubsub stats (#5269)
* Add support for dynamic subscription and unsubscription in Java. * Add methods for retrieving subscription metrics. * Add the pubsub reconciliation interval advanced option. Fixes #5267. Signed-off-by: James Duong <duong.james@gmail.com>
1 parent d79fb55 commit d1819f1

File tree

21 files changed

+1495
-31
lines changed

21 files changed

+1495
-31
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
* Python: Add inflight request limit support to sync client
1919
* Python Sync: Add OpenTelemetry support with traces and metrics configuration
2020
* Python: Move OpenTelemetry config classes to glide_shared for code reuse between async and sync clients
21+
* JAVA: Add dynamic PubSub methods (subscribe, psubscribe, unsubscribe, punsubscribe, ssubscribe, sunsubscribe), getSubscriptions() for subscription state tracking, pubsubReconciliationIntervalMs configuration option, and subscription_out_of_sync_count and subscription_last_sync_timestamp metrics ([#5267](https://github.com/valkey-io/valkey-glide/issues/5267))
2122
* Go: Add ALLOW_NON_COVERED_SLOTS flag support for cluster scan ([#4895](https://github.com/valkey-io/valkey-glide/issues/4895))
2223

2324
#### Fixes

java/README.md

Lines changed: 92 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -344,6 +344,98 @@ public class Main {
344344
}
345345
```
346346

347+
## PubSub Configuration and Monitoring
348+
349+
### Dynamic PubSub with Reconciliation
350+
351+
Valkey GLIDE supports dynamic PubSub with automatic subscription reconciliation. Configure the reconciliation interval to ensure subscriptions remain synchronized:
352+
353+
```java
354+
import glide.api.GlideClient;
355+
import glide.api.models.configuration.GlideClientConfiguration;
356+
import glide.api.models.configuration.AdvancedGlideClientConfiguration;
357+
import glide.api.models.configuration.NodeAddress;
358+
359+
GlideClientConfiguration config = GlideClientConfiguration.builder()
360+
.address(NodeAddress.builder().host("localhost").port(6379).build())
361+
.advancedConfig(
362+
AdvancedGlideClientConfiguration.builder()
363+
.pubsubReconciliationIntervalMs(5000) // Reconcile every 5 seconds
364+
.build()
365+
)
366+
.build();
367+
368+
try (GlideClient client = GlideClient.createClient(config).get()) {
369+
// Subscribe to channels dynamically
370+
client.subscribe(Set.of("news", "updates")).get();
371+
372+
// Get subscription state
373+
var state = client.getSubscriptions().get();
374+
System.out.println("Desired subscriptions: " + state.desiredSubscriptions);
375+
System.out.println("Actual subscriptions: " + state.actualSubscriptions);
376+
377+
// Unsubscribe
378+
client.unsubscribe(Set.of("news")).get();
379+
}
380+
```
381+
382+
### Monitoring Subscription Health
383+
384+
Monitor subscription health and reconciliation status using `getStatistics()`:
385+
386+
```java
387+
try (GlideClient client = GlideClient.createClient(config).get()) {
388+
// Subscribe to channels
389+
client.subscribe(Set.of("channel1", "channel2")).get();
390+
391+
// Get statistics
392+
Map<String, String> stats = client.getStatistics();
393+
394+
// Available metrics:
395+
// - total_connections: Number of active connections
396+
// - total_clients: Number of client instances
397+
// - total_values_compressed: Count of compressed values
398+
// - total_values_decompressed: Count of decompressed values
399+
// - total_original_bytes: Original data size before compression
400+
// - total_bytes_compressed: Compressed data size
401+
// - total_bytes_decompressed: Decompressed data size
402+
// - compression_skipped_count: Times compression was skipped
403+
// - subscription_out_of_sync_count: Failed reconciliation attempts
404+
// - subscription_last_sync_timestamp: Last successful sync (milliseconds since epoch)
405+
406+
System.out.println("Out of sync count: " + stats.get("subscription_out_of_sync_count"));
407+
System.out.println("Last sync timestamp: " + stats.get("subscription_last_sync_timestamp"));
408+
}
409+
```
410+
411+
### Sharded PubSub (Cluster Mode, Redis 7.0+)
412+
413+
For cluster mode, you can use sharded PubSub channels:
414+
415+
```java
416+
import glide.api.GlideClusterClient;
417+
import glide.api.models.configuration.GlideClusterClientConfiguration;
418+
419+
GlideClusterClientConfiguration config = GlideClusterClientConfiguration.builder()
420+
.address(NodeAddress.builder().host("localhost").port(7001).build())
421+
.advancedConfig(
422+
AdvancedGlideClusterClientConfiguration.builder()
423+
.pubsubReconciliationIntervalMs(5000)
424+
.build()
425+
)
426+
.build();
427+
428+
try (GlideClusterClient client = GlideClusterClient.createClient(config).get()) {
429+
// Subscribe to sharded channels
430+
client.ssubscribe(Set.of("shard-channel-1", "shard-channel-2")).get();
431+
432+
// Get subscription state
433+
var state = client.getSubscriptions().get();
434+
System.out.println("Sharded subscriptions: " +
435+
state.actualSubscriptions.get(PubSubChannelMode.Sharded));
436+
}
437+
```
438+
347439
### Scala and Kotlin Examples
348440
See [our Scala and Kotlin examples](../examples/) to learn how to use Valkey GLIDE in Scala and Kotlin projects.
349441

java/client/src/main/java/glide/api/BaseClient.java

Lines changed: 122 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -105,7 +105,11 @@
105105
import static command_request.CommandRequestOuterClass.RequestType.PExpire;
106106
import static command_request.CommandRequestOuterClass.RequestType.PExpireAt;
107107
import static command_request.CommandRequestOuterClass.RequestType.PExpireTime;
108+
import static command_request.CommandRequestOuterClass.RequestType.PSubscribe;
109+
import static command_request.CommandRequestOuterClass.RequestType.PSubscribeBlocking;
108110
import static command_request.CommandRequestOuterClass.RequestType.PTTL;
111+
import static command_request.CommandRequestOuterClass.RequestType.PUnsubscribe;
112+
import static command_request.CommandRequestOuterClass.RequestType.PUnsubscribeBlocking;
109113
import static command_request.CommandRequestOuterClass.RequestType.Persist;
110114
import static command_request.CommandRequestOuterClass.RequestType.PfAdd;
111115
import static command_request.CommandRequestOuterClass.RequestType.PfCount;
@@ -148,10 +152,14 @@
148152
import static command_request.CommandRequestOuterClass.RequestType.Sort;
149153
import static command_request.CommandRequestOuterClass.RequestType.SortReadOnly;
150154
import static command_request.CommandRequestOuterClass.RequestType.Strlen;
155+
import static command_request.CommandRequestOuterClass.RequestType.Subscribe;
156+
import static command_request.CommandRequestOuterClass.RequestType.SubscribeBlocking;
151157
import static command_request.CommandRequestOuterClass.RequestType.TTL;
152158
import static command_request.CommandRequestOuterClass.RequestType.Touch;
153159
import static command_request.CommandRequestOuterClass.RequestType.Type;
154160
import static command_request.CommandRequestOuterClass.RequestType.Unlink;
161+
import static command_request.CommandRequestOuterClass.RequestType.Unsubscribe;
162+
import static command_request.CommandRequestOuterClass.RequestType.UnsubscribeBlocking;
155163
import static command_request.CommandRequestOuterClass.RequestType.Wait;
156164
import static command_request.CommandRequestOuterClass.RequestType.WaitAof;
157165
import static command_request.CommandRequestOuterClass.RequestType.Watch;
@@ -378,7 +386,7 @@ public enum ResponseFlags {
378386
public static final String LCS_MATCHES_RESULT_KEY = "matches";
379387

380388
// Constant empty arrays to reduce allocations
381-
private static final String[] EMPTY_STRING_ARRAY = new String[0];
389+
protected static final String[] EMPTY_STRING_ARRAY = new String[0];
382390
protected static final GlideString[] EMPTY_GLIDE_STRING_ARRAY = new GlideString[0];
383391

384392
// Client components
@@ -6243,6 +6251,119 @@ public CompletableFuture<String> aclWhoami() {
62436251
AclWhoami, EMPTY_STRING_ARRAY, this::handleStringResponse);
62446252
}
62456253

6254+
public CompletableFuture<Void> subscribe(Set<String> channels) {
6255+
return commandManager.submitNewCommand(
6256+
Subscribe, channels.toArray(EMPTY_STRING_ARRAY), response -> null);
6257+
}
6258+
6259+
public CompletableFuture<Void> subscribe(Set<String> channels, int timeoutMs) {
6260+
if (timeoutMs < 0) {
6261+
throw new IllegalArgumentException("Timeout must be non-negative, got: " + timeoutMs);
6262+
}
6263+
String[] args = new String[channels.size() + 1];
6264+
int i = 0;
6265+
for (String channel : channels) {
6266+
args[i++] = channel;
6267+
}
6268+
args[i] = String.valueOf(timeoutMs);
6269+
return commandManager.submitNewCommand(SubscribeBlocking, args, response -> null);
6270+
}
6271+
6272+
public CompletableFuture<Void> psubscribe(Set<String> patterns) {
6273+
return commandManager.submitNewCommand(
6274+
PSubscribe, patterns.toArray(EMPTY_STRING_ARRAY), response -> null);
6275+
}
6276+
6277+
public CompletableFuture<Void> psubscribe(Set<String> patterns, int timeoutMs) {
6278+
if (timeoutMs < 0) {
6279+
throw new IllegalArgumentException("Timeout must be non-negative, got: " + timeoutMs);
6280+
}
6281+
String[] args = new String[patterns.size() + 1];
6282+
int i = 0;
6283+
for (String pattern : patterns) {
6284+
args[i++] = pattern;
6285+
}
6286+
args[i] = String.valueOf(timeoutMs);
6287+
return commandManager.submitNewCommand(PSubscribeBlocking, args, response -> null);
6288+
}
6289+
6290+
public CompletableFuture<Void> unsubscribe() {
6291+
return commandManager.submitNewCommand(Unsubscribe, EMPTY_STRING_ARRAY, response -> null);
6292+
}
6293+
6294+
public CompletableFuture<Void> unsubscribe(Set<String> channels) {
6295+
return commandManager.submitNewCommand(
6296+
Unsubscribe, channels.toArray(EMPTY_STRING_ARRAY), response -> null);
6297+
}
6298+
6299+
public CompletableFuture<Void> unsubscribe(Set<String> channels, int timeoutMs) {
6300+
if (timeoutMs < 0) {
6301+
throw new IllegalArgumentException("Timeout must be non-negative, got: " + timeoutMs);
6302+
}
6303+
String[] args = new String[channels.size() + 1];
6304+
int i = 0;
6305+
for (String channel : channels) {
6306+
args[i++] = channel;
6307+
}
6308+
args[i] = String.valueOf(timeoutMs);
6309+
return commandManager.submitNewCommand(UnsubscribeBlocking, args, response -> null);
6310+
}
6311+
6312+
public CompletableFuture<Void> unsubscribe(int timeoutMs) {
6313+
if (timeoutMs < 0) {
6314+
throw new IllegalArgumentException("Timeout must be non-negative, got: " + timeoutMs);
6315+
}
6316+
return commandManager.submitNewCommand(
6317+
UnsubscribeBlocking, new String[] {String.valueOf(timeoutMs)}, response -> null);
6318+
}
6319+
6320+
public CompletableFuture<Void> punsubscribe() {
6321+
return commandManager.submitNewCommand(PUnsubscribe, EMPTY_STRING_ARRAY, response -> null);
6322+
}
6323+
6324+
public CompletableFuture<Void> punsubscribe(Set<String> patterns) {
6325+
return commandManager.submitNewCommand(
6326+
PUnsubscribe, patterns.toArray(EMPTY_STRING_ARRAY), response -> null);
6327+
}
6328+
6329+
public CompletableFuture<Void> punsubscribe(Set<String> patterns, int timeoutMs) {
6330+
if (timeoutMs < 0) {
6331+
throw new IllegalArgumentException("Timeout must be non-negative, got: " + timeoutMs);
6332+
}
6333+
String[] args = new String[patterns.size() + 1];
6334+
int i = 0;
6335+
for (String pattern : patterns) {
6336+
args[i++] = pattern;
6337+
}
6338+
args[i] = String.valueOf(timeoutMs);
6339+
return commandManager.submitNewCommand(PUnsubscribeBlocking, args, response -> null);
6340+
}
6341+
6342+
public CompletableFuture<Void> punsubscribe(int timeoutMs) {
6343+
if (timeoutMs < 0) {
6344+
throw new IllegalArgumentException("Timeout must be non-negative, got: " + timeoutMs);
6345+
}
6346+
return commandManager.submitNewCommand(
6347+
PUnsubscribeBlocking, new String[] {String.valueOf(timeoutMs)}, response -> null);
6348+
}
6349+
6350+
protected Object parseSubscriptionState(Object response) {
6351+
if (!(response instanceof Object[])) {
6352+
throw new RuntimeException("Invalid response format from GetSubscriptions");
6353+
}
6354+
Object[] arr = (Object[]) response;
6355+
if (arr.length != 4) {
6356+
throw new RuntimeException("Invalid response format from GetSubscriptions");
6357+
}
6358+
6359+
@SuppressWarnings("unchecked")
6360+
Map<String, Object[]> desiredMap = (Map<String, Object[]>) arr[1];
6361+
@SuppressWarnings("unchecked")
6362+
Map<String, Object[]> actualMap = (Map<String, Object[]>) arr[3];
6363+
6364+
return new Object[] {desiredMap, actualMap};
6365+
}
6366+
62466367
/**
62476368
* Internal method for enqueueing PubSub messages from native callback. This is called by the
62486369
* native layer when PubSub messages are received.

java/client/src/main/java/glide/api/GlideClient.java

Lines changed: 94 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import static command_request.CommandRequestOuterClass.RequestType.FunctionLoad;
2020
import static command_request.CommandRequestOuterClass.RequestType.FunctionRestore;
2121
import static command_request.CommandRequestOuterClass.RequestType.FunctionStats;
22+
import static command_request.CommandRequestOuterClass.RequestType.GetSubscriptions;
2223
import static command_request.CommandRequestOuterClass.RequestType.Info;
2324
import static command_request.CommandRequestOuterClass.RequestType.Keys;
2425
import static command_request.CommandRequestOuterClass.RequestType.LastSave;
@@ -53,12 +54,15 @@
5354
import glide.api.models.configuration.BackoffStrategy;
5455
import glide.api.models.configuration.BaseClientConfiguration;
5556
import glide.api.models.configuration.GlideClientConfiguration;
57+
import glide.api.models.configuration.PubSubState;
58+
import glide.api.models.configuration.PubSubStateImpl;
5659
import glide.api.models.configuration.ServerCredentials;
5760
import glide.api.models.configuration.StandaloneSubscriptionConfiguration;
5861
import glide.utils.ArgsBuilder;
5962
import java.util.Arrays;
6063
import java.util.Map;
6164
import java.util.Optional;
65+
import java.util.Set;
6266
import java.util.concurrent.CompletableFuture;
6367
import java.util.stream.Stream;
6468
import lombok.NonNull;
@@ -552,4 +556,94 @@ public CompletableFuture<Object[]> scan(
552556
GlideString[] arguments = new ArgsBuilder().add(cursor).add(options.toArgs()).toArray();
553557
return commandManager.submitNewCommand(Scan, arguments, this::handleArrayResponseBinary);
554558
}
559+
560+
/**
561+
* Gets the current subscription state for this client.
562+
*
563+
* <p>Returns the desired and actual subscription states, which may differ if subscriptions are
564+
* being reconciled after a connection loss.
565+
*
566+
* <p>The returned {@link PubSubState} contains:
567+
*
568+
* <ul>
569+
* <li><b>Desired subscriptions</b>: The channels/patterns the client intends to be subscribed
570+
* to
571+
* <li><b>Actual subscriptions</b>: The channels/patterns currently subscribed on the server
572+
* </ul>
573+
*
574+
* @return A {@link CompletableFuture} that completes with a {@link PubSubState} containing:
575+
* <ul>
576+
* <li>{@link
577+
* glide.api.models.configuration.StandaloneSubscriptionConfiguration.PubSubChannelMode#EXACT
578+
* EXACT} - Set of exact channel names
579+
* <li>{@link
580+
* glide.api.models.configuration.StandaloneSubscriptionConfiguration.PubSubChannelMode#PATTERN
581+
* PATTERN} - Set of pattern subscriptions
582+
* </ul>
583+
*
584+
* @example
585+
* <pre>{@code
586+
* // Get current subscription state
587+
* PubSubState<PubSubChannelMode> state = client.getSubscriptions().get();
588+
*
589+
* // Check desired subscriptions
590+
* Set<String> desiredChannels = state.getDesiredSubscriptions()
591+
* .getOrDefault(PubSubChannelMode.EXACT, Set.of());
592+
* System.out.println("Desired channels: " + desiredChannels);
593+
*
594+
* // Check actual subscriptions
595+
* Set<String> actualChannels = state.getActualSubscriptions()
596+
* .getOrDefault(PubSubChannelMode.EXACT, Set.of());
597+
* System.out.println("Actual channels: " + actualChannels);
598+
* }</pre>
599+
*
600+
* @see <a href="https://valkey.io/commands/pubsub-channels/">valkey.io</a> for PUBSUB CHANNELS
601+
* @see <a href="https://valkey.io/commands/pubsub-numpat/">valkey.io</a> for PUBSUB NUMPAT
602+
*/
603+
public CompletableFuture<PubSubState<StandaloneSubscriptionConfiguration.PubSubChannelMode>>
604+
getSubscriptions() {
605+
return commandManager.submitNewCommand(
606+
GetSubscriptions,
607+
EMPTY_STRING_ARRAY,
608+
response -> {
609+
Object[] parsed = (Object[]) parseSubscriptionState(response);
610+
@SuppressWarnings("unchecked")
611+
Map<String, Object[]> desiredMap = (Map<String, Object[]>) parsed[0];
612+
@SuppressWarnings("unchecked")
613+
Map<String, Object[]> actualMap = (Map<String, Object[]>) parsed[1];
614+
615+
Map<StandaloneSubscriptionConfiguration.PubSubChannelMode, Set<String>> desired =
616+
new java.util.HashMap<>();
617+
Map<StandaloneSubscriptionConfiguration.PubSubChannelMode, Set<String>> actual =
618+
new java.util.HashMap<>();
619+
620+
for (Map.Entry<String, Object[]> entry : desiredMap.entrySet()) {
621+
String key = entry.getKey();
622+
Set<String> values =
623+
java.util.Arrays.stream(entry.getValue())
624+
.map(Object::toString)
625+
.collect(java.util.stream.Collectors.toSet());
626+
if ("Exact".equals(key)) {
627+
desired.put(StandaloneSubscriptionConfiguration.PubSubChannelMode.EXACT, values);
628+
} else if ("Pattern".equals(key)) {
629+
desired.put(StandaloneSubscriptionConfiguration.PubSubChannelMode.PATTERN, values);
630+
}
631+
}
632+
633+
for (Map.Entry<String, Object[]> entry : actualMap.entrySet()) {
634+
String key = entry.getKey();
635+
Set<String> values =
636+
java.util.Arrays.stream(entry.getValue())
637+
.map(Object::toString)
638+
.collect(java.util.stream.Collectors.toSet());
639+
if ("Exact".equals(key)) {
640+
actual.put(StandaloneSubscriptionConfiguration.PubSubChannelMode.EXACT, values);
641+
} else if ("Pattern".equals(key)) {
642+
actual.put(StandaloneSubscriptionConfiguration.PubSubChannelMode.PATTERN, values);
643+
}
644+
}
645+
646+
return new PubSubStateImpl<>(desired, actual);
647+
});
648+
}
555649
}

0 commit comments

Comments
 (0)