@@ -6,6 +6,8 @@ use chrono::naive::NaiveDateTime;
6
6
use log:: { debug, error, info, warn} ;
7
7
use once_cell:: sync:: Lazy ;
8
8
use parking_lot:: { Mutex , RwLock } ;
9
+ use rand:: seq:: SliceRandom ;
10
+ use rand:: thread_rng;
9
11
use std:: collections:: HashMap ;
10
12
use std:: sync:: Arc ;
11
13
use std:: time:: Instant ;
@@ -118,7 +120,7 @@ impl ConnectionPool {
118
120
host : server. 0 . clone ( ) ,
119
121
port : server. 1 . to_string ( ) ,
120
122
role : role,
121
- replica_number,
123
+ instance_index : replica_number,
122
124
shard : shard_idx. parse :: < usize > ( ) . unwrap ( ) ,
123
125
username : user_info. username . clone ( ) ,
124
126
poolname : pool_name. clone ( ) ,
@@ -201,42 +203,30 @@ impl ConnectionPool {
201
203
/// the pooler starts up.
202
204
async fn validate ( & mut self ) -> Result < ( ) , Error > {
203
205
let mut server_infos = Vec :: new ( ) ;
204
- let stats = self . stats . clone ( ) ;
205
-
206
206
for shard in 0 ..self . shards ( ) {
207
- let mut round_robin = 0 ;
208
-
209
- for _ in 0 ..self . servers ( shard) {
210
- // To keep stats consistent.
211
- let fake_process_id = 0 ;
212
-
213
- let connection = match self . get ( shard, None , fake_process_id, round_robin) . await {
207
+ for index in 0 ..self . servers ( shard) {
208
+ let connection = match self . databases [ shard] [ index] . get ( ) . await {
214
209
Ok ( conn) => conn,
215
210
Err ( err) => {
216
211
error ! ( "Shard {} down or misconfigured: {:?}" , shard, err) ;
217
212
continue ;
218
213
}
219
214
} ;
220
215
221
- let proxy = connection. 0 ;
222
- let address = connection. 1 ;
216
+ let proxy = connection;
223
217
let server = & * proxy;
224
218
let server_info = server. server_info ( ) ;
225
219
226
- stats. client_disconnecting ( fake_process_id, address. id ) ;
227
-
228
220
if server_infos. len ( ) > 0 {
229
221
// Compare against the last server checked.
230
222
if server_info != server_infos[ server_infos. len ( ) - 1 ] {
231
223
warn ! (
232
224
"{:?} has different server configuration than the last server" ,
233
- address
225
+ proxy . address( )
234
226
) ;
235
227
}
236
228
}
237
-
238
229
server_infos. push ( server_info) ;
239
- round_robin += 1 ;
240
230
}
241
231
}
242
232
@@ -254,70 +244,46 @@ impl ConnectionPool {
254
244
/// Get a connection from the pool.
255
245
pub async fn get (
256
246
& self ,
257
- shard : usize , // shard number
258
- role : Option < Role > , // primary or replica
259
- process_id : i32 , // client id
260
- mut round_robin : usize , // round robin offset
247
+ shard : usize , // shard number
248
+ role : Option < Role > , // primary or replica
249
+ process_id : i32 , // client id
261
250
) -> Result < ( PooledConnection < ' _ , ServerPool > , Address ) , Error > {
262
251
let now = Instant :: now ( ) ;
263
- let addresses = & self . addresses [ shard] ;
264
-
265
- let mut allowed_attempts = match role {
266
- // Primary-specific queries get one attempt, if the primary is down,
267
- // nothing we should do about it I think. It's dangerous to retry
268
- // write queries.
269
- Some ( Role :: Primary ) => 1 ,
252
+ let mut candidates: Vec < Address > = self . addresses [ shard]
253
+ . clone ( )
254
+ . into_iter ( )
255
+ . filter ( |address| address. role == role)
256
+ . collect ( ) ;
270
257
271
- // Replicas get to try as many times as there are replicas
272
- // and connections in the pool.
273
- _ => addresses. len ( ) ,
274
- } ;
275
-
276
- debug ! ( "Allowed attempts for {:?}: {}" , role, allowed_attempts) ;
277
-
278
- let exists = match role {
279
- Some ( role) => addresses. iter ( ) . filter ( |addr| addr. role == role) . count ( ) > 0 ,
280
- None => true ,
281
- } ;
282
-
283
- if !exists {
284
- error ! ( "Requested role {:?}, but none are configured" , role) ;
285
- return Err ( Error :: BadConfig ) ;
286
- }
258
+ // Random load balancing
259
+ candidates. shuffle ( & mut thread_rng ( ) ) ;
287
260
288
261
let healthcheck_timeout = get_config ( ) . general . healthcheck_timeout ;
289
262
let healthcheck_delay = get_config ( ) . general . healthcheck_delay as u128 ;
290
263
291
- while allowed_attempts > 0 {
292
- // Round-robin replicas.
293
- round_robin += 1 ;
294
-
295
- let index = round_robin % addresses. len ( ) ;
296
- let address = & addresses[ index] ;
297
-
298
- // Make sure you're getting a primary or a replica
299
- // as per request. If no specific role is requested, the first
300
- // available will be chosen.
301
- if address. role != role {
302
- continue ;
303
- }
304
-
305
- allowed_attempts -= 1 ;
264
+ while !candidates. is_empty ( ) {
265
+ // Get the next candidate
266
+ let address = match candidates. pop ( ) {
267
+ Some ( address) => address,
268
+ None => break ,
269
+ } ;
306
270
307
- // Don't attempt to connect to banned servers.
308
- if self . is_banned ( address, shard, role) {
271
+ if self . is_banned ( & address, address. shard , role) {
309
272
continue ;
310
273
}
311
274
312
275
// Indicate we're waiting on a server connection from a pool.
313
276
self . stats . client_waiting ( process_id, address. id ) ;
314
277
315
278
// Check if we can connect
316
- let mut conn = match self . databases [ shard] [ index] . get ( ) . await {
279
+ let mut conn = match self . databases [ address. shard ] [ address. instance_index ]
280
+ . get ( )
281
+ . await
282
+ {
317
283
Ok ( conn) => conn,
318
284
Err ( err) => {
319
- error ! ( "Banning replica { }, error: {:?}" , index , err) ;
320
- self . ban ( address, shard, process_id) ;
285
+ error ! ( "Banning instance {:? }, error: {:?}" , address , err) ;
286
+ self . ban ( & address, address . shard , process_id) ;
321
287
self . stats . client_disconnecting ( process_id, address. id ) ;
322
288
self . stats
323
289
. checkout_time ( now. elapsed ( ) . as_micros ( ) , process_id, address. id ) ;
@@ -359,29 +325,34 @@ impl ConnectionPool {
359
325
}
360
326
361
327
// Health check failed.
362
- Err ( _) => {
363
- error ! ( "Banning replica {} because of failed health check" , index) ;
328
+ Err ( err) => {
329
+ error ! (
330
+ "Banning instance {:?} because of failed health check, {:?}" ,
331
+ address, err
332
+ ) ;
364
333
365
334
// Don't leave a bad connection in the pool.
366
335
server. mark_bad ( ) ;
367
336
368
- self . ban ( address, shard, process_id) ;
337
+ self . ban ( & address, address . shard , process_id) ;
369
338
continue ;
370
339
}
371
340
} ,
372
341
373
342
// Health check timed out.
374
- Err ( _) => {
375
- error ! ( "Banning replica {} because of health check timeout" , index) ;
343
+ Err ( err) => {
344
+ error ! (
345
+ "Banning instance {:?} because of health check timeout, {:?}" ,
346
+ address, err
347
+ ) ;
376
348
// Don't leave a bad connection in the pool.
377
349
server. mark_bad ( ) ;
378
350
379
- self . ban ( address, shard, process_id) ;
351
+ self . ban ( & address, address . shard , process_id) ;
380
352
continue ;
381
353
}
382
354
}
383
355
}
384
-
385
356
return Err ( Error :: AllServersDown ) ;
386
357
}
387
358
0 commit comments