Skip to content

Commit 321cc34

Browse files
authored
RUST-1373 Update unified test format runner to support SDAM integration tests (#712)
This also completes RUST-1432
1 parent 1d52ed7 commit 321cc34

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

49 files changed

+1896
-874
lines changed

Cargo.toml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ authors = [
77
"Kaitlin Mahar <[email protected]>",
88
]
99
description = "The official MongoDB driver for Rust"
10-
edition = "2018"
10+
edition = "2021"
1111
keywords = ["mongo", "mongodb", "database", "bson", "nosql"]
1212
categories = ["asynchronous", "database", "web-programming"]
1313
repository = "https://github.com/mongodb/mongo-rust-driver"
@@ -163,6 +163,7 @@ function_name = "0.2.1"
163163
futures = "0.3"
164164
home = "0.5"
165165
pretty_assertions = "1.1.0"
166+
serde = { version = "*", features = ["rc"] }
166167
serde_json = "1.0.64"
167168
semver = "1.0.0"
168169
time = "0.3.9"

src/bson_util/mod.rs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -225,6 +225,16 @@ pub(crate) fn serialize_error_as_string<S: Serializer>(
225225
serializer.serialize_str(&val.to_string())
226226
}
227227

228+
/// Serializes a Result, serializing the error value as a string if present.
229+
pub(crate) fn serialize_result_error_as_string<S: Serializer, T: Serialize>(
230+
val: &Result<T>,
231+
serializer: S,
232+
) -> std::result::Result<S::Ok, S::Error> {
233+
val.as_ref()
234+
.map_err(|e| e.to_string())
235+
.serialize(serializer)
236+
}
237+
228238
#[cfg(test)]
229239
mod test {
230240
use crate::bson_util::num_decimal_digits;

src/client/mod.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -475,6 +475,11 @@ impl Client {
475475
})
476476
.ok()
477477
}
478+
479+
#[cfg(test)]
480+
pub(crate) fn topology(&self) -> &Topology {
481+
&self.inner.topology
482+
}
478483
}
479484

480485
#[cfg(feature = "csfle")]

src/cmap/test/file.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ pub struct TestFile {
2121
#[serde(default)]
2222
pub ignore: Vec<String>,
2323
pub fail_point: Option<Document>,
24-
pub run_on: Option<Vec<RunOn>>,
24+
pub(crate) run_on: Option<Vec<RunOn>>,
2525
}
2626

2727
#[derive(Debug, Deserialize)]

src/sdam/description/server.rs

Lines changed: 24 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,9 @@ use serde::{Deserialize, Serialize};
44

55
use crate::{
66
bson::{oid::ObjectId, DateTime},
7+
bson_util,
78
client::ClusterTime,
9+
error::{ErrorKind, Result},
810
hello::HelloReply,
911
options::ServerAddress,
1012
selection_criteria::TagSet,
@@ -106,7 +108,8 @@ pub(crate) struct ServerDescription {
106108
// allows us to ensure that only valid states are possible (e.g. preventing that both an error
107109
// and a reply are present) while still making it easy to define helper methods on
108110
// ServerDescription for information we need from the hello reply by propagating with `?`.
109-
pub(crate) reply: Result<Option<HelloReply>, String>,
111+
#[serde(serialize_with = "bson_util::serialize_result_error_as_string")]
112+
pub(crate) reply: Result<Option<HelloReply>>,
110113
}
111114

112115
impl PartialEq for ServerDescription {
@@ -122,17 +125,22 @@ impl PartialEq for ServerDescription {
122125

123126
self_response == other_response
124127
}
125-
(Err(self_err), Err(other_err)) => self_err == other_err,
128+
(Err(self_err), Err(other_err)) => {
129+
match (self_err.kind.as_ref(), other_err.kind.as_ref()) {
130+
(
131+
ErrorKind::Command(self_command_err),
132+
ErrorKind::Command(other_command_err),
133+
) => self_command_err.code == other_command_err.code,
134+
_ => self_err.to_string() == other_err.to_string(),
135+
}
136+
}
126137
_ => false,
127138
}
128139
}
129140
}
130141

