Skip to content
Open
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
* Node: Drop support for Node.js 16.x and 18.x. Minimum supported version is now Node.js 20.x.

#### Changes
* Node: add dynamic PubSub support ([#5295](https://github.com/valkey-io/valkey-glide/pull/5295))
* JAVA: Add EVAL_RO, EVALSHA_RO, and SCRIPT DEBUG commands ([#5125](https://github.com/valkey-io/valkey-glide/pull/5125))
* CORE: Add client certificate and private key support for mTLS ([#5092](https://github.com/valkey-io/valkey-glide/issues/5092))
* Python: Add client certificate and private key support for mTLS ([5123](https://github.com/valkey-io/valkey-glide/issues/5123))
Expand Down
12 changes: 11 additions & 1 deletion glide-core/redis-rs/redis/src/aio/multiplexed_connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ use pin_project_lite::pin_project;
use std::collections::VecDeque;
use std::fmt;
use std::fmt::Debug;
use std::net::SocketAddr;
use std::pin::Pin;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
Expand Down Expand Up @@ -605,6 +606,7 @@ impl MultiplexedConnection {
stream,
std::time::Duration::MAX,
glide_connection_options,
None, // No socket_addr available in this path
)
.await
}
Expand All @@ -616,6 +618,7 @@ impl MultiplexedConnection {
stream: C,
response_timeout: std::time::Duration,
glide_connection_options: GlideConnectionOptions,
socket_addr: Option<SocketAddr>,
) -> RedisResult<(Self, impl Future<Output = ()>)>
where
C: Unpin + AsyncRead + AsyncWrite + Send + 'static,
Expand All @@ -626,10 +629,17 @@ impl MultiplexedConnection {
let (mut pipeline, driver) =
Pipeline::new(codec, glide_connection_options.disconnect_notifier);
let driver = Box::pin(driver);

// Use the actual socket address if available, otherwise fall back to connection_info.addr
let address_string = if let Some(socket_addr) = socket_addr {
format!("{}:{}", socket_addr.ip(), socket_addr.port())
} else {
connection_info.addr.to_string()
};
let pm = PushManager::new(
glide_connection_options.push_sender,
glide_connection_options.pubsub_synchronizer,
Some(connection_info.addr.to_string()),
Some(address_string),
Copy link
Collaborator Author

@affonsov affonsov Feb 12, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Other way to handle this is create a new type Address that will handle both IP and hostName for comparison. Maybe it could be a better approach and we can do this in a follow-up PR. Please let me know what you think.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you know if this is fixed upstream in redis-rs? Seems like something that others would run into...

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This part of the code doesn't match with redis repository. I suspect that our core implementation of pubsub is different

);

pipeline.set_push_manager(pm.clone());
Expand Down
1 change: 1 addition & 0 deletions glide-core/redis-rs/redis/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -445,6 +445,7 @@ impl Client {
con,
response_timeout,
glide_connection_options,
socket_addr,
)
.await
.map(|res| (res.0, res.1, ip))
Expand Down
11 changes: 9 additions & 2 deletions glide-core/redis-rs/redis/src/cluster_async/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1290,7 +1290,9 @@ where
.await
.get_node();
let node_address = if let Some(socket_addr) = socket_addr {
socket_addr.to_string()
// Use format! to avoid reverse DNS lookup that socket_addr.to_string() performs
let addr = format!("{}:{}", socket_addr.ip(), socket_addr.port());
addr
} else {
node_addr
};
Expand Down Expand Up @@ -2250,7 +2252,12 @@ where
let conn_lock =
inner.conn_lock.read().expect(MUTEX_READ_ERR);
socket_addresses.find_map(|socket_addr| {
conn_lock.node_for_address(&socket_addr.to_string())
let addr_str = format!(
"{}:{}",
socket_addr.ip(),
socket_addr.port()
);
conn_lock.node_for_address(&addr_str)
})
},
);
Expand Down
11 changes: 11 additions & 0 deletions node/rust-client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -703,6 +703,9 @@ pub fn get_statistics<'a>(env: &'a Env) -> Result<Object<'a>> {
let total_bytes_compressed = Telemetry::total_bytes_compressed().to_string();
let total_bytes_decompressed = Telemetry::total_bytes_decompressed().to_string();
let compression_skipped_count = Telemetry::compression_skipped_count().to_string();
let subscription_out_of_sync_count = Telemetry::subscription_out_of_sync_count().to_string();
let subscription_last_sync_timestamp =
Telemetry::subscription_last_sync_timestamp().to_string();

let mut stats = Object::new(env)?;
stats.set_named_property("total_connections", total_connections)?;
Expand All @@ -713,6 +716,14 @@ pub fn get_statistics<'a>(env: &'a Env) -> Result<Object<'a>> {
stats.set_named_property("total_bytes_compressed", total_bytes_compressed)?;
stats.set_named_property("total_bytes_decompressed", total_bytes_decompressed)?;
stats.set_named_property("compression_skipped_count", compression_skipped_count)?;
stats.set_named_property(
"subscription_out_of_sync_count",
subscription_out_of_sync_count,
)?;
stats.set_named_property(
"subscription_last_sync_timestamp",
subscription_last_sync_timestamp,
)?;

Ok(stats)
}
Loading
Loading