Skip to content

Commit e1b2f1a

Browse files
authored
driver: refactor the driver to be stored in a shared set of handles (tokio-rs#196)
This opens up a lot of further cleanup opportunities, especially around shutdown logic. It also opens up the opportunity for us to eventually provide a public Handle API. I am trying to make it so that we don't need to orchestrate our entire shutdown logic around the thread locals rather than purely the CONTEXT thread local, which is slightly broken to begin with (I believe that there are currently issues around creating two runtimes at the same time). Now the CONTEXT behaves more like in tokio. When you enter a runtime context via block_on, we set the context. When we leave that call, we unset the context. This also allows us to have a bit more flexibility with things like sending ops between multiple runtimes as well, so you can now in theory await ops created from another runtime. Part of the goal of this is to add new options around things like fixed or provided buffers. This potentially allows us to work towards things like transferring buffers between uring runtimes. More importantly however, it is a crucial first step towards having a handle to a uring runtime. More refactoring to come.
1 parent 3b2174e commit e1b2f1a

28 files changed

+859
-650
lines changed

src/buf/fixed/pool.rs

Lines changed: 22 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
use super::handle::CheckedOutBuf;
22
use super::{FixedBuf, FixedBuffers};
33

4+
use crate::runtime::driver::WeakHandle;
5+
use crate::runtime::CONTEXT;
46
use libc::{iovec, UIO_MAXIOV};
57
use std::cell::RefCell;
68
use std::cmp;
@@ -53,12 +55,12 @@ use std::slice;
5355
/// # let (memlock_limit, _) = getrlimit(Resource::RLIMIT_MEMLOCK)?;
5456
/// # let BUF_SIZE_LARGE = memlock_limit as usize / 8;
5557
/// # let BUF_SIZE_SMALL = memlock_limit as usize / 16;
56-
/// let pool = FixedBufPool::new(
57-
/// iter::once(Vec::with_capacity(BUF_SIZE_LARGE))
58-
/// .chain(iter::repeat_with(|| Vec::with_capacity(BUF_SIZE_SMALL)).take(2))
59-
/// );
60-
///
6158
/// tokio_uring::start(async {
59+
/// let pool = FixedBufPool::new(
60+
/// iter::once(Vec::with_capacity(BUF_SIZE_LARGE))
61+
/// .chain(iter::repeat_with(|| Vec::with_capacity(BUF_SIZE_SMALL)).take(2))
62+
/// );
63+
///
6264
/// pool.register()?;
6365
///
6466
/// let buf = pool.try_next(BUF_SIZE_LARGE).unwrap();
@@ -82,6 +84,7 @@ use std::slice;
8284
#[derive(Clone)]
8385
pub struct FixedBufPool {
8486
inner: Rc<RefCell<Inner>>,
87+
driver: WeakHandle,
8588
}
8689

8790
impl FixedBufPool {
@@ -149,6 +152,12 @@ impl FixedBufPool {
149152
pub fn new(bufs: impl IntoIterator<Item = Vec<u8>>) -> Self {
150153
FixedBufPool {
151154
inner: Rc::new(RefCell::new(Inner::new(bufs.into_iter()))),
155+
driver: CONTEXT.with(|x| {
156+
x.handle()
157+
.as_ref()
158+
.expect("Not in a runtime context")
159+
.into()
160+
}),
152161
}
153162
}
154163

@@ -172,7 +181,10 @@ impl FixedBufPool {
172181
/// of the `tokio-uring` runtime this call is made in, the function returns
173182
/// an error.
174183
pub fn register(&self) -> io::Result<()> {
175-
crate::io::register_buffers(Rc::clone(&self.inner) as _)
184+
self.driver
185+
.upgrade()
186+
.expect("Runtime context is no longer present")
187+
.register_buffers(Rc::clone(&self.inner) as _)
176188
}
177189

178190
/// Unregisters this collection of buffers.
@@ -191,7 +203,10 @@ impl FixedBufPool {
191203
/// an error. Calling `unregister` when no `FixedBufPool` is currently
192204
/// registered on this runtime also returns an error.
193205
pub fn unregister(&self) -> io::Result<()> {
194-
crate::io::unregister_buffers(Rc::clone(&self.inner) as _)
206+
self.driver
207+
.upgrade()
208+
.expect("Runtime context is no longer present")
209+
.unregister_buffers(Rc::clone(&self.inner) as _)
195210
}
196211

197212
/// Returns a buffer of requested capacity from this pool

src/buf/fixed/registry.rs

Lines changed: 20 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
use super::handle::CheckedOutBuf;
22
use super::{FixedBuf, FixedBuffers};
33

4+
use crate::runtime::driver::WeakHandle;
5+
use crate::runtime::CONTEXT;
46
use libc::{iovec, UIO_MAXIOV};
57
use std::cell::RefCell;
68
use std::cmp;
@@ -35,6 +37,7 @@ use std::slice;
3537
#[derive(Clone)]
3638
pub struct FixedBufRegistry {
3739
inner: Rc<RefCell<Inner>>,
40+
driver: WeakHandle,
3841
}
3942

4043
impl FixedBufRegistry {
@@ -90,11 +93,10 @@ impl FixedBufRegistry {
9093
/// # let (memlock_limit, _) = getrlimit(Resource::RLIMIT_MEMLOCK)?;
9194
/// # let NUM_BUFFERS = std::cmp::max(memlock_limit as usize / 4096 / 8, 1);
9295
/// # let BUF_SIZE = 4096;
93-
/// let registry = FixedBufRegistry::new(
94-
/// iter::repeat_with(|| Vec::with_capacity(BUF_SIZE)).take(NUM_BUFFERS)
95-
/// );
96-
///
9796
/// tokio_uring::start(async {
97+
/// let registry = FixedBufRegistry::new(
98+
/// iter::repeat_with(|| Vec::with_capacity(BUF_SIZE)).take(NUM_BUFFERS)
99+
/// );
98100
/// registry.register()?;
99101
/// // ...
100102
/// Ok(())
@@ -104,6 +106,12 @@ impl FixedBufRegistry {
104106
pub fn new(bufs: impl IntoIterator<Item = Vec<u8>>) -> Self {
105107
FixedBufRegistry {
106108
inner: Rc::new(RefCell::new(Inner::new(bufs.into_iter()))),
109+
driver: CONTEXT.with(|x| {
110+
x.handle()
111+
.as_ref()
112+
.expect("Not in a runtime context")
113+
.into()
114+
}),
107115
}
108116
}
109117

@@ -127,7 +135,10 @@ impl FixedBufRegistry {
127135
/// of the `tokio-uring` runtime this call is made in, the function returns
128136
/// an error.
129137
pub fn register(&self) -> io::Result<()> {
130-
crate::io::register_buffers(Rc::clone(&self.inner) as _)
138+
self.driver
139+
.upgrade()
140+
.expect("Runtime context is no longer present")
141+
.register_buffers(Rc::clone(&self.inner) as _)
131142
}
132143

133144
/// Unregisters this collection of buffers.
@@ -146,7 +157,10 @@ impl FixedBufRegistry {
146157
/// an error. Calling `unregister` when no `FixedBufRegistry` is currently
147158
/// registered on this runtime also returns an error.
148159
pub fn unregister(&self) -> io::Result<()> {
149-
crate::io::unregister_buffers(Rc::clone(&self.inner) as _)
160+
self.driver
161+
.upgrade()
162+
.expect("Runtime context is no longer present")
163+
.unregister_buffers(Rc::clone(&self.inner) as _)
150164
}
151165

152166
/// Returns a buffer identified by the specified index for use by the

src/io/accept.rs

Lines changed: 18 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
use crate::io::{SharedFd, Socket};
22
use crate::runtime::driver::op;
33
use crate::runtime::driver::op::{Completable, Op};
4+
use crate::runtime::CONTEXT;
45
use std::net::SocketAddr;
56
use std::{boxed::Box, io};
67

@@ -17,21 +18,23 @@ impl Op<Accept> {
1718
unsafe { std::mem::zeroed() },
1819
std::mem::size_of::<libc::sockaddr_storage>() as libc::socklen_t,
1920
));
20-
Op::submit_with(
21-
Accept {
22-
fd: fd.clone(),
23-
socketaddr,
24-
},
25-
|accept| {
26-
opcode::Accept::new(
27-
types::Fd(accept.fd.raw_fd()),
28-
&mut accept.socketaddr.0 as *mut _ as *mut _,
29-
&mut accept.socketaddr.1,
30-
)
31-
.flags(libc::O_CLOEXEC)
32-
.build()
33-
},
34-
)
21+
CONTEXT.with(|x| {
22+
x.handle().expect("Not in a runtime context").submit_op(
23+
Accept {
24+
fd: fd.clone(),
25+
socketaddr,
26+
},
27+
|accept| {
28+
opcode::Accept::new(
29+
types::Fd(accept.fd.raw_fd()),
30+
&mut accept.socketaddr.0 as *mut _ as *mut _,
31+
&mut accept.socketaddr.1,
32+
)
33+
.flags(libc::O_CLOEXEC)
34+
.build()
35+
},
36+
)
37+
})
3538
}
3639
}
3740

src/io/close.rs

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
use crate::runtime::driver::op;
22
use crate::runtime::driver::op::{Completable, Op};
3+
use crate::runtime::CONTEXT;
34
use std::io;
45
use std::os::unix::io::RawFd;
56

@@ -11,8 +12,12 @@ impl Op<Close> {
1112
pub(crate) fn close(fd: RawFd) -> io::Result<Op<Close>> {
1213
use io_uring::{opcode, types};
1314

14-
Op::submit_with(Close { fd }, |close| {
15-
opcode::Close::new(types::Fd(close.fd)).build()
15+
CONTEXT.with(|x| {
16+
x.handle()
17+
.expect("Not in a runtime context")
18+
.submit_op(Close { fd }, |close| {
19+
opcode::Close::new(types::Fd(close.fd)).build()
20+
})
1621
})
1722
}
1823
}

src/io/connect.rs

Lines changed: 17 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
use crate::io::SharedFd;
22
use crate::runtime::driver::op::{Completable, CqeResult, Op};
3+
use crate::runtime::CONTEXT;
34
use socket2::SockAddr;
45
use std::io;
56

@@ -16,20 +17,22 @@ impl Op<Connect> {
1617
pub(crate) fn connect(fd: &SharedFd, socket_addr: SockAddr) -> io::Result<Op<Connect>> {
1718
use io_uring::{opcode, types};
1819

19-
Op::submit_with(
20-
Connect {
21-
fd: fd.clone(),
22-
socket_addr: Box::new(socket_addr),
23-
},
24-
|connect| {
25-
opcode::Connect::new(
26-
types::Fd(connect.fd.raw_fd()),
27-
connect.socket_addr.as_ptr(),
28-
connect.socket_addr.len(),
29-
)
30-
.build()
31-
},
32-
)
20+
CONTEXT.with(|x| {
21+
x.handle().expect("Not in a runtime context").submit_op(
22+
Connect {
23+
fd: fd.clone(),
24+
socket_addr: Box::new(socket_addr),
25+
},
26+
|connect| {
27+
opcode::Connect::new(
28+
types::Fd(connect.fd.raw_fd()),
29+
connect.socket_addr.as_ptr(),
30+
connect.socket_addr.len(),
31+
)
32+
.build()
33+
},
34+
)
35+
})
3336
}
3437
}
3538

src/io/fsync.rs

Lines changed: 16 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ use std::io;
22

33
use crate::io::SharedFd;
44
use crate::runtime::driver::op::{Completable, CqeResult, Op};
5+
use crate::runtime::CONTEXT;
56
use io_uring::{opcode, types};
67

78
pub(crate) struct Fsync {
@@ -10,16 +11,25 @@ pub(crate) struct Fsync {
1011

1112
impl Op<Fsync> {
1213
pub(crate) fn fsync(fd: &SharedFd) -> io::Result<Op<Fsync>> {
13-
Op::submit_with(Fsync { fd: fd.clone() }, |fsync| {
14-
opcode::Fsync::new(types::Fd(fsync.fd.raw_fd())).build()
14+
CONTEXT.with(|x| {
15+
x.handle()
16+
.expect("Not in a runtime context")
17+
.submit_op(Fsync { fd: fd.clone() }, |fsync| {
18+
opcode::Fsync::new(types::Fd(fsync.fd.raw_fd())).build()
19+
})
1520
})
1621
}
1722

1823
pub(crate) fn datasync(fd: &SharedFd) -> io::Result<Op<Fsync>> {
19-
Op::submit_with(Fsync { fd: fd.clone() }, |fsync| {
20-
opcode::Fsync::new(types::Fd(fsync.fd.raw_fd()))
21-
.flags(types::FsyncFlags::DATASYNC)
22-
.build()
24+
CONTEXT.with(|x| {
25+
x.handle().expect("Not in a runtime context").submit_op(
26+
Fsync { fd: fd.clone() },
27+
|fsync| {
28+
opcode::Fsync::new(types::Fd(fsync.fd.raw_fd()))
29+
.flags(types::FsyncFlags::DATASYNC)
30+
.build()
31+
},
32+
)
2333
})
2434
}
2535
}

src/io/mod.rs

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,9 +18,6 @@ mod read_fixed;
1818

1919
mod readv;
2020

21-
mod register_buffers;
22-
pub(crate) use register_buffers::{register_buffers, unregister_buffers};
23-
2421
mod recv_from;
2522

2623
mod rename_at;

src/io/noop.rs

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
use crate::runtime::driver::op::{Completable, CqeResult, Op};
2+
use crate::runtime::CONTEXT;
23
use std::io;
34

45
/// No operation. Just posts a completion event, nothing else.
@@ -10,7 +11,11 @@ impl Op<NoOp> {
1011
pub fn no_op() -> io::Result<Op<NoOp>> {
1112
use io_uring::opcode;
1213

13-
Op::submit_with(NoOp {}, |_| opcode::Nop::new().build())
14+
CONTEXT.with(|x| {
15+
x.handle()
16+
.expect("Not in a runtime context")
17+
.submit_op(NoOp {}, |_| opcode::Nop::new().build())
18+
})
1419
}
1520
}
1621

src/io/open.rs

Lines changed: 14 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ use crate::fs::{File, OpenOptions};
22
use crate::io::SharedFd;
33

44
use crate::runtime::driver::op::{Completable, CqeResult, Op};
5+
use crate::runtime::CONTEXT;
56
use std::ffi::CString;
67
use std::io;
78
use std::path::Path;
@@ -23,16 +24,20 @@ impl Op<Open> {
2324
| options.creation_mode()?
2425
| (options.custom_flags & !libc::O_ACCMODE);
2526

26-
Op::submit_with(Open { path, flags }, |open| {
27-
// Get a reference to the memory. The string will be held by the
28-
// operation state and will not be accessed again until the operation
29-
// completes.
30-
let p_ref = open.path.as_c_str().as_ptr();
27+
CONTEXT.with(|x| {
28+
x.handle()
29+
.expect("Not in a runtime context")
30+
.submit_op(Open { path, flags }, |open| {
31+
// Get a reference to the memory. The string will be held by the
32+
// operation state and will not be accessed again until the operation
33+
// completes.
34+
let p_ref = open.path.as_c_str().as_ptr();
3135

32-
opcode::OpenAt::new(types::Fd(libc::AT_FDCWD), p_ref)
33-
.flags(flags)
34-
.mode(options.mode)
35-
.build()
36+
opcode::OpenAt::new(types::Fd(libc::AT_FDCWD), p_ref)
37+
.flags(flags)
38+
.mode(options.mode)
39+
.build()
40+
})
3641
})
3742
}
3843
}

0 commit comments

Comments
 (0)