Skip to content

Commit 391b54d

Browse files
committed
Changed check_topology_and_refresh_if_diff to return a result
1 parent 3121f71 commit 391b54d

File tree

2 files changed

+50
-14
lines changed

2 files changed

+50
-14
lines changed

redis/src/cluster_async/mod.rs

Lines changed: 27 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1565,6 +1565,9 @@ where
15651565
Ok(())
15661566
}
15671567

1568+
/// Determines if the cluster topology has changed and refreshes slots and subscriptions if needed.
1569+
/// Returns `RedisResult` with `true` if changes were detected and slots were refreshed,
1570+
/// or `false` if no changes were found. Raises an error if refreshing the topology fails.
15681571
pub(crate) async fn check_topology_and_refresh_if_diff(
15691572
inner: Arc<InnerCore<C>>,
15701573
policy: &RefreshPolicy,
@@ -1579,14 +1582,30 @@ where
15791582
async fn periodic_topology_check(inner: Arc<InnerCore<C>>, interval_duration: Duration) {
15801583
loop {
15811584
let _ = boxed_sleep(interval_duration).await;
1582-
let topology_changed =
1583-
Self::check_topology_and_refresh_if_diff(inner.clone(), &RefreshPolicy::Throttable)
1584-
.await;
1585-
if !topology_changed {
1586-
// This serves as a safety measure for validating pubsub subsctiptions state in case it has drifted
1587-
// while topology stayed the same.
1588-
// For example, a failed attempt to refresh a connection which is triggered from refresh_pubsub_subscriptions(),
1589-
// might leave a node unconnected indefinitely in case topology is stable and no request are attempted to this node.
1585+
1586+
// Check and refresh topology if needed
1587+
let should_refresh_pubsub = match Self::check_topology_and_refresh_if_diff(
1588+
inner.clone(),
1589+
&RefreshPolicy::Throttable,
1590+
)
1591+
.await
1592+
{
1593+
Ok(topology_changed) => !topology_changed,
1594+
Err(err) => {
1595+
warn!(
1596+
"Failed to refresh slots during periodic topology checks:\n{:?}",
1597+
err
1598+
);
1599+
true
1600+
}
1601+
};
1602+
1603+
// Refresh pubsub subscriptions if topology wasn't changed or an error occurred.
1604+
// This serves as a safety measure for validating pubsub subsctiptions state in case it has drifted
1605+
// while topology stayed the same.
1606+
// For example, a failed attempt to refresh a connection which is triggered from refresh_pubsub_subscriptions(),
1607+
// might leave a node unconnected indefinitely in case topology is stable and no request are attempted to this node.
1608+
if should_refresh_pubsub {
15901609
Self::refresh_pubsub_subscriptions(inner.clone()).await;
15911610
}
15921611
}

redis/src/commands/cluster_scan.rs

Lines changed: 23 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -150,7 +150,7 @@ pub(crate) trait ClusterInScan {
150150
async fn are_all_slots_covered(&self) -> bool;
151151

152152
/// Check if the topology of the cluster has changed and refresh the slots if needed
153-
async fn refresh_if_topology_changed(&self);
153+
async fn refresh_if_topology_changed(&self) -> RedisResult<bool>;
154154
}
155155

156156
/// Represents the state of a scan operation in a Redis cluster.
@@ -288,7 +288,16 @@ impl ScanState {
288288
&mut self,
289289
connection: &C,
290290
) -> RedisResult<ScanState> {
291-
let _ = connection.refresh_if_topology_changed().await;
291+
connection
292+
.refresh_if_topology_changed()
293+
.await
294+
.map_err(|err| {
295+
RedisError::from((
296+
ErrorKind::ResponseError,
297+
"Error during cluster scan: failed to refresh slots",
298+
format!("{:?}", err),
299+
))
300+
})?;
292301
let mut scanned_slots_map = self.scanned_slots_map;
293302
// If the address epoch changed it mean that some slots in the address are new, so we cant know which slots been there from the beginning and which are new, or out and in later.
294303
// In this case we will skip updating the scanned_slots_map and will just update the address and the cursor
@@ -387,14 +396,14 @@ where
387396
async fn are_all_slots_covered(&self) -> bool {
388397
ClusterConnInner::<C>::check_if_all_slots_covered(&self.conn_lock.read().await.slot_map)
389398
}
390-
async fn refresh_if_topology_changed(&self) {
399+
async fn refresh_if_topology_changed(&self) -> RedisResult<bool> {
391400
ClusterConnInner::check_topology_and_refresh_if_diff(
392401
self.to_owned(),
393402
// The cluster SCAN implementation must refresh the slots when a topology change is found
394403
// to ensure the scan logic is correct.
395404
&RefreshPolicy::NotThrottable,
396405
)
397-
.await;
406+
.await
398407
}
399408
}
400409

@@ -529,7 +538,13 @@ where
529538
{
530539
// TODO: This mechanism of refreshing on failure to route to address should be part of the routing mechanism
531540
// After the routing mechanism is updated to handle this case, this refresh in the case bellow should be removed
532-
core.refresh_if_topology_changed().await;
541+
core.refresh_if_topology_changed().await.map_err(|err| {
542+
RedisError::from((
543+
ErrorKind::ResponseError,
544+
"Error during cluster scan: failed to refresh slots",
545+
format!("{:?}", err),
546+
))
547+
})?;
533548
if !core.are_all_slots_covered().await {
534549
return Err(RedisError::from((
535550
ErrorKind::NotAllSlotsCovered,
@@ -615,7 +630,9 @@ mod tests {
615630
struct MockConnection;
616631
#[async_trait]
617632
impl ClusterInScan for MockConnection {
618-
async fn refresh_if_topology_changed(&self) {}
633+
async fn refresh_if_topology_changed(&self) -> RedisResult<bool> {
634+
Ok(true)
635+
}
619636
async fn get_address_by_slot(&self, _slot: u16) -> RedisResult<String> {
620637
Ok("mock_address".to_string())
621638
}

0 commit comments

Comments
 (0)