Skip to content

Commit fbdc830

Browse files
authored
RUST-268 Improve server selection timeout error messages (#116)
1 parent 6131491 commit fbdc830

File tree

7 files changed

+142
-5
lines changed

7 files changed

+142
-5
lines changed

src/client/mod.rs

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -213,7 +213,13 @@ impl Client {
213213
}
214214

215215
Err(ErrorKind::ServerSelectionError {
216-
message: "timed out while trying to select server".into(),
216+
message: self
217+
.inner
218+
.topology
219+
.read()
220+
.unwrap()
221+
.description
222+
.server_selection_timeout_error_message(&criteria),
217223
}
218224
.into())
219225
}

src/error.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -149,7 +149,7 @@ pub enum ErrorKind {
149149
ResponseError { message: String },
150150

151151
/// The Client was not able to select a server for the operation.
152-
#[error(display = ": {}", message)]
152+
#[error(display = "{}", message)]
153153
ServerSelectionError { message: String },
154154

155155
/// An error occurred during SRV record lookup.

src/sdam/description/server.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -155,6 +155,14 @@ impl ServerDescription {
155155
description
156156
}
157157

158+
/// Whether this server is "available" as per the definition in the server selection spec.
159+
pub(crate) fn is_available(&self) -> bool {
160+
match self.server_type {
161+
ServerType::Unknown => false,
162+
_ => true,
163+
}
164+
}
165+
158166
pub(crate) fn compatibility_error_message(&self) -> Option<String> {
159167
if let Ok(Some(ref reply)) = self.reply {
160168
let is_master_min_wire_version = reply.command_response.min_wire_version.unwrap_or(0);

src/sdam/description/topology/server_selection/mod.rs

Lines changed: 33 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
#[cfg(test)]
22
mod test;
33

4-
use std::time::Duration;
4+
use std::{fmt, time::Duration};
55

66
use rand::seq::IteratorRandom;
77

@@ -21,6 +21,24 @@ use crate::{
2121
const DEFAULT_LOCAL_THRESHOLD: Duration = Duration::from_millis(15);
2222

2323
impl TopologyDescription {
24+
pub(crate) fn server_selection_timeout_error_message(
25+
&self,
26+
criteria: &SelectionCriteria,
27+
) -> String {
28+
if self.has_available_servers() {
29+
format!(
30+
"Server selection timeout: None of the available servers suitable for criteria \
31+
{:?}. Topology: {}",
32+
criteria, self
33+
)
34+
} else {
35+
format!(
36+
"Server selection timeout: No available servers. Topology: {}",
37+
self
38+
)
39+
}
40+
}
41+
2442
pub(crate) fn select_server<'a>(
2543
&'a self,
2644
criteria: &'a SelectionCriteria,
@@ -56,6 +74,10 @@ impl TopologyDescription {
5674
Ok(suitable_servers.into_iter().choose(&mut rand::thread_rng()))
5775
}
5876

77+
fn has_available_servers(&self) -> bool {
78+
self.servers.values().any(|server| server.is_available())
79+
}
80+
5981
fn suitable_servers<'a>(
6082
&'a self,
6183
read_preference: &'a ReadPreference,
@@ -290,6 +312,16 @@ impl TopologyDescription {
290312
}
291313
}
292314

315+
impl fmt::Display for TopologyDescription {
316+
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
317+
write!(f, "{{ Type: {:?}, Servers: [ ", self.topology_type)?;
318+
for server_info in self.servers.values().map(ServerInfo::new) {
319+
write!(f, "{}, ", server_info)?;
320+
}
321+
write!(f, "] }}")
322+
}
323+
}
324+
293325
fn filter_servers_by_tag_sets(servers: &mut Vec<&ServerDescription>, tag_sets: &[TagSet]) {
294326
if tag_sets.is_empty() {
295327
return;

src/sdam/public.rs

Lines changed: 53 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
use std::time::Duration;
1+
use std::{fmt, time::Duration};
22

33
use bson::UtcDateTime;
44

@@ -82,3 +82,55 @@ impl<'a> ServerInfo<'a> {
8282
self.command_response_getter(|r| r.tags.as_ref())
8383
}
8484
}
85+
86+
impl<'a> fmt::Display for ServerInfo<'a> {
87+
fn fmt(&self, f: &mut fmt::Formatter) -> std::result::Result<(), fmt::Error> {
88+
write!(
89+
f,
90+
"{{ Address: {}, Type: {:?}",
91+
self.address(),
92+
self.server_type()
93+
)?;
94+
95+
match self.description.reply {
96+
Ok(_) => {
97+
if let Some(avg_rtt) = self.average_round_trip_time() {
98+
write!(f, ", Average RTT: {:?}", avg_rtt)?;
99+
}
100+
101+
if let Some(last_update_time) = self.last_update_time() {
102+
write!(f, ", Last Update Time: {:?}", last_update_time)?;
103+
}
104+
105+
if let Some(max_wire_version) = self.max_wire_version() {
106+
write!(f, ", Max Wire Version: {}", max_wire_version)?;
107+
}
108+
109+
if let Some(min_wire_version) = self.min_wire_version() {
110+
write!(f, ", Min Wire Version: {}", min_wire_version)?;
111+
}
112+
113+
if let Some(rs_name) = self.replica_set_name() {
114+
write!(f, ", Replica Set Name: {}", rs_name)?;
115+
}
116+
117+
if let Some(rs_version) = self.replica_set_version() {
118+
write!(f, ", Replica Set Version: {}", rs_version)?;
119+
}
120+
121+
if let Some(tags) = self.tags() {
122+
write!(f, ", Tags: {:?}", tags)?;
123+
}
124+
125+
if let Some(compatibility_error) = self.description.compatibility_error_message() {
126+
write!(f, ", Compatiblity Error: {}", compatibility_error)?;
127+
}
128+
}
129+
Err(ref e) => {
130+
write!(f, ", Error: {}", e)?;
131+
}
132+
}
133+
134+
write!(f, " }}")
135+
}
136+
}

src/test/client.rs

Lines changed: 36 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
use std::borrow::Cow;
1+
use std::{borrow::Cow, collections::HashMap, time::Duration};
22

33
use bson::{bson, doc, Bson};
44
use serde::Deserialize;
@@ -9,6 +9,7 @@ use crate::{
99
auth::{AuthMechanism, Credential},
1010
ClientOptions,
1111
},
12+
selection_criteria::{ReadPreference, SelectionCriteria},
1213
test::{CLIENT, LOCK},
1314
Client,
1415
};
@@ -57,6 +58,40 @@ fn metadata_sent_in_handshake() {
5758
assert_eq!(metadata.client.driver.name, "mrd");
5859
}
5960

61+
#[test]
62+
fn server_selection_timeout_message() {
63+
let _guard = LOCK.run_concurrently();
64+
65+
if !CLIENT.is_replica_set() {
66+
return;
67+
}
68+
69+
let mut tag_set = HashMap::new();
70+
tag_set.insert("asdfasdf".to_string(), "asdfadsf".to_string());
71+
72+
let unsatisfiable_read_preference = ReadPreference::Secondary {
73+
tag_sets: Some(vec![tag_set]),
74+
max_staleness: None,
75+
};
76+
77+
let mut options = CLIENT.options.clone();
78+
options.server_selection_timeout = Some(Duration::from_millis(500));
79+
80+
let client = Client::with_options(options).unwrap();
81+
let db = client.database("test");
82+
let error = db
83+
.run_command(
84+
doc! { "isMaster": 1 },
85+
SelectionCriteria::ReadPreference(unsatisfiable_read_preference),
86+
)
87+
.expect_err("should fail with server selection timeout error");
88+
89+
let error_description = format!("{}", error);
90+
for host in CLIENT.options.hosts.iter() {
91+
assert!(error_description.contains(format!("{}", host).as_str()));
92+
}
93+
}
94+
6095
#[test]
6196
#[function_name::named]
6297
fn list_databases() {

src/test/util/mod.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -112,6 +112,10 @@ impl TestClient {
112112
self.options.credential.is_some()
113113
}
114114

115+
pub fn is_replica_set(&self) -> bool {
116+
self.options.repl_set_name.is_some()
117+
}
118+
115119
#[allow(dead_code)]
116120
pub fn server_version_eq(&self, major: u64, minor: u64) -> bool {
117121
self.server_version.major == major && self.server_version.minor == minor

0 commit comments

Comments
 (0)