From 9011059780d4df4415c3d63498ad8e804318c5c4 Mon Sep 17 00:00:00 2001 From: Abraham Egnor Date: Mon, 14 Apr 2025 10:40:59 -0400 Subject: [PATCH 1/8] split topology update from retry --- src/client/executor.rs | 2 ++ src/operation.rs | 10 ++++++++++ src/operation/aggregate.rs | 10 +++++++++- src/operation/raw_output.rs | 4 ++++ src/operation/run_cursor_command.rs | 4 ++++ src/sdam/topology.rs | 1 - 6 files changed, 29 insertions(+), 2 deletions(-) diff --git a/src/client/executor.rs b/src/client/executor.rs index 17b5debd2..61abafca4 100644 --- a/src/client/executor.rs +++ b/src/client/executor.rs @@ -309,6 +309,8 @@ impl Client { let mut retry: Option = None; let mut implicit_session: Option = None; loop { + op.update_for_topology(&self.inner.topology.description()); + if retry.is_some() { op.update_for_retry(); } diff --git a/src/operation.rs b/src/operation.rs index f0af1b1f6..ddbae4116 100644 --- a/src/operation.rs +++ b/src/operation.rs @@ -52,6 +52,7 @@ use crate::{ WriteFailure, }, options::WriteConcern, + sdam::TopologyDescription, selection_criteria::SelectionCriteria, BoxFuture, ClientSession, @@ -148,6 +149,9 @@ pub(crate) trait Operation { /// Updates this operation as needed for a retry. fn update_for_retry(&mut self); + /// Updates this operation based on server topology. + fn update_for_topology(&mut self, _topology: &TopologyDescription); + fn pinned_connection(&self) -> Option<&PinnedConnectionHandle>; fn name(&self) -> &str; @@ -235,6 +239,9 @@ pub(crate) trait OperationWithDefaults: Send + Sync { /// Updates this operation as needed for a retry. fn update_for_retry(&mut self) {} + /// Updates this operation based on server topology. + fn update_for_topology(&mut self, _topology: &TopologyDescription) {} + fn pinned_connection(&self) -> Option<&PinnedConnectionHandle> { None } @@ -287,6 +294,9 @@ where fn update_for_retry(&mut self) { self.update_for_retry() } + fn update_for_topology(&mut self, topology: &TopologyDescription) { + self.update_for_topology(topology) + } fn pinned_connection(&self) -> Option<&PinnedConnectionHandle> { self.pinned_connection() } diff --git a/src/operation/aggregate.rs b/src/operation/aggregate.rs index dd7568523..29b97984d 100644 --- a/src/operation/aggregate.rs +++ b/src/operation/aggregate.rs @@ -7,7 +7,7 @@ use crate::{ cursor::CursorSpecification, error::Result, operation::{append_options, remove_empty_write_concern, Retryability}, - options::{AggregateOptions, SelectionCriteria, WriteConcern}, + options::{AggregateOptions, ReadPreference, SelectionCriteria, WriteConcern}, Namespace, }; @@ -109,6 +109,14 @@ impl OperationWithDefaults for Aggregate { )) } + fn update_for_topology(&mut self, topology: &crate::sdam::TopologyDescription) { + match self.selection_criteria() { + None => return, + Some(SelectionCriteria::ReadPreference(ReadPreference::Primary)) => return, + _ => (), + } + } + fn selection_criteria(&self) -> Option<&SelectionCriteria> { self.options .as_ref() diff --git a/src/operation/raw_output.rs b/src/operation/raw_output.rs index ef725a26c..2573a9a4e 100644 --- a/src/operation/raw_output.rs +++ b/src/operation/raw_output.rs @@ -68,6 +68,10 @@ impl Operation for RawOutput { self.0.update_for_retry() } + fn update_for_topology(&mut self, topology: &crate::sdam::TopologyDescription) { + self.0.update_for_topology(topology) + } + fn pinned_connection(&self) -> Option<&crate::cmap::conn::PinnedConnectionHandle> { self.0.pinned_connection() } diff --git a/src/operation/run_cursor_command.rs b/src/operation/run_cursor_command.rs index b675aca16..2886d4b42 100644 --- a/src/operation/run_cursor_command.rs +++ b/src/operation/run_cursor_command.rs @@ -79,6 +79,10 @@ impl Operation for RunCursorCommand<'_> { self.run_command.update_for_retry() } + fn update_for_topology(&mut self, topology: &crate::sdam::TopologyDescription) { + self.run_command.update_for_topology(topology) + } + fn pinned_connection(&self) -> Option<&PinnedConnectionHandle> { self.run_command.pinned_connection() } diff --git a/src/sdam/topology.rs b/src/sdam/topology.rs index 94ee0c0bc..00b3b0143 100644 --- a/src/sdam/topology.rs +++ b/src/sdam/topology.rs @@ -229,7 +229,6 @@ impl Topology { self.watcher.peek_latest().servers() } - #[cfg(test)] pub(crate) fn description(&self) -> TopologyDescription { self.watcher.peek_latest().description.clone() } From f134b857afab8360e3e6d8338ef6cd18fc4bec71 Mon Sep 17 00:00:00 2001 From: Abraham Egnor Date: Mon, 14 Apr 2025 10:56:21 -0400 Subject: [PATCH 2/8] wip --- src/operation/aggregate.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/operation/aggregate.rs b/src/operation/aggregate.rs index 29b97984d..3efb754ee 100644 --- a/src/operation/aggregate.rs +++ b/src/operation/aggregate.rs @@ -109,7 +109,7 @@ impl OperationWithDefaults for Aggregate { )) } - fn update_for_topology(&mut self, topology: &crate::sdam::TopologyDescription) { + fn update_for_topology(&mut self, _topology: &crate::sdam::TopologyDescription) { match self.selection_criteria() { None => return, Some(SelectionCriteria::ReadPreference(ReadPreference::Primary)) => return, From b2c696071d4987bed5d65645735644532965d567 Mon Sep 17 00:00:00 2001 From: Abraham Egnor Date: Wed, 16 Apr 2025 12:28:13 -0400 Subject: [PATCH 3/8] plumbing --- src/client.rs | 13 +++++++----- src/client/executor.rs | 19 +++++++----------- src/operation.rs | 11 +--------- src/operation/aggregate.rs | 31 +++++++++++++++++++++++------ src/operation/raw_output.rs | 4 ---- src/operation/run_cursor_command.rs | 4 ---- src/sdam/description/server.rs | 7 +++++++ src/sdam/topology.rs | 1 + 8 files changed, 49 insertions(+), 41 deletions(-) diff --git a/src/client.rs b/src/client.rs index b34b6edf8..555e203fc 100644 --- a/src/client.rs +++ b/src/client.rs @@ -446,7 +446,7 @@ impl Client { &self, criteria: Option<&SelectionCriteria>, ) -> Result { - let server = self + let (server, _) = self .select_server(criteria, "Test select server", None) .await?; Ok(server.address.clone()) @@ -460,7 +460,7 @@ impl Client { #[allow(unused_variables)] // we only use the operation_name for tracing. operation_name: &str, deprioritized: Option<&ServerAddress>, - ) -> Result { + ) -> Result<(SelectedServer, SelectionCriteria)> { let criteria = criteria.unwrap_or(&SelectionCriteria::ReadPreference(ReadPreference::Primary)); @@ -488,9 +488,12 @@ impl Client { let mut watcher = self.inner.topology.watch(); loop { let state = watcher.observe_latest(); - + for server in state.description.servers.values() { + eprintln!("at selection: {:?}", server.hello_response()); + } + let effective_criteria = criteria; // TODO let result = server_selection::attempt_to_select_server( - criteria, + effective_criteria, &state.description, &state.servers(), deprioritized, @@ -507,7 +510,7 @@ impl Client { #[cfg(feature = "tracing-unstable")] event_emitter.emit_succeeded_event(&state.description, &server); - return Ok(server); + return Ok((server, effective_criteria.clone())); } else { #[cfg(feature = "tracing-unstable")] if !emitted_waiting_message { diff --git a/src/client/executor.rs b/src/client/executor.rs index 61abafca4..79e7ba6b5 100644 --- a/src/client/executor.rs +++ b/src/client/executor.rs @@ -29,6 +29,7 @@ use crate::{ wire::{next_request_id, Message}, PinnedConnectionHandle, }, + Command, ConnectionPool, RawCommandResponse, StreamDescription, @@ -59,7 +60,7 @@ use crate::{ Retryability, }, options::{ChangeStreamOptions, SelectionCriteria}, - sdam::{HandshakePhase, SelectedServer, ServerType, TopologyType, TransactionSupportStatus}, + sdam::{HandshakePhase, ServerType, TopologyType, TransactionSupportStatus}, selection_criteria::ReadPreference, tracking_arc::TrackingArc, ClusterTime, @@ -309,7 +310,7 @@ impl Client { let mut retry: Option = None; let mut implicit_session: Option = None; loop { - op.update_for_topology(&self.inner.topology.description()); + //op.update_for_topology(&self.inner.topology.description()); if retry.is_some() { op.update_for_retry(); @@ -320,7 +321,7 @@ impl Client { .and_then(|s| s.transaction.pinned_mongos()) .or_else(|| op.selection_criteria()); - let server = match self + let (server, effective_criteria) = match self .select_server( selection_criteria, op.name(), @@ -328,7 +329,7 @@ impl Client { ) .await { - Ok(server) => server, + Ok(out) => out, Err(mut err) => { retry.first_error()?; @@ -394,13 +395,7 @@ impl Client { }; let details = match self - .execute_operation_on_connection( - op, - &mut conn, - &mut session, - txn_number, - retryability, - ) + .execute_command_on_connection(cmd, op, &mut conn, &mut session, retryability) .await { Ok(output) => ExecutionDetails { @@ -866,7 +861,7 @@ impl Client { (matches!(topology_type, TopologyType::Single) && server_type.is_available()) || server_type.is_data_bearing() })); - let _: SelectedServer = self + let _ = self .select_server(Some(&criteria), operation_name, None) .await?; Ok(()) diff --git a/src/operation.rs b/src/operation.rs index ddbae4116..b15314b83 100644 --- a/src/operation.rs +++ b/src/operation.rs @@ -52,7 +52,6 @@ use crate::{ WriteFailure, }, options::WriteConcern, - sdam::TopologyDescription, selection_criteria::SelectionCriteria, BoxFuture, ClientSession, @@ -77,6 +76,7 @@ pub(crate) use update::{Update, UpdateOrReplace}; const SERVER_4_2_0_WIRE_VERSION: i32 = 8; const SERVER_4_4_0_WIRE_VERSION: i32 = 9; +const _SERVER_5_0_0_WIRE_VERSION: i32 = 13; const SERVER_8_0_0_WIRE_VERSION: i32 = 25; // The maximum number of bytes that may be included in a write payload when auto-encryption is // enabled. @@ -149,9 +149,6 @@ pub(crate) trait Operation { /// Updates this operation as needed for a retry. fn update_for_retry(&mut self); - /// Updates this operation based on server topology. - fn update_for_topology(&mut self, _topology: &TopologyDescription); - fn pinned_connection(&self) -> Option<&PinnedConnectionHandle>; fn name(&self) -> &str; @@ -239,9 +236,6 @@ pub(crate) trait OperationWithDefaults: Send + Sync { /// Updates this operation as needed for a retry. fn update_for_retry(&mut self) {} - /// Updates this operation based on server topology. - fn update_for_topology(&mut self, _topology: &TopologyDescription) {} - fn pinned_connection(&self) -> Option<&PinnedConnectionHandle> { None } @@ -294,9 +288,6 @@ where fn update_for_retry(&mut self) { self.update_for_retry() } - fn update_for_topology(&mut self, topology: &TopologyDescription) { - self.update_for_topology(topology) - } fn pinned_connection(&self) -> Option<&PinnedConnectionHandle> { self.pinned_connection() } diff --git a/src/operation/aggregate.rs b/src/operation/aggregate.rs index 3efb754ee..e6464d9f7 100644 --- a/src/operation/aggregate.rs +++ b/src/operation/aggregate.rs @@ -7,7 +7,7 @@ use crate::{ cursor::CursorSpecification, error::Result, operation::{append_options, remove_empty_write_concern, Retryability}, - options::{AggregateOptions, ReadPreference, SelectionCriteria, WriteConcern}, + options::{AggregateOptions, SelectionCriteria, WriteConcern}, Namespace, }; @@ -109,13 +109,32 @@ impl OperationWithDefaults for Aggregate { )) } - fn update_for_topology(&mut self, _topology: &crate::sdam::TopologyDescription) { - match self.selection_criteria() { - None => return, - Some(SelectionCriteria::ReadPreference(ReadPreference::Primary)) => return, - _ => (), + /* + fn update_for_topology(&mut self, topology: &crate::sdam::TopologyDescription) { + eprintln!("aggregate: update_for_topology"); + if !self.is_out_or_merge() + || matches!( + self.selection_criteria(), + None | Some(SelectionCriteria::ReadPreference(ReadPreference::Primary)) + ) + || topology.topology_type() == TopologyType::LoadBalanced + { + eprintln!("aggregate: skipping topology update"); + return; + } + for server in topology.servers.values() { + let _ = dbg!(server.hello_response()); + if let Ok(Some(v)) = server.max_wire_version() { + if v < SERVER_5_0_0_WIRE_VERSION { + eprintln!("aggregate: updating topology"); + self.options.get_or_insert_default().selection_criteria = + Some(SelectionCriteria::ReadPreference(ReadPreference::Primary)); + break; + } + } } } + */ fn selection_criteria(&self) -> Option<&SelectionCriteria> { self.options diff --git a/src/operation/raw_output.rs b/src/operation/raw_output.rs index 2573a9a4e..ef725a26c 100644 --- a/src/operation/raw_output.rs +++ b/src/operation/raw_output.rs @@ -68,10 +68,6 @@ impl Operation for RawOutput { self.0.update_for_retry() } - fn update_for_topology(&mut self, topology: &crate::sdam::TopologyDescription) { - self.0.update_for_topology(topology) - } - fn pinned_connection(&self) -> Option<&crate::cmap::conn::PinnedConnectionHandle> { self.0.pinned_connection() } diff --git a/src/operation/run_cursor_command.rs b/src/operation/run_cursor_command.rs index 2886d4b42..b675aca16 100644 --- a/src/operation/run_cursor_command.rs +++ b/src/operation/run_cursor_command.rs @@ -79,10 +79,6 @@ impl Operation for RunCursorCommand<'_> { self.run_command.update_for_retry() } - fn update_for_topology(&mut self, topology: &crate::sdam::TopologyDescription) { - self.run_command.update_for_topology(topology) - } - fn pinned_connection(&self) -> Option<&PinnedConnectionHandle> { self.run_command.pinned_connection() } diff --git a/src/sdam/description/server.rs b/src/sdam/description/server.rs index 40a1502dc..13b83caba 100644 --- a/src/sdam/description/server.rs +++ b/src/sdam/description/server.rs @@ -386,6 +386,13 @@ impl ServerDescription { Ok(me) } + pub(crate) fn hello_response(&self) -> Result> { + self.reply + .as_ref() + .map_err(Clone::clone) + .map(|o| o.as_ref().map(|r| &r.command_response)) + } + pub(crate) fn last_write_date(&self) -> Result> { match self.reply { Ok(None) => Ok(None), diff --git a/src/sdam/topology.rs b/src/sdam/topology.rs index 00b3b0143..94ee0c0bc 100644 --- a/src/sdam/topology.rs +++ b/src/sdam/topology.rs @@ -229,6 +229,7 @@ impl Topology { self.watcher.peek_latest().servers() } + #[cfg(test)] pub(crate) fn description(&self) -> TopologyDescription { self.watcher.peek_latest().description.clone() } From 3377348075d75fe7a31acbd6478dfa56cd9a0108 Mon Sep 17 00:00:00 2001 From: Abraham Egnor Date: Thu, 17 Apr 2025 11:47:24 -0400 Subject: [PATCH 4/8] working impl --- .evergreen/run-tests.sh | 9 ++++-- src/client.rs | 48 ++++++++++++++++++++++++++--- src/client/executor.rs | 3 +- src/operation.rs | 12 +++++++- src/operation/aggregate.rs | 30 ------------------ src/operation/raw_output.rs | 4 +++ src/operation/run_cursor_command.rs | 4 +++ src/test/spec/crud.rs | 7 ----- 8 files changed, 70 insertions(+), 47 deletions(-) diff --git a/.evergreen/run-tests.sh b/.evergreen/run-tests.sh index b88a5cc36..766280c76 100755 --- a/.evergreen/run-tests.sh +++ b/.evergreen/run-tests.sh @@ -33,10 +33,13 @@ if [ "Windows_NT" == "$OS" ]; then export SSL_CERT_DIR=$(cygpath /etc/ssl/certs --windows) fi -cargo_test "" +CARGO_OPTIONS+=("--nocapture") +export TEST_FILE=aggregate-write-readPreference.json +cargo_test "test::spec::crud::run_unified" +#cargo_test "" # cargo-nextest doesn't support doc tests -RUST_BACKTRACE=1 cargo test --doc $(cargo_test_options) -((CARGO_RESULT = ${CARGO_RESULT} || $?)) +#RUST_BACKTRACE=1 cargo test --doc $(cargo_test_options) +#((CARGO_RESULT = ${CARGO_RESULT} || $?)) exit $CARGO_RESULT diff --git a/src/client.rs b/src/client.rs index 555e203fc..a4a20a0c2 100644 --- a/src/client.rs +++ b/src/client.rs @@ -447,7 +447,7 @@ impl Client { criteria: Option<&SelectionCriteria>, ) -> Result { let (server, _) = self - .select_server(criteria, "Test select server", None) + .select_server(criteria, "Test select server", None, false) .await?; Ok(server.address.clone()) } @@ -460,6 +460,7 @@ impl Client { #[allow(unused_variables)] // we only use the operation_name for tracing. operation_name: &str, deprioritized: Option<&ServerAddress>, + is_out_or_merge: bool, ) -> Result<(SelectedServer, SelectionCriteria)> { let criteria = criteria.unwrap_or(&SelectionCriteria::ReadPreference(ReadPreference::Primary)); @@ -488,10 +489,15 @@ impl Client { let mut watcher = self.inner.topology.watch(); loop { let state = watcher.observe_latest(); - for server in state.description.servers.values() { - eprintln!("at selection: {:?}", server.hello_response()); - } - let effective_criteria = criteria; // TODO + let override_criteria; + let effective_criteria = if let Some(oc) = + Self::override_criteria(criteria, &state.description, is_out_or_merge) + { + override_criteria = oc; + &override_criteria + } else { + criteria + }; let result = server_selection::attempt_to_select_server( effective_criteria, &state.description, @@ -543,6 +549,38 @@ impl Client { } } + /// Check to see if selection criteria need to be overridden. Currently only required for + /// aggregate operations with $merge/$out stages. + fn override_criteria( + criteria: &SelectionCriteria, + desc: &crate::sdam::TopologyDescription, + is_out_or_merge: bool, + ) -> Option { + if is_out_or_merge { + eprintln!("aggregate: checking override"); + } + if !is_out_or_merge + || criteria == &SelectionCriteria::ReadPreference(ReadPreference::Primary) + || desc.topology_type() == crate::TopologyType::LoadBalanced + { + if is_out_or_merge { + eprintln!("aggregate: skipping override"); + } + return None; + } + for server in desc.servers.values() { + let _ = dbg!(server.hello_response()); + if let Ok(Some(v)) = server.max_wire_version() { + static SERVER_5_0_0_WIRE_VERSION: i32 = 13; + if v < SERVER_5_0_0_WIRE_VERSION { + eprintln!("aggregate: overriding criteria"); + return Some(SelectionCriteria::ReadPreference(ReadPreference::Primary)); + } + } + } + return None; + } + #[cfg(all(test, feature = "dns-resolver"))] pub(crate) fn get_hosts(&self) -> Vec { let watcher = self.inner.topology.watch(); diff --git a/src/client/executor.rs b/src/client/executor.rs index 79e7ba6b5..438c4cb6a 100644 --- a/src/client/executor.rs +++ b/src/client/executor.rs @@ -326,6 +326,7 @@ impl Client { selection_criteria, op.name(), retry.as_ref().map(|r| &r.first_server), + op.is_out_or_merge(), ) .await { @@ -862,7 +863,7 @@ impl Client { || server_type.is_data_bearing() })); let _ = self - .select_server(Some(&criteria), operation_name, None) + .select_server(Some(&criteria), operation_name, None, false) .await?; Ok(()) } diff --git a/src/operation.rs b/src/operation.rs index b15314b83..c64aca8b6 100644 --- a/src/operation.rs +++ b/src/operation.rs @@ -76,7 +76,6 @@ pub(crate) use update::{Update, UpdateOrReplace}; const SERVER_4_2_0_WIRE_VERSION: i32 = 8; const SERVER_4_4_0_WIRE_VERSION: i32 = 9; -const _SERVER_5_0_0_WIRE_VERSION: i32 = 13; const SERVER_8_0_0_WIRE_VERSION: i32 = 25; // The maximum number of bytes that may be included in a write payload when auto-encryption is // enabled. @@ -149,6 +148,9 @@ pub(crate) trait Operation { /// Updates this operation as needed for a retry. fn update_for_retry(&mut self); + /// Returns whether this is a $out or $merge aggregation operation. + fn is_out_or_merge(&self) -> bool; + fn pinned_connection(&self) -> Option<&PinnedConnectionHandle>; fn name(&self) -> &str; @@ -236,6 +238,11 @@ pub(crate) trait OperationWithDefaults: Send + Sync { /// Updates this operation as needed for a retry. fn update_for_retry(&mut self) {} + /// Returns whether this is a $out or $merge aggregation operation. + fn is_out_or_merge(&self) -> bool { + false + } + fn pinned_connection(&self) -> Option<&PinnedConnectionHandle> { None } @@ -288,6 +295,9 @@ where fn update_for_retry(&mut self) { self.update_for_retry() } + fn is_out_or_merge(&self) -> bool { + self.is_out_or_merge() + } fn pinned_connection(&self) -> Option<&PinnedConnectionHandle> { self.pinned_connection() } diff --git a/src/operation/aggregate.rs b/src/operation/aggregate.rs index e6464d9f7..4916e19b1 100644 --- a/src/operation/aggregate.rs +++ b/src/operation/aggregate.rs @@ -109,33 +109,6 @@ impl OperationWithDefaults for Aggregate { )) } - /* - fn update_for_topology(&mut self, topology: &crate::sdam::TopologyDescription) { - eprintln!("aggregate: update_for_topology"); - if !self.is_out_or_merge() - || matches!( - self.selection_criteria(), - None | Some(SelectionCriteria::ReadPreference(ReadPreference::Primary)) - ) - || topology.topology_type() == TopologyType::LoadBalanced - { - eprintln!("aggregate: skipping topology update"); - return; - } - for server in topology.servers.values() { - let _ = dbg!(server.hello_response()); - if let Ok(Some(v)) = server.max_wire_version() { - if v < SERVER_5_0_0_WIRE_VERSION { - eprintln!("aggregate: updating topology"); - self.options.get_or_insert_default().selection_criteria = - Some(SelectionCriteria::ReadPreference(ReadPreference::Primary)); - break; - } - } - } - } - */ - fn selection_criteria(&self) -> Option<&SelectionCriteria> { self.options .as_ref() @@ -161,10 +134,7 @@ impl OperationWithDefaults for Aggregate { Retryability::Read } } -} -impl Aggregate { - /// Returns whether this is a $out or $merge aggregation operation. fn is_out_or_merge(&self) -> bool { self.pipeline .last() diff --git a/src/operation/raw_output.rs b/src/operation/raw_output.rs index ef725a26c..92182ff5a 100644 --- a/src/operation/raw_output.rs +++ b/src/operation/raw_output.rs @@ -68,6 +68,10 @@ impl Operation for RawOutput { self.0.update_for_retry() } + fn is_out_or_merge(&self) -> bool { + self.0.is_out_or_merge() + } + fn pinned_connection(&self) -> Option<&crate::cmap::conn::PinnedConnectionHandle> { self.0.pinned_connection() } diff --git a/src/operation/run_cursor_command.rs b/src/operation/run_cursor_command.rs index b675aca16..195a0b128 100644 --- a/src/operation/run_cursor_command.rs +++ b/src/operation/run_cursor_command.rs @@ -79,6 +79,10 @@ impl Operation for RunCursorCommand<'_> { self.run_command.update_for_retry() } + fn is_out_or_merge(&self) -> bool { + self.run_command.is_out_or_merge() + } + fn pinned_connection(&self) -> Option<&PinnedConnectionHandle> { self.run_command.pinned_connection() } diff --git a/src/test/spec/crud.rs b/src/test/spec/crud.rs index 6fc1fdb4c..1ec16555c 100644 --- a/src/test/spec/crud.rs +++ b/src/test/spec/crud.rs @@ -43,13 +43,6 @@ async fn run_unified() { pre-5.0 server", "Requesting unacknowledged write with verboseResults is a client-side error", "Requesting unacknowledged write with ordered is a client-side error", - // TODO RUST-663: Unskip these tests. - "Aggregate with $out includes read preference for 5.0+ server", - "Aggregate with $out omits read preference for pre-5.0 server", - "Aggregate with $merge includes read preference for 5.0+ server", - "Aggregate with $merge omits read preference for pre-5.0 server", - "Database-level aggregate with $out omits read preference for pre-5.0 server", - "Database-level aggregate with $merge omits read preference for pre-5.0 server", ]; // TODO: remove this manual skip when this test is fixed to skip on serverless if *SERVERLESS { From d3cdee9b9e85ca31460770908c581ccfe94a1353 Mon Sep 17 00:00:00 2001 From: Abraham Egnor Date: Thu, 17 Apr 2025 11:51:38 -0400 Subject: [PATCH 5/8] cleanup --- .evergreen/run-tests.sh | 9 +++------ src/sdam/description/topology.rs | 14 ++++++-------- src/sdam/topology.rs | 2 +- 3 files changed, 10 insertions(+), 15 deletions(-) diff --git a/.evergreen/run-tests.sh b/.evergreen/run-tests.sh index 766280c76..b88a5cc36 100755 --- a/.evergreen/run-tests.sh +++ b/.evergreen/run-tests.sh @@ -33,13 +33,10 @@ if [ "Windows_NT" == "$OS" ]; then export SSL_CERT_DIR=$(cygpath /etc/ssl/certs --windows) fi -CARGO_OPTIONS+=("--nocapture") -export TEST_FILE=aggregate-write-readPreference.json -cargo_test "test::spec::crud::run_unified" -#cargo_test "" +cargo_test "" # cargo-nextest doesn't support doc tests -#RUST_BACKTRACE=1 cargo test --doc $(cargo_test_options) -#((CARGO_RESULT = ${CARGO_RESULT} || $?)) +RUST_BACKTRACE=1 cargo test --doc $(cargo_test_options) +((CARGO_RESULT = ${CARGO_RESULT} || $?)) exit $CARGO_RESULT diff --git a/src/sdam/description/topology.rs b/src/sdam/description/topology.rs index a3e1e699d..2131893b9 100644 --- a/src/sdam/description/topology.rs +++ b/src/sdam/description/topology.rs @@ -205,7 +205,7 @@ impl TopologyDescription { &self, address: &ServerAddress, command: &mut Command, - criteria: Option<&SelectionCriteria>, + criteria: &SelectionCriteria, ) { let server_type = self .get_server_description(address) @@ -220,8 +220,7 @@ impl TopologyDescription { } (TopologyType::Single, ServerType::Standalone) => {} (TopologyType::Single, _) => { - let specified_read_pref = - criteria.and_then(SelectionCriteria::as_read_pref).cloned(); + let specified_read_pref = criteria.as_read_pref().cloned(); let resolved_read_pref = match specified_read_pref { Some(ReadPreference::Primary) | None => ReadPreference::PrimaryPreferred { @@ -235,11 +234,10 @@ impl TopologyDescription { } _ => { let read_pref = match criteria { - Some(SelectionCriteria::ReadPreference(rp)) => rp.clone(), - Some(SelectionCriteria::Predicate(_)) => ReadPreference::PrimaryPreferred { + SelectionCriteria::ReadPreference(rp) => rp.clone(), + SelectionCriteria::Predicate(_) => ReadPreference::PrimaryPreferred { options: Default::default(), }, - None => ReadPreference::Primary, }; if read_pref != ReadPreference::Primary { command.set_read_preference(read_pref) @@ -251,10 +249,10 @@ impl TopologyDescription { fn update_command_read_pref_for_mongos( &self, command: &mut Command, - criteria: Option<&SelectionCriteria>, + criteria: &SelectionCriteria, ) { let read_preference = match criteria { - Some(SelectionCriteria::ReadPreference(rp)) => rp, + SelectionCriteria::ReadPreference(rp) => rp, _ => return, }; match read_preference { diff --git a/src/sdam/topology.rs b/src/sdam/topology.rs index 94ee0c0bc..66ff07129 100644 --- a/src/sdam/topology.rs +++ b/src/sdam/topology.rs @@ -200,7 +200,7 @@ impl Topology { &self, server_address: &ServerAddress, command: &mut Command, - criteria: Option<&SelectionCriteria>, + criteria: &SelectionCriteria, ) { self.watcher .peek_latest() From df27e15e2f52b80bc6b4eebc4553cb2288ffdde2 Mon Sep 17 00:00:00 2001 From: Abraham Egnor Date: Thu, 17 Apr 2025 13:13:22 -0400 Subject: [PATCH 6/8] switch to callback --- src/client.rs | 54 ++++++----------------------- src/client/executor.rs | 4 +-- src/operation.rs | 20 +++++++---- src/operation/aggregate.rs | 26 +++++++++++++- src/operation/raw_output.rs | 4 +-- src/operation/run_cursor_command.rs | 4 +-- src/sdam/description/server.rs | 7 ---- 7 files changed, 55 insertions(+), 64 deletions(-) diff --git a/src/client.rs b/src/client.rs index a4a20a0c2..1a4e984af 100644 --- a/src/client.rs +++ b/src/client.rs @@ -35,6 +35,7 @@ use crate::{ error::{Error, ErrorKind, Result}, event::command::CommandEvent, id_set::IdSet, + operation::OverrideCriteriaFn, options::{ClientOptions, DatabaseOptions, ReadPreference, SelectionCriteria, ServerAddress}, sdam::{ server_selection::{self, attempt_to_select_server}, @@ -447,7 +448,7 @@ impl Client { criteria: Option<&SelectionCriteria>, ) -> Result { let (server, _) = self - .select_server(criteria, "Test select server", None, false) + .select_server(criteria, "Test select server", None, |_, _| None) .await?; Ok(server.address.clone()) } @@ -460,7 +461,7 @@ impl Client { #[allow(unused_variables)] // we only use the operation_name for tracing. operation_name: &str, deprioritized: Option<&ServerAddress>, - is_out_or_merge: bool, + override_criteria: OverrideCriteriaFn, ) -> Result<(SelectedServer, SelectionCriteria)> { let criteria = criteria.unwrap_or(&SelectionCriteria::ReadPreference(ReadPreference::Primary)); @@ -489,15 +490,14 @@ impl Client { let mut watcher = self.inner.topology.watch(); loop { let state = watcher.observe_latest(); - let override_criteria; - let effective_criteria = if let Some(oc) = - Self::override_criteria(criteria, &state.description, is_out_or_merge) - { - override_criteria = oc; - &override_criteria - } else { - criteria - }; + let override_slot; + let effective_criteria = + if let Some(oc) = override_criteria(criteria, &state.description) { + override_slot = oc; + &override_slot + } else { + criteria + }; let result = server_selection::attempt_to_select_server( effective_criteria, &state.description, @@ -549,38 +549,6 @@ impl Client { } } - /// Check to see if selection criteria need to be overridden. Currently only required for - /// aggregate operations with $merge/$out stages. - fn override_criteria( - criteria: &SelectionCriteria, - desc: &crate::sdam::TopologyDescription, - is_out_or_merge: bool, - ) -> Option { - if is_out_or_merge { - eprintln!("aggregate: checking override"); - } - if !is_out_or_merge - || criteria == &SelectionCriteria::ReadPreference(ReadPreference::Primary) - || desc.topology_type() == crate::TopologyType::LoadBalanced - { - if is_out_or_merge { - eprintln!("aggregate: skipping override"); - } - return None; - } - for server in desc.servers.values() { - let _ = dbg!(server.hello_response()); - if let Ok(Some(v)) = server.max_wire_version() { - static SERVER_5_0_0_WIRE_VERSION: i32 = 13; - if v < SERVER_5_0_0_WIRE_VERSION { - eprintln!("aggregate: overriding criteria"); - return Some(SelectionCriteria::ReadPreference(ReadPreference::Primary)); - } - } - } - return None; - } - #[cfg(all(test, feature = "dns-resolver"))] pub(crate) fn get_hosts(&self) -> Vec { let watcher = self.inner.topology.watch(); diff --git a/src/client/executor.rs b/src/client/executor.rs index 438c4cb6a..185fee0c6 100644 --- a/src/client/executor.rs +++ b/src/client/executor.rs @@ -326,7 +326,7 @@ impl Client { selection_criteria, op.name(), retry.as_ref().map(|r| &r.first_server), - op.is_out_or_merge(), + op.override_criteria(), ) .await { @@ -863,7 +863,7 @@ impl Client { || server_type.is_data_bearing() })); let _ = self - .select_server(Some(&criteria), operation_name, None, false) + .select_server(Some(&criteria), operation_name, None, |_, _| None) .await?; Ok(()) } diff --git a/src/operation.rs b/src/operation.rs index c64aca8b6..3b4451161 100644 --- a/src/operation.rs +++ b/src/operation.rs @@ -76,6 +76,7 @@ pub(crate) use update::{Update, UpdateOrReplace}; const SERVER_4_2_0_WIRE_VERSION: i32 = 8; const SERVER_4_4_0_WIRE_VERSION: i32 = 9; +const SERVER_5_0_0_WIRE_VERSION: i32 = 13; const SERVER_8_0_0_WIRE_VERSION: i32 = 25; // The maximum number of bytes that may be included in a write payload when auto-encryption is // enabled. @@ -148,14 +149,18 @@ pub(crate) trait Operation { /// Updates this operation as needed for a retry. fn update_for_retry(&mut self); - /// Returns whether this is a $out or $merge aggregation operation. - fn is_out_or_merge(&self) -> bool; + /// Returns a function handle to potentially override selection criteria based on server + /// topology. + fn override_criteria(&self) -> OverrideCriteriaFn; fn pinned_connection(&self) -> Option<&PinnedConnectionHandle>; fn name(&self) -> &str; } +pub(crate) type OverrideCriteriaFn = + fn(&SelectionCriteria, &crate::sdam::TopologyDescription) -> Option; + // A mirror of the `Operation` trait, with default behavior where appropriate. Should only be // implemented by operation types that do not delegate to other operations. pub(crate) trait OperationWithDefaults: Send + Sync { @@ -238,9 +243,10 @@ pub(crate) trait OperationWithDefaults: Send + Sync { /// Updates this operation as needed for a retry. fn update_for_retry(&mut self) {} - /// Returns whether this is a $out or $merge aggregation operation. - fn is_out_or_merge(&self) -> bool { - false + /// Returns a function handle to potentially override selection criteria based on server + /// topology. + fn override_criteria(&self) -> OverrideCriteriaFn { + |_, _| None } fn pinned_connection(&self) -> Option<&PinnedConnectionHandle> { @@ -295,8 +301,8 @@ where fn update_for_retry(&mut self) { self.update_for_retry() } - fn is_out_or_merge(&self) -> bool { - self.is_out_or_merge() + fn override_criteria(&self) -> OverrideCriteriaFn { + self.override_criteria() } fn pinned_connection(&self) -> Option<&PinnedConnectionHandle> { self.pinned_connection() diff --git a/src/operation/aggregate.rs b/src/operation/aggregate.rs index 4916e19b1..17b0277d8 100644 --- a/src/operation/aggregate.rs +++ b/src/operation/aggregate.rs @@ -7,7 +7,7 @@ use crate::{ cursor::CursorSpecification, error::Result, operation::{append_options, remove_empty_write_concern, Retryability}, - options::{AggregateOptions, SelectionCriteria, WriteConcern}, + options::{AggregateOptions, ReadPreference, SelectionCriteria, WriteConcern}, Namespace, }; @@ -135,6 +135,30 @@ impl OperationWithDefaults for Aggregate { } } + fn override_criteria(&self) -> super::OverrideCriteriaFn { + if !self.is_out_or_merge() { + return |_, _| None; + } + |criteria, topology| { + if criteria == &SelectionCriteria::ReadPreference(ReadPreference::Primary) + || topology.topology_type() == crate::TopologyType::LoadBalanced + { + return None; + } + for server in topology.servers.values() { + if let Ok(Some(v)) = server.max_wire_version() { + if v < super::SERVER_5_0_0_WIRE_VERSION { + return Some(SelectionCriteria::ReadPreference(ReadPreference::Primary)); + } + } + } + None + } + } +} + +impl Aggregate { + /// Returns whether this is a $out or $merge aggregation operation. fn is_out_or_merge(&self) -> bool { self.pipeline .last() diff --git a/src/operation/raw_output.rs b/src/operation/raw_output.rs index 92182ff5a..b3ece677e 100644 --- a/src/operation/raw_output.rs +++ b/src/operation/raw_output.rs @@ -68,8 +68,8 @@ impl Operation for RawOutput { self.0.update_for_retry() } - fn is_out_or_merge(&self) -> bool { - self.0.is_out_or_merge() + fn override_criteria(&self) -> super::OverrideCriteriaFn { + self.0.override_criteria() } fn pinned_connection(&self) -> Option<&crate::cmap::conn::PinnedConnectionHandle> { diff --git a/src/operation/run_cursor_command.rs b/src/operation/run_cursor_command.rs index 195a0b128..781d84d7e 100644 --- a/src/operation/run_cursor_command.rs +++ b/src/operation/run_cursor_command.rs @@ -79,8 +79,8 @@ impl Operation for RunCursorCommand<'_> { self.run_command.update_for_retry() } - fn is_out_or_merge(&self) -> bool { - self.run_command.is_out_or_merge() + fn override_criteria(&self) -> super::OverrideCriteriaFn { + self.run_command.override_criteria() } fn pinned_connection(&self) -> Option<&PinnedConnectionHandle> { diff --git a/src/sdam/description/server.rs b/src/sdam/description/server.rs index 13b83caba..40a1502dc 100644 --- a/src/sdam/description/server.rs +++ b/src/sdam/description/server.rs @@ -386,13 +386,6 @@ impl ServerDescription { Ok(me) } - pub(crate) fn hello_response(&self) -> Result> { - self.reply - .as_ref() - .map_err(Clone::clone) - .map(|o| o.as_ref().map(|r| &r.command_response)) - } - pub(crate) fn last_write_date(&self) -> Result> { match self.reply { Ok(None) => Ok(None), From cd4a22d27020cb5cb6959fc280a7b0bad205eb48 Mon Sep 17 00:00:00 2001 From: Abraham Egnor Date: Mon, 28 Apr 2025 10:39:13 -0400 Subject: [PATCH 7/8] Update src/client/executor.rs Co-authored-by: Isabel Atkinson --- src/client/executor.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/src/client/executor.rs b/src/client/executor.rs index 185fee0c6..b420135b7 100644 --- a/src/client/executor.rs +++ b/src/client/executor.rs @@ -310,7 +310,6 @@ impl Client { let mut retry: Option = None; let mut implicit_session: Option = None; loop { - //op.update_for_topology(&self.inner.topology.description()); if retry.is_some() { op.update_for_retry(); From 7fc1c8e93a72f6fb86bd0c2a669f54d4c3f0886a Mon Sep 17 00:00:00 2001 From: Abraham Egnor Date: Mon, 28 Apr 2025 11:47:32 -0400 Subject: [PATCH 8/8] post-rebase cleanup --- src/client/executor.rs | 263 ++++++++++++----------- src/operation.rs | 1 + src/operation/aggregate/change_stream.rs | 1 + src/operation/bulk_write.rs | 2 + 4 files changed, 147 insertions(+), 120 deletions(-) diff --git a/src/client/executor.rs b/src/client/executor.rs index b420135b7..87a0f7209 100644 --- a/src/client/executor.rs +++ b/src/client/executor.rs @@ -29,7 +29,6 @@ use crate::{ wire::{next_request_id, Message}, PinnedConnectionHandle, }, - Command, ConnectionPool, RawCommandResponse, StreamDescription, @@ -310,7 +309,6 @@ impl Client { let mut retry: Option = None; let mut implicit_session: Option = None; loop { - if retry.is_some() { op.update_for_retry(); } @@ -395,7 +393,14 @@ impl Client { }; let details = match self - .execute_command_on_connection(cmd, op, &mut conn, &mut session, retryability) + .execute_operation_on_connection( + op, + &mut conn, + &mut session, + txn_number, + retryability, + effective_criteria, + ) .await { Ok(output) => ExecutionDetails { @@ -468,127 +473,21 @@ impl Client { session: &mut Option<&mut ClientSession>, txn_number: Option, retryability: Retryability, + effective_criteria: SelectionCriteria, ) -> Result { loop { - let stream_description = connection.stream_description()?; - let is_sharded = stream_description.initial_server_type == ServerType::Mongos; - let mut cmd = op.build(stream_description)?; - self.inner.topology.update_command_with_read_pref( - connection.address(), - &mut cmd, - op.selection_criteria(), - ); - - match session { - Some(ref mut session) if op.supports_sessions() && op.is_acknowledged() => { - cmd.set_session(session); - if let Some(txn_number) = txn_number { - cmd.set_txn_number(txn_number); - } - if session - .options() - .and_then(|opts| opts.snapshot) - .unwrap_or(false) - { - if connection - .stream_description()? - .max_wire_version - .unwrap_or(0) - < 13 - { - let labels: Option> = None; - return Err(Error::new( - ErrorKind::IncompatibleServer { - message: "Snapshot reads require MongoDB 5.0 or later".into(), - }, - labels, - )); - } - cmd.set_snapshot_read_concern(session); - } - // If this is a causally consistent session, set `readConcern.afterClusterTime`. - // Causal consistency defaults to true, unless snapshot is true. - else if session.causal_consistency() - && matches!( - session.transaction.state, - TransactionState::None | TransactionState::Starting - ) - && op.supports_read_concern(stream_description) - { - cmd.set_after_cluster_time(session); - } - - match session.transaction.state { - TransactionState::Starting => { - cmd.set_start_transaction(); - cmd.set_autocommit(); - if session.causal_consistency() { - cmd.set_after_cluster_time(session); - } - - if let Some(ref options) = session.transaction.options { - if let Some(ref read_concern) = options.read_concern { - cmd.set_read_concern_level(read_concern.level.clone()); - } - } - if self.is_load_balanced() { - session.pin_connection(connection.pin()?); - } else if is_sharded { - session.pin_mongos(connection.address().clone()); - } - session.transaction.state = TransactionState::InProgress; - } - TransactionState::InProgress => cmd.set_autocommit(), - TransactionState::Committed { .. } | TransactionState::Aborted => { - cmd.set_autocommit(); - - // Append the recovery token to the command if we are committing or - // aborting on a sharded transaction. - if is_sharded { - if let Some(ref recovery_token) = session.transaction.recovery_token - { - cmd.set_recovery_token(recovery_token); - } - } - } - _ => {} - } - session.update_last_use(); - } - Some(ref session) if !op.supports_sessions() && !session.is_implicit() => { - return Err(ErrorKind::InvalidArgument { - message: format!("{} does not support sessions", cmd.name), - } - .into()); - } - Some(ref session) if !op.is_acknowledged() && !session.is_implicit() => { - return Err(ErrorKind::InvalidArgument { - message: "Cannot use ClientSessions with unacknowledged write concern" - .to_string(), - } - .into()); - } - _ => {} - } - - let session_cluster_time = session.as_ref().and_then(|session| session.cluster_time()); - let client_cluster_time = self.inner.topology.cluster_time(); - let max_cluster_time = - std::cmp::max(session_cluster_time, client_cluster_time.as_ref()); - if let Some(cluster_time) = max_cluster_time { - cmd.set_cluster_time(cluster_time); - } + let cmd = self.build_command( + op, + connection, + session, + txn_number, + effective_criteria.clone(), + )?; let connection_info = connection.info(); let service_id = connection.service_id(); let request_id = next_request_id(); - - if let Some(ref server_api) = self.inner.options.server_api { - cmd.set_server_api(server_api); - } - let should_redact = cmd.should_redact(); - let cmd_name = cmd.name.clone(); let target_db = cmd.target_db.clone(); @@ -627,8 +526,9 @@ impl Client { let start_time = Instant::now(); let command_result = match connection.send_message(message).await { Ok(response) => { - self.handle_response(op, session, is_sharded, response) - .await + let is_sharded = + connection.stream_description()?.initial_server_type == ServerType::Mongos; + self.parse_response(op, session, is_sharded, response).await } Err(err) => Err(err), }; @@ -703,6 +603,7 @@ impl Client { let context = ExecutionContext { connection, session: session.as_deref_mut(), + effective_criteria: effective_criteria.clone(), }; match op.handle_response(response, context).await { @@ -734,6 +635,128 @@ impl Client { } } + fn build_command( + &self, + op: &mut T, + connection: &mut PooledConnection, + session: &mut Option<&mut ClientSession>, + txn_number: Option, + effective_criteria: SelectionCriteria, + ) -> Result { + let stream_description = connection.stream_description()?; + let is_sharded = stream_description.initial_server_type == ServerType::Mongos; + let mut cmd = op.build(stream_description)?; + self.inner.topology.update_command_with_read_pref( + connection.address(), + &mut cmd, + &effective_criteria, + ); + + match session { + Some(ref mut session) if op.supports_sessions() && op.is_acknowledged() => { + cmd.set_session(session); + if let Some(txn_number) = txn_number { + cmd.set_txn_number(txn_number); + } + if session + .options() + .and_then(|opts| opts.snapshot) + .unwrap_or(false) + { + if connection + .stream_description()? + .max_wire_version + .unwrap_or(0) + < 13 + { + let labels: Option> = None; + return Err(Error::new( + ErrorKind::IncompatibleServer { + message: "Snapshot reads require MongoDB 5.0 or later".into(), + }, + labels, + )); + } + cmd.set_snapshot_read_concern(session); + } + // If this is a causally consistent session, set `readConcern.afterClusterTime`. + // Causal consistency defaults to true, unless snapshot is true. + else if session.causal_consistency() + && matches!( + session.transaction.state, + TransactionState::None | TransactionState::Starting + ) + && op.supports_read_concern(stream_description) + { + cmd.set_after_cluster_time(session); + } + + match session.transaction.state { + TransactionState::Starting => { + cmd.set_start_transaction(); + cmd.set_autocommit(); + if session.causal_consistency() { + cmd.set_after_cluster_time(session); + } + + if let Some(ref options) = session.transaction.options { + if let Some(ref read_concern) = options.read_concern { + cmd.set_read_concern_level(read_concern.level.clone()); + } + } + if self.is_load_balanced() { + session.pin_connection(connection.pin()?); + } else if is_sharded { + session.pin_mongos(connection.address().clone()); + } + session.transaction.state = TransactionState::InProgress; + } + TransactionState::InProgress => cmd.set_autocommit(), + TransactionState::Committed { .. } | TransactionState::Aborted => { + cmd.set_autocommit(); + + // Append the recovery token to the command if we are committing or aborting + // on a sharded transaction. + if is_sharded { + if let Some(ref recovery_token) = session.transaction.recovery_token { + cmd.set_recovery_token(recovery_token); + } + } + } + _ => {} + } + session.update_last_use(); + } + Some(ref session) if !op.supports_sessions() && !session.is_implicit() => { + return Err(ErrorKind::InvalidArgument { + message: format!("{} does not support sessions", cmd.name), + } + .into()); + } + Some(ref session) if !op.is_acknowledged() && !session.is_implicit() => { + return Err(ErrorKind::InvalidArgument { + message: "Cannot use ClientSessions with unacknowledged write concern" + .to_string(), + } + .into()); + } + _ => {} + } + + let session_cluster_time = session.as_ref().and_then(|session| session.cluster_time()); + let client_cluster_time = self.inner.topology.cluster_time(); + let max_cluster_time = std::cmp::max(session_cluster_time, client_cluster_time.as_ref()); + if let Some(cluster_time) = max_cluster_time { + cmd.set_cluster_time(cluster_time); + } + + if let Some(ref server_api) = self.inner.options.server_api { + cmd.set_server_api(server_api); + } + + Ok(cmd) + } + #[cfg(feature = "in-use-encryption")] fn auto_encrypt<'a>( &'a self, @@ -786,7 +809,7 @@ impl Client { .await } - async fn handle_response( + async fn parse_response( &self, op: &T, session: &mut Option<&mut ClientSession>, diff --git a/src/operation.rs b/src/operation.rs index 3b4451161..287d9aebe 100644 --- a/src/operation.rs +++ b/src/operation.rs @@ -89,6 +89,7 @@ const OP_MSG_OVERHEAD_BYTES: usize = 1_000; pub(crate) struct ExecutionContext<'a> { pub(crate) connection: &'a mut PooledConnection, pub(crate) session: Option<&'a mut ClientSession>, + pub(crate) effective_criteria: SelectionCriteria, } #[derive(Debug, PartialEq, Clone, Copy)] diff --git a/src/operation/aggregate/change_stream.rs b/src/operation/aggregate/change_stream.rs index 00d56e56d..4cb67cb85 100644 --- a/src/operation/aggregate/change_stream.rs +++ b/src/operation/aggregate/change_stream.rs @@ -94,6 +94,7 @@ impl OperationWithDefaults for ChangeStreamAggregate { let inner_context = ExecutionContext { connection: context.connection, session: context.session.as_deref_mut(), + effective_criteria: context.effective_criteria, }; let spec = self.inner.handle_response(response, inner_context)?; diff --git a/src/operation/bulk_write.rs b/src/operation/bulk_write.rs index 2f7c7b1bb..2b5c9c9ed 100644 --- a/src/operation/bulk_write.rs +++ b/src/operation/bulk_write.rs @@ -114,6 +114,7 @@ where &mut context.session, txn_number, Retryability::None, + context.effective_criteria.clone(), ) .await; @@ -135,6 +136,7 @@ where &mut context.session, txn_number, Retryability::None, + context.effective_criteria.clone(), ) .await; }