@@ -300,27 +300,41 @@ impl Accept {
300
300
}
301
301
302
302
fn deregister_all ( & self , sockets : & mut Slab < ServerSocketInfo > ) {
303
- sockets. iter_mut ( ) . for_each ( |( _, info) | {
304
- self . deregister_logged ( info) ;
305
- } ) ;
303
+ // This is a best effort implementation with following limitation:
304
+ //
305
+ // Every ServerSocketInfo with associate timeout will be skipped and it's timeout
306
+ // is removed in the process.
307
+ //
308
+ // Therefore WakerInterest::Pause followed by WakerInterest::Resume in a very short
309
+ // gap (less than 500ms) would cause all timing out ServerSocketInfos be reregistered
310
+ // before expected timing.
311
+ sockets
312
+ . iter_mut ( )
313
+ // Take all timeout.
314
+ // This is to prevent Accept::process_timer method re-register a socket afterwards.
315
+ . map ( |( _, info) | ( info. timeout . take ( ) , info) )
316
+ // Socket info with a timeout is already deregistered so skip them.
317
+ . filter ( |( timeout, _) | timeout. is_none ( ) )
318
+ . for_each ( |( _, info) | self . deregister_logged ( info) ) ;
306
319
}
307
320
308
321
fn maybe_backpressure ( & mut self , sockets : & mut Slab < ServerSocketInfo > , on : bool ) {
309
322
// Only operate when server is in a different backpressure than the given flag.
310
323
if self . backpressure != on {
311
- if on {
312
- self . backpressure = true ;
313
- // TODO: figure out if timing out sockets can be safely de-registered twice.
314
- self . deregister_all ( sockets) ;
315
- } else {
316
- self . backpressure = false ;
317
- sockets
318
- . iter_mut ( )
319
- // Only operate on sockets without associated timeout.
320
- // Sockets with it will attempt to re-register when their timeout expires.
321
- . filter ( |( _, info) | info. timeout . is_none ( ) )
322
- . for_each ( |( token, info) | self . register_logged ( token, info) ) ;
323
- }
324
+ self . backpressure = on;
325
+ sockets
326
+ . iter_mut ( )
327
+ // Only operate on sockets without associated timeout.
328
+ // Sockets with it should be handled by `accept` and `process_timer` methods.
329
+ // They are already deregistered or need to be reregister in the future.
330
+ . filter ( |( _, info) | info. timeout . is_none ( ) )
331
+ . for_each ( |( token, info) | {
332
+ if on {
333
+ self . deregister_logged ( info) ;
334
+ } else {
335
+ self . register_logged ( token, info) ;
336
+ }
337
+ } ) ;
324
338
}
325
339
}
326
340
0 commit comments