Skip to content

Commit 30def14

Browse files
committed
RUST-97 Support sharded transactions recovery token (#398)
1 parent f77590c commit 30def14

File tree

11 files changed

+1000
-11
lines changed

11 files changed

+1000
-11
lines changed

src/client/executor.rs

Lines changed: 36 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -299,6 +299,7 @@ impl Client {
299299
}
300300

301301
let stream_description = connection.stream_description()?;
302+
let is_sharded = stream_description.initial_server_type == ServerType::Mongos;
302303
let mut cmd = op.build(stream_description)?;
303304
self.inner
304305
.topology
@@ -337,15 +338,22 @@ impl Client {
337338
cmd.set_start_transaction();
338339
cmd.set_autocommit();
339340
cmd.set_txn_read_concern(*session)?;
340-
if stream_description.initial_server_type == ServerType::Mongos {
341+
if is_sharded {
341342
session.pin_mongos(connection.address().clone());
342343
}
343344
session.transaction.state = TransactionState::InProgress;
344345
}
345-
TransactionState::InProgress
346-
| TransactionState::Committed { .. }
347-
| TransactionState::Aborted => {
346+
TransactionState::InProgress => cmd.set_autocommit(),
347+
TransactionState::Committed { .. } | TransactionState::Aborted => {
348348
cmd.set_autocommit();
349+
350+
// Append the recovery token to the command if we are committing or aborting
351+
// on a sharded transaction.
352+
if is_sharded {
353+
if let Some(ref recovery_token) = session.transaction.recovery_token {
354+
cmd.set_recovery_token(recovery_token);
355+
}
356+
}
349357
}
350358
_ => {}
351359
}
@@ -414,6 +422,9 @@ impl Client {
414422
Ok(r) => {
415423
self.update_cluster_time(&r, session).await;
416424
if r.is_success() {
425+
// Retrieve recovery token from successful response.
426+
Client::update_recovery_token(is_sharded, &r, session).await;
427+
417428
Ok(CommandResult {
418429
raw: response,
419430
deserialized: r.into_body(),
@@ -458,7 +469,15 @@ impl Client {
458469
}))
459470
}
460471
// for ok: 1 just return the original deserialization error.
461-
_ => Err(deserialize_error),
472+
_ => {
473+
Client::update_recovery_token(
474+
is_sharded,
475+
&error_response,
476+
session,
477+
)
478+
.await;
479+
Err(deserialize_error)
480+
}
462481
}
463482
}
464483
// We failed to deserialize even that, so just return the original
@@ -635,6 +654,18 @@ impl Client {
635654
}
636655
}
637656
}
657+
658+
async fn update_recovery_token<T: Response>(
659+
is_sharded: bool,
660+
response: &T,
661+
session: &mut Option<&mut ClientSession>,
662+
) {
663+
if let Some(ref mut session) = session {
664+
if is_sharded && session.in_transaction() {
665+
session.transaction.recovery_token = response.recovery_token().cloned();
666+
}
667+
}
668+
}
638669
}
639670

640671
impl Error {

src/client/session/mod.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -118,12 +118,14 @@ pub(crate) struct Transaction {
118118
pub(crate) state: TransactionState,
119119
pub(crate) options: Option<TransactionOptions>,
120120
pub(crate) pinned_mongos: Option<SelectionCriteria>,
121+
pub(crate) recovery_token: Option<Document>,
121122
}
122123

123124
impl Transaction {
124125
pub(crate) fn start(&mut self, options: Option<TransactionOptions>) {
125126
self.state = TransactionState::Starting;
126127
self.options = options;
128+
self.recovery_token = None;
127129
}
128130

129131
pub(crate) fn commit(&mut self, data_committed: bool) {
@@ -140,6 +142,7 @@ impl Transaction {
140142
self.state = TransactionState::None;
141143
self.options = None;
142144
self.pinned_mongos = None;
145+
self.recovery_token = None;
143146
}
144147
}
145148

@@ -149,6 +152,7 @@ impl Default for Transaction {
149152
state: TransactionState::None,
150153
options: None,
151154
pinned_mongos: None,
155+
recovery_token: None,
152156
}
153157
}
154158
}

src/cmap/conn/command.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,10 @@ impl Command {
4141
}
4242
}
4343

44+
pub(crate) fn set_recovery_token(&mut self, recovery_token: &Document) {
45+
self.body.insert("recoveryToken", recovery_token);
46+
}
47+
4448
pub(crate) fn set_txn_number(&mut self, txn_number: i64) {
4549
self.body.insert("txnNumber", txn_number);
4650
}

src/operation/mod.rs

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -153,6 +153,9 @@ pub(crate) trait Response: Sized {
153153
/// The `atClusterTime` field of the response.
154154
fn at_cluster_time(&self) -> Option<Timestamp>;
155155

156+
/// The `recoveryToken` field of the response.
157+
fn recovery_token(&self) -> Option<&Document>;
158+
156159
/// Convert into the body of the response.
157160
fn into_body(self) -> Self::Body;
158161
}
@@ -168,6 +171,8 @@ pub(crate) struct CommandResponse<T> {
168171

169172
pub(crate) at_cluster_time: Option<Timestamp>,
170173

174+
pub(crate) recovery_token: Option<Document>,
175+
171176
#[serde(flatten)]
172177
pub(crate) body: T,
173178
}
@@ -197,6 +202,10 @@ impl<T: DeserializeOwned> Response for CommandResponse<T> {
197202
self.at_cluster_time
198203
}
199204

205+
fn recovery_token(&self) -> Option<&Document> {
206+
self.recovery_token.as_ref()
207+
}
208+
200209
fn into_body(self) -> Self::Body {
201210
self.body
202211
}
@@ -229,6 +238,10 @@ impl<T: DeserializeOwned> Response for CursorResponse<T> {
229238
self.response.body.cursor.at_cluster_time
230239
}
231240

241+
fn recovery_token(&self) -> Option<&Document> {
242+
self.response.recovery_token()
243+
}
244+
232245
fn into_body(self) -> Self::Body {
233246
self.response.body
234247
}

src/operation/run_command/mod.rs

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -96,6 +96,7 @@ impl Operation for RunCommand {
9696
pub(crate) struct Response {
9797
doc: Document,
9898
cluster_time: Option<ClusterTime>,
99+
recovery_token: Option<Document>,
99100
}
100101

101102
impl super::Response for Response {
@@ -109,7 +110,13 @@ impl super::Response for Response {
109110
.ok()
110111
.and_then(|doc| bson::from_document(doc.clone()).ok());
111112

112-
Ok(Self { doc, cluster_time })
113+
let recovery_token = doc.get_document("recoveryToken").ok().cloned();
114+
115+
Ok(Self {
116+
doc,
117+
cluster_time,
118+
recovery_token,
119+
})
113120
}
114121

115122
fn ok(&self) -> Option<&Bson> {
@@ -131,6 +138,10 @@ impl super::Response for Response {
131138
.ok()
132139
}
133140

141+
fn recovery_token(&self) -> Option<&Document> {
142+
self.recovery_token.as_ref()
143+
}
144+
134145
fn into_body(self) -> Self::Body {
135146
self.doc
136147
}

0 commit comments

Comments
 (0)