Support custom socket address resolution#5328
Support custom socket address resolution#5328tavomaciel wants to merge 5 commits intovalkey-io:mainfrom
Conversation
94d76ed to
24b2e97
Compare
| /// Send commands in `pipeline` to the given `route`. If `route` is [None], it will be computed from `pipeline`. | ||
| /// - `pipeline_retry_strategy`: Configures retry behavior for pipeline commands. | ||
| /// - `retry_server_error`: If `true`, retries commands on server errors (may cause reordering). | ||
| /// - `retry_connection_error`: If `true`, retries on connection errors (may lead to duplicate executions). |
There was a problem hiding this comment.
These extra spaces were automatically formatted when running the recommended cargo fmt --manifest-path ./Cargo.toml --all. Since it's just 6 lines I decided to leave it, but I can remove it if anyone feels strongly about this.
There was a problem hiding this comment.
You can leave it, no worries
f7f49ce to
89cb666
Compare
affonsov
left a comment
There was a problem hiding this comment.
Thanks for the contribution
The CI is failing
https://github.com/valkey-io/valkey-glide/actions/runs/22008040249/job/63600882948?pr=5328
Also, I have one question about your implementation.
Do you have the chance or ability to test this not using faking addresses?
| /// Send commands in `pipeline` to the given `route`. If `route` is [None], it will be computed from `pipeline`. | ||
| /// - `pipeline_retry_strategy`: Configures retry behavior for pipeline commands. | ||
| /// - `retry_server_error`: If `true`, retries commands on server errors (may cause reordering). | ||
| /// - `retry_connection_error`: If `true`, retries on connection errors (may lead to duplicate executions). |
There was a problem hiding this comment.
You can leave it, no worries
| /// A trait for resolving addresses before connection. | ||
| /// Given a host and port, returns the resolved (host, port) pair. | ||
| /// This allows custom DNS resolution or address translation logic. | ||
| pub trait AddressResolver: Send + Sync + std::fmt::Debug { |
There was a problem hiding this comment.
Not sure if this will resolve the issue.
This is just mapping an address to another address and not resolving the dns
Am I understand this correctly?
There was a problem hiding this comment.
The appliation can map it to a different hostname, but it's also possible to resolve the ip address and return the IP as the host here.
This works because later down the track, all paths lead to either to_socket_addr or c::getaddrinfo (via Connect calling SocketAddr:try_into calling LookupHost::try_into).
Both of these options support:
- Passing a hostname directly
- If no custom resolution happened, or the resolution gave another hostname
- Passing ip directly
- If the user created the client with an ip straight away, or the resolution already converted the hostname into an IP address.
This is actually good because in some cases, the behind-the-firewall hostname is only a CNAME to a new hostname. This makes the application simpler since it doesn't need to resolve all the CNAMEs until it finds an IP. And of course, the solution doesn't prevent anyone returning an IP directly here either.
There was a problem hiding this comment.
Pull request overview
This pull request adds support for custom socket address resolution when connecting to Valkey/Redis servers. The feature allows applications to provide a callback that resolves host and port addresses before connection establishment, which is useful for cases where default DNS resolution is insufficient (e.g., custom service discovery, proxy setups, or internal DNS servers).
Changes:
- Introduces an
AddressResolvertrait in the Redis-rs core library that can be implemented by each language binding - Adds Java binding implementation (
JavaAddressResolver) that bridges JNI calls to Java callback functions - Integrates address resolution at all connection points (initial connections, cluster topology refresh, reconnections)
Reviewed changes
Copilot reviewed 19 out of 19 changed files in this pull request and generated 6 comments.
Show a summary per file
| File | Description |
|---|---|
glide-core/redis-rs/redis/src/types.rs |
Defines the AddressResolver trait requiring Send + Sync + Debug |
glide-core/redis-rs/redis/src/lib.rs |
Exports the AddressResolver trait |
glide-core/src/client/types.rs |
Adds address_resolver field to ConnectionRequest |
glide-core/src/client/mod.rs |
Integrates address resolution in get_connection_info() for standalone and cluster clients |
glide-core/src/client/standalone_client.rs |
Passes address resolver to reconnecting connections |
glide-core/src/client/reconnecting_connection.rs |
Uses address resolver when creating client connections |
glide-core/redis-rs/redis/src/cluster_client.rs |
Adds address resolver to ClusterParams and builder |
glide-core/redis-rs/redis/src/cluster.rs |
Uses address resolver in get_connection_addr() |
glide-core/redis-rs/redis/src/cluster_topology.rs |
Applies address resolution during topology parsing and adds unit tests |
glide-core/redis-rs/redis/src/cluster_async/mod.rs |
Passes address resolver to topology calculation (plus trailing whitespace cleanup) |
java/src/address_resolver.rs |
Implements JavaAddressResolver that calls Java callbacks via JNI |
java/src/lib.rs |
Creates global reference to Java resolver and passes it to ConnectionRequest |
java/client/src/main/java/glide/api/models/configuration/AddressResolver.java |
Java functional interface for address resolution callbacks |
java/client/src/main/java/glide/api/models/configuration/ResolvedAddress.java |
Java value class for resolved addresses |
java/client/src/main/java/glide/api/models/configuration/BaseClientConfiguration.java |
Adds addressResolver configuration field with documentation |
java/client/src/main/java/glide/internal/GlideNativeBridge.java |
Updates native method signature to accept address resolver parameter |
java/client/src/main/java/glide/managers/ConnectionManager.java |
Passes address resolver to native bridge |
java/integTest/src/test/java/glide/ConnectionTests.java |
Adds integration tests for standalone and cluster clients with address resolver |
CHANGELOG.md |
Documents the new feature |
java/src/address_resolver.rs
Outdated
| if let Ok(mut env) = self.jvm.attach_current_thread_as_daemon() { | ||
| // Call the resolver's resolve method: ResolvedAddress resolve(String host, int port) | ||
| if let Ok(host_jstring) = env.new_string(host) | ||
| && let Ok(result) = env.call_method( | ||
| self.resolver_global.as_obj(), | ||
| "resolve", | ||
| "(Ljava/lang/String;I)Lglide/api/models/configuration/ResolvedAddress;", | ||
| &[ | ||
| jni::objects::JValue::Object(&host_jstring), | ||
| jni::objects::JValue::Int(port as i32), | ||
| ], | ||
| ) | ||
| && let Ok(resolved_address) = result.l() | ||
| && !resolved_address.is_null() | ||
| { | ||
| // Get the resolved host and port from the ResolvedAddress object | ||
| if let Ok(resolved_host_obj) = | ||
| env.call_method(&resolved_address, "getHost", "()Ljava/lang/String;", &[]) | ||
| && let Ok(resolved_host_jobj) = resolved_host_obj.l() | ||
| && !resolved_host_jobj.is_null() | ||
| && let Ok(resolved_port_val) = | ||
| env.call_method(&resolved_address, "getPort", "()I", &[]) | ||
| && let Ok(resolved_port) = resolved_port_val.i() | ||
| { | ||
| let resolved_host_jstr: jni::objects::JString = resolved_host_jobj.into(); | ||
| if let Ok(resolved_host_str) = env.get_string(&resolved_host_jstr) { | ||
| let resolved_host_string = | ||
| resolved_host_str.to_str().unwrap_or(host).to_string(); | ||
| return (resolved_host_string, resolved_port as u16); | ||
| } | ||
| } | ||
| } | ||
| } | ||
| // Fallback: return original address if resolution fails | ||
| (host.to_string(), port) |
There was a problem hiding this comment.
The resolve method silently falls back to the original address when any error occurs during Java callback invocation. This makes debugging difficult when the resolver is misconfigured or throws exceptions. Consider logging warnings for each failure point (JVM attachment, method call, field access) similar to how errors are logged elsewhere in the codebase. This would help developers identify issues with their address resolver implementations.
| if let Ok(mut env) = self.jvm.attach_current_thread_as_daemon() { | |
| // Call the resolver's resolve method: ResolvedAddress resolve(String host, int port) | |
| if let Ok(host_jstring) = env.new_string(host) | |
| && let Ok(result) = env.call_method( | |
| self.resolver_global.as_obj(), | |
| "resolve", | |
| "(Ljava/lang/String;I)Lglide/api/models/configuration/ResolvedAddress;", | |
| &[ | |
| jni::objects::JValue::Object(&host_jstring), | |
| jni::objects::JValue::Int(port as i32), | |
| ], | |
| ) | |
| && let Ok(resolved_address) = result.l() | |
| && !resolved_address.is_null() | |
| { | |
| // Get the resolved host and port from the ResolvedAddress object | |
| if let Ok(resolved_host_obj) = | |
| env.call_method(&resolved_address, "getHost", "()Ljava/lang/String;", &[]) | |
| && let Ok(resolved_host_jobj) = resolved_host_obj.l() | |
| && !resolved_host_jobj.is_null() | |
| && let Ok(resolved_port_val) = | |
| env.call_method(&resolved_address, "getPort", "()I", &[]) | |
| && let Ok(resolved_port) = resolved_port_val.i() | |
| { | |
| let resolved_host_jstr: jni::objects::JString = resolved_host_jobj.into(); | |
| if let Ok(resolved_host_str) = env.get_string(&resolved_host_jstr) { | |
| let resolved_host_string = | |
| resolved_host_str.to_str().unwrap_or(host).to_string(); | |
| return (resolved_host_string, resolved_port as u16); | |
| } | |
| } | |
| } | |
| } | |
| // Fallback: return original address if resolution fails | |
| (host.to_string(), port) | |
| let mut env = match self.jvm.attach_current_thread_as_daemon() { | |
| Ok(env) => env, | |
| Err(e) => { | |
| log::warn!( | |
| "Failed to attach current thread to JVM for address resolution of {host}:{port}: {e}" | |
| ); | |
| // Fallback: return original address if resolution fails | |
| return (host.to_string(), port); | |
| } | |
| }; | |
| // Call the resolver's resolve method: ResolvedAddress resolve(String host, int port) | |
| let host_jstring = match env.new_string(host) { | |
| Ok(s) => s, | |
| Err(e) => { | |
| log::warn!( | |
| "Failed to create Java string for host '{host}' during address resolution of {host}:{port}: {e}" | |
| ); | |
| return (host.to_string(), port); | |
| } | |
| }; | |
| let result = match env.call_method( | |
| self.resolver_global.as_obj(), | |
| "resolve", | |
| "(Ljava/lang/String;I)Lglide/api/models/configuration/ResolvedAddress;", | |
| &[ | |
| jni::objects::JValue::Object(&host_jstring), | |
| jni::objects::JValue::Int(port as i32), | |
| ], | |
| ) { | |
| Ok(r) => r, | |
| Err(e) => { | |
| log::warn!( | |
| "Failed to invoke Java address resolver 'resolve' for {host}:{port}: {e}" | |
| ); | |
| return (host.to_string(), port); | |
| } | |
| }; | |
| let resolved_address = match result.l() { | |
| Ok(obj) => { | |
| if obj.is_null() { | |
| log::warn!( | |
| "Java address resolver returned null ResolvedAddress for {host}:{port}" | |
| ); | |
| return (host.to_string(), port); | |
| } | |
| obj | |
| } | |
| Err(e) => { | |
| log::warn!( | |
| "Failed to convert Java resolver result to object for {host}:{port}: {e}" | |
| ); | |
| return (host.to_string(), port); | |
| } | |
| }; | |
| // Get the resolved host from the ResolvedAddress object | |
| let resolved_host_obj = match env.call_method( | |
| &resolved_address, | |
| "getHost", | |
| "()Ljava/lang/String;", | |
| &[], | |
| ) { | |
| Ok(v) => v, | |
| Err(e) => { | |
| log::warn!( | |
| "Failed to call ResolvedAddress.getHost() for {host}:{port}: {e}" | |
| ); | |
| return (host.to_string(), port); | |
| } | |
| }; | |
| let resolved_host_jobj = match resolved_host_obj.l() { | |
| Ok(obj) => { | |
| if obj.is_null() { | |
| log::warn!( | |
| "ResolvedAddress.getHost() returned null for {host}:{port}" | |
| ); | |
| return (host.to_string(), port); | |
| } | |
| obj | |
| } | |
| Err(e) => { | |
| log::warn!( | |
| "Failed to convert result of ResolvedAddress.getHost() to object for {host}:{port}: {e}" | |
| ); | |
| return (host.to_string(), port); | |
| } | |
| }; | |
| // Get the resolved port from the ResolvedAddress object | |
| let resolved_port_val = match env.call_method( | |
| &resolved_address, | |
| "getPort", | |
| "()I", | |
| &[], | |
| ) { | |
| Ok(v) => v, | |
| Err(e) => { | |
| log::warn!( | |
| "Failed to call ResolvedAddress.getPort() for {host}:{port}: {e}" | |
| ); | |
| return (host.to_string(), port); | |
| } | |
| }; | |
| let resolved_port = match resolved_port_val.i() { | |
| Ok(p) => p, | |
| Err(e) => { | |
| log::warn!( | |
| "Failed to extract port from ResolvedAddress.getPort() for {host}:{port}: {e}" | |
| ); | |
| return (host.to_string(), port); | |
| } | |
| }; | |
| let resolved_host_jstr: jni::objects::JString = resolved_host_jobj.into(); | |
| let resolved_host_str = match env.get_string(&resolved_host_jstr) { | |
| Ok(s) => s, | |
| Err(e) => { | |
| log::warn!( | |
| "Failed to convert Java host string to Rust for {host}:{port}: {e}" | |
| ); | |
| return (host.to_string(), port); | |
| } | |
| }; | |
| let resolved_host_string = match resolved_host_str.to_str() { | |
| Ok(s) => s.to_string(), | |
| Err(e) => { | |
| log::warn!( | |
| "Resolved host string contains invalid UTF-8 for {host}:{port}: {e}. Falling back to original host." | |
| ); | |
| host.to_string() | |
| } | |
| }; | |
| (resolved_host_string, resolved_port as u16) |
| @FunctionalInterface | ||
| public interface AddressResolver { | ||
|
|
||
| /** | ||
| * Resolves the given host and port to the actual connection address. | ||
| * | ||
| * @param host The configured host name or IP address | ||
| * @param port The configured port number | ||
| * @return A {@link ResolvedAddress} containing the resolved host and port to use for connection | ||
| */ | ||
| ResolvedAddress resolve(String host, int port); | ||
| } |
There was a problem hiding this comment.
The AddressResolver documentation should clarify important implementation requirements: (1) The resolver may be called from multiple threads concurrently, so implementations must be thread-safe. (2) The resolve method is called synchronously during connection establishment, so implementations should avoid blocking operations that could delay connections. (3) If the resolver throws an exception, the original configured address will be used as a fallback, and the connection attempt may fail if that address is invalid. Consider adding a note about these behaviors to help users understand the implications and best practices.
java/src/address_resolver.rs
Outdated
| let resolved_host_jstr: jni::objects::JString = resolved_host_jobj.into(); | ||
| if let Ok(resolved_host_str) = env.get_string(&resolved_host_jstr) { | ||
| let resolved_host_string = | ||
| resolved_host_str.to_str().unwrap_or(host).to_string(); |
There was a problem hiding this comment.
Consider adding validation for the resolved address values. The code should check if the resolved host string is empty or if the resolved port is negative before using them. Currently, an empty host string or negative port from the Java resolver would be passed through to the connection logic, which could cause confusing errors. Add validation and log a warning if invalid values are returned, falling back to the original address in such cases.
| resolved_host_str.to_str().unwrap_or(host).to_string(); | |
| resolved_host_str.to_str().unwrap_or(host).to_string(); | |
| // Validate resolved values: non-empty host and non-negative port. | |
| if resolved_host_string.is_empty() { | |
| log::warn!( | |
| "Java address resolver returned an empty host; \ | |
| falling back to original address {host}:{port}" | |
| ); | |
| return (host.to_string(), port); | |
| } | |
| if resolved_port < 0 { | |
| log::warn!( | |
| "Java address resolver returned a negative port ({}); \ | |
| falling back to original address {}:{}", | |
| resolved_port, | |
| host, | |
| port | |
| ); | |
| return (host.to_string(), port); | |
| } |
3716732 to
c055a62
Compare
Signed-off-by: Gustavo Maciel <gmaciel@atlassian.com>
Signed-off-by: Gustavo Maciel <gmaciel@atlassian.com>
Signed-off-by: Gustavo Maciel <gmaciel@atlassian.com>
Signed-off-by: Gustavo Maciel <gmaciel@atlassian.com>
c055a62 to
689b20d
Compare
Signed-off-by: Gustavo Maciel <gmaciel@atlassian.com>
|
@affonsov Thanks for reviewing, I fixed the issues with it, and fixed CI (was a change I did last minute and didn't push all of it 🤦) wrt fake addresses, I think this is the point of the PR actually: The client or cluster may give addresses that look invalid, but are actually hostnames behind a firewall. The address resolver converts them to IPs or hostnames that can be resolved through the usual |
Summary
Allows the application to resolve the host and port before a connection is made. This is useful for cases where the URL can't be resolved by the default DNS, or is behind a firewall. See: #4396
Issue link
This Pull Request is linked to issue: github.com/valkey-io/valkey-glide/issues/4396
Features / Behaviour Changes
Exposes the
AddressResolvercallback in the builder in Java. The other bindings do not expose this yet.Implementation
AddressResolverwas added, where each binding can implement its own version of it.ConnectionAddrwere changed to call theAddressResolver(client creation, cluster new connection creation)JavaAddressResolverthat stores the pointer to the Java callback, and calls it using JNI.Limitations
Testing
ConnectionTests.javathat verify if initially invalid addresses can be rewritten back into the valid addressescluster_topology.rsthat verify that new connections in the cluster are also rewrittenChecklist
make *-linttargets) and Prettier has been run (make prettier-fix).