Skip to content

Commit a8b14ee

Browse files
committed
epoll: wake up multiple threads if that is needed to deliver all events
1 parent 7a86151 commit a8b14ee

File tree

2 files changed

+72
-17
lines changed

2 files changed

+72
-17
lines changed

src/tools/miri/src/shims/unix/linux_like/epoll.rs

Lines changed: 20 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
use std::cell::RefCell;
2-
use std::collections::BTreeMap;
2+
use std::collections::{BTreeMap, VecDeque};
33
use std::io;
44
use std::time::Duration;
55

@@ -22,8 +22,8 @@ struct Epoll {
2222
/// "ready" list; instead, a boolean flag in this list tracks which subset is ready. This makes
2323
/// `epoll_wait` less efficient, but also requires less bookkeeping.
2424
interest_list: RefCell<BTreeMap<EpollEventKey, EpollEventInterest>>,
25-
/// A list of thread ids blocked on this epoll instance.
26-
blocked: RefCell<Vec<ThreadId>>,
25+
/// The queue of threads blocked on this epoll instance, and how many events they'd like to get.
26+
queue: RefCell<VecDeque<(ThreadId, u32)>>,
2727
}
2828

2929
impl VisitProvenance for Epoll {
@@ -459,7 +459,9 @@ pub trait EvalContextExt<'tcx>: crate::MiriInterpCxExt<'tcx> {
459459
}
460460
};
461461
// Record this thread as blocked.
462-
epfd.blocked.borrow_mut().push(this.active_thread());
462+
epfd.queue
463+
.borrow_mut()
464+
.push_back((this.active_thread(), maxevents.try_into().unwrap()));
463465
// And block it.
464466
let dest = dest.clone();
465467
// We keep a strong ref to the underlying `Epoll` to make sure it sticks around.
@@ -483,8 +485,8 @@ pub trait EvalContextExt<'tcx>: crate::MiriInterpCxExt<'tcx> {
483485
UnblockKind::TimedOut => {
484486
// Remove the current active thread_id from the blocked thread_id list.
485487
epfd
486-
.blocked.borrow_mut()
487-
.retain(|&id| id != this.active_thread());
488+
.queue.borrow_mut()
489+
.retain(|&(id, _events)| id != this.active_thread());
488490
this.write_int(0, &dest)?;
489491
interp_ok(())
490492
},
@@ -550,7 +552,7 @@ fn update_readiness<'tcx>(
550552
&mut dyn FnMut(&mut EpollEventInterest) -> InterpResult<'tcx>,
551553
) -> InterpResult<'tcx>,
552554
) -> InterpResult<'tcx> {
553-
let mut wakeup = false;
555+
let mut num_ready = 0u32; // how many events we have ready to deliver
554556
for_each_interest(&mut |interest| {
555557
// Update the ready events tracked in this interest.
556558
let new_readiness = interest.relevant_events & active_events;
@@ -565,17 +567,20 @@ fn update_readiness<'tcx>(
565567
ecx.release_clock(|clock| {
566568
interest.clock.join(clock);
567569
})?;
568-
wakeup = true;
570+
num_ready = num_ready.saturating_add(1);
569571
}
570572
interp_ok(())
571573
})?;
572-
if wakeup {
573-
// Wake up threads that may have been waiting for events on this epoll.
574-
// Do this only once for all the interests.
575-
// Edge-triggered notification only notify one thread even if there are
576-
// multiple threads blocked on the same epoll.
577-
if let Some(thread_id) = epoll.blocked.borrow_mut().pop() {
578-
ecx.unblock_thread(thread_id, BlockReason::Epoll)?;
574+
// Edge-triggered notifications only wake up as many threads as are needed to deliver
575+
// all the events.
576+
while num_ready > 0
577+
&& let Some((thread_id, events)) = epoll.queue.borrow_mut().pop_front()
578+
{
579+
ecx.unblock_thread(thread_id, BlockReason::Epoll)?;
580+
// Keep track of how many events we have left to deliver (except if we saturated;
581+
// in that case we just wake up everybody).
582+
if num_ready != u32::MAX {
583+
num_ready = num_ready.saturating_sub(events);
579584
}
580585
}
581586

src/tools/miri/tests/pass-dep/libc/libc-epoll-blocking.rs

Lines changed: 52 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@
44

55
use std::convert::TryInto;
66
use std::thread;
7-
use std::thread::spawn;
87

98
#[path = "../../utils/libc.rs"]
109
mod libc_utils;
@@ -17,6 +16,7 @@ fn main() {
1716
test_notification_after_timeout();
1817
test_epoll_race();
1918
wakeup_on_new_interest();
19+
multiple_events_wake_multiple_threads();
2020
}
2121

2222
// Using `as` cast since `EPOLLET` wraps around
@@ -90,7 +90,7 @@ fn test_epoll_block_then_unblock() {
9090
// epoll_wait before triggering notification so it will block then get unblocked before timeout.
9191
let expected_event = u32::try_from(libc::EPOLLIN | libc::EPOLLOUT).unwrap();
9292
let expected_value = fds[0] as u64;
93-
let thread1 = spawn(move || {
93+
let thread1 = thread::spawn(move || {
9494
thread::yield_now();
9595
let data = "abcde".as_bytes().as_ptr();
9696
let res = unsafe { libc_utils::write_all(fds[1], data as *const libc::c_void, 5) };
@@ -210,3 +210,53 @@ fn wakeup_on_new_interest() {
210210
// This should wake up the thread.
211211
t.join().unwrap();
212212
}
213+
214+
/// Ensure that if a single operation triggers multiple events, we wake up enough threads
215+
/// to consume them all.
216+
fn multiple_events_wake_multiple_threads() {
217+
// Create an epoll instance.
218+
let epfd = unsafe { libc::epoll_create1(0) };
219+
assert_ne!(epfd, -1);
220+
221+
// Create an eventfd instance.
222+
let flags = libc::EFD_NONBLOCK | libc::EFD_CLOEXEC;
223+
let fd1 = unsafe { libc::eventfd(0, flags) };
224+
// Make a duplicate so that we have two file descriptors for the same file description.
225+
let fd2 = unsafe { libc::dup(fd1) };
226+
227+
// Register both with epoll.
228+
let mut ev = libc::epoll_event { events: EPOLL_IN_OUT_ET, u64: fd1 as u64 };
229+
let res = unsafe { libc::epoll_ctl(epfd, libc::EPOLL_CTL_ADD, fd1, &mut ev) };
230+
assert_eq!(res, 0);
231+
let mut ev = libc::epoll_event { events: EPOLL_IN_OUT_ET, u64: fd2 as u64 };
232+
let res = unsafe { libc::epoll_ctl(epfd, libc::EPOLL_CTL_ADD, fd2, &mut ev) };
233+
assert_eq!(res, 0);
234+
// Consume the initial event.
235+
let expected = [(libc::EPOLLOUT as u32, fd1 as u64), (libc::EPOLLOUT as u32, fd2 as u64)];
236+
check_epoll_wait::<8>(epfd, &expected, -1);
237+
238+
// Block two threads on the epoll, both wanting to get just one event.
239+
let t1 = thread::spawn(move || {
240+
let mut e = libc::epoll_event { events: 0, u64: 0 };
241+
let res = unsafe { libc::epoll_wait(epfd, &raw mut e, 1, -1) };
242+
assert!(res == 1);
243+
(e.events, e.u64)
244+
});
245+
let t2 = thread::spawn(move || {
246+
let mut e = libc::epoll_event { events: 0, u64: 0 };
247+
let res = unsafe { libc::epoll_wait(epfd, &raw mut e, 1, -1) };
248+
assert!(res == 1);
249+
(e.events, e.u64)
250+
});
251+
// Yield so both threads are waiting now.
252+
thread::yield_now();
253+
254+
// Trigger the eventfd. This triggers two events at once!
255+
libc_utils::write_all_from_slice(fd1, &0_u64.to_ne_bytes()).unwrap();
256+
257+
// Both threads should have been woken up so that both events can be consumed.
258+
let e1 = t1.join().unwrap();
259+
let e2 = t2.join().unwrap();
260+
// Ensure that across the two threads we got both events.
261+
assert!(expected == [e1, e2] || expected == [e2, e1]);
262+
}

0 commit comments

Comments
 (0)