Skip to content

Commit d8cdfee

Browse files
authored
Fixed non key based commands not to be routed to a random node (#174)
1 parent 2fac27a commit d8cdfee

File tree

2 files changed

+149
-10
lines changed

2 files changed

+149
-10
lines changed

redis/src/cluster_async/mod.rs

Lines changed: 29 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ pub mod testing {
2929
pub use super::connections_logic::*;
3030
}
3131
use crate::{
32+
cluster_routing::{Routable, RoutingInfo},
3233
cluster_slotmap::SlotMap,
3334
cluster_topology::SLOT_SIZE,
3435
cmd,
@@ -1765,7 +1766,7 @@ where
17651766

17661767
// if we reached this point, we're sending the command only to single node, and we need to find the
17671768
// right connection to the node.
1768-
let (address, mut conn) = Self::get_connection(routing, core)
1769+
let (address, mut conn) = Self::get_connection(routing, core, Some(cmd.clone()))
17691770
.await
17701771
.map_err(|err| (OperationTarget::NotFound, err))?;
17711772
conn.req_packed_command(&cmd)
@@ -1801,7 +1802,7 @@ where
18011802
pipeline,
18021803
offset,
18031804
count,
1804-
Self::get_connection(route, core),
1805+
Self::get_connection(route, core, None),
18051806
)
18061807
.await
18071808
}
@@ -1825,6 +1826,7 @@ where
18251826
async fn get_connection(
18261827
routing: InternalSingleNodeRouting<C>,
18271828
core: Core<C>,
1829+
cmd: Option<Arc<Cmd>>,
18281830
) -> RedisResult<(ArcStr, C)> {
18291831
let read_guard = core.conn_lock.read().await;
18301832
let mut asking = false;
@@ -1849,15 +1851,32 @@ where
18491851
ConnectionCheck::Found,
18501852
)
18511853
}
1852-
// This means that a request routed to a route without a matching connection will be sent to a random node, hopefully to be redirected afterwards.
18531854
InternalSingleNodeRouting::SpecificNode(route) => {
1854-
read_guard.connection_for_route(&route).map_or_else(
1855-
|| {
1856-
warn!("No connection found for route `{route:?}");
1857-
ConnectionCheck::RandomConnection
1858-
},
1859-
ConnectionCheck::Found,
1860-
)
1855+
match read_guard.connection_for_route(&route) {
1856+
Some((conn, address)) => ConnectionCheck::Found((conn, address)),
1857+
None => {
1858+
// No connection is found for the given route:
1859+
// - For key-based commands, attempt redirection to a random node,
1860+
// hopefully to be redirected afterwards by a MOVED error.
1861+
// - For non-key-based commands, avoid attempting redirection to a random node
1862+
// as it wouldn't result in MOVED hints and can lead to unwanted results
1863+
// (e.g., sending management command to a different node than the user asked for); instead, raise the error.
1864+
let routable_cmd = cmd.and_then(|cmd| Routable::command(&*cmd));
1865+
if routable_cmd.is_some()
1866+
&& !RoutingInfo::is_key_based_cmd(&routable_cmd.unwrap())
1867+
{
1868+
return Err((
1869+
ErrorKind::ClusterConnectionNotFound,
1870+
"Requested connection not found for route",
1871+
format!("{route:?}"),
1872+
)
1873+
.into());
1874+
} else {
1875+
warn!("No connection found for route `{route:?}`. Attempting redirection to a random node.");
1876+
ConnectionCheck::RandomConnection
1877+
}
1878+
}
1879+
}
18611880
}
18621881
InternalSingleNodeRouting::Random => ConnectionCheck::RandomConnection,
18631882
InternalSingleNodeRouting::Connection { address, conn } => {

redis/tests/test_cluster_async.rs

Lines changed: 120 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4020,6 +4020,126 @@ mod cluster_async {
40204020
.unwrap();
40214021
}
40224022

4023+
#[test]
4024+
fn test_async_cluster_dont_route_to_a_random_on_non_key_based_cmd() {
4025+
// This test verifies that non-key-based commands do not get routed to a random node
4026+
// when no connection is found for the given route. Instead, the appropriate error
4027+
// should be raised.
4028+
let name = "test_async_cluster_dont_route_to_a_random_on_non_key_based_cmd";
4029+
let request_counter = Arc::new(AtomicU32::new(0));
4030+
let cloned_req_counter = request_counter.clone();
4031+
let MockEnv {
4032+
runtime,
4033+
async_connection: mut connection,
4034+
handler: _handler,
4035+
..
4036+
} = MockEnv::with_client_builder(
4037+
ClusterClient::builder(vec![&*format!("redis://{name}")]),
4038+
name,
4039+
move |received_cmd: &[u8], _| {
4040+
let slots_config_vec = vec![
4041+
MockSlotRange {
4042+
primary_port: 6379,
4043+
replica_ports: vec![],
4044+
slot_range: (0_u16..8000_u16),
4045+
},
4046+
MockSlotRange {
4047+
primary_port: 6380,
4048+
replica_ports: vec![],
4049+
// Don't cover all slots
4050+
slot_range: (8001_u16..12000_u16),
4051+
},
4052+
];
4053+
respond_startup_with_config(name, received_cmd, Some(slots_config_vec), false)?;
4054+
// If requests are sent to random nodes, they will be caught and counted here.
4055+
request_counter.fetch_add(1, Ordering::Relaxed);
4056+
Err(Ok(Value::Nil))
4057+
},
4058+
);
4059+
4060+
runtime
4061+
.block_on(async move {
4062+
let uncovered_slot = 16000;
4063+
let route = redis::cluster_routing::Route::new(
4064+
uncovered_slot,
4065+
redis::cluster_routing::SlotAddr::Master,
4066+
);
4067+
let single_node_route =
4068+
redis::cluster_routing::SingleNodeRoutingInfo::SpecificNode(route);
4069+
let routing = RoutingInfo::SingleNode(single_node_route);
4070+
let res = connection
4071+
.route_command(&redis::cmd("FLUSHALL"), routing)
4072+
.await;
4073+
assert!(res.is_err());
4074+
let res_err = res.unwrap_err();
4075+
assert_eq!(
4076+
res_err.kind(),
4077+
ErrorKind::ClusterConnectionNotFound,
4078+
"{:?}",
4079+
res_err
4080+
);
4081+
assert_eq!(cloned_req_counter.load(Ordering::Relaxed), 0);
4082+
Ok::<_, RedisError>(())
4083+
})
4084+
.unwrap();
4085+
}
4086+
4087+
#[test]
4088+
fn test_async_cluster_route_to_random_on_key_based_cmd() {
4089+
// This test verifies that key-based commands get routed to a random node
4090+
// when no connection is found for the given route. The command should
4091+
// then be redirected correctly by the server's MOVED error.
4092+
let name = "test_async_cluster_route_to_random_on_key_based_cmd";
4093+
let request_counter = Arc::new(AtomicU32::new(0));
4094+
let cloned_req_counter = request_counter.clone();
4095+
let MockEnv {
4096+
runtime,
4097+
async_connection: mut connection,
4098+
handler: _handler,
4099+
..
4100+
} = MockEnv::with_client_builder(
4101+
ClusterClient::builder(vec![&*format!("redis://{name}")]),
4102+
name,
4103+
move |received_cmd: &[u8], _| {
4104+
let slots_config_vec = vec![
4105+
MockSlotRange {
4106+
primary_port: 6379,
4107+
replica_ports: vec![],
4108+
slot_range: (0_u16..8000_u16),
4109+
},
4110+
MockSlotRange {
4111+
primary_port: 6380,
4112+
replica_ports: vec![],
4113+
// Don't cover all slots
4114+
slot_range: (8001_u16..12000_u16),
4115+
},
4116+
];
4117+
respond_startup_with_config(name, received_cmd, Some(slots_config_vec), false)?;
4118+
if contains_slice(received_cmd, b"GET") {
4119+
if request_counter.fetch_add(1, Ordering::Relaxed) == 0 {
4120+
return Err(parse_redis_value(
4121+
format!("-MOVED 12182 {name}:6380\r\n").as_bytes(),
4122+
));
4123+
} else {
4124+
return Err(Ok(Value::SimpleString("bar".into())));
4125+
}
4126+
}
4127+
panic!("unexpected command {:?}", received_cmd);
4128+
},
4129+
);
4130+
4131+
runtime
4132+
.block_on(async move {
4133+
// The keyslot of "foo" is 12182 and it isn't covered by any node, so we expect the
4134+
// request to be routed to a random node and then to be redirected to the MOVED node (2 requests in total)
4135+
let res: String = connection.get("foo").await.unwrap();
4136+
assert_eq!(res, "bar".to_string());
4137+
assert_eq!(cloned_req_counter.load(Ordering::Relaxed), 2);
4138+
Ok::<_, RedisError>(())
4139+
})
4140+
.unwrap();
4141+
}
4142+
40234143
#[cfg(feature = "tls-rustls")]
40244144
mod mtls_test {
40254145
use crate::support::mtls_test::create_cluster_client_from_cluster;

0 commit comments

Comments
 (0)