Skip to content

Commit 19468fe

Browse files
authored
Fix memory ordering of WorkerAvailability (#340)
1 parent bd48908 commit 19468fe

File tree

2 files changed

+23
-12
lines changed

2 files changed

+23
-12
lines changed

actix-server/src/accept.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -128,7 +128,7 @@ impl Availability {
128128
/// This would result in a re-check on all workers' availability.
129129
fn set_available_all(&mut self, handles: &[WorkerHandleAccept]) {
130130
handles.iter().for_each(|handle| {
131-
self.set_available(handle.idx, true);
131+
self.set_available(handle.idx(), true);
132132
})
133133
}
134134
}
@@ -248,7 +248,7 @@ impl Accept {
248248
drop(guard);
249249
// maybe we want to recover from a backpressure.
250250
self.maybe_backpressure(&mut sockets, false);
251-
self.avail.set_available(handle.idx, true);
251+
self.avail.set_available(handle.idx(), true);
252252
self.handles.push(handle);
253253
}
254254
// got timer interest and it's time to try register socket(s) again
@@ -400,7 +400,7 @@ impl Accept {
400400
} else {
401401
while self.avail.available() {
402402
let next = self.next();
403-
let idx = next.idx;
403+
let idx = next.idx();
404404
if next.available() {
405405
self.avail.set_available(idx, true);
406406
match self.send_connection(sockets, conn) {
@@ -503,7 +503,7 @@ impl Accept {
503503
/// Remove next worker handle that fail to accept connection.
504504
fn remove_next(&mut self) {
505505
let handle = self.handles.swap_remove(self.next);
506-
let idx = handle.idx;
506+
let idx = handle.idx();
507507
// A message is sent to `ServerBuilder` future to notify it a new worker
508508
// should be made.
509509
self.srv.worker_faulted(idx);

actix-server/src/worker.rs

Lines changed: 19 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -47,11 +47,7 @@ fn handle_pair(
4747
tx2: UnboundedSender<Stop>,
4848
avail: WorkerAvailability,
4949
) -> (WorkerHandleAccept, WorkerHandleServer) {
50-
let accept = WorkerHandleAccept {
51-
idx,
52-
tx: tx1,
53-
avail,
54-
};
50+
let accept = WorkerHandleAccept { tx: tx1, avail };
5551

5652
let server = WorkerHandleServer { idx, tx: tx2 };
5753

@@ -63,16 +59,22 @@ fn handle_pair(
6359
///
6460
/// Held by [Accept](crate::accept::Accept).
6561
pub(crate) struct WorkerHandleAccept {
66-
pub idx: usize,
6762
tx: UnboundedSender<Conn>,
6863
avail: WorkerAvailability,
6964
}
7065

7166
impl WorkerHandleAccept {
67+
#[inline(always)]
68+
pub(crate) fn idx(&self) -> usize {
69+
self.avail.idx
70+
}
71+
72+
#[inline(always)]
7273
pub(crate) fn send(&self, msg: Conn) -> Result<(), Conn> {
7374
self.tx.send(msg).map_err(|msg| msg.0)
7475
}
7576

77+
#[inline(always)]
7678
pub(crate) fn available(&self) -> bool {
7779
self.avail.available()
7880
}
@@ -110,13 +112,18 @@ impl WorkerAvailability {
110112
}
111113
}
112114

115+
#[inline(always)]
113116
pub fn available(&self) -> bool {
114117
self.available.load(Ordering::Acquire)
115118
}
116119

117120
pub fn set(&self, val: bool) {
118-
let old = self.available.swap(val, Ordering::Release);
119-
// notify the accept on switched to available.
121+
// Ordering:
122+
//
123+
// There could be multiple set calls happen in one <ServerWorker as Future>::poll.
124+
// Order is important between them.
125+
let old = self.available.swap(val, Ordering::AcqRel);
126+
// Notify the accept on switched to available.
120127
if !old && val {
121128
self.waker.wake(WakerInterest::WorkerAvailable(self.idx));
122129
}
@@ -374,6 +381,10 @@ impl Default for WorkerState {
374381

375382
impl Drop for ServerWorker {
376383
fn drop(&mut self) {
384+
// Set availability to true so if accept try to send connection to this worker
385+
// it would find worker is gone and remove it.
386+
// This is helpful when worker is dropped unexpected.
387+
self.availability.set(true);
377388
// Stop the Arbiter ServerWorker runs on on drop.
378389
Arbiter::current().stop();
379390
}

0 commit comments

Comments
 (0)