Skip to content

Commit bb5eb32

Browse files
committed
Added logic to update the slot map based on MOVED errors (#186)
1 parent 65c9a5d commit bb5eb32

File tree

7 files changed

+1542
-103
lines changed

7 files changed

+1542
-103
lines changed

redis/benches/bench_basic.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ fn bench_simple_getsetdel_async(b: &mut Bencher) {
3131
() = redis::cmd("SET")
3232
.arg(key)
3333
.arg(42)
34-
.query_async(&mut con)
34+
.query_async::<_, Option<Value>>(&mut con)
3535
.await?;
3636
let _: isize = redis::cmd("GET").arg(key).query_async(&mut con).await?;
3737
() = redis::cmd("DEL").arg(key).query_async(&mut con).await?;

redis/src/cluster_async/connections_container.rs

Lines changed: 14 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,12 @@
11
use crate::cluster_async::ConnectionFuture;
2-
use crate::cluster_routing::{Route, SlotAddr};
2+
use crate::cluster_routing::{Route, ShardAddrs, SlotAddr};
33
use crate::cluster_slotmap::{ReadFromReplicaStrategy, SlotMap, SlotMapValue};
44
use crate::cluster_topology::TopologyHash;
55
use dashmap::DashMap;
66
use futures::FutureExt;
77
use rand::seq::IteratorRandom;
88
use std::net::IpAddr;
9+
use std::sync::Arc;
910

1011
/// A struct that encapsulates a network connection along with its associated IP address.
1112
#[derive(Clone, Eq, PartialEq, Debug)]
@@ -137,6 +138,16 @@ where
137138
}
138139
}
139140

