Skip to content

Commit d49ecf7

Browse files
authored
Fix bug where backpressure happen too early (#332)
1 parent e0fb67f commit d49ecf7

File tree

1 file changed

+165
-28
lines changed

1 file changed

+165
-28
lines changed

actix-server/src/accept.rs

Lines changed: 165 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -83,9 +83,59 @@ struct Accept {
8383
handles: Vec<WorkerHandleAccept>,
8484
srv: Server,
8585
next: usize,
86+
avail: Availability,
8687
backpressure: bool,
8788
}
8889

90+
/// Array of u128 with every bit as marker for a worker handle's availability.
91+
struct Availability([u128; 4]);
92+
93+
impl Default for Availability {
94+
fn default() -> Self {
95+
Self([0; 4])
96+
}
97+
}
98+
99+
impl Availability {
100+
/// Check if any worker handle is available
101+
fn available(&self) -> bool {
102+
self.0.iter().any(|a| *a != 0)
103+
}
104+
105+
/// Set worker handle available state by index.
106+
fn set_available(&mut self, idx: usize, avail: bool) {
107+
let (offset, idx) = if idx < 128 {
108+
(0, idx)
109+
} else if idx < 128 * 2 {
110+
(1, idx - 128)
111+
} else if idx < 128 * 3 {
112+
(2, idx - 128 * 2)
113+
} else if idx < 128 * 4 {
114+
(3, idx - 128 * 3)
115+
} else {
116+
panic!("Max WorkerHandle count is 512")
117+
};
118+
119+
if avail {
120+
self.0[offset] |= 1 << idx as u128;
121+
} else {
122+
let shift = 1 << idx as u128;
123+
124+
debug_assert_ne!(self.0[offset] & shift, 0);
125+
126+
self.0[offset] ^= shift;
127+
}
128+
}
129+
130+
/// Set all worker handle to available state.
131+
/// This would result in a re-check on all workers' availability.
132+
fn set_available_all(&mut self, handles: &[WorkerHandleAccept]) {
133+
handles.iter().for_each(|handle| {
134+
self.set_available(handle.idx, true);
135+
})
136+
}
137+
}
138+
89139
/// This function defines errors that are per-connection. Which basically
90140
/// means that if we get this error from `accept()` system call it means
91141
/// next connection might be ready to be accepted.
@@ -116,6 +166,7 @@ impl Accept {
116166
System::set_current(sys);
117167
let (mut accept, sockets) =
118168
Accept::new_with_sockets(poll, waker, socks, handles, srv);
169+
119170
accept.poll_with(sockets);
120171
})
121172
.unwrap();
@@ -148,12 +199,18 @@ impl Accept {
148199
});
149200
}
150201

202+
let mut avail = Availability::default();
203+
204+
// Assume all handles are avail at construct time.
205+
avail.set_available_all(&handles);
206+
151207
let accept = Accept {
152208
poll,
153209
waker,
154210
handles,
155211
srv,
156212
next: 0,
213+
avail,
157214
backpressure: false,
158215
};
159216

