@@ -2,7 +2,7 @@ use std::net::{IpAddr, SocketAddr};
22
33use super :: { connections_container:: ClusterNode , Connect } ;
44use crate :: {
5- aio:: { get_socket_addrs , ConnectionLike , Runtime } ,
5+ aio:: { ConnectionLike , Runtime } ,
66 cluster:: get_connection_info,
77 cluster_client:: ClusterParams ,
88 push_manager:: PushInfo ,
3737 async { conn } . boxed ( ) . shared ( )
3838}
3939
40- /// Return true if a DNS change is detected, otherwise return false.
41- /// This function takes a node's address, examines if its host has encountered a DNS change, where the node's endpoint now leads to a different IP address.
42- /// If no socket addresses are discovered for the node's host address, or if it's a non-DNS address, it returns false.
43- /// In case the node's host address resolves to socket addresses and none of them match the current connection's IP,
44- /// a DNS change is detected, so the current connection isn't valid anymore and a new connection should be made.
45- async fn has_dns_changed ( addr : & str , curr_ip : & IpAddr ) -> bool {
46- let ( host, port) = match get_host_and_port_from_addr ( addr) {
47- Some ( ( host, port) ) => ( host, port) ,
48- None => return false ,
49- } ;
50- let mut updated_addresses = match get_socket_addrs ( host, port) . await {
51- Ok ( socket_addrs) => socket_addrs,
52- Err ( _) => return false ,
53- } ;
54-
55- !updated_addresses. any ( |socket_addr| socket_addr. ip ( ) == * curr_ip)
56- }
57-
5840fn failed_management_connection < C > (
5941 addr : & str ,
6042 user_conn : ConnectionFuture < C > ,
@@ -107,14 +89,6 @@ where
10789 }
10890}
10991
110- fn warn_mismatch_ip ( addr : & str , new_ip : Option < IpAddr > , prev_ip : Option < IpAddr > ) {
111- warn ! (
112- "New IP was found for node {:?}:
113- new connection IP = {:?}, previous connection IP = {:?}" ,
114- addr, new_ip, prev_ip
115- ) ;
116- }
117-
11892fn create_async_node < C > (
11993 user_conn : C ,
12094 management_conn : Option < C > ,
@@ -152,47 +126,25 @@ where
152126 {
153127 ( Ok ( conn_1) , Ok ( conn_2) ) => {
154128 // Both connections were successfully established
155- let ( mut user_conn, mut user_ip) : ( C , Option < IpAddr > ) = conn_1;
156- let ( mut management_conn, management_ip) : ( C , Option < IpAddr > ) = conn_2;
157- if user_ip == management_ip {
158- // Set up both connections
159- if let Err ( err) = setup_user_connection ( & mut user_conn, params) . await {
160- return err. into ( ) ;
161- }
162- match setup_management_connection ( & mut management_conn) . await {
163- Ok ( _) => ConnectAndCheckResult :: Success ( create_async_node (
164- user_conn,
165- Some ( management_conn) ,
166- user_ip,
167- ) ) ,
168- Err ( err) => {
169- failed_management_connection ( addr, to_future ( user_conn) , user_ip, err)
170- }
171- }
172- } else {
173- // Use only the connection with the latest IP address
174- warn_mismatch_ip ( addr, user_ip, management_ip) ;
175- if has_dns_changed ( addr, & user_ip. unwrap ( ) ) . await {
176- // The user_ip is incorrect. Use the created `management_conn` for the user connection
177- user_conn = management_conn;
178- user_ip = management_ip;
179- }
180- match setup_user_connection ( & mut user_conn, params) . await {
181- Ok ( _) => failed_management_connection (
182- addr,
183- to_future ( user_conn) ,
184- user_ip,
185- ( ErrorKind :: IoError , "mismatched IP" ) . into ( ) ,
186- ) ,
187- Err ( err) => err. into ( ) ,
188- }
129+ let ( mut user_conn, ip) : ( C , Option < IpAddr > ) = conn_1;
130+ let ( mut management_conn, _ip) : ( C , Option < IpAddr > ) = conn_2;
131+ if let Err ( err) = setup_user_connection ( & mut user_conn, params) . await {
132+ return err. into ( ) ;
133+ }
134+ match setup_management_connection ( & mut management_conn) . await {
135+ Ok ( _) => ConnectAndCheckResult :: Success ( create_async_node (
136+ user_conn,
137+ Some ( management_conn) ,
138+ ip,
139+ ) ) ,
140+ Err ( err) => failed_management_connection ( addr, to_future ( user_conn) , ip, err) ,
189141 }
190142 }
191143 ( Ok ( conn) , Err ( err) ) | ( Err ( err) , Ok ( conn) ) => {
192144 // Only a single connection was successfully established. Use it for the user connection
193- let ( mut user_conn, user_ip ) : ( C , Option < IpAddr > ) = conn;
145+ let ( mut user_conn, ip ) : ( C , Option < IpAddr > ) = conn;
194146 match setup_user_connection ( & mut user_conn, params) . await {
195- Ok ( _) => failed_management_connection ( addr, to_future ( user_conn) , user_ip , err) ,
147+ Ok ( _) => failed_management_connection ( addr, to_future ( user_conn) , ip , err) ,
196148 Err ( err) => err. into ( ) ,
197149 }
198150 }
@@ -216,49 +168,29 @@ async fn connect_and_check_only_management_conn<C>(
216168 params : ClusterParams ,
217169 socket_addr : Option < SocketAddr > ,
218170 prev_node : AsyncClusterNode < C > ,
219- push_sender : Option < mpsc:: UnboundedSender < PushInfo > > ,
220171) -> ConnectAndCheckResult < C >
221172where
222173 C : ConnectionLike + Connect + Send + Sync + ' static + Clone ,
223174{
224- match future:: join (
225- create_connection :: < C > ( addr, params. clone ( ) , socket_addr, push_sender, false ) ,
226- create_connection :: < C > ( addr, params. clone ( ) , socket_addr, None , true ) ,
227- )
228- . await
229- {
230- ( Err ( user_conn_err) , _) => failed_management_connection (
231- addr,
232- prev_node. user_connection ,
233- prev_node. ip ,
234- user_conn_err,
235- ) ,
236- ( _, Err ( mngm_conn_err) ) => failed_management_connection (
237- addr,
238- prev_node. user_connection ,
239- prev_node. ip ,
240- mngm_conn_err,
241- ) ,
175+ match create_connection :: < C > ( addr, params. clone ( ) , socket_addr, None , true ) . await {
176+ Err ( conn_err) => {
177+ failed_management_connection ( addr, prev_node. user_connection , prev_node. ip , conn_err)
178+ }
242179
243- ( Ok ( mut user_conn) , Ok ( mut mngm_conn) ) => {
244- let ( final_user_conn, user_ip) = if user_conn. 1 != prev_node. ip {
245- // An IP mismatch was detected. Attempt to establish a new connection to replace both the management and user connections.
246- warn_mismatch_ip ( addr, user_conn. 1 , prev_node. ip ) ;
247- if let Err ( err) = setup_user_connection ( & mut user_conn. 0 , params. clone ( ) ) . await {
248- return ConnectAndCheckResult :: Failed ( err) ;
249- }
250- ( to_future ( user_conn. 0 ) , user_conn. 1 )
251- } else {
252- ( prev_node. user_connection , prev_node. ip )
253- } ;
254- if let Err ( err) = setup_management_connection ( & mut mngm_conn. 0 ) . await {
255- return failed_management_connection ( addr, final_user_conn, user_ip, err) ;
180+ Ok ( mut conn) => {
181+ if let Err ( err) = setup_management_connection ( & mut conn. 0 ) . await {
182+ return failed_management_connection (
183+ addr,
184+ prev_node. user_connection ,
185+ prev_node. ip ,
186+ err,
187+ ) ;
256188 }
257189
258190 ConnectAndCheckResult :: Success ( ClusterNode {
259- user_connection : final_user_conn ,
260- ip : user_ip ,
261- management_connection : Some ( to_future ( mngm_conn . 0 ) ) ,
191+ user_connection : prev_node . user_connection ,
192+ ip : prev_node . ip ,
193+ management_connection : Some ( to_future ( conn . 0 ) ) ,
262194 } )
263195 }
264196 }
@@ -364,14 +296,7 @@ where
364296 // Refreshing only the management connection requires the node to exist alongside a user connection. Otherwise, refresh all connections.
365297 match node {
366298 Some ( node) => {
367- connect_and_check_only_management_conn (
368- addr,
369- params,
370- socket_addr,
371- node,
372- push_sender,
373- )
374- . await
299+ connect_and_check_only_management_conn ( addr, params, socket_addr, node) . await
375300 }
376301 None => {
377302 connect_and_check_all_connections ( addr, params, socket_addr, push_sender) . await
0 commit comments