Skip to content

Commit 45e7e05

Browse files
RUST-128 Retryable Reads (#179)
1 parent c6e42dd commit 45e7e05

File tree

21 files changed

+1181
-35
lines changed

21 files changed

+1181
-35
lines changed

src/client/executor.rs

Lines changed: 56 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ impl Client {
4646
.into());
4747
}
4848
let mut implicit_session = self.start_implicit_session(&op).await?;
49-
self.select_server_and_execute_operation(op, implicit_session.as_mut())
49+
self.execute_operation_with_retry(op, implicit_session.as_mut())
5050
.await
5151
}
5252

@@ -58,7 +58,7 @@ impl Client {
5858
op: T,
5959
) -> Result<(T::O, Option<ClientSession>)> {
6060
let mut implicit_session = self.start_implicit_session(&op).await?;
61-
self.select_server_and_execute_operation(op, implicit_session.as_mut())
61+
self.execute_operation_with_retry(op, implicit_session.as_mut())
6262
.await
6363
.map(|result| (result, implicit_session))
6464
}
@@ -70,18 +70,15 @@ impl Client {
7070
op: T,
7171
session: &mut ClientSession,
7272
) -> Result<T::O> {
73-
self.select_server_and_execute_operation(op, Some(session))
74-
.await
73+
self.execute_operation_with_retry(op, Some(session)).await
7574
}
7675

7776
/// Selects a server and executes the given operation on it, optionally using a provided
78-
/// session.
79-
///
80-
/// TODO: RUST-128: replace this with `execute_operation_with_retry` when implemented.
81-
async fn select_server_and_execute_operation<T: Operation>(
77+
/// session. Retries the operation upon failure if retryability is supported.
78+
async fn execute_operation_with_retry<T: Operation>(
8279
&self,
8380
op: T,
84-
session: Option<&mut ClientSession>,
81+
mut session: Option<&mut ClientSession>,
8582
) -> Result<T::O> {
8683
let server = self.select_server(op.selection_criteria()).await?;
8784

@@ -96,8 +93,50 @@ impl Client {
9693
}
9794
};
9895

96+
let first_error = match self
97+
.execute_operation_on_connection(&op, &mut conn, &mut session)
98+
.await
99+
{
100+
Ok(result) => {
101+
return Ok(result);
102+
}
103+
Err(err) => {
104+
self.inner
105+
.topology
106+
.handle_post_handshake_error(err.clone(), conn, server)
107+
.await;
108+
// TODO RUST-90: Do not retry if session is in a transaction
109+
if self.inner.options.retry_reads == Some(false)
110+
|| !op.is_read_retryable()
111+
|| !err.is_read_retryable()
112+
{
113+
return Err(err);
114+
} else {
115+
err
116+
}
117+
}
118+
};
119+
120+
let server = match self.select_server(op.selection_criteria()).await {
121+
Ok(server) => server,
122+
Err(err) => {
123+
return Err(first_error);
124+
}
125+
};
126+
127+
let mut conn = match server.checkout_connection().await {
128+
Ok(conn) => conn,
129+
Err(err) => {
130+
self.inner
131+
.topology
132+
.handle_pre_handshake_error(err.clone(), server.address.clone())
133+
.await;
134+
return Err(first_error);
135+
}
136+
};
137+
99138
match self
100-
.execute_operation_on_connection(op, &mut conn, session)
139+
.execute_operation_on_connection(&op, &mut conn, &mut session)
101140
.await
102141
{
103142
Ok(result) => Ok(result),
@@ -106,17 +145,21 @@ impl Client {
106145
.topology
107146
.handle_post_handshake_error(err.clone(), conn, server)
108147
.await;
109-
Err(err)
148+
if err.is_server_error() || err.is_read_retryable() {
149+
Err(err)
150+
} else {
151+
Err(first_error)
152+
}
110153
}
111154
}
112155
}
113156

