Skip to content

Commit bf0275e

Browse files
committed
RUST-1100 Fix find_and_getmore_share_session test (#525)
This also fixes RUST-1113 and RUST-1114
1 parent f5bb0c2 commit bf0275e

File tree

9 files changed

+140
-26
lines changed

9 files changed

+140
-26
lines changed

src/client/mod.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -367,6 +367,11 @@ impl Client {
367367
}
368368
}
369369

370+
#[cfg(test)]
371+
pub(crate) async fn topology_description(&self) -> crate::sdam::TopologyDescription {
372+
self.inner.topology.description().await
373+
}
374+
370375
#[cfg(all(test, not(feature = "sync")))]
371376
pub(crate) async fn get_hosts(&self) -> Vec<String> {
372377
let servers = self.inner.topology.servers().await;

src/client/session/test/mod.rs

Lines changed: 41 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,17 @@
11
mod causal_consistency;
22

3-
use std::{future::Future, time::Duration};
3+
use std::{future::Future, sync::Arc, time::Duration};
44

55
use bson::Document;
66
use futures::stream::StreamExt;
77
use tokio::sync::RwLockReadGuard;
88

99
use crate::{
1010
bson::{doc, Bson},
11+
coll::options::{CountOptions, InsertManyOptions},
1112
error::Result,
12-
options::{Acknowledgment, FindOptions, InsertOneOptions, ReadPreference, WriteConcern},
13+
options::{Acknowledgment, FindOptions, ReadConcern, ReadPreference, WriteConcern},
14+
sdam::ServerInfo,
1315
selection_criteria::SelectionCriteria,
1416
test::{EventClient, TestClient, CLIENT_OPTIONS, LOCK},
1517
Collection,
@@ -449,22 +451,19 @@ async fn find_and_getmore_share_session() {
449451
let _guard: RwLockReadGuard<()> = LOCK.run_concurrently().await;
450452

451453
let client = EventClient::new().await;
452-
if !client.is_replica_set() {
454+
if client.is_standalone() {
455+
println!("skipping find_and_getmore_share_session due to unsupported topology: Standalone");
453456
return;
454457
}
455458

456459
let coll = client
457460
.init_db_and_coll(function_name!(), function_name!())
458461
.await;
459462

460-
let options = InsertOneOptions::builder()
463+
let options = InsertManyOptions::builder()
461464
.write_concern(WriteConcern::builder().w(Acknowledgment::Majority).build())
462465
.build();
463-
for _ in 0..3 {
464-
coll.insert_one(doc! {}, options.clone())
465-
.await
466-
.expect("insert should succeed");
467-
}
466+
coll.insert_many(vec![doc! {}; 3], options).await.unwrap();
468467

469468
let read_preferences: Vec<ReadPreference> = vec![
470469
ReadPreference::Primary,
@@ -489,7 +488,8 @@ async fn find_and_getmore_share_session() {
489488
) {
490489
let options = FindOptions::builder()
491490
.batch_size(2)
492-
.selection_criteria(SelectionCriteria::ReadPreference(read_preference))
491+
.selection_criteria(SelectionCriteria::ReadPreference(read_preference.clone()))
492+
.read_concern(ReadConcern::local())
493493
.build();
494494

495495
let mut cursor = coll
@@ -498,7 +498,21 @@ async fn find_and_getmore_share_session() {
498498
.expect("find should succeed");
499499

500500
for _ in 0..3 {
501-
assert!(matches!(cursor.next().await, Some(Ok(_))));
501+
cursor
502+
.next()
503+
.await
504+
.unwrap_or_else(|| {
505+
panic!(
506+
"should get result with read preference {:?}",
507+
read_preference
508+
)
509+
})
510+
.unwrap_or_else(|e| {
511+
panic!(
512+
"result should not be error with read preference {:?}, but got {:?}",
513+
read_preference, e
514+
)
515+
});
502516
}
503517

504518
let (find_started, _) = client.get_successful_command_execution("find");
@@ -516,6 +530,22 @@ async fn find_and_getmore_share_session() {
516530
assert_eq!(getmore_session_id, session_id);
517531
}
518532

533+
let topology_description = client.topology_description().await;
534+
for (addr, server) in topology_description.servers {
535+
if !server.server_type.is_data_bearing() {
536+
continue;
537+
}
538+
539+
let a = addr.clone();
540+
let rp = Arc::new(move |si: &ServerInfo| si.address() == &a);
541+
let options = CountOptions::builder()
542+
.selection_criteria(SelectionCriteria::Predicate(rp))
543+
.read_concern(ReadConcern::local())
544+
.build();
545+
546+
while coll.count_documents(None, options.clone()).await.unwrap() != 3 {}
547+
}
548+
519549
for read_pref in read_preferences {
520550
run_test(&client, &coll, read_pref).await;
521551
}

src/cmap/worker.rs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -300,8 +300,11 @@ impl ConnectionPoolWorker {
300300
} => {
301301
self.clear(cause, service_id);
302302
}
303-
PoolManagementRequest::MarkAsReady { .. } => {
303+
PoolManagementRequest::MarkAsReady {
304+
_completion_handler,
305+
} => {
304306
self.mark_as_ready();
307+
_completion_handler.acknowledge(());
305308
}
306309
PoolManagementRequest::HandleConnectionSucceeded(conn) => {
307310
self.handle_connection_succeeded(conn);

src/lib.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -354,7 +354,7 @@ pub(crate) use crate::{
354354
db::Database,
355355
};
356356

357-
pub use {coll::Namespace, index::IndexModel, client::session::ClusterTime};
357+
pub use {coll::Namespace, index::IndexModel, client::session::ClusterTime, sdam::public::*};
358358

359359
#[cfg(all(
360360
feature = "tokio-runtime",

src/sdam/description/server.rs

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,22 +14,41 @@ const DRIVER_MIN_DB_VERSION: &str = "3.6";
1414
const DRIVER_MIN_WIRE_VERSION: i32 = 6;
1515
const DRIVER_MAX_WIRE_VERSION: i32 = 14;
1616

17+
/// Enum representing the possible types of servers that the driver can connect to.
1718
#[derive(Debug, Deserialize, Clone, Copy, Eq, PartialEq)]
1819
#[non_exhaustive]
1920
pub enum ServerType {
21+
/// A single, non-replica set mongod.
2022
Standalone,
23+
24+
/// A router used in sharded deployments.
2125
Mongos,
26+
27+
/// The primary node in a replica set.
2228
#[serde(rename = "RSPrimary")]
2329
RsPrimary,
30+
31+
/// A secondary node in a replica set.
2432
#[serde(rename = "RSSecondary")]
2533
RsSecondary,
34+
35+
/// A non-data bearing node in a replica set which can participate in elections.
2636
#[serde(rename = "RSArbiter")]
2737
RsArbiter,
38+
39+
/// Hidden, starting up, or recovering nodes in a replica set.
2840
#[serde(rename = "RSOther")]
2941
RsOther,
42+
43+
/// A member of an uninitialized replica set or a member that has been removed from the replica
44+
/// set config.
3045
#[serde(rename = "RSGhost")]
3146
RsGhost,
47+
48+
/// A load-balancing proxy between the driver and the MongoDB deployment.
3249
LoadBalancer,
50+
51+
/// A server that the driver hasn't yet communicated with or can't connect to.
3352
#[serde(alias = "PossiblePrimary")]
3453
Unknown,
3554
}

src/sdam/description/topology/server_selection/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -119,7 +119,7 @@ impl TopologyDescription {
119119
SelectionCriteria::Predicate(ref filter) => self
120120
.servers
121121
.values()
122-
.filter(|s| filter(&ServerInfo::new_borrowed(s)))
122+
.filter(|s| s.server_type.is_data_bearing() && filter(&ServerInfo::new_borrowed(s)))
123123
.collect(),
124124
};
125125

src/sdam/description/topology/server_selection/test/in_window.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -65,8 +65,7 @@ async fn run_test(test_file: TestFile) {
6565

6666
let topology_description = test_file
6767
.topology_description
68-
.into_topology_description(None)
69-
.unwrap();
68+
.into_topology_description(None);
7069

7170
let read_pref = ReadPreference::Nearest {
7271
options: Default::default(),

src/sdam/description/topology/server_selection/test/logic.rs

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -66,13 +66,9 @@ async fn run_test(test_file: TestFile) {
6666
None => return,
6767
};
6868

69-
let topology = match test_file
69+
let topology = test_file
7070
.topology_description
71-
.into_topology_description(test_file.heartbeat_frequency_ms.map(Duration::from_millis))
72-
{
73-
Some(t) => t,
74-
None => return,
75-
};
71+
.into_topology_description(test_file.heartbeat_frequency_ms.map(Duration::from_millis));
7672

7773
if let Some(ref expected_suitable_servers) = test_file.suitable_servers {
7874
let mut actual_servers: Vec<_> = topology.suitable_servers(&read_pref).unwrap();

src/sdam/description/topology/server_selection/test/mod.rs

Lines changed: 66 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
use std::{collections::HashMap, time::Duration};
1+
use std::{collections::HashMap, sync::Arc, time::Duration};
22

33
use serde::Deserialize;
44

@@ -12,7 +12,7 @@ use crate::{
1212
ServerType,
1313
TopologyDescription,
1414
},
15-
selection_criteria::TagSet,
15+
selection_criteria::{SelectionCriteria, TagSet},
1616
};
1717

1818
mod in_window;
@@ -29,7 +29,7 @@ impl TestTopologyDescription {
2929
fn into_topology_description(
3030
self,
3131
heartbeat_frequency: Option<Duration>,
32-
) -> Option<TopologyDescription> {
32+
) -> TopologyDescription {
3333
let servers: HashMap<ServerAddress, ServerDescription> = self
3434
.servers
3535
.into_iter()
@@ -53,7 +53,6 @@ impl TestTopologyDescription {
5353
heartbeat_freq: heartbeat_frequency,
5454
servers,
5555
}
56-
.into()
5756
}
5857
}
5958

@@ -184,3 +183,66 @@ fn is_master_response_from_server_type(server_type: ServerType) -> Option<IsMast
184183

185184
Some(response)
186185
}
186+
187+
#[test]
188+
fn predicate_omits_unavailable() {
189+
let criteria = SelectionCriteria::Predicate(Arc::new(|si| {
190+
!matches!(si.server_type(), ServerType::RsPrimary)
191+
}));
192+
193+
let desc = TestTopologyDescription {
194+
topology_type: TopologyType::ReplicaSetWithPrimary,
195+
servers: vec![
196+
TestServerDescription {
197+
address: "localhost:27017".to_string(),
198+
avg_rtt_ms: Some(12.0),
199+
server_type: TestServerType::RsPrimary,
200+
tags: None,
201+
last_update_time: None,
202+
last_write: None,
203+
_max_wire_version: None,
204+
},
205+
TestServerDescription {
206+
address: "localhost:27018".to_string(),
207+
avg_rtt_ms: Some(12.0),
208+
server_type: TestServerType::Unknown,
209+
tags: None,
210+
last_update_time: None,
211+
last_write: None,
212+
_max_wire_version: None,
213+
},
214+
TestServerDescription {
215+
address: "localhost:27019".to_string(),
216+
avg_rtt_ms: Some(12.0),
217+
server_type: TestServerType::RsArbiter,
218+
tags: None,
219+
last_update_time: None,
220+
last_write: None,
221+
_max_wire_version: None,
222+
},
223+
TestServerDescription {
224+
address: "localhost:27020".to_string(),
225+
avg_rtt_ms: Some(12.0),
226+
server_type: TestServerType::RsGhost,
227+
tags: None,
228+
last_update_time: None,
229+
last_write: None,
230+
_max_wire_version: None,
231+
},
232+
TestServerDescription {
233+
address: "localhost:27021".to_string(),
234+
avg_rtt_ms: Some(12.0),
235+
server_type: TestServerType::RsOther,
236+
tags: None,
237+
last_update_time: None,
238+
last_write: None,
239+
_max_wire_version: None,
240+
},
241+
],
242+
}
243+
.into_topology_description(None);
244+
pretty_assertions::assert_eq!(
245+
desc.suitable_servers_in_latency_window(&criteria).unwrap(),
246+
Vec::<&ServerDescription>::new()
247+
);
248+
}

0 commit comments

Comments
 (0)