Skip to content

Commit 8b8c29e

Browse files
authored
RUST-980 Run load balancer tests on evergreen, and update existing tests (#477)
1 parent dd5d0a2 commit 8b8c29e

File tree

144 files changed

+801
-420
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

144 files changed

+801
-420
lines changed

.evergreen/config.yml

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -365,6 +365,8 @@ functions:
365365
${PREPARE_SHELL}
366366
export MONGODB_URI="${MONGODB_URI}"
367367
export SSL="${SSL}"
368+
export SINGLE_MONGOS_LB_URI="${SINGLE_MONGOS_LB_URI}"
369+
export MULTI_MONGOS_LB_URI="${MULTI_MONGOS_LB_URI}"
368370
. .evergreen/generate-uri.sh
369371
370372
SINGLE_THREAD=${SINGLE_THREAD} \
@@ -603,6 +605,24 @@ functions:
603605
${PREPARE_SHELL}
604606
echo '{"results": [{ "status": "FAIL", "test_file": "Build", "log_raw": "No test-results.json found was created" } ]}' > ${PROJECT_DIRECTORY}/test-results.json
605607
608+
"start load balancer":
609+
- command: shell.exec
610+
params:
611+
script: |
612+
${PREPARE_SHELL}
613+
export MONGODB_URI="${MONGODB_URI}"
614+
${DRIVERS_TOOLS}/.evergreen/run-load-balancer.sh start
615+
- command: expansions.update
616+
params:
617+
file: lb-expansion.yml
618+
619+
"stop load balancer":
620+
- command: shell.exec
621+
params:
622+
script: |
623+
${PREPARE_SHELL}
624+
${DRIVERS_TOOLS}/.evergreen/run-load-balancer.sh stop
625+
606626
pre:
607627
- func: "fetch source"
608628
- func: "prepare resources"
@@ -613,6 +633,7 @@ pre:
613633
- func: "install dependencies"
614634

615635
post:
636+
- func: "stop load balancer"
616637
- func: "stop mongo orchestration"
617638
- func: "upload-mo-artifacts"
618639
- func: "cleanup"
@@ -815,6 +836,17 @@ tasks:
815836
MONGODB_VERSION: "latest"
816837
TOPOLOGY: "sharded_cluster"
817838
- func: "run tests"
839+
840+
- name: "test-latest-load_balancer"
841+
tags: ["latest", "load_balancer"]
842+
commands:
843+
- func: "bootstrap mongo-orchestration"
844+
vars:
845+
MONGODB_VERSION: "latest"
846+
TOPOLOGY: "sharded_cluster"
847+
- func: "start load balancer"
848+
- func: "run tests"
849+
818850

819851
- name: "test-latest-aws-auth"
820852
# "latest" explicitly left off to keep this out of the generic matrix
@@ -1369,6 +1401,13 @@ buildvariants:
13691401
async-runtime: "*"
13701402
then:
13711403
remove_tasks: [".3.6", ".4.0"]
1404+
# haproxy isn't installed on windows / ubuntu-arm
1405+
- if:
1406+
os: ["ubuntu-18.04-arm64", "windows-64-vs2017"]
1407+
auth-and-tls: "*"
1408+
async-runtime: "*"
1409+
then:
1410+
remove_tasks: ".load_balancer"
13721411
- matrix_name: "x509-auth"
13731412
matrix_spec:
13741413
os:

.evergreen/generate-uri.sh

Lines changed: 19 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -10,13 +10,25 @@ DRIVERS_TOOLS_X509=`echo "$DRIVERS_TOOLS_X509" | sed 's/\//%2F/g'`
1010
CA_FILE="${DRIVERS_TOOLS_X509}%2Fca.pem"
1111
CERT_FILE="${DRIVERS_TOOLS_X509}%2Fclient.pem"
1212

13-
if [[ "$MONGODB_URI" == *"?"* ]]; then
14-
export MONGODB_URI="${MONGODB_URI}&"
15-
else
16-
export MONGODB_URI="${MONGODB_URI}/?"
17-
fi
18-
13+
update_uri() {
14+
local ORIG_URI=$1
15+
if [[ "$ORIG_URI" == "" ]]; then
16+
return
17+
fi
18+
# The rustls library requires a domain name.
19+
ORIG_URI=$(echo "$ORIG_URI" | sed s/127.0.0.1/localhost/)
20+
if [[ "$ORIG_URI" == *"?"* ]]; then
21+
ORIG_URI="${ORIG_URI}&"
22+
else
23+
ORIG_URI="${ORIG_URI}/?"
24+
fi
25+
echo "${ORIG_URI}tls=true&tlsCAFile=${CA_FILE}&tlsCertificateKeyFile=${CERT_FILE}&tlsAllowInvalidCertificates=true"
26+
}
1927

20-
export MONGODB_URI="${MONGODB_URI}tls=true&tlsCAFile=${CA_FILE}&tlsCertificateKeyFile=${CERT_FILE}&tlsAllowInvalidCertificates=true"
28+
export MONGODB_URI="$(update_uri ${MONGODB_URI})"
29+
export SINGLE_MONGOS_LB_URI="$(update_uri ${SINGLE_MONGOS_LB_URI})"
30+
export MULTI_MONGOS_LB_URI="$(update_uri ${MULTI_MONGOS_LB_URI})"
2131

2232
echo "MONGODB_URI: ${MONGODB_URI}"
33+
echo "SINGLE_MONGOS_LB_URI: ${SINGLE_MONGOS_LB_URI}"
34+
echo "MULTI_MONGOS_LB_URI: ${MULTI_MONGOS_LB_URI}"

src/client/options/mod.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -569,6 +569,9 @@ pub(crate) struct TestOptions {
569569

570570
/// Mock response for `SrvPollingMonitor::lookup_hosts`.
571571
pub(crate) mock_lookup_hosts: Option<Result<LookupHosts>>,
572+
573+
/// Mock the `serviceId` response for a load-balanced hello.
574+
pub(crate) mock_service_id: bool,
572575
}
573576

574577
fn default_hosts() -> Vec<ServerAddress> {

src/cmap/conn/mod.rs

Lines changed: 16 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -313,6 +313,7 @@ impl Connection {
313313
let (tx, rx) = mpsc::channel(1);
314314
self.pinned_sender = Some(tx);
315315
Ok(PinnedConnectionHandle {
316+
id: self.id,
316317
receiver: Arc::new(Mutex::new(rx)),
317318
})
318319
}
@@ -369,7 +370,8 @@ impl Drop for Connection {
369370
if let Some(pool_manager) = self.pool_manager.take() {
370371
let mut dropped_connection = self.take();
371372
let result = if let Some(sender) = self.pinned_sender.as_mut() {
372-
// Preserve the timestamp for pinned connections.
373+
// Preserve the pool manager and timestamp for pinned connections.
374+
dropped_connection.pool_manager = Some(pool_manager.clone());
373375
dropped_connection.ready_and_available_time = self.ready_and_available_time;
374376
match sender.try_send(dropped_connection) {
375377
Ok(()) => Ok(()),
@@ -385,7 +387,11 @@ impl Drop for Connection {
385387
Err(mpsc::error::TrySendError::Full(mut conn)) => {
386388
// Panic in debug mode
387389
if cfg!(debug_assertions) {
388-
panic!("buffer full when attempting to return a pinned connection")
390+
panic!(
391+
"buffer full when attempting to return a pinned connection (id = \
392+
{})",
393+
conn.id
394+
);
389395
}
390396
// TODO RUST-230 log an error in non-debug mode.
391397
conn.pinned_sender = None;
@@ -409,6 +415,7 @@ impl Drop for Connection {
409415
/// normal pool via this handle.
410416
#[derive(Debug)]
411417
pub(crate) struct PinnedConnectionHandle {
418+
id: u32,
412419
receiver: Arc<Mutex<mpsc::Receiver<Connection>>>,
413420
}
414421

@@ -418,6 +425,7 @@ impl PinnedConnectionHandle {
418425
/// normal borrow.
419426
pub(crate) fn replicate(&self) -> Self {
420427
Self {
428+
id: self.id,
421429
receiver: self.receiver.clone(),
422430
}
423431
}
@@ -426,10 +434,12 @@ impl PinnedConnectionHandle {
426434
/// connection has been unpinned.
427435
pub(crate) async fn take_connection(&self) -> Result<Connection> {
428436
let mut receiver = self.receiver.lock().await;
429-
receiver
430-
.recv()
431-
.await
432-
.ok_or_else(|| Error::internal("cannot take connection after unpin"))
437+
receiver.recv().await.ok_or_else(|| {
438+
Error::internal(format!(
439+
"cannot take connection after unpin (id={})",
440+
self.id
441+
))
442+
})
433443
}
434444

435445
/// Return the pinned connection to the normal connection pool.

src/cmap/establish/handshake/mod.rs

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -143,6 +143,8 @@ pub(crate) struct Handshaker {
143143
/// given the same pool options, so it can be created at the time the Handshaker is created.
144144
command: Command,
145145
credential: Option<Credential>,
146+
#[cfg(test)]
147+
mock_service_id: bool,
146148
}
147149

148150
impl Handshaker {
@@ -154,6 +156,9 @@ impl Handshaker {
154156
let mut command =
155157
is_master_command(options.as_ref().and_then(|opts| opts.server_api.as_ref()));
156158

159+
#[cfg(test)]
160+
let mut mock_service_id = false;
161+
157162
if let Some(options) = options {
158163
if let Some(app_name) = options.app_name {
159164
metadata.application = Some(AppMetadata { name: app_name });
@@ -185,13 +190,20 @@ impl Handshaker {
185190
if options.load_balanced {
186191
command.body.insert("loadBalanced", true);
187192
}
193+
194+
#[cfg(test)]
195+
{
196+
mock_service_id = options.mock_service_id;
197+
}
188198
}
189199

190200
command.body.insert("client", metadata);
191201

192202
Self {
193203
command,
194204
credential,
205+
#[cfg(test)]
206+
mock_service_id,
195207
}
196208
}
197209

@@ -207,6 +219,24 @@ impl Handshaker {
207219
let client_first = set_speculative_auth_info(&mut command.body, self.credential.as_ref())?;
208220

209221
let mut is_master_reply = run_is_master(conn, command, topology, handler).await?;
222+
// TODO PM-2369 Remove serviceId mocking when it's returned by the server.
223+
#[cfg(test)]
224+
{
225+
if self.command.body.contains_key("loadBalanced")
226+
&& is_master_reply.command_response.service_id.is_none()
227+
&& self.mock_service_id
228+
{
229+
is_master_reply.command_response.service_id = Some(
230+
is_master_reply
231+
.command_response
232+
.topology_version
233+
.as_ref()
234+
.unwrap()
235+
.get_object_id("processId")
236+
.unwrap(),
237+
);
238+
}
239+
}
210240
if self.command.body.contains_key("loadBalanced")
211241
&& is_master_reply.command_response.service_id.is_none()
212242
{
@@ -256,6 +286,8 @@ pub(crate) struct HandshakerOptions {
256286
driver_info: Option<DriverInfo>,
257287
server_api: Option<ServerApi>,
258288
load_balanced: bool,
289+
#[cfg(test)]
290+
mock_service_id: bool,
259291
}
260292

261293
impl From<ConnectionPoolOptions> for HandshakerOptions {
@@ -266,6 +298,8 @@ impl From<ConnectionPoolOptions> for HandshakerOptions {
266298
driver_info: options.driver_info,
267299
server_api: options.server_api,
268300
load_balanced: options.load_balanced.unwrap_or(false),
301+
#[cfg(test)]
302+
mock_service_id: options.mock_service_id,
269303
}
270304
}
271305
}
@@ -278,6 +312,8 @@ impl From<ClientOptions> for HandshakerOptions {
278312
driver_info: options.driver_info,
279313
server_api: options.server_api,
280314
load_balanced: options.load_balanced.unwrap_or(false),
315+
#[cfg(test)]
316+
mock_service_id: options.test_options.map_or(false, |to| to.mock_service_id),
281317
}
282318
}
283319
}

src/cmap/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -99,7 +99,7 @@ impl ConnectionPool {
9999
let (manager, _) = manager::channel();
100100
let handle = PoolWorkerHandle::new_mocked();
101101
let (connection_requester, _) = connection_requester::channel(Default::default(), handle);
102-
let (_, generation_subscriber) = status::channel();
102+
let (_, generation_subscriber) = status::channel(PoolGeneration::normal());
103103

104104
Self {
105105
address,

src/cmap/options.rs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,11 @@ pub(crate) struct ConnectionPoolOptions {
8989

9090
/// Whether or not the client is connecting to a MongoDB cluster through a load balancer.
9191
pub(crate) load_balanced: Option<bool>,
92+
93+
/// Whether or not to mock the `serviceId` field of a hello through a load balancer.
94+
#[cfg(test)]
95+
#[serde(skip)]
96+
pub(crate) mock_service_id: bool,
9297
}
9398

9499
impl ConnectionPoolOptions {
@@ -109,6 +114,11 @@ impl ConnectionPoolOptions {
109114
#[cfg(test)]
110115
ready: None,
111116
load_balanced: options.load_balanced,
117+
#[cfg(test)]
118+
mock_service_id: options
119+
.test_options
120+
.as_ref()
121+
.map_or(false, |to| to.mock_service_id),
112122
}
113123
}
114124

src/cmap/status.rs

Lines changed: 2 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -7,17 +7,9 @@ struct PoolStatus {
77
generation: PoolGeneration,
88
}
99

10-
impl Default for PoolStatus {
11-
fn default() -> Self {
12-
PoolStatus {
13-
generation: PoolGeneration::normal(),
14-
}
15-
}
16-
}
17-
1810
/// Create a channel for publishing and receiving updates to the pool's generation.
19-
pub(super) fn channel() -> (PoolGenerationPublisher, PoolGenerationSubscriber) {
20-
let (sender, receiver) = tokio::sync::watch::channel(Default::default());
11+
pub(super) fn channel(init: PoolGeneration) -> (PoolGenerationPublisher, PoolGenerationSubscriber) {
12+
let (sender, receiver) = tokio::sync::watch::channel(PoolStatus { generation: init });
2113
(
2214
PoolGenerationPublisher { sender },
2315
PoolGenerationSubscriber { receiver },

src/cmap/test/integration.rs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,10 @@ async fn concurrent_connections() {
7878
let _guard = LOCK.run_exclusively().await;
7979

8080
let mut options = CLIENT_OPTIONS.clone();
81+
if options.load_balanced.unwrap_or(false) {
82+
println!("skipping concurrent_connections test due to load-balanced topology");
83+
return;
84+
}
8185
options.direct_connection = Some(true);
8286
options.hosts.drain(1..);
8387

@@ -163,6 +167,12 @@ async fn connection_error_during_establishment() {
163167
let _guard: RwLockWriteGuard<_> = LOCK.run_exclusively().await;
164168

165169
let mut client_options = CLIENT_OPTIONS.clone();
170+
if client_options.load_balanced.unwrap_or(false) {
171+
println!(
172+
"skipping connection_error_during_establishment test due to load-balanced topology"
173+
);
174+
return;
175+
}
166176
client_options.heartbeat_freq = Duration::from_secs(300).into(); // high so that monitors dont trip failpoint
167177
client_options.hosts.drain(1..);
168178
client_options.direct_connection = Some(true);

src/cmap/test/mod.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -425,6 +425,9 @@ async fn cmap_spec_tests() {
425425
let _guard: RwLockWriteGuard<()> = LOCK.run_exclusively().await;
426426

427427
let mut options = CLIENT_OPTIONS.clone();
428+
if options.load_balanced.unwrap_or(false) {
429+
return;
430+
}
428431
options.hosts.drain(1..);
429432
options.direct_connection = Some(true);
430433
let client = EventClient::with_options(options).await;

0 commit comments

Comments
 (0)