Skip to content

Commit 3a6cd7e

Browse files
authored
RUST-258 Replace connection in monitoring threads when errors oc… (#121)
1 parent a4af0b7 commit 3a6cd7e

File tree

3 files changed

+131
-80
lines changed

3 files changed

+131
-80
lines changed

src/cmap/conn/mod.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,14 @@ impl Connection {
8989
Ok(conn)
9090
}
9191

92+
pub(crate) fn new_monitoring(
93+
address: StreamAddress,
94+
connect_timeout: Option<Duration>,
95+
tls_options: Option<TlsOptions>,
96+
) -> Result<Self> {
97+
Self::new(0, address, 0, connect_timeout, tls_options, None)
98+
}
99+
92100
pub(crate) fn info(&self) -> ConnectionInfo {
93101
ConnectionInfo {
94102
id: self.id,

src/sdam/monitor.rs

Lines changed: 119 additions & 67 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,4 @@
1-
use std::{
2-
sync::{Arc, Weak},
3-
time::Duration,
4-
};
1+
use std::{sync::Weak, time::Duration};
52

63
use bson::{bson, doc};
74
use lazy_static::lazy_static;
@@ -15,6 +12,7 @@ use crate::{
1512
cmap::{Command, Connection},
1613
error::Result,
1714
is_master::IsMasterReply,
15+
options::{ClientOptions, StreamAddress},
1816
sdam::update_topology,
1917
};
2018

@@ -25,27 +23,50 @@ lazy_static! {
2523
pub(crate) static ref MIN_HEARTBEAT_FREQUENCY: time::Duration = time::Duration::milliseconds(500);
2624
}
2725

28-
/// Starts a monitoring thread associated with a given Server. A weak reference is used to ensure
29-
/// that the monitoring thread doesn't keep the server alive after it's been removed from the
30-
/// topology or the client has been dropped.
31-
pub(super) fn monitor_server(
32-
mut conn: Connection,
26+
pub(super) struct Monitor {
27+
address: StreamAddress,
28+
connection: Option<Connection>,
3329
server: Weak<Server>,
34-
heartbeat_frequency: Option<Duration>,
35-
) {
36-
std::thread::spawn(move || {
37-
let mut server_type = ServerType::Unknown;
38-
let heartbeat_frequency = heartbeat_frequency.unwrap_or(DEFAULT_HEARTBEAT_FREQUENCY);
30+
server_type: ServerType,
31+
options: ClientOptions,
32+
}
33+
34+
impl Monitor {
35+
/// Starts a monitoring thread associated with a given Server. A weak reference is used to
36+
/// ensure that the monitoring thread doesn't keep the server alive after it's been removed
37+
/// from the topology or the client has been dropped.
38+
pub(super) fn start(
39+
address: StreamAddress,
40+
server: Weak<Server>,
41+
options: ClientOptions,
42+
) -> Result<()> {
43+
let mut monitor = Self {
44+
address,
45+
connection: None,
46+
server,
47+
server_type: ServerType::Unknown,
48+
options,
49+
};
50+
51+
std::thread::spawn(move || {
52+
monitor.execute();
53+
});
54+
55+
Ok(())
56+
}
57+
58+
fn execute(&mut self) {
59+
let heartbeat_frequency = self
60+
.options
61+
.heartbeat_freq
62+
.unwrap_or(DEFAULT_HEARTBEAT_FREQUENCY);
3963

4064
loop {
41-
server_type = match monitor_server_check(&mut conn, server_type, &server) {
42-
Some(server_type) => server_type,
43-
None => return,
44-
};
65+
self.check_server_and_update_topology();
4566

4667
let last_check = PreciseTime::now();
4768

48-
let timed_out = match server.upgrade() {
69+
let timed_out = match self.server.upgrade() {
4970
Some(server) => server.wait_timeout(heartbeat_frequency),
5071
None => return,
5172
};
@@ -56,68 +77,100 @@ pub(super) fn monitor_server(
5677
if duration_since_last_check < *MIN_HEARTBEAT_FREQUENCY {
5778
let remaining_time = *MIN_HEARTBEAT_FREQUENCY - duration_since_last_check;
5879

59-
// Since MIN_HEARTBEAT_FREQUENCY is 500 and `duration_since_last_check` is less
60-
// than it but still positive, we can be sure that the time::Duration can be
61-
// successfully converted to a std::time::Duration. However, in the case of some
62-
// bug causing this not to be true, rather than panicking the monitoring thread,
63-
// we instead just don't sleep and proceed to checking the server a bit early.
80+
// Since MIN_HEARTBEAT_FREQUENCY is 500 and `duration_since_last_check` is
81+
// less than it but still positive, we can be sure
82+
// that the time::Duration can be successfully
83+
// converted to a std::time::Duration. However, in the case of some
84+
// bug causing this not to be true, rather than panicking the monitoring
85+
// thread, we instead just don't sleep and proceed
86+
// to checking the server a bit early.
6487
if let Ok(remaining_time) = remaining_time.to_std() {
6588
std::thread::sleep(remaining_time);
6689
}
6790
}
6891
}
6992
}
70-
});
71-
}
93+
}
7294

73-
fn monitor_server_check(
74-
conn: &mut Connection,
75-
mut server_type: ServerType,
76-
server: &Weak<Server>,
77-
) -> Option<ServerType> {
78-
// If the server has been dropped, terminate the monitoring thread.
79-
let server = match server.upgrade() {
80-
Some(server) => server,
81-
None => return None,
82-
};
83-
84-
// If the topology has been dropped, terminate the monitoring thread.
85-
let topology = match server.topology() {
86-
Some(topology) => topology,
87-
None => return None,
88-
};
89-
90-
// Send an isMaster to the server.
91-
let server_description = check_server(conn, server_type, &server);
92-
server_type = server_description.server_type;
93-
94-
update_topology(topology, server_description);
95-
96-
Some(server_type)
97-
}
95+
/// Checks the server by running an `isMaster` command. If an I/O error occurs, the
96+
/// connection will be replaced with a new one.
97+
fn check_server_and_update_topology(&mut self) {
98+
// If the server has been dropped, terminate the monitoring thread.
99+
let server = match self.server.upgrade() {
100+
Some(server) => server,
101+
None => return,
102+
};
103+
104+
// If the topology has been dropped, terminate the monitoring thread.
105+
let topology = match server.topology() {
106+
Some(topology) => topology,
107+
None => return,
108+
};
109+
110+
// Send an isMaster to the server.
111+
let server_description = self.check_server();
112+
self.server_type = server_description.server_type;
113+
114+
update_topology(topology, server_description);
115+
}
98116

99-
fn check_server(
100-
conn: &mut Connection,
101-
server_type: ServerType,
102-
server: &Arc<Server>,
103-
) -> ServerDescription {
104-
let address = conn.address().clone();
117+
fn check_server(&mut self) -> ServerDescription {
118+
let address = self.address.clone();
105119

106-
match is_master(conn) {
107-
Ok(reply) => return ServerDescription::new(address, Some(Ok(reply))),
108-
Err(e) => {
109-
server.clear_connection_pool();
120+
match self.perform_is_master() {
121+
Ok(reply) => ServerDescription::new(address, Some(Ok(reply))),
122+
Err(e) => {
123+
self.clear_connection_pool();
110124

111-
if server_type == ServerType::Unknown {
112-
return ServerDescription::new(address, Some(Err(e)));
125+
if self.server_type == ServerType::Unknown {
126+
return ServerDescription::new(address, Some(Err(e)));
127+
}
128+
129+
ServerDescription::new(address, Some(self.perform_is_master()))
113130
}
114131
}
115132
}
116133

117-
ServerDescription::new(address, Some(is_master(conn)))
134+
fn perform_is_master(&mut self) -> Result<IsMasterReply> {
135+
let connection = self.resolve_connection()?;
136+
let result = is_master(connection);
137+
138+
if result
139+
.as_ref()
140+
.err()
141+
.map(|e| e.kind.is_network_error())
142+
.unwrap_or(false)
143+
{
144+
self.connection.take();
145+
}
146+
147+
result
148+
}
149+
150+
fn resolve_connection(&mut self) -> Result<&mut Connection> {
151+
if let Some(ref mut connection) = self.connection {
152+
return Ok(connection);
153+
}
154+
155+
let connection = Connection::new_monitoring(
156+
self.address.clone(),
157+
self.options.connect_timeout,
158+
self.options.tls_options(),
159+
)?;
160+
161+
// Since the connection was not `Some` above, this will always insert the new connection and
162+
// return a reference to it.
163+
Ok(self.connection.get_or_insert(connection))
164+
}
165+
166+
fn clear_connection_pool(&self) {
167+
if let Some(server) = self.server.upgrade() {
168+
server.clear_connection_pool();
169+
}
170+
}
118171
}
119172

120-
fn is_master(conn: &mut Connection) -> Result<IsMasterReply> {
173+
fn is_master(connection: &mut Connection) -> Result<IsMasterReply> {
121174
let command = Command::new_read(
122175
"isMaster".into(),
123176
"admin".into(),
@@ -126,14 +179,13 @@ fn is_master(conn: &mut Connection) -> Result<IsMasterReply> {
126179
);
127180

128181
let start_time = PreciseTime::now();
129-
let command_response = conn.send_command(command, None)?;
182+
let command_response = connection.send_command(command, None)?;
130183
let end_time = PreciseTime::now();
131184

132185
let command_response = command_response.body()?;
133186

134187
Ok(IsMasterReply {
135188
command_response,
136-
// TODO RUST-193: Round-trip time
137189
round_trip_time: Some(start_time.to(end_time).to_std().unwrap()),
138190
})
139191
}

src/sdam/state/mod.rs

Lines changed: 4 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ use crate::{
1616
options::{ClientOptions, StreamAddress},
1717
sdam::{
1818
description::server::{ServerDescription, ServerType},
19-
monitor::monitor_server,
19+
monitor::Monitor,
2020
},
2121
selection_criteria::SelectionCriteria,
2222
};
@@ -133,18 +133,9 @@ impl Topology {
133133
address.clone(),
134134
&options,
135135
));
136-
self.servers.insert(address, server.clone());
137-
138-
let conn = Connection::new(
139-
0,
140-
server.address.clone(),
141-
0,
142-
options.connect_timeout,
143-
options.tls_options(),
144-
options.cmap_event_handler.clone(),
145-
)?;
146-
147-
monitor_server(conn, Arc::downgrade(&server), options.heartbeat_freq);
136+
self.servers.insert(address.clone(), server.clone());
137+
138+
Monitor::start(address, Arc::downgrade(&server), options)?;
148139

149140
Ok(())
150141
}

0 commit comments

Comments
 (0)