Skip to content

Commit 2f12793

Browse files
authored
RUST-1064 Retry on handshake failure (#598)
1 parent d55c079 commit 2f12793

File tree

98 files changed

+915
-62
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

98 files changed

+915
-62
lines changed

src/client/auth/sasl.rs

Lines changed: 15 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ use crate::{
44
client::{auth::AuthMechanism, options::ServerApi},
55
cmap::Command,
66
error::{Error, Result},
7+
operation::{CommandErrorBody, CommandResponse},
78
};
89

910
/// Encapsulates the command building of a `saslStart` command.
@@ -98,12 +99,20 @@ fn validate_command_success(auth_mechanism: &str, response: &Document) -> Result
9899

99100
match bson_util::get_int(ok) {
100101
Some(1) => Ok(()),
101-
Some(_) => Err(Error::authentication_error(
102-
auth_mechanism,
103-
response
104-
.get_str("errmsg")
105-
.unwrap_or("Authentication failure"),
106-
)),
102+
Some(_) => {
103+
let source = bson::from_bson::<CommandResponse<CommandErrorBody>>(Bson::Document(
104+
response.clone(),
105+
))
106+
.map(|cmd_resp| cmd_resp.body.into())
107+
.ok();
108+
Err(Error::authentication_error(
109+
auth_mechanism,
110+
response
111+
.get_str("errmsg")
112+
.unwrap_or("Authentication failure"),
113+
)
114+
.with_source(source))
115+
}
107116
_ => Err(Error::invalid_authentication_response(auth_mechanism)),
108117
}
109118
}

src/client/executor.rs

Lines changed: 45 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -332,16 +332,24 @@ impl Client {
332332
Ok(c) => c,
333333
Err(mut err) => {
334334
err.add_labels_and_update_pin(None, &mut session, None)?;
335+
if err.is_read_retryable() && self.inner.options.retry_writes != Some(false) {
336+
err.add_label(RETRYABLE_WRITE_ERROR);
337+
}
335338

336-
if err.is_pool_cleared() {
339+
let op_retry = match self.get_op_retryability(&op, &session) {
340+
Retryability::Read => err.is_read_retryable(),
341+
Retryability::Write => err.is_write_retryable(),
342+
_ => false,
343+
};
344+
if err.is_pool_cleared() || op_retry {
337345
return self.execute_retry(&mut op, &mut session, None, err).await;
338346
} else {
339347
return Err(err);
340348
}
341349
}
342350
};
343351

344-
let retryability = self.get_retryability(&conn, &op, &session).await?;
352+
let retryability = self.get_retryability(&conn, &op, &session)?;
345353

346354
let txn_number = match session {
347355
Some(ref mut session) => {
@@ -433,7 +441,7 @@ impl Client {
433441
Err(_) => return Err(first_error),
434442
};
435443

436-
let retryability = self.get_retryability(&conn, op, session).await?;
444+
let retryability = self.get_retryability(&conn, op, session)?;
437445
if retryability == Retryability::None {
438446
return Err(first_error);
439447
}
@@ -825,35 +833,49 @@ impl Client {
825833
}
826834

827835
/// Returns the retryability level for the execution of this operation.
828-
async fn get_retryability<T: Operation>(
836+
fn get_op_retryability<T: Operation>(
829837
&self,
830-
conn: &Connection,
831838
op: &T,
832839
session: &Option<&mut ClientSession>,
833-
) -> Result<Retryability> {
834-
if !session
840+
) -> Retryability {
841+
if session
835842
.as_ref()
836843
.map(|session| session.in_transaction())
837844
.unwrap_or(false)
838845
{
839-
match op.retryability() {
840-
Retryability::Read if self.inner.options.retry_reads != Some(false) => {
841-
return Ok(Retryability::Read);
842-
}
843-
Retryability::Write if conn.stream_description()?.supports_retryable_writes() => {
844-
// commitTransaction and abortTransaction should be retried regardless of the
845-
// value for retry_writes set on the Client
846-
if op.name() == CommitTransaction::NAME
847-
|| op.name() == AbortTransaction::NAME
848-
|| self.inner.options.retry_writes != Some(false)
849-
{
850-
return Ok(Retryability::Write);
851-
}
852-
}
853-
_ => {}
846+
return Retryability::None;
847+
}
848+
match op.retryability() {
849+
Retryability::Read if self.inner.options.retry_reads != Some(false) => {
850+
Retryability::Read
851+
}
852+
// commitTransaction and abortTransaction should be retried regardless of the
853+
// value for retry_writes set on the Client
854+
Retryability::Write
855+
if op.name() == CommitTransaction::NAME
856+
|| op.name() == AbortTransaction::NAME
857+
|| self.inner.options.retry_writes != Some(false) =>
858+
{
859+
Retryability::Write
860+
}
861+
_ => Retryability::None,
862+
}
863+
}
864+
865+
/// Returns the retryability level for the execution of this operation on this connection.
866+
fn get_retryability<T: Operation>(
867+
&self,
868+
conn: &Connection,
869+
op: &T,
870+
session: &Option<&mut ClientSession>,
871+
) -> Result<Retryability> {
872+
match self.get_op_retryability(op, session) {
873+
Retryability::Read => Ok(Retryability::Read),
874+
Retryability::Write if conn.stream_description()?.supports_retryable_writes() => {
875+
Ok(Retryability::Write)
854876
}
877+
_ => Ok(Retryability::None),
855878
}
856-
Ok(Retryability::None)
857879
}
858880

859881
async fn update_cluster_time(

src/error.rs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,8 @@ pub struct Error {
4747
pub kind: Box<ErrorKind>,
4848
labels: HashSet<String>,
4949
pub(crate) wire_version: Option<i32>,
50+
#[source]
51+
pub(crate) source: Option<Box<Error>>,
5052
}
5153

5254
impl Error {
@@ -61,6 +63,7 @@ impl Error {
6163
kind: Box::new(kind),
6264
labels,
6365
wire_version: None,
66+
source: None,
6467
}
6568
}
6669

@@ -237,6 +240,7 @@ impl Error {
237240
ErrorKind::Write(WriteFailure::WriteConcernError(wc_error)) => Some(wc_error.code),
238241
_ => None,
239242
}
243+
.or_else(|| self.source.as_ref().and_then(|s| s.code()))
240244
}
241245

242246
/// Gets the message for this error, if applicable, for use in testing.
@@ -345,6 +349,11 @@ impl Error {
345349
}
346350
false
347351
}
352+
353+
pub(crate) fn with_source<E: Into<Option<Error>>>(mut self, source: E) -> Self {
354+
self.source = source.into().map(Box::new);
355+
self
356+
}
348357
}
349358

350359
impl<E> From<E> for Error

src/test/spec/json/retryable-reads/README.rst

Lines changed: 13 additions & 9 deletions

0 commit comments

Comments
 (0)