Skip to content

Commit 48d8260

Browse files
RUST-51 Retryable Writes (#219)
1 parent 781da09 commit 48d8260

File tree

34 files changed

+1009
-114
lines changed

34 files changed

+1009
-114
lines changed

src/client/executor.rs

Lines changed: 90 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -8,9 +8,9 @@ use time::PreciseTime;
88
use crate::{
99
bson::Document,
1010
cmap::Connection,
11-
error::{ErrorKind, Result},
11+
error::{Error, ErrorKind, Result},
1212
event::command::{CommandFailedEvent, CommandStartedEvent, CommandSucceededEvent},
13-
operation::Operation,
13+
operation::{Operation, Retryability},
1414
options::SelectionCriteria,
1515
sdam::{Server, SessionSupportStatus},
1616
};
@@ -93,8 +93,17 @@ impl Client {
9393
}
9494
};
9595

96+
let retryability = self.get_retryability(&conn, &op).await?;
97+
98+
let txn_number = match session {
99+
Some(ref mut session) if retryability == Retryability::Write => {
100+
Some(session.get_and_increment_txn_number())
101+
}
102+
_ => None,
103+
};
104+
96105
let first_error = match self
97-
.execute_operation_on_connection(&op, &mut conn, &mut session)
106+
.execute_operation_on_connection(&op, &mut conn, &mut session, txn_number)
98107
.await
99108
{
100109
Ok(result) => {
@@ -103,16 +112,36 @@ impl Client {
103112
Err(err) => {
104113
self.inner
105114
.topology
106-
.handle_post_handshake_error(err.clone(), conn, server)
115+
.handle_post_handshake_error(err.clone(), &conn, server)
107116
.await;
108-
// TODO RUST-90: Do not retry if session is in a transaction
109-
if self.inner.options.retry_reads == Some(false)
110-
|| !op.is_read_retryable()
111-
|| !err.is_read_retryable()
117+
118+
// Retryable writes are only supported by storage engines with document-level
119+
// locking, so users need to disable retryable writes if using mmapv1.
120+
if let ErrorKind::CommandError(ref err) = err.kind.as_ref() {
121+
if err.code == 20 && err.message.starts_with("Transaction numbers") {
122+
let mut err = err.clone();
123+
err.message = "This MongoDB deployment does not support retryable writes. \
124+
Please add retryWrites=false to your connection string."
125+
.to_string();
126+
return Err(ErrorKind::CommandError(err).into());
127+
}
128+
}
129+
130+
// For a pre-4.4 connection, an error label should be added to any write-retryable
131+
// error as long as the retry_writes client option is not set to false. For a 4.4+
132+
// connection, a label should be added only to network errors.
133+
let err = match retryability {
134+
Retryability::Write => get_error_with_retryable_write_label(&conn, err).await?,
135+
_ => err,
136+
};
137+
138+
// TODO RUST-90: Do not retry read if session is in a transaction
139+
if retryability == Retryability::Read && err.is_read_retryable()
140+
|| retryability == Retryability::Write && err.is_write_retryable()
112141
{
113-
return Err(err);
114-
} else {
115142
err
143+
} else {
144+
return Err(err);
116145
}
117146
}
118147
};
@@ -135,17 +164,28 @@ impl Client {
135164
}
136165
};
137166

167+
let retryability = self.get_retryability(&conn, &op).await?;
168+
if retryability == Retryability::None {
169+
return Err(first_error);
170+
}
171+
138172
match self
139-
.execute_operation_on_connection(&op, &mut conn, &mut session)
173+
.execute_operation_on_connection(&op, &mut conn, &mut session, txn_number)
140174
.await
141175
{
142176
Ok(result) => Ok(result),
143177
Err(err) => {
144178
self.inner
145179
.topology
146-
.handle_post_handshake_error(err.clone(), conn, server)
180+
.handle_post_handshake_error(err.clone(), &conn, server)
147181
.await;
148-
if err.is_server_error() || err.is_read_retryable() {
182+
183+
let err = match retryability {
184+
Retryability::Write => get_error_with_retryable_write_label(&conn, err).await?,
185+
_ => err,
186+
};
187+
188+
if err.is_server_error() || err.is_read_retryable() || err.is_write_retryable() {
149189
Err(err)
150190
} else {
151191
Err(first_error)
@@ -160,6 +200,7 @@ impl Client {
160200
op: &T,
161201
connection: &mut Connection,
162202
session: &mut Option<&mut ClientSession>,
203+
txn_number: Option<u64>,
163204
) -> Result<T::O> {
164205
if let Some(wc) = op.write_concern() {
165206
wc.validate()?;
@@ -174,6 +215,9 @@ impl Client {
174215
match session {
175216
Some(ref mut session) if op.supports_sessions() && op.is_acknowledged() => {
176217
cmd.set_session(session);
218+
if let Some(txn_number) = txn_number {
219+
cmd.set_txn_number(txn_number);
220+
}
177221
session.update_last_use();
178222
}
179223
Some(ref session) if !op.supports_sessions() && !session.is_implicit() => {
@@ -318,4 +362,37 @@ impl Client {
318362
_ => Ok(initial_status),
319363
}
320364
}
365+
366+
/// Returns the retryability level for the execution of this operation.
367+
async fn get_retryability<T: Operation>(
368+
&self,
369+
conn: &Connection,
370+
op: &T,
371+
) -> Result<Retryability> {
372+
match op.retryability() {
373+
Retryability::Read if self.inner.options.retry_reads != Some(false) => {
374+
Ok(Retryability::Read)
375+
}
376+
Retryability::Write
377+
if self.inner.options.retry_writes != Some(false)
378+
&& conn.stream_description()?.supports_retryable_writes() =>
379+
{
380+
Ok(Retryability::Write)
381+
}
382+
_ => Ok(Retryability::None),
383+
}
384+
}
385+
}
386+
387+
/// Returns an Error with a "RetryableWriteError" label added if necessary. On a pre-4.4
388+
/// connection, a label should be added to any write-retryable error. On a 4.4+ connection, a
389+
/// label should only be added to network errors. Regardless of server version, a label should
390+
/// only be added if the `retry_writes` client option is not set to `false`.
391+
async fn get_error_with_retryable_write_label(conn: &Connection, err: Error) -> Result<Error> {
392+
if let Some(max_wire_version) = conn.stream_description()?.max_wire_version {
393+
if err.should_add_retryable_write_label(max_wire_version) {
394+
return Ok(err.with_label("RetryableWriteError"));
395+
}
396+
}
397+
Ok(err)
321398
}

src/client/options/mod.rs

Lines changed: 46 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -316,8 +316,11 @@ pub struct ClientOptions {
316316
#[builder(default)]
317317
pub retry_reads: Option<bool>,
318318

319+
/// Whether or not the client should retry a write operation if the operation fails.
320+
///
321+
/// The default value is true.
319322
#[builder(default)]
320-
pub(crate) retry_writes: Option<bool>,
323+
pub retry_writes: Option<bool>,
321324

322325
/// The default selection criteria for operations performed on the Client. See the
323326
/// SelectionCriteria type documentation for more details.
@@ -370,6 +373,10 @@ pub struct ClientOptions {
370373
#[builder(default)]
371374
#[serde(skip)]
372375
pub(crate) resolver_config: Option<ResolverConfig>,
376+
377+
/// Used by tests to override MIN_HEARTBEAT_FREQUENCY.
378+
#[builder(default)]
379+
pub(crate) heartbeat_freq_test: Option<Duration>,
373380
}
374381

375382
fn default_hosts() -> Vec<StreamAddress> {
@@ -585,6 +592,7 @@ impl From<ClientOptionsParser> for ClientOptions {
585592
original_srv_hostname: None,
586593
original_uri: Some(parser.original_uri),
587594
resolver_config: None,
595+
heartbeat_freq_test: None,
588596
}
589597
}
590598
}
@@ -759,6 +767,43 @@ impl ClientOptions {
759767
}
760768
Ok(())
761769
}
770+
771+
/// Applies the options in other to these options if a value is not already present
772+
#[cfg(test)]
773+
pub(crate) fn merge(&mut self, other: ClientOptions) {
774+
merge_options!(
775+
other,
776+
self,
777+
[
778+
app_name,
779+
compressors,
780+
cmap_event_handler,
781+
command_event_handler,
782+
connect_timeout,
783+
credential,
784+
direct_connection,
785+
driver_info,
786+
heartbeat_freq,
787+
local_threshold,
788+
max_idle_time,
789+
max_pool_size,
790+
min_pool_size,
791+
read_concern,
792+
repl_set_name,
793+
retry_reads,
794+
retry_writes,
795+
selection_criteria,
796+
server_selection_timeout,
797+
socket_timeout,
798+
tls,
799+
wait_queue_timeout,
800+
write_concern,
801+
zlib_compression,
802+
original_srv_hostname,
803+
original_uri
804+
]
805+
);
806+
}
762807
}
763808

764809
/// Splits a string into a section before a given index and a section exclusively after the index.

src/client/session/mod.rs

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,12 @@ impl ClientSession {
8383
pub(crate) fn update_last_use(&mut self) {
8484
self.server_session.last_use = Instant::now();
8585
}
86+
87+
/// Increments the txn_number and returns the new value.
88+
pub(crate) fn get_and_increment_txn_number(&mut self) -> u64 {
89+
self.server_session.txn_number += 1;
90+
self.server_session.txn_number
91+
}
8692
}
8793

8894
impl Drop for ClientSession {
@@ -92,6 +98,7 @@ impl Drop for ClientSession {
9298
id: self.server_session.id.clone(),
9399
last_use: self.server_session.last_use,
94100
dirty: self.server_session.dirty,
101+
txn_number: self.server_session.txn_number,
95102
};
96103

97104
RUNTIME.execute(async move {
@@ -112,6 +119,9 @@ pub(crate) struct ServerSession {
112119

113120
/// Whether a network error was encountered while using this session.
114121
dirty: bool,
122+
123+
/// A monotonically increasing transaction number for this session.
124+
txn_number: u64,
115125
}
116126

117127
impl ServerSession {
@@ -126,6 +136,7 @@ impl ServerSession {
126136
id: doc! { "id": binary },
127137
last_use: Instant::now(),
128138
dirty: false,
139+
txn_number: 0,
129140
}
130141
}
131142

src/cmap/conn/command.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,10 @@ impl Command {
5656
self.body.insert("$clusterTime", doc);
5757
}
5858
}
59+
60+
pub(crate) fn set_txn_number(&mut self, txn_number: u64) {
61+
self.body.insert("txnNumber", txn_number);
62+
}
5963
}
6064

6165
#[derive(Debug, Clone)]

src/cmap/conn/stream_description.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,13 @@ impl StreamDescription {
3838
}
3939
}
4040

41+
/// Whether this StreamDescription supports retryable writes.
42+
pub(crate) fn supports_retryable_writes(&self) -> bool {
43+
self.initial_server_type != ServerType::Standalone
44+
&& self.logical_session_timeout.is_some()
45+
&& self.max_wire_version.map_or(false, |version| version >= 6)
46+
}
47+
4148
/// Gets a description of a stream for a 4.2 connection.
4249
#[cfg(test)]
4350
pub(crate) fn new_testing() -> Self {

src/coll/mod.rs

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -287,6 +287,11 @@ impl Collection {
287287
}
288288

289289
/// Deletes up to one document found matching `query`.
290+
///
291+
/// This operation will retry once upon failure if the connection and encountered error support
292+
/// retryability. See the documentation
293+
/// [here](https://docs.mongodb.com/manual/core/retryable-writes/) for more information on
294+
/// retryable writes.
290295
pub async fn delete_one(
291296
&self,
292297
query: Document,
@@ -349,6 +354,11 @@ impl Collection {
349354
}
350355

351356
/// Atomically finds up to one document in the collection matching `filter` and deletes it.
357+
///
358+
/// This operation will retry once upon failure if the connection and encountered error support
359+
/// retryability. See the documentation
360+
/// [here](https://docs.mongodb.com/manual/core/retryable-writes/) for more information on
361+
/// retryable writes.
352362
pub async fn find_one_and_delete(
353363
&self,
354364
filter: Document,
@@ -363,6 +373,11 @@ impl Collection {
363373

364374
/// Atomically finds up to one document in the collection matching `filter` and replaces it with
365375
/// `replacement`.
376+
///
377+
/// This operation will retry once upon failure if the connection and encountered error support
378+
/// retryability. See the documentation
379+
/// [here](https://docs.mongodb.com/manual/core/retryable-writes/) for more information on
380+
/// retryable writes.
366381
pub async fn find_one_and_replace(
367382
&self,
368383
filter: Document,
@@ -380,6 +395,11 @@ impl Collection {
380395
/// Both `Document` and `Vec<Document>` implement `Into<UpdateModifications>`, so either can be
381396
/// passed in place of constructing the enum case. Note: pipeline updates are only supported
382397
/// in MongoDB 4.2+.
398+
///
399+
/// This operation will retry once upon failure if the connection and encountered error support
400+
/// retryability. See the documentation
401+
/// [here](https://docs.mongodb.com/manual/core/retryable-writes/) for more information on
402+
/// retryable writes.
383403
pub async fn find_one_and_update(
384404
&self,
385405
filter: Document,
@@ -395,6 +415,11 @@ impl Collection {
395415
}
396416

397417
/// Inserts the documents in `docs` into the collection.
418+
///
419+
/// This operation will retry once upon failure if the connection and encountered error support
420+
/// retryability. See the documentation
421+
/// [here](https://docs.mongodb.com/manual/core/retryable-writes/) for more information on
422+
/// retryable writes.
398423
pub async fn insert_many(
399424
&self,
400425
docs: impl IntoIterator<Item = Document>,
@@ -477,6 +502,11 @@ impl Collection {
477502
}
478503

479504
/// Inserts `doc` into the collection.
505+
///
506+
/// This operation will retry once upon failure if the connection and encountered error support
507+
/// retryability. See the documentation
508+
/// [here](https://docs.mongodb.com/manual/core/retryable-writes/) for more information on
509+
/// retryable writes.
480510
pub async fn insert_one(
481511
&self,
482512
doc: Document,
@@ -498,6 +528,11 @@ impl Collection {
498528
}
499529

500530
/// Replaces up to one document matching `query` in the collection with `replacement`.
531+
///
532+
/// This operation will retry once upon failure if the connection and encountered error support
533+
/// retryability. See the documentation
534+
/// [here](https://docs.mongodb.com/manual/core/retryable-writes/) for more information on
535+
/// retryable writes.
501536
pub async fn replace_one(
502537
&self,
503538
query: Document,
@@ -550,6 +585,11 @@ impl Collection {
550585
/// passed in place of constructing the enum case. Note: pipeline updates are only supported
551586
/// in MongoDB 4.2+. See the official MongoDB
552587
/// [documentation](https://docs.mongodb.com/manual/reference/command/update/#behavior) for more information on specifying updates.
588+
///
589+
/// This operation will retry once upon failure if the connection and encountered error support
590+
/// retryability. See the documentation
591+
/// [here](https://docs.mongodb.com/manual/core/retryable-writes/) for more information on
592+
/// retryable writes.
553593
pub async fn update_one(
554594
&self,
555595
query: Document,

0 commit comments

Comments
 (0)