Skip to content

Commit f4a3806

Browse files
authored
RUST-828 Reduce flakiness of cluster_time_in_commands test (#740)
1 parent 45a1c3f commit f4a3806

File tree

3 files changed

+186
-35
lines changed

3 files changed

+186
-35
lines changed

src/client/session/test/mod.rs

Lines changed: 99 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,17 @@ use crate::{
1414
runtime,
1515
sdam::ServerInfo,
1616
selection_criteria::SelectionCriteria,
17-
test::{log_uncaptured, EventClient, TestClient, CLIENT_OPTIONS, LOCK},
17+
test::{
18+
log_uncaptured,
19+
Event,
20+
EventClient,
21+
EventHandler,
22+
SdamEvent,
23+
TestClient,
24+
CLIENT_OPTIONS,
25+
LOCK,
26+
},
27+
Client,
1828
Collection,
1929
};
2030

@@ -238,19 +248,22 @@ async fn pool_is_lifo() {
238248
async fn cluster_time_in_commands() {
239249
let _guard: RwLockReadGuard<()> = LOCK.run_concurrently().await;
240250

241-
let client = TestClient::new().await;
242-
if client.is_standalone() {
251+
let test_client = TestClient::new().await;
252+
if test_client.is_standalone() {
253+
log_uncaptured("skipping cluster_time_in_commands test due to standalone topology");
243254
return;
244255
}
245256

246-
async fn cluster_time_test<F, G, R>(command_name: &str, operation: F)
247-
where
248-
F: Fn(EventClient) -> G,
257+
async fn cluster_time_test<F, G, R>(
258+
command_name: &str,
259+
client: &Client,
260+
event_handler: &EventHandler,
261+
operation: F,
262+
) where
263+
F: Fn(Client) -> G,
249264
G: Future<Output = Result<R>>,
250265
{
251-
let mut options = CLIENT_OPTIONS.get().await.clone();
252-
options.heartbeat_freq = Some(Duration::from_secs(1000));
253-
let client = EventClient::with_options(options).await;
266+
let mut subscriber = event_handler.subscribe();
254267

255268
operation(client.clone())
256269
.await
@@ -260,16 +273,31 @@ async fn cluster_time_in_commands() {
260273
.await
261274
.expect("operation should succeed");
262275

263-
let (first_command_started, first_command_succeeded) =
264-
client.get_successful_command_execution(command_name);
276+
let (first_command_started, first_command_succeeded) = subscriber
277+
.wait_for_successful_command_execution(Duration::from_secs(5), command_name)
278+
.await
279+
.unwrap_or_else(|| {
280+
panic!(
281+
"did not see command started and succeeded events for {}",
282+
command_name
283+
)
284+
});
265285

266286
assert!(first_command_started.command.get("$clusterTime").is_some());
267287
let response_cluster_time = first_command_succeeded
268288
.reply
269289
.get("$clusterTime")
270290
.expect("should get cluster time from command response");
271291

272-
let (second_command_started, _) = client.get_successful_command_execution(command_name);
292+
let (second_command_started, _) = subscriber
293+
.wait_for_successful_command_execution(Duration::from_secs(5), command_name)
294+
.await
295+
.unwrap_or_else(|| {
296+
panic!(
297+
"did not see command started and succeeded events for {}",
298+
command_name
299+
)
300+
});
273301

274302
assert_eq!(
275303
response_cluster_time,
@@ -282,24 +310,73 @@ async fn cluster_time_in_commands() {
282310
);
283311
}
284312

285-
cluster_time_test("ping", |client| async move {
313+
let handler = Arc::new(EventHandler::new());
314+
let mut options = CLIENT_OPTIONS.get().await.clone();
315+
options.heartbeat_freq = Some(Duration::from_secs(1000));
316+
options.command_event_handler = Some(handler.clone());
317+
options.sdam_event_handler = Some(handler.clone());
318+
319+
// Ensure we only connect to one server so the monitor checks from other servers
320+
// don't affect the TopologyDescription's clusterTime value between commands.
321+
if options.load_balanced != Some(true) {
322+
options.direct_connection = Some(true);
323+
324+
// Since we need to run an insert below, ensure the single host is a primary
325+
// if we're connected to a replica set.
326+
if let Some(primary) = test_client.primary() {
327+
options.hosts = vec![primary];
328+
} else {
329+
options.hosts.drain(1..);
330+
}
331+
}
332+
333+
let mut subscriber = handler.subscribe();
334+
335+
let client = Client::with_options(options).unwrap();
336+
337+
// Wait for initial monitor check to complete and discover the server.
338+
subscriber
339+
.wait_for_event(Duration::from_secs(5), |event| match event {
340+
Event::Sdam(SdamEvent::ServerDescriptionChanged(e)) => {
341+
!e.previous_description.server_type().is_available()
342+
&& e.new_description.server_type().is_available()
343+
}
344+
_ => false,
345+
})
346+
.await
347+
.expect("server should be discovered");
348+
349+
// LoadBalanced topologies don't have monitors, so the client needs to get a clusterTime from
350+
// a command invocation.
351+
client
352+
.database("admin")
353+
.run_command(doc! { "ping": 1 }, None)
354+
.await
355+
.unwrap();
356+
357+
cluster_time_test("ping", &client, handler.as_ref(), |client| async move {
286358
client
287359
.database(function_name!())
288360
.run_command(doc! { "ping": 1 }, None)
289361
.await
290362
})
291363
.await;
292364

293-
cluster_time_test("aggregate", |client| async move {
294-
client
295-
.database(function_name!())
296-
.collection::<Document>(function_name!())
297-
.aggregate(vec![doc! { "$match": { "x": 1 } }], None)
298-
.await
299-
})
365+
cluster_time_test(
366+
"aggregate",
367+
&client,
368+
handler.as_ref(),
369+
|client| async move {
370+
client
371+
.database(function_name!())
372+
.collection::<Document>(function_name!())
373+
.aggregate(vec![doc! { "$match": { "x": 1 } }], None)
374+
.await
375+
},
376+
)
300377
.await;
301378

302-
cluster_time_test("find", |client| async move {
379+
cluster_time_test("find", &client, handler.as_ref(), |client| async move {
303380
client
304381
.database(function_name!())
305382
.collection::<Document>(function_name!())
@@ -308,7 +385,7 @@ async fn cluster_time_in_commands() {
308385
})
309386
.await;
310387

311-
cluster_time_test("insert", |client| async move {
388+
cluster_time_test("insert", &client, handler.as_ref(), |client| async move {
312389
client
313390
.database(function_name!())
314391
.collection::<Document>(function_name!())

src/test/util/event.rs

Lines changed: 79 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -481,17 +481,49 @@ impl EventSubscriber<'_> {
481481
) -> Option<Event>
482482
where
483483
F: FnMut(&Event) -> bool,
484+
{
485+
self.filter_map_event(timeout, |e| if filter(&e) { Some(e) } else { None })
486+
.await
487+
}
488+
489+
pub(crate) async fn collect_events<F>(&mut self, timeout: Duration, mut filter: F) -> Vec<Event>
490+
where
491+
F: FnMut(&Event) -> bool,
492+
{
493+
let mut events = Vec::new();
494+
let _ = runtime::timeout(timeout, async {
495+
while let Some(event) = self.wait_for_event(timeout, &mut filter).await {
496+
events.push(event);
497+
}
498+
})
499+
.await;
500+
events
501+
}
502+
503+
/// Consume and pass events to the provided closure until it returns Some or the timeout is hit.
504+
pub(crate) async fn filter_map_event<F, T>(
505+
&mut self,
506+
timeout: Duration,
507+
mut filter_map: F,
508+
) -> Option<T>
509+
where
510+
F: FnMut(Event) -> Option<T>,
484511
{
485512
runtime::timeout(timeout, async {
486513
loop {
487514
match self.receiver.recv().await {
488-
Ok(event) if filter(&event) => return event.into(),
515+
Ok(event) => {
516+
if let Some(e) = filter_map(event) {
517+
return Some(e);
518+
} else {
519+
continue;
520+
}
521+
}
489522
// the channel hit capacity and missed some events.
490523
Err(RecvError::Lagged(amount_skipped)) => {
491524
panic!("receiver lagged and skipped {} events", amount_skipped)
492525
}
493526
Err(_) => return None,
494-
_ => continue,
495527
}
496528
}
497529
})
@@ -500,18 +532,52 @@ impl EventSubscriber<'_> {
500532
.flatten()
501533
}
502534

503-
pub(crate) async fn collect_events<F>(&mut self, timeout: Duration, mut filter: F) -> Vec<Event>
504-
where
505-
F: FnMut(&Event) -> bool,
506-
{
507-
let mut events = Vec::new();
508-
let _ = runtime::timeout(timeout, async {
509-
while let Some(event) = self.wait_for_event(timeout, &mut filter).await {
510-
events.push(event);
511-
}
535+
/// Waits for the next CommandStartedEvent/CommandFailedEvent pair.
536+
/// If the next CommandStartedEvent is associated with a CommandFailedEvent, this method will
537+
/// panic.
538+
pub(crate) async fn wait_for_successful_command_execution(
539+
&mut self,
540+
timeout: Duration,
541+
command_name: impl AsRef<str>,
542+
) -> Option<(CommandStartedEvent, CommandSucceededEvent)> {
543+
runtime::timeout(timeout, async {
544+
let started = self
545+
.filter_map_event(Duration::MAX, |event| match event {
546+
Event::Command(CommandEvent::Started(s))
547+
if s.command_name == command_name.as_ref() =>
548+
{
549+
Some(s)
550+
}
551+
_ => None,
552+
})
553+
.await
554+
.unwrap();
555+
556+
let succeeded = self
557+
.filter_map_event(Duration::MAX, |event| match event {
558+
Event::Command(CommandEvent::Succeeded(s))
559+
if s.request_id == started.request_id =>
560+
{
561+
Some(s)
562+
}
563+
Event::Command(CommandEvent::Failed(f))
564+
if f.request_id == started.request_id =>
565+
{
566+
panic!(
567+
"expected {} to succeed but it failed: {:#?}",
568+
command_name.as_ref(),
569+
f
570+
)
571+
}
572+
_ => None,
573+
})
574+
.await
575+
.unwrap();
576+
577+
(started, succeeded)
512578
})
513-
.await;
514-
events
579+
.await
580+
.ok()
515581
}
516582
}
517583

src/test/util/mod.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ use std::{fmt::Debug, sync::Arc, time::Duration};
1414

1515
use crate::{
1616
bson::{doc, Bson},
17+
client::options::ServerAddress,
1718
hello::{hello_command, HelloCommandResponse},
1819
selection_criteria::SelectionCriteria,
1920
};
@@ -386,6 +387,13 @@ impl TestClient {
386387
.to_string()
387388
}
388389

390+
pub(crate) fn primary(&self) -> Option<ServerAddress> {
391+
self.server_info
392+
.primary
393+
.as_ref()
394+
.map(|s| ServerAddress::parse(s).unwrap())
395+
}
396+
389397
pub(crate) async fn options_for_multiple_mongoses(
390398
options: Option<ClientOptions>,
391399
use_multiple_mongoses: bool,

0 commit comments

Comments
 (0)