131142
impl ServerDescription {
132-
pub(crate) fn new(
133-
mut address: ServerAddress,
134-
hello_reply: Option<Result<HelloReply, String>>,
135-
) -> Self {
143+
pub(crate) fn new(mut address: ServerAddress, hello_reply: Option<Result<HelloReply>>) -> Self {
136144
address = ServerAddress::Tcp {
137145
host: address.host().to_lowercase(),
138146
port: address.port(),
@@ -231,7 +239,7 @@ impl ServerDescription {
231239
None
232240
}
233241

234-
pub(crate) fn set_name(&self) -> Result<Option<String>, String> {
242+
pub(crate) fn set_name(&self) -> Result<Option<String>> {
235243
let set_name = self
236244
.reply
237245
.as_ref()
@@ -241,7 +249,7 @@ impl ServerDescription {
241249
Ok(set_name)
242250
}
243251

244-
pub(crate) fn known_hosts(&self) -> Result<impl Iterator<Item = &String>, String> {
252+
pub(crate) fn known_hosts(&self) -> Result<impl Iterator<Item = &String>> {
245253
let known_hosts = self
246254
.reply
247255
.as_ref()
@@ -262,7 +270,7 @@ impl ServerDescription {
262270
Ok(known_hosts.into_iter().flatten())
263271
}
264272

265-
pub(crate) fn invalid_me(&self) -> Result<bool, String> {
273+
pub(crate) fn invalid_me(&self) -> Result<bool> {
266274
if let Some(ref reply) = self.reply.as_ref().map_err(Clone::clone)? {
267275
if let Some(ref me) = reply.command_response.me {
268276
return Ok(&self.address.to_string() != me);
@@ -272,7 +280,7 @@ impl ServerDescription {
272280
Ok(false)
273281
}
274282

275-
pub(crate) fn set_version(&self) -> Result<Option<i32>, String> {
283+
pub(crate) fn set_version(&self) -> Result<Option<i32>> {
276284
let me = self
277285
.reply
278286
.as_ref()
@@ -282,7 +290,7 @@ impl ServerDescription {
282290
Ok(me)
283291
}
284292

285-
pub(crate) fn election_id(&self) -> Result<Option<ObjectId>, String> {
293+
pub(crate) fn election_id(&self) -> Result<Option<ObjectId>> {
286294
let me = self
287295
.reply
288296
.as_ref()
@@ -293,7 +301,7 @@ impl ServerDescription {
293301
}
294302

295303
#[cfg(test)]
296-
pub(crate) fn min_wire_version(&self) -> Result<Option<i32>, String> {
304+
pub(crate) fn min_wire_version(&self) -> Result<Option<i32>> {
297305
let me = self
298306
.reply
299307
.as_ref()
@@ -303,7 +311,7 @@ impl ServerDescription {
303311
Ok(me)
304312
}
305313

306-
pub(crate) fn max_wire_version(&self) -> Result<Option<i32>, String> {
314+
pub(crate) fn max_wire_version(&self) -> Result<Option<i32>> {
307315
let me = self
308316
.reply
309317
.as_ref()
@@ -313,7 +321,7 @@ impl ServerDescription {
313321
Ok(me)
314322
}
315323

316-
pub(crate) fn last_write_date(&self) -> Result<Option<DateTime>, String> {
324+
pub(crate) fn last_write_date(&self) -> Result<Option<DateTime>> {
317325
match self.reply {
318326
Ok(None) => Ok(None),
319327
Ok(Some(ref reply)) => Ok(reply
@@ -325,7 +333,7 @@ impl ServerDescription {
325333
}
326334
}
327335

328-
pub(crate) fn logical_session_timeout(&self) -> Result<Option<Duration>, String> {
336+
pub(crate) fn logical_session_timeout(&self) -> Result<Option<Duration>> {
329337
match self.reply {
330338
Ok(None) => Ok(None),
331339
Ok(Some(ref reply)) => Ok(reply
@@ -336,7 +344,7 @@ impl ServerDescription {
336344
}
337345
}
338346

339-
pub(crate) fn cluster_time(&self) -> Result<Option<ClusterTime>, String> {
347+
pub(crate) fn cluster_time(&self) -> Result<Option<ClusterTime>> {
340348
match self.reply {
341349
Ok(None) => Ok(None),
342350
Ok(Some(ref reply)) => Ok(reply.cluster_time.clone()),

src/sdam/description/topology/mod.rs

Lines changed: 16 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ use crate::{
1313
bson::oid::ObjectId,
1414
client::ClusterTime,
1515
cmap::Command,
16+
error::{Error, Result},
1617
options::{ClientOptions, ServerAddress},
1718
sdam::{
1819
description::server::{ServerDescription, ServerType},
@@ -460,10 +461,7 @@ impl TopologyDescription {
460461

461462
/// Update the topology based on the new information about the topology contained by the
462463
/// ServerDescription.
463-
pub(crate) fn update(
464-
&mut self,
465-
mut server_description: ServerDescription,
466-
) -> Result<(), String> {
464+
pub(crate) fn update(&mut self, mut server_description: ServerDescription) -> Result<()> {
467465
// Ignore updates from servers not currently in the cluster.
468466
if !self.servers.contains_key(&server_description.address) {
469467
return Ok(());
@@ -516,10 +514,7 @@ impl TopologyDescription {
516514
}
517515

518516
/// Update the Unknown topology description based on the server description.
519-
fn update_unknown_topology(
520-
&mut self,
521-
server_description: ServerDescription,
522-
) -> Result<(), String> {
517+
fn update_unknown_topology(&mut self, server_description: ServerDescription) -> Result<()> {
523518
match server_description.server_type {
524519
ServerType::Unknown | ServerType::RsGhost => {}
525520
ServerType::Standalone => {
@@ -535,7 +530,7 @@ impl TopologyDescription {
535530
self.update_rs_without_primary_server(server_description)?;
536531
}
537532
ServerType::LoadBalancer => {
538-
return Err("cannot transition to a load balancer".to_string())
533+
return Err(Error::internal("cannot transition to a load balancer"))
539534
}
540535
}
541536

@@ -556,7 +551,7 @@ impl TopologyDescription {
556551
fn update_replica_set_no_primary_topology(
557552
&mut self,
558553
server_description: ServerDescription,
559-
) -> Result<(), String> {
554+
) -> Result<()> {
560555
match server_description.server_type {
561556
ServerType::Unknown | ServerType::RsGhost => {}
562557
ServerType::Standalone | ServerType::Mongos => {
@@ -570,7 +565,7 @@ impl TopologyDescription {
570565
self.update_rs_without_primary_server(server_description)?;
571566
}
572567
ServerType::LoadBalancer => {
573-
return Err("cannot transition to a load balancer".to_string())
568+
return Err(Error::internal("cannot transition to a load balancer"))
574569
}
575570
}
576571

@@ -581,7 +576,7 @@ impl TopologyDescription {
581576
fn update_replica_set_with_primary_topology(
582577
&mut self,
583578
server_description: ServerDescription,
584-
) -> Result<(), String> {
579+
) -> Result<()> {
585580
match server_description.server_type {
586581
ServerType::Unknown | ServerType::RsGhost => {
587582
self.record_primary_state();
@@ -595,7 +590,7 @@ impl TopologyDescription {
595590
self.update_rs_with_primary_from_member(server_description)?;
596591
}
597592
ServerType::LoadBalancer => {
598-
return Err("cannot transition to a load balancer".to_string())
593+
return Err(Error::internal("cannot transition to a load balancer"));
599594
}
600595
}
601596

@@ -616,7 +611,7 @@ impl TopologyDescription {
616611
fn update_rs_without_primary_server(
617612
&mut self,
618613
server_description: ServerDescription,
619-
) -> Result<(), String> {
614+
) -> Result<()> {
620615
if self.set_name.is_none() {
621616
self.set_name = server_description.set_name()?;
622617
} else if self.set_name != server_description.set_name()? {
@@ -639,7 +634,7 @@ impl TopologyDescription {
639634
fn update_rs_with_primary_from_member(
640635
&mut self,
641636
server_description: ServerDescription,
642-
) -> Result<(), String> {
637+
) -> Result<()> {
643638
if self.set_name != server_description.set_name()? {
644639
self.servers.remove(&server_description.address);
645640
self.record_primary_state();
@@ -661,7 +656,7 @@ impl TopologyDescription {
661656
fn update_rs_from_primary_server(
662657
&mut self,
663658
server_description: ServerDescription,
664-
) -> Result<(), String> {
659+
) -> Result<()> {
665660
if self.set_name.is_none() {
666661
self.set_name = server_description.set_name()?;
667662
} else if self.set_name != server_description.set_name()? {
@@ -750,13 +745,8 @@ impl TopologyDescription {
750745
}
751746

752747
/// Create a new ServerDescription for each address and add it to the topology.
753-
fn add_new_servers<'a>(
754-
&mut self,
755-
servers: impl Iterator<Item = &'a String>,
756-
) -> Result<(), String> {
757-
let servers: Result<Vec<_>, String> = servers
758-
.map(|server| ServerAddress::parse(server).map_err(|e| e.to_string()))
759-
.collect();
748+
fn add_new_servers<'a>(&mut self, servers: impl Iterator<Item = &'a String>) -> Result<()> {
749+
let servers: Result<Vec<_>> = servers.map(ServerAddress::parse).collect();
760750

761751
self.add_new_servers_from_addresses(servers?.iter());
762752
Ok(())
@@ -856,16 +846,13 @@ pub(crate) struct TopologyDescriptionDiff<'a> {
856846
}
857847

858848
fn verify_max_staleness(max_staleness: Option<Duration>) -> crate::error::Result<()> {
859-
verify_max_staleness_inner(max_staleness)
860-
.map_err(|s| crate::error::ErrorKind::InvalidArgument { message: s }.into())
861-
}
862-
863-
fn verify_max_staleness_inner(max_staleness: Option<Duration>) -> std::result::Result<(), String> {
864849
if max_staleness
865850
.map(|staleness| staleness > Duration::from_secs(0) && staleness < Duration::from_secs(90))
866851
.unwrap_or(false)
867852
{
868-
return Err("max staleness cannot be both positive and below 90 seconds".into());
853+
return Err(Error::invalid_argument(
854+
"max staleness cannot be both positive and below 90 seconds",
855+
));
869856
}
870857

871858
Ok(())

src/sdam/description/topology/server_selection/mod.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -185,6 +185,11 @@ impl TopologyDescription {
185185
.filter(move |server| types.contains(&server.server_type))
186186
}
187187

188+
#[cfg(test)]
189+
pub(crate) fn primary(&self) -> Option<&ServerDescription> {
190+
self.servers_with_type(&[ServerType::RsPrimary]).next()
191+
}
192+
188193
fn suitable_servers_in_replica_set<'a>(
189194
&self,
190195
read_preference: &'a ReadPreference,

src/sdam/public.rs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ use serde::Serialize;
55
pub use crate::sdam::description::{server::ServerType, topology::TopologyType};
66
use crate::{
77
bson::DateTime,
8+
error::Error,
89
hello::HelloCommandResponse,
910
options::ServerAddress,
1011
sdam::ServerDescription,
@@ -100,6 +101,15 @@ impl<'a> ServerInfo<'a> {
100101
pub fn tags(&self) -> Option<&TagSet> {
101102
self.command_response_getter(|r| r.tags.as_ref())
102103
}
104+
105+
/// Gets the error that caused the server's state to be transitioned to Unknown, if any.
106+
///
107+
/// When the driver encounters certain errors during operation execution or server monitoring,
108+
/// it transitions the affected server's state to Unknown, rendering the server unusable for
109+
/// future operations until it is confirmed to be in healthy state again.
110+
pub fn error(&self) -> Option<&Error> {
111+
self.description.reply.as_ref().err()
112+
}
103113
}
104114

105115
impl<'a> fmt::Debug for ServerInfo<'a> {

0 commit comments

Comments
 (0)