Skip to content

Commit fd0f75c

Browse files
authored
RUST-954 Pin connections for transactions when connected to a load balancer (#461)
1 parent 12d827a commit fd0f75c

File tree

7 files changed

+109
-39
lines changed

7 files changed

+109
-39
lines changed

src/client/executor.rs

Lines changed: 31 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -159,23 +159,32 @@ impl Client {
159159
T: DeserializeOwned + Unpin + Send + Sync,
160160
{
161161
let mut details = self.execute_operation_with_details(op, session).await?;
162-
let pinned = self.pin_connection_for_cursor(&mut details.output)?;
162+
let pinned = if details.output.connection.is_pinned() {
163+
// Cursor operations on load-balanced transactions will be pinned via the transaction
164+
// pin.
165+
None
166+
} else {
167+
self.pin_connection_for_cursor(&mut details.output)?
168+
};
163169
Ok(SessionCursor::new(
164170
self.clone(),
165171
details.output.operation_output,
166172
pinned,
167173
))
168174
}
169175

176+
fn is_load_balanced(&self) -> bool {
177+
self.inner.options.load_balanced.unwrap_or(false)
178+
}
179+
170180
fn pin_connection_for_cursor<Op, T>(
171181
&self,
172182
details: &mut ExecutionOutput<Op>,
173183
) -> Result<Option<PinnedConnectionHandle>>
174184
where
175185
Op: Operation<O = CursorSpecification<T>>,
176186
{
177-
let is_load_balanced = self.inner.options.load_balanced.unwrap_or(false);
178-
if is_load_balanced && details.operation_output.info.id != 0 {
187+
if self.is_load_balanced() && details.operation_output.info.id != 0 {
179188
Ok(Some(details.connection.pin()?))
180189
} else {
181190
Ok(None)
@@ -205,7 +214,7 @@ impl Client {
205214

206215
let selection_criteria = session
207216
.as_ref()
208-
.and_then(|s| s.transaction.pinned_mongos.as_ref())
217+
.and_then(|s| s.transaction.pinned_mongos())
209218
.or_else(|| op.selection_criteria());
210219

211220
let server = match self.select_server(selection_criteria).await {
@@ -317,9 +326,20 @@ impl Client {
317326
}
318327
};
319328

320-
let mut conn = match op.pinned_connection() {
321-
Some(c) => c.take_connection().await?,
322-
None => match server.pool.check_out().await {
329+
let session_pinned = session
330+
.as_ref()
331+
.and_then(|s| s.transaction.pinned_connection());
332+
let mut conn = match (session_pinned, op.pinned_connection()) {
333+
(Some(c), None) | (None, Some(c)) => c.take_connection().await?,
334+
(Some(c), Some(_)) => {
335+
// An operation executing in a transaction should never have a pinned connection,
336+
// but in case it does, prefer the transaction's pin.
337+
if cfg!(debug_assertions) {
338+
panic!("pinned operation executing in pinned transaction");
339+
}
340+
c.take_connection().await?
341+
}
342+
(None, None) => match server.pool.check_out().await {
323343
Ok(c) => c,
324344
Err(_) => return Err(first_error),
325345
},
@@ -411,7 +431,9 @@ impl Client {
411431
cmd.set_start_transaction();
412432
cmd.set_autocommit();
413433
cmd.set_txn_read_concern(*session);
414-
if is_sharded {
434+
if self.is_load_balanced() {
435+
session.pin_connection(connection.pin()?);
436+
} else if is_sharded {
415437
session.pin_mongos(connection.address().clone());
416438
}
417439
session.transaction.state = TransactionState::InProgress;
@@ -816,7 +838,7 @@ impl Error {
816838
if self.contains_label(TRANSIENT_TRANSACTION_ERROR)
817839
|| self.contains_label(UNKNOWN_TRANSACTION_COMMIT_RESULT)
818840
{
819-
session.unpin_mongos();
841+
session.unpin();
820842
}
821843
}
822844

src/client/session/mod.rs

Lines changed: 48 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ use uuid::Uuid;
1414

1515
use crate::{
1616
bson::{doc, spec::BinarySubtype, Binary, Bson, Document, Timestamp},
17+
cmap::conn::PinnedConnectionHandle,
1718
error::{ErrorKind, Result},
1819
operation::{AbortTransaction, CommitTransaction, Operation},
1920
options::{SessionOptions, TransactionOptions},
@@ -110,11 +111,11 @@ pub struct ClientSession {
110111
pub(crate) snapshot_time: Option<Timestamp>,
111112
}
112113

113-
#[derive(Clone, Debug)]
114+
#[derive(Debug)]
114115
pub(crate) struct Transaction {
115116
pub(crate) state: TransactionState,
116117
pub(crate) options: Option<TransactionOptions>,
117-
pub(crate) pinned_mongos: Option<SelectionCriteria>,
118+
pub(crate) pinned: Option<TransactionPin>,
118119
pub(crate) recovery_token: Option<Document>,
119120
}
120121

@@ -132,23 +133,46 @@ impl Transaction {
132133
pub(crate) fn abort(&mut self) {
133134
self.state = TransactionState::Aborted;
134135
self.options = None;
135-
self.pinned_mongos = None;
136+
self.pinned = None;
136137
}
137138

138139
pub(crate) fn reset(&mut self) {
139140
self.state = TransactionState::None;
140141
self.options = None;
141-
self.pinned_mongos = None;
142+
self.pinned = None;
142143
self.recovery_token = None;
143144
}
145+
146+
pub(crate) fn pinned_mongos(&self) -> Option<&SelectionCriteria> {
147+
match &self.pinned {
148+
Some(TransactionPin::Mongos(s)) => Some(s),
149+
_ => None,
150+
}
151+
}
152+
153+
pub(crate) fn pinned_connection(&self) -> Option<&PinnedConnectionHandle> {
154+
match &self.pinned {
155+
Some(TransactionPin::Connection(c)) => Some(c),
156+
_ => None,
157+
}
158+
}
159+
160+
fn take(&mut self) -> Self {
161+
Transaction {
162+
state: self.state.clone(),
163+
options: self.options.take(),
164+
pinned: self.pinned.take(),
165+
recovery_token: self.recovery_token.take(),
166+
}
167+
}
144168
}
145169

146170
impl Default for Transaction {
147171
fn default() -> Self {
148172
Self {
149173
state: TransactionState::None,
150174
options: None,
151-
pinned_mongos: None,
175+
pinned: None,
152176
recovery_token: None,
153177
}
154178
}
@@ -168,6 +192,12 @@ pub(crate) enum TransactionState {
168192
Aborted,
169193
}
170194

195+
#[derive(Debug)]
196+
pub(crate) enum TransactionPin {
197+
Mongos(SelectionCriteria),
198+
Connection(PinnedConnectionHandle),
199+
}
200+
171201
impl ClientSession {
172202
/// Creates a new `ClientSession` wrapping the provided server session.
173203
pub(crate) fn new(
@@ -256,13 +286,18 @@ impl ClientSession {
256286

257287
/// Pin mongos to session.
258288
pub(crate) fn pin_mongos(&mut self, address: ServerAddress) {
259-
self.transaction.pinned_mongos = Some(SelectionCriteria::Predicate(Arc::new(
260-
move |server_info: &ServerInfo| *server_info.address() == address,
289+
self.transaction.pinned = Some(TransactionPin::Mongos(SelectionCriteria::Predicate(
290+
Arc::new(move |server_info: &ServerInfo| *server_info.address() == address),
261291
)));
262292
}
263293

264-
pub(crate) fn unpin_mongos(&mut self) {
265-
self.transaction.pinned_mongos = None;
294+
/// Pin the connection to the session.
295+
pub(crate) fn pin_connection(&mut self, handle: PinnedConnectionHandle) {
296+
self.transaction.pinned = Some(TransactionPin::Connection(handle));
297+
}
298+
299+
pub(crate) fn unpin(&mut self) {
300+
self.transaction.pinned = None;
266301
}
267302

268303
/// Whether this session is dirty.
@@ -319,7 +354,7 @@ impl ClientSession {
319354
.into());
320355
}
321356
TransactionState::Committed { .. } => {
322-
self.unpin_mongos(); // Unpin session if previous transaction is committed.
357+
self.unpin(); // Unpin session if previous transaction is committed.
323358
}
324359
_ => {}
325360
}
@@ -495,8 +530,8 @@ impl ClientSession {
495530
.as_ref()
496531
.and_then(|options| options.write_concern.as_ref())
497532
.cloned();
498-
let selection_criteria = self.transaction.pinned_mongos.clone();
499-
let abort_transaction = AbortTransaction::new(write_concern, selection_criteria);
533+
let abort_transaction =
534+
AbortTransaction::new(write_concern, self.transaction.pinned.take());
500535
self.transaction.abort();
501536
// Errors returned from running an abortTransaction command should be ignored.
502537
let _result = self
@@ -549,7 +584,7 @@ impl Drop for ClientSession {
549584
client: self.client.clone(),
550585
is_implicit: self.is_implicit,
551586
options: self.options.clone(),
552-
transaction: self.transaction.clone(),
587+
transaction: self.transaction.take(),
553588
snapshot_time: self.snapshot_time,
554589
};
555590
RUNTIME.execute(async move {

src/cmap/conn/mod.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -313,6 +313,11 @@ impl Connection {
313313
})
314314
}
315315

316+
/// Whether this connection has a live `PinnedConnectionHandle`.
317+
pub(crate) fn is_pinned(&self) -> bool {
318+
self.pinned_sender.is_some()
319+
}
320+
316321
/// Close this connection, emitting a `ConnectionClosedEvent` with the supplied reason.
317322
pub(super) fn close_and_drop(mut self, reason: ConnectionClosedReason) {
318323
self.close(reason);

src/operation/abort_transaction/mod.rs

Lines changed: 17 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,8 @@ use bson::Document;
22

33
use crate::{
44
bson::doc,
5-
cmap::{Command, StreamDescription},
5+
client::session::TransactionPin,
6+
cmap::{conn::PinnedConnectionHandle, Command, StreamDescription},
67
error::Result,
78
operation::{Operation, Retryability},
89
options::WriteConcern,
@@ -13,17 +14,14 @@ use super::{CommandResponse, Response, WriteConcernOnlyBody};
1314

1415
pub(crate) struct AbortTransaction {
1516
write_concern: Option<WriteConcern>,
16-
selection_criteria: Option<SelectionCriteria>,
17+
pinned: Option<TransactionPin>,
1718
}
1819

1920
impl AbortTransaction {
20-
pub(crate) fn new(
21-
write_concern: Option<WriteConcern>,
22-
selection_criteria: Option<SelectionCriteria>,
23-
) -> Self {
21+
pub(crate) fn new(write_concern: Option<WriteConcern>, pinned: Option<TransactionPin>) -> Self {
2422
Self {
2523
write_concern,
26-
selection_criteria,
24+
pinned,
2725
}
2826
}
2927
}
@@ -59,7 +57,17 @@ impl Operation for AbortTransaction {
5957
}
6058

6159
fn selection_criteria(&self) -> Option<&SelectionCriteria> {
62-
self.selection_criteria.as_ref()
60+
match &self.pinned {
61+
Some(TransactionPin::Mongos(s)) => Some(s),
62+
_ => None,
63+
}
64+
}
65+
66+
fn pinned_connection(&self) -> Option<&PinnedConnectionHandle> {
67+
match &self.pinned {
68+
Some(TransactionPin::Connection(h)) => Some(h),
69+
_ => None,
70+
}
6371
}
6472

6573
fn write_concern(&self) -> Option<&WriteConcern> {
@@ -72,6 +80,6 @@ impl Operation for AbortTransaction {
7280

7381
fn update_for_retry(&mut self) {
7482
// The session must be "unpinned" before server selection for a retry.
75-
self.selection_criteria = None;
83+
self.pinned = None;
7684
}
7785
}

src/test/spec/unified_runner/operation.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1077,8 +1077,8 @@ impl TestOperation for TargetedFailPoint {
10771077
let session = test_runner.get_session(&self.session);
10781078
let selection_criteria = session
10791079
.transaction
1080-
.pinned_mongos
1081-
.clone()
1080+
.pinned_mongos()
1081+
.cloned()
10821082
.unwrap_or_else(|| panic!("ClientSession not pinned"));
10831083
let fail_point_guard = test_runner
10841084
.internal_client
@@ -1312,7 +1312,7 @@ impl TestOperation for AssertSessionPinned {
13121312
assert!(test_runner
13131313
.get_session(&self.session)
13141314
.transaction
1315-
.pinned_mongos
1315+
.pinned_mongos()
13161316
.is_some());
13171317
}
13181318
.boxed()
@@ -1334,7 +1334,7 @@ impl TestOperation for AssertSessionUnpinned {
13341334
assert!(test_runner
13351335
.get_session(&self.session)
13361336
.transaction
1337-
.pinned_mongos
1337+
.pinned_mongos()
13381338
.is_none());
13391339
}
13401340
.boxed()

src/test/spec/v2_runner/mod.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -263,8 +263,8 @@ pub async fn run_v2_test(test_file: TestFile) {
263263
let selection_criteria = session
264264
.unwrap()
265265
.transaction
266-
.pinned_mongos
267-
.clone()
266+
.pinned_mongos()
267+
.cloned()
268268
.unwrap_or_else(|| panic!("ClientSession is not pinned"));
269269

270270
fail_point_guards.push(

src/test/spec/v2_runner/operation.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1056,7 +1056,7 @@ impl TestOperation for AssertSessionPinned {
10561056
session: &'a mut ClientSession,
10571057
) -> BoxFuture<'a, Result<Option<Bson>>> {
10581058
async move {
1059-
assert!(session.transaction.pinned_mongos.is_some());
1059+
assert!(session.transaction.pinned_mongos().is_some());
10601060
Ok(None)
10611061
}
10621062
.boxed()
@@ -1072,7 +1072,7 @@ impl TestOperation for AssertSessionUnpinned {
10721072
session: &'a mut ClientSession,
10731073
) -> BoxFuture<'a, Result<Option<Bson>>> {
10741074
async move {
1075-
assert!(session.transaction.pinned_mongos.is_none());
1075+
assert!(session.transaction.pinned_mongos().is_none());
10761076
Ok(None)
10771077
}
10781078
.boxed()

0 commit comments

Comments
 (0)