Skip to content

Commit f7f49ce

Browse files
committed
Adds AddressResolver, and implements it for Java, fixing #4396
1 parent a952ba1 commit f7f49ce

File tree

19 files changed

+530
-48
lines changed

19 files changed

+530
-48
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
* Python: Move OpenTelemetry config classes to glide_shared for code reuse between async and sync clients
2121
* JAVA: Add dynamic PubSub methods (subscribe, psubscribe, unsubscribe, punsubscribe, ssubscribe, sunsubscribe), getSubscriptions() for subscription state tracking, pubsubReconciliationIntervalMs configuration option, and subscription_out_of_sync_count and subscription_last_sync_timestamp metrics ([#5267](https://github.com/valkey-io/valkey-glide/issues/5267))
2222
* Go: Add ALLOW_NON_COVERED_SLOTS flag support for cluster scan ([#4895](https://github.com/valkey-io/valkey-glide/issues/4895))
23+
* CORE: Support custom socket address resolution when connecting to valkey ([#4396](https://github.com/valkey-io/valkey-glide/issues/4396))
2324

2425
#### Fixes
2526
* Node: Fix to handle non-string types in toBuffersArray ([#4842](https://github.com/valkey-io/valkey-glide/issues/4842))

glide-core/redis-rs/redis/src/cluster.rs

Lines changed: 28 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,9 @@ use crate::connection::{
4848
connect, Connection, ConnectionAddr, ConnectionInfo, ConnectionLike, RedisConnectionInfo,
4949
};
5050
use crate::parser::parse_redis_value;
51-
use crate::types::{ErrorKind, HashMap, RedisError, RedisResult, RetryMethod, Value};
51+
use crate::types::{
52+
AddressResolver, ErrorKind, HashMap, RedisError, RedisResult, RetryMethod, Value,
53+
};
5254
pub use crate::TlsMode; // Pub for backwards compatibility
5355
use crate::{
5456
cluster_client::ClusterParams,
@@ -383,7 +385,16 @@ where
383385
ErrorKind::ClientError,
384386
"can't parse node address",
385387
)))?;
386-
match parse_and_count_slots(&value, self.cluster_params.tls, addr).map(
388+
match parse_and_count_slots(
389+
&value,
390+
self.cluster_params.tls,
391+
addr,
392+
self.cluster_params
393+
.address_resolver
394+
.as_ref()
395+
.map(Arc::as_ref),
396+
)
397+
.map(
387398
|ParsedSlotsResult {
388399
slots,
389400
address_to_ip_map,
@@ -1006,7 +1017,8 @@ pub(crate) fn get_connection_info(
10061017
host.to_string(),
10071018
port,
10081019
cluster_params.tls,
1009-
cluster_params.tls_params,
1020+
cluster_params.tls_params.clone(),
1021+
cluster_params.address_resolver.as_ref().map(Arc::as_ref),
10101022
),
10111023
redis: RedisConnectionInfo {
10121024
password: cluster_params.password,
@@ -1024,21 +1036,29 @@ pub(crate) fn get_connection_addr(
10241036
port: u16,
10251037
tls: Option<TlsMode>,
10261038
tls_params: Option<TlsConnParams>,
1039+
address_resolver: Option<&dyn AddressResolver>,
10271040
) -> ConnectionAddr {
1041+
// Resolve the address if a resolver is provided
1042+
let (resolved_host, resolved_port) = if let Some(resolver) = address_resolver {
1043+
resolver.resolve(&host, port)
1044+
} else {
1045+
(host, port)
1046+
};
1047+
10281048
match tls {
10291049
Some(TlsMode::Secure) => ConnectionAddr::TcpTls {
1030-
host,
1031-
port,
1050+
host: resolved_host,
1051+
port: resolved_port,
10321052
insecure: false,
10331053
tls_params,
10341054
},
10351055
Some(TlsMode::Insecure) => ConnectionAddr::TcpTls {
1036-
host,
1037-
port,
1056+
host: resolved_host,
1057+
port: resolved_port,
10381058
insecure: true,
10391059
tls_params,
10401060
},
1041-
_ => ConnectionAddr::Tcp(host, port),
1061+
_ => ConnectionAddr::Tcp(resolved_host, resolved_port),
10421062
}
10431063
}
10441064

glide-core/redis-rs/redis/src/cluster_async/mod.rs

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -289,9 +289,9 @@ where
289289
}
290290

291291
/// Send commands in `pipeline` to the given `route`. If `route` is [None], it will be computed from `pipeline`.
292-
/// - `pipeline_retry_strategy`: Configures retry behavior for pipeline commands.
293-
/// - `retry_server_error`: If `true`, retries commands on server errors (may cause reordering).
294-
/// - `retry_connection_error`: If `true`, retries on connection errors (may lead to duplicate executions).
292+
/// - `pipeline_retry_strategy`: Configures retry behavior for pipeline commands.
293+
/// - `retry_server_error`: If `true`, retries commands on server errors (may cause reordering).
294+
/// - `retry_connection_error`: If `true`, retries on connection errors (may lead to duplicate executions).
295295
/// TODO: add wiki link.
296296
pub async fn route_pipeline<'a>(
297297
&'a mut self,
@@ -664,9 +664,9 @@ enum CmdArg<C> {
664664
count: usize,
665665
route: Option<InternalSingleNodeRouting<C>>,
666666
sub_pipeline: bool,
667-
/// Configures retry behavior for pipeline commands.
668-
/// - `retry_server_error`: If `true`, retries commands on server errors (may cause reordering).
669-
/// - `retry_connection_error`: If `true`, retries on connection errors (may lead to duplicate executions).
667+
/// Configures retry behavior for pipeline commands.
668+
/// - `retry_server_error`: If `true`, retries commands on server errors (may cause reordering).
669+
/// - `retry_connection_error`: If `true`, retries on connection errors (may lead to duplicate executions).
670670
pipeline_retry_strategy: PipelineRetryStrategy,
671671
},
672672
ClusterScan {
@@ -3698,13 +3698,17 @@ where
36983698
let read_from_replicas = inner
36993699
.get_cluster_param(|params| params.read_from_replicas.clone())
37003700
.expect(MUTEX_READ_ERR);
3701+
let address_resolver = inner
3702+
.get_cluster_param(|params| params.address_resolver.clone())
3703+
.expect(MUTEX_READ_ERR);
37013704
TopologyQueryResult {
37023705
topology_result: calculate_topology(
37033706
topology_values,
37043707
curr_retry,
37053708
tls_mode,
37063709
num_of_nodes_to_query,
37073710
read_from_replicas,
3711+
address_resolver.as_ref().map(Arc::as_ref),
37083712
),
37093713
failed_connections: Some(failed_addresses),
37103714
}

glide-core/redis-rs/redis/src/cluster_client.rs

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ use crate::cluster_topology::{
44
DEFAULT_SLOTS_REFRESH_MAX_JITTER_MILLI, DEFAULT_SLOTS_REFRESH_WAIT_DURATION,
55
};
66
use crate::connection::{ConnectionAddr, ConnectionInfo, IntoConnectionInfo};
7-
use crate::types::{ErrorKind, ProtocolVersion, RedisError, RedisResult};
7+
use crate::types::{AddressResolver, ErrorKind, ProtocolVersion, RedisError, RedisResult};
88
use crate::{cluster, cluster::TlsMode};
99
use crate::{PushInfo, RetryStrategy};
1010
use rand::Rng;
@@ -48,6 +48,7 @@ struct BuilderParams {
4848
refresh_topology_from_initial_nodes: bool,
4949
database_id: i64,
5050
tcp_nodelay: bool,
51+
address_resolver: Option<Arc<dyn AddressResolver>>,
5152
}
5253

5354
#[derive(Clone)]
@@ -151,6 +152,8 @@ pub struct ClusterParams {
151152
pub(crate) refresh_topology_from_initial_nodes: bool,
152153
pub(crate) database_id: i64,
153154
pub(crate) tcp_nodelay: bool,
155+
/// Optional callback for resolving addresses before connection.
156+
pub(crate) address_resolver: Option<Arc<dyn AddressResolver>>,
154157
}
155158

156159
impl ClusterParams {
@@ -183,6 +186,7 @@ impl ClusterParams {
183186
refresh_topology_from_initial_nodes: value.refresh_topology_from_initial_nodes,
184187
database_id: value.database_id,
185188
tcp_nodelay: value.tcp_nodelay,
189+
address_resolver: value.address_resolver,
186190
})
187191
}
188192
}
@@ -506,6 +510,16 @@ impl ClusterClientBuilder {
506510
self
507511
}
508512

513+
/// Sets an address resolver callback for resolving node addresses.
514+
///
515+
/// When set, the resolver will be called to resolve host:port pairs
516+
/// before establishing connections to cluster nodes. This allows custom
517+
/// DNS resolution or address translation logic.
518+
pub fn address_resolver(mut self, resolver: Arc<dyn AddressResolver>) -> ClusterClientBuilder {
519+
self.builder_params.address_resolver = Some(resolver);
520+
self
521+
}
522+
509523
/// Enables timing out on slow connection time.
510524
///
511525
/// If enabled, the cluster will only wait the given time on each connection attempt to each node.

0 commit comments

Comments
 (0)