141+
/// Returns an iterator over the nodes in the `slot_map`, yielding pairs of the node address and its associated shard addresses.
142+
pub(crate) fn slot_map_nodes(
143+
&self,
144+
) -> impl Iterator<Item = (Arc<String>, Arc<ShardAddrs>)> + '_ {
145+
self.slot_map
146+
.nodes_map()
147+
.iter()
148+
.map(|item| (item.key().clone(), item.value().clone()))
149+
}
150+
140151
// Extends the current connection map with the provided one
141152
pub(crate) fn extend_connection_map(
142153
&mut self,
@@ -154,10 +165,7 @@ where
154165
&self,
155166
slot_map_value: &SlotMapValue,
156167
) -> Option<ConnectionAndAddress<Connection>> {
157-
let addrs = &slot_map_value
158-
.addrs
159-
.read()
160-
.expect("Failed to obtain ShardAddrs's read lock");
168+
let addrs = &slot_map_value.addrs;
161169
let initial_index = slot_map_value
162170
.last_used_replica
163171
.load(std::sync::atomic::Ordering::Relaxed);
@@ -185,10 +193,7 @@ where
185193

186194
fn lookup_route(&self, route: &Route) -> Option<ConnectionAndAddress<Connection>> {
187195
let slot_map_value = self.slot_map.slot_value_for_route(route)?;
188-
let addrs = &slot_map_value
189-
.addrs
190-
.read()
191-
.expect("Failed to obtain ShardAddrs's read lock");
196+
let addrs = &slot_map_value.addrs;
192197
if addrs.replicas().is_empty() {
193198
return self.connection_for_address(addrs.primary().as_str());
194199
}

redis/src/cluster_async/mod.rs

Lines changed: 126 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ pub mod testing {
3131
}
3232
use crate::{
3333
client::GlideConnectionOptions,
34-
cluster_routing::{Routable, RoutingInfo},
34+
cluster_routing::{Routable, RoutingInfo, ShardUpdateResult},
3535
cluster_slotmap::SlotMap,
3636
cluster_topology::SLOT_SIZE,
3737
cmd,
@@ -679,19 +679,17 @@ impl From<String> for OperationTarget {
679679
#[derive(Clone, Debug)]
680680
pub(crate) struct RedirectNode {
681681
/// The address of the redirect node.
682-
pub _address: String,
682+
pub address: String,
683683
/// The slot of the redirect node.
684-
pub _slot: u16,
684+
pub slot: u16,
685685
}
686686

687687
impl RedirectNode {
688-
/// This function expects an `Option` containing a tuple with a string slice and a u16.
689-
/// The tuple represents an address and a slot, respectively. If the input is `Some`,
690-
/// the function converts the address to a `String` and constructs a `RedirectNode`.
688+
/// Constructs a `RedirectNode` from an optional tuple containing an address and a slot number.
691689
pub(crate) fn from_option_tuple(option: Option<(&str, u16)>) -> Option<Self> {
692690
option.map(|(address, slot)| RedirectNode {
693-
_address: address.to_string(),
694-
_slot: slot,
691+
address: address.to_string(),
692+
slot,
695693
})
696694
}
697695
}
@@ -807,6 +805,10 @@ pin_project! {
807805
#[pin]
808806
sleep: BoxFuture<'static, ()>,
809807
},
808+
UpdateMoved {
809+
#[pin]
810+
future: BoxFuture<'static, RedisResult<()>>,
811+
},
810812
}
811813
}
812814

@@ -871,8 +873,25 @@ impl<C> Future for Request<C> {
871873
}
872874
.into();
873875
}
876+
RequestStateProj::UpdateMoved { future } => {
877+
if let Err(err) = ready!(future.poll(cx)) {
878+
// Updating the slot map based on the MOVED error is an optimization.
879+
// If it fails, proceed by retrying the request with the redirected node,
880+
// and allow the slot refresh task to correct the slot map.
881+
info!(
882+
"Failed to update the slot map based on the received MOVED error.
883+
Error: {err:?}"
884+
);
885+
}
886+
if let Some(request) = self.project().request.take() {
887+
return Next::Retry { request }.into();
888+
} else {
889+
return Next::Done.into();
890+
}
891+
}
874892
_ => panic!("Request future must be Some"),
875893
};
894+
876895
match ready!(future.poll(cx)) {
877896
Ok(item) => {
878897
self.respond(Ok(item));
@@ -1851,6 +1870,77 @@ where
18511870
Ok(())
18521871
}
18531872

1873+
/// Handles MOVED errors by updating the client's slot and node mappings based on the new primary's role:
1874+
/// /// Updates the slot and node mappings in response to a MOVED error.
1875+
/// This function handles various scenarios based on the new primary's role:
1876+
///
1877+
/// 1. **No Change**: If the new primary is already the current slot owner, no updates are needed.
1878+
/// 2. **Failover**: If the new primary is a replica within the same shard (indicating a failover),
1879+
/// the slot ownership is updated by promoting the replica to the primary in the existing shard addresses.
1880+
/// 3. **Slot Migration**: If the new primary is an existing primary in another shard, this indicates a slot migration,
1881+
/// and the slot mapping is updated to point to the new shard addresses.
1882+
/// 4. **Replica Moved to a Different Shard**: If the new primary is a replica in a different shard, it can be due to:
1883+
/// - The replica became the primary of its shard after a failover, with new slots migrated to it.
1884+
/// - The replica has moved to a different shard as the primary.
1885+
/// Since further information is unknown, the replica is removed from its original shard and added as the primary of a new shard.
1886+
/// 5. **New Node**: If the new primary is unknown, it is added as a new node in a new shard, possibly indicating scale-out.
1887+
///
1888+
/// # Arguments
1889+
/// * `inner` - Shared reference to InnerCore containing connection and slot state.
1890+
/// * `slot` - The slot number reported as moved.
1891+
/// * `new_primary` - The address of the node now responsible for the slot.
1892+
///
1893+
/// # Returns
1894+
/// * `RedisResult<()>` indicating success or failure in updating slot mappings.
1895+
async fn update_upon_moved_error(
1896+
inner: Arc<InnerCore<C>>,
1897+
slot: u16,
1898+
new_primary: Arc<String>,
1899+
) -> RedisResult<()> {
1900+
let mut connections_container = inner.conn_lock.write().await;
1901+
let curr_shard_addrs = connections_container.slot_map.shard_addrs_for_slot(slot);
1902+
// Check if the new primary is part of the current shard and update if required
1903+
if let Some(curr_shard_addrs) = curr_shard_addrs {
1904+
match curr_shard_addrs.attempt_shard_role_update(new_primary.clone()) {
1905+
// Scenario 1: No changes needed as the new primary is already the current slot owner.
1906+
// Scenario 2: Failover occurred and the new primary was promoted from a replica.
1907+
ShardUpdateResult::AlreadyPrimary | ShardUpdateResult::Promoted => return Ok(()),
1908+
// The node was not found in this shard, proceed with further scenarios.
1909+
ShardUpdateResult::NodeNotFound => {}
1910+
}
1911+
}
1912+
1913+
// Scenario 3 & 4: Check if the new primary exists in other shards
1914+
let mut nodes_iter = connections_container.slot_map_nodes();
1915+
for (node_addr, shard_addrs_arc) in &mut nodes_iter {
1916+
if node_addr == new_primary {
1917+
let is_existing_primary = shard_addrs_arc.primary().eq(&new_primary);
1918+
if is_existing_primary {
1919+
// Scenario 3: Slot Migration - The new primary is an existing primary in another shard
1920+
// Update the associated addresses for `slot` to `shard_addrs`.
1921+
drop(nodes_iter);
1922+
return connections_container
1923+
.slot_map
1924+
.update_slot_range(slot, shard_addrs_arc.clone());
1925+
} else {
1926+
// Scenario 4: The MOVED error redirects to `new_primary` which is known as a replica in a shard that doesn’t own `slot`.
1927+
// Remove the replica from its existing shard and treat it as a new node in a new shard.
1928+
shard_addrs_arc.remove_replica(new_primary.clone())?;
1929+
drop(nodes_iter);
1930+
return connections_container
1931+
.slot_map
1932+
.add_new_primary(slot, new_primary);
1933+
}
1934+
}
1935+
}
1936+
1937+
// Scenario 5: New Node - The new primary is not present in the current slots map, add it as a primary of a new shard.
1938+
drop(nodes_iter);
1939+
connections_container
1940+
.slot_map
1941+
.add_new_primary(slot, new_primary)
1942+
}
1943+
18541944
async fn execute_on_multiple_nodes<'a>(
18551945
cmd: &'a Arc<Cmd>,
18561946
routing: &'a MultipleNodeRoutingInfo,
@@ -2272,25 +2362,37 @@ where
22722362
sleep_duration,
22732363
moved_redirect,
22742364
} => {
2275-
poll_flush_action = poll_flush_action
2276-
.change_state(PollFlushAction::RebuildSlots(moved_redirect));
2277-
if let Some(request) = request {
2278-
let future: RequestState<
2279-
Pin<Box<dyn Future<Output = OperationResult> + Send>>,
2280-
> = match sleep_duration {
2281-
Some(sleep_duration) => RequestState::Sleep {
2365+
poll_flush_action =
2366+
poll_flush_action.change_state(PollFlushAction::RebuildSlots);
2367+
let future: Option<
2368+
RequestState<Pin<Box<dyn Future<Output = OperationResult> + Send>>>,
2369+
> = if let Some(moved_redirect) = moved_redirect {
2370+
Some(RequestState::UpdateMoved {
2371+
future: Box::pin(ClusterConnInner::update_upon_moved_error(
2372+
self.inner.clone(),
2373+
moved_redirect.slot,
2374+
moved_redirect.address.into(),
2375+
)),
2376+
})
2377+
} else if let Some(ref request) = request {
2378+
match sleep_duration {
2379+
Some(sleep_duration) => Some(RequestState::Sleep {
22822380
sleep: boxed_sleep(sleep_duration),
2283-
},
2284-
None => RequestState::Future {
2381+
}),
2382+
None => Some(RequestState::Future {
22852383
future: Box::pin(Self::try_request(
22862384
request.info.clone(),
22872385
self.inner.clone(),
22882386
)),
2289-
},
2290-
};
2387+
}),
2388+
}
2389+
} else {
2390+
None
2391+
};
2392+
if let Some(future) = future {
22912393
self.in_flight_requests.push(Box::pin(Request {
22922394
retry_params: self.inner.cluster_params.retry_params.clone(),
2293-
request: Some(request),
2395+
request,
22942396
future,
22952397
}));
22962398
}
@@ -2343,7 +2445,7 @@ where
23432445

23442446
enum PollFlushAction {
23452447
None,
2346-
RebuildSlots(Option<RedirectNode>),
2448+
RebuildSlots,
23472449
Reconnect(Vec<String>),
23482450
ReconnectFromInitialConnections,
23492451
}
@@ -2358,9 +2460,8 @@ impl PollFlushAction {
23582460
PollFlushAction::ReconnectFromInitialConnections
23592461
}
23602462

2361-
(PollFlushAction::RebuildSlots(moved_redirect), _)
2362-
| (_, PollFlushAction::RebuildSlots(moved_redirect)) => {
2363-
PollFlushAction::RebuildSlots(moved_redirect)
2463+
(PollFlushAction::RebuildSlots, _) | (_, PollFlushAction::RebuildSlots) => {
2464+
PollFlushAction::RebuildSlots
23642465
}
23652466

23662467
(PollFlushAction::Reconnect(mut addrs), PollFlushAction::Reconnect(new_addrs)) => {
@@ -2421,8 +2522,7 @@ where
24212522

24222523
match ready!(self.poll_complete(cx)) {
24232524
PollFlushAction::None => return Poll::Ready(Ok(())),
2424-
PollFlushAction::RebuildSlots(_moved_redirect) => {
2425-
// TODO: Add logic to update the slots map based on the MOVED error
2525+
PollFlushAction::RebuildSlots => {
24262526
self.state = ConnectionState::Recover(RecoverFuture::RecoverSlots(Box::pin(
24272527
ClusterConnInner::refresh_slots_and_subscriptions_with_retries(
24282528
self.inner.clone(),

0 commit comments

Comments
 (0)