Skip to content

Commit fe018f6

Browse files
authored
RUST-1443 Ensure monitors close after server is removed from topology (#744)
1 parent 55cd3d9 commit fe018f6

File tree

5 files changed

+161
-45
lines changed

5 files changed

+161
-45
lines changed

src/client/options/mod.rs

Lines changed: 29 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1325,6 +1325,19 @@ impl ClientOptions {
13251325
}
13261326
}
13271327

1328+
if let Some(heartbeat_frequency) = self.heartbeat_freq {
1329+
if heartbeat_frequency < self.min_heartbeat_frequency() {
1330+
return Err(ErrorKind::InvalidArgument {
1331+
message: format!(
1332+
"'heartbeat_freq' must be at least {}ms, but {}ms was given",
1333+
self.min_heartbeat_frequency().as_millis(),
1334+
heartbeat_frequency.as_millis()
1335+
),
1336+
}
1337+
.into());
1338+
}
1339+
}
1340+
13281341
Ok(())
13291342
}
13301343

@@ -1373,6 +1386,21 @@ impl ClientOptions {
13731386
/// Returns a mutable reference to the test options, first storing a default value when
/// `test_options` is currently `None`.
pub(crate) fn test_options_mut(&mut self) -> &mut TestOptions {
13741387
self.test_options.get_or_insert_with(Default::default)
13751388
}
1389+
1390+
/// Returns the minimum heartbeat frequency that applies to these options.
///
/// In test builds (`cfg(test)`) the value comes from `test_options.min_heartbeat_freq` when
/// both the test options and that field are set, and falls back to `MIN_HEARTBEAT_FREQUENCY`
/// otherwise. In non-test builds the result is always `MIN_HEARTBEAT_FREQUENCY`.
pub(crate) fn min_heartbeat_frequency(&self) -> Duration {
1391+
#[cfg(test)]
1392+
{
1393+
self.test_options
1394+
.as_ref()
1395+
.and_then(|to| to.min_heartbeat_freq)
1396+
.unwrap_or(MIN_HEARTBEAT_FREQUENCY)
1397+
}
1398+
1399+
#[cfg(not(test))]
1400+
{
1401+
MIN_HEARTBEAT_FREQUENCY
1402+
}
1403+
}
13761404
}
13771405

