@@ -318,72 +318,74 @@ impl Accept {
318
318
}
319
319
}
320
320
321
- fn accept_one ( & mut self , sockets : & mut Slab < ServerSocketInfo > , mut msg : Conn ) {
321
+ fn accept_one ( & mut self , sockets : & mut Slab < ServerSocketInfo > , mut conn : Conn ) {
322
322
if self . backpressure {
323
+ // send_connection would remove fault worker from handles.
324
+ // worst case here is conn get dropped after all handles are gone.
323
325
while !self . handles . is_empty ( ) {
324
- match self . handles [ self . next ] . send ( msg) {
325
- Ok ( _) => {
326
- self . set_next ( ) ;
327
- break ;
328
- }
329
- Err ( tmp) => {
330
- // worker lost contact and could be gone. a message is sent to
331
- // `ServerBuilder` future to notify it a new worker should be made
332
- // after that remove the fault worker
333
- self . srv . worker_faulted ( self . handles [ self . next ] . idx ) ;
334
- msg = tmp;
335
- self . handles . swap_remove ( self . next ) ;
336
- if self . handles . is_empty ( ) {
337
- error ! ( "No workers" ) ;
338
- return ;
339
- } else if self . handles . len ( ) <= self . next {
340
- self . next = 0 ;
341
- }
342
- continue ;
343
- }
326
+ match self . send_connection ( sockets, conn) {
327
+ Ok ( _) => return ,
328
+ Err ( c) => conn = c,
344
329
}
345
330
}
346
331
} else {
332
+ // Do one round and try to send conn to all workers until it succeed.
333
+ // Start from self.next.
347
334
let mut idx = 0 ;
348
335
while idx < self . handles . len ( ) {
349
336
idx += 1 ;
350
337
if self . handles [ self . next ] . available ( ) {
351
- match self . handles [ self . next ] . send ( msg) {
352
- Ok ( _) => {
353
- self . set_next ( ) ;
354
- return ;
355
- }
356
- // worker lost contact and could be gone. a message is sent to
357
- // `ServerBuilder` future to notify it a new worker should be made.
358
- // after that remove the fault worker and enter backpressure if necessary.
359
- Err ( tmp) => {
360
- self . srv . worker_faulted ( self . handles [ self . next ] . idx ) ;
361
- msg = tmp;
362
- self . handles . swap_remove ( self . next ) ;
363
- if self . handles . is_empty ( ) {
364
- error ! ( "No workers" ) ;
365
- self . maybe_backpressure ( sockets, true ) ;
366
- return ;
367
- } else if self . handles . len ( ) <= self . next {
368
- self . next = 0 ;
369
- }
370
- continue ;
371
- }
338
+ match self . send_connection ( sockets, conn) {
339
+ Ok ( _) => return ,
340
+ Err ( c) => conn = c,
372
341
}
342
+ } else {
343
+ self . set_next ( ) ;
373
344
}
374
- self . set_next ( ) ;
375
345
}
376
- // enable backpressure
346
+ // Sending Conn failed due to either all workers are in error or not available.
347
+ // Enter backpressure state and try again.
377
348
self . maybe_backpressure ( sockets, true ) ;
378
- self . accept_one ( sockets, msg ) ;
349
+ self . accept_one ( sockets, conn ) ;
379
350
}
380
351
}
381
352
382
- // set next worker handle that would accept work.
353
+ // Set next worker handle that would accept work.
383
354
fn set_next ( & mut self ) {
384
355
self . next = ( self . next + 1 ) % self . handles . len ( ) ;
385
356
}
386
357
358
+ // Send connection to worker and handle error.
359
+ fn send_connection (
360
+ & mut self ,
361
+ sockets : & mut Slab < ServerSocketInfo > ,
362
+ conn : Conn ,
363
+ ) -> Result < ( ) , Conn > {
364
+ match self . handles [ self . next ] . send ( conn) {
365
+ Ok ( _) => {
366
+ self . set_next ( ) ;
367
+ Ok ( ( ) )
368
+ }
369
+ Err ( conn) => {
370
+ // worker lost contact and could be gone. a message is sent to
371
+ // `ServerBuilder` future to notify it a new worker should be made.
372
+ // after that remove the fault worker and enter backpressure if necessary.
373
+ self . srv . worker_faulted ( self . handles [ self . next ] . idx ) ;
374
+ self . handles . swap_remove ( self . next ) ;
375
+ if self . handles . is_empty ( ) {
376
+ error ! ( "No workers" ) ;
377
+ self . maybe_backpressure ( sockets, true ) ;
378
+ // All workers are gone and Conn is nowhere to be sent.
379
+ // Treat this situation as Ok and drop Conn.
380
+ return Ok ( ( ) ) ;
381
+ } else if self . handles . len ( ) <= self . next {
382
+ self . next = 0 ;
383
+ }
384
+ Err ( conn)
385
+ }
386
+ }
387
+ }
388
+
387
389
fn accept ( & mut self , sockets : & mut Slab < ServerSocketInfo > , token : usize ) {
388
390
loop {
389
391
let info = sockets
0 commit comments