Skip to content

Commit 08870a2

Browse files
Merge branch 'main' into task/add-scripting-commands
2 parents 87a3f2d + d1819f1 commit 08870a2

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

56 files changed

+4371
-430
lines changed

CHANGELOG.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,11 +11,15 @@
1111
* CORE: Add client certificate and private key support for mTLS ([#5092](https://github.com/valkey-io/valkey-glide/issues/5092))
1212
* Python: Add client certificate and private key support for mTLS ([5123](https://github.com/valkey-io/valkey-glide/issues/5123))
1313
* JAVA: Add KEYS, MIGRATE and WAITAOF command support ([#5017](https://github.com/valkey-io/valkey-glide/pull/5107))
14+
* Python Sync: Add dynamic PubSub methods (subscribe, psubscribe, unsubscribe, punsubscribe, ssubscribe, sunsubscribe with timeout support), get_subscriptions(), and pubsub_reconciliation_interval configuration support. Unified configuration classes with async client by importing from glide_shared ([#5270](https://github.com/valkey-io/valkey-glide/pull/5270))
15+
* Go: Add dynamic PubSub methods (Subscribe, PSubscribe, SSubscribe, Unsubscribe, PUnsubscribe, SUnsubscribe with blocking variants and timeout support), GetSubscriptions(), and helper functions (UnsubscribeAll, PUnsubscribeAll, SUnsubscribeAll). Export pubsub constants (AllChannels, AllPatterns, AllShardedChannels)
1416
* JAVA: Add ACL server management commands (ACL CAT, ACL DELUSER, ACL DRYRUN, ACL GENPASS, ACL GETUSER, ACL LIST, ACL LOAD, ACL LOG, ACL SAVE, ACL SETUSER, ACL USERS, ACL WHOAMI)
1517
* CORE: Fix typo in ACL SETUSER command mapping (AclSetSser → AclSetUser)
1618
* Python: Add inflight request limit support to sync client
1719
* Python Sync: Add OpenTelemetry support with traces and metrics configuration
1820
* Python: Move OpenTelemetry config classes to glide_shared for code reuse between async and sync clients
21+
* JAVA: Add dynamic PubSub methods (subscribe, psubscribe, unsubscribe, punsubscribe, ssubscribe, sunsubscribe), getSubscriptions() for subscription state tracking, pubsubReconciliationIntervalMs configuration option, and subscription_out_of_sync_count and subscription_last_sync_timestamp metrics ([#5267](https://github.com/valkey-io/valkey-glide/issues/5267))
22+
* Go: Add ALLOW_NON_COVERED_SLOTS flag support for cluster scan ([#4895](https://github.com/valkey-io/valkey-glide/issues/4895))
1923

2024
#### Fixes
2125
* Node: Fix to handle non-string types in toBuffersArray ([#4842](https://github.com/valkey-io/valkey-glide/issues/4842))

ffi/miri-tests/mock-telemetry/src/lib.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -124,6 +124,10 @@ impl Telemetry {
124124
pub fn total_bytes_decompressed() -> usize { 0 }
125125
pub fn incr_compression_skipped_count(_incr_by: usize) -> usize { 0 }
126126
pub fn compression_skipped_count() -> usize { 0 }
127+
pub fn incr_subscription_out_of_sync() -> usize { 0 }
128+
pub fn subscription_out_of_sync_count() -> usize { 0 }
129+
pub fn update_subscription_last_sync_timestamp(_timestamp: u64) -> u64 { 0 }
130+
pub fn subscription_last_sync_timestamp() -> u64 { 0 }
127131
pub fn reset() {}
128132
}
129133

ffi/src/lib.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3004,6 +3004,10 @@ pub struct Statistics {
30043004
pub total_bytes_decompressed: c_ulong,
30053005
/// Number of times compression was skipped
30063006
pub compression_skipped_count: c_ulong,
3007+
/// Number of times subscriptions were out of sync during reconciliation
3008+
pub subscription_out_of_sync_count: c_ulong,
3009+
/// Timestamp of last successful subscription sync (milliseconds since epoch)
3010+
pub subscription_last_sync_timestamp: c_ulong,
30073011
}
30083012

30093013
/// Get compression and connection statistics.
@@ -3025,6 +3029,8 @@ pub extern "C" fn get_statistics() -> Statistics {
30253029
total_bytes_compressed: Telemetry::total_bytes_compressed() as c_ulong,
30263030
total_bytes_decompressed: Telemetry::total_bytes_decompressed() as c_ulong,
30273031
compression_skipped_count: Telemetry::compression_skipped_count() as c_ulong,
3032+
subscription_out_of_sync_count: Telemetry::subscription_out_of_sync_count() as c_ulong,
3033+
subscription_last_sync_timestamp: Telemetry::subscription_last_sync_timestamp() as c_ulong,
30283034
}
30293035
}
30303036

glide-core/redis-rs/redis/src/cluster_async/mod.rs

Lines changed: 64 additions & 61 deletions
Original file line numberDiff line numberDiff line change
@@ -2213,79 +2213,82 @@ where
22132213
let nodes = new_slots.all_node_addresses();
22142214
let nodes_len = nodes.len();
22152215

2216-
// Find existing connections or resolve DNS addresses
2217-
// TODO: optimize by parallelizing DNS lookups - DNS resolution may involve network I/O and is currently performed sequentially
2218-
// Issue link: https://github.com/valkey-io/valkey-glide/issues/5228
2219-
let mut addresses_and_connections = Vec::with_capacity(nodes_len);
2220-
for addr in nodes {
2221-
let addr = addr.to_string();
2222-
2223-
// Check for existing connection by address
2224-
if let Some(node) = inner
2225-
.conn_lock
2226-
.read()
2227-
.expect(MUTEX_READ_ERR)
2228-
.node_for_address(&addr)
2229-
{
2230-
addresses_and_connections.push((addr, Some(node)));
2231-
continue;
2232-
}
2233-
2234-
// If it's a DNS endpoint, it could have been stored in the existing connections vector using the resolved IP address instead of the DNS endpoint's name.
2235-
// We shall check if a connection is already exists under the resolved IP name.
2236-
let conn = if let Some((host, port)) = get_host_and_port_from_addr(&addr) {
2237-
get_socket_addrs(host, port)
2238-
.await
2239-
.ok()
2240-
.and_then(|mut socket_addresses| {
2241-
let conn_lock = inner.conn_lock.read().expect(MUTEX_READ_ERR);
2242-
socket_addresses.find_map(|socket_addr| {
2243-
conn_lock.node_for_address(&socket_addr.to_string())
2244-
})
2245-
})
2246-
} else {
2247-
None
2248-
};
2249-
2250-
// If we found a connection by IP lookup, update the PushManager. This ensures the PushManager
2251-
// stores the DNS address (which matches the connection_map key) instead of the old IP
2252-
// or config endpoint address, which is needed for pubsub tracking
2253-
if let Some(ref node) = conn {
2254-
node.user_connection
2255-
.conn
2256-
.clone()
2257-
.await
2258-
.update_push_manager_node_address(addr.clone());
2259-
}
2260-
2261-
addresses_and_connections.push((addr, conn));
2262-
}
2263-
22642216
let cluster_params = inner
22652217
.get_cluster_param(|params| params.clone())
22662218
.expect(MUTEX_READ_ERR);
22672219
let glide_connection_options = &inner.glide_connection_options;
22682220

2269-
// Create/retrieve connections in for found nodes
2270-
let connection_futures = addresses_and_connections.into_iter().map(|(addr, node)| {
2221+
// Find existing connections (by address or DNS resolution) or create new ones
2222+
let connection_futures = nodes.into_iter().map(|addr| {
2223+
let addr = addr.to_string();
2224+
let inner = Arc::clone(&inner);
22712225
let cluster_params = cluster_params.clone();
22722226
let glide_connection_options = glide_connection_options.clone();
2227+
let connection_timeout = cluster_params.connection_timeout;
2228+
22732229
async move {
2274-
let result = get_or_create_conn(
2275-
&addr,
2276-
node,
2277-
&cluster_params,
2278-
RefreshConnectionType::AllConnections,
2279-
glide_connection_options,
2280-
)
2281-
.await;
2230+
// TODO: Expose separate `dns_timeout` configuration in advanced settings
2231+
// to allow users to control DNS resolution timeout independently from connection timeout.
2232+
// Issue: https://github.com/valkey-io/valkey-glide/issues/5298
2233+
let result = tokio::time::timeout(connection_timeout, async {
2234+
// Check for existing connection by direct address
2235+
let node = inner
2236+
.conn_lock
2237+
.read()
2238+
.expect(MUTEX_READ_ERR)
2239+
.node_for_address(&addr);
2240+
2241+
let node = match node {
2242+
Some(n) => Some(n),
2243+
None => {
2244+
// If it's a DNS endpoint, it could have been stored in the existing connections vector
2245+
// using the resolved IP address instead of the DNS endpoint's name.
2246+
// We shall check if a connection already exists under the resolved IP name.
2247+
if let Some((host, port)) = get_host_and_port_from_addr(&addr) {
2248+
let conn = get_socket_addrs(host, port).await.ok().and_then(
2249+
|mut socket_addresses| {
2250+
let conn_lock =
2251+
inner.conn_lock.read().expect(MUTEX_READ_ERR);
2252+
socket_addresses.find_map(|socket_addr| {
2253+
conn_lock.node_for_address(&socket_addr.to_string())
2254+
})
2255+
},
2256+
);
2257+
2258+
// If we found a connection by IP lookup, update the PushManager. This ensures
2259+
// the PushManager stores the DNS address (which matches the connection_map key)
2260+
// instead of the old IP or config endpoint address, which is needed for pubsub tracking.
2261+
if let Some(ref node) = conn {
2262+
node.user_connection
2263+
.conn
2264+
.clone()
2265+
.await
2266+
.update_push_manager_node_address(addr.clone());
2267+
}
2268+
conn
2269+
} else {
2270+
None
2271+
}
2272+
}
2273+
};
2274+
2275+
get_or_create_conn(
2276+
&addr,
2277+
node,
2278+
&cluster_params,
2279+
RefreshConnectionType::AllConnections,
2280+
glide_connection_options,
2281+
)
2282+
.await
2283+
})
2284+
.await
2285+
.unwrap_or_else(|e| Err(e.into()));
2286+
22822287
(addr, result)
22832288
}
22842289
});
22852290

2286-
// Await all connection futures
2287-
// This is bounded by `connection_timeout` since `get_or_create_conn` uses it internally,
2288-
// so it is safe to await them all at once.
2291+
// Await all connection futures, this is bounded by `connection_timeout`.
22892292
let results = futures::future::join_all(connection_futures).await;
22902293

22912294
// Collect successful connections

go/README.md

Lines changed: 146 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -139,8 +139,154 @@ func main() {
139139
}
140140
```
141141
142+
### PubSub Example:
143+
144+
Valkey GLIDE supports PubSub (Publish/Subscribe) for real-time messaging. You can subscribe to channels and patterns, and dynamically manage subscriptions.
145+
146+
```go
147+
package main
148+
149+
import (
150+
"context"
151+
"fmt"
152+
"time"
153+
154+
glide "github.com/valkey-io/valkey-glide/go/v2"
155+
"github.com/valkey-io/valkey-glide/go/v2/config"
156+
)
157+
158+
func main() {
159+
host := "localhost"
160+
port := 6379
161+
162+
// Configure client with PubSub callback and reconciliation interval
163+
config := config.NewClientConfiguration().
164+
WithAddress(&config.NodeAddress{Host: host, Port: port}).
165+
WithPubSubCallback(func(msg glide.PubSubMessage, ctx glide.MessageContext) {
166+
fmt.Printf("Received message: %s from channel: %s\n", msg.Message, msg.Channel)
167+
}).
168+
WithPubSubReconciliationInterval(5000) // Reconcile subscriptions every 5 seconds
169+
170+
client, err := glide.NewClient(config)
171+
if err != nil {
172+
fmt.Println("Error creating client:", err)
173+
return
174+
}
175+
defer client.Close()
176+
177+
ctx := context.Background()
178+
179+
// Subscribe to channels
180+
err = client.Subscribe(ctx, []string{"news", "updates"})
181+
if err != nil {
182+
fmt.Println("Error subscribing:", err)
183+
return
184+
}
185+
186+
// Subscribe to patterns
187+
err = client.PSubscribe(ctx, []string{"events:*"})
188+
if err != nil {
189+
fmt.Println("Error pattern subscribing:", err)
190+
return
191+
}
192+
193+
// Get current subscriptions
194+
state, err := client.GetSubscriptions(ctx)
195+
if err != nil {
196+
fmt.Println("Error getting subscriptions:", err)
197+
return
198+
}
199+
fmt.Printf("Subscribed to %d channels and %d patterns\n",
200+
len(state.DesiredSubscriptions["exact"]),
201+
len(state.DesiredSubscriptions["pattern"]))
202+
203+
// Wait for messages
204+
time.Sleep(10 * time.Second)
205+
206+
// Unsubscribe from specific channels
207+
err = client.Unsubscribe(ctx, []string{"news"})
208+
if err != nil {
209+
fmt.Println("Error unsubscribing:", err)
210+
return
211+
}
212+
213+
// Unsubscribe from all channels and patterns
214+
err = client.UnsubscribeAll(ctx)
215+
if err != nil {
216+
fmt.Println("Error unsubscribing from all:", err)
217+
return
218+
}
219+
}
220+
```
221+
222+
**PubSub Configuration Options:**
223+
224+
- `WithPubSubCallback`: Set a callback function to handle incoming PubSub messages
225+
- `WithPubSubReconciliationInterval`: Set the interval (in milliseconds) for automatic subscription reconciliation. This ensures subscriptions are maintained even if the connection is temporarily lost. Default is 0 (disabled).
226+
227+
**PubSub Methods:**
228+
229+
- `Subscribe(ctx, channels)`: Subscribe to one or more channels
230+
- `PSubscribe(ctx, patterns)`: Subscribe to channel patterns (e.g., "news:*")
231+
- `SSubscribe(ctx, channels)`: Subscribe to sharded channels (cluster mode only)
232+
- `Unsubscribe(ctx, channels)`: Unsubscribe from specific channels (pass `nil` or `glide.AllChannels` to unsubscribe from all)
233+
- `PUnsubscribe(ctx, patterns)`: Unsubscribe from patterns (pass `nil` or `glide.AllPatterns` to unsubscribe from all)
234+
- `SUnsubscribe(ctx, channels)`: Unsubscribe from sharded channels (pass `nil` or `glide.AllShardedChannels` to unsubscribe from all)
235+
- `GetSubscriptions(ctx)`: Get the current subscription state
236+
237+
**Blocking Variants:**
238+
239+
All subscribe/unsubscribe methods have blocking variants with timeout support:
240+
- `SubscribeBlocking(ctx, channels, timeoutMs)`
241+
- `UnsubscribeBlocking(ctx, channels, timeoutMs)`
242+
- And similar for PSubscribe, SSubscribe, etc.
243+
142244
For more code examples please refer to [examples.md](examples/examples.md).
143245
246+
### Cluster Scan
247+
248+
The cluster scan feature allows you to iterate over all keys in a cluster. You can optionally filter by pattern, type, and control batch size.
249+
250+
#### Basic Cluster Scan
251+
252+
```go
253+
cursor := models.NewClusterScanCursor()
254+
allKeys := []string{}
255+
256+
for !cursor.IsFinished() {
257+
result, err := client.Scan(context.Background(), cursor)
258+
if err != nil {
259+
fmt.Println("Error:", err)
260+
break
261+
}
262+
allKeys = append(allKeys, result.Keys...)
263+
cursor = result.Cursor
264+
}
265+
```
266+
267+
#### Cluster Scan with Options
268+
269+
```go
270+
opts := options.NewClusterScanOptions().
271+
SetMatch("user:*"). // Filter by pattern
272+
SetCount(100). // Batch size hint
273+
SetType(constants.StringType). // Filter by key type
274+
SetAllowNonCoveredSlots(true) // Allow scanning even if some slots are not covered
275+
276+
cursor := models.NewClusterScanCursor()
277+
for !cursor.IsFinished() {
278+
result, err := client.ScanWithOptions(context.Background(), cursor, *opts)
279+
if err != nil {
280+
fmt.Println("Error:", err)
281+
break
282+
}
283+
// Process result.Keys
284+
cursor = result.Cursor
285+
}
286+
```
287+
288+
**Note**: The `AllowNonCoveredSlots` option is useful when the cluster is not fully configured or some nodes are down. It allows the scan to proceed even if some hash slots are not covered by any node.
289+
144290
### Building & Testing
145291
146292
Development instructions for local building & testing the package are in the [DEVELOPER.md](DEVELOPER.md) file.

0 commit comments

Comments
 (0)