Skip to content

Commit 9c5eede

Browse files
authored
RUST-978 Unified test format additions in support of load balanced tests (#465)
1 parent d7569ec commit 9c5eede

File tree

83 files changed

+2807
-360
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

83 files changed

+2807
-360
lines changed

src/client/mod.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -360,4 +360,9 @@ impl Client {
360360
.map(|stream_address| format!("{}", stream_address))
361361
.collect()
362362
}
363+
364+
#[cfg(test)]
365+
pub(crate) async fn sync_workers(&self) {
366+
self.inner.topology.sync_workers().await;
367+
}
363368
}

src/cmap/manager.rs

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,6 @@
11
use tokio::sync::mpsc;
2+
#[cfg(test)]
3+
use tokio::sync::oneshot;
24

35
use super::Connection;
46
use crate::{bson::oid::ObjectId, error::Error, runtime::AcknowledgedMessage};
@@ -72,6 +74,14 @@ impl PoolManager {
7274
.sender
7375
.send(PoolManagementRequest::HandleConnectionSucceeded(conn));
7476
}
77+
78+
/// Create a synchronization point for the pool's worker.
79+
#[cfg(test)]
80+
pub(super) fn sync_worker(&self) -> oneshot::Receiver<()> {
81+
let (tx, rx) = oneshot::channel();
82+
let _ = self.sender.send(PoolManagementRequest::Sync(tx));
83+
rx
84+
}
7585
}
7686

7787
#[derive(Debug)]
@@ -108,6 +118,10 @@ pub(super) enum PoolManagementRequest {
108118
/// Update the pool after a successful connection, optionally populating the pool
109119
/// with the successful connection.
110120
HandleConnectionSucceeded(ConnectionSucceeded),
121+
122+
/// Synchronize the worker queue state with an external caller, i.e. a test.
123+
#[cfg(test)]
124+
Sync(oneshot::Sender<()>),
111125
}
112126

113127
impl PoolManagementRequest {

src/cmap/mod.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,8 @@ mod worker;
1212
use std::sync::Arc;
1313

1414
use derivative::Derivative;
15+
#[cfg(test)]
16+
use tokio::sync::oneshot;
1517

1618
pub use self::conn::ConnectionInfo;
1719
pub(crate) use self::{
@@ -172,4 +174,9 @@ impl ConnectionPool {
172174
pub(crate) fn generation(&self) -> PoolGeneration {
173175
self.generation_subscriber.generation()
174176
}
177+
178+
#[cfg(test)]
179+
pub(crate) fn sync_worker(&self) -> oneshot::Receiver<()> {
180+
self.manager.sync_worker()
181+
}
175182
}

src/cmap/worker.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -313,6 +313,10 @@ impl ConnectionPoolWorker {
313313
PoolTask::HandleManagementRequest(
314314
PoolManagementRequest::HandleConnectionFailed,
315315
) => self.handle_connection_failed(),
316+
#[cfg(test)]
317+
PoolTask::HandleManagementRequest(PoolManagementRequest::Sync(tx)) => {
318+
let _ = tx.send(());
319+
}
316320
PoolTask::Maintenance => {
317321
self.perform_maintenance();
318322
}

src/cursor/common.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,8 @@ use std::{
88
use derivative::Derivative;
99
use futures_core::{Future, Stream};
1010
use serde::de::DeserializeOwned;
11+
#[cfg(test)]
12+
use tokio::sync::oneshot;
1113

1214
use crate::{
1315
bson::Document,
@@ -303,13 +305,18 @@ pub(super) fn kill_cursor(
303305
ns: &Namespace,
304306
cursor_id: i64,
305307
pinned_conn: PinnedConnection,
308+
#[cfg(test)] kill_watcher: Option<oneshot::Sender<()>>,
306309
) {
307310
let coll = client
308311
.database(ns.db.as_str())
309312
.collection::<Document>(ns.coll.as_str());
310313
RUNTIME.execute(async move {
311314
if !pinned_conn.is_invalid() {
312315
let _ = coll.kill_cursor(cursor_id, pinned_conn.handle()).await;
316+
#[cfg(test)]
317+
if let Some(tx) = kill_watcher {
318+
let _ = tx.send(());
319+
}
313320
}
314321
pinned_conn.unpin().await;
315322
});

src/cursor/mod.rs

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,8 @@ use std::{
88

99
use futures_core::{future::BoxFuture, Stream};
1010
use serde::de::DeserializeOwned;
11+
#[cfg(test)]
12+
use tokio::sync::oneshot;
1113

1214
use crate::{
1315
cmap::conn::PinnedConnectionHandle,
@@ -84,6 +86,8 @@ where
8486
{
8587
client: Client,
8688
wrapped_cursor: ImplicitSessionCursor<T>,
89+
#[cfg(test)]
90+
kill_watcher: Option<oneshot::Sender<()>>,
8791
_phantom: std::marker::PhantomData<T>,
8892
}
8993

@@ -107,9 +111,24 @@ where
107111
PinnedConnection::new(pin),
108112
provider,
109113
),
114+
#[cfg(test)]
115+
kill_watcher: None,
110116
_phantom: Default::default(),
111117
}
112118
}
119+
120+
/// Some tests need to be able to observe the events generated by `killCommand` execution;
121+
/// however, because that happens asynchronously on `drop`, the test runner can conclude before
122+
/// the event is published. To fix that, tests can set a "kill watcher" on cursors - a
123+
/// one-shot channel with a `()` value pushed after `killCommand` is run that the test can wait
124+
/// on.
125+
#[cfg(test)]
126+
pub(crate) fn set_kill_watcher(&mut self, tx: oneshot::Sender<()>) {
127+
if self.kill_watcher.is_some() {
128+
panic!("cursor already has a kill_watcher");
129+
}
130+
self.kill_watcher = Some(tx);
131+
}
113132
}
114133

115134
impl<T> Stream for Cursor<T>
@@ -137,6 +156,8 @@ where
137156
self.wrapped_cursor.namespace(),
138157
self.wrapped_cursor.id(),
139158
self.wrapped_cursor.pinned_connection().replicate(),
159+
#[cfg(test)]
160+
self.kill_watcher.take(),
140161
);
141162
}
142163
}

src/cursor/session.rs

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,8 @@ use std::{
77
use futures_core::{future::BoxFuture, Stream};
88
use futures_util::StreamExt;
99
use serde::de::DeserializeOwned;
10+
#[cfg(test)]
11+
use tokio::sync::oneshot;
1012

1113
use super::common::{
1214
kill_cursor,
@@ -64,6 +66,8 @@ where
6466
info: CursorInformation,
6567
buffer: VecDeque<T>,
6668
pinned_connection: PinnedConnection,
69+
#[cfg(test)]
70+
kill_watcher: Option<oneshot::Sender<()>>,
6771
}
6872

6973
impl<T> SessionCursor<T>
@@ -83,6 +87,8 @@ where
8387
info: spec.info,
8488
buffer: spec.initial_buffer,
8589
pinned_connection: PinnedConnection::new(pinned),
90+
#[cfg(test)]
91+
kill_watcher: None,
8692
}
8793
}
8894

