@@ -158,7 +158,7 @@ impl Client {
158
158
. execute_operation_with_details ( op. borrow_mut ( ) , None )
159
159
. await ?;
160
160
let pinned =
161
- self . pin_connection_for_cursor ( & details. output , & mut details. connection ) ?;
161
+ self . pin_connection_for_cursor ( & details. output , & mut details. connection , None ) ?;
162
162
Ok ( Cursor :: new (
163
163
self . clone ( ) ,
164
164
details. output ,
@@ -181,8 +181,11 @@ impl Client {
181
181
. execute_operation_with_details ( op. borrow_mut ( ) , & mut * session)
182
182
. await ?;
183
183
184
- let pinned =
185
- self . pin_connection_for_session ( & details. output , & mut details. connection , session) ?;
184
+ let pinned = self . pin_connection_for_cursor (
185
+ & details. output ,
186
+ & mut details. connection ,
187
+ Some ( session) ,
188
+ ) ?;
186
189
Ok ( SessionCursor :: new ( self . clone ( ) , details. output , pinned) )
187
190
}
188
191
@@ -194,25 +197,16 @@ impl Client {
194
197
& self ,
195
198
spec : & CursorSpecification ,
196
199
conn : & mut PooledConnection ,
200
+ session : Option < & mut ClientSession > ,
197
201
) -> Result < Option < PinnedConnectionHandle > > {
198
- if self . is_load_balanced ( ) && spec. info . id != 0 {
199
- Ok ( Some ( conn. pin ( ) ?) )
200
- } else {
201
- Ok ( None )
202
- }
203
- }
204
-
205
- fn pin_connection_for_session (
206
- & self ,
207
- spec : & CursorSpecification ,
208
- conn : & mut PooledConnection ,
209
- session : & mut ClientSession ,
210
- ) -> Result < Option < PinnedConnectionHandle > > {
211
- if let Some ( handle) = session. transaction . pinned_connection ( ) {
202
+ if let Some ( handle) = session. and_then ( |s| s. transaction . pinned_connection ( ) ) {
212
203
// Cursor operations on a transaction share the same pinned connection.
213
204
Ok ( Some ( handle. replicate ( ) ) )
205
+ } else if self . is_load_balanced ( ) && spec. info . id != 0 {
206
+ // Cursor operations on load balanced topologies always pin connections.
207
+ Ok ( Some ( conn. pin ( ) ?) )
214
208
} else {
215
- self . pin_connection_for_cursor ( spec , conn )
209
+ Ok ( None )
216
210
}
217
211
}
218
212
@@ -245,7 +239,8 @@ impl Client {
245
239
details. implicit_session = Some ( session) ;
246
240
}
247
241
let ( cursor_spec, cs_data) = details. output ;
248
- let pinned = self . pin_connection_for_cursor ( & cursor_spec, & mut details. connection ) ?;
242
+ let pinned =
243
+ self . pin_connection_for_cursor ( & cursor_spec, & mut details. connection , None ) ?;
249
244
let cursor = Cursor :: new ( self . clone ( ) , cursor_spec, details. implicit_session , pinned) ;
250
245
251
246
Ok ( ChangeStream :: new ( cursor, args, cs_data) )
@@ -277,8 +272,11 @@ impl Client {
277
272
. execute_operation_with_details ( & mut op, & mut * session)
278
273
. await ?;
279
274
let ( cursor_spec, cs_data) = details. output ;
280
- let pinned =
281
- self . pin_connection_for_session ( & cursor_spec, & mut details. connection , session) ?;
275
+ let pinned = self . pin_connection_for_cursor (
276
+ & cursor_spec,
277
+ & mut details. connection ,
278
+ Some ( session) ,
279
+ ) ?;
282
280
let cursor = SessionCursor :: new ( self . clone ( ) , cursor_spec, pinned) ;
283
281
284
282
Ok ( SessionChangeStream :: new ( cursor, args, cs_data) )
@@ -1063,6 +1061,7 @@ struct ExecutionDetails<T: Operation> {
1063
1061
implicit_session : Option < ClientSession > ,
1064
1062
}
1065
1063
1064
+ #[ derive( Debug ) ]
1066
1065
struct ExecutionRetry {
1067
1066
prior_txn_number : Option < i64 > ,
1068
1067
first_error : Error ,
0 commit comments