Skip to content

Commit 63370e5

Browse files
authored
Merge pull request #35 from Berrysoft/refactor-refcell
Move `RefCell` to runtime
2 parents 4b93d25 + 2277839 commit 63370e5

File tree

8 files changed

+79
-70
lines changed

8 files changed

+79
-70
lines changed

README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ use compio::{
4444
op::ReadAt,
4545
};
4646
47-
let driver = Driver::new().unwrap();
47+
let mut driver = Driver::new().unwrap();
4848
let file = File::open("Cargo.toml").unwrap();
4949
// Attach the `RawFd` to driver first.
5050
driver.attach(file.as_raw_fd()).unwrap();

examples/driver.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ use compio::{
44
};
55

66
fn main() {
7-
let driver = Driver::new().unwrap();
7+
let mut driver = Driver::new().unwrap();
88
let file = compio::fs::File::open("Cargo.toml").unwrap();
99
driver.attach(file.as_raw_fd()).unwrap();
1010

src/driver/iocp/mod.rs

Lines changed: 20 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,4 @@
11
use std::{
2-
cell::RefCell,
32
collections::{HashMap, HashSet, VecDeque},
43
ffi::c_void,
54
io,
@@ -119,9 +118,9 @@ pub trait OpCode {
119118
/// Low-level driver of IOCP.
120119
pub struct Driver {
121120
port: OwnedHandle,
122-
operations: RefCell<VecDeque<(*mut dyn OpCode, Overlapped)>>,
123-
submit_map: RefCell<HashMap<usize, *mut OVERLAPPED>>,
124-
cancelled: RefCell<HashSet<*mut OVERLAPPED>>,
121+
operations: VecDeque<(*mut dyn OpCode, Overlapped)>,
122+
submit_map: HashMap<usize, *mut OVERLAPPED>,
123+
cancelled: HashSet<*mut OVERLAPPED>,
125124
}
126125

127126
impl Driver {
@@ -134,9 +133,9 @@ impl Driver {
134133
.map_err(|_| io::Error::last_os_error())?;
135134
Ok(Self {
136135
port,
137-
operations: RefCell::new(VecDeque::with_capacity(Self::DEFAULT_CAPACITY)),
138-
submit_map: RefCell::default(),
139-
cancelled: RefCell::default(),
136+
operations: VecDeque::with_capacity(Self::DEFAULT_CAPACITY),
137+
submit_map: HashMap::default(),
138+
cancelled: HashSet::default(),
140139
})
141140
}
142141
}
@@ -232,7 +231,7 @@ fn ntstatus_from_win32(x: i32) -> NTSTATUS {
232231
}
233232

234233
impl Poller for Driver {
235-
fn attach(&self, fd: RawFd) -> io::Result<()> {
234+
fn attach(&mut self, fd: RawFd) -> io::Result<()> {
236235
detach_iocp(fd)?;
237236
let port = unsafe { CreateIoCompletionPort(fd as _, self.port.as_raw_handle() as _, 0, 0) };
238237
if port == 0 {
@@ -242,39 +241,39 @@ impl Poller for Driver {
242241
}
243242
}
244243

245-
unsafe fn push(&self, op: &mut (impl OpCode + 'static), user_data: usize) -> io::Result<()> {
246-
self.operations
247-
.borrow_mut()
248-
.push_back((op, Overlapped::new(user_data)));
244+
unsafe fn push(
245+
&mut self,
246+
op: &mut (impl OpCode + 'static),
247+
user_data: usize,
248+
) -> io::Result<()> {
249+
self.operations.push_back((op, Overlapped::new(user_data)));
249250
Ok(())
250251
}
251252

252-
fn cancel(&self, user_data: usize) {
253-
if let Some(ptr) = self.submit_map.borrow_mut().remove(&user_data) {
253+
fn cancel(&mut self, user_data: usize) {
254+
if let Some(ptr) = self.submit_map.remove(&user_data) {
254255
// TODO: should we call CancelIoEx?
255-
self.cancelled.borrow_mut().insert(ptr);
256+
self.cancelled.insert(ptr);
256257
}
257258
}
258259

259260
fn poll(
260-
&self,
261+
&mut self,
261262
timeout: Option<Duration>,
262263
entries: &mut [MaybeUninit<Entry>],
263264
) -> io::Result<usize> {
264265
if entries.is_empty() {
265266
return Ok(0);
266267
}
267-
while let Some((op, overlapped)) = self.operations.borrow_mut().pop_front() {
268+
while let Some((op, overlapped)) = self.operations.pop_front() {
268269
let overlapped = Box::new(overlapped);
269270
let user_data = overlapped.user_data;
270271
let overlapped_ptr = Box::into_raw(overlapped);
271272
let result = unsafe { op.as_mut().unwrap().operate(overlapped_ptr.cast()) };
272273
if let Poll::Ready(result) = result {
273274
post_driver_raw(self.port.as_raw_handle(), result, overlapped_ptr.cast())?;
274275
} else {
275-
self.submit_map
276-
.borrow_mut()
277-
.insert(user_data, overlapped_ptr.cast());
276+
self.submit_map.insert(user_data, overlapped_ptr.cast());
278277
}
279278
}
280279

@@ -307,7 +306,7 @@ impl Poller for Driver {
307306
let transferred = iocp_entry.dwNumberOfBytesTransferred;
308307
let overlapped_ptr = iocp_entry.lpOverlapped;
309308
let overlapped = unsafe { Box::from_raw(overlapped_ptr.cast::<Overlapped>()) };
310-
if self.cancelled.borrow_mut().remove(&overlapped_ptr) {
309+
if self.cancelled.remove(&overlapped_ptr) {
311310
continue;
312311
}
313312
let res = if matches!(

src/driver/iour/mod.rs

Lines changed: 38 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
#[doc(no_inline)]
22
pub use std::os::fd::{AsRawFd, FromRawFd, IntoRawFd, RawFd};
3-
use std::{cell::RefCell, collections::VecDeque, io, mem::MaybeUninit, time::Duration};
3+
use std::{collections::VecDeque, io, mem::MaybeUninit, time::Duration};
44

55
use io_uring::{
66
cqueue,
@@ -24,8 +24,8 @@ pub trait OpCode {
2424
/// Low-level driver of io-uring.
2525
pub struct Driver {
2626
inner: IoUring,
27-
squeue: RefCell<VecDeque<squeue::Entry>>,
28-
cqueue: RefCell<VecDeque<Entry>>,
27+
squeue: VecDeque<squeue::Entry>,
28+
cqueue: VecDeque<Entry>,
2929
}
3030

3131
impl Driver {
@@ -38,25 +38,27 @@ impl Driver {
3838
pub fn with_entries(entries: u32) -> io::Result<Self> {
3939
Ok(Self {
4040
inner: IoUring::new(entries)?,
41-
squeue: RefCell::new(VecDeque::with_capacity(entries as usize)),
42-
cqueue: RefCell::new(VecDeque::with_capacity(entries as usize)),
41+
squeue: VecDeque::with_capacity(entries as usize),
42+
cqueue: VecDeque::with_capacity(entries as usize),
4343
})
4444
}
4545

46-
unsafe fn submit(&self, timeout: Option<Duration>) -> io::Result<()> {
46+
fn submit(&mut self, timeout: Option<Duration>) -> io::Result<()> {
4747
// Anyway we need to submit once, no matter there are entries in squeue.
4848
loop {
49-
let mut inner_squeue = self.inner.submission_shared();
50-
while !inner_squeue.is_full() {
51-
if let Some(entry) = self.squeue.borrow_mut().pop_front() {
52-
inner_squeue.push(&entry).unwrap();
53-
} else {
54-
break;
49+
{
50+
let mut inner_squeue = self.inner.submission();
51+
while !inner_squeue.is_full() {
52+
if let Some(entry) = self.squeue.pop_front() {
53+
unsafe { inner_squeue.push(&entry) }.unwrap();
54+
} else {
55+
break;
56+
}
5557
}
58+
inner_squeue.sync();
5659
}
57-
inner_squeue.sync();
5860

59-
let res = if self.squeue.borrow().is_empty() {
61+
let res = if self.squeue.is_empty() {
6062
// Last part of submission queue, wait till timeout.
6163
if let Some(duration) = timeout {
6264
let timespec = timespec(duration);
@@ -68,7 +70,6 @@ impl Driver {
6870
} else {
6971
self.inner.submit()
7072
};
71-
inner_squeue.sync();
7273
match res {
7374
Ok(_) => Ok(()),
7475
Err(e) => match e.raw_os_error() {
@@ -78,7 +79,7 @@ impl Driver {
7879
},
7980
}?;
8081

81-
for entry in self.inner.completion_shared() {
82+
for entry in self.inner.completion() {
8283
let entry = create_entry(entry);
8384
if entry.user_data() == u64::MAX as _ {
8485
// This is a cancel operation.
@@ -90,45 +91,47 @@ impl Driver {
9091
continue;
9192
}
9293
}
93-
self.cqueue.borrow_mut().push_back(entry);
94+
self.cqueue.push_back(entry);
9495
}
9596

96-
if self.squeue.borrow().is_empty() && inner_squeue.is_empty() {
97+
if self.squeue.is_empty() && self.inner.submission().is_empty() {
9798
break;
9899
}
99100
}
100101
Ok(())
101102
}
102103

103-
fn poll_entries(&self, entries: &mut [MaybeUninit<Entry>]) -> usize {
104-
let mut cqueue = self.cqueue.borrow_mut();
105-
let len = cqueue.len().min(entries.len());
104+
fn poll_entries(&mut self, entries: &mut [MaybeUninit<Entry>]) -> usize {
105+
let len = self.cqueue.len().min(entries.len());
106106
for entry in &mut entries[..len] {
107-
entry.write(cqueue.pop_front().unwrap());
107+
entry.write(self.cqueue.pop_front().unwrap());
108108
}
109109
len
110110
}
111111
}
112112

113113
impl Poller for Driver {
114-
fn attach(&self, _fd: RawFd) -> io::Result<()> {
114+
fn attach(&mut self, _fd: RawFd) -> io::Result<()> {
115115
Ok(())
116116
}
117117

118-
unsafe fn push(&self, op: &mut (impl OpCode + 'static), user_data: usize) -> io::Result<()> {
118+
unsafe fn push(
119+
&mut self,
120+
op: &mut (impl OpCode + 'static),
121+
user_data: usize,
122+
) -> io::Result<()> {
119123
let entry = op.create_entry().user_data(user_data as _);
120-
self.squeue.borrow_mut().push_back(entry);
124+
self.squeue.push_back(entry);
121125
Ok(())
122126
}
123127

124-
fn cancel(&self, user_data: usize) {
128+
fn cancel(&mut self, user_data: usize) {
125129
self.squeue
126-
.borrow_mut()
127130
.push_back(AsyncCancel::new(user_data as _).build().user_data(u64::MAX));
128131
}
129132

130133
fn poll(
131-
&self,
134+
&mut self,
132135
timeout: Option<Duration>,
133136
entries: &mut [MaybeUninit<Entry>],
134137
) -> io::Result<usize> {
@@ -139,12 +142,18 @@ impl Poller for Driver {
139142
if len > 0 {
140143
return Ok(len);
141144
}
142-
unsafe { self.submit(timeout) }?;
145+
self.submit(timeout)?;
143146
let len = self.poll_entries(entries);
144147
Ok(len)
145148
}
146149
}
147150

151+
impl AsRawFd for Driver {
152+
fn as_raw_fd(&self) -> RawFd {
153+
self.inner.as_raw_fd()
154+
}
155+
}
156+
148157
fn create_entry(entry: cqueue::Entry) -> Entry {
149158
let result = entry.result();
150159
let result = if result < 0 {

src/driver/mod.rs

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ cfg_if::cfg_if! {
4343
/// socket.connect(second_addr).unwrap();
4444
/// other_socket.connect(first_addr).unwrap();
4545
///
46-
/// let driver = Driver::new().unwrap();
46+
/// let mut driver = Driver::new().unwrap();
4747
/// driver.attach(socket.as_raw_fd()).unwrap();
4848
/// driver.attach(other_socket.as_raw_fd()).unwrap();
4949
///
@@ -72,18 +72,19 @@ pub trait Poller {
7272
/// ## Platform specific
7373
/// * IOCP: it will be attached to the IOCP completion port.
7474
/// * io-uring: it will do nothing and return `Ok(())`
75-
fn attach(&self, fd: RawFd) -> io::Result<()>;
75+
fn attach(&mut self, fd: RawFd) -> io::Result<()>;
7676

7777
/// Push an operation with user-defined data.
7878
/// The data could be retrived from [`Entry`] when polling.
7979
///
8080
/// # Safety
8181
///
8282
/// - `op` should be alive until [`Poller::poll`] returns its result.
83-
unsafe fn push(&self, op: &mut (impl OpCode + 'static), user_data: usize) -> io::Result<()>;
83+
unsafe fn push(&mut self, op: &mut (impl OpCode + 'static), user_data: usize)
84+
-> io::Result<()>;
8485

8586
/// Cancel an operation with the pushed user-defined data.
86-
fn cancel(&self, user_data: usize);
87+
fn cancel(&mut self, user_data: usize);
8788

8889
/// Poll the driver with an optional timeout.
8990
///
@@ -96,15 +97,15 @@ pub trait Poller {
9697
///
9798
/// [`Event`]: crate::event::Event
9899
fn poll(
99-
&self,
100+
&mut self,
100101
timeout: Option<Duration>,
101102
entries: &mut [MaybeUninit<Entry>],
102103
) -> io::Result<usize>;
103104

104105
/// Poll the driver and get only one entry back.
105106
///
106107
/// See [`Poller::poll`].
107-
fn poll_one(&self, timeout: Option<Duration>) -> io::Result<Entry> {
108+
fn poll_one(&mut self, timeout: Option<Duration>) -> io::Result<Entry> {
108109
let mut entry = MaybeUninit::uninit();
109110
let polled = self.poll(timeout, std::slice::from_mut(&mut entry))?;
110111
debug_assert_eq!(polled, 1);

src/event/iocp.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
use std::{io, marker::PhantomData};
22

33
use crate::{
4-
driver::{post_driver, AsRawFd, RawFd},
4+
driver::{post_driver, RawFd},
55
key::Key,
66
task::{op::OpFuture, RUNTIME},
77
};
@@ -45,7 +45,7 @@ unsafe impl Sync for EventHandle<'_> {}
4545

4646
impl<'a> EventHandle<'a> {
4747
pub(crate) fn new(user_data: &'a Key<()>) -> Self {
48-
let handle = RUNTIME.with(|runtime| runtime.driver().as_raw_fd());
48+
let handle = RUNTIME.with(|runtime| runtime.raw_driver());
4949
Self {
5050
user_data: **user_data,
5151
handle,

0 commit comments

Comments
 (0)