Skip to content

Commit 7d72b87

Browse files
authored
Core: SlotMap refactor - Added NodesMap, Update the slot map upon MOVED errors (#2682)
* Core: SlotMap refactor - Added NodesMap, Update the slot map upon MOVED errors (#2682) Signed-off-by: barshaul <[email protected]>
1 parent 7d65d4e commit 7d72b87

File tree

10 files changed

+1854
-240
lines changed

10 files changed

+1854
-240
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,7 @@
8484
* Node: Add `JSON.NUMINCRBY` and `JSON.NUMMULTBY` command ([#2555](https://github.com/valkey-io/valkey-glide/pull/2555))
8585
* Core: Improve retry logic and update unmaintained dependencies for Rust lint CI ([#2673](https://github.com/valkey-io/valkey-glide/pull/2643))
8686
* Core: Release the read lock while creating connections in `refresh_connections` ([#2630](https://github.com/valkey-io/valkey-glide/issues/2630))
87+
* Core: SlotMap refactor - Added NodesMap, Update the slot map upon MOVED errors ([#2682](https://github.com/valkey-io/valkey-glide/issues/2682))
8788

8889
#### Breaking Changes
8990

glide-core/redis-rs/redis/src/cluster.rs

Lines changed: 21 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -35,14 +35,6 @@
3535
//! .expire(key, 60).ignore()
3636
//! .query(&mut connection).unwrap();
3737
//! ```
38-
use std::cell::RefCell;
39-
use std::collections::HashSet;
40-
use std::str::FromStr;
41-
use std::thread;
42-
use std::time::Duration;
43-
44-
use rand::{seq::IteratorRandom, thread_rng};
45-
4638
pub use crate::cluster_client::{ClusterClient, ClusterClientBuilder};
4739
use crate::cluster_pipeline::UNROUTABLE_ERROR;
4840
pub use crate::cluster_pipeline::{cluster_pipe, ClusterPipeline};
@@ -63,6 +55,13 @@ use crate::{
6355
cluster_routing::{Redirect, Route, RoutingInfo},
6456
IntoConnectionInfo, PushInfo,
6557
};
58+
use rand::{seq::IteratorRandom, thread_rng};
59+
use std::cell::RefCell;
60+
use std::collections::HashSet;
61+
use std::str::FromStr;
62+
use std::sync::Arc;
63+
use std::thread;
64+
use std::time::Duration;
6665

6766
use tokio::sync::mpsc;
6867

@@ -342,22 +341,20 @@ where
342341
let mut slots = self.slots.borrow_mut();
343342
*slots = self.create_new_slots()?;
344343

345-
let mut nodes = slots.values().flatten().collect::<Vec<_>>();
346-
nodes.sort_unstable();
347-
nodes.dedup();
348-
344+
let nodes = slots.all_node_addresses();
349345
let mut connections = self.connections.borrow_mut();
350346
*connections = nodes
351347
.into_iter()
352348
.filter_map(|addr| {
353-
if connections.contains_key(addr) {
354-
let mut conn = connections.remove(addr).unwrap();
349+
let addr = addr.to_string();
350+
if connections.contains_key(&addr) {
351+
let mut conn = connections.remove(&addr).unwrap();
355352
if conn.check_connection() {
356353
return Some((addr.to_string(), conn));
357354
}
358355
}
359356

360-
if let Ok(mut conn) = self.connect(addr) {
357+
if let Ok(mut conn) = self.connect(&addr) {
361358
if conn.check_connection() {
362359
return Some((addr.to_string(), conn));
363360
}
@@ -423,7 +420,7 @@ where
423420
if let Some(addr) = slots.slot_addr_for_route(route) {
424421
Ok((
425422
addr.to_string(),
426-
self.get_connection_by_addr(connections, addr)?,
423+
self.get_connection_by_addr(connections, &addr)?,
427424
))
428425
} else {
429426
// try a random node next. This is safe if slots are involved
@@ -491,13 +488,13 @@ where
491488
fn execute_on_all<'a>(
492489
&'a self,
493490
input: Input,
494-
addresses: HashSet<&'a str>,
491+
addresses: HashSet<Arc<String>>,
495492
connections: &'a mut HashMap<String, C>,
496-
) -> Vec<RedisResult<(&'a str, Value)>> {
493+
) -> Vec<RedisResult<(Arc<String>, Value)>> {
497494
addresses
498495
.into_iter()
499496
.map(|addr| {
500-
let connection = self.get_connection_by_addr(connections, addr)?;
497+
let connection = self.get_connection_by_addr(connections, &addr)?;
501498
match input {
502499
Input::Slice { cmd, routable: _ } => connection.req_packed_command(cmd),
503500
Input::Cmd(cmd) => connection.req_command(cmd),
@@ -522,16 +519,16 @@ where
522519
input: Input,
523520
slots: &'a mut SlotMap,
524521
connections: &'a mut HashMap<String, C>,
525-
) -> Vec<RedisResult<(&'a str, Value)>> {
526-
self.execute_on_all(input, slots.addresses_for_all_nodes(), connections)
522+
) -> Vec<RedisResult<(Arc<String>, Value)>> {
523+
self.execute_on_all(input, slots.all_node_addresses(), connections)
527524
}
528525

529526
fn execute_on_all_primaries<'a>(
530527
&'a self,
531528
input: Input,
532529
slots: &'a mut SlotMap,
533530
connections: &'a mut HashMap<String, C>,
534-
) -> Vec<RedisResult<(&'a str, Value)>> {
531+
) -> Vec<RedisResult<(Arc<String>, Value)>> {
535532
self.execute_on_all(input, slots.addresses_for_all_primaries(), connections)
536533
}
537534

@@ -541,7 +538,7 @@ where
541538
slots: &'a mut SlotMap,
542539
connections: &'a mut HashMap<String, C>,
543540
routes: &'b [(Route, Vec<usize>)],
544-
) -> Vec<RedisResult<(&'a str, Value)>>
541+
) -> Vec<RedisResult<(Arc<String>, Value)>>
545542
where
546543
'b: 'a,
547544
{
@@ -553,7 +550,7 @@ where
553550
ErrorKind::IoError,
554551
"Couldn't find connection",
555552
)))?;
556-
let connection = self.get_connection_by_addr(connections, addr)?;
553+
let connection = self.get_connection_by_addr(connections, &addr)?;
557554
let (_, indices) = routes.get(index).unwrap();
558555
let cmd =
559556
crate::cluster_routing::command_for_multi_slot_indices(&input, indices.iter());

glide-core/redis-rs/redis/src/cluster_async/connections_container.rs

Lines changed: 25 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,12 @@
11
use crate::cluster_async::ConnectionFuture;
2-
use crate::cluster_routing::{Route, SlotAddr};
2+
use crate::cluster_routing::{Route, ShardAddrs, SlotAddr};
33
use crate::cluster_slotmap::{ReadFromReplicaStrategy, SlotMap, SlotMapValue};
44
use crate::cluster_topology::TopologyHash;
55
use dashmap::DashMap;
66
use futures::FutureExt;
77
use rand::seq::IteratorRandom;
88
use std::net::IpAddr;
9+
use std::sync::Arc;
910
use telemetrylib::Telemetry;
1011

1112
/// Count the number of connections in a connections_map object
@@ -175,6 +176,16 @@ where
175176
}
176177
}
177178

179+
/// Returns an iterator over the nodes in the `slot_map`, yielding pairs of the node address and its associated shard addresses.
180+
pub(crate) fn slot_map_nodes(
181+
&self,
182+
) -> impl Iterator<Item = (Arc<String>, Arc<ShardAddrs>)> + '_ {
183+
self.slot_map
184+
.nodes_map()
185+
.iter()
186+
.map(|item| (item.key().clone(), item.value().clone()))
187+
}
188+
178189
// Extends the current connection map with the provided one
179190
pub(crate) fn extend_connection_map(
180191
&mut self,
@@ -189,11 +200,7 @@ where
189200

190201
/// Returns true if the address represents a known primary node.
191202
pub(crate) fn is_primary(&self, address: &String) -> bool {
192-
self.connection_for_address(address).is_some()
193-
&& self
194-
.slot_map
195-
.values()
196-
.any(|slot_addrs| slot_addrs.primary.as_str() == address)
203+
self.connection_for_address(address).is_some() && self.slot_map.is_primary(address)
197204
}
198205

199206
fn round_robin_read_from_replica(
@@ -202,19 +209,20 @@ where
202209
) -> Option<ConnectionAndAddress<Connection>> {
203210
let addrs = &slot_map_value.addrs;
204211
let initial_index = slot_map_value
205-
.latest_used_replica
212+
.last_used_replica
206213
.load(std::sync::atomic::Ordering::Relaxed);
207214
let mut check_count = 0;
208215
loop {
209216
check_count += 1;
210217

211218
// Looped through all replicas, no connected replica was found.
212-
if check_count > addrs.replicas.len() {
213-
return self.connection_for_address(addrs.primary.as_str());
219+
if check_count > addrs.replicas().len() {
220+
return self.connection_for_address(addrs.primary().as_str());
214221
}
215-
let index = (initial_index + check_count) % addrs.replicas.len();
216-
if let Some(connection) = self.connection_for_address(addrs.replicas[index].as_str()) {
217-
let _ = slot_map_value.latest_used_replica.compare_exchange_weak(
222+
let index = (initial_index + check_count) % addrs.replicas().len();
223+
if let Some(connection) = self.connection_for_address(addrs.replicas()[index].as_str())
224+
{
225+
let _ = slot_map_value.last_used_replica.compare_exchange_weak(
218226
initial_index,
219227
index,
220228
std::sync::atomic::Ordering::Relaxed,
@@ -228,15 +236,15 @@ where
228236
fn lookup_route(&self, route: &Route) -> Option<ConnectionAndAddress<Connection>> {
229237
let slot_map_value = self.slot_map.slot_value_for_route(route)?;
230238
let addrs = &slot_map_value.addrs;
231-
if addrs.replicas.is_empty() {
232-
return self.connection_for_address(addrs.primary.as_str());
239+
if addrs.replicas().is_empty() {
240+
return self.connection_for_address(addrs.primary().as_str());
233241
}
234242

235243
match route.slot_addr() {
236-
SlotAddr::Master => self.connection_for_address(addrs.primary.as_str()),
244+
SlotAddr::Master => self.connection_for_address(addrs.primary().as_str()),
237245
SlotAddr::ReplicaOptional => match self.read_from_replica_strategy {
238246
ReadFromReplicaStrategy::AlwaysFromPrimary => {
239-
self.connection_for_address(addrs.primary.as_str())
247+
self.connection_for_address(addrs.primary().as_str())
240248
}
241249
ReadFromReplicaStrategy::RoundRobin => {
242250
self.round_robin_read_from_replica(slot_map_value)
@@ -274,7 +282,7 @@ where
274282
self.slot_map
275283
.addresses_for_all_primaries()
276284
.into_iter()
277-
.flat_map(|addr| self.connection_for_address(addr))
285+
.flat_map(|addr| self.connection_for_address(&addr))
278286
}
279287

280288
pub(crate) fn node_for_address(&self, address: &str) -> Option<ClusterNode<Connection>> {

0 commit comments

Comments
 (0)