Skip to content

Commit 1481e0a

Browse files
committed
Add FdQueue
1 parent 8cb611c commit 1481e0a

File tree

2 files changed

+117
-45
lines changed

2 files changed

+117
-45
lines changed

src/driver/mod.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -118,7 +118,8 @@ impl Proactor {
118118
/// attached to one driver, and could only be attached once, even if you
119119
/// `try_clone` it. It will cause unexpected result to attach the handle
120120
/// with one driver and push an op to another driver.
121-
/// * io-uring/polling: it will do nothing and return `Ok(())`
121+
/// * io-uring: it will do nothing and return `Ok(())`.
122+
/// * polling: it will initialize the internal interest queue for the fd.
122123
pub fn attach(&mut self, fd: RawFd) -> io::Result<()> {
123124
self.driver.attach(fd)
124125
}

src/driver/poll/mod.rs

Lines changed: 115 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,11 @@
11
#[doc(no_inline)]
22
pub use std::os::fd::{AsRawFd, FromRawFd, IntoRawFd, RawFd};
33
use std::{
4-
collections::{HashSet, VecDeque},
4+
collections::{HashMap, HashSet, VecDeque},
55
io,
66
num::NonZeroUsize,
77
ops::ControlFlow,
8+
os::fd::BorrowedFd,
89
pin::Pin,
910
time::Duration,
1011
};
@@ -39,39 +40,81 @@ pub enum Decision {
3940

4041
impl Decision {
4142
/// Decide to wait for the given fd with the given interest.
42-
pub fn wait_for(fd: RawFd, readable: bool, writable: bool) -> Self {
43-
Self::Wait(WaitArg {
44-
fd,
45-
readable,
46-
writable,
47-
})
43+
pub fn wait_for(fd: RawFd, interest: Interest) -> Self {
44+
Self::Wait(WaitArg { fd, interest })
4845
}
4946

5047
/// Decide to wait for the given fd to be readable.
5148
pub fn wait_readable(fd: RawFd) -> Self {
52-
Self::wait_for(fd, true, false)
49+
Self::wait_for(fd, Interest::Readable)
5350
}
5451

5552
/// Decide to wait for the given fd to be writable.
5653
pub fn wait_writable(fd: RawFd) -> Self {
57-
Self::wait_for(fd, false, true)
54+
Self::wait_for(fd, Interest::Writable)
5855
}
5956
}
6057

6158
/// Meta of polling operations.
6259
#[derive(Debug, Clone, Copy)]
6360
pub struct WaitArg {
64-
fd: RawFd,
65-
readable: bool,
66-
writable: bool,
61+
/// The raw fd of the operation.
62+
pub fd: RawFd,
63+
/// The interest to be registered.
64+
pub interest: Interest,
65+
}
66+
67+
/// The interest of the operation
68+
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
69+
pub enum Interest {
70+
/// Represents a read operation.
71+
Readable,
72+
/// Represents a write operation.
73+
Writable,
74+
}
75+
76+
#[derive(Debug, Default)]
77+
struct FdQueue {
78+
read_queue: VecDeque<usize>,
79+
write_queue: VecDeque<usize>,
80+
}
81+
82+
impl FdQueue {
83+
pub fn push_interest(&mut self, user_data: usize, interest: Interest) {
84+
match interest {
85+
Interest::Readable => self.read_queue.push_back(user_data),
86+
Interest::Writable => self.write_queue.push_back(user_data),
87+
}
88+
}
89+
90+
pub fn event(&self, key: usize) -> Event {
91+
let mut event = Event::all(key);
92+
event.readable = !self.read_queue.is_empty();
93+
event.writable = !self.write_queue.is_empty();
94+
event
95+
}
96+
97+
pub fn pop_interest(&mut self, event: &Event) -> usize {
98+
if event.readable {
99+
if let Some(user_data) = self.read_queue.pop_front() {
100+
return user_data;
101+
}
102+
}
103+
if event.writable {
104+
if let Some(user_data) = self.write_queue.pop_front() {
105+
return user_data;
106+
}
107+
}
108+
unreachable!("should receive event when no interest")
109+
}
67110
}
68111

69112
/// Low-level driver of polling.
70113
pub(crate) struct Driver {
71114
events: Events,
72115
poll: Poller,
116+
registry: HashMap<RawFd, FdQueue>,
73117
cancelled: HashSet<usize>,
74-
cancel_queue: VecDeque<usize>,
75118
}
76119

77120
impl Driver {
@@ -86,25 +129,33 @@ impl Driver {
86129
Ok(Self {
87130
events,
88131
poll: Poller::new()?,
132+
registry: HashMap::new(),
89133
cancelled: HashSet::new(),
90-
cancel_queue: VecDeque::new(),
91134
})
92135
}
93-
}
94136

95-
impl Driver {
96-
fn submit(&mut self, user_data: usize, arg: WaitArg) -> io::Result<()> {
137+
fn submit(&mut self, user_data: usize, arg: WaitArg) -> io::Result<bool> {
97138
if self.cancelled.remove(&user_data) {
98-
self.cancel_queue.push_back(user_data);
139+
Ok(false)
99140
} else {
100-
let mut event = Event::none(user_data);
101-
event.readable = arg.readable;
102-
event.writable = arg.writable;
141+
let need_add = !self.registry.contains_key(&arg.fd);
142+
let queue = self
143+
.registry
144+
.get_mut(&arg.fd)
145+
.expect("the fd should be attached");
146+
queue.push_interest(user_data, arg.interest);
147+
// We use fd as the key.
148+
let event = queue.event(arg.fd as usize);
103149
unsafe {
104-
self.poll.add(arg.fd, event)?;
150+
if need_add {
151+
self.poll.add(arg.fd, event)?;
152+
} else {
153+
let fd = BorrowedFd::borrow_raw(arg.fd);
154+
self.poll.modify(fd, event)?;
155+
}
105156
}
157+
Ok(true)
106158
}
107-
Ok(())
108159
}
109160

110161
/// Register all operations in the squeue to polling.
@@ -119,7 +170,11 @@ impl Driver {
119170
let op = registry[user_data].as_pin();
120171
match op.pre_submit() {
121172
Ok(Decision::Wait(arg)) => {
122-
self.submit(user_data, arg)?;
173+
let succeeded = self.submit(user_data, arg)?;
174+
if !succeeded {
175+
entries.extend(Some(entry_cancelled(user_data)));
176+
extended = true;
177+
}
123178
}
124179
Ok(Decision::Completed(res)) => {
125180
entries.extend(Some(Entry::new(user_data, Ok(res))));
@@ -148,36 +203,35 @@ impl Driver {
148203
return Err(io::Error::from_raw_os_error(libc::ETIMEDOUT));
149204
}
150205
for event in self.events.iter() {
151-
if self.cancelled.remove(&event.key) {
152-
self.cancel_queue.push_back(event.key);
206+
let fd = event.key as RawFd;
207+
let queue = self
208+
.registry
209+
.get_mut(&fd)
210+
.expect("the fd should be attached");
211+
let user_data = queue.pop_interest(&event);
212+
let renew_event = queue.event(fd as _);
213+
unsafe {
214+
let fd = BorrowedFd::borrow_raw(fd);
215+
self.poll.modify(fd, renew_event)?;
216+
}
217+
if self.cancelled.remove(&user_data) {
218+
entries.extend(Some(entry_cancelled(user_data)));
153219
} else {
154-
let op = registry[event.key].as_pin();
220+
let op = registry[user_data].as_pin();
155221
let res = match op.on_event(&event) {
156222
Ok(ControlFlow::Continue(_)) => continue,
157223
Ok(ControlFlow::Break(res)) => Ok(res),
158224
Err(err) => Err(err),
159225
};
160-
let entry = Entry::new(event.key, res);
226+
let entry = Entry::new(user_data, res);
161227
entries.extend(Some(entry));
162228
}
163229
}
164230
Ok(())
165231
}
166232

167-
fn poll_cancel(&mut self, entries: &mut impl Extend<Entry>) -> bool {
168-
let has_cancel = !self.cancel_queue.is_empty();
169-
if has_cancel {
170-
entries.extend(self.cancel_queue.drain(..).map(|user_data| {
171-
Entry::new(
172-
user_data,
173-
Err(io::Error::from_raw_os_error(libc::ETIMEDOUT)),
174-
)
175-
}))
176-
}
177-
has_cancel
178-
}
179-
180-
pub fn attach(&mut self, _fd: RawFd) -> io::Result<()> {
233+
pub fn attach(&mut self, fd: RawFd) -> io::Result<()> {
234+
self.registry.entry(fd).or_default();
181235
Ok(())
182236
}
183237

@@ -192,8 +246,7 @@ impl Driver {
192246
entries: &mut impl Extend<Entry>,
193247
registry: &mut Slab<RawOp>,
194248
) -> io::Result<()> {
195-
let mut extended = self.submit_squeue(ops, entries, registry)?;
196-
extended |= self.poll_cancel(entries);
249+
let extended = self.submit_squeue(ops, entries, registry)?;
197250
if !extended {
198251
self.poll_impl(timeout, entries, registry)?;
199252
}
@@ -206,3 +259,21 @@ impl AsRawFd for Driver {
206259
self.poll.as_raw_fd()
207260
}
208261
}
262+
263+
impl Drop for Driver {
264+
fn drop(&mut self) {
265+
for fd in self.registry.keys() {
266+
unsafe {
267+
let fd = BorrowedFd::borrow_raw(*fd);
268+
self.poll.delete(fd).ok();
269+
}
270+
}
271+
}
272+
}
273+
274+
fn entry_cancelled(user_data: usize) -> Entry {
275+
Entry::new(
276+
user_data,
277+
Err(io::Error::from_raw_os_error(libc::ETIMEDOUT)),
278+
)
279+
}

0 commit comments

Comments
 (0)