@@ -166,12 +223,8 @@ impl Accept {
166223
loop {
167224
if let Err(e) = self.poll.poll(&mut events, None) {
168225
match e.kind() {
169-
std::io::ErrorKind::Interrupted => {
170-
continue;
171-
}
172-
_ => {
173-
panic!("Poll error: {}", e);
174-
}
226+
std::io::ErrorKind::Interrupted => continue,
227+
_ => panic!("Poll error: {}", e),
175228
}
176229
}
177230

@@ -190,13 +243,16 @@ impl Accept {
190243
// from backpressure.
191244
Some(WakerInterest::WorkerAvailable) => {
192245
drop(guard);
246+
// Assume all worker are avail as no worker index returned.
247+
self.avail.set_available_all(&self.handles);
193248
self.maybe_backpressure(&mut sockets, false);
194249
}
195250
// a new worker thread is made and it's handle would be added to Accept
196251
Some(WakerInterest::Worker(handle)) => {
197252
drop(guard);
198253
// maybe we want to recover from a backpressure.
199254
self.maybe_backpressure(&mut sockets, false);
255+
self.avail.set_available(handle.idx, true);
200256
self.handles.push(handle);
201257
}
202258
// got timer interest and it's time to try register socket(s) again
@@ -342,56 +398,48 @@ impl Accept {
342398
if self.backpressure {
343399
// send_connection would remove fault worker from handles.
344400
// worst case here is conn get dropped after all handles are gone.
345-
while !self.handles.is_empty() {
346-
match self.send_connection(sockets, conn) {
347-
Ok(_) => return,
348-
Err(c) => conn = c,
349-
}
401+
while let Err(c) = self.send_connection(sockets, conn) {
402+
conn = c
350403
}
351404
} else {
352-
// Do one round and try to send conn to all workers until it succeed.
353-
// Start from self.next.
354-
let mut idx = 0;
355-
while idx < self.handles.len() {
356-
idx += 1;
357-
if self.handles[self.next].available() {
405+
while self.avail.available() {
406+
let next = self.next();
407+
let idx = next.idx;
408+
if next.available() {
409+
self.avail.set_available(idx, true);
358410
match self.send_connection(sockets, conn) {
359411
Ok(_) => return,
360412
Err(c) => conn = c,
361413
}
362414
} else {
415+
self.avail.set_available(idx, false);
363416
self.set_next();
364417
}
365418
}
419+
366420
// Sending Conn failed due to either all workers are in error or not available.
367421
// Enter backpressure state and try again.
368422
self.maybe_backpressure(sockets, true);
369423
self.accept_one(sockets, conn);
370424
}
371425
}
372426

373-
// Set next worker handle that would accept work.
374-
fn set_next(&mut self) {
375-
self.next = (self.next + 1) % self.handles.len();
376-
}
377-
378427
// Send connection to worker and handle error.
379428
fn send_connection(
380429
&mut self,
381430
sockets: &mut Slab<ServerSocketInfo>,
382431
conn: Conn,
383432
) -> Result<(), Conn> {
384-
match self.handles[self.next].send(conn) {
433+
match self.next().send(conn) {
385434
Ok(_) => {
386435
self.set_next();
387436
Ok(())
388437
}
389438
Err(conn) => {
390-
// worker lost contact and could be gone. a message is sent to
391-
// `ServerBuilder` future to notify it a new worker should be made.
392-
// after that remove the fault worker and enter backpressure if necessary.
393-
self.srv.worker_faulted(self.handles[self.next].idx);
394-
self.handles.swap_remove(self.next);
439+
// Worker thread is error and could be gone.
440+
// Remove worker handle and notify `ServerBuilder`.
441+
self.remove_next();
442+
395443
if self.handles.is_empty() {
396444
error!("No workers");
397445
self.maybe_backpressure(sockets, true);
@@ -401,6 +449,7 @@ impl Accept {
401449
} else if self.handles.len() <= self.next {
402450
self.next = 0;
403451
}
452+
404453
Err(conn)
405454
}
406455
}
@@ -445,4 +494,92 @@ impl Accept {
445494
};
446495
}
447496
}
497+
498+
fn next(&self) -> &WorkerHandleAccept {
499+
&self.handles[self.next]
500+
}
501+
502+
/// Set next worker handle that would accept connection.
503+
fn set_next(&mut self) {
504+
self.next = (self.next + 1) % self.handles.len();
505+
}
506+
507+
/// Remove next worker handle that fail to accept connection.
508+
fn remove_next(&mut self) {
509+
let handle = self.handles.swap_remove(self.next);
510+
let idx = handle.idx;
511+
// A message is sent to `ServerBuilder` future to notify it a new worker
512+
// should be made.
513+
self.srv.worker_faulted(idx);
514+
self.avail.set_available(idx, false);
515+
}
516+
}
517+
518+
#[cfg(test)]
519+
mod test {
520+
use super::Availability;
521+
522+
fn single(aval: &mut Availability, idx: usize) {
523+
aval.set_available(idx, true);
524+
assert!(aval.available());
525+
526+
aval.set_available(idx, true);
527+
528+
aval.set_available(idx, false);
529+
assert!(!aval.available());
530+
}
531+
532+
fn multi(aval: &mut Availability, mut idx: Vec<usize>) {
533+
idx.iter().for_each(|idx| aval.set_available(*idx, true));
534+
535+
assert!(aval.available());
536+
537+
while let Some(idx) = idx.pop() {
538+
assert!(aval.available());
539+
aval.set_available(idx, false);
540+
}
541+
542+
assert!(!aval.available());
543+
}
544+
545+
#[test]
546+
fn availability() {
547+
let mut aval = Availability::default();
548+
549+
single(&mut aval, 1);
550+
single(&mut aval, 128);
551+
single(&mut aval, 256);
552+
single(&mut aval, 511);
553+
554+
let idx = (0..511).filter(|i| i % 3 == 0 && i % 5 == 0).collect();
555+
556+
multi(&mut aval, idx);
557+
558+
multi(&mut aval, (0..511).collect())
559+
}
560+
561+
#[test]
562+
#[should_panic]
563+
fn overflow() {
564+
let mut aval = Availability::default();
565+
single(&mut aval, 512);
566+
}
567+
568+
#[test]
569+
#[should_panic]
570+
fn double_set_unavailable() {
571+
let mut aval = Availability::default();
572+
aval.set_available(233, false);
573+
}
574+
575+
#[test]
576+
fn pin_point() {
577+
let mut aval = Availability::default();
578+
579+
aval.set_available(438, true);
580+
581+
aval.set_available(479, true);
582+
583+
assert_eq!(aval.0[3], 1 << (438 - 384) | 1 << (479 - 384));
584+
}
448585
}

0 commit comments

Comments
 (0)