Skip to content

Commit e1bcab5

Browse files
authored
Merge pull request #215 from Berrysoft/dev/runtime-hook
feat(runtime): a runtime hook API design
2 parents e2db313 + 58c0deb commit e1bcab5

File tree

6 files changed

+444
-50
lines changed

6 files changed

+444
-50
lines changed

compio-driver/Cargo.toml

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -61,11 +61,13 @@ windows-sys = { workspace = true, features = [
6161
[target.'cfg(target_os = "linux")'.dependencies]
6262
io-uring = { version = "0.6.2", optional = true }
6363
polling = { version = "3.3.0", optional = true }
64+
os_pipe = { workspace = true, optional = true }
6465
paste = { workspace = true }
6566

6667
# Other platform dependencies
6768
[target.'cfg(all(not(target_os = "linux"), unix))'.dependencies]
6869
polling = "3.3.0"
70+
os_pipe = { workspace = true }
6971

7072
[target.'cfg(unix)'.dependencies]
7173
crossbeam-queue = { workspace = true }
@@ -76,7 +78,7 @@ compio-buf = { workspace = true, features = ["arrayvec"] }
7678

7779
[features]
7880
default = ["io-uring"]
79-
polling = ["dep:polling"]
81+
polling = ["dep:polling", "dep:os_pipe"]
8082

8183
io-uring-sqe128 = []
8284
io-uring-cqe32 = []

compio-driver/src/iour/mod.rs

Lines changed: 40 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ cfg_if::cfg_if! {
2323
}
2424
}
2525
use io_uring::{
26-
opcode::{AsyncCancel, Read},
26+
opcode::{AsyncCancel, PollAdd},
2727
types::{Fd, SubmitArgs, Timespec},
2828
IoUring,
2929
};
@@ -76,7 +76,6 @@ pub(crate) struct Driver {
7676
inner: IoUring<SEntry, CEntry>,
7777
squeue: VecDeque<SEntry>,
7878
notifier: Notifier,
79-
notifier_registered: bool,
8079
pool: AsyncifyPool,
8180
pool_completed: Arc<SegQueue<Entry>>,
8281
}
@@ -88,11 +87,20 @@ impl Driver {
8887
pub fn new(builder: &ProactorBuilder) -> io::Result<Self> {
8988
instrument!(compio_log::Level::TRACE, "new", ?builder);
9089
trace!("new iour driver");
90+
let mut squeue = VecDeque::with_capacity(builder.capacity as usize);
91+
let notifier = Notifier::new()?;
92+
#[allow(clippy::useless_conversion)]
93+
squeue.push_back(
94+
PollAdd::new(Fd(notifier.as_raw_fd()), libc::POLLIN as _)
95+
.multi(true)
96+
.build()
97+
.user_data(Self::NOTIFY)
98+
.into(),
99+
);
91100
Ok(Self {
92101
inner: IoUring::builder().build(builder.capacity)?,
93-
squeue: VecDeque::with_capacity(builder.capacity as usize),
94-
notifier: Notifier::new()?,
95-
notifier_registered: false,
102+
squeue,
103+
notifier,
96104
pool: builder.create_or_get_thread_pool(),
97105
pool_completed: Arc::new(SegQueue::new()),
98106
})
@@ -177,7 +185,10 @@ impl Driver {
177185
let completed_entries = cqueue.filter_map(|entry| match entry.user_data() {
178186
Self::CANCEL => None,
179187
Self::NOTIFY => {
180-
self.notifier_registered = false;
188+
const IORING_CQE_F_MORE: u32 = 1 << 1;
189+
let flags = entry.flags();
190+
debug_assert!(flags & IORING_CQE_F_MORE == IORING_CQE_F_MORE);
191+
self.notifier.clear().expect("cannot clear notifier");
181192
None
182193
}
183194
_ => Some(create_entry(entry)),
@@ -256,19 +267,6 @@ impl Driver {
256267
mut entries: OutEntries<impl Extend<usize>>,
257268
) -> io::Result<()> {
258269
instrument!(compio_log::Level::TRACE, "poll", ?timeout);
259-
if !self.notifier_registered {
260-
let fd = self.notifier.as_raw_fd();
261-
let dst = self.notifier.dst();
262-
#[allow(clippy::useless_conversion)]
263-
self.squeue.push_back(
264-
Read::new(Fd(fd), dst.as_mut_ptr(), dst.len() as _)
265-
.build()
266-
.user_data(Self::NOTIFY)
267-
.into(),
268-
);
269-
trace!("registered notifier");
270-
self.notifier_registered = true
271-
}
272270
// Anyway we need to submit once, no matter there are entries in squeue.
273271
trace!("start polling");
274272
loop {
@@ -321,27 +319,40 @@ fn timespec(duration: std::time::Duration) -> Timespec {
321319
#[derive(Debug)]
322320
struct Notifier {
323321
fd: OwnedFd,
324-
read_dst: Box<[u8; 8]>,
325322
}
326323

327324
impl Notifier {
328325
/// Create a new notifier.
329326
fn new() -> io::Result<Self> {
330-
let fd = syscall!(libc::eventfd(0, libc::EFD_CLOEXEC))?;
327+
let fd = syscall!(libc::eventfd(0, libc::EFD_CLOEXEC | libc::EFD_NONBLOCK))?;
331328
let fd = unsafe { OwnedFd::from_raw_fd(fd) };
332-
Ok(Self {
333-
fd,
334-
read_dst: Box::new([0; 8]),
335-
})
329+
Ok(Self { fd })
336330
}
337331

338-
fn dst(&mut self) -> &mut [u8] {
339-
self.read_dst.as_mut_slice()
332+
pub fn clear(&self) -> io::Result<()> {
333+
loop {
334+
let mut buffer = [0u64];
335+
let res = syscall!(libc::read(
336+
self.fd.as_raw_fd(),
337+
buffer.as_mut_ptr().cast(),
338+
std::mem::size_of::<u64>()
339+
));
340+
match res {
341+
Ok(len) => {
342+
debug_assert_eq!(len, std::mem::size_of::<u64>() as _);
343+
break Ok(());
344+
}
345+
// Clear the next time:)
346+
Err(e) if e.kind() == io::ErrorKind::WouldBlock => break Ok(()),
347+
// Just like read_exact
348+
Err(e) if e.kind() == io::ErrorKind::Interrupted => continue,
349+
Err(e) => break Err(e),
350+
}
351+
}
340352
}
341353

342354
pub fn handle(&self) -> io::Result<NotifyHandle> {
343-
let fd = self.fd.try_clone()?;
344-
Ok(NotifyHandle::new(fd))
355+
Ok(NotifyHandle::new(self.fd.try_clone()?))
345356
}
346357
}
347358

compio-driver/src/poll/mod.rs

Lines changed: 64 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
pub use std::os::fd::{AsRawFd, FromRawFd, IntoRawFd, RawFd};
44
use std::{
55
collections::{HashMap, HashSet, VecDeque},
6-
io,
6+
io::{self, Read, Write},
77
num::NonZeroUsize,
88
os::fd::BorrowedFd,
99
pin::Pin,
@@ -16,7 +16,7 @@ use std::{
1616
use compio_log::{instrument, trace};
1717
use crossbeam_queue::SegQueue;
1818
pub(crate) use libc::{sockaddr_storage, socklen_t};
19-
use polling::{Event, Events, Poller};
19+
use polling::{Event, Events, PollMode, Poller};
2020
use slab::Slab;
2121

2222
use crate::{syscall, AsyncifyPool, Entry, OutEntries, ProactorBuilder};
@@ -151,6 +151,7 @@ pub(crate) struct Driver {
151151
poll: Arc<Poller>,
152152
registry: HashMap<RawFd, FdQueue>,
153153
cancelled: HashSet<usize>,
154+
notifier: Notifier,
154155
pool: AsyncifyPool,
155156
pool_completed: Arc<SegQueue<Entry>>,
156157
}
@@ -166,11 +167,21 @@ impl Driver {
166167
Events::with_capacity(NonZeroUsize::new(entries).unwrap())
167168
};
168169

170+
let notifier = Notifier::new()?;
171+
let fd = notifier.reader_fd();
172+
173+
let poll = Arc::new(Poller::new()?);
174+
// Attach the reader to poll.
175+
unsafe {
176+
poll.add_with_mode(fd, Event::new(fd as _, true, false), PollMode::Level)?;
177+
}
178+
169179
Ok(Self {
170180
events,
171-
poll: Arc::new(Poller::new()?),
181+
poll,
172182
registry: HashMap::new(),
173183
cancelled: HashSet::new(),
184+
notifier,
174185
pool: builder.create_or_get_thread_pool(),
175186
pool_completed: Arc::new(SegQueue::new()),
176187
})
@@ -279,6 +290,10 @@ impl Driver {
279290
}
280291
for event in self.events.iter() {
281292
let fd = event.key as RawFd;
293+
if fd == self.notifier.reader_fd() {
294+
self.notifier.clear()?;
295+
continue;
296+
}
282297
let queue = self
283298
.registry
284299
.get_mut(&fd)
@@ -310,7 +325,7 @@ impl Driver {
310325
}
311326

312327
pub fn handle(&self) -> io::Result<NotifyHandle> {
313-
Ok(NotifyHandle::new(self.poll.clone()))
328+
self.notifier.handle()
314329
}
315330
}
316331

@@ -338,18 +353,59 @@ fn entry_cancelled(user_data: usize) -> Entry {
338353
)
339354
}
340355

356+
struct Notifier {
357+
notify_reader: os_pipe::PipeReader,
358+
notify_writer: os_pipe::PipeWriter,
359+
}
360+
361+
impl Notifier {
362+
pub fn new() -> io::Result<Self> {
363+
let (notify_reader, notify_writer) = os_pipe::pipe()?;
364+
365+
// Set the reader as nonblocking.
366+
let fd = notify_reader.as_raw_fd();
367+
let current_flags = syscall!(libc::fcntl(fd, libc::F_GETFL))?;
368+
let flags = current_flags | libc::O_NONBLOCK;
369+
if flags != current_flags {
370+
syscall!(libc::fcntl(fd, libc::F_SETFL, flags))?;
371+
}
372+
373+
Ok(Self {
374+
notify_reader,
375+
notify_writer,
376+
})
377+
}
378+
379+
pub fn handle(&self) -> io::Result<NotifyHandle> {
380+
Ok(NotifyHandle::new(self.notify_writer.try_clone()?))
381+
}
382+
383+
pub fn clear(&self) -> io::Result<()> {
384+
let mut buffer = [0u8];
385+
match (&self.notify_reader).read_exact(&mut buffer) {
386+
Ok(()) => Ok(()),
387+
Err(e) if e.kind() == io::ErrorKind::WouldBlock => Ok(()),
388+
Err(e) => Err(e),
389+
}
390+
}
391+
392+
pub fn reader_fd(&self) -> RawFd {
393+
self.notify_reader.as_raw_fd()
394+
}
395+
}
396+
341397
/// A notify handle to the inner driver.
342398
pub struct NotifyHandle {
343-
poll: Arc<Poller>,
399+
sender: os_pipe::PipeWriter,
344400
}
345401

346402
impl NotifyHandle {
347-
fn new(poll: Arc<Poller>) -> Self {
348-
Self { poll }
403+
fn new(sender: os_pipe::PipeWriter) -> Self {
404+
Self { sender }
349405
}
350406

351407
/// Notify the inner driver.
352408
pub fn notify(&self) -> io::Result<()> {
353-
self.poll.notify()
409+
(&self.sender).write_all(&[1u8])
354410
}
355411
}

compio-runtime/Cargo.toml

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,16 @@ windows-sys = { workspace = true, features = ["Win32_System_IO"] }
5252
os_pipe = { workspace = true }
5353
libc = { workspace = true }
5454

55+
[target.'cfg(windows)'.dev-dependencies]
56+
windows-sys = { workspace = true, features = ["Win32_UI_WindowsAndMessaging"] }
57+
58+
[target.'cfg(target_os = "macos")'.dev-dependencies]
59+
core-foundation = "0.9.4"
60+
block2 = "0.4.0"
61+
62+
[target.'cfg(not(any(windows, target_os = "macos")))'.dev-dependencies]
63+
glib = "0.19"
64+
5565
[features]
5666
event = ["dep:cfg-if", "compio-buf/arrayvec"]
5767
time = ["dep:slab"]

0 commit comments

Comments
 (0)