Skip to content

Commit cdedbfc

Browse files
authored
rt: refactor driver, runtime, and op (tokio-rs#197)
This cleans up a few different pieces and is a followup change from tokio-rs#196.
1 parent 4417d63 commit cdedbfc

File tree

7 files changed

+303
-264
lines changed

7 files changed

+303
-264
lines changed

src/buf/fixed/pool.rs

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -152,12 +152,7 @@ impl FixedBufPool {
152152
pub fn new(bufs: impl IntoIterator<Item = Vec<u8>>) -> Self {
153153
FixedBufPool {
154154
inner: Rc::new(RefCell::new(Inner::new(bufs.into_iter()))),
155-
driver: CONTEXT.with(|x| {
156-
x.handle()
157-
.as_ref()
158-
.expect("Not in a runtime context")
159-
.into()
160-
}),
155+
driver: CONTEXT.with(|x| x.weak().expect("Not in a runtime context")),
161156
}
162157
}
163158

src/buf/fixed/registry.rs

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -106,12 +106,7 @@ impl FixedBufRegistry {
106106
pub fn new(bufs: impl IntoIterator<Item = Vec<u8>>) -> Self {
107107
FixedBufRegistry {
108108
inner: Rc::new(RefCell::new(Inner::new(bufs.into_iter()))),
109-
driver: CONTEXT.with(|x| {
110-
x.handle()
111-
.as_ref()
112-
.expect("Not in a runtime context")
113-
.into()
114-
}),
109+
driver: CONTEXT.with(|x| x.weak().expect("Not in a runtime context")),
115110
}
116111
}
117112

src/runtime/context.rs

Lines changed: 3 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
use crate::runtime::driver;
2-
use crate::runtime::driver::Handle;
2+
use crate::runtime::driver::{Handle, WeakHandle};
33
use std::cell::RefCell;
44

55
/// Owns the driver and resides in thread-local storage.
@@ -44,17 +44,7 @@ impl RuntimeContext {
4444
self.driver.borrow().clone()
4545
}
4646

47-
/// Execute a function which requires mutable access to the driver.
48-
pub(crate) fn with_handle_mut<F, R>(&self, f: F) -> R
49-
where
50-
F: FnOnce(&mut driver::Handle) -> R,
51-
{
52-
let mut guard = self.driver.borrow_mut();
53-
54-
let driver = guard
55-
.as_mut()
56-
.expect("Attempted to access driver in invalid context");
57-
58-
f(driver)
47+
pub(crate) fn weak(&self) -> Option<WeakHandle> {
48+
self.driver.borrow().as_ref().map(Into::into)
5949
}
6050
}

src/runtime/driver/handle.rs

Lines changed: 17 additions & 175 deletions
Original file line numberDiff line numberDiff line change
@@ -12,15 +12,16 @@
1212
//! The weak handle should be used by anything which is stored in the driver or does not need to
1313
//! keep the driver alive for it's duration.
1414
15-
use io_uring::{cqueue, squeue};
15+
use io_uring::squeue;
1616
use std::cell::RefCell;
1717
use std::io;
18+
use std::ops::Deref;
1819
use std::os::unix::io::{AsRawFd, RawFd};
1920
use std::rc::{Rc, Weak};
2021
use std::task::{Context, Poll};
2122

2223
use crate::buf::fixed::FixedBuffers;
23-
use crate::runtime::driver::op::{Completable, Lifecycle, MultiCQEFuture, Op, Updateable};
24+
use crate::runtime::driver::op::{Completable, MultiCQEFuture, Op, Updateable};
2425
use crate::runtime::driver::Driver;
2526

2627
#[derive(Clone)]
@@ -40,8 +41,8 @@ impl Handle {
4041
})
4142
}
4243

43-
pub(crate) fn tick(&self) {
44-
self.inner.borrow_mut().tick()
44+
pub(crate) fn dispatch_completions(&self) {
45+
self.inner.borrow_mut().dispatch_completions()
4546
}
4647

4748
pub(crate) fn flush(&self) -> io::Result<usize> {
@@ -52,99 +53,29 @@ impl Handle {
5253
&self,
5354
buffers: Rc<RefCell<dyn FixedBuffers>>,
5455
) -> io::Result<()> {
55-
let mut driver = self.inner.borrow_mut();
56-
57-
driver
58-
.uring
59-
.submitter()
60-
.register_buffers(buffers.borrow().iovecs())?;
61-
62-
driver.fixed_buffers = Some(buffers);
63-
Ok(())
56+
self.inner.borrow_mut().register_buffers(buffers)
6457
}
6558

6659
pub(crate) fn unregister_buffers(
6760
&self,
6861
buffers: Rc<RefCell<dyn FixedBuffers>>,
6962
) -> io::Result<()> {
70-
let mut driver = self.inner.borrow_mut();
71-
72-
if let Some(currently_registered) = &driver.fixed_buffers {
73-
if Rc::ptr_eq(&buffers, currently_registered) {
74-
driver.uring.submitter().unregister_buffers()?;
75-
driver.fixed_buffers = None;
76-
return Ok(());
77-
}
78-
}
79-
Err(io::Error::new(
80-
io::ErrorKind::Other,
81-
"fixed buffers are not currently registered",
82-
))
63+
self.inner.borrow_mut().unregister_buffers(buffers)
8364
}
8465

85-
/// Submit an operation to uring.
86-
///
87-
/// `state` is stored during the operation tracking any state submitted to
88-
/// the kernel.
89-
pub(crate) fn submit_op<T, S, F>(&self, mut data: T, f: F) -> io::Result<Op<T, S>>
66+
pub(crate) fn submit_op<T, S, F>(&self, data: T, f: F) -> io::Result<Op<T, S>>
9067
where
9168
T: Completable,
9269
F: FnOnce(&mut T) -> squeue::Entry,
9370
{
94-
let mut driver = self.inner.borrow_mut();
95-
let index = driver.ops.insert();
96-
97-
// Configure the SQE
98-
let sqe = f(&mut data).user_data(index as _);
99-
100-
// Create the operation
101-
let op = Op::new(self.into(), data, index);
102-
103-
// Push the new operation
104-
while unsafe { driver.uring.submission().push(&sqe).is_err() } {
105-
// If the submission queue is full, flush it to the kernel
106-
driver.submit()?;
107-
}
108-
109-
Ok(op)
71+
self.inner.borrow_mut().submit_op(data, f, self.into())
11072
}
11173

11274
pub(crate) fn poll_op<T>(&self, op: &mut Op<T>, cx: &mut Context<'_>) -> Poll<T::Output>
11375
where
11476
T: Unpin + 'static + Completable,
11577
{
116-
use std::mem;
117-
118-
let mut driver = self.inner.borrow_mut();
119-
120-
let (lifecycle, _) = driver
121-
.ops
122-
.get_mut(op.index)
123-
.expect("invalid internal state");
124-
125-
match mem::replace(lifecycle, Lifecycle::Submitted) {
126-
Lifecycle::Submitted => {
127-
*lifecycle = Lifecycle::Waiting(cx.waker().clone());
128-
Poll::Pending
129-
}
130-
Lifecycle::Waiting(waker) if !waker.will_wake(cx.waker()) => {
131-
*lifecycle = Lifecycle::Waiting(cx.waker().clone());
132-
Poll::Pending
133-
}
134-
Lifecycle::Waiting(waker) => {
135-
*lifecycle = Lifecycle::Waiting(waker);
136-
Poll::Pending
137-
}
138-
Lifecycle::Ignored(..) => unreachable!(),
139-
Lifecycle::Completed(cqe) => {
140-
driver.ops.remove(op.index);
141-
op.index = usize::MAX;
142-
Poll::Ready(op.data.take().unwrap().complete(cqe))
143-
}
144-
Lifecycle::CompletionList(..) => {
145-
unreachable!("No `more` flag set for SingleCQE")
146-
}
147-
}
78+
self.inner.borrow_mut().poll_op(op, cx)
14879
}
14980

15081
pub(crate) fn poll_multishot_op<T>(
@@ -155,103 +86,11 @@ impl Handle {
15586
where
15687
T: Unpin + 'static + Completable + Updateable,
15788
{
158-
use std::mem;
159-
160-
let mut driver = self.inner.borrow_mut();
161-
162-
let (lifecycle, completions) = driver
163-
.ops
164-
.get_mut(op.index)
165-
.expect("invalid internal state");
166-
167-
match mem::replace(lifecycle, Lifecycle::Submitted) {
168-
Lifecycle::Submitted => {
169-
*lifecycle = Lifecycle::Waiting(cx.waker().clone());
170-
Poll::Pending
171-
}
172-
Lifecycle::Waiting(waker) if !waker.will_wake(cx.waker()) => {
173-
*lifecycle = Lifecycle::Waiting(cx.waker().clone());
174-
Poll::Pending
175-
}
176-
Lifecycle::Waiting(waker) => {
177-
*lifecycle = Lifecycle::Waiting(waker);
178-
Poll::Pending
179-
}
180-
Lifecycle::Ignored(..) => unreachable!(),
181-
Lifecycle::Completed(cqe) => {
182-
// This is possible. We may have previously polled a CompletionList,
183-
// and the final CQE registered as Completed
184-
driver.ops.remove(op.index);
185-
op.index = usize::MAX;
186-
Poll::Ready(op.data.take().unwrap().complete(cqe))
187-
}
188-
Lifecycle::CompletionList(indices) => {
189-
let mut data = op.data.take().unwrap();
190-
let mut status = Poll::Pending;
191-
// Consume the CqeResult list, calling update on the Op on all Cqe's flagged `more`
192-
// If the final Cqe is present, clean up and return Poll::Ready
193-
for cqe in indices.into_list(completions) {
194-
if cqueue::more(cqe.flags) {
195-
data.update(cqe);
196-
} else {
197-
status = Poll::Ready(cqe);
198-
break;
199-
}
200-
}
201-
match status {
202-
Poll::Pending => {
203-
// We need more CQE's. Restore the op state
204-
let _ = op.data.insert(data);
205-
*lifecycle = Lifecycle::Waiting(cx.waker().clone());
206-
Poll::Pending
207-
}
208-
Poll::Ready(cqe) => {
209-
driver.ops.remove(op.index);
210-
op.index = usize::MAX;
211-
Poll::Ready(data.complete(cqe))
212-
}
213-
}
214-
}
215-
}
89+
self.inner.borrow_mut().poll_multishot_op(op, cx)
21690
}
21791

21892
pub(crate) fn remove_op<T, CqeType>(&self, op: &mut Op<T, CqeType>) {
219-
use std::mem;
220-
221-
let mut driver = self.inner.borrow_mut();
222-
223-
// Get the Op Lifecycle state from the driver
224-
let (lifecycle, completions) = match driver.ops.get_mut(op.index) {
225-
Some(val) => val,
226-
None => {
227-
// Op dropped after the driver
228-
return;
229-
}
230-
};
231-
232-
match mem::replace(lifecycle, Lifecycle::Submitted) {
233-
Lifecycle::Submitted | Lifecycle::Waiting(_) => {
234-
*lifecycle = Lifecycle::Ignored(Box::new(op.data.take()));
235-
}
236-
Lifecycle::Completed(..) => {
237-
driver.ops.remove(op.index);
238-
}
239-
Lifecycle::CompletionList(indices) => {
240-
// Deallocate list entries, recording if more CQE's are expected
241-
let more = {
242-
let mut list = indices.into_list(completions);
243-
cqueue::more(list.peek_end().unwrap().flags)
244-
// Dropping list deallocates the list entries
245-
};
246-
if more {
247-
// If more are expected, we have to keep the op around
248-
*lifecycle = Lifecycle::Ignored(Box::new(op.data.take()));
249-
} else {
250-
driver.ops.remove(op.index);
251-
}
252-
}
253-
Lifecycle::Ignored(..) => unreachable!(),
254-
}
93+
self.inner.borrow_mut().remove_op(op)
25594
}
25695
}
25796

@@ -277,8 +116,11 @@ impl From<Driver> for Handle {
277116
}
278117
}
279118

280-
impl From<&Handle> for WeakHandle {
281-
fn from(handle: &Handle) -> Self {
119+
impl<T> From<T> for WeakHandle
120+
where
121+
T: Deref<Target = Handle>,
122+
{
123+
fn from(handle: T) -> Self {
282124
Self {
283125
inner: Rc::downgrade(&handle.inner),
284126
}

0 commit comments

Comments
 (0)