Skip to content

Commit 84e467b

Browse files
authored
RUST-254 Implement minHeartbeatFrequencyMS (#102)
1 parent bb5ff52 commit 84e467b

File tree

4 files changed

+36
-6
lines changed

4 files changed

+36
-6
lines changed

src/client/options/mod.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ use crate::{
3232
concern::{Acknowledgment, ReadConcern, WriteConcern},
3333
error::{ErrorKind, Result},
3434
event::{cmap::CmapEventHandler, command::CommandEventHandler},
35+
sdam::MIN_HEARTBEAT_FREQUENCY,
3536
selection_criteria::{ReadPreference, SelectionCriteria, TagSet},
3637
srv::SrvResolver,
3738
};
@@ -1092,7 +1093,7 @@ impl ClientOptionsParser {
10921093
k @ "heartbeatfrequencyms" => {
10931094
let duration = get_duration!(value, k);
10941095

1095-
if duration < 500 {
1096+
if duration < MIN_HEARTBEAT_FREQUENCY.num_milliseconds() as u64 {
10961097
return Err(ErrorKind::ArgumentError {
10971098
message: format!(
10981099
"'heartbeatFrequencyMS' must be at least 500, but {} was given",

src/sdam/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,5 +7,6 @@ pub use self::public::{ServerInfo, ServerType};
77

88
pub(crate) use self::{
99
description::{server::ServerDescription, topology::TopologyDescription},
10+
monitor::MIN_HEARTBEAT_FREQUENCY,
1011
state::{server::Server, update_topology, Topology, TopologyUpdateCondvar},
1112
};

src/sdam/monitor.rs

Lines changed: 26 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ use std::{
44
};
55

66
use bson::{bson, doc};
7+
use lazy_static::lazy_static;
78
use time::PreciseTime;
89

910
use super::{
@@ -19,6 +20,11 @@ use crate::{
1920

2021
const DEFAULT_HEARTBEAT_FREQUENCY: Duration = Duration::from_secs(10);
2122

23+
lazy_static! {
24+
// Unfortunately, the `time` crate has not yet updated to make the `Duration` constructors `const`, so we have to use lazy_static.
25+
pub(crate) static ref MIN_HEARTBEAT_FREQUENCY: time::Duration = time::Duration::milliseconds(500);
26+
}
27+
2228
/// Starts a monitoring thread associated with a given Server. A weak reference is used to ensure
2329
/// that the monitoring thread doesn't keep the server alive after it's been removed from the
2430
/// topology or the client has been dropped.
@@ -37,9 +43,28 @@ pub(super) fn monitor_server(
3743
None => return,
3844
};
3945

40-
match server.upgrade() {
46+
let last_check = PreciseTime::now();
47+
48+
let timed_out = match server.upgrade() {
4149
Some(server) => server.wait_timeout(heartbeat_frequency),
4250
None => return,
51+
};
52+
53+
if !timed_out {
54+
let duration_since_last_check = last_check.to(PreciseTime::now());
55+
56+
if duration_since_last_check < *MIN_HEARTBEAT_FREQUENCY {
57+
let remaining_time = *MIN_HEARTBEAT_FREQUENCY - duration_since_last_check;
58+
59+
// Since MIN_HEARTBEAT_FREQUENCY is 500 and `duration_since_last_check` is less
60+
// than it but still positive, we can be sure that the time::Duration can be
61+
// successfully converted to a std::time::Duration. However, in the case of some
62+
// bug causing this not to be true, rather than panicking the monitoring thread,
63+
// we instead just don't sleep and proceed to checking the server a bit early.
64+
if let Ok(remaining_time) = remaining_time.to_std() {
65+
std::thread::sleep(remaining_time);
66+
}
67+
}
4368
}
4469
}
4570
});

src/sdam/state/server.rs

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -61,11 +61,14 @@ impl Server {
6161
self.topology.upgrade()
6262
}
6363

64-
pub(crate) fn wait_timeout(&self, duration: Duration) {
65-
let _ = self
66-
.condvar
64+
/// Waits until either the server is requested to do a topology check or until `duration` has
65+
/// elapsed. Returns `true` if `duration` has elapsed and `false` otherwise.
66+
pub(crate) fn wait_timeout(&self, duration: Duration) -> bool {
67+
self.condvar
6768
.wait_timeout(self.condvar_mutex.lock().unwrap(), duration)
68-
.unwrap();
69+
.unwrap()
70+
.1
71+
.timed_out()
6972
}
7073

7174
pub(crate) fn request_topology_check(&self) {

0 commit comments

Comments
 (0)