Skip to content

Commit da17db9

Browse files
committed
RUST-544 Ensure correct termination of monitoring tasks (#242)
Co-authored-by: Saghm Rossi <[email protected]>
1 parent 9a02ef5 commit da17db9

File tree

5 files changed

+37
-4
lines changed

5 files changed

+37
-4
lines changed

src/client/mod.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,12 @@ struct ClientInner {
8282
session_pool: ServerSessionPool,
8383
}
8484

85+
impl Drop for ClientInner {
86+
fn drop(&mut self) {
87+
self.topology.mark_closed()
88+
}
89+
}
90+
8591
impl Client {
8692
/// Creates a new `Client` connected to the cluster specified by `uri`. `uri` must be a valid
8793
/// MongoDB connection string.

src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,7 @@
7979
)
8080
)]
8181
#![cfg_attr(docsrs, feature(doc_cfg))]
82+
#![cfg_attr(test, type_length_limit = "6000000")]
8283

8384
macro_rules! define_if_single_runtime_enabled {
8485
( $( $def:item )+ ) => {

src/sdam/monitor.rs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,11 @@ impl Monitor {
5252
.heartbeat_freq
5353
.unwrap_or(DEFAULT_HEARTBEAT_FREQUENCY);
5454

55-
while let Some(_server) = self.server.upgrade() {
55+
while self.topology.is_alive() {
56+
if self.server.upgrade().is_none() {
57+
break;
58+
}
59+
5660
let topology = match self.topology.upgrade() {
5761
Some(topology) => topology,
5862
None => break,

src/sdam/srv_polling/mod.rs

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -59,14 +59,21 @@ impl SrvPollingMonitor {
5959
}
6060

6161
async fn execute(&mut self) {
62-
while let Some(topology) = self.topology.upgrade() {
62+
while self.topology.is_alive() {
63+
let topology = match self.topology.upgrade() {
64+
Some(topology) => topology,
65+
None => break,
66+
};
67+
6368
let state = topology.clone_state().await;
6469

6570
if state.is_sharded() || state.is_unknown() {
6671
let hosts = self.lookup_hosts().await;
67-
self.update_hosts(hosts, topology, state).await;
72+
self.update_hosts(hosts, topology.clone(), state).await;
6873
}
6974

75+
std::mem::drop(topology);
76+
7077
RUNTIME
7178
.delay_for(self.rescan_interval.unwrap_or(DEFAULT_RESCAN_SRV_INTERVAL))
7279
.await;

src/sdam/state/mod.rs

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,11 @@ pub(super) mod server;
22

33
use std::{
44
collections::{HashMap, HashSet},
5-
sync::{Arc, Weak},
5+
sync::{
6+
atomic::{AtomicBool, Ordering},
7+
Arc,
8+
Weak,
9+
},
610
time::Duration,
711
};
812

@@ -47,6 +51,7 @@ pub(crate) struct WeakTopology {
4751
/// manager and the client options.
4852
#[derive(Clone, Debug)]
4953
struct Common {
54+
is_alive: Arc<AtomicBool>,
5055
message_manager: TopologyMessageManager,
5156
options: ClientOptions,
5257
}
@@ -71,6 +76,7 @@ impl Topology {
7176
let description = TopologyDescription::new_from_hosts(hosts.clone());
7277

7378
let common = Common {
79+
is_alive: Arc::new(AtomicBool::new(true)),
7480
message_manager: TopologyMessageManager::new(),
7581
options: ClientOptions::new_srv(),
7682
};
@@ -106,6 +112,7 @@ impl Topology {
106112
let hosts: Vec<_> = options.hosts.drain(..).collect();
107113

108114
let common = Common {
115+
is_alive: Arc::new(AtomicBool::new(true)),
109116
message_manager: TopologyMessageManager::new(),
110117
options: options.clone(),
111118
};
@@ -132,6 +139,10 @@ impl Topology {
132139
Ok(topology)
133140
}
134141

142+
pub(crate) fn mark_closed(&self) {
143+
self.common.is_alive.store(false, Ordering::SeqCst);
144+
}
145+
135146
/// Gets the addresses of the servers in the cluster.
136147
#[cfg(test)]
137148
pub(super) async fn servers(&self) -> HashSet<StreamAddress> {
@@ -360,6 +371,10 @@ impl WeakTopology {
360371
})
361372
}
362373

374+
pub(crate) fn is_alive(&self) -> bool {
375+
self.common.is_alive.load(Ordering::SeqCst)
376+
}
377+
363378
pub(crate) fn client_options(&self) -> &ClientOptions {
364379
&self.common.options
365380
}

0 commit comments

Comments
 (0)