Skip to content

Commit ef2f20f

Browse files
authored
RUST-1882 Consolidate and de-flake test event handling (#1049)
1 parent 83b838e commit ef2f20f

36 files changed

+1177
-1162
lines changed

src/client/session/test.rs

Lines changed: 51 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,8 @@ use std::{future::Future, sync::Arc, time::Duration};
55
use bson::Document;
66
use futures::stream::StreamExt;
77

8+
#[allow(deprecated)]
9+
use crate::test::EventClient;
810
use crate::{
911
bson::{doc, Bson},
1012
coll::options::CountOptions,
@@ -13,7 +15,13 @@ use crate::{
1315
options::{FindOptions, ReadConcern, ReadPreference, WriteConcern},
1416
sdam::ServerInfo,
1517
selection_criteria::SelectionCriteria,
16-
test::{get_client_options, log_uncaptured, Event, EventClient, EventHandler, TestClient},
18+
test::{
19+
get_client_options,
20+
log_uncaptured,
21+
util::event_buffer::EventBuffer,
22+
Event,
23+
TestClient,
24+
},
1725
Client,
1826
Collection,
1927
};
@@ -231,13 +239,14 @@ async fn cluster_time_in_commands() {
231239
async fn cluster_time_test<F, G, R>(
232240
command_name: &str,
233241
client: &Client,
234-
event_handler: &EventHandler,
242+
event_buffer: &EventBuffer,
235243
operation: F,
236244
) where
237245
F: Fn(Client) -> G,
238246
G: Future<Output = Result<R>>,
239247
{
240-
let mut subscriber = event_handler.subscribe();
248+
#[allow(deprecated)]
249+
let mut subscriber = event_buffer.subscribe();
241250

242251
operation(client.clone())
243252
.await
@@ -284,11 +293,11 @@ async fn cluster_time_in_commands() {
284293
);
285294
}
286295

287-
let handler = Arc::new(EventHandler::new());
296+
let buffer = EventBuffer::new();
288297
let mut options = get_client_options().await.clone();
289298
options.heartbeat_freq = Some(Duration::from_secs(1000));
290-
options.command_event_handler = Some(handler.clone().into());
291-
options.sdam_event_handler = Some(handler.clone().into());
299+
options.command_event_handler = Some(buffer.handler());
300+
options.sdam_event_handler = Some(buffer.handler());
292301

293302
// Ensure we only connect to one server so the monitor checks from other servers
294303
// don't affect the TopologyDescription's clusterTime value between commands.
@@ -304,7 +313,8 @@ async fn cluster_time_in_commands() {
304313
}
305314
}
306315

307-
let mut subscriber = handler.subscribe();
316+
#[allow(deprecated)]
317+
let mut subscriber = buffer.subscribe();
308318

309319
let client = Client::with_options(options).unwrap();
310320

@@ -328,29 +338,24 @@ async fn cluster_time_in_commands() {
328338
.await
329339
.unwrap();
330340

331-
cluster_time_test("ping", &client, handler.as_ref(), |client| async move {
341+
cluster_time_test("ping", &client, &buffer, |client| async move {
332342
client
333343
.database(function_name!())
334344
.run_command(doc! { "ping": 1 })
335345
.await
336346
})
337347
.await;
338348

339-
cluster_time_test(
340-
"aggregate",
341-
&client,
342-
handler.as_ref(),
343-
|client| async move {
344-
client
345-
.database(function_name!())
346-
.collection::<Document>(function_name!())
347-
.aggregate(vec![doc! { "$match": { "x": 1 } }])
348-
.await
349-
},
350-
)
349+
cluster_time_test("aggregate", &client, &buffer, |client| async move {
350+
client
351+
.database(function_name!())
352+
.collection::<Document>(function_name!())
353+
.aggregate(vec![doc! { "$match": { "x": 1 } }])
354+
.await
355+
})
351356
.await;
352357

353-
cluster_time_test("find", &client, handler.as_ref(), |client| async move {
358+
cluster_time_test("find", &client, &buffer, |client| async move {
354359
client
355360
.database(function_name!())
356361
.collection::<Document>(function_name!())
@@ -359,7 +364,7 @@ async fn cluster_time_in_commands() {
359364
})
360365
.await;
361366

362-
cluster_time_test("insert", &client, handler.as_ref(), |client| async move {
367+
cluster_time_test("insert", &client, &buffer, |client| async move {
363368
client
364369
.database(function_name!())
365370
.collection::<Document>(function_name!())
@@ -378,14 +383,17 @@ async fn session_usage() {
378383
return;
379384
}
380385

386+
#[allow(deprecated)]
381387
async fn session_usage_test<F, G>(command_name: &str, operation: F)
382388
where
383389
F: Fn(EventClient) -> G,
384390
G: Future<Output = ()>,
385391
{
386392
let client = EventClient::new().await;
387393
operation(client.clone()).await;
388-
let (command_started, _) = client.get_successful_command_execution(command_name);
394+
let mut events = client.events.clone();
395+
#[allow(deprecated)]
396+
let (command_started, _) = events.get_successful_command_execution(command_name);
389397
assert!(
390398
command_started.command.get("lsid").is_some(),
391399
"implicit session not passed to {}",
@@ -400,6 +408,7 @@ async fn session_usage() {
400408
#[tokio::test]
401409
#[function_name::named]
402410
async fn implicit_session_returned_after_immediate_exhaust() {
411+
#[allow(deprecated)]
403412
let client = EventClient::new().await;
404413
if client.is_standalone() {
405414
return;
@@ -419,7 +428,11 @@ async fn implicit_session_returned_after_immediate_exhaust() {
419428
let mut cursor = coll.find(doc! {}).await.expect("find should succeed");
420429
assert!(matches!(cursor.next().await, Some(Ok(_))));
421430

422-
let (find_started, _) = client.get_successful_command_execution("find");
431+
#[allow(deprecated)]
432+
let (find_started, _) = {
433+
let mut events = client.events.clone();
434+
events.get_successful_command_execution("find")
435+
};
423436
let session_id = find_started
424437
.command
425438
.get("lsid")
@@ -440,6 +453,7 @@ async fn implicit_session_returned_after_immediate_exhaust() {
440453
#[tokio::test]
441454
#[function_name::named]
442455
async fn implicit_session_returned_after_exhaust_by_get_more() {
456+
#[allow(deprecated)]
443457
let client = EventClient::new().await;
444458
if client.is_standalone() {
445459
return;
@@ -468,7 +482,12 @@ async fn implicit_session_returned_after_exhaust_by_get_more() {
468482
assert!(matches!(cursor.next().await, Some(Ok(_))));
469483
}
470484

471-
let (find_started, _) = client.get_successful_command_execution("find");
485+
#[allow(deprecated)]
486+
let (find_started, _) = {
487+
let mut events = client.events.clone();
488+
events.get_successful_command_execution("find")
489+
};
490+
472491
let session_id = find_started
473492
.command
474493
.get("lsid")
@@ -489,6 +508,7 @@ async fn implicit_session_returned_after_exhaust_by_get_more() {
489508
#[tokio::test]
490509
#[function_name::named]
491510
async fn find_and_getmore_share_session() {
511+
#[allow(deprecated)]
492512
let client = EventClient::new().await;
493513
if client.is_standalone() {
494514
log_uncaptured(
@@ -522,6 +542,7 @@ async fn find_and_getmore_share_session() {
522542
},
523543
];
524544

545+
#[allow(deprecated)]
525546
async fn run_test(
526547
client: &EventClient,
527548
coll: &Collection<Document>,
@@ -564,14 +585,17 @@ async fn find_and_getmore_share_session() {
564585
});
565586
}
566587

567-
let (find_started, _) = client.get_successful_command_execution("find");
588+
let mut events = client.events.clone();
589+
#[allow(deprecated)]
590+
let (find_started, _) = events.get_successful_command_execution("find");
568591
let session_id = find_started
569592
.command
570593
.get("lsid")
571594
.expect("find should use implicit session");
572595
assert!(session_id != &Bson::Null);
573596

574-
let (command_started, _) = client.get_successful_command_execution("getMore");
597+
#[allow(deprecated)]
598+
let (command_started, _) = events.get_successful_command_execution("getMore");
575599
let getmore_session_id = command_started
576600
.command
577601
.get("lsid")

0 commit comments

Comments
 (0)