Skip to content

Commit c7b598b

Browse files
committed
Refactored each node connection to be stored with its own IP address. We now use the ConnectionWithIp type, which wraps both the actual connection and its associated IP address.
1 parent c2fcee3 commit c7b598b

File tree

5 files changed

+187
-268
lines changed

5 files changed

+187
-268
lines changed

redis/src/aio/connection.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -444,6 +444,7 @@ pub(crate) async fn get_socket_addrs(
444444
}
445445
}
446446

447+
/// Logs the creation of a connection, including its type, the node, and optionally its IP address.
447448
fn log_conn_creation<T>(conn_type: &str, node: T, ip: Option<IpAddr>)
448449
where
449450
T: std::fmt::Debug,

redis/src/cluster_async/connections_container.rs

Lines changed: 60 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -1,43 +1,80 @@
1-
use std::collections::HashMap;
2-
use std::net::IpAddr;
3-
1+
use crate::cluster_async::ConnectionFuture;
42
use arcstr::ArcStr;
3+
use futures::FutureExt;
54
use rand::seq::IteratorRandom;
5+
use std::collections::HashMap;
6+
use std::net::IpAddr;
67

78
use crate::cluster_routing::{Route, SlotAddr};
89
use crate::cluster_slotmap::{ReadFromReplicaStrategy, SlotMap, SlotMapValue};
910
use crate::cluster_topology::TopologyHash;
1011

