Skip to content

Commit a1569aa

Browse files
committed
add the tokio poll_fns test that triggered this investigation
1 parent f9ccc02 commit a1569aa

File tree

3 files changed

+242
-0
lines changed

3 files changed

+242
-0
lines changed

tests/deps/Cargo.lock

Lines changed: 80 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

tests/deps/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ page_size = "0.6"
2323
# Avoid pulling in all of tokio's dependencies.
2424
# However, without `net` and `signal`, tokio uses fewer relevant system APIs.
2525
tokio = { version = "1", features = ["macros", "rt-multi-thread", "time", "net", "fs", "sync", "signal", "io-util"] }
26+
futures = { version = "0.3.0", default-features = false, features = ["alloc", "async-await"] }
2627

2728
[target.'cfg(windows)'.dependencies]
2829
windows-sys = { version = "0.60", features = [

tests/pass-dep/tokio/poll_fns.rs

Lines changed: 161 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,161 @@
1+
//! This is a stand-alone version of the `poll_fns` test in Tokio. It hits various
2+
//! interesting edge cases in the epoll logic, making it a good integration test.
3+
//! It also seems to depend on Tokio internals, so if Tokio changes we have have to update
4+
//! or remove the test.
5+
6+
//@only-target: linux # We only support tokio on Linux
7+
8+
use std::fs::File;
9+
use std::io::{ErrorKind, Read, Write};
10+
use std::os::fd::FromRawFd;
11+
use std::sync::Arc;
12+
use std::sync::atomic::{AtomicBool, Ordering};
13+
use std::task::{Context, Waker};
14+
use std::time::Duration;
15+
16+
use futures::poll;
17+
use tokio::io::unix::AsyncFd;
18+
19+
macro_rules! assert_pending {
20+
($e:expr) => {{
21+
use core::task::Poll;
22+
match $e {
23+
Poll::Pending => {}
24+
Poll::Ready(v) => panic!("ready; value = {:?}", v),
25+
}
26+
}};
27+
}
28+
29+
struct TestWaker {
30+
inner: Arc<TestWakerInner>,
31+
waker: Waker,
32+
}
33+
34+
#[derive(Default)]
35+
struct TestWakerInner {
36+
awoken: AtomicBool,
37+
}
38+
39+
impl futures::task::ArcWake for TestWakerInner {
40+
fn wake_by_ref(arc_self: &Arc<Self>) {
41+
arc_self.awoken.store(true, Ordering::SeqCst);
42+
}
43+
}
44+
45+
impl TestWaker {
46+
fn new() -> Self {
47+
let inner: Arc<TestWakerInner> = Default::default();
48+
49+
Self { inner: inner.clone(), waker: futures::task::waker(inner) }
50+
}
51+
52+
fn awoken(&self) -> bool {
53+
self.inner.awoken.swap(false, Ordering::SeqCst)
54+
}
55+
56+
fn context(&self) -> Context<'_> {
57+
Context::from_waker(&self.waker)
58+
}
59+
}
60+
61+
fn socketpair() -> (File, File) {
62+
let mut fds = [-1, -1];
63+
let res = unsafe { libc::socketpair(libc::AF_UNIX, libc::SOCK_STREAM, 0, fds.as_mut_ptr()) };
64+
assert_eq!(res, 0);
65+
66+
assert_eq!(unsafe { libc::fcntl(fds[0], libc::F_SETFL, libc::O_NONBLOCK) }, 0);
67+
assert_eq!(unsafe { libc::fcntl(fds[1], libc::F_SETFL, libc::O_NONBLOCK) }, 0);
68+
69+
unsafe { (File::from_raw_fd(fds[0]), File::from_raw_fd(fds[1])) }
70+
}
71+
72+
fn drain(mut fd: &File, mut amt: usize) {
73+
let mut buf = [0u8; 512];
74+
while amt > 0 {
75+
match fd.read(&mut buf[..]) {
76+
Err(e) if e.kind() == ErrorKind::WouldBlock => {}
77+
Ok(0) => panic!("unexpected EOF"),
78+
Err(e) => panic!("unexpected error: {e:?}"),
79+
Ok(x) => amt -= x,
80+
}
81+
}
82+
}
83+
84+
fn main() {
85+
tokio::runtime::Builder::new_current_thread()
86+
.enable_all()
87+
.build()
88+
.unwrap()
89+
.block_on(the_test());
90+
}
91+
92+
async fn the_test() {
93+
let (a, b) = socketpair();
94+
let afd_a = Arc::new(AsyncFd::new(a).unwrap());
95+
let afd_b = Arc::new(AsyncFd::new(b).unwrap());
96+
97+
// Fill up the write side of A
98+
let mut bytes = 0;
99+
while let Ok(amt) = afd_a.get_ref().write(&[0; 512]) {
100+
bytes += amt;
101+
}
102+
103+
let waker = TestWaker::new();
104+
105+
assert_pending!(afd_a.as_ref().poll_read_ready(&mut waker.context()));
106+
107+
let afd_a_2 = afd_a.clone();
108+
let r_barrier = Arc::new(tokio::sync::Barrier::new(2));
109+
let barrier_clone = r_barrier.clone();
110+
111+
let read_fut = tokio::spawn(async move {
112+
// Move waker onto this task first
113+
assert_pending!(poll!(std::future::poll_fn(|cx| afd_a_2.as_ref().poll_read_ready(cx))));
114+
barrier_clone.wait().await;
115+
116+
let _ = std::future::poll_fn(|cx| afd_a_2.as_ref().poll_read_ready(cx)).await;
117+
});
118+
119+
let afd_a_2 = afd_a.clone();
120+
let w_barrier = Arc::new(tokio::sync::Barrier::new(2));
121+
let barrier_clone = w_barrier.clone();
122+
123+
let mut write_fut = tokio::spawn(async move {
124+
// Move waker onto this task first
125+
assert_pending!(poll!(std::future::poll_fn(|cx| afd_a_2.as_ref().poll_write_ready(cx))));
126+
barrier_clone.wait().await;
127+
128+
let _ = std::future::poll_fn(|cx| afd_a_2.as_ref().poll_write_ready(cx)).await;
129+
});
130+
131+
r_barrier.wait().await;
132+
w_barrier.wait().await;
133+
134+
let readable = afd_a.readable();
135+
tokio::pin!(readable);
136+
137+
tokio::select! {
138+
_ = &mut readable => unreachable!(),
139+
_ = tokio::task::yield_now() => {}
140+
}
141+
142+
// Make A readable. We expect that 'readable' and 'read_fut' will both complete quickly
143+
afd_b.get_ref().write_all(b"0").unwrap();
144+
145+
let _ = tokio::join!(readable, read_fut);
146+
147+
// Our original waker should _not_ be awoken (poll_read_ready retains only the last context)
148+
assert!(!waker.awoken());
149+
150+
// The writable side should not be awoken
151+
tokio::select! {
152+
_ = &mut write_fut => unreachable!(),
153+
_ = tokio::time::sleep(Duration::from_millis(50)) => {}
154+
}
155+
156+
// Make it writable now
157+
drain(afd_b.get_ref(), bytes);
158+
159+
// now we should be writable (ie - the waker for poll_write should still be registered after we wake the read side)
160+
let _ = write_fut.await;
161+
}

0 commit comments

Comments
 (0)