Skip to content

Commit 2a0d65a

Browse files
RUST-2074 Fix retryability bug, add disabled test (#1427)
1 parent 4d6d0d9 commit 2a0d65a

File tree

4 files changed

+132
-41
lines changed

4 files changed

+132
-41
lines changed

src/client/executor.rs

Lines changed: 30 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -349,12 +349,19 @@ impl Client {
349349
err.add_label(RETRYABLE_WRITE_ERROR);
350350
}
351351

352-
let op_retry = match self.get_op_retryability(op, &session) {
353-
Retryability::Read => err.is_read_retryable(),
354-
Retryability::Write => err.is_write_retryable(),
355-
_ => false,
352+
let retryability = op.retryability().with_options(self.options());
353+
let can_retry = match retryability {
354+
// Read-retryable operations should be retried on pool cleared errors during
355+
// connection checkout regardless of transaction status.
356+
Retryability::Read if err.is_pool_cleared() => true,
357+
_ => {
358+
retryability.can_retry_error(&err)
359+
&& !session
360+
.as_ref()
361+
.is_some_and(|session| session.in_transaction())
362+
}
356363
};
357-
if err.is_pool_cleared() || op_retry {
364+
if can_retry {
358365
retry = Some(ExecutionRetry {
359366
prior_txn_number: None,
360367
first_error: err,
@@ -380,7 +387,7 @@ impl Client {
380387
session = implicit_session.as_mut();
381388
}
382389

383-
let retryability = self.get_retryability(&conn, op, &session)?;
390+
let retryability = self.get_retryability(op, &session, conn.stream_description()?);
384391
if retryability == Retryability::None {
385392
retry.first_error()?;
386393
}
@@ -449,9 +456,7 @@ impl Client {
449456
} else {
450457
return Err(r.first_error);
451458
}
452-
} else if retryability == Retryability::Read && err.is_read_retryable()
453-
|| retryability == Retryability::Write && err.is_write_retryable()
454-
{
459+
} else if retryability.can_retry_error(&err) {
455460
retry = Some(ExecutionRetry {
456461
prior_txn_number: txn_number,
457462
first_error: err,
@@ -910,52 +915,37 @@ impl Client {
910915
}
911916
}
912917

913-
/// Returns the retryability level for the execution of this operation.
914-
fn get_op_retryability<T: Operation>(
918+
/// Returns the retryability level for the execution of this operation with the given session
919+
/// and connection stream description.
920+
fn get_retryability<T: Operation>(
915921
&self,
916922
op: &T,
917923
session: &Option<&mut ClientSession>,
924+
stream_description: &StreamDescription,
918925
) -> Retryability {
926+
// commitTransaction and abortTransaction are always retried, regardless of the value of
927+
// retry_writes.
928+
if op.name() == CommitTransaction::NAME || op.name() == AbortTransaction::NAME {
929+
return Retryability::Write;
930+
}
931+
919932
if session
920933
.as_ref()
921-
.map(|session| session.in_transaction())
922-
.unwrap_or(false)
934+
.is_some_and(|session| session.in_transaction())
923935
{
924936
return Retryability::None;
925937
}
926-
match op.retryability() {
927-
Retryability::Read if self.inner.options.retry_reads != Some(false) => {
928-
Retryability::Read
929-
}
930-
// commitTransaction and abortTransaction should be retried regardless of the
931-
// value for retry_writes set on the Client
932-
Retryability::Write
933-
if op.name() == CommitTransaction::NAME
934-
|| op.name() == AbortTransaction::NAME
935-
|| self.inner.options.retry_writes != Some(false) =>
936-
{
938+
939+
match op.retryability().with_options(self.options()) {
940+
Retryability::Write if stream_description.supports_retryable_writes() => {
937941
Retryability::Write
938942
}
943+
// All servers compatible with the driver support retryable reads.
944+
Retryability::Read => Retryability::Read,
939945
_ => Retryability::None,
940946
}
941947
}
942948

943-
/// Returns the retryability level for the execution of this operation on this connection.
944-
fn get_retryability<T: Operation>(
945-
&self,
946-
conn: &PooledConnection,
947-
op: &T,
948-
session: &Option<&mut ClientSession>,
949-
) -> Result<Retryability> {
950-
match self.get_op_retryability(op, session) {
951-
Retryability::Read => Ok(Retryability::Read),
952-
Retryability::Write if conn.stream_description()?.supports_retryable_writes() => {
953-
Ok(Retryability::Write)
954-
}
955-
_ => Ok(Retryability::None),
956-
}
957-
}
958-
959949
async fn update_cluster_time(
960950
&self,
961951
cluster_time: Option<ClusterTime>,

src/cmap/test.rs

Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -486,3 +486,74 @@ async fn cmap_spec_tests() {
486486
)
487487
.await;
488488
}
489+
490+
// TODO RUST-2074: investigate why this test is flaky
491+
// #[tokio::test(flavor = "multi_thread")]
492+
// async fn pool_cleared_error_has_transient_transaction_error_label() {
493+
// if !block_connection_supported().await {
494+
// log_uncaptured(
495+
// "skipping pool_cleared_error_has_transient_transaction_error_label: block connection
496+
// \ unsupported",
497+
// );
498+
// return;
499+
// }
500+
// if !transactions_supported().await {
501+
// log_uncaptured(
502+
// "skipping pool_cleared_error_has_transient_transaction_error_label: transactions \
503+
// unsupported",
504+
// );
505+
// return;
506+
// }
507+
// if topology_is_load_balanced().await {
508+
// log_uncaptured(
509+
// "skipping pool_cleared_error_has_transient_transaction_error_label: load balanced \
510+
// topology",
511+
// );
512+
// }
513+
514+
// let app_name = "pool_cleared_error_has_transient_transaction_error_label";
515+
516+
// let mut client_options = get_client_options().await.clone();
517+
// if topology_is_sharded().await {
518+
// client_options.hosts.drain(1..);
519+
// }
520+
// client_options.connect_timeout = Some(Duration::from_millis(500));
521+
// client_options.heartbeat_freq = Some(Duration::from_millis(500));
522+
// client_options.app_name = Some(app_name.to_string());
523+
// let client = Client::for_test()
524+
// .options(client_options)
525+
// .monitor_events()
526+
// .await;
527+
528+
// let mut session = client.start_session().await.unwrap();
529+
// session.start_transaction().await.unwrap();
530+
531+
// let fail_point = FailPoint::fail_command(&["insert"], FailPointMode::Times(1))
532+
// .block_connection(Duration::from_secs(15))
533+
// .app_name(app_name);
534+
// let _guard = client.enable_fail_point(fail_point).await.unwrap();
535+
536+
// let insert_client = client.clone();
537+
// let insert_handle = tokio::spawn(async move {
538+
// insert_client
539+
// .database("db")
540+
// .collection("coll")
541+
// .insert_one(doc! { "x": 1 })
542+
// .session(&mut session)
543+
// .await
544+
// });
545+
546+
// let fail_point = FailPoint::fail_command(
547+
// &["hello", LEGACY_HELLO_COMMAND_NAME],
548+
// // The RTT hellos may encounter this failpoint, so use FailPointMode::AlwaysOn to ensure
549+
// // that the server monitors hit it as well.
550+
// FailPointMode::AlwaysOn,
551+
// )
552+
// .block_connection(Duration::from_millis(1500))
553+
// .app_name(app_name);
554+
// let _guard = client.enable_fail_point(fail_point).await.unwrap();
555+
556+
// let insert_error = insert_handle.await.unwrap().unwrap_err();
557+
// assert!(insert_error.is_pool_cleared(), "{:?}", insert_error);
558+
// assert!(insert_error.contains_label(TRANSIENT_TRANSACTION_ERROR));
559+
// }

src/operation.rs

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@ use crate::{
5252
WriteConcernError,
5353
WriteFailure,
5454
},
55-
options::WriteConcern,
55+
options::{ClientOptions, WriteConcern},
5656
selection_criteria::SelectionCriteria,
5757
BoxFuture,
5858
ClientSession,
@@ -100,6 +100,26 @@ pub(crate) enum Retryability {
100100
None,
101101
}
102102

103+
impl Retryability {
104+
/// Returns this level of retryability in tandem with the client options.
105+
pub(crate) fn with_options(&self, options: &ClientOptions) -> Self {
106+
match self {
107+
Self::Write if options.retry_writes != Some(false) => Self::Write,
108+
Self::Read if options.retry_reads != Some(false) => Self::Read,
109+
_ => Self::None,
110+
}
111+
}
112+
113+
/// Whether this level of retryability can retry the given error.
114+
pub(crate) fn can_retry_error(&self, error: &Error) -> bool {
115+
match self {
116+
Self::Write => error.is_write_retryable(),
117+
Self::Read => error.is_read_retryable(),
118+
Self::None => false,
119+
}
120+
}
121+
}
122+
103123
/// A trait modeling the behavior of a server side operation.
104124
///
105125
/// No methods in this trait should have default behaviors to ensure that wrapper operations

src/test/util/fail_point.rs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -147,6 +147,16 @@ impl Drop for FailPointGuard {
147147
// multi-threaded runtime.
148148
let result = tokio::task::block_in_place(|| {
149149
futures::executor::block_on(async move {
150+
let client = if client.options().app_name.is_some() {
151+
// Create a fresh client with no app name to avoid issues when disabling a
152+
// failpoint configured on the "hello" command.
153+
let mut options = client.options().clone();
154+
options.app_name = None;
155+
Client::for_test().options(options).await.into_client()
156+
} else {
157+
client
158+
};
159+
150160
client
151161
.database("admin")
152162
.run_command(

0 commit comments

Comments
 (0)