12+
/// A struct that encapsulates a network connection along with its associated IP address.
1113
#[derive(Clone, Eq, PartialEq, Debug)]
12-
pub struct ClusterNode<Connection> {
13-
pub user_connection: Connection,
14-
pub management_connection: Option<Connection>,
14+
pub struct ConnectionWithIp<Connection> {
15+
/// The actual connection
16+
pub conn: Connection,
17+
/// The IP associated with the connection
1518
pub ip: Option<IpAddr>,
1619
}
1720

21+
impl<Connection> ConnectionWithIp<Connection>
22+
where
23+
Connection: Clone + Send + 'static,
24+
{
25+
/// Consumes the current instance and returns a new `ConnectionWithIp`
26+
/// where the connection is wrapped in a future.
27+
#[doc(hidden)]
28+
pub fn into_future(self) -> ConnectionWithIp<ConnectionFuture<Connection>> {
29+
ConnectionWithIp {
30+
conn: async { self.conn }.boxed().shared(),
31+
ip: self.ip,
32+
}
33+
}
34+
}
35+
36+
impl<Connection> From<(Connection, Option<IpAddr>)> for ConnectionWithIp<Connection> {
37+
fn from(val: (Connection, Option<IpAddr>)) -> Self {
38+
ConnectionWithIp {
39+
conn: val.0,
40+
ip: val.1,
41+
}
42+
}
43+
}
44+
45+
impl<Connection> From<ConnectionWithIp<Connection>> for (Connection, Option<IpAddr>) {
46+
fn from(val: ConnectionWithIp<Connection>) -> Self {
47+
(val.conn, val.ip)
48+
}
49+
}
50+
51+
#[derive(Clone, Eq, PartialEq, Debug)]
52+
pub struct ClusterNode<Connection> {
53+
pub user_connection: ConnectionWithIp<Connection>,
54+
pub management_connection: Option<ConnectionWithIp<Connection>>,
55+
}
56+
1857
impl<Connection> ClusterNode<Connection>
1958
where
2059
Connection: Clone,
2160
{
2261
pub fn new(
23-
user_connection: Connection,
24-
management_connection: Option<Connection>,
25-
ip: Option<IpAddr>,
62+
user_connection: ConnectionWithIp<Connection>,
63+
management_connection: Option<ConnectionWithIp<Connection>>,
2664
) -> Self {
2765
Self {
2866
user_connection,
2967
management_connection,
30-
ip,
3168
}
3269
}
3370

3471
pub(crate) fn get_connection(&self, conn_type: &ConnectionType) -> Connection {
3572
match conn_type {
36-
ConnectionType::User => self.user_connection.clone(),
37-
ConnectionType::PreferManagement => self
38-
.management_connection
39-
.clone()
40-
.unwrap_or_else(|| self.user_connection.clone()),
73+
ConnectionType::User => self.user_connection.conn.clone(),
74+
ConnectionType::PreferManagement => self.management_connection.as_ref().map_or_else(
75+
|| self.user_connection.conn.clone(),
76+
|management_conn| management_conn.conn.clone(),
77+
),
4178
}
4279
}
4380
}
@@ -54,7 +91,7 @@ pub(crate) struct ConnectionsMap<Connection>(pub(crate) HashMap<ArcStr, ClusterN
5491
impl<Connection> std::fmt::Display for ConnectionsMap<Connection> {
5592
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
5693
for (address, node) in self.0.iter() {
57-
match node.ip {
94+
match node.user_connection.ip {
5895
Some(ip) => writeln!(f, "{address} - {ip}")?,
5996
None => writeln!(f, "{address}")?,
6097
};
@@ -178,7 +215,7 @@ where
178215
) -> impl Iterator<Item = ConnectionAndAddress<Connection>> + '_ {
179216
self.connection_map
180217
.iter()
181-
.map(move |(address, node)| (address.clone(), node.user_connection.clone()))
218+
.map(move |(address, node)| (address.clone(), node.user_connection.conn.clone()))
182219
}
183220

184221
pub(crate) fn all_primary_connections(
@@ -200,7 +237,7 @@ where
200237
) -> Option<ConnectionAndAddress<Connection>> {
201238
self.connection_map
202239
.get_key_value(address)
203-
.map(|(address, conn)| (address.clone(), conn.user_connection.clone()))
240+
.map(|(address, conn)| (address.clone(), conn.user_connection.conn.clone()))
204241
}
205242

206243
pub(crate) fn random_connections(
@@ -258,10 +295,10 @@ mod tests {
258295
Connection: Clone,
259296
{
260297
pub(crate) fn new_only_with_user_conn(user_connection: Connection) -> Self {
298+
let ip = None;
261299
Self {
262-
user_connection,
300+
user_connection: (user_connection, ip).into(),
263301
management_connection: None,
264-
ip: None,
265302
}
266303
}
267304
}
@@ -296,14 +333,14 @@ mod tests {
296333
connection: usize,
297334
use_management_connections: bool,
298335
) -> ClusterNode<usize> {
336+
let ip = None;
299337
ClusterNode::new(
300-
connection,
338+
(connection, ip).into(),
301339
if use_management_connections {
302-
Some(connection * 10)
340+
Some((connection * 10, ip).into())
303341
} else {
304342
None
305343
},
306-
None,
307344
)
308345
}
309346

redis/src/cluster_async/connections_logic.rs

Lines changed: 37 additions & 81 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,9 @@
1-
use std::net::{IpAddr, SocketAddr};
1+
use std::net::SocketAddr;
22

3-
use super::{connections_container::ClusterNode, Connect};
3+
use super::{
4+
connections_container::{ClusterNode, ConnectionWithIp},
5+
Connect,
6+
};
47
use crate::{
58
aio::{ConnectionLike, Runtime},
69
cluster::get_connection_info,
@@ -30,17 +33,9 @@ pub enum RefreshConnectionType {
3033
AllConnections,
3134
}
3235

33-
fn to_future<C>(conn: C) -> ConnectionFuture<C>
34-
where
35-
C: Clone + Send + 'static,
36-
{
37-
async { conn }.boxed().shared()
38-
}
39-
4036
fn failed_management_connection<C>(
4137
addr: &str,
42-
user_conn: ConnectionFuture<C>,
43-
ip: Option<IpAddr>,
38+
user_conn: ConnectionWithIp<ConnectionFuture<C>>,
4439
err: RedisError,
4540
) -> ConnectAndCheckResult<C>
4641
where
@@ -51,7 +46,7 @@ where
5146
addr, err
5247
);
5348
ConnectAndCheckResult::ManagementConnectionFailed {
54-
node: AsyncClusterNode::new(user_conn, None, ip),
49+
node: AsyncClusterNode::new(user_conn, None),
5550
err,
5651
}
5752
}
@@ -90,17 +85,16 @@ where
9085
}
9186

9287
fn create_async_node<C>(
93-
user_conn: C,
94-
management_conn: Option<C>,
95-
ip: Option<IpAddr>,
88+
user_conn: ConnectionWithIp<C>,
89+
management_conn: Option<ConnectionWithIp<C>>,
9690
) -> AsyncClusterNode<C>
9791
where
9892
C: ConnectionLike + Connect + Send + Sync + 'static + Clone,
9993
{
100-
let user_conn = to_future(user_conn);
101-
let management_conn = management_conn.map(to_future);
102-
103-
AsyncClusterNode::new(user_conn, management_conn, ip)
94+
AsyncClusterNode::new(
95+
user_conn.into_future(),
96+
management_conn.map(|conn| conn.into_future()),
97+
)
10498
}
10599

106100
pub(crate) async fn connect_and_check_all_connections<C>(
@@ -126,25 +120,23 @@ where
126120
{
127121
(Ok(conn_1), Ok(conn_2)) => {
128122
// Both connections were successfully established
129-
let (mut user_conn, ip): (C, Option<IpAddr>) = conn_1;
130-
let (mut management_conn, _ip): (C, Option<IpAddr>) = conn_2;
131-
if let Err(err) = setup_user_connection(&mut user_conn, params).await {
123+
let mut user_conn: ConnectionWithIp<C> = conn_1;
124+
let mut management_conn: ConnectionWithIp<C> = conn_2;
125+
if let Err(err) = setup_user_connection(&mut user_conn.conn, params).await {
132126
return err.into();
133127
}
134-
match setup_management_connection(&mut management_conn).await {
128+
match setup_management_connection(&mut management_conn.conn).await {
135129
Ok(_) => ConnectAndCheckResult::Success(create_async_node(
136130
user_conn,
137131
Some(management_conn),
138-
ip,
139132
)),
140-
Err(err) => failed_management_connection(addr, to_future(user_conn), ip, err),
133+
Err(err) => failed_management_connection(addr, user_conn.into_future(), err),
141134
}
142135
}
143-
(Ok(conn), Err(err)) | (Err(err), Ok(conn)) => {
136+
(Ok(mut connection), Err(err)) | (Err(err), Ok(mut connection)) => {
144137
// Only a single connection was successfully established. Use it for the user connection
145-
let (mut user_conn, ip): (C, Option<IpAddr>) = conn;
146-
match setup_user_connection(&mut user_conn, params).await {
147-
Ok(_) => failed_management_connection(addr, to_future(user_conn), ip, err),
138+
match setup_user_connection(&mut connection.conn, params).await {
139+
Ok(_) => failed_management_connection(addr, connection.into_future(), err),
148140
Err(err) => err.into(),
149141
}
150142
}
@@ -173,24 +165,16 @@ where
173165
C: ConnectionLike + Connect + Send + Sync + 'static + Clone,
174166
{
175167
match create_connection::<C>(addr, params.clone(), socket_addr, None, true).await {
176-
Err(conn_err) => {
177-
failed_management_connection(addr, prev_node.user_connection, prev_node.ip, conn_err)
178-
}
168+
Err(conn_err) => failed_management_connection(addr, prev_node.user_connection, conn_err),
179169

180-
Ok(mut conn) => {
181-
if let Err(err) = setup_management_connection(&mut conn.0).await {
182-
return failed_management_connection(
183-
addr,
184-
prev_node.user_connection,
185-
prev_node.ip,
186-
err,
187-
);
170+
Ok(mut connection) => {
171+
if let Err(err) = setup_management_connection(&mut connection.conn).await {
172+
return failed_management_connection(addr, prev_node.user_connection, err);
188173
}
189174

190175
ConnectAndCheckResult::Success(ClusterNode {
191176
user_connection: prev_node.user_connection,
192-
ip: prev_node.ip,
193-
management_connection: Some(to_future(conn.0)),
177+
management_connection: Some(connection.into_future()),
194178
})
195179
}
196180
}
@@ -263,7 +247,7 @@ where
263247
{
264248
match conn_type {
265249
RefreshConnectionType::OnlyUserConnection => {
266-
let (user_conn, ip) = match create_and_setup_user_connection(
250+
let user_conn = match create_and_setup_user_connection(
267251
addr,
268252
params.clone(),
269253
socket_addr,
@@ -274,23 +258,8 @@ where
274258
Ok(tuple) => tuple,
275259
Err(err) => return err.into(),
276260
};
277-
if let Some(node) = node {
278-
let mut management_conn = match node.management_connection {
279-
Some(ref conn) => Some(conn.clone().await),
280-
None => None,
281-
};
282-
if ip != node.ip {
283-
// New IP was found, refresh the management connection too
284-
management_conn =
285-
create_and_setup_management_connection(addr, params, socket_addr)
286-
.await
287-
.ok()
288-
.map(|(conn, _ip): (C, Option<IpAddr>)| conn);
289-
}
290-
create_async_node(user_conn, management_conn, ip).into()
291-
} else {
292-
create_async_node(user_conn, None, ip).into()
293-
}
261+
let management_conn = node.and_then(|node| node.management_connection);
262+
AsyncClusterNode::new(user_conn.into_future(), management_conn).into()
294263
}
295264
RefreshConnectionType::OnlyManagementConnection => {
296265
// Refreshing only the management connection requires the node to exist alongside a user connection. Otherwise, refresh all connections.
@@ -314,28 +283,14 @@ async fn create_and_setup_user_connection<C>(
314283
params: ClusterParams,
315284
socket_addr: Option<SocketAddr>,
316285
push_sender: Option<mpsc::UnboundedSender<PushInfo>>,
317-
) -> RedisResult<(C, Option<IpAddr>)>
286+
) -> RedisResult<ConnectionWithIp<C>>
318287
where
319288
C: ConnectionLike + Connect + Send + 'static,
320289
{
321-
let (mut conn, ip): (C, Option<IpAddr>) =
290+
let mut connection: ConnectionWithIp<C> =
322291
create_connection(node, params.clone(), socket_addr, push_sender, false).await?;
323-
setup_user_connection(&mut conn, params).await?;
324-
Ok((conn, ip))
325-
}
326-
327-
async fn create_and_setup_management_connection<C>(
328-
node: &str,
329-
params: ClusterParams,
330-
socket_addr: Option<SocketAddr>,
331-
) -> RedisResult<(C, Option<IpAddr>)>
332-
where
333-
C: ConnectionLike + Connect + Send + 'static,
334-
{
335-
let (mut conn, ip): (C, Option<IpAddr>) =
336-
create_connection(node, params.clone(), socket_addr, None, true).await?;
337-
setup_management_connection(&mut conn).await?;
338-
Ok((conn, ip))
292+
setup_user_connection(&mut connection.conn, params).await?;
293+
Ok(connection)
339294
}
340295

341296
async fn setup_user_connection<C>(conn: &mut C, params: ClusterParams) -> RedisResult<()>
@@ -373,7 +328,7 @@ async fn create_connection<C>(
373328
socket_addr: Option<SocketAddr>,
374329
push_sender: Option<mpsc::UnboundedSender<PushInfo>>,
375330
is_management: bool,
376-
) -> RedisResult<(C, Option<IpAddr>)>
331+
) -> RedisResult<ConnectionWithIp<C>>
377332
where
378333
C: ConnectionLike + Connect + Send + 'static,
379334
{
@@ -392,6 +347,7 @@ where
392347
if !is_management { push_sender } else { None },
393348
)
394349
.await
350+
.map(|conn| conn.into())
395351
}
396352

397353
/// The function returns None if the checked connection/s are healthy. Otherwise, it returns the type of the unhealthy connection/s.
@@ -430,7 +386,7 @@ where
430386
return false;
431387
}
432388
match node.management_connection.clone() {
433-
Some(conn) => check(conn, timeout, "management").await,
389+
Some(connection) => check(connection.conn, timeout, "management").await,
434390
None => {
435391
warn!("The management connection for node {} isn't set", address);
436392
true
@@ -441,7 +397,7 @@ where
441397
if !check_user_connection {
442398
return false;
443399
}
444-
let conn = node.user_connection.clone();
400+
let conn = node.user_connection.conn.clone();
445401
check(conn, timeout, "user").await
446402
},
447403
);

0 commit comments

Comments
 (0)