Skip to content

Commit 2ba0568

Browse files
minor: refactor transactions example (#801)
1 parent 8a36fd4 commit 2ba0568

File tree

2 files changed

+99
-74
lines changed

2 files changed

+99
-74
lines changed

tests/transaction_examples.rs

Lines changed: 0 additions & 74 deletions
This file was deleted.

tests/transactions_example.rs

Lines changed: 99 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,99 @@
1+
#![cfg(all(feature = "tokio-runtime", not(feature = "tokio-sync")))]
2+
3+
// START TRANSACTIONS EXAMPLE
4+
use mongodb::{
5+
bson::{doc, Document},
6+
error::{Result, TRANSIENT_TRANSACTION_ERROR, UNKNOWN_TRANSACTION_COMMIT_RESULT},
7+
options::{
8+
Acknowledgment,
9+
ReadConcern,
10+
ReadPreference,
11+
SelectionCriteria,
12+
TransactionOptions,
13+
WriteConcern,
14+
},
15+
Client,
16+
ClientSession,
17+
};
18+
19+
#[tokio::main]
20+
async fn main() -> Result<()> {
21+
let uri = std::env::var("MONGODB_URI").expect("MONGODB_URI must be set");
22+
let client = Client::with_uri_str(uri).await?;
23+
24+
let mut session = client.start_session(None).await?;
25+
run_transaction_with_retry(&mut session).await
26+
}
27+
28+
async fn run_transaction_with_retry(session: &mut ClientSession) -> Result<()> {
29+
loop {
30+
match execute_transaction(session).await {
31+
Ok(()) => {
32+
println!("Transaction succeeded.");
33+
return Ok(());
34+
}
35+
Err(error) => {
36+
if error.contains_label(TRANSIENT_TRANSACTION_ERROR) {
37+
println!("TransientTransactionError, retrying transaction...");
38+
continue;
39+
} else {
40+
session.abort_transaction().await?;
41+
return Err(error);
42+
}
43+
}
44+
}
45+
}
46+
}
47+
48+
async fn execute_transaction(session: &mut ClientSession) -> Result<()> {
49+
let transaction_options = TransactionOptions::builder()
50+
.read_concern(ReadConcern::snapshot())
51+
.write_concern(WriteConcern::builder().w(Acknowledgment::Majority).build())
52+
.selection_criteria(SelectionCriteria::ReadPreference(ReadPreference::Primary))
53+
.build();
54+
session.start_transaction(transaction_options).await?;
55+
56+
let client = session.client();
57+
let employees = client.database("hr").collection::<Document>("employees");
58+
let events = client.database("reporting").collection("events");
59+
60+
employees
61+
.update_one_with_session(
62+
doc! { "employee": 3 },
63+
doc! { "$set": { "status": "Inactive" } },
64+
None,
65+
session,
66+
)
67+
.await?;
68+
69+
events
70+
.insert_one_with_session(
71+
doc! { "employee": 3, "status": { "new": "Inactive", "old": "Active" } },
72+
None,
73+
session,
74+
)
75+
.await?;
76+
77+
commit_with_retry(session).await
78+
}
79+
80+
async fn commit_with_retry(session: &mut ClientSession) -> Result<()> {
81+
loop {
82+
match session.commit_transaction().await {
83+
Ok(()) => {
84+
println!("Transaction committed.");
85+
return Ok(());
86+
}
87+
Err(error) => {
88+
if error.contains_label(UNKNOWN_TRANSACTION_COMMIT_RESULT) {
89+
println!("UnknownTransactionCommitResult, retrying commit operation...");
90+
continue;
91+
} else {
92+
println!("Error during commit.");
93+
return Err(error);
94+
}
95+
}
96+
}
97+
}
98+
}
99+
// END TRANSACTIONS EXAMPLE

0 commit comments

Comments
 (0)