diff --git a/src/client/session/test.rs b/src/client/session/test.rs index df370d1fd..9f290a584 100644 --- a/src/client/session/test.rs +++ b/src/client/session/test.rs @@ -15,7 +15,9 @@ use crate::{ selection_criteria::SelectionCriteria, test::{ get_client_options, + get_primary, log_uncaptured, + topology_is_standalone, util::event_buffer::EventBuffer, Event, EventClient, @@ -195,14 +197,14 @@ macro_rules! for_each_op { /// This test also satisfies the `endSession` testing requirement of prose test 5. #[tokio::test] async fn pool_is_lifo() { + if topology_is_standalone().await { + return; + } + let client = Client::for_test().await; // Wait for the implicit sessions created in TestClient::new to be returned to the pool. tokio::time::sleep(Duration::from_millis(500)).await; - if client.is_standalone() { - return; - } - let a = client.start_session().await.unwrap(); let b = client.start_session().await.unwrap(); @@ -228,8 +230,7 @@ async fn pool_is_lifo() { #[tokio::test] #[function_name::named] async fn cluster_time_in_commands() { - let test_client = Client::for_test().await; - if test_client.is_standalone() { + if topology_is_standalone().await { log_uncaptured("skipping cluster_time_in_commands test due to standalone topology"); return; } @@ -303,7 +304,7 @@ async fn cluster_time_in_commands() { // Since we need to run an insert below, ensure the single host is a primary // if we're connected to a replica set. - if let Some(primary) = test_client.primary() { + if let Some(primary) = get_primary().await { options.hosts = vec![primary]; } else { options.hosts.drain(1..); } @@ -374,8 +375,7 @@ async fn cluster_time_in_commands() { #[tokio::test] #[function_name::named] async fn session_usage() { - let client = Client::for_test().await; - if client.is_standalone() { + if topology_is_standalone().await { return; } @@ -401,11 +401,12 @@ async fn session_usage() { #[tokio::test] #[function_name::named] async fn implicit_session_returned_after_immediate_exhaust() { - let client = Client::for_test().monitor_events().await; - if client.is_standalone() { + if topology_is_standalone().await { return; } + let client = Client::for_test().monitor_events().await; + let coll = client .init_db_and_coll(function_name!(), function_name!()) .await; @@ -441,11 +442,11 @@ async fn implicit_session_returned_after_immediate_exhaust() { #[tokio::test] #[function_name::named] async fn implicit_session_returned_after_exhaust_by_get_more() { - let client = Client::for_test().monitor_events().await; - if client.is_standalone() { + if topology_is_standalone().await { return; } + let client = Client::for_test().monitor_events().await; let coll = client .init_db_and_coll(function_name!(), function_name!()) .await; @@ -491,14 +492,15 @@ async fn implicit_session_returned_after_exhaust_by_get_more() { #[tokio::test] #[function_name::named] async fn find_and_getmore_share_session() { - let client = Client::for_test().monitor_events().await; - if client.is_standalone() { + if topology_is_standalone().await { log_uncaptured( "skipping find_and_getmore_share_session due to unsupported topology: Standalone", ); return; } + let client = Client::for_test().monitor_events().await; + let coll = client .init_db_and_coll(function_name!(), function_name!()) .await; diff --git a/src/client/session/test/causal_consistency.rs b/src/client/session/test/causal_consistency.rs index 5f54b4d6d..51145d639 100644 --- a/src/client/session/test/causal_consistency.rs +++ b/src/client/session/test/causal_consistency.rs @@ -6,7 +6,7 @@ use 
crate::{ error::Result, event::command::CommandEvent, options::ReadConcern, - test::log_uncaptured, + test::{log_uncaptured, topology_is_standalone}, Client, ClientSession, Collection, @@ -118,31 +118,29 @@ fn all_session_ops() -> impl Iterator<Item = Operation> { /// Test 1 from the causal consistency specification. #[tokio::test] async fn new_session_operation_time_null() { - let client = Client::for_test().monitor_events().await; - - if client.is_standalone() { + if topology_is_standalone().await { log_uncaptured( "skipping new_session_operation_time_null due to unsupported topology: standalone", ); return; } + let client = Client::for_test().monitor_events().await; let session = client.start_session().await.unwrap(); assert!(session.operation_time().is_none()); } /// Test 2 from the causal consistency specification. #[tokio::test] -async fn first_read_no_after_cluser_time() { - let client = Client::for_test().monitor_events().await; - - if client.is_standalone() { +async fn first_read_no_after_cluster_time() { + if topology_is_standalone().await { log_uncaptured( - "skipping first_read_no_after_cluser_time due to unsupported topology: standalone", + "skipping first_read_no_after_cluster_time due to unsupported topology: standalone", ); return; } + let client = Client::for_test().monitor_events().await; for op in all_session_ops().filter(|o| o.is_read) { client.events.clone().clear_cached_events(); @@ -172,13 +170,12 @@ /// Test 3 from the causal consistency specification. #[tokio::test] async fn first_op_update_op_time() { - let client = Client::for_test().monitor_events().await; - - if client.is_standalone() { + if topology_is_standalone().await { log_uncaptured("skipping first_op_update_op_time due to unsupported topology: standalone"); return; } + let client = Client::for_test().monitor_events().await; for op in all_session_ops() { client.events.clone().clear_cached_events(); @@ -221,15 +218,15 @@ /// Test 4 from the causal consistency specification. #[tokio::test] async fn read_includes_after_cluster_time() { - let client = Client::for_test().monitor_events().await; - - if client.is_standalone() { + if topology_is_standalone().await { log_uncaptured( "skipping read_includes_after_cluster_time due to unsupported topology: standalone", ); return; } + let client = Client::for_test().monitor_events().await; + let coll = client .create_fresh_collection("causal_consistency_4", "causal_consistency_4", None) .await; @@ -262,9 +259,7 @@ /// Test 5 from the causal consistency specification. #[tokio::test] async fn find_after_write_includes_after_cluster_time() { - let client = Client::for_test().monitor_events().await; - - if client.is_standalone() { + if topology_is_standalone().await { log_uncaptured( "skipping find_after_write_includes_after_cluster_time due to unsupported topology: \ standalone", ); @@ -272,6 +267,7 @@ async fn find_after_write_includes_after_cluster_time() { return; } + let client = Client::for_test().monitor_events().await; let coll = client .create_fresh_collection("causal_consistency_5", "causal_consistency_5", None) .await; @@ -306,9 +302,7 @@ /// Test 6 from the causal consistency specification. 
#[tokio::test] async fn not_causally_consistent_omits_after_cluster_time() { - let client = Client::for_test().monitor_events().await; - - if client.is_standalone() { + if topology_is_standalone().await { log_uncaptured( "skipping not_causally_consistent_omits_after_cluster_time due to unsupported \ topology: standalone", ); @@ -316,6 +310,7 @@ async fn not_causally_consistent_omits_after_cluster_time() { return; } + let client = Client::for_test().monitor_events().await; let coll = client .create_fresh_collection("causal_consistency_6", "causal_consistency_6", None) .await; @@ -345,13 +340,12 @@ /// Test 7 from the causal consistency specification. #[tokio::test] async fn omit_after_cluster_time_standalone() { - let client = Client::for_test().monitor_events().await; - - if !client.is_standalone() { + if !topology_is_standalone().await { log_uncaptured("skipping omit_after_cluster_time_standalone due to unsupported topology"); return; } + let client = Client::for_test().monitor_events().await; let coll = client .create_fresh_collection("causal_consistency_7", "causal_consistency_7", None) .await; @@ -381,15 +375,14 @@ /// Test 8 from the causal consistency specification. #[tokio::test] async fn omit_default_read_concern_level() { - let client = Client::for_test().monitor_events().await; - - if client.is_standalone() { + if topology_is_standalone().await { log_uncaptured( "skipping omit_default_read_concern_level due to unsupported topology: standalone", ); return; } + let client = Client::for_test().monitor_events().await; let coll = client .create_fresh_collection("causal_consistency_8", "causal_consistency_8", None) .await; @@ -421,8 +414,7 @@ /// Test 9 from the causal consistency specification. #[tokio::test] async fn test_causal_consistency_read_concern_merge() { - let client = Client::for_test().monitor_events().await; - if client.is_standalone() { + if topology_is_standalone().await { log_uncaptured( "skipping test_causal_consistency_read_concern_merge due to unsupported topology: \ standalone", ); @@ -430,6 +422,7 @@ async fn test_causal_consistency_read_concern_merge() { return; } + let client = Client::for_test().monitor_events().await; let mut session = client .start_session() .causal_consistency(true) @@ -470,12 +463,12 @@ /// Test 11 from the causal consistency specification. #[tokio::test] async fn omit_cluster_time_standalone() { - let client = Client::for_test().monitor_events().await; - if !client.is_standalone() { + if !topology_is_standalone().await { log_uncaptured("skipping omit_cluster_time_standalone due to unsupported topology"); return; } + let client = Client::for_test().monitor_events().await; let coll = client .database("causal_consistency_11") .collection::<Document>("causal_consistency_11"); @@ -489,15 +482,15 @@ /// Test 12 from the causal consistency specification. 
#[tokio::test] async fn cluster_time_sent_in_commands() { - let client = Client::for_test().monitor_events().await; - if client.is_standalone() { + if topology_is_standalone().await { log_uncaptured("skipping cluster_time_sent_in_commands due to unsupported topology"); return; } + let client = Client::for_test().monitor_events().await; let coll = client - .database("causal_consistency_12") - .collection::<Document>("causal_consistency_12"); + .create_fresh_collection("causal_consistency_12", "causal_consistency_12", None) + .await; coll.find_one(doc! {}).await.unwrap(); diff --git a/src/cmap/test.rs b/src/cmap/test.rs index fca3495a7..8f86698df 100644 --- a/src/cmap/test.rs +++ b/src/cmap/test.rs @@ -447,6 +447,19 @@ async fn cmap_spec_tests() { return; } + if let Some(ref run_on) = test_file.run_on { + let mut can_run_on = false; + for requirement in run_on { + if requirement.can_run_on().await { + can_run_on = true; + } + } + if !can_run_on { + log_uncaptured("skipping due to runOn requirements"); + return; + } + } + let mut options = get_client_options().await.clone(); if options.load_balanced.unwrap_or(false) { log_uncaptured(format!( @@ -458,13 +471,6 @@ async fn cmap_spec_tests() { options.hosts.drain(1..); options.direct_connection = Some(true); let client = crate::Client::for_test().options(options).await; - if let Some(ref run_on) = test_file.run_on { - let can_run_on = run_on.iter().any(|run_on| run_on.can_run_on(&client)); - if !can_run_on { - log_uncaptured("skipping due to runOn requirements"); - return; - } - } let _guard = if let Some(fail_point) = test_file.fail_point.take() { Some(client.enable_fail_point(fail_point).await.unwrap()) diff --git a/src/cmap/test/integration.rs b/src/cmap/test/integration.rs index 30f08e73a..1d0178bbe 100644 --- a/src/cmap/test/integration.rs +++ b/src/cmap/test/integration.rs @@ -17,8 +17,11 @@ use crate::{ sdam::TopologyUpdater, selection_criteria::ReadPreference, test::{ + block_connection_supported, + fail_command_supported, get_client_options, log_uncaptured, + topology_is_load_balanced, util::{ event_buffer::EventBuffer, fail_point::{FailPoint, FailPointMode}, }, Client, }; -use semver::VersionReq; use std::time::Duration; #[derive(Debug, Deserialize)] @@ -84,6 +86,13 @@ async fn acquire_connection_and_send_command() { #[tokio::test] async fn concurrent_connections() { + if !block_connection_supported().await { + log_uncaptured( + "skipping concurrent_connections test due to server not supporting block connection", + ); + return; + } + let mut options = get_client_options().await.clone(); if options.load_balanced.unwrap_or(false) { log_uncaptured("skipping concurrent_connections test due to load-balanced topology"); return; } options.hosts.drain(1..); let client = Client::for_test().options(options).await; - let version = VersionReq::parse(">= 4.2.9").unwrap(); - // blockConnection failpoint option only supported in 4.2.9+. - if !version.matches(&client.server_version) { - log_uncaptured( - "skipping concurrent_connections test due to server not supporting failpoint option", - ); - return; - } // stall creating connections for a while let failpoint = doc! 
{ @@ -169,20 +170,13 @@ async fn concurrent_connections() { #[function_name::named] async fn connection_error_during_establishment() { - let mut client_options = get_client_options().await.clone(); - if client_options.load_balanced.unwrap_or(false) { + if topology_is_load_balanced().await { log_uncaptured( "skipping connection_error_during_establishment test due to load-balanced topology", ); return; } - client_options.heartbeat_freq = Duration::from_secs(300).into(); // high so that monitors dont trip failpoint - client_options.hosts.drain(1..); - client_options.direct_connection = Some(true); - client_options.repl_set_name = None; - - let client = Client::for_test().options(client_options.clone()).await; - if !client.supports_fail_command() { + if !fail_command_supported().await { log_uncaptured(format!( "skipping {} due to failCommand not being supported", function_name!() @@ -190,6 +184,14 @@ async fn connection_error_during_establishment() { return; } + let mut client_options = get_client_options().await.clone(); + client_options.heartbeat_freq = Duration::from_secs(300).into(); // high so that monitors don't trip failpoint + client_options.hosts.drain(1..); + client_options.direct_connection = Some(true); + client_options.repl_set_name = None; + + let client = Client::for_test().options(client_options.clone()).await; + let fail_point = FailPoint::fail_command( &[LEGACY_HELLO_COMMAND_NAME, "hello"], FailPointMode::Times(10), ) @@ -230,6 +232,14 @@ #[function_name::named] async fn connection_error_during_operation() { + if !fail_command_supported().await { + log_uncaptured(format!( + "skipping {} due to failCommand not being supported", + function_name!() + )); + return; + } + let mut options = get_client_options().await.clone(); let buffer = EventBuffer::<CmapEvent>::new(); options.cmap_event_handler = Some(buffer.handler()); options.hosts.drain(1..); options.max_pool_size = Some(1); let client = Client::for_test().options(options).await; - if !client.supports_fail_command() { - log_uncaptured(format!( - "skipping {} due to failCommand not being supported", - function_name!() - )); - return; - } let fail_point = FailPoint::fail_command(&["ping"], FailPointMode::Times(10)).close_connection(true); diff --git a/src/concern/test.rs b/src/concern/test.rs index 39be593a7..ca45321f0 100644 --- a/src/concern/test.rs +++ b/src/concern/test.rs @@ -4,7 +4,7 @@ use crate::{ bson::{doc, Bson, Document}, error::ErrorKind, options::{Acknowledgment, ReadConcern, WriteConcern}, - test::EventClient, + test::{server_version_lt, transactions_supported, EventClient}, Client, Collection, }; @@ -131,17 +131,18 @@ async fn unacknowledged_write_concern_rejected() { #[tokio::test] #[function_name::named] async fn snapshot_read_concern() { - let client = Client::for_test().monitor_events().await; // snapshot read concern was introduced in 4.0 - if client.server_version_lt(4, 0) { + if server_version_lt(4, 0).await { return; } + let client = Client::for_test().monitor_events().await; + let coll = client .database(function_name!()) .collection::<Document>(function_name!()); - if client.supports_transactions() { + if transactions_supported().await { let mut session = client.start_session().await.unwrap(); session .start_transaction() @@ -153,7 +154,7 @@ assert_event_contains_read_concern(&client).await; } - if client.server_version_lt(4, 9) { + if server_version_lt(4, 9).await { let error = coll .find_one(doc! 
{}) .read_concern(ReadConcern::snapshot()) diff --git a/src/sdam/description/topology/server_selection/test/in_window.rs b/src/sdam/description/topology/server_selection/test/in_window.rs index 5e2450ac9..fe5cbd26d 100644 --- a/src/sdam/description/topology/server_selection/test/in_window.rs +++ b/src/sdam/description/topology/server_selection/test/in_window.rs @@ -2,7 +2,6 @@ use std::{collections::HashMap, sync::Arc, time::Duration}; use approx::abs_diff_eq; use bson::{doc, Document}; -use semver::VersionReq; use serde::Deserialize; use crate::{ @@ -14,9 +13,12 @@ use crate::{ sdam::{description::topology::server_selection, Server}, selection_criteria::{ReadPreference, SelectionCriteria}, test::{ + auth_enabled, + block_connection_supported, get_client_options, log_uncaptured, run_spec_test, + topology_is_sharded, util::fail_point::{FailPoint, FailPointMode}, Event, EventClient, @@ -112,40 +114,30 @@ async fn select_in_window() { #[tokio::test(flavor = "multi_thread")] async fn load_balancing_test() { - let mut setup_client_options = get_client_options().await.clone(); - - if setup_client_options.load_balanced.unwrap_or(false) { - log_uncaptured("skipping load_balancing_test test due to load-balanced topology"); + if !topology_is_sharded().await { + log_uncaptured("skipping load_balancing_test test due to topology not being sharded"); return; } - - if setup_client_options.credential.is_some() { + if get_client_options().await.hosts.len() != 2 { + log_uncaptured("skipping load_balancing_test test due to topology not having 2 mongoses"); + return; + } + if auth_enabled().await { log_uncaptured("skipping load_balancing_test test due to auth being enabled"); return; } - - setup_client_options.hosts.drain(1..); - setup_client_options.direct_connection = Some(true); - let setup_client = Client::for_test().options(setup_client_options).await; - - let version = VersionReq::parse(">= 4.2.9").unwrap(); - // blockConnection failpoint option only supported in 4.2.9+. 
- if !version.matches(&setup_client.server_version) { + if !block_connection_supported().await { log_uncaptured( "skipping load_balancing_test test due to server not supporting blockConnection option", ); return; } - if !setup_client.is_sharded() { - log_uncaptured("skipping load_balancing_test test due to topology not being sharded"); - return; - } + let mut setup_client_options = get_client_options().await.clone(); - if get_client_options().await.hosts.len() != 2 { - log_uncaptured("skipping load_balancing_test test due to topology not having 2 mongoses"); - return; - } + setup_client_options.hosts.drain(1..); + setup_client_options.direct_connection = Some(true); + let setup_client = Client::for_test().options(setup_client_options).await; // clear the collection so subsequent test runs don't increase linearly in time setup_client diff --git a/src/sdam/description/topology/test/sdam.rs b/src/sdam/description/topology/test/sdam.rs index cbbdde61e..c5f0dbd51 100644 --- a/src/sdam/description/topology/test/sdam.rs +++ b/src/sdam/description/topology/test/sdam.rs @@ -25,9 +25,12 @@ use crate::{ }, selection_criteria::TagSet, test::{ + fail_command_appname_initial_handshake_supported, get_client_options, log_uncaptured, run_spec_test, + topology_is_load_balanced, + topology_is_replica_set, util::{ event_buffer::EventBuffer, fail_point::{FailPoint, FailPointMode}, @@ -627,6 +630,11 @@ async fn topology_closed_event_last() { #[tokio::test(flavor = "multi_thread")] async fn heartbeat_events() { + if topology_is_load_balanced().await { + log_uncaptured("skipping heartbeat_events tests due to load-balanced topology"); + return; + } + let mut options = get_client_options().await.clone(); options.hosts.drain(1..); options.heartbeat_freq = Some(Duration::from_millis(50)); @@ -641,11 +649,6 @@ async fn heartbeat_events() { let mut subscriber = client.events.stream_all(); - if client.is_load_balanced() { - log_uncaptured("skipping heartbeat_events tests due to load-balanced topology"); - return; - } - subscriber .next_match(Duration::from_millis(500), |event| { matches!(event, Event::Sdam(SdamEvent::ServerHeartbeatStarted(_))) @@ -660,7 +663,7 @@ async fn heartbeat_events() { .await .expect("should see server heartbeat succeeded event"); - if !client.supports_fail_command_appname_initial_handshake() { + if !fail_command_appname_initial_handshake_supported().await { return; } @@ -687,12 +690,13 @@ async fn heartbeat_events() { #[tokio::test] #[function_name::named] async fn direct_connection() { - let test_client = Client::for_test().await; - if !test_client.is_replica_set() { + if !topology_is_replica_set().await { log_uncaptured("Skipping direct_connection test due to non-replica set topology"); return; } + let test_client = Client::for_test().await; + let criteria = SelectionCriteria::ReadPreference(ReadPreference::Secondary { options: Default::default(), }); diff --git a/src/sdam/test.rs b/src/sdam/test.rs index eedaceb34..09b4e10c0 100644 --- a/src/sdam/test.rs +++ b/src/sdam/test.rs @@ -4,7 +4,6 @@ use std::{ }; use bson::doc; -use semver::VersionReq; use crate::{ client::options::{ClientOptions, ServerAddress}, @@ -14,8 +13,12 @@ use crate::{ hello::{LEGACY_HELLO_COMMAND_NAME, LEGACY_HELLO_COMMAND_NAME_LOWERCASE}, sdam::{ServerDescription, Topology}, test::{ + fail_command_appname_initial_handshake_supported, get_client_options, log_uncaptured, + server_version_matches, + topology_is_load_balanced, + topology_is_replica_set, util::{ event_buffer::EventBuffer, fail_point::{FailPoint, 
FailPointMode}, @@ -27,19 +30,11 @@ use crate::{ #[tokio::test(flavor = "multi_thread")] async fn min_heartbeat_frequency() { - let mut setup_client_options = get_client_options().await.clone(); - if setup_client_options.load_balanced.unwrap_or(false) { + if topology_is_load_balanced().await { log_uncaptured("skipping min_heartbeat_frequency test due to load-balanced topology"); return; } - setup_client_options.hosts.drain(1..); - setup_client_options.direct_connection = Some(true); - - let setup_client = Client::for_test() - .options(setup_client_options.clone()) - .await; - - if !setup_client.supports_fail_command_appname_initial_handshake() { + if !fail_command_appname_initial_handshake_supported().await { log_uncaptured( "skipping min_heartbeat_frequency test due to server not supporting failcommand \ appname", @@ -47,6 +42,14 @@ async fn min_heartbeat_frequency() { return; } + let mut setup_client_options = get_client_options().await.clone(); + setup_client_options.hosts.drain(1..); + setup_client_options.direct_connection = Some(true); + + let setup_client = Client::for_test() + .options(setup_client_options.clone()) + .await; + let _guard = setup_client .enable_fail_point( FailPoint::fail_command( @@ -86,6 +89,13 @@ async fn min_heartbeat_frequency() { #[tokio::test(flavor = "multi_thread")] async fn sdam_pool_management() { + if !server_version_matches(">= 4.2.9").await { + log_uncaptured( + "skipping sdam_pool_management test due to server not supporting appName failCommand", + ); + return; + } + let mut options = get_client_options().await.clone(); if options.load_balanced.unwrap_or(false) { log_uncaptured("skipping sdam_pool_management test due to load-balanced topology"); @@ -105,16 +115,6 @@ async fn sdam_pool_management() { let mut subscriber = client.events.stream_all(); - if !VersionReq::parse(">= 4.2.9") - .unwrap() - .matches(&client.server_version) - { - log_uncaptured( - "skipping sdam_pool_management test due to server not supporting appName failCommand", - ); - return; - } - subscriber .next_match(Duration::from_millis(500), |event| { matches!(event, Event::Cmap(CmapEvent::PoolReady(_))) @@ -160,6 +160,11 @@ async fn sdam_pool_management() { #[tokio::test(flavor = "multi_thread")] async fn hello_ok_true() { + if !server_version_matches(">= 4.4.5").await { + log_uncaptured("skipping hello_ok_true test due to server not supporting hello"); + return; + } + let mut setup_client_options = get_client_options().await.clone(); setup_client_options.hosts.drain(1..); @@ -173,17 +178,6 @@ async fn hello_ok_true() { return; } - let setup_client = Client::for_test() - .options(setup_client_options.clone()) - .await; - if !VersionReq::parse(">= 4.4.5") - .unwrap() - .matches(&setup_client.server_version) - { - log_uncaptured("skipping hello_ok_true test due to server not supporting hello"); - return; - } - let buffer = EventBuffer::new(); let mut event_stream = buffer.stream(); @@ -226,8 +220,7 @@ async fn hello_ok_true() { #[tokio::test] async fn repl_set_name_mismatch() -> crate::error::Result<()> { - let client = Client::for_test().await; - if !client.is_replica_set() { + if !topology_is_replica_set().await { log_uncaptured("skipping repl_set_name_mismatch due to non-replica set topology"); return Ok(()); } diff --git a/src/sync/test.rs b/src/sync/test.rs index a986e5889..032b29a3b 100644 --- a/src/sync/test.rs +++ b/src/sync/test.rs @@ -20,6 +20,7 @@ use crate::{ WriteConcern, }, sync::{Client, ClientSession, Collection}, + test::transactions_supported, Client as AsyncClient, 
}; @@ -233,10 +234,7 @@ fn typed_collection() { #[test] #[function_name::named] fn transactions() { - let should_skip = crate::sync::TOKIO_RUNTIME.block_on(async { - let test_client = AsyncClient::for_test().await; - !test_client.supports_transactions() - }); + let should_skip = crate::sync::TOKIO_RUNTIME.block_on(async { !transactions_supported().await }); if should_skip { return; } diff --git a/src/test.rs b/src/test.rs index f62b009c8..dd828500e 100644 --- a/src/test.rs +++ b/src/test.rs @@ -55,16 +55,19 @@ use tokio::sync::OnceCell; #[cfg(feature = "tracing-unstable")] use self::util::TracingHandler; use crate::{ + bson::{doc, Document}, client::{ auth::Credential, options::{ServerApi, ServerApiVersion}, }, - options::ClientOptions, + hello::HelloCommandResponse, + options::{ClientOptions, ServerAddress}, + Client, }; use std::{fs::read_to_string, str::FromStr}; -static CLIENT_OPTIONS: OnceCell<ClientOptions> = OnceCell::const_new(); pub(crate) async fn get_client_options() -> &'static ClientOptions { + static CLIENT_OPTIONS: OnceCell<ClientOptions> = OnceCell::const_new(); CLIENT_OPTIONS .get_or_init(|| async { let mut options = ClientOptions::parse(&*DEFAULT_URI).await.unwrap(); @@ -73,6 +76,178 @@ pub(crate) async fn get_client_options() -> &'static ClientOptions { }) .await } +pub(crate) async fn auth_enabled() -> bool { + get_client_options().await.credential.is_some() +} + +struct TestClientMetadata { + server_version: semver::Version, + hello_response: HelloCommandResponse, + server_parameters: Document, +} +async fn get_test_client_metadata() -> &'static TestClientMetadata { + static TEST_CLIENT_METADATA: OnceCell<TestClientMetadata> = OnceCell::const_new(); + TEST_CLIENT_METADATA + .get_or_init(|| async { + let client = Client::for_test().await; + + let build_info = client + .database("test") + .run_command(doc! { "buildInfo": 1 }) + .await + .unwrap(); + let mut server_version = + semver::Version::parse(build_info.get_str("version").unwrap()).unwrap(); + // ignore whether the version is a prerelease + server_version.pre = semver::Prerelease::EMPTY; + + let hello_response = client.hello().await.unwrap(); + + let server_parameters = client + .database("admin") + .run_command(doc! { "getParameter": "*" }) + .await + .unwrap(); + + TestClientMetadata { + server_version, + hello_response, + server_parameters, + } + }) + .await +} + +// Utility functions to check server version requirements. All but server_version_matches ignore +// the server's patch version; specify a requirement string to server_version_matches for a +// patch-sensitive comparison. 
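+// +// As a purely illustrative (hypothetical) example, a test that needs a 4.2+ replica set could +// gate itself with the helpers defined below: +// +//     if server_version_lt(4, 2).await || !topology_is_replica_set().await { +//         log_uncaptured("skipping: requires a 4.2+ replica set"); +//         return; +//     }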
+pub(crate) async fn server_version_eq(major: u64, minor: u64) -> bool { + let server_version = &get_test_client_metadata().await.server_version; + server_version.major == major && server_version.minor == minor +} +pub(crate) async fn server_version_gt(major: u64, minor: u64) -> bool { + let server_version = &get_test_client_metadata().await.server_version; + server_version.major > major || (server_version.major == major && server_version.minor > minor) +} +pub(crate) async fn server_version_gte(major: u64, minor: u64) -> bool { + let server_version = &get_test_client_metadata().await.server_version; + server_version.major > major || (server_version.major == major && server_version.minor >= minor) +} +pub(crate) async fn server_version_lt(major: u64, minor: u64) -> bool { + let server_version = &get_test_client_metadata().await.server_version; + server_version.major < major || (server_version.major == major && server_version.minor < minor) +} +pub(crate) async fn server_version_lte(major: u64, minor: u64) -> bool { + let server_version = &get_test_client_metadata().await.server_version; + server_version.major < major || (server_version.major == major && server_version.minor <= minor) +} +pub(crate) async fn server_version_matches(requirement: &str) -> bool { + let requirement = semver::VersionReq::parse(requirement).unwrap(); + let server_version = &get_test_client_metadata().await.server_version; + requirement.matches(server_version) +} + +pub(crate) async fn get_server_parameters() -> &'static Document { + &get_test_client_metadata().await.server_parameters +} + +pub(crate) async fn get_primary() -> Option<ServerAddress> { + get_test_client_metadata() + .await + .hello_response + .primary + .as_ref() + .map(|s| ServerAddress::parse(s).unwrap()) +} +pub(crate) async fn get_max_write_batch_size() -> usize { + get_test_client_metadata() + .await + .hello_response + .max_write_batch_size + .unwrap() + .try_into() + .unwrap() +} +pub(crate) async fn get_max_bson_object_size() -> usize { + get_test_client_metadata() + .await + .hello_response + .max_bson_object_size + .try_into() + .unwrap() +} +pub(crate) async fn get_max_message_size_bytes() -> usize { + get_test_client_metadata() + .await + .hello_response + .max_message_size_bytes + .try_into() + .unwrap() +} + +async fn get_topology() -> &'static Topology { + static TOPOLOGY: OnceCell<Topology> = OnceCell::const_new(); + TOPOLOGY + .get_or_init(|| async { + let client_options = get_client_options().await; + if client_options.load_balanced == Some(true) { + return Topology::LoadBalanced; + } + + let hello_response = &get_test_client_metadata().await.hello_response; + if hello_response.msg.as_deref() == Some("isdbgrid") { + return Topology::Sharded; + } + if hello_response.set_name.is_some() { + return Topology::ReplicaSet; + } + + Topology::Single + }) + .await +} +pub(crate) async fn topology_is_standalone() -> bool { + get_topology().await == &Topology::Single +} +pub(crate) async fn topology_is_replica_set() -> bool { + get_topology().await == &Topology::ReplicaSet +} +pub(crate) async fn topology_is_sharded() -> bool { + get_topology().await == &Topology::Sharded +} +pub(crate) async fn topology_is_load_balanced() -> bool { + get_topology().await == &Topology::LoadBalanced +} + +pub(crate) async fn transactions_supported() -> bool { + topology_is_replica_set().await || (topology_is_sharded().await && server_version_gte(4, 2).await) +} +pub(crate) async fn block_connection_supported() -> bool { + server_version_matches(">=4.2.9").await +}
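+// Note: failCommand is gated below because mongos only supports it as of 4.1.5; on non-sharded +// deployments it is assumed to be available on every server version this suite runs against.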
+pub(crate) async fn fail_command_supported() -> bool { + if topology_is_sharded().await { + server_version_matches(">=4.1.5").await + } else { + true + } +} +pub(crate) async fn fail_command_appname_initial_handshake_supported() -> bool { + let requirements = [">= 4.2.15, < 4.3.0", ">= 4.4.7, < 4.5.0", ">= 4.9.0"]; + for requirement in requirements { + if server_version_matches(requirement).await { + return true; + } + } + false +} +pub(crate) async fn streaming_monitor_protocol_supported() -> bool { + get_test_client_metadata() + .await + .hello_response + .topology_version + .is_some() +} pub(crate) static DEFAULT_URI: Lazy<String> = Lazy::new(get_default_uri); pub(crate) static SERVER_API: Lazy<Option<ServerApi>> = diff --git a/src/test/bulk_write.rs b/src/test/bulk_write.rs index 802e20c72..c37de8dd7 100644 --- a/src/test/bulk_write.rs +++ b/src/test/bulk_write.rs @@ -7,7 +7,15 @@ use crate::{ results::UpdateResult, test::{ get_client_options, + get_max_bson_object_size, + get_max_message_size_bytes, + get_max_write_batch_size, log_uncaptured, + server_version_gte, + server_version_lt, + topology_is_load_balanced, + topology_is_sharded, + topology_is_standalone, util::fail_point::{FailPoint, FailPointMode}, }, Client, @@ -40,14 +48,14 @@ impl PartialBulkWriteResult { // CRUD prose test 3 #[tokio::test] async fn max_write_batch_size_batching() { - let client = Client::for_test().monitor_events().await; - - if client.server_version_lt(8, 0) { + if server_version_lt(8, 0).await { log_uncaptured("skipping max_write_batch_size_batching: bulkWrite requires 8.0+"); return; } - let max_write_batch_size = client.server_info.max_write_batch_size.unwrap() as usize; + let client = Client::for_test().monitor_events().await; + + let max_write_batch_size = get_max_write_batch_size().await; let model = InsertOneModel::builder() .namespace(Namespace::new("db", "coll")) @@ -79,15 +87,15 @@ // CRUD prose test 4 #[tokio::test] async fn max_message_size_bytes_batching() { - let client = Client::for_test().monitor_events().await; - - if client.server_version_lt(8, 0) { + if server_version_lt(8, 0).await { log_uncaptured("skipping max_message_size_bytes_batching: bulkWrite requires 8.0+"); return; } - let max_bson_object_size = client.server_info.max_bson_object_size as usize; - let max_message_size_bytes = client.server_info.max_message_size_bytes as usize; + let client = Client::for_test().monitor_events().await; + + let max_bson_object_size = get_max_bson_object_size().await; + let max_message_size_bytes = get_max_message_size_bytes().await; let document = doc! 
{ "a": "b".repeat(max_bson_object_size - 500) }; let model = InsertOneModel::builder() @@ -121,19 +129,19 @@ async fn max_message_size_bytes_batching() { // CRUD prose test 5 #[tokio::test(flavor = "multi_thread")] async fn write_concern_error_batches() { + if server_version_lt(8, 0).await { + log_uncaptured("skipping write_concern_error_batches: bulkWrite requires 8.0+"); + return; + } + let mut options = get_client_options().await.clone(); options.retry_writes = Some(false); - if Client::for_test().await.is_sharded() { + if topology_is_sharded().await { options.hosts.drain(1..); } let client = Client::for_test().options(options).monitor_events().await; - if client.server_version_lt(8, 0) { - log_uncaptured("skipping write_concern_error_batches: bulkWrite requires 8.0+"); - return; - } - - let max_write_batch_size = client.server_info.max_write_batch_size.unwrap() as usize; + let max_write_batch_size = get_max_write_batch_size().await; let fail_point = FailPoint::fail_command(&["bulkWrite"], FailPointMode::Times(2)) .write_concern_error(doc! { "code": 91, "errmsg": "Replication is being shut down" }); @@ -167,19 +175,19 @@ async fn write_concern_error_batches() { // CRUD prose test 6 #[tokio::test] async fn write_error_batches() { - let mut client = Client::for_test().monitor_events().await; - - if client.server_version_lt(8, 0) { + if server_version_lt(8, 0).await { log_uncaptured("skipping write_error_batches: bulkWrite requires 8.0+"); return; } // TODO RUST-2131 - if client.is_load_balanced() { + if topology_is_load_balanced().await { log_uncaptured("skipping write_error_batches: load-balanced topology"); return; } - let max_write_batch_size = client.server_info.max_write_batch_size.unwrap() as usize; + let mut client = Client::for_test().monitor_events().await; + + let max_write_batch_size = get_max_write_batch_size().await; let document = doc! 
{ "_id": 1 }; let collection = client.database("db").collection("coll"); @@ -229,19 +237,19 @@ async fn write_error_batches() { // CRUD prose test 7 #[tokio::test] async fn successful_cursor_iteration() { - let client = Client::for_test().monitor_events().await; - - if client.server_version_lt(8, 0) { + if server_version_lt(8, 0).await { log_uncaptured("skipping successful_cursor_iteration: bulkWrite requires 8.0+"); return; } // TODO RUST-2131 - if client.is_load_balanced() { + if topology_is_load_balanced().await { log_uncaptured("skipping successful_cursor_iteration: load-balanced topology"); return; } - let max_bson_object_size = client.server_info.max_bson_object_size as usize; + let client = Client::for_test().monitor_events().await; + + let max_bson_object_size = get_max_bson_object_size().await; let collection = client.database("db").collection::("coll"); collection.drop().await.unwrap(); @@ -272,9 +280,7 @@ async fn successful_cursor_iteration() { // CRUD prose test 8 #[tokio::test] async fn cursor_iteration_in_a_transaction() { - let client = Client::for_test().monitor_events().await; - - if client.server_version_lt(8, 0) || client.is_standalone() { + if server_version_lt(8, 0).await || topology_is_standalone().await { log_uncaptured( "skipping cursor_iteration_in_a_transaction: bulkWrite requires 8.0+, transactions \ require a non-standalone topology", @@ -282,12 +288,14 @@ async fn cursor_iteration_in_a_transaction() { return; } // TODO RUST-2131 - if client.is_load_balanced() { + if topology_is_load_balanced().await { log_uncaptured("skipping cursor_iteration_in_a_transaction: load-balanced topology"); return; } - let max_bson_object_size = client.server_info.max_bson_object_size as usize; + let client = Client::for_test().monitor_events().await; + + let max_bson_object_size = get_max_bson_object_size().await; let collection = client.database("db").collection::("coll"); collection.drop().await.unwrap(); @@ -326,23 +334,23 @@ async fn cursor_iteration_in_a_transaction() { // CRUD prose test 9 #[tokio::test(flavor = "multi_thread")] async fn failed_cursor_iteration() { - let mut options = get_client_options().await.clone(); - if Client::for_test().await.is_sharded() { - options.hosts.drain(1..); - } - let client = Client::for_test().options(options).monitor_events().await; - - if client.server_version_lt(8, 0) { + if server_version_lt(8, 0).await { log_uncaptured("skipping failed_cursor_iteration: bulkWrite requires 8.0+"); return; } // TODO RUST-2131 - if client.is_load_balanced() { + if topology_is_load_balanced().await { log_uncaptured("skipping failed_cursor_iteration: load-balanced topology"); return; } - let max_bson_object_size = client.server_info.max_bson_object_size as usize; + let mut options = get_client_options().await.clone(); + if topology_is_sharded().await { + options.hosts.drain(1..); + } + let client = Client::for_test().options(options).monitor_events().await; + + let max_bson_object_size = get_max_bson_object_size().await; let fail_point = FailPoint::fail_command(&["getMore"], FailPointMode::Times(1)).error_code(8); let _guard = client.enable_fail_point(fail_point).await.unwrap(); @@ -401,16 +409,17 @@ async fn failed_cursor_iteration() { // CRUD prose test 11 #[tokio::test] async fn namespace_batch_splitting() { - let first_namespace = Namespace::new("db", "coll"); - - let mut client = Client::for_test().monitor_events().await; - if client.server_version_lt(8, 0) { + if server_version_lt(8, 0).await { log_uncaptured("skipping namespace_batch_splitting: 
bulkWrite requires 8.0+"); return; } - let max_message_size_bytes = client.server_info.max_message_size_bytes as usize; - let max_bson_object_size = client.server_info.max_bson_object_size as usize; + let first_namespace = Namespace::new("db", "coll"); + + let mut client = Client::for_test().monitor_events().await; + + let max_message_size_bytes = get_max_message_size_bytes().await; + let max_bson_object_size = get_max_bson_object_size().await; let ops_bytes = max_message_size_bytes - 1122; let num_models = ops_bytes / max_bson_object_size; @@ -511,14 +520,14 @@ // CRUD prose test 12 #[tokio::test] async fn too_large_client_error() { - let client = Client::for_test().monitor_events().await; - let max_message_size_bytes = client.server_info.max_message_size_bytes as usize; - - if client.server_version_lt(8, 0) { + if server_version_lt(8, 0).await { log_uncaptured("skipping too_large_client_error: bulkWrite requires 8.0+"); return; } + let client = Client::for_test().monitor_events().await; + let max_message_size_bytes = get_max_message_size_bytes().await; + // Case 1: document too large let model = InsertOneModel::builder() .namespace(Namespace::new("db", "coll")) @@ -576,12 +585,12 @@ async fn encryption_error() { #[tokio::test] async fn unsupported_server_client_error() { - let client = Client::for_test().await; - - if client.server_version_gte(8, 0) { + if server_version_gte(8, 0).await { + log_uncaptured("skipping unsupported_server_client_error: bulk write supported"); return; } + let client = Client::for_test().await; let error = client .bulk_write(vec![InsertOneModel::builder() .namespace(Namespace::new("db", "coll")) diff --git a/src/test/change_stream.rs b/src/test/change_stream.rs index dd3d1d9a0..7ec602084 100644 --- a/src/test/change_stream.rs +++ b/src/test/change_stream.rs @@ -1,6 +1,5 @@ use bson::{doc, Bson, Document}; use futures_util::{StreamExt, TryStreamExt}; -use semver::VersionReq; use crate::{ change_stream::{ @@ -17,7 +16,18 @@ use crate::{ Collection, }; -use super::{get_client_options, log_uncaptured, EventClient}; +use super::{ fail_command_supported, get_client_options, log_uncaptured, server_version_gte, server_version_lt, server_version_matches, topology_is_replica_set, topology_is_sharded, transactions_supported, EventClient, }; type Result<T> = std::result::Result<T, Box<dyn std::error::Error>>; @@ -31,19 +41,18 @@ async fn init_stream( ChangeStream<ChangeStreamEvent<Document>>, )>, > { - let init_client = Client::for_test().await; - if !init_client.is_replica_set() && !init_client.is_sharded() { + if !(topology_is_replica_set().await || topology_is_sharded().await) { log_uncaptured("skipping change stream test on unsupported topology"); return Ok(None); } - if !init_client.supports_fail_command() { + if !fail_command_supported().await { log_uncaptured("skipping change stream test on version without fail commands"); return Ok(None); } let mut options = get_client_options().await.clone(); // Direct connection is needed for reliable behavior with fail points. - if direct_connection && init_client.is_sharded() { + if direct_connection && topology_is_sharded().await { options.direct_connection = Some(true); options.hosts.drain(1..); } @@ -282,21 +291,16 @@ async fn resume_kill_cursor_error_suppressed() -> Result<()> { /// stream. 
#[tokio::test(flavor = "multi_thread")] // multi_thread required for FailPoint async fn resume_start_at_operation_time() -> Result<()> { + if !server_version_matches(">=4.0, <4.0.7").await { + log_uncaptured("skipping change stream test due to server version"); + return Ok(()); + } + let (client, coll, mut stream) = match init_stream("resume_start_at_operation_time", true).await? { Some(t) => t, None => return Ok(()), }; - if !VersionReq::parse(">=4.0, <4.0.7") - .unwrap() - .matches(&client.server_version) - { - log_uncaptured(format!( - "skipping change stream test due to server version {:?}", - client.server_version - )); - return Ok(()); - } let fail_point = FailPoint::fail_command(&["getMore"], FailPointMode::Times(1)).error_code(43); let _guard = client.enable_fail_point(fail_point).await?; @@ -330,20 +334,15 @@ async fn resume_start_at_operation_time() -> Result<()> { /// the postBatchResumeToken from the current command response #[tokio::test] async fn batch_end_resume_token() -> Result<()> { + if !server_version_matches(">=4.0.7").await { + log_uncaptured("skipping change stream test due to server version"); + return Ok(()); + } + let (client, _, mut stream) = match init_stream("batch_end_resume_token", false).await? { Some(t) => t, None => return Ok(()), }; - if !VersionReq::parse(">=4.0.7") - .unwrap() - .matches(&client.server_version) - { - log_uncaptured(format!( - "skipping change stream test due to server version {:?}", - client.server_version - )); - return Ok(()); - } assert_eq!(stream.next_if_any().await?, None); let token = stream.resume_token().unwrap().parsed()?; @@ -361,22 +360,16 @@ async fn batch_end_resume_token() -> Result<()> { /// Prose test 12: Running against a server <4.0.7, end of batch resume token must follow the spec #[tokio::test] async fn batch_end_resume_token_legacy() -> Result<()> { - let (client, coll, mut stream) = - match init_stream("batch_end_resume_token_legacy", false).await? { - Some(t) => t, - None => return Ok(()), - }; - if !VersionReq::parse("<4.0.7") - .unwrap() - .matches(&client.server_version) - { - log_uncaptured(format!( - "skipping change stream test due to server version {:?}", - client.server_version - )); + if !server_version_matches("<4.0.7").await { + log_uncaptured("skipping change stream test due to server version"); return Ok(()); } + let (_, coll, mut stream) = match init_stream("batch_end_resume_token_legacy", false).await? { + Some(t) => t, + None => return Ok(()), + }; + // Case: empty batch, `resume_after` not specified assert_eq!(stream.next_if_any().await?, None); assert_eq!(stream.resume_token(), None); @@ -433,25 +426,20 @@ async fn batch_mid_resume_token() -> Result<()> { /// spec. #[tokio::test] async fn aggregate_batch() -> Result<()> { - let (client, coll, mut stream) = match init_stream("aggregate_batch", false).await? { - Some(t) => t, - None => return Ok(()), - }; - if client.is_sharded() { + if topology_is_sharded().await { log_uncaptured("skipping change stream test on unsupported topology"); return Ok(()); } - if !VersionReq::parse(">=4.2") - .unwrap() - .matches(&client.server_version) - { - log_uncaptured(format!( - "skipping change stream test on unsupported version {:?}", - client.server_version - )); + if server_version_lt(4, 2).await { + log_uncaptured("skipping change stream test on unsupported version > 4.2"); return Ok(()); } + let (_, coll, mut stream) = match init_stream("aggregate_batch", false).await? 
{ + Some(t) => t, + None => return Ok(()), + }; + // Synthesize a resume token for the new stream to start at. coll.insert_one(doc! {}).await?; stream.next().await; @@ -478,20 +466,15 @@ async fn aggregate_batch() -> Result<()> { /// Prose test 17: Resuming a change stream with no results uses `startAfter`. #[tokio::test(flavor = "multi_thread")] // multi_thread required for FailPoint async fn resume_uses_start_after() -> Result<()> { + if !server_version_matches(">=4.1.1").await { + log_uncaptured("skipping change stream test on unsupported version"); + return Ok(()); + } + let (client, coll, mut stream) = match init_stream("resume_uses_start_after", true).await? { Some(t) => t, None => return Ok(()), }; - if !VersionReq::parse(">=4.1.1") - .unwrap() - .matches(&client.server_version) - { - log_uncaptured(format!( - "skipping change stream test on unsupported version {:?}", - client.server_version - )); - return Ok(()); - } coll.insert_one(doc! {}).await?; stream.next().await.transpose()?; @@ -529,20 +512,15 @@ async fn resume_uses_start_after() -> Result<()> { /// Prose test 18: Resuming a change stream after results uses `resumeAfter`. #[tokio::test(flavor = "multi_thread")] // multi_thread required for FailPoint async fn resume_uses_resume_after() -> Result<()> { + if !server_version_matches(">=4.1.1").await { + log_uncaptured("skipping change stream test on unsupported version"); + return Ok(()); + } + let (client, coll, mut stream) = match init_stream("resume_uses_resume_after", true).await? { Some(t) => t, None => return Ok(()), }; - if !VersionReq::parse(">=4.1.1") - .unwrap() - .matches(&client.server_version) - { - log_uncaptured(format!( - "skipping change stream test on unsupported version {:?}", - client.server_version - )); - return Ok(()); - } coll.insert_one(doc! 
{}).await?; stream.next().await.transpose()?; @@ -583,18 +561,13 @@ #[tokio::test] async fn create_coll_pre_post() -> Result<()> { - let client = Client::for_test().await; - if !VersionReq::parse(">=6.0") - .unwrap() - .matches(&client.server_version) - { - log_uncaptured(format!( - "skipping change stream test on unsupported version {:?}", - client.server_version - )); + if server_version_lt(6, 0).await { + log_uncaptured("skipping change stream test on unsupported version"); return Ok(()); } + let client = Client::for_test().await; + let db = client.database("create_coll_pre_post"); db.collection::<Document>("test").drop().await?; db.create_collection("test") @@ -607,21 +580,17 @@ // Prose test 19: large event splitting #[tokio::test] async fn split_large_event() -> Result<()> { - let client = Client::for_test().await; - if !(client.server_version_matches(">= 6.0.9, < 6.1") - || client.server_version_matches(">= 7.0")) - { - log_uncaptured(format!( - "skipping change stream test on unsupported version {:?}", - client.server_version - )); + if !topology_is_replica_set().await && !topology_is_sharded().await { + log_uncaptured("skipping change stream test on unsupported topology"); return Ok(()); } - if !client.is_replica_set() && !client.is_sharded() { - log_uncaptured("skipping change stream test on unsupported topology"); + if !(server_version_matches(">= 6.0.9, < 6.1").await || server_version_gte(7, 0).await) { + log_uncaptured("skipping change stream test on unsupported version"); return Ok(()); } + let client = Client::for_test().await; + let db = client.database("change_stream_tests"); db.collection::<Document>("split_large_event") .drop() .await?; @@ -662,32 +631,29 @@ /// Test that transaction fields are parsed correctly #[tokio::test] async fn transaction_fields() -> Result<()> { - let (client, coll, mut stream) = - match init_stream("chang_stream_transaction_fields", true).await? { - Some(t) => t, - None => return Ok(()), - }; - if client.is_sharded() { + if topology_is_sharded().await { log_uncaptured("skipping change stream test transaction_fields on unsupported topology"); return Ok(()); } - if !VersionReq::parse(">=5.0") - .unwrap() - .matches(&client.server_version) - { - log_uncaptured(format!( - "skipping change stream test transaction_fields on unsupported version {:?}", - client.server_version - )); + if server_version_lt(5, 0).await { + log_uncaptured( + "skipping change stream test transaction_fields on unsupported server version", + ); return Ok(()); } - if !client.supports_transactions() { + if !transactions_supported().await { log_uncaptured( "skipping change stream transaction_fields test due to lack of transaction support", ); return Ok(()); } + let (client, coll, mut stream) = + match init_stream("chang_stream_transaction_fields", true).await? 
{ + Some(t) => t, + None => return Ok(()), + }; + let mut session = client.start_session().await.unwrap(); let session_id = session.id().get("id").cloned(); assert!(session_id.is_some()); diff --git a/src/test/client.rs b/src/test/client.rs index 967c2989c..eecad2ecd 100644 --- a/src/test/client.rs +++ b/src/test/client.rs @@ -12,12 +12,16 @@ use crate::{ runtime, selection_criteria::{ReadPreference, ReadPreferenceOptions, SelectionCriteria}, test::{ + auth_enabled, get_client_options, log_uncaptured, + topology_is_replica_set, + topology_is_sharded, + topology_is_standalone, + transactions_supported, util::{ event_buffer::{EventBuffer, EventStream}, fail_point::{FailPoint, FailPointMode}, - TestClient, }, Event, SERVER_API, @@ -26,6 +30,11 @@ use crate::{ ServerType, }; +use super::{ + fail_command_appname_initial_handshake_supported, + streaming_monitor_protocol_supported, +}; + #[derive(Debug, Deserialize)] struct ClientMetadata { pub driver: DriverMetadata, @@ -42,14 +51,14 @@ struct DriverMetadata { #[tokio::test] async fn metadata_sent_in_handshake() { - let client = Client::for_test().await; - // skip on other topologies due to different currentOp behavior - if !client.is_standalone() || !client.is_replica_set() { + if !(topology_is_standalone().await || topology_is_replica_set().await) { log_uncaptured("skipping metadata_sent_in_handshake due to unsupported topology"); return; } + let client = Client::for_test().await; + let result = client .database("admin") .run_command(doc! { @@ -240,12 +249,13 @@ async fn list_database_names() { #[tokio::test] #[function_name::named] async fn list_authorized_databases() { - let client = Client::for_test().await; - if client.server_version_lt(4, 0) || !client.auth_enabled() { + if !auth_enabled().await { log_uncaptured("skipping list_authorized_databases due to test configuration"); return; } + let client = Client::for_test().await; + let dbs = &[ format!("{}1", function_name!()), format!("{}2", function_name!()), @@ -410,12 +420,7 @@ async fn auth_test_uri( /// /// If only one mechanism is supplied, this will also test that using the other SCRAM mechanism will /// fail. -async fn scram_test( - client: &TestClient, - username: &str, - password: &str, - mechanisms: &[AuthMechanism], -) { +async fn scram_test(username: &str, password: &str, mechanisms: &[AuthMechanism]) { for mechanism in mechanisms { auth_test_uri(username, password, Some(mechanism.clone()), true).await; auth_test_uri(username, password, None, true).await; @@ -424,7 +429,7 @@ async fn scram_test( } // If only one scram mechanism is specified, verify the other doesn't work. 
- if mechanisms.len() == 1 && client.server_version_gte(4, 0) { + if mechanisms.len() == 1 { let other = match mechanisms[0] { AuthMechanism::ScramSha1 => AuthMechanism::ScramSha256, _ => AuthMechanism::ScramSha1, @@ -436,12 +441,13 @@ async fn scram_test( #[tokio::test] async fn scram_sha1() { - let client = Client::for_test().await; - if !client.auth_enabled() { + if !auth_enabled().await { log_uncaptured("skipping scram_sha1 due to missing authentication"); return; } + let client = Client::for_test().await; + client .create_user( "sha1", @@ -452,16 +458,17 @@ async fn scram_sha1() { ) .await .unwrap(); - scram_test(&client, "sha1", "sha1", &[AuthMechanism::ScramSha1]).await; + scram_test("sha1", "sha1", &[AuthMechanism::ScramSha1]).await; } #[tokio::test] async fn scram_sha256() { - let client = Client::for_test().await; - if client.server_version_lt(4, 0) || !client.auth_enabled() { + if !auth_enabled().await { log_uncaptured("skipping scram_sha256 due to test configuration"); return; } + + let client = Client::for_test().await; client .create_user( "sha256", @@ -472,16 +479,17 @@ async fn scram_sha256() { ) .await .unwrap(); - scram_test(&client, "sha256", "sha256", &[AuthMechanism::ScramSha256]).await; + scram_test("sha256", "sha256", &[AuthMechanism::ScramSha256]).await; } #[tokio::test] async fn scram_both() { - let client = Client::for_test().await; - if client.server_version_lt(4, 0) || !client.auth_enabled() { + if !auth_enabled().await { log_uncaptured("skipping scram_both due to test configuration"); return; } + + let client = Client::for_test().await; client .create_user( "both", @@ -493,7 +501,6 @@ async fn scram_both() { .await .unwrap(); scram_test( - &client, "both", "both", &[AuthMechanism::ScramSha1, AuthMechanism::ScramSha256], @@ -503,8 +510,7 @@ async fn scram_both() { #[tokio::test] async fn scram_missing_user_uri() { - let client = Client::for_test().await; - if !client.auth_enabled() { + if !auth_enabled().await { log_uncaptured("skipping scram_missing_user_uri due to missing authentication"); return; } @@ -513,8 +519,7 @@ async fn scram_missing_user_uri() { #[tokio::test] async fn scram_missing_user_options() { - let client = Client::for_test().await; - if !client.auth_enabled() { + if !auth_enabled().await { log_uncaptured("skipping scram_missing_user_options due to missing authentication"); return; } @@ -523,13 +528,13 @@ async fn scram_missing_user_options() { #[tokio::test] async fn saslprep() { - let client = Client::for_test().await; - - if client.server_version_lt(4, 0) || !client.auth_enabled() { + if !auth_enabled().await { log_uncaptured("skipping saslprep due to test configuration"); return; } + let client = Client::for_test().await; + client .create_user( "IX", @@ -611,29 +616,27 @@ async fn x509_auth_skip_ci() { /// failure works. 
#[tokio::test(flavor = "multi_thread")] async fn retry_commit_txn_check_out() { - let setup_client = Client::for_test().await; - if !setup_client.is_replica_set() { + if !topology_is_replica_set().await { log_uncaptured("skipping retry_commit_txn_check_out due to non-replicaset topology"); return; } - - if !setup_client.supports_transactions() { + if !transactions_supported().await { log_uncaptured("skipping retry_commit_txn_check_out due to lack of transaction support"); return; } - - if !setup_client.supports_fail_command_appname_initial_handshake() { + if !fail_command_appname_initial_handshake_supported().await { log_uncaptured( "skipping retry_commit_txn_check_out due to insufficient failCommand support", ); return; } - - if setup_client.supports_streaming_monitoring_protocol() { + if streaming_monitor_protocol_supported().await { log_uncaptured("skipping retry_commit_txn_check_out due to streaming protocol support"); return; } + let setup_client = Client::for_test().await; + // ensure namespace exists setup_client .database("retry_commit_txn_check_out") @@ -747,11 +750,12 @@ async fn manual_shutdown_with_nothing() { /// Verifies that `Client::shutdown` succeeds when resources have been dropped. #[tokio::test] async fn manual_shutdown_with_resources() { - let client = Client::for_test().monitor_events().await; - if !client.supports_transactions() { + if !transactions_supported().await { log_uncaptured("Skipping manual_shutdown_with_resources: no transaction support"); return; } + + let client = Client::for_test().monitor_events().await; let db = client.database("shutdown_test"); db.drop().await.unwrap(); let coll = db.collection::("test"); @@ -780,7 +784,7 @@ async fn manual_shutdown_with_resources() { } let _stream = bucket.open_upload_stream("test").await.unwrap(); } - let is_sharded = client.is_sharded(); + let is_sharded = topology_is_sharded().await; let events = client.events.clone(); client.into_client().shutdown().await; if !is_sharded { @@ -805,11 +809,12 @@ async fn manual_shutdown_immediate_with_nothing() { /// Verifies that `Client::shutdown_immediate` succeeds without waiting for resources. 
#[tokio::test] async fn manual_shutdown_immediate_with_resources() { - let client = Client::for_test().monitor_events().await; - if !client.supports_transactions() { + if !transactions_supported().await { log_uncaptured("Skipping manual_shutdown_immediate_with_resources: no transaction support"); return; } + + let client = Client::for_test().monitor_events().await; let db = client.database("shutdown_test"); db.drop().await.unwrap(); let coll = db.collection::<Document>("test"); diff --git a/src/test/coll.rs b/src/test/coll.rs index 301e6abc2..a0816f97d 100644 --- a/src/test/coll.rs +++ b/src/test/coll.rs @@ -2,7 +2,6 @@ use std::{fmt::Debug, time::Duration}; use futures::stream::{StreamExt, TryStreamExt}; use once_cell::sync::Lazy; -use semver::VersionReq; use serde::{de::DeserializeOwned, Deserialize, Serialize}; use crate::{ @@ -31,7 +30,17 @@ use crate::{ WriteConcern, }, results::DeleteResult, - test::{get_client_options, log_uncaptured, EventClient}, + test::{ get_client_options, get_max_bson_object_size, get_max_message_size_bytes, log_uncaptured, server_version_eq, server_version_lt, topology_is_replica_set, topology_is_standalone, EventClient, }, Client, Collection, Cursor, @@ -42,14 +51,15 @@ use crate::{ #[tokio::test] #[function_name::named] async fn insert_err_details() { + if server_version_lt(4, 0).await || !topology_is_replica_set().await { + log_uncaptured("skipping insert_err_details due to test configuration"); + return; + } + let client = Client::for_test().await; let coll = client .init_db_and_coll(function_name!(), function_name!()) .await; - if client.server_version_lt(4, 0) || !client.is_replica_set() { - log_uncaptured("skipping insert_err_details due to test configuration"); - return; - } client .database("admin") .run_command(doc! { @@ -525,11 +535,12 @@ async fn find_allow_disk_use_not_specified() { #[function_name::named] async fn allow_disk_use_test(options: FindOptions, expected_value: Option<bool>) { - let event_client = Client::for_test().monitor_events().await; - if event_client.server_version_lt(4, 3) { + if server_version_lt(4, 3).await { log_uncaptured("skipping allow_disk_use_test due to server version < 4.3"); return; } + + let event_client = Client::for_test().monitor_events().await; let coll = event_client .database(function_name!()) .collection::<Document>(function_name!()); @@ -595,14 +606,13 @@ async fn delete_hint_not_specified() { } async fn find_one_and_delete_hint_test(options: Option<FindOneAndDeleteOptions>, name: &str) { - let client = Client::for_test().monitor_events().await; - - let req = VersionReq::parse(">= 4.2").unwrap(); - if options.is_some() && !req.matches(&client.server_version) { + if options.is_some() && server_version_lt(4, 2).await { log_uncaptured("skipping find_one_and_delete_hint_test due to test configuration"); return; } + + let client = Client::for_test().monitor_events().await; + let coll = client.database(name).collection(name); let _: Result<Option<Document>> = coll .find_one_and_delete(doc! {}) @@ -658,12 +668,10 @@ async fn find_one_and_delete_hint_server_version() { .hint(Hint::Name(String::new())) .await; - let req1 = VersionReq::parse("< 4.2").unwrap(); - let req2 = VersionReq::parse("4.2.*").unwrap(); - if req1.matches(&client.server_version) { + if server_version_lt(4, 2).await { let error = res.expect_err("find one and delete should fail"); assert!(matches!(*error.kind, ErrorKind::InvalidArgument { ..
})); - } else if req2.matches(&client.server_version) { + } else if server_version_eq(4, 2).await { let error = res.expect_err("find one and delete should fail"); assert!(matches!(*error.kind, ErrorKind::Command { .. })); } else { @@ -674,13 +682,13 @@ #[tokio::test] #[function_name::named] async fn no_read_preference_to_standalone() { - let client = Client::for_test().monitor_events().await; - - if !client.is_standalone() { + if !topology_is_standalone().await { log_uncaptured("skipping no_read_preference_to_standalone due to test topology"); return; } + + let client = Client::for_test().monitor_events().await; + client .database(function_name!()) .collection::<Document>(function_name!()) @@ -925,7 +933,7 @@ async fn assert_options_inherited(client: &EventClient, command_name: &str) { assert!(event.command.contains_key("readConcern")); assert_eq!( event.command.contains_key("$readPreference"), - !client.is_standalone() + !topology_is_standalone().await ); } @@ -982,7 +990,7 @@ async fn cursor_batch_size() { assert_eq!(docs.len(), 10); // test session cursors - if client.is_standalone() { + if topology_is_standalone().await { log_uncaptured("skipping cursor_batch_size due to standalone topology"); return; } @@ -1235,8 +1243,8 @@ async fn insert_many_document_sequences() { let mut event_stream = client.events.stream(); - let max_object_size = client.server_info.max_bson_object_size; - let max_message_size = client.server_info.max_message_size_bytes; + let max_object_size = get_max_bson_object_size().await; + let max_message_size = get_max_message_size_bytes().await; let collection = client .database("insert_many_document_sequences") @@ -1246,8 +1254,8 @@ async fn insert_many_document_sequences() { // A payload with > max_bson_object_size bytes but < max_message_size bytes should require only // one round trip let docs = vec![ - rawdoc! { "s": "a".repeat((max_object_size / 2) as usize) }, - rawdoc! { "s": "b".repeat((max_object_size / 2) as usize) }, + rawdoc! { "s": "a".repeat(max_object_size / 2) }, + rawdoc! { "s": "b".repeat(max_object_size / 2) }, ]; collection.insert_many(docs).await.unwrap(); @@ -1264,8 +1272,8 @@ async fn insert_many_document_sequences() { while size <= max_message_size { // Leave some room for key/metadata bytes in document let string_length = max_object_size - 500; - let doc = rawdoc! { "s": "a".repeat(string_length as usize) }; - size += doc.as_bytes().len() as i32; + let doc = rawdoc! { "s": "a".repeat(string_length) }; + size += doc.as_bytes().len(); docs.push(doc); } let total_docs = docs.len(); diff --git a/src/test/csfle.rs b/src/test/csfle.rs index d781b7f52..30026b506 100644 --- a/src/test/csfle.rs +++ b/src/test/csfle.rs @@ -33,7 +33,7 @@ use crate::{ Namespace, }; -use super::{log_uncaptured, EventClient}; +use super::{log_uncaptured, server_version_lt, topology_is_standalone, EventClient}; type Result<T> = anyhow::Result<T>; pub(crate) type KmsInfo = (KmsProvider, Document, Option<TlsOptions>); @@ -300,12 +300,11 @@ macro_rules!
failure { use failure; async fn fle2v2_ok(name: &str) -> bool { - let setup_client = Client::for_test().await; - if setup_client.server_version_lt(7, 0) { + if server_version_lt(7, 0).await { log_uncaptured(format!("Skipping {}: not supported on server < 7.0", name)); return false; } - if setup_client.is_standalone() { + if topology_is_standalone().await { log_uncaptured(format!("Skipping {}: not supported on standalone", name)); return false; } diff --git a/src/test/csfle/prose.rs b/src/test/csfle/prose.rs index cb235f09f..6e0280b3d 100644 --- a/src/test/csfle/prose.rs +++ b/src/test/csfle/prose.rs @@ -44,6 +44,8 @@ use crate::{ test::{ get_client_options, log_uncaptured, + server_version_lt, + topology_is_standalone, util::{ event_buffer::EventBuffer, fail_point::{FailPoint, FailPointMode}, @@ -845,16 +847,17 @@ mod explicit_encryption { } async fn explicit_encryption_setup() -> Result<Option<ExplicitEncryptionTestData>> { - let key_vault_client = Client::for_test().await; - if key_vault_client.server_version_lt(6, 0) { + if server_version_lt(6, 0).await { log_uncaptured("skipping explicit encryption test: server below 6.0"); return Ok(None); } - if key_vault_client.is_standalone() { + if topology_is_standalone().await { log_uncaptured("skipping explicit encryption test: cannot run on standalone"); return Ok(None); } + let key_vault_client = Client::for_test().await; + let encrypted_fields = load_testdata("data/encryptedFields.json")?; let key1_document = load_testdata("data/keys/key1-document.json")?; let key1_id = match key1_document.get("_id").unwrap() { @@ -1269,11 +1272,12 @@ mod decryption_events { impl DecryptionEventsTestdata { async fn setup() -> Result<Option<Self>> { - let setup_client = Client::for_test().await; - if !setup_client.is_standalone() { + if !topology_is_standalone().await { log_uncaptured("skipping decryption events test: requires standalone topology"); return Ok(None); } + + let setup_client = Client::for_test().await; let db = setup_client.database("db"); db.collection::<Document>("decryption_events") .drop() .await @@ -1529,21 +1533,13 @@ mod auto_encryption_keys { use super::*; async fn auto_encryption_keys(master_key: impl Into<MasterKey>) -> Result<()> { - let master_key = master_key.into(); - if !fle2v2_ok("auto_encryption_keys").await { return Ok(()); } + let master_key = master_key.into(); + let client = Client::for_test().await; - if client.server_version_lt(6, 0) { - log_uncaptured("Skipping auto_encryption_key test: server < 6.0"); - return Ok(()); - } - if client.is_standalone() { - log_uncaptured("Skipping auto_encryption_key test: standalone server"); - return Ok(()); - } let db = client.database("test_auto_encryption_keys"); db.drop().await?; let ce = ClientEncryption::new( @@ -1920,8 +1916,7 @@ mod range_explicit_encryption { #[tokio::test] async fn range_explicit_encryption() -> Result<()> { - let client = Client::for_test().await; - if client.server_version_lt(8, 0) || client.is_standalone() { + if server_version_lt(8, 0).await || topology_is_standalone().await { log_uncaptured("Skipping range_explicit_encryption due to unsupported topology"); return Ok(()); } @@ -2052,17 +2047,12 @@ async fn range_explicit_encryption_defaults() -> Result<()> { // FLE 2.0 Documentation Example #[tokio::test] async fn fle2_example() -> Result<()> { - // FLE 2 is not supported on Standalone topology.
- let test_client = Client::for_test().await; - if test_client.server_version_lt(7, 0) { - log_uncaptured("skipping fle2 example: server below 7.0"); - return Ok(()); - } - if test_client.is_standalone() { - log_uncaptured("skipping fle2 example: cannot run on standalone"); + if !fle2v2_ok("fle2_example").await { return Ok(()); } + let test_client = Client::for_test().await; + // Drop data from prior test runs. test_client .database("keyvault") diff --git a/src/test/db.rs b/src/test/db.rs index 4999d644f..ab70a09c8 100644 --- a/src/test/db.rs +++ b/src/test/db.rs @@ -16,13 +16,12 @@ use crate::{ ValidationLevel, }, results::{CollectionSpecification, CollectionType}, + test::{log_uncaptured, server_version_lt}, Client, Cursor, Database, }; -use super::log_uncaptured; - async fn get_coll_info(db: &Database, filter: Option<Document>) -> Vec<CollectionSpecification> { let mut colls: Vec<CollectionSpecification> = db .list_collections() @@ -221,12 +220,6 @@ async fn collection_management() { #[tokio::test] async fn db_aggregate() { let client = Client::for_test().await; - - if client.server_version_lt(4, 0) { - log_uncaptured("skipping db_aggregate due to server version < 4.0"); - return; - } - let db = client.database("admin"); let pipeline = vec![ @@ -263,12 +256,6 @@ #[tokio::test] async fn db_aggregate_disk_use() { let client = Client::for_test().await; - - if client.server_version_lt(4, 0) { - log_uncaptured("skipping db_aggregate_disk_use due to server version < 4.0"); - return; - } - let db = client.database("admin"); let pipeline = vec![ @@ -352,13 +339,13 @@ fn deserialize_clustered_index_option_from_bool() { #[tokio::test] async fn clustered_index_list_collections() { - let client = Client::for_test().await; - let database = client.database("db"); - - if client.server_version_lt(5, 3) { + if server_version_lt(5, 3).await { return; } + + let client = Client::for_test().await; + let database = client.database("db"); + database .create_collection("clustered_index_collection") .clustered_index(ClusteredIndex::default()) @@ -381,6 +368,13 @@ #[tokio::test] async fn aggregate_with_generics() { + if server_version_lt(5, 1).await { + log_uncaptured( + "skipping aggregate_with_generics: $documents agg stage only available on 5.1+", + ); + return; + } + #[derive(Deserialize)] struct A { str: String, @@ -389,13 +383,6 @@ let client = Client::for_test().await; let database = client.database("aggregate_with_generics"); - if client.server_version_lt(5, 1) { - log_uncaptured( - "skipping aggregate_with_generics: $documents agg stage only available on 5.1+", - ); - return; - } - // The cursor returned will contain these documents let pipeline = vec![doc!
{ "$documents": [ { "str": "hi" } ] }]; diff --git a/src/test/documentation_examples.rs b/src/test/documentation_examples.rs index 6b0c33638..069814ced 100644 --- a/src/test/documentation_examples.rs +++ b/src/test/documentation_examples.rs @@ -2,13 +2,21 @@ mod aggregation_data; use bson::Document; use futures::TryStreamExt; -use semver::Version; use crate::{ bson::{doc, Bson}, error::Result, options::{ClientOptions, ServerApi, ServerApiVersion}, - test::{log_uncaptured, DEFAULT_URI}, + test::{ + log_uncaptured, + server_version_lt, + server_version_matches, + topology_is_load_balanced, + topology_is_replica_set, + topology_is_sharded, + transactions_supported, + DEFAULT_URI, + }, Client, Collection, }; @@ -1237,23 +1245,24 @@ type GenericResult = std::result::Result>; #[allow(unused_variables)] async fn stable_api_examples() -> GenericResult<()> { - let setup_client = Client::for_test().await; - if setup_client.server_version_lt(4, 9) { + if server_version_lt(4, 9).await { log_uncaptured("skipping stable API examples due to unsupported server version"); return Ok(()); } - if setup_client.is_sharded() && setup_client.server_version <= Version::new(5, 0, 2) { + if topology_is_sharded().await && server_version_matches("<=5.0.2").await { // See SERVER-58794. log_uncaptured( "skipping stable API examples due to unsupported server version on sharded topology", ); return Ok(()); } - if setup_client.is_load_balanced() { + if topology_is_load_balanced().await { log_uncaptured("skipping stable API examples due to load-balanced topology"); return Ok(()); } + let setup_client = Client::for_test().await; + let uri = DEFAULT_URI.clone(); // Start Versioned API Example 1 let mut options = ClientOptions::parse(&uri).await?; @@ -1596,11 +1605,12 @@ async fn change_streams_examples() -> Result<()> { use crate::{options::FullDocumentType, runtime}; use std::time::Duration; - let client = Client::for_test().await; - if !client.is_replica_set() && !client.is_sharded() { + if !topology_is_replica_set().await && !topology_is_sharded().await { log_uncaptured("skipping change_streams_examples due to unsupported topology"); return Ok(()); } + + let client = Client::for_test().await; let db = client.database("change_streams_examples"); db.drop().await?; let inventory = db.collection::("inventory"); @@ -1666,9 +1676,7 @@ async fn change_streams_examples() -> Result<()> { async fn convenient_transaction_examples() -> Result<()> { use crate::ClientSession; use futures::FutureExt; - - let setup_client = Client::for_test().await; - if !setup_client.supports_transactions() { + if !transactions_supported().await { log_uncaptured( "skipping convenient transaction API examples due to no transaction support", ); diff --git a/src/test/index_management.rs b/src/test/index_management.rs index 7b233c486..4a03b006b 100644 --- a/src/test/index_management.rs +++ b/src/test/index_management.rs @@ -7,19 +7,26 @@ use crate::{ bson::doc, error::ErrorKind, options::{CommitQuorum, IndexOptions}, - test::{log_uncaptured, spec::unified_runner::run_unified_tests}, + test::{ + log_uncaptured, + server_version_lt, + spec::unified_runner::run_unified_tests, + topology_is_load_balanced, + topology_is_sharded, + topology_is_standalone, + }, Client, IndexModel, }; #[tokio::test] async fn run_unified() { - let client = Client::for_test().await; - let mut skipped_files = Vec::new(); let mut skipped_tests = Vec::new(); // TODO DRIVERS-2794: unskip these tests - if client.server_version_lt(7, 2) && (client.is_sharded() || client.is_load_balanced()) 
{ + if server_version_lt(7, 2).await + && (topology_is_sharded().await || topology_is_load_balanced().await) + { skipped_files.push("listSearchIndexes.json"); skipped_tests.push("listSearchIndexes ignores read and write concern"); } @@ -329,12 +336,13 @@ async fn index_management_executes_commands() { #[tokio::test] #[function_name::named] async fn commit_quorum_error() { - let client = Client::for_test().await; - if client.is_standalone() { + if topology_is_standalone().await { log_uncaptured("skipping commit_quorum_error due to standalone topology"); return; } + let client = Client::for_test().await; + let coll = client .init_db_and_coll(function_name!(), function_name!()) .await; @@ -345,7 +353,7 @@ async fn commit_quorum_error() { .commit_quorum(CommitQuorum::Majority) .await; - if client.server_version_lt(4, 4) { + if server_version_lt(4, 4).await { let err = result.unwrap_err(); assert!(matches!(*err.kind, ErrorKind::InvalidArgument { .. })); } else { diff --git a/src/test/spec/connection_stepdown.rs b/src/test/spec/connection_stepdown.rs index 2fb359bd9..18e572bca 100644 --- a/src/test/spec/connection_stepdown.rs +++ b/src/test/spec/connection_stepdown.rs @@ -7,7 +7,14 @@ use crate::{ error::{CommandError, ErrorKind}, options::{Acknowledgment, WriteConcern}, selection_criteria::SelectionCriteria, - test::{get_client_options, log_uncaptured, EventClient}, + test::{ get_client_options, log_uncaptured, server_version_eq, server_version_lt, topology_is_replica_set, EventClient, }, Collection, Database, }; @@ -16,6 +23,14 @@ async fn run_test( name: &str, test: impl Fn(EventClient, Database, Collection<Document>) -> F, ) { + if !topology_is_replica_set().await { + log_uncaptured(format!( "skipping test {:?} due to not running on a replica set", name )); + return; + } + let mut options = get_client_options().await.clone(); options.retry_writes = Some(false); let client = crate::Client::for_test() .options(options) .monitor_events() .await; - if !client.is_replica_set() { - log_uncaptured(format!( "skipping test {:?} due to not running on a replica set", name )); - return; - } - let name = format!("step-down-{}", name); let db = client.database(&name); @@ -53,7 +60,7 @@ async fn run_test( async fn get_more() { async fn get_more_test(client: EventClient, _db: Database, coll: Collection<Document>) { // This test requires server version 4.2 or higher. - if client.server_version_lt(4, 2) { + if server_version_lt(4, 2).await { log_uncaptured("skipping get_more due to server version < 4.2"); return; } @@ -106,7 +113,7 @@ async fn notwritableprimary_keep_pool() { coll: Collection<Document>, ) { // This test requires server version 4.2 or higher. - if client.server_version_lt(4, 2) { + if server_version_lt(4, 2).await { log_uncaptured("skipping notwritableprimary_keep_pool due to server version < 4.2"); return; } @@ -156,7 +163,7 @@ async fn notwritableprimary_reset_pool() { coll: Collection<Document>, ) { // This test must only run on 4.0 servers. - if !client.server_version_eq(4, 0) { + if !server_version_eq(4, 0).await { log_uncaptured( "skipping notwritableprimary_reset_pool due to unsupported server version", ); @@ -207,11 +214,6 @@ async fn shutdown_in_progress() { _db: Database, coll: Collection<Document>, ) { - if client.server_version_lt(4, 0) { - log_uncaptured("skipping shutdown_in_progress due to server version < 4.0"); - return; - } - client .database("admin") .run_command(doc!
{ @@ -252,11 +254,6 @@ async fn interrupted_at_shutdown() { _db: Database, coll: Collection<Document>, ) { - if client.server_version_lt(4, 0) { - log_uncaptured("skipping interrupted_at_shutdown due to server version < 4.2"); - return; - } - client .database("admin") .run_command(doc! { diff --git a/src/test/spec/gridfs.rs b/src/test/spec/gridfs.rs index 31548e23c..5dee57d91 100644 --- a/src/test/spec/gridfs.rs +++ b/src/test/spec/gridfs.rs @@ -9,8 +9,10 @@ use crate::{ options::{FindOneOptions, GridFsBucketOptions, GridFsUploadOptions}, runtime, test::{ + fail_command_supported, get_client_options, spec::unified_runner::run_unified_tests, + topology_is_sharded, util::fail_point::{FailPoint, FailPointMode}, }, Client, @@ -191,14 +193,11 @@ async fn upload_stream_multiple_buffers() { #[tokio::test(flavor = "multi_thread")] async fn upload_stream_errors() { - let client = Client::for_test().await; - let client = if client.is_sharded() { - let mut options = get_client_options().await.clone(); + let mut options = get_client_options().await.clone(); + if topology_is_sharded().await { options.hosts.drain(1..); - Client::for_test().options(options).await - } else { - client - }; + } + let client = Client::for_test().options(options).await; let bucket = client.database("upload_stream_errors").gridfs_bucket(None); bucket.drop().await.unwrap(); @@ -219,7 +218,7 @@ async fn upload_stream_errors() { upload_stream.abort().await.unwrap(); assert_closed(&bucket, upload_stream).await; - if !client.supports_fail_command() { + if !fail_command_supported().await { return; } diff --git a/src/test/spec/initial_dns_seedlist_discovery.rs b/src/test/spec/initial_dns_seedlist_discovery.rs index 44f1231e9..4bb2ac2e2 100644 --- a/src/test/spec/initial_dns_seedlist_discovery.rs +++ b/src/test/spec/initial_dns_seedlist_discovery.rs @@ -7,7 +7,14 @@ use crate::{ client::Client, options::{ClientOptions, ResolverConfig, ServerAddress}, srv::{DomainMismatch, LookupHosts}, - test::{get_client_options, log_uncaptured, run_spec_test}, + test::{ get_client_options, log_uncaptured, run_spec_test, topology_is_load_balanced, topology_is_replica_set, topology_is_sharded, }, }; #[derive(Debug, Deserialize)] @@ -188,15 +195,15 @@ async fn run_test(mut test_file: TestFile) { #[tokio::test] async fn replica_set() { - let client = Client::for_test().await; - let skip = - if client.is_replica_set() && client.options().repl_set_name.as_deref() != Some("repl0") { - Some("repl_set_name != repl0") - } else if !client.is_replica_set() { - Some("not a replica set") - } else { - None - }; + let skip = if topology_is_replica_set().await + && get_client_options().await.repl_set_name.as_deref() != Some("repl0") + { + Some("repl_set_name != repl0") + } else if !topology_is_replica_set().await { + Some("not a replica set") + } else { + None + }; if let Some(skip) = skip { log_uncaptured(format!( "skipping initial_dns_seedlist_discovery::replica_set due to unmet topology \ @@ -211,8 +218,7 @@ #[tokio::test] async fn load_balanced() { - let client = Client::for_test().await; - if !client.is_load_balanced() { + if !topology_is_load_balanced().await { log_uncaptured( "skipping initial_dns_seedlist_discovery::load_balanced due to unmet topology \ requirement (not a load balanced cluster)", ); @@ -228,8 +234,7 @@ #[tokio::test] async fn sharded() { - let client = Client::for_test().await; - if !client.is_sharded() { + if !topology_is_sharded().await { log_uncaptured( "skipping
initial_dns_seedlist_discovery::sharded due to unmet topology requirement \ (not a sharded cluster)", diff --git a/src/test/spec/retryable_reads.rs b/src/test/spec/retryable_reads.rs index 1c650cb86..a86d89118 100644 --- a/src/test/spec/retryable_reads.rs +++ b/src/test/spec/retryable_reads.rs @@ -10,9 +10,13 @@ use crate::{ }, runtime::{self, AsyncJoinHandle}, test::{ + block_connection_supported, + fail_command_supported, get_client_options, log_uncaptured, spec::unified_runner::run_unified_tests, + topology_is_load_balanced, + topology_is_sharded, util::{ event_buffer::EventBuffer, fail_point::{FailPoint, FailPointMode}, @@ -31,16 +35,17 @@ async fn run_unified() { /// pool before the second attempt. #[tokio::test(flavor = "multi_thread")] async fn retry_releases_connection() { + if !fail_command_supported().await { + log_uncaptured("skipping retry_releases_connection due to failCommand not being supported"); + return; + } + let mut client_options = get_client_options().await.clone(); client_options.hosts.drain(1..); client_options.retry_reads = Some(true); client_options.max_pool_size = Some(1); let client = Client::for_test().options(client_options).await; - if !client.supports_fail_command() { - log_uncaptured("skipping retry_releases_connection due to failCommand not being supported"); - return; - } let collection = client .database("retry_releases_connection") @@ -65,6 +70,17 @@ async fn retry_releases_connection() { /// Prose test from retryable reads spec verifying that PoolClearedErrors are retried. #[tokio::test(flavor = "multi_thread")] async fn retry_read_pool_cleared() { + if !block_connection_supported().await { + log_uncaptured( + "skipping retry_read_pool_cleared due to blockConnection not being supported", + ); + return; + } + if topology_is_load_balanced().await { + log_uncaptured("skipping retry_read_pool_cleared due to load-balanced topology"); + return; + } + let buffer = EventBuffer::new(); let mut client_options = get_client_options().await.clone(); @@ -78,16 +94,6 @@ async fn retry_read_pool_cleared() { } let client = Client::for_test().options(client_options.clone()).await; - if !client.supports_block_connection() { - log_uncaptured( - "skipping retry_read_pool_cleared due to blockConnection not being supported", - ); - return; - } - if client.is_load_balanced() { - log_uncaptured("skipping retry_read_pool_cleared due to load-balanced topology"); - return; - } let collection = client .database("retry_read_pool_cleared") @@ -153,8 +159,12 @@ async fn retry_read_pool_cleared() { // Retryable Reads Are Retried on a Different mongos if One is Available #[tokio::test(flavor = "multi_thread")] async fn retry_read_different_mongos() { + if !fail_command_supported().await { + log_uncaptured("skipping retry_read_different_mongos: requires failCommand"); + return; + } let mut client_options = get_client_options().await.clone(); - if client_options.repl_set_name.is_some() || client_options.hosts.len() < 2 { + if !(topology_is_sharded().await && client_options.hosts.len() >= 2) { log_uncaptured( "skipping retry_read_different_mongos: requires sharded cluster with at least two \ hosts", @@ -170,10 +180,6 @@ async fn retry_read_different_mongos() { opts.hosts.remove(ix); opts.direct_connection = Some(true); let client = Client::for_test().options(opts).await; - if !client.supports_fail_command() { - log_uncaptured("skipping retry_read_different_mongos: requires failCommand"); - return; - } let fail_point = FailPoint::fail_command(&["find"], FailPointMode::Times(1)) 
.error_code(6) @@ -212,12 +218,11 @@ async fn retry_read_different_mongos() { // Retryable Reads Are Retried on the Same mongos if No Others are Available #[tokio::test(flavor = "multi_thread")] async fn retry_read_same_mongos() { - let init_client = Client::for_test().await; - if !init_client.supports_fail_command() { + if !fail_command_supported().await { log_uncaptured("skipping retry_read_same_mongos: requires failCommand"); return; } - if !init_client.is_sharded() { + if !topology_is_sharded().await { log_uncaptured("skipping retry_read_same_mongos: requires sharded cluster"); return; } diff --git a/src/test/spec/retryable_writes.rs b/src/test/spec/retryable_writes.rs index 2b7cbf85c..9c12b367f 100644 --- a/src/test/spec/retryable_writes.rs +++ b/src/test/spec/retryable_writes.rs @@ -1,7 +1,6 @@ use std::{sync::Arc, time::Duration}; use bson::Bson; -use semver::VersionReq; use tokio::sync::Mutex; use crate::{ @@ -11,12 +10,19 @@ use crate::{ cmap::{CmapEvent, ConnectionCheckoutFailedReason}, command::CommandEvent, }, - runtime, - runtime::{spawn, AcknowledgedMessage, AsyncJoinHandle}, + runtime::{self, spawn, AcknowledgedMessage, AsyncJoinHandle}, test::{ + block_connection_supported, + fail_command_supported, get_client_options, log_uncaptured, + server_version_gt, + server_version_lt, spec::unified_runner::run_unified_tests, + topology_is_load_balanced, + topology_is_replica_set, + topology_is_sharded, + topology_is_standalone, util::{ event_buffer::EventBuffer, fail_point::{FailPoint, FailPointMode}, @@ -38,14 +44,12 @@ async fn run_unified() { #[tokio::test] #[function_name::named] async fn mmapv1_error_raised() { - let client = Client::for_test().await; - - let req = semver::VersionReq::parse("<=4.0").unwrap(); - if !req.matches(&client.server_version) || !client.is_replica_set() { + if server_version_gt(4, 0).await || !topology_is_replica_set().await { log_uncaptured("skipping mmapv1_error_raised due to test topology"); return; } + let client = Client::for_test().await; let coll = client.init_db_and_coll(function_name!(), "coll").await; let server_status = client @@ -88,6 +92,11 @@ async fn label_not_added_second_read_error() { #[function_name::named] async fn label_not_added(retry_reads: bool) { + if !fail_command_supported().await { + log_uncaptured("skipping label_not_added due to fail command unsupported"); + return; + } + let mut options = get_client_options().await.clone(); options.retry_reads = Some(retry_reads); let client = Client::for_test() @@ -95,18 +104,6 @@ async fn label_not_added(retry_reads: bool) { .use_single_mongos() .await; - // Configuring a failpoint is only supported on 4.0+ replica sets and 4.1.5+ sharded clusters. - let req = VersionReq::parse(">=4.0").unwrap(); - let sharded_req = VersionReq::parse(">=4.1.5").unwrap(); - if client.is_sharded() && !sharded_req.matches(&client.server_version) - || !req.matches(&client.server_version) - { - log_uncaptured( - "skipping label_not_added due to unsupported replica set or sharded cluster version", - ); - return; - } - let coll = client .init_db_and_coll(&format!("{}{}", function_name!(), retry_reads), "coll") .await; @@ -133,6 +130,21 @@ async fn label_not_added(retry_reads: bool) { /// Prose test from retryable writes spec verifying that PoolClearedErrors are retried. 
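// Note: fail_command_supported() and block_connection_supported(), used by the prose
// tests in this file, stand in for TestClient methods removed from src/test/util.rs
// below: the failCommand fail point needs a 4.0+ replica set or a 4.1.5+ sharded
// cluster, and its blockConnection option needs 4.2.9+. A sketch in terms of the new
// global helpers (the real helper bodies are not shown in this diff):

async fn fail_command_supported() -> bool {
    if topology_is_sharded().await {
        server_version_matches(">= 4.1.5").await
    } else {
        server_version_matches(">= 4.0").await
    }
}

// For orientation, FailPoint::fail_command(&["insert"], FailPointMode::Times(1))
// ultimately configures the documented server-side fail point, roughly:
//     { "configureFailPoint": "failCommand",
//       "mode": { "times": 1 },
//       "data": { "failCommands": ["insert"] } }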
#[tokio::test(flavor = "multi_thread")] async fn retry_write_pool_cleared() { + if topology_is_standalone().await { + log_uncaptured("skipping retry_write_pool_cleared due to standalone topology"); + return; + } + if topology_is_load_balanced().await { + log_uncaptured("skipping retry_write_pool_cleared due to load-balanced topology"); + return; + } + if !block_connection_supported().await { + log_uncaptured( + "skipping retry_write_pool_cleared due to blockConnection not being supported", + ); + return; + } + let buffer = EventBuffer::new(); let mut client_options = get_client_options().await.clone(); @@ -146,22 +158,6 @@ async fn retry_write_pool_cleared() { } let client = Client::for_test().options(client_options.clone()).await; - if !client.supports_block_connection() { - log_uncaptured( - "skipping retry_write_pool_cleared due to blockConnection not being supported", - ); - return; - } - - if client.is_standalone() { - log_uncaptured("skipping retry_write_pool_cleared due standalone topology"); - return; - } - - if client.is_load_balanced() { - log_uncaptured("skipping retry_write_pool_cleared due to load-balanced topology"); - return; - } let collection = client .database("retry_write_pool_cleared") @@ -228,6 +224,11 @@ async fn retry_write_pool_cleared() { /// encountering a WriteConcernError with a RetryableWriteError label. #[tokio::test(flavor = "multi_thread")] async fn retry_write_retryable_write_error() { + if !topology_is_replica_set().await || server_version_lt(6, 0).await { + log_uncaptured("skipping retry_write_retryable_write_error: invalid topology"); + return; + } + let mut client_options = get_client_options().await.clone(); client_options.retry_writes = Some(true); let (event_tx, event_rx) = tokio::sync::mpsc::channel::<AcknowledgedMessage<CommandEvent>>(1); @@ -281,11 +282,6 @@ async fn retry_write_retryable_write_error() { let client = Client::for_test().options(client_options).await; *listener_client.lock().await = Some(client.clone()); - if !client.is_replica_set() || client.server_version_lt(6, 0) { - log_uncaptured("skipping retry_write_retryable_write_error: invalid topology"); - return; - } - let fail_point = FailPoint::fail_command(&["insert"], FailPointMode::Times(1)) .write_concern_error(doc!
{ "code": 91, @@ -307,8 +303,12 @@ async fn retry_write_retryable_write_error() { // Test that in a sharded cluster writes are retried on a different mongos if one available #[tokio::test(flavor = "multi_thread")] async fn retry_write_different_mongos() { + if !fail_command_supported().await { + log_uncaptured("skipping retry_write_different_mongos: requires failCommand"); + return; + } let mut client_options = get_client_options().await.clone(); - if client_options.repl_set_name.is_some() || client_options.hosts.len() < 2 { + if !(topology_is_sharded().await && client_options.hosts.len() >= 2) { log_uncaptured( "skipping retry_write_different_mongos: requires sharded cluster with at least two \ hosts", @@ -324,10 +324,6 @@ async fn retry_write_different_mongos() { opts.hosts.remove(ix); opts.direct_connection = Some(true); let client = Client::for_test().options(opts).await; - if !client.supports_fail_command() { - log_uncaptured("skipping retry_write_different_mongos: requires failCommand"); - return; - } let fail_point = FailPoint::fail_command(&["insert"], FailPointMode::Times(1)) .error_code(6) @@ -367,12 +363,11 @@ async fn retry_write_different_mongos() { // Retryable Reads Are Retried on the Same mongos if No Others are Available #[tokio::test(flavor = "multi_thread")] async fn retry_write_same_mongos() { - let init_client = Client::for_test().await; - if !init_client.supports_fail_command() { + if !fail_command_supported().await { log_uncaptured("skipping retry_write_same_mongos: requires failCommand"); return; } - if !init_client.is_sharded() { + if !topology_is_sharded().await { log_uncaptured("skipping retry_write_same_mongos: requires sharded cluster"); return; } diff --git a/src/test/spec/sdam.rs b/src/test/spec/sdam.rs index f1db4f463..b4240ac91 100644 --- a/src/test/spec/sdam.rs +++ b/src/test/spec/sdam.rs @@ -7,9 +7,12 @@ use crate::{ hello::LEGACY_HELLO_COMMAND_NAME, runtime, test::{ + block_connection_supported, get_client_options, log_uncaptured, spec::unified_runner::run_unified_tests, + streaming_monitor_protocol_supported, + topology_is_load_balanced, util::{ event_buffer::EventBuffer, fail_point::{FailPoint, FailPointMode}, @@ -45,8 +48,7 @@ async fn run_unified() { /// Streaming protocol prose test 1 from SDAM spec tests. #[tokio::test(flavor = "multi_thread")] async fn streaming_min_heartbeat_frequency() { - let test_client = Client::for_test().await; - if test_client.is_load_balanced() { + if topology_is_load_balanced().await { log_uncaptured("skipping streaming_min_heartbeat_frequency due to load balanced topology"); return; } @@ -96,8 +98,7 @@ async fn streaming_min_heartbeat_frequency() { /// Variant of the previous prose test that checks for a non-minHeartbeatFrequencyMS value. #[tokio::test(flavor = "multi_thread")] async fn heartbeat_frequency_is_respected() { - let test_client = Client::for_test().await; - if test_client.is_load_balanced() { + if topology_is_load_balanced().await { log_uncaptured("skipping streaming_min_heartbeat_frequency due to load balanced topology"); return; } @@ -147,20 +148,17 @@ async fn heartbeat_frequency_is_respected() { /// RTT prose test 1 from SDAM spec tests. 
#[tokio::test(flavor = "multi_thread")] async fn rtt_is_updated() { - let test_client = Client::for_test().await; - if !test_client.supports_streaming_monitoring_protocol() { + if !streaming_monitor_protocol_supported().await { log_uncaptured( "skipping rtt_is_updated due to not supporting streaming monitoring protocol", ); return; } - - if test_client.is_load_balanced() { + if topology_is_load_balanced().await { log_uncaptured("skipping rtt_is_updated due to load balanced topology"); return; } - - if test_client.supports_block_connection() { + if !block_connection_supported().await { log_uncaptured("skipping rtt_is_updated due to not supporting block_connection"); return; } diff --git a/src/test/spec/sessions.rs b/src/test/spec/sessions.rs index a7b7a6c66..798d13a99 100644 --- a/src/test/spec/sessions.rs +++ b/src/test/spec/sessions.rs @@ -13,15 +13,19 @@ use crate::{ bson::{doc, Document}, error::{ErrorKind, Result}, event::command::{CommandEvent, CommandStartedEvent}, - test::{get_client_options, spec::unified_runner::run_unified_tests}, + test::{ + get_client_options, + server_version_gte, + spec::unified_runner::run_unified_tests, + topology_is_sharded, + }, Client, }; #[tokio::test(flavor = "multi_thread")] async fn run_unified() { let mut skipped_files = vec![]; - let client = Client::for_test().await; - if client.is_sharded() && client.server_version_gte(7, 0) { + if topology_is_sharded().await && server_version_gte(7, 0).await { // TODO RUST-1666: unskip this file skipped_files.push("snapshot-sessions.json"); } diff --git a/src/test/spec/sessions/sessions_not_supported.rs b/src/test/spec/sessions/sessions_not_supported.rs index c380b2aa7..387503fb1 100644 --- a/src/test/spec/sessions/sessions_not_supported.rs +++ b/src/test/spec/sessions/sessions_not_supported.rs @@ -6,17 +6,14 @@ use crate::{ error::ErrorKind, event::command::CommandEvent, runtime::process::Process, - test::{log_uncaptured, util::Event, EventClient}, + test::{log_uncaptured, server_version_lt, util::Event, EventClient}, Client, }; async fn spawn_mongocryptd(name: &str) -> Option<(EventClient, Process)> { - let util_client = Client::for_test().await; - // TODO RUST-1447: unskip on 8.1+ - if util_client.server_version_lt(4, 2) || util_client.server_version_gte(8, 1) { + if server_version_lt(4, 2).await { log_uncaptured(format!( - "Skipping {name}: cannot spawn mongocryptd due to server version < 4.2 or server \ - version >= 8.1" + "Skipping {name}: cannot spawn mongocryptd due to server version < 4.2" )); return None; } @@ -29,7 +26,8 @@ async fn spawn_mongocryptd(name: &str) -> Option<(EventClient, Process)> { .await .unwrap(); let client = Client::for_test().options(options).monitor_events().await; - assert!(client.server_info.logical_session_timeout_minutes.is_none()); + let hello_response = client.hello().await.unwrap(); + assert!(hello_response.logical_session_timeout_minutes.is_none()); Some((client, process)) } diff --git a/src/test/spec/trace.rs b/src/test/spec/trace.rs index debce66ff..ce2385de7 100644 --- a/src/test/spec/trace.rs +++ b/src/test/spec/trace.rs @@ -24,6 +24,7 @@ use crate::{ get_client_options, log_uncaptured, spec::unified_runner::run_unified_tests, + topology_is_standalone, DEFAULT_GLOBAL_TRACING_HANDLER, SERVER_API, }, @@ -154,12 +155,9 @@ async fn command_logging_truncation_explicit_limit() { /// Prose test 3: mid-codepoint truncation #[tokio::test] async fn command_logging_truncation_mid_codepoint() { - let mut client_opts = get_client_options().await.clone(); - 
client_opts.tracing_max_document_length_bytes = Some(215); - let client = Client::for_test().options(client_opts).await; // On non-standalone topologies the command includes a clusterTime and so gets truncated // differently. - if !client.is_standalone() { + if !topology_is_standalone().await { log_uncaptured("Skipping test due to incompatible topology type"); return; } @@ -169,6 +167,10 @@ return; } + let mut client_opts = get_client_options().await.clone(); + client_opts.tracing_max_document_length_bytes = Some(215); + let client = Client::for_test().options(client_opts).await; + let coll = client.init_db_and_coll("tracing_test", "truncation").await; let _levels_guard = DEFAULT_GLOBAL_TRACING_HANDLER.set_levels(HashMap::from([( diff --git a/src/test/spec/transactions.rs b/src/test/spec/transactions.rs index ce67ca5d8..d1b4a1901 100644 --- a/src/test/spec/transactions.rs +++ b/src/test/spec/transactions.rs @@ -9,7 +9,10 @@ use crate::{ test::{ get_client_options, log_uncaptured, + server_version_lt, spec::unified_runner::run_unified_tests, + topology_is_sharded, + transactions_supported, util::fail_point::{FailPoint, FailPointMode}, }, Client, @@ -36,6 +39,11 @@ async fn run_unified_convenient_api() { #[tokio::test(flavor = "multi_thread")] #[function_name::named] async fn deserialize_recovery_token() { + if !topology_is_sharded().await || server_version_lt(4, 2).await { + log_uncaptured("skipping deserialize_recovery_token due to test topology"); + return; + } + #[derive(Debug, Serialize)] struct A { num: i32, @@ -47,10 +55,6 @@ } let client = Client::for_test().await; - if !client.is_sharded() || client.server_version_lt(4, 2) { - log_uncaptured("skipping deserialize_recovery_token due to test topology"); - return; - } let mut session = client.start_session().await.unwrap(); @@ -86,11 +90,12 @@ #[tokio::test] async fn convenient_api_custom_error() { - let client = Client::for_test().monitor_events().await; - if !client.supports_transactions() { + if !transactions_supported().await { log_uncaptured("Skipping convenient_api_custom_error: no transaction support."); return; } + + let client = Client::for_test().monitor_events().await; let mut session = client.start_session().await.unwrap(); let coll = client .database("test_convenient") .collection::<Document>("test_convenient"); @@ -118,11 +123,12 @@ #[tokio::test] async fn convenient_api_returned_value() { - let client = Client::for_test().monitor_events().await; - if !client.supports_transactions() { + if !transactions_supported().await { log_uncaptured("Skipping convenient_api_returned_value: no transaction support."); return; } + + let client = Client::for_test().monitor_events().await; let mut session = client.start_session().await.unwrap(); let coll = client .database("test_convenient") @@ -145,11 +151,12 @@ #[tokio::test] async fn convenient_api_retry_timeout_callback() { - let client = Client::for_test().monitor_events().await; - if !client.supports_transactions() { + if !transactions_supported().await { log_uncaptured("Skipping convenient_api_retry_timeout_callback: no transaction support."); return; } + + let client = Client::for_test().monitor_events().await; let mut session = client.start_session().await.unwrap(); session.convenient_transaction_timeout = Some(Duration::ZERO); let coll = client @@ -176,19 +183,20 @@ async fn
convenient_api_retry_timeout_callback() { #[tokio::test(flavor = "multi_thread")] async fn convenient_api_retry_timeout_commit_unknown() { + if !transactions_supported().await { + log_uncaptured( + "Skipping convenient_api_retry_timeout_commit_unknown: no transaction support.", + ); + return; + } + let mut options = get_client_options().await.clone(); - if Client::for_test().await.is_sharded() { + if topology_is_sharded().await { options.direct_connection = Some(true); options.hosts.drain(1..); } let client = Client::for_test().options(options).monitor_events().await; - if !client.supports_transactions() { - log_uncaptured( - "Skipping convenient_api_retry_timeout_commit_unknown: no transaction support.", - ); - return; - } let mut session = client.start_session().await.unwrap(); session.convenient_transaction_timeout = Some(Duration::ZERO); let coll = client @@ -217,19 +225,20 @@ async fn convenient_api_retry_timeout_commit_unknown() { #[tokio::test(flavor = "multi_thread")] async fn convenient_api_retry_timeout_commit_transient() { + if !transactions_supported().await { + log_uncaptured( + "Skipping convenient_api_retry_timeout_commit_transient: no transaction support.", + ); + return; + } + let mut options = get_client_options().await.clone(); - if Client::for_test().await.is_sharded() { + if topology_is_sharded().await { options.direct_connection = Some(true); options.hosts.drain(1..); } let client = Client::for_test().options(options).monitor_events().await; - if !client.supports_transactions() { - log_uncaptured( - "Skipping convenient_api_retry_timeout_commit_transient: no transaction support.", - ); - return; - } let mut session = client.start_session().await.unwrap(); session.convenient_transaction_timeout = Some(Duration::ZERO); let coll = client diff --git a/src/test/spec/unified_runner/test_file.rs b/src/test/spec/unified_runner/test_file.rs index 95524cda5..8d68caf66 100644 --- a/src/test/spec/unified_runner/test_file.rs +++ b/src/test/spec/unified_runner/test_file.rs @@ -3,7 +3,7 @@ use std::{borrow::Cow, collections::HashMap, fmt::Write, sync::Arc, time::Durati use percent_encoding::NON_ALPHANUMERIC; use pretty_assertions::assert_eq; use regex::Regex; -use semver::{Version, VersionReq}; +use semver::Version; use serde::{Deserialize, Deserializer}; use tokio::sync::oneshot; @@ -28,7 +28,15 @@ use crate::{ WriteConcern, }, serde_util, - test::{Event, Serverless, TestClient, DEFAULT_URI}, + test::{ + auth_enabled, + get_server_parameters, + get_topology, + server_version_matches, + Event, + Serverless, + DEFAULT_URI, + }, }; #[derive(Debug, Deserialize)] @@ -91,38 +99,31 @@ pub(crate) enum Topology { } impl RunOnRequirement { - pub(crate) async fn can_run_on(&self, client: &TestClient) -> Result<(), String> { + pub(crate) async fn can_run_on(&self) -> Result<(), String> { if let Some(ref min_version) = self.min_server_version { - let req = VersionReq::parse(&format!(">= {}", &min_version)).unwrap(); - if !req.matches(&client.server_version) { - return Err(format!( - "min server version {:?}, actual {:?}", - min_version, client.server_version - )); + if !server_version_matches(&format!(">= {min_version}")).await { + return Err(format!("does not match min server version: {min_version}")); } } if let Some(ref max_version) = self.max_server_version { - let req = VersionReq::parse(&format!("<= {}", &max_version)).unwrap(); - if !req.matches(&client.server_version) { - return Err(format!( - "max server version {:?}, actual {:?}", - max_version, client.server_version - )); + if 
!server_version_matches(&format!("<= {max_version}")).await { + return Err(format!("does not match max server version: {max_version}")); } } if let Some(ref topologies) = self.topologies { - let client_topology = client.topology(); - if !topologies.contains(&client_topology) { + let client_topology = get_topology().await; + if !topologies.contains(client_topology) { return Err(format!( "allowed topologies {:?}, actual: {:?}", topologies, client_topology )); } } - if let Some(ref actual_server_parameters) = self.server_parameters { + if let Some(ref required_server_parameters) = self.server_parameters { + let actual_server_parameters = get_server_parameters().await; if results_match( - Some(&Bson::Document(client.server_parameters.clone())), - &Bson::Document(actual_server_parameters.clone()), + Some(&Bson::Document(actual_server_parameters.clone())), + &Bson::Document(required_server_parameters.clone()), false, None, ) @@ -130,7 +131,7 @@ impl RunOnRequirement { { return Err(format!( "required server parameters {:?}, actual {:?}", - actual_server_parameters, client.server_parameters + required_server_parameters, actual_server_parameters )); } } @@ -140,7 +141,7 @@ impl RunOnRequirement { } } if let Some(ref auth) = self.auth { - if *auth != client.auth_enabled() { + if *auth != auth_enabled().await { return Err("requires auth".to_string()); } } diff --git a/src/test/spec/unified_runner/test_runner.rs b/src/test/spec/unified_runner/test_runner.rs index b966fdf33..00c261e43 100644 --- a/src/test/spec/unified_runner/test_runner.rs +++ b/src/test/spec/unified_runner/test_runner.rs @@ -15,11 +15,13 @@ use crate::{ test::{ get_client_options, log_uncaptured, + server_version_lte, spec::unified_runner::{ entity::EventList, matcher::events_match, test_file::{ExpectedEventType, TestFile}, }, + topology_is_sharded, update_options_for_testing, util::fail_point::FailPointGuard, TestClient, @@ -121,7 +123,7 @@ impl TestRunner { let mut can_run_on = false; let mut run_on_errors = vec![]; for requirement in requirements { - match requirement.can_run_on(&self.internal_client).await { + match requirement.can_run_on().await { Ok(()) => can_run_on = true, Err(e) => run_on_errors.push(e), } @@ -184,7 +186,7 @@ impl TestRunner { let mut can_run_on = false; let mut run_on_errors = vec![]; for requirement in requirements { - match requirement.can_run_on(&self.internal_client).await { + match requirement.can_run_on().await { Ok(()) => can_run_on = true, Err(e) => run_on_errors.push(e), } @@ -230,8 +232,8 @@ impl TestRunner { // test runners MUST execute a non-transactional distinct command on // each mongos server before running any test that might execute distinct within a // transaction. 
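// Note: the mitigation described in the comment above amounts to issuing one
// non-transactional distinct per configured host. A sketch under the assumption
// that each mongos is dialed directly (the database and collection names here are
// placeholders, not the runner's actual values):

for address in get_client_options().await.hosts.clone() {
    let mut opts = get_client_options().await.clone();
    opts.hosts = vec![address];
    opts.direct_connection = Some(true);
    let single_mongos = Client::with_options(opts).unwrap();
    single_mongos
        .database("test_db")
        .collection::<Document>("test_coll")
        .distinct("_id", doc! {})
        .await
        .unwrap();
}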
- if self.internal_client.is_sharded() - && self.internal_client.server_version_lte(4, 2) + if topology_is_sharded().await + && server_version_lte(4, 2).await && test_case.operations.iter().any(|op| op.name == "distinct") { self.internal_client.disable_command_events(true); @@ -499,7 +501,7 @@ impl TestRunner { options.server_api = server_api; - if client.use_multiple_mongoses() && Client::for_test().await.is_sharded() { + if client.use_multiple_mongoses() && topology_is_sharded().await { assert!( options.hosts.len() > 1, "[{}]: Test requires multiple mongos hosts", diff --git a/src/test/spec/v2_runner.rs b/src/test/spec/v2_runner.rs index c21c41501..ca940e5ce 100644 --- a/src/test/spec/v2_runner.rs +++ b/src/test/spec/v2_runner.rs @@ -7,7 +7,6 @@ pub(crate) mod test_file; use std::{future::IntoFuture, sync::Arc, time::Duration}; use futures::{future::BoxFuture, FutureExt}; -use semver::VersionReq; use crate::{ bson::{doc, from_bson}, @@ -20,7 +19,10 @@ use crate::{ file_level_log, get_client_options, log_uncaptured, + server_version_gte, + server_version_lte, spec::deserialize_spec_tests, + topology_is_sharded, util::{ fail_point::{FailPoint, FailPointGuard}, get_default_name, @@ -106,13 +108,17 @@ impl FileContext { } } - fn check_topology(&self, test_file: &TestFile) -> bool { + async fn check_topology(&self, test_file: &TestFile) -> bool { if let Some(requirements) = &test_file.run_on { - return requirements - .iter() - .any(|run_on| run_on.can_run_on(&self.internal_client)); + for requirement in requirements { + if requirement.can_run_on().await { + return true; + } + } + false + } else { + true } - true } } @@ -153,10 +159,9 @@ impl TestContext { if let Some(enc_fields) = &test_file.encrypted_fields { options.encrypted_fields = Some(enc_fields.clone()); } - let req = VersionReq::parse(">=4.7").unwrap(); if !(db_name.as_str() == "admin" - && internal_client.is_sharded() - && req.matches(&internal_client.server_version)) + && topology_is_sharded().await + && server_version_gte(4, 7).await) { coll.drop().with_options(options).await.unwrap(); } @@ -219,8 +224,8 @@ impl TestContext { let client = builder.monitor_events().await; // TODO RUST-900: Remove this extraneous call. 
- if internal_client.is_sharded() - && internal_client.server_version_lte(4, 2) + if topology_is_sharded().await + && server_version_lte(4, 2).await && test.operations.iter().any(|op| op.name == "distinct") { for server_address in internal_client.options().hosts.clone() { @@ -479,7 +484,7 @@ async fn run_v2_test(path: std::path::PathBuf, test_file: TestFile) { file_level_log(format!("Running tests from {}", path.display(),)); - if !file_ctx.check_topology(&test_file) { + if !file_ctx.check_topology(&test_file).await { log_uncaptured("Client topology not compatible with test"); return; } diff --git a/src/test/spec/v2_runner/test_file.rs b/src/test/spec/v2_runner/test_file.rs index d67a6c925..c8d98c63b 100644 --- a/src/test/spec/v2_runner/test_file.rs +++ b/src/test/spec/v2_runner/test_file.rs @@ -2,18 +2,19 @@ use std::collections::HashMap; use bson::{doc, from_document, Bson}; use futures::TryStreamExt; -use semver::VersionReq; use serde::{Deserialize, Deserializer}; use crate::{ bson::Document, options::{ReadPreference, SelectionCriteria, SessionOptions}, test::{ + get_topology, log_uncaptured, + server_version_matches, spec::merge_uri_options, util::{fail_point::FailPoint, is_expected_type}, Serverless, - TestClient, + Topology, DEFAULT_URI, }, Client, @@ -45,38 +46,34 @@ pub(crate) struct TestFile { pub(crate) struct RunOn { pub(crate) min_server_version: Option<String>, pub(crate) max_server_version: Option<String>, - pub(crate) topology: Option<Vec<String>>, + pub(crate) topology: Option<Vec<Topology>>, pub(crate) serverless: Option<Serverless>, } impl RunOn { - pub(crate) fn can_run_on(&self, client: &TestClient) -> bool { + pub(crate) async fn can_run_on(&self) -> bool { if let Some(ref min_version) = self.min_server_version { - let req = VersionReq::parse(&format!(">= {}", &min_version)).unwrap(); - if !req.matches(&client.server_version) { + if !server_version_matches(&format!(">= {min_version}")).await { log_uncaptured(format!( - "runOn mismatch: required server version >= {}, got {}", - min_version, client.server_version + "runOn mismatch: required server version >= {min_version}", )); return false; } } if let Some(ref max_version) = self.max_server_version { - let req = VersionReq::parse(&format!("<= {}", &max_version)).unwrap(); - if !req.matches(&client.server_version) { + if !server_version_matches(&format!("<= {max_version}")).await { log_uncaptured(format!( - "runOn mismatch: required server version <= {}, got {}", - max_version, client.server_version + "runOn mismatch: required server version <= {max_version}", )); return false; } } if let Some(ref topology) = self.topology { - if !topology.contains(&client.topology_string()) { + let actual_topology = get_topology().await; + if !topology.contains(actual_topology) { log_uncaptured(format!( "runOn mismatch: required topology in {:?}, got {:?}", - topology, - client.topology_string() + topology, actual_topology )); return false; } diff --git a/src/test/spec/write_error.rs b/src/test/spec/write_error.rs index afedc208a..77ff395ef 100644 --- a/src/test/spec/write_error.rs +++ b/src/test/spec/write_error.rs @@ -1,21 +1,21 @@ use crate::{ bson::{doc, Document}, error::{ErrorKind, WriteFailure}, - test::log_uncaptured, + test::{log_uncaptured, server_version_lt}, Client, Collection, }; #[tokio::test] async fn details() { - let client = Client::for_test().monitor_events().await; - - if client.server_version_lt(5, 0) { + if server_version_lt(5, 0).await { // SERVER-58399 log_uncaptured("skipping write_error_details test due to server version"); return; } + let client =
Client::for_test().monitor_events().await; + let db = client.database("write_error_details"); db.drop().await.unwrap(); db.create_collection("test") diff --git a/src/test/timeseries.rs b/src/test/timeseries.rs index 14a30d1a7..2405e1af0 100644 --- a/src/test/timeseries.rs +++ b/src/test/timeseries.rs @@ -1,17 +1,22 @@ use bson::doc; use futures::TryStreamExt; -use crate::{db::options::TimeseriesOptions, test::log_uncaptured, Client}; +use crate::{ + db::options::TimeseriesOptions, + test::{log_uncaptured, server_version_lt}, + Client, +}; type Result = anyhow::Result; #[tokio::test] async fn list_collections_timeseries() -> Result<()> { - let client = Client::for_test().await; - if client.server_version_lt(5, 0) { + if server_version_lt(5, 0).await { log_uncaptured("Skipping list_collections_timeseries: timeseries require server >= 5.0"); return Ok(()); } + + let client = Client::for_test().await; let db = client.database("list_collections_timeseries"); db.drop().await?; db.create_collection("test") diff --git a/src/test/util.rs b/src/test/util.rs index f5572eb95..4453ff0ed 100644 --- a/src/test/util.rs +++ b/src/test/util.rs @@ -8,18 +8,16 @@ mod trace; use std::{env, fmt::Debug, fs::File, future::IntoFuture, io::Write, time::Duration}; use futures::FutureExt; -use semver::{Version, VersionReq}; use serde::{de::DeserializeOwned, Serialize}; #[cfg(feature = "in-use-encryption")] use crate::client::EncryptedClientBuilder; use crate::{ bson::{doc, Bson, Document}, - client::options::ServerAddress, error::Result, hello::{hello_command, HelloCommandResponse}, options::{AuthMechanism, ClientOptions, CollectionOptions, CreateCollectionOptions}, - test::{get_client_options, Topology}, + test::{get_client_options, server_version_gte, topology_is_sharded}, BoxFuture, Client, Collection, @@ -40,9 +38,6 @@ pub(crate) use self::{ #[derive(Clone, Debug)] pub(crate) struct TestClient { client: Client, - pub(crate) server_info: HelloCommandResponse, - pub(crate) server_version: Version, - pub(crate) server_parameters: Document, } impl std::ops::Deref for TestClient { @@ -124,14 +119,8 @@ impl IntoFuture for TestClientBuilder { options.test_options_mut().min_heartbeat_freq = Some(freq); } - if self.use_single_mongos { - let tmp = TestClient::from_client( - Client::with_options(get_client_options().await.clone()).unwrap(), - ) - .await; - if tmp.is_sharded() { - options.hosts = options.hosts.iter().take(1).cloned().collect(); - } + if self.use_single_mongos && topology_is_sharded().await { + options.hosts = options.hosts.iter().take(1).cloned().collect(); } #[cfg(feature = "in-use-encryption")] @@ -145,50 +134,13 @@ impl IntoFuture for TestClientBuilder { #[cfg(not(feature = "in-use-encryption"))] let client = Client::with_options(options).unwrap(); - TestClient::from_client(client).await + TestClient { client } } .boxed() } } impl TestClient { - async fn from_client(client: Client) -> Self { - let hello = hello_command( - client.options().server_api.as_ref(), - client.options().load_balanced, - None, - None, - ); - let server_info_doc = client - .database("admin") - .run_command(hello.body.try_into().unwrap()) - .await - .unwrap(); - let server_info = bson::from_document(server_info_doc).unwrap(); - - let build_info = client - .database("test") - .run_command(doc! { "buildInfo": 1 }) - .await - .unwrap(); - let mut server_version = Version::parse(build_info.get_str("version").unwrap()).unwrap(); - // Clear prerelease tag to allow version comparisons. 
diff --git a/src/test/util.rs b/src/test/util.rs
index f5572eb95..4453ff0ed 100644
--- a/src/test/util.rs
+++ b/src/test/util.rs
@@ -8,18 +8,16 @@ mod trace;
 use std::{env, fmt::Debug, fs::File, future::IntoFuture, io::Write, time::Duration};
 
 use futures::FutureExt;
-use semver::{Version, VersionReq};
 use serde::{de::DeserializeOwned, Serialize};
 
 #[cfg(feature = "in-use-encryption")]
 use crate::client::EncryptedClientBuilder;
 use crate::{
     bson::{doc, Bson, Document},
-    client::options::ServerAddress,
     error::Result,
     hello::{hello_command, HelloCommandResponse},
     options::{AuthMechanism, ClientOptions, CollectionOptions, CreateCollectionOptions},
-    test::{get_client_options, Topology},
+    test::{get_client_options, server_version_gte, topology_is_sharded},
     BoxFuture,
     Client,
     Collection,
@@ -40,9 +38,6 @@ pub(crate) use self::{
 #[derive(Clone, Debug)]
 pub(crate) struct TestClient {
     client: Client,
-    pub(crate) server_info: HelloCommandResponse,
-    pub(crate) server_version: Version,
-    pub(crate) server_parameters: Document,
 }
 
 impl std::ops::Deref for TestClient {
@@ -124,14 +119,8 @@ impl IntoFuture for TestClientBuilder {
             options.test_options_mut().min_heartbeat_freq = Some(freq);
         }
 
-        if self.use_single_mongos {
-            let tmp = TestClient::from_client(
-                Client::with_options(get_client_options().await.clone()).unwrap(),
-            )
-            .await;
-            if tmp.is_sharded() {
-                options.hosts = options.hosts.iter().take(1).cloned().collect();
-            }
+        if self.use_single_mongos && topology_is_sharded().await {
+            options.hosts = options.hosts.iter().take(1).cloned().collect();
         }
 
         #[cfg(feature = "in-use-encryption")]
@@ -145,50 +134,13 @@ impl IntoFuture for TestClientBuilder {
             #[cfg(not(feature = "in-use-encryption"))]
             let client = Client::with_options(options).unwrap();
 
-            TestClient::from_client(client).await
+            TestClient { client }
         }
         .boxed()
     }
 }
 
 impl TestClient {
-    async fn from_client(client: Client) -> Self {
-        let hello = hello_command(
-            client.options().server_api.as_ref(),
-            client.options().load_balanced,
-            None,
-            None,
-        );
-        let server_info_doc = client
-            .database("admin")
-            .run_command(hello.body.try_into().unwrap())
-            .await
-            .unwrap();
-        let server_info = bson::from_document(server_info_doc).unwrap();
-
-        let build_info = client
-            .database("test")
-            .run_command(doc! { "buildInfo": 1 })
-            .await
-            .unwrap();
-        let mut server_version = Version::parse(build_info.get_str("version").unwrap()).unwrap();
-        // Clear prerelease tag to allow version comparisons.
-        server_version.pre = semver::Prerelease::EMPTY;
-
-        let server_parameters = client
-            .database("admin")
-            .run_command(doc! { "getParameter": "*" })
-            .await
-            .unwrap_or_default();
-
-        Self {
-            client,
-            server_info,
-            server_version,
-            server_parameters,
-        }
-    }
-
     pub(crate) async fn create_user(
         &self,
         user: &str,
@@ -203,7 +155,7 @@ impl TestClient {
             cmd.insert("pwd", pwd);
         }
 
-        if self.server_version_gte(4, 0) && !mechanisms.is_empty() {
+        if server_version_gte(4, 0).await && !mechanisms.is_empty() {
             let ms: bson::Array = mechanisms.iter().map(|s| Bson::from(s.as_str())).collect();
             cmd.insert("mechanisms", ms);
         }
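
The hunk that follows deletes the per-client capability predicates (`supports_transactions`, `supports_fail_command`, and friends) along with the cached version and topology accessors they read. Call sites are expected to recompose them from the free helpers; for example, the old transactions gate might now read as in this sketch, where `topology_is_replica_set` is assumed to exist alongside `topology_is_sharded` and `topology_is_standalone`:

    // Sketch: the removed client.supports_transactions() check, rebuilt from
    // the global helpers. Transactions need a 4.0+ replica set or a 4.2+
    // sharded cluster.
    async fn supports_transactions() -> bool {
        (topology_is_replica_set().await && server_version_gte(4, 0).await)
            || (topology_is_sharded().await && server_version_gte(4, 2).await)
    }
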
@@ -266,136 +218,29 @@ impl TestClient {
         self.get_coll(db_name, coll_name)
     }
 
-    pub(crate) fn supports_fail_command(&self) -> bool {
-        let version = if self.is_sharded() {
-            ">= 4.1.5"
-        } else {
-            ">= 4.0"
-        };
-        self.server_version_matches(version)
-    }
-
-    pub(crate) fn server_version_matches(&self, req: &str) -> bool {
-        VersionReq::parse(req)
-            .unwrap()
-            .matches(&self.server_version)
-    }
-
-    pub(crate) fn supports_block_connection(&self) -> bool {
-        self.server_version_matches(">= 4.2.9")
-    }
-
-    /// Whether the deployment supports failing the initial handshake
-    /// only when it uses a specified appName.
-    ///
-    /// See SERVER-49336 for more info.
-    pub(crate) fn supports_fail_command_appname_initial_handshake(&self) -> bool {
-        let requirements = [
-            VersionReq::parse(">= 4.2.15, < 4.3.0").unwrap(),
-            VersionReq::parse(">= 4.4.7, < 4.5.0").unwrap(),
-            VersionReq::parse(">= 4.9.0").unwrap(),
-        ];
-        requirements
-            .iter()
-            .any(|req| req.matches(&self.server_version))
-    }
-
-    pub(crate) fn supports_transactions(&self) -> bool {
-        self.is_replica_set() && self.server_version_gte(4, 0)
-            || self.is_sharded() && self.server_version_gte(4, 2)
-    }
-
-    pub(crate) fn supports_streaming_monitoring_protocol(&self) -> bool {
-        self.server_info.topology_version.is_some()
-    }
-
-    pub(crate) fn auth_enabled(&self) -> bool {
-        self.client.options().credential.is_some()
-    }
-
-    pub(crate) fn is_standalone(&self) -> bool {
-        self.topology() == Topology::Single
-    }
-
-    pub(crate) fn is_replica_set(&self) -> bool {
-        self.topology() == Topology::ReplicaSet
-    }
-
-    pub(crate) fn is_sharded(&self) -> bool {
-        self.topology() == Topology::Sharded
-    }
-
-    pub(crate) fn is_load_balanced(&self) -> bool {
-        self.topology() == Topology::LoadBalanced
-    }
-
-    pub(crate) fn server_version_eq(&self, major: u64, minor: u64) -> bool {
-        self.server_version.major == major && self.server_version.minor == minor
-    }
-
-    #[allow(dead_code)]
-    pub(crate) fn server_version_gt(&self, major: u64, minor: u64) -> bool {
-        self.server_version.major > major
-            || (self.server_version.major == major && self.server_version.minor > minor)
-    }
-
-    pub(crate) fn server_version_gte(&self, major: u64, minor: u64) -> bool {
-        self.server_version.major > major
-            || (self.server_version.major == major && self.server_version.minor >= minor)
-    }
-
-    pub(crate) fn server_version_lt(&self, major: u64, minor: u64) -> bool {
-        self.server_version.major < major
-            || (self.server_version.major == major && self.server_version.minor < minor)
-    }
-
-    #[allow(dead_code)]
-    pub(crate) fn server_version_lte(&self, major: u64, minor: u64) -> bool {
-        self.server_version.major < major
-            || (self.server_version.major == major && self.server_version.minor <= minor)
-    }
-
     pub(crate) async fn drop_collection(&self, db_name: &str, coll_name: &str) {
         let coll = self.get_coll(db_name, coll_name);
         coll.drop().await.unwrap();
     }
 
-    /// Returns the `Topology' that can be determined without a server query, i.e. all except
-    /// `Toplogy::ShardedReplicaSet`.
-    pub(crate) fn topology(&self) -> Topology {
-        if self.client.options().load_balanced.unwrap_or(false) {
-            return Topology::LoadBalanced;
-        }
-        if self.server_info.msg.as_deref() == Some("isdbgrid") {
-            return Topology::Sharded;
-        }
-        if self.server_info.set_name.is_some() {
-            return Topology::ReplicaSet;
-        }
-        Topology::Single
-    }
-
-    pub(crate) fn topology_string(&self) -> String {
-        match self.topology() {
-            Topology::LoadBalanced => "load-balanced",
-            Topology::Sharded => "sharded",
-            Topology::ReplicaSet => "replicaset",
-            Topology::Single => "single",
-        }
-        .to_string()
-    }
-
-    pub(crate) fn primary(&self) -> Option<ServerAddress> {
-        self.server_info
-            .primary
-            .as_ref()
-            .map(|s| ServerAddress::parse(s).unwrap())
-    }
-
     #[allow(dead_code)]
     pub(crate) fn into_client(self) -> Client {
         self.client
     }
+
+    pub(crate) async fn hello(&self) -> Result<HelloCommandResponse> {
+        let hello = hello_command(
+            self.options().server_api.as_ref(),
+            self.options().load_balanced,
+            None,
+            None,
+        );
+        let hello_response_doc = self
+            .database("admin")
+            .run_command(hello.body.try_into()?)
+            .await?;
+        Ok(bson::from_document(hello_response_doc)?)
+    }
 }
 
 pub(crate) fn get_default_name(description: &str) -> String {
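
With the cached `server_info` gone, per-client server state is fetched on demand through the new `TestClient::hello` method. A caller that previously used `supports_streaming_monitoring_protocol` could check the capability directly, as in this sketch (not code from this patch); the streaming SDAM protocol is advertised via the `topologyVersion` field of the hello response, which the removed helper read from cached state:

    use crate::{error::Result, test::TestClient};

    // Issue a fresh hello round trip and inspect topologyVersion.
    async fn supports_streaming_monitoring(client: &TestClient) -> Result<bool> {
        let hello = client.hello().await?;
        Ok(hello.topology_version.is_some())
    }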