114157
/// Executes an operation on a given connection, optionally using a provided session.
115158
async fn execute_operation_on_connection<T: Operation>(
116159
&self,
117-
op: T,
160+
op: &T,
118161
connection: &mut Connection,
119-
mut session: Option<&mut ClientSession>,
162+
session: &mut Option<&mut ClientSession>,
120163
) -> Result<T::O> {
121164
if let Some(wc) = op.write_concern() {
122165
wc.validate()?;

src/client/options/mod.rs

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -304,8 +304,11 @@ pub struct ClientOptions {
304304
#[builder(default)]
305305
pub repl_set_name: Option<String>,
306306

307+
/// Whether or not the client should retry a read operation if the operation fails.
308+
///
309+
/// The default value is true.
307310
#[builder(default)]
308-
pub(crate) retry_reads: Option<bool>,
311+
pub retry_reads: Option<bool>,
309312

310313
#[builder(default)]
311314
pub(crate) retry_writes: Option<bool>,
@@ -607,7 +610,7 @@ impl ClientOptions {
607610
/// tag set
608611
/// * `replicaSet`: maps to the `repl_set_name` field
609612
/// * `retryWrites`: not yet implemented
610-
/// * `retryReads`: not yet implemented
613+
/// * `retryReads`: maps to the `retry_reads` field
611614
/// * `serverSelectionTimeoutMS`: maps to the `server_selection_timeout` field
612615
/// * `socketTimeoutMS`: maps to the `socket_timeout` field
613616
/// * `ssl`: an alias of the `tls` option

src/error.rs

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,8 @@ lazy_static! {
1313
static ref RECOVERING_CODES: Vec<i32> = vec![11600, 11602, 13436, 189, 91];
1414
static ref NOTMASTER_CODES: Vec<i32> = vec![10107, 13435];
1515
static ref SHUTTING_DOWN_CODES: Vec<i32> = vec![11600, 91];
16+
static ref RETRYABLE_READ_CODES: Vec<i32> =
17+
vec![11600, 11602, 10107, 13435, 13436, 189, 91, 7, 6, 89, 9001];
1618
}
1719

1820
/// The result type for all methods that can return an error in the `mongodb` crate.
@@ -69,6 +71,36 @@ impl Error {
6971
_ => false,
7072
}
7173
}
74+
75+
/// Whether a read operation should be retried if this error occurs
76+
pub(crate) fn is_read_retryable(&self) -> bool {
77+
if self.is_network_error() {
78+
return true;
79+
}
80+
match &self.kind.code_and_message() {
81+
Some((code, message)) => {
82+
if RETRYABLE_READ_CODES.contains(&code) {
83+
return true;
84+
}
85+
if is_not_master(*code, message) || is_recovering(*code, message) {
86+
return true;
87+
}
88+
false
89+
}
90+
None => false,
91+
}
92+
}
93+
94+
/// Whether an error originated from the server
95+
pub(crate) fn is_server_error(&self) -> bool {
96+
match self.kind.as_ref() {
97+
ErrorKind::AuthenticationError { .. }
98+
| ErrorKind::BulkWriteError(_)
99+
| ErrorKind::CommandError(_)
100+
| ErrorKind::WriteError(_) => true,
101+
_ => false,
102+
}
103+
}
72104
}
73105

74106
impl<E> From<E> for Error

src/operation/aggregate/mod.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,10 @@ impl Operation for Aggregate {
9292
.as_ref()
9393
.and_then(|opts| opts.write_concern.as_ref())
9494
}
95+
96+
fn is_read_retryable(&self) -> bool {
97+
!self.is_out_or_merge()
98+
}
9599
}
96100

97101
impl Aggregate {

src/operation/count/mod.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,10 @@ impl Operation for Count {
6262
}
6363
None
6464
}
65+
66+
fn is_read_retryable(&self) -> bool {
67+
true
68+
}
6569
}
6670

6771
#[derive(Debug, Deserialize)]

src/operation/distinct/mod.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,10 @@ impl Operation for Distinct {
8080
}
8181
None
8282
}
83+
84+
fn is_read_retryable(&self) -> bool {
85+
true
86+
}
8387
}
8488

8589
#[derive(Debug, Deserialize)]

src/operation/find/mod.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -113,4 +113,8 @@ impl Operation for Find {
113113
.as_ref()
114114
.and_then(|opts| opts.selection_criteria.as_ref())
115115
}
116+
117+
fn is_read_retryable(&self) -> bool {
118+
true
119+
}
116120
}

src/operation/list_collections/mod.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -79,4 +79,8 @@ impl Operation for ListCollections {
7979
fn selection_criteria(&self) -> Option<&SelectionCriteria> {
8080
Some(SelectionCriteria::ReadPreference(ReadPreference::Primary)).as_ref()
8181
}
82+
83+
fn is_read_retryable(&self) -> bool {
84+
true
85+
}
8286
}

src/operation/list_databases/mod.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,10 @@ impl Operation for ListDatabases {
7272
fn selection_criteria(&self) -> Option<&SelectionCriteria> {
7373
Some(SelectionCriteria::ReadPreference(ReadPreference::Primary)).as_ref()
7474
}
75+
76+
fn is_read_retryable(&self) -> bool {
77+
true
78+
}
7579
}
7680

7781
#[derive(Debug, Deserialize)]

src/operation/mod.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,11 @@ pub(crate) trait Operation {
9292
fn supports_sessions(&self) -> bool {
9393
true
9494
}
95+
96+
/// Whether or not the operation supports retryable reads
97+
fn is_read_retryable(&self) -> bool {
98+
false
99+
}
95100
}
96101

97102
/// Appends a serializable struct to the input document.

0 commit comments

Comments
 (0)