13781406
/// Splits a string into a section before a given index and a section exclusively after the index.
@@ -1865,19 +1893,7 @@ impl ConnectionString {
18651893
self.direct_connection = Some(get_bool!(value, k));
18661894
}
18671895
k @ "heartbeatfrequencyms" => {
1868-
let duration = get_duration!(value, k);
1869-
1870-
if duration < MIN_HEARTBEAT_FREQUENCY.as_millis() as u64 {
1871-
return Err(ErrorKind::InvalidArgument {
1872-
message: format!(
1873-
"'heartbeatFrequencyMS' must be at least 500, but {} was given",
1874-
duration
1875-
),
1876-
}
1877-
.into());
1878-
}
1879-
1880-
self.heartbeat_frequency = Some(Duration::from_millis(duration));
1896+
self.heartbeat_frequency = Some(Duration::from_millis(get_duration!(value, k)));
18811897
}
18821898
k @ "journal" => {
18831899
let mut write_concern = self.write_concern.get_or_insert_with(Default::default);

src/client/options/test.rs

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
use std::time::Duration;
2+
13
use pretty_assertions::assert_eq;
24
use serde::Deserialize;
35

@@ -7,6 +9,7 @@ use crate::{
79
error::ErrorKind,
810
options::Compressor,
911
test::run_spec_test,
12+
Client,
1013
};
1114
#[derive(Debug, Deserialize)]
1215
struct TestFile {
@@ -282,3 +285,14 @@ async fn options_debug_omits_uri() {
282285
assert!(!debug_output.contains("password"));
283286
assert!(!debug_output.contains("uri"));
284287
}
288+
289+
/// Checks that building a `Client` from options whose `heartbeat_freq` is 10ms returns an
/// error.
///
/// NOTE(review): presumably the error comes from the minimum-heartbeat validation in
/// `ClientOptions`; the call path from `Client::with_options` to that check is not visible
/// here, so confirm.
#[cfg_attr(feature = "tokio-runtime", tokio::test)]
290+
#[cfg_attr(feature = "async-std-runtime", async_std::test)]
291+
async fn options_enforce_min_heartbeat_frequency() {
292+
let options = ClientOptions::builder()
293+
.hosts(vec![ServerAddress::parse("a:123").unwrap()])
294+
.heartbeat_freq(Duration::from_millis(10))
295+
.build();
296+
297+
// `unwrap_err` panics if the client was built successfully, which fails the test.
Client::with_options(options).unwrap_err();
298+
}

src/sdam/monitor.rs

Lines changed: 20 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -100,25 +100,11 @@ impl Monitor {
100100
// We only go to sleep when using the polling protocol (i.e. server never returned a
101101
// topologyVersion) or when the most recent check failed.
102102
if self.topology_version.is_none() || !check_succeeded {
103-
#[cfg(test)]
104-
let min_frequency = self
105-
.client_options
106-
.test_options
107-
.as_ref()
108-
.and_then(|to| to.min_heartbeat_freq)
109-
.unwrap_or(MIN_HEARTBEAT_FREQUENCY);
110-
111-
#[cfg(not(test))]
112-
let min_frequency = MIN_HEARTBEAT_FREQUENCY;
113-
114-
tokio::select! {
115-
_ = runtime::delay_for(min_frequency) => {},
116-
_ = self.request_receiver.wait_for_server_close() => {
117-
break;
118-
}
119-
}
120103
self.request_receiver
121-
.wait_for_check_request(heartbeat_frequency - min_frequency)
104+
.wait_for_check_request(
105+
self.client_options.min_heartbeat_frequency(),
106+
heartbeat_frequency,
107+
)
122108
.await;
123109
}
124110
}
@@ -596,17 +582,28 @@ impl MonitorRequestReceiver {
596582
err
597583
}
598584

599-
/// Wait for a request to immediately check the server to come in, guarded by the provided
600-
/// timeout. If a cancellation request is received indicating the topology has closed, this
601-
/// method will return. All other cancellation requests will be ignored.
602-
async fn wait_for_check_request(&mut self, timeout: Duration) {
585+
/// Wait for a request to immediately check the server to be received, guarded by the provided
586+
/// timeout. If the server associated with this monitor is removed from the topology, this
587+
/// method will return.
588+
///
589+
/// The `delay` parameter indicates how long this method should wait before listening to
590+
/// requests. The time spent in the delay counts toward the provided timeout.
591+
async fn wait_for_check_request(&mut self, delay: Duration, timeout: Duration) {
603592
let _ = runtime::timeout(timeout, async {
593+
let wait_for_check_request = async {
594+
runtime::delay_for(delay).await;
595+
self.topology_check_request_receiver
596+
.wait_for_check_request()
597+
.await;
598+
};
599+
tokio::pin!(wait_for_check_request);
600+
604601
loop {
605602
tokio::select! {
606603
_ = self.individual_check_request_receiver.changed() => {
607604
break;
608605
}
609-
_ = self.topology_check_request_receiver.wait_for_check_request() => {
606+
_ = &mut wait_for_check_request => {
610607
break;
611608
}
612609
_ = self.handle_listener.wait_for_all_handle_drops() => {
@@ -622,11 +619,6 @@ impl MonitorRequestReceiver {
622619
self.cancellation_receiver.borrow_and_update();
623620
}
624621

625-
/// Wait until the server associated with this monitor has been closed.
626-
async fn wait_for_server_close(&mut self) {
627-
self.handle_listener.wait_for_all_handle_drops().await;
628-
}
629-
630622
/// Reports the liveness of this receiver by delegating to `handle_listener.is_alive()`.
fn is_alive(&self) -> bool {
631623
self.handle_listener.is_alive()
632624
}

src/sdam/test.rs

Lines changed: 93 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
use std::{
2+
collections::HashSet,
23
sync::Arc,
34
time::{Duration, Instant},
45
};
@@ -8,8 +9,12 @@ use semver::VersionReq;
89
use tokio::sync::{RwLockReadGuard, RwLockWriteGuard};
910

1011
use crate::{
12+
client::options::{ClientOptions, ServerAddress},
13+
cmap::RawCommandResponse,
1114
error::{Error, ErrorKind},
15+
event::sdam::SdamEventHandler,
1216
hello::{LEGACY_HELLO_COMMAND_NAME, LEGACY_HELLO_COMMAND_NAME_LOWERCASE},
17+
sdam::{ServerDescription, Topology},
1318
test::{
1419
log_uncaptured,
1520
CmapEvent,
@@ -269,3 +274,91 @@ async fn repl_set_name_mismatch() -> crate::error::Result<()> {
269274

270275
Ok(())
271276
}
277+
278+
/// Test verifying that a server's monitor stops after the server has been removed from the
279+
/// topology.
///
/// Outline: start a topology with three hosts, wait for a heartbeat-started event from each,
/// apply a crafted hello reply that lists only the first two hosts, wait for the third
/// host's `ServerClosed` event, then count that host's heartbeat-started events over one
/// more second.
280+
#[cfg_attr(feature = "tokio-runtime", tokio::test(flavor = "multi_thread"))]
281+
#[cfg_attr(feature = "async-std-runtime", async_std::test)]
282+
async fn removed_server_monitor_stops() -> crate::error::Result<()> {
283+
let _guard = LOCK.run_concurrently().await;
284+
285+
// The handler is registered as the SDAM event handler below and is also the source of the
// `subscriber` used to observe events.
let handler = Arc::new(EventHandler::new());
286+
let options = ClientOptions::builder()
287+
.hosts(vec![
288+
ServerAddress::parse("localhost:49152")?,
289+
ServerAddress::parse("localhost:49153")?,
290+
ServerAddress::parse("localhost:49154")?,
291+
])
292+
.heartbeat_freq(Duration::from_millis(50))
293+
.sdam_event_handler(handler.clone() as Arc<dyn SdamEventHandler>)
294+
.repl_set_name("foo".to_string())
295+
.build();
296+
297+
// Keep copies of the host list and set name; `options` is moved into `Topology::new`.
let hosts = options.hosts.clone();
298+
let set_name = options.repl_set_name.clone().unwrap();
299+
300+
let mut subscriber = handler.subscribe();
301+
let topology = Topology::new(options)?;
302+
303+
// Wait until all three monitors have started.
304+
let mut seen_monitors = HashSet::new();
305+
subscriber
306+
.wait_for_event(Duration::from_millis(500), |event| {
307+
if let Event::Sdam(SdamEvent::ServerHeartbeatStarted(e)) = event {
308+
seen_monitors.insert(e.server_address.clone());
309+
}
310+
seen_monitors.len() == hosts.len()
311+
})
312+
.await
313+
.expect("should see all three monitors start");
314+
315+
// Remove the third host from the topology.
316+
// The reply names hosts[0] as `me` and lists only hosts[0] and hosts[1] under `hosts`.
let hello = doc! {
317+
"ok": 1,
318+
"isWritablePrimary": true,
319+
"hosts": [
320+
hosts[0].clone().to_string(),
321+
hosts[1].clone().to_string(),
322+
],
323+
"me": hosts[0].clone().to_string(),
324+
"setName": set_name,
325+
"maxBsonObjectSize": 1234,
326+
"maxWriteBatchSize": 1234,
327+
"maxMessageSizeBytes": 1234,
328+
"minWireVersion": 0,
329+
"maxWireVersion": 13,
330+
};
331+
let hello_reply = RawCommandResponse::with_document_and_address(hosts[0].clone(), hello)
332+
.unwrap()
333+
.into_hello_reply()
334+
.unwrap();
335+
336+
// Submit a server description for hosts[0], built from the crafted reply, through the
// topology's updater.
topology
337+
.clone_updater()
338+
.update(ServerDescription::new_from_hello_reply(
339+
hosts[0].clone(),
340+
hello_reply,
341+
Duration::from_millis(10),
342+
))
343+
.await;
344+
345+
// Wait up to one second for a ServerClosed event whose address is hosts[2].
subscriber.wait_for_event(Duration::from_secs(1), |event| {
346+
matches!(event, Event::Sdam(SdamEvent::ServerClosed(e)) if e.address == hosts[2])
347+
}).await.expect("should see server closed event");
348+
349+
// Capture heartbeat events for 1 second. The monitor for the removed server should stop
350+
// publishing them.
351+
let events = subscriber.collect_events(Duration::from_secs(1), |event| {
352+
matches!(event, Event::Sdam(SdamEvent::ServerHeartbeatStarted(e)) if e.server_address == hosts[2])
353+
}).await;
354+
355+
// Use 3 to account for any heartbeats that happen to start between emitting the ServerClosed
356+
// event and actually publishing the state with the closed server.
357+
assert!(
358+
events.len() < 3,
359+
"expected monitor for removed server to stop performing checks, but saw {} heartbeats",
360+
events.len()
361+
);
362+
363+
Ok(())
364+
}

src/sdam/topology.rs

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1081,10 +1081,11 @@ pub(crate) struct TopologyCheckRequestReceiver {
10811081

10821082
impl TopologyCheckRequestReceiver {
10831083
/// Waits until the value held by `receiver` is nonzero, or until `changed()` reports an
/// error.
///
/// This span of the diff shows both versions of the loop: the lines whose gutter ends in
/// `-` are the removed form and the lines whose gutter ends in `+` are the added form.
pub(crate) async fn wait_for_check_request(&mut self) {
1084-
// Removed form: awaits `changed()` before the value is ever read, so a value that is
// already nonzero on entry is not noticed until a further change arrives.
while self.receiver.changed().await.is_ok() {
1085-
if *self.receiver.borrow() > 0 {
1086-
break;
1087-
}
1084+
// Added form: reads the current value first, so the loop body is skipped entirely when the
// value is already nonzero on entry.
while *self.receiver.borrow() == 0 {
1085+
// If all the requesters hung up, then just return early.
1086+
if self.receiver.changed().await.is_err() {
1087+
return;
1088+
};
10881089
}
10891090
}
10901091
}

0 commit comments

Comments
 (0)