1
- use std:: { future:: IntoFuture , time:: Duration } ;
1
+ use std:: { future:: IntoFuture , sync :: Arc , time:: Duration } ;
2
2
3
3
use crate :: bson:: doc;
4
4
@@ -8,6 +8,7 @@ use crate::{
8
8
cmap:: { CmapEvent , ConnectionCheckoutFailedReason } ,
9
9
command:: CommandEvent ,
10
10
} ,
11
+ options:: SelectionCriteria ,
11
12
runtime:: { self , AsyncJoinHandle } ,
12
13
test:: {
13
14
block_connection_supported,
@@ -174,23 +175,40 @@ async fn retry_read_different_mongos() {
174
175
client_options. hosts . drain ( 2 ..) ;
175
176
client_options. retry_reads = Some ( true ) ;
176
177
177
- let mut guards = vec ! [ ] ;
178
- for ix in [ 0 , 1 ] {
179
- let mut opts = client_options. clone ( ) ;
180
- opts. hosts . remove ( ix) ;
181
- opts. direct_connection = Some ( true ) ;
182
- let client = Client :: for_test ( ) . options ( opts) . await ;
178
+ let hosts = client_options. hosts . clone ( ) ;
179
+ let client = Client :: for_test ( )
180
+ . options ( client_options)
181
+ . monitor_events ( )
182
+ . await ;
183
183
184
+ // NOTE: This test uses a single client to set failpoints on each mongos and run the find
185
+ // operation. This avoids flakiness caused by a race between server discovery and server
186
+ // selection.
187
+
188
+ // When a client is first created, it initializes its view of the topology with all configured
189
+ // mongos addresses, but marks each as Unknown until it completes the server discovery process
190
+ // by sending and receiving "hello" messages Unknown servers are not eligible for server
191
+ // selection.
192
+
193
+ // Previously, we created a new client for each call to `enable_fail_point` and for the find
194
+ // operation. Each new client restarted the discovery process, and sometimes had not yet marked
195
+ // both mongos servers as usable, leading to test failures when the retry logic couldn't find a
196
+ // second eligible server.
197
+
198
+ // By reusing a single client, each `enable_fail_point` call forces discovery to complete for
199
+ // the corresponding mongos. As a result, when the find operation runs, the client has a
200
+ // fully discovered topology and can reliably select between both servers.
201
+ let mut guards = Vec :: new ( ) ;
202
+ for address in hosts {
203
+ let address = address. clone ( ) ;
184
204
let fail_point = FailPoint :: fail_command ( & [ "find" ] , FailPointMode :: Times ( 1 ) )
185
205
. error_code ( 6 )
186
- . close_connection ( true ) ;
206
+ . selection_criteria ( SelectionCriteria :: Predicate ( Arc :: new ( move |info| {
207
+ info. description . address == address
208
+ } ) ) ) ;
187
209
guards. push ( client. enable_fail_point ( fail_point) . await . unwrap ( ) ) ;
188
210
}
189
211
190
- let client = Client :: for_test ( )
191
- . options ( client_options)
192
- . monitor_events ( )
193
- . await ;
194
212
let result = client
195
213
. database ( "test" )
196
214
. collection :: < crate :: bson:: Document > ( "retry_read_different_mongos" )
@@ -211,6 +229,14 @@ async fn retry_read_different_mongos() {
211
229
"unexpected events: {:#?}" ,
212
230
events,
213
231
) ;
232
+ let first_failed = events[ 1 ] . as_command_failed ( ) . unwrap ( ) ;
233
+ let first_address = & first_failed. connection . address ;
234
+ let second_failed = events[ 3 ] . as_command_failed ( ) . unwrap ( ) ;
235
+ let second_address = & second_failed. connection . address ;
236
+ assert_ne ! (
237
+ first_address, second_address,
238
+ "Failed commands did not occur on two different mongos instances"
239
+ ) ;
214
240
215
241
drop ( guards) ; // enforce lifetime
216
242
}
@@ -235,12 +261,11 @@ async fn retry_read_same_mongos() {
235
261
client_options. direct_connection = Some ( true ) ;
236
262
let client = Client :: for_test ( ) . options ( client_options) . await ;
237
263
238
- let fail_point = FailPoint :: fail_command ( & [ "find" ] , FailPointMode :: Times ( 1 ) )
239
- . error_code ( 6 )
240
- . close_connection ( true ) ;
264
+ let fail_point = FailPoint :: fail_command ( & [ "find" ] , FailPointMode :: Times ( 1 ) ) . error_code ( 6 ) ;
241
265
client. enable_fail_point ( fail_point) . await . unwrap ( )
242
266
} ;
243
267
268
+ client_options. direct_connection = Some ( false ) ;
244
269
let client = Client :: for_test ( )
245
270
. options ( client_options)
246
271
. monitor_events ( )
@@ -265,6 +290,14 @@ async fn retry_read_same_mongos() {
265
290
"unexpected events: {:#?}" ,
266
291
events,
267
292
) ;
293
+ let first_failed = events[ 1 ] . as_command_failed ( ) . unwrap ( ) ;
294
+ let first_address = & first_failed. connection . address ;
295
+ let second_failed = events[ 3 ] . as_command_succeeded ( ) . unwrap ( ) ;
296
+ let second_address = & second_failed. connection . address ;
297
+ assert_eq ! (
298
+ first_address, second_address,
299
+ "Failed command and retry did not occur on the same mongos instance" ,
300
+ ) ;
268
301
269
302
drop ( fp_guard) ; // enforce lifetime
270
303
}
0 commit comments