@@ -26,7 +26,13 @@ use crate::{
26
26
Retryability ,
27
27
} ,
28
28
options:: SelectionCriteria ,
29
- sdam:: { HandshakePhase , SelectedServer , SessionSupportStatus , TransactionSupportStatus } ,
29
+ sdam:: {
30
+ HandshakePhase ,
31
+ SelectedServer ,
32
+ ServerType ,
33
+ SessionSupportStatus ,
34
+ TransactionSupportStatus ,
35
+ } ,
30
36
selection_criteria:: ReadPreference ,
31
37
} ;
32
38
@@ -91,6 +97,7 @@ impl Client {
91
97
. into ( ) ) ;
92
98
}
93
99
}
100
+
94
101
self . execute_operation_with_retry ( op, Some ( session) ) . await
95
102
}
96
103
None => {
@@ -135,18 +142,23 @@ impl Client {
135
142
}
136
143
}
137
144
138
- let server = match self . select_server ( op. selection_criteria ( ) ) . await {
145
+ let selection_criteria = session
146
+ . as_ref ( )
147
+ . and_then ( |s| s. transaction . pinned_mongos . as_ref ( ) )
148
+ . or_else ( || op. selection_criteria ( ) ) ;
149
+
150
+ let server = match self . select_server ( selection_criteria) . await {
139
151
Ok ( server) => server,
140
152
Err ( mut err) => {
141
- err. add_labels ( None , & session, None ) ?;
153
+ err. add_labels_and_update_pin ( None , & mut session, None ) ?;
142
154
return Err ( err) ;
143
155
}
144
156
} ;
145
157
146
158
let mut conn = match server. pool . check_out ( ) . await {
147
159
Ok ( conn) => conn,
148
160
Err ( mut err) => {
149
- err. add_labels ( None , & session, None ) ?;
161
+ err. add_labels_and_update_pin ( None , & mut session, None ) ?;
150
162
151
163
if err. is_pool_cleared ( ) {
152
164
return self . execute_retry ( & mut op, & mut session, None , err) . await ;
@@ -229,6 +241,8 @@ impl Client {
229
241
txn_number : Option < i64 > ,
230
242
first_error : Error ,
231
243
) -> Result < T :: O > {
244
+ op. update_for_retry ( ) ;
245
+
232
246
let server = match self . select_server ( op. selection_criteria ( ) ) . await {
233
247
Ok ( server) => server,
234
248
Err ( _) => {
@@ -246,8 +260,6 @@ impl Client {
246
260
return Err ( first_error) ;
247
261
}
248
262
249
- op. update_for_retry ( ) ;
250
-
251
263
match self
252
264
. execute_operation_on_connection ( op, & mut conn, session, txn_number, & retryability)
253
265
. await
@@ -286,7 +298,8 @@ impl Client {
286
298
wc. validate ( ) ?;
287
299
}
288
300
289
- let mut cmd = op. build ( connection. stream_description ( ) ?) ?;
301
+ let stream_description = connection. stream_description ( ) ?;
302
+ let mut cmd = op. build ( stream_description) ?;
290
303
self . inner
291
304
. topology
292
305
. update_command_with_read_pref ( connection. address ( ) , & mut cmd, op. selection_criteria ( ) )
@@ -324,6 +337,9 @@ impl Client {
324
337
cmd. set_start_transaction ( ) ;
325
338
cmd. set_autocommit ( ) ;
326
339
cmd. set_txn_read_concern ( * session) ?;
340
+ if stream_description. initial_server_type == ServerType :: Mongos {
341
+ session. pin_mongos ( connection. address ( ) . clone ( ) ) ;
342
+ }
327
343
session. transaction . state = TransactionState :: InProgress ;
328
344
}
329
345
TransactionState :: InProgress
@@ -471,13 +487,13 @@ impl Client {
471
487
handler. handle_command_failed_event ( command_failed_event) ;
472
488
} ) ;
473
489
474
- if let Some ( session) = session {
490
+ if let Some ( ref mut session) = session {
475
491
if err. is_network_error ( ) {
476
492
session. mark_dirty ( ) ;
477
493
}
478
494
}
479
495
480
- err. add_labels ( Some ( connection) , session, Some ( retryability) ) ?;
496
+ err. add_labels_and_update_pin ( Some ( connection) , session, Some ( retryability) ) ?;
481
497
op. handle_error ( err)
482
498
}
483
499
Ok ( response) => {
@@ -504,7 +520,11 @@ impl Client {
504
520
match op. handle_response ( response. deserialized , connection. stream_description ( ) ?) {
505
521
Ok ( response) => Ok ( response) ,
506
522
Err ( mut err) => {
507
- err. add_labels ( Some ( connection) , session, Some ( retryability) ) ?;
523
+ err. add_labels_and_update_pin (
524
+ Some ( connection) ,
525
+ session,
526
+ Some ( retryability) ,
527
+ ) ?;
508
528
Err ( err)
509
529
}
510
530
}
@@ -618,7 +638,7 @@ impl Client {
618
638
}
619
639
620
640
impl Error {
621
- /// Adds the necessary labels to this Error.
641
+ /// Adds the necessary labels to this Error, and unpins the session if needed .
622
642
///
623
643
/// A TransientTransactionError label should be added if a transaction is in progress and the
624
644
/// error is a network or server selection error.
@@ -628,10 +648,13 @@ impl Error {
628
648
/// server version, a label should only be added if the `retry_writes` client option is not set
629
649
/// to `false`, the operation during which the error occured is write-retryable, and a
630
650
/// TransientTransactionError label has not already been added.
631
- fn add_labels (
651
+ ///
652
+ /// If the TransientTransactionError or UnknownTransactionCommitResult labels are added, the
653
+ /// ClientSession should be unpinned.
654
+ fn add_labels_and_update_pin (
632
655
& mut self ,
633
656
conn : Option < & Connection > ,
634
- session : & Option < & mut ClientSession > ,
657
+ session : & mut Option < & mut ClientSession > ,
635
658
retryability : Option < & Retryability > ,
636
659
) -> Result < ( ) > {
637
660
let transaction_state = session. as_ref ( ) . map_or ( & TransactionState :: None , |session| {
@@ -675,6 +698,15 @@ impl Error {
675
698
}
676
699
}
677
700
}
701
+
702
+ if let Some ( ref mut session) = session {
703
+ if self . contains_label ( TRANSIENT_TRANSACTION_ERROR )
704
+ || self . contains_label ( UNKNOWN_TRANSACTION_COMMIT_RESULT )
705
+ {
706
+ session. unpin_mongos ( ) ;
707
+ }
708
+ }
709
+
678
710
Ok ( ( ) )
679
711
}
680
712
}
0 commit comments