@@ -177,6 +183,19 @@ where
177183
pub async fn next(&mut self, session: &mut ClientSession) -> Option<Result<T>> {
178184
self.stream(session).next().await
179185
}
186+
187+
/// Some tests need to be able to observe the events generated by `killCommand` execution;
188+
/// however, because that happens asynchronously on `drop`, the test runner can conclude before
189+
/// the event is published. To fix that, tests can set a "kill watcher" on cursors - a
190+
/// one-shot channel with a `()` value pushed after `killCommand` is run that the test can wait
191+
/// on.
192+
#[cfg(test)]
193+
pub(crate) fn set_kill_watcher(&mut self, tx: oneshot::Sender<()>) {
194+
if self.kill_watcher.is_some() {
195+
panic!("cursor already has a kill_watcher");
196+
}
197+
self.kill_watcher = Some(tx);
198+
}
180199
}
181200

182201
impl<T> Drop for SessionCursor<T>
@@ -193,6 +212,8 @@ where
193212
&self.info.ns,
194213
self.info.id,
195214
self.pinned_connection.replicate(),
215+
#[cfg(test)]
216+
self.kill_watcher.take(),
196217
);
197218
}
198219
}

src/operation/count/mod.rs

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -47,8 +47,10 @@ impl Operation for Count {
4747
const NAME: &'static str = "count";
4848

4949
fn build(&mut self, description: &StreamDescription) -> Result<Command> {
50+
let mut name = Self::NAME.to_string();
5051
let mut body = match description.max_wire_version {
5152
Some(v) if v >= SERVER_4_9_0_WIRE_VERSION => {
53+
name = "aggregate".to_string();
5254
doc! {
5355
"aggregate": self.ns.coll.clone(),
5456
"pipeline": [
@@ -74,11 +76,7 @@ impl Operation for Count {
7476

7577
append_options(&mut body, self.options.as_ref())?;
7678

77-
Ok(Command::new(
78-
Self::NAME.to_string(),
79-
self.ns.db.clone(),
80-
body,
81-
))
79+
Ok(Command::new(name, self.ns.db.clone(), body))
8280
}
8381

8482
fn handle_response(

src/sdam/state/mod.rs

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,8 @@ use std::{
99
},
1010
};
1111

12+
#[cfg(test)]
13+
use futures_util::stream::{FuturesUnordered, StreamExt};
1214
use tokio::sync::{RwLock, RwLockWriteGuard};
1315

1416
use self::server::Server;
@@ -532,6 +534,11 @@ impl Topology {
532534
.map(|(addr, server)| (addr.clone(), Arc::downgrade(server)))
533535
.collect()
534536
}
537+
538+
#[cfg(test)]
539+
pub(crate) async fn sync_workers(&self) {
540+
self.state.read().await.sync_workers().await;
541+
}
535542
}
536543

537544
impl WeakTopology {
@@ -680,6 +687,16 @@ impl TopologyState {
680687

681688
self.servers.retain(|host, _| hosts.contains(host));
682689
}
690+
691+
#[cfg(test)]
692+
async fn sync_workers(&self) {
693+
let rxen: FuturesUnordered<_> = self
694+
.servers
695+
.values()
696+
.map(|v| v.pool.sync_worker())
697+
.collect();
698+
let _: Vec<_> = rxen.collect().await;
699+
}
683700
}
684701

685702
/// Enum describing a point in time during an operation's execution relative to when the MongoDB

src/test/mod.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,10 @@ lazy_static! {
6666
};
6767
pub(crate) static ref SERVERLESS: bool =
6868
matches!(std::env::var("SERVERLESS"), Ok(s) if s == "serverless");
69+
pub(crate) static ref LOAD_BALANCED_SINGLE_URI: Option<String> =
70+
std::env::var("MONGODB_LOAD_BALANCED_SINGLE_URI").ok();
71+
pub(crate) static ref LOAD_BALANCED_MULTIPLE_URI: Option<String> =
72+
std::env::var("MONGODB_LOAD_BALANCED_MULTIPLE_URI").ok();
6973
}
7074

7175
fn get_default_uri() -> String {

0 commit comments

Comments
 (0)