Skip to content

Commit 7218271

Browse files
committed
fixed close connection logic
1 parent 4571c91 commit 7218271

File tree

1 file changed

+32
-30
lines changed

1 file changed

+32
-30
lines changed

redis/src/cluster_async/mod.rs

Lines changed: 32 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,17 @@ mod connections_logic;
2828
pub mod testing {
2929
pub use super::connections_logic::*;
3030
}
31+
use crate::{
32+
cluster_slotmap::SlotMap,
33+
cluster_topology::SLOT_SIZE,
34+
cmd,
35+
commands::cluster_scan::{cluster_scan, ClusterScanArgs, ObjectType, ScanStateRC},
36+
FromRedisValue, InfoDict,
37+
};
38+
#[cfg(all(not(feature = "tokio-comp"), feature = "async-std-comp"))]
39+
use async_std::task::{spawn, JoinHandle};
40+
#[cfg(all(not(feature = "tokio-comp"), feature = "async-std-comp"))]
41+
use futures::executor::block_on;
3142
use std::{
3243
collections::{HashMap, HashSet},
3344
fmt, io, mem,
@@ -40,14 +51,8 @@ use std::{
4051
task::{self, Poll},
4152
time::SystemTime,
4253
};
43-
44-
use crate::{
45-
cluster_slotmap::SlotMap,
46-
cluster_topology::SLOT_SIZE,
47-
cmd,
48-
commands::cluster_scan::{cluster_scan, ClusterScanArgs, ObjectType, ScanStateRC},
49-
FromRedisValue, InfoDict,
50-
};
54+
#[cfg(feature = "tokio-comp")]
55+
use tokio::task::JoinHandle;
5156

5257
use crate::{
5358
aio::{get_socket_addrs, ConnectionLike, MultiplexedConnection, Runtime},
@@ -88,7 +93,6 @@ use backoff_tokio::{Error as BackoffError, ExponentialBackoff};
8893
use dispose::{Disposable, Dispose};
8994
use futures::{future::BoxFuture, prelude::*, ready};
9095
use pin_project_lite::pin_project;
91-
use std::sync::atomic::AtomicBool;
9296
use tokio::sync::{
9397
mpsc,
9498
oneshot::{self, Receiver},
@@ -393,13 +397,18 @@ pub(crate) struct ClusterConnInner<C> {
393397
#[allow(clippy::complexity)]
394398
in_flight_requests: stream::FuturesUnordered<Pin<Box<Request<C>>>>,
395399
refresh_error: Option<RedisError>,
396-
// A flag indicating the connection's closure and the requirement to shut down all related tasks.
397-
shutdown_flag: Arc<AtomicBool>,
400+
// Handler of the periodic check task.
401+
periodic_checks_handler: Option<JoinHandle<()>>,
398402
}
399403

400404
impl<C> Dispose for ClusterConnInner<C> {
401405
fn dispose(self) {
402-
self.shutdown_flag.store(true, Ordering::Relaxed);
406+
if let Some(handle) = self.periodic_checks_handler {
407+
#[cfg(all(not(feature = "tokio-comp"), feature = "async-std-comp"))]
408+
block_on(handle.cancel());
409+
#[cfg(feature = "tokio-comp")]
410+
handle.abort()
411+
}
403412
}
404413
}
405414

@@ -913,13 +922,12 @@ where
913922
),
914923
subscriptions_by_address: RwLock::new(Default::default()),
915924
});
916-
let shutdown_flag = Arc::new(AtomicBool::new(false));
917-
let connection = ClusterConnInner {
925+
let mut connection = ClusterConnInner {
918926
inner,
919927
in_flight_requests: Default::default(),
920928
refresh_error: None,
921929
state: ConnectionState::PollComplete,
922-
shutdown_flag: shutdown_flag.clone(),
930+
periodic_checks_handler: None,
923931
};
924932
Self::refresh_slots_and_subscriptions_with_retries(
925933
connection.inner.clone(),
@@ -928,15 +936,16 @@ where
928936
.await?;
929937

930938
if let Some(duration) = topology_checks_interval {
931-
let periodic_task = ClusterConnInner::periodic_topology_check(
932-
connection.inner.clone(),
933-
duration,
934-
shutdown_flag,
935-
);
939+
let periodic_task =
940+
ClusterConnInner::periodic_topology_check(connection.inner.clone(), duration);
936941
#[cfg(feature = "tokio-comp")]
937-
tokio::spawn(periodic_task);
942+
{
943+
connection.periodic_checks_handler = Some(tokio::spawn(periodic_task));
944+
}
938945
#[cfg(all(not(feature = "tokio-comp"), feature = "async-std-comp"))]
939-
AsyncStd::spawn(periodic_task);
946+
{
947+
connection.periodic_checks_handler = Some(spawn(periodic_task));
948+
}
940949
}
941950

942951
Ok(Disposable::new(connection))
@@ -1307,15 +1316,8 @@ where
13071316
topology_changed
13081317
}
13091318

1310-
async fn periodic_topology_check(
1311-
inner: Arc<InnerCore<C>>,
1312-
interval_duration: Duration,
1313-
shutdown_flag: Arc<AtomicBool>,
1314-
) {
1319+
async fn periodic_topology_check(inner: Arc<InnerCore<C>>, interval_duration: Duration) {
13151320
loop {
1316-
if shutdown_flag.load(Ordering::Relaxed) {
1317-
return;
1318-
}
13191321
let _ = boxed_sleep(interval_duration).await;
13201322
let topology_changed =
13211323
Self::check_topology_and_refresh_if_diff(inner.clone(), &RefreshPolicy::Throttable)

0 commit comments

Comments
 (0)