Skip to content

Commit 2c5c916

Browse files
fakeshadowrobjtede
andauthored
Fix bug where timed out socket would register itself when server in b… (#302)
Co-authored-by: Rob Ede <[email protected]>
1 parent ee3a548 commit 2c5c916

File tree

1 file changed

+31
-25
lines changed

1 file changed

+31
-25
lines changed

actix-server/src/accept.rs

Lines changed: 31 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -187,21 +187,19 @@ impl Accept {
187187
let mut guard = self.waker.guard();
188188
match guard.pop_front() {
189189
// worker notify it becomes available. we may want to recover
190-
// from backpressure.
190+
// from backpressure.
191191
Some(WakerInterest::WorkerAvailable) => {
192192
drop(guard);
193193
self.maybe_backpressure(&mut sockets, false);
194194
}
195-
// a new worker thread is made and it's handle would be added
196-
// to Accept
195+
// a new worker thread is made and it's handle would be added to Accept
197196
Some(WakerInterest::Worker(handle)) => {
198197
drop(guard);
199198
// maybe we want to recover from a backpressure.
200199
self.maybe_backpressure(&mut sockets, false);
201200
self.handles.push(handle);
202201
}
203-
// got timer interest and it's time to try register socket(s)
204-
// again.
202+
// got timer interest and it's time to try register socket(s) again
205203
Some(WakerInterest::Timer) => {
206204
drop(guard);
207205
self.process_timer(&mut sockets)
@@ -238,16 +236,23 @@ impl Accept {
238236

239237
fn process_timer(&self, sockets: &mut Slab<ServerSocketInfo>) {
240238
let now = Instant::now();
241-
sockets.iter_mut().for_each(|(token, info)| {
242-
// only the ServerSocketInfo have an associate timeout value was de registered.
243-
if let Some(inst) = info.timeout.take() {
244-
if now > inst {
245-
self.register_logged(token, info);
246-
} else {
239+
sockets
240+
.iter_mut()
241+
// Only sockets that had an associated timeout were deregistered.
242+
.filter(|(_, info)| info.timeout.is_some())
243+
.for_each(|(token, info)| {
244+
let inst = info.timeout.take().unwrap();
245+
246+
if now < inst {
247247
info.timeout = Some(inst);
248+
} else if !self.backpressure {
249+
self.register_logged(token, info);
248250
}
249-
}
250-
});
251+
252+
// Drop the timeout if server is in backpressure and socket timeout is expired.
253+
// When server recovers from backpressure it will register all sockets without
254+
// a timeout value so this socket register will be delayed till then.
255+
});
251256
}
252257

253258
#[cfg(not(target_os = "windows"))]
@@ -301,20 +306,21 @@ impl Accept {
301306
}
302307

303308
fn maybe_backpressure(&mut self, sockets: &mut Slab<ServerSocketInfo>, on: bool) {
304-
if self.backpressure {
305-
if !on {
309+
// Only operate when server is in a different backpressure than the given flag.
310+
if self.backpressure != on {
311+
if on {
312+
self.backpressure = true;
313+
// TODO: figure out if timing out sockets can be safely de-registered twice.
314+
self.deregister_all(sockets);
315+
} else {
306316
self.backpressure = false;
307-
for (token, info) in sockets.iter_mut() {
308-
if info.timeout.is_some() {
309-
// socket will attempt to re-register itself when its timeout completes
310-
continue;
311-
}
312-
self.register_logged(token, info);
313-
}
317+
sockets
318+
.iter_mut()
319+
// Only operate on sockets without associated timeout.
320+
// Sockets with it will attempt to re-register when their timeout expires.
321+
.filter(|(_, info)| info.timeout.is_none())
322+
.for_each(|(token, info)| self.register_logged(token, info));
314323
}
315-
} else if on {
316-
self.backpressure = true;
317-
self.deregister_all(sockets);
318324
}
319325
}
320326

0 commit comments

Comments
 (0)