Skip to content

Commit 3121f71

Browse files
committed
Added moved_redirect to the refresh slots function
1 parent c54a463 commit 3121f71

File tree

1 file changed

+58
-12
lines changed

1 file changed

+58
-12
lines changed

redis/src/cluster_async/mod.rs

Lines changed: 58 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -675,6 +675,27 @@ impl From<String> for OperationTarget {
675675
}
676676
}
677677

678+
/// Represents a node to which a `MOVED` or `ASK` error redirects.
679+
#[derive(Clone, Debug)]
680+
pub(crate) struct RedirectNode {
681+
/// The address of the redirect node.
682+
pub _address: String,
683+
/// The slot of the redirect node.
684+
pub _slot: u16,
685+
}
686+
687+
impl RedirectNode {
688+
/// This function expects an `Option` containing a tuple with a string slice and a u16.
689+
/// The tuple represents an address and a slot, respectively. If the input is `Some`,
690+
/// the function converts the address to a `String` and constructs a `RedirectNode`.
691+
pub(crate) fn from_option_tuple(option: Option<(&str, u16)>) -> Option<Self> {
692+
option.map(|(address, slot)| RedirectNode {
693+
_address: address.to_string(),
694+
_slot: slot,
695+
})
696+
}
697+
}
698+
678699
struct Message<C> {
679700
cmd: CmdArg<C>,
680701
sender: oneshot::Sender<RedisResult<Response>>,
@@ -822,6 +843,7 @@ enum Next<C> {
822843
// if not set, then a slot refresh should happen without sending a request afterwards
823844
request: Option<PendingRequest<C>>,
824845
sleep_duration: Option<Duration>,
846+
moved_redirect: Option<RedirectNode>,
825847
},
826848
ReconnectToInitialNodes {
827849
// if not set, then a reconnect should happen without sending a request afterwards
@@ -868,6 +890,7 @@ impl<C> Future for Request<C> {
868890
Next::RefreshSlots {
869891
request: None,
870892
sleep_duration: None,
893+
moved_redirect: RedirectNode::from_option_tuple(err.redirect_node()),
871894
}
872895
.into()
873896
} else if matches!(err.retry_method(), crate::types::RetryMethod::Reconnect) {
@@ -913,6 +936,7 @@ impl<C> Future for Request<C> {
913936
return Next::RefreshSlots {
914937
request: Some(request),
915938
sleep_duration: Some(sleep_duration),
939+
moved_redirect: None,
916940
}
917941
.into();
918942
}
@@ -931,13 +955,15 @@ impl<C> Future for Request<C> {
931955
}
932956
crate::types::RetryMethod::MovedRedirect => {
933957
let mut request = this.request.take().unwrap();
958+
let redirect_node = err.redirect_node();
934959
request.info.set_redirect(
935960
err.redirect_node()
936961
.map(|(node, _slot)| Redirect::Moved(node.to_string())),
937962
);
938963
Next::RefreshSlots {
939964
request: Some(request),
940965
sleep_duration: None,
966+
moved_redirect: RedirectNode::from_option_tuple(redirect_node),
941967
}
942968
.into()
943969
}
@@ -1064,6 +1090,7 @@ where
10641090
Self::refresh_slots_and_subscriptions_with_retries(
10651091
connection.inner.clone(),
10661092
&RefreshPolicy::NotThrottable,
1093+
None,
10671094
)
10681095
.await?;
10691096

@@ -1223,6 +1250,7 @@ where
12231250
if let Err(err) = Self::refresh_slots_and_subscriptions_with_retries(
12241251
inner.clone(),
12251252
&RefreshPolicy::Throttable,
1253+
None,
12261254
)
12271255
.await
12281256
{
@@ -1464,6 +1492,7 @@ where
14641492
async fn refresh_slots_and_subscriptions_with_retries(
14651493
inner: Arc<InnerCore<C>>,
14661494
policy: &RefreshPolicy,
1495+
moved_redirect: Option<RedirectNode>,
14671496
) -> RedisResult<()> {
14681497
let SlotRefreshState {
14691498
in_progress,
@@ -1477,7 +1506,7 @@ where
14771506
{
14781507
return Ok(());
14791508
}
1480-
let mut skip_slots_refresh = false;
1509+
let mut should_refresh_slots = true;
14811510
if *policy == RefreshPolicy::Throttable {
14821511
// Check if the current slot refresh is triggered before the wait duration has passed
14831512
let last_run_rlock = last_run.read().await;
@@ -1496,13 +1525,13 @@ where
14961525
if passed_time <= wait_duration {
14971526
debug!("Skipping slot refresh as the wait duration hasn't yet passed. Passed time = {:?},
14981527
Wait duration = {:?}", passed_time, wait_duration);
1499-
skip_slots_refresh = true;
1528+
should_refresh_slots = false;
15001529
}
15011530
}
15021531
}
15031532

15041533
let mut res = Ok(());
1505-
if !skip_slots_refresh {
1534+
if should_refresh_slots {
15061535
let retry_strategy = ExponentialBackoff {
15071536
initial_interval: DEFAULT_REFRESH_SLOTS_RETRY_INITIAL_INTERVAL,
15081537
max_interval: DEFAULT_REFRESH_SLOTS_RETRY_MAX_INTERVAL,
@@ -1515,6 +1544,10 @@ where
15151544
Self::refresh_slots(inner.clone(), curr_retry)
15161545
})
15171546
.await;
1547+
} else if moved_redirect.is_some() {
1548+
// Update relevant slots in the slots map based on the moved_redirect address,
1549+
// rather than refreshing all slots by querying the cluster nodes for their topology view.
1550+
Self::update_slots_for_redirect_change(inner.clone(), moved_redirect).await?;
15181551
}
15191552
in_progress.store(false, Ordering::Relaxed);
15201553

@@ -1523,15 +1556,24 @@ where
15231556
res
15241557
}
15251558

1559+
/// Update relevant slots in the slots map based on the moved_redirect address
1560+
pub(crate) async fn update_slots_for_redirect_change(
1561+
_inner: Arc<InnerCore<C>>,
1562+
_moved_redirect: Option<RedirectNode>,
1563+
) -> RedisResult<()> {
1564+
// TODO: Add implementation
1565+
Ok(())
1566+
}
1567+
15261568
pub(crate) async fn check_topology_and_refresh_if_diff(
15271569
inner: Arc<InnerCore<C>>,
15281570
policy: &RefreshPolicy,
1529-
) -> bool {
1571+
) -> RedisResult<bool> {
15301572
let topology_changed = Self::check_for_topology_diff(inner.clone()).await;
15311573
if topology_changed {
1532-
let _ = Self::refresh_slots_and_subscriptions_with_retries(inner.clone(), policy).await;
1574+
Self::refresh_slots_and_subscriptions_with_retries(inner.clone(), policy, None).await?;
15331575
}
1534-
topology_changed
1576+
Ok(topology_changed)
15351577
}
15361578

15371579
async fn periodic_topology_check(inner: Arc<InnerCore<C>>, interval_duration: Duration) {
@@ -2131,6 +2173,7 @@ where
21312173
*future = Box::pin(Self::refresh_slots_and_subscriptions_with_retries(
21322174
self.inner.clone(),
21332175
&RefreshPolicy::Throttable,
2176+
None,
21342177
));
21352178
Poll::Ready(Err(err))
21362179
}
@@ -2226,9 +2269,10 @@ where
22262269
Next::RefreshSlots {
22272270
request,
22282271
sleep_duration,
2272+
moved_redirect,
22292273
} => {
2230-
poll_flush_action =
2231-
poll_flush_action.change_state(PollFlushAction::RebuildSlots);
2274+
poll_flush_action = poll_flush_action
2275+
.change_state(PollFlushAction::RebuildSlots(moved_redirect));
22322276
if let Some(request) = request {
22332277
let future: RequestState<
22342278
Pin<Box<dyn Future<Output = OperationResult> + Send>>,
@@ -2298,7 +2342,7 @@ where
22982342

22992343
enum PollFlushAction {
23002344
None,
2301-
RebuildSlots,
2345+
RebuildSlots(Option<RedirectNode>),
23022346
Reconnect(Vec<String>),
23032347
ReconnectFromInitialConnections,
23042348
}
@@ -2313,8 +2357,9 @@ impl PollFlushAction {
23132357
PollFlushAction::ReconnectFromInitialConnections
23142358
}
23152359

2316-
(PollFlushAction::RebuildSlots, _) | (_, PollFlushAction::RebuildSlots) => {
2317-
PollFlushAction::RebuildSlots
2360+
(PollFlushAction::RebuildSlots(moved_redirect), _)
2361+
| (_, PollFlushAction::RebuildSlots(moved_redirect)) => {
2362+
PollFlushAction::RebuildSlots(moved_redirect)
23182363
}
23192364

23202365
(PollFlushAction::Reconnect(mut addrs), PollFlushAction::Reconnect(new_addrs)) => {
@@ -2375,11 +2420,12 @@ where
23752420

23762421
match ready!(self.poll_complete(cx)) {
23772422
PollFlushAction::None => return Poll::Ready(Ok(())),
2378-
PollFlushAction::RebuildSlots => {
2423+
PollFlushAction::RebuildSlots(moved_redirect) => {
23792424
self.state = ConnectionState::Recover(RecoverFuture::RecoverSlots(Box::pin(
23802425
ClusterConnInner::refresh_slots_and_subscriptions_with_retries(
23812426
self.inner.clone(),
23822427
&RefreshPolicy::Throttable,
2428+
moved_redirect,
23832429
),
23842430
)));
23852431
}

0 commit comments

Comments
 (0)