diff --git a/Cargo.toml b/Cargo.toml index 9b707e80..17385d9b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -16,14 +16,22 @@ keywords = ["async", "fs", "io-uring"] # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html +[features] +# Enable 128-byte submission queue entries. This requires Linux >= 5.19. +sqe128 = [] +# Enable 32-byte completion queue entries. This requires Linux >= 5.19. +cqe32 = [] + [dependencies] tokio = { version = "1.2", features = ["net", "rt", "sync"] } slab = "0.4.2" libc = "0.2.80" -io-uring = "0.5.13" +io-uring = "0.6.0" socket2 = { version = "0.4.4", features = ["all"] } bytes = { version = "1.0", optional = true } -futures-util = { version = "0.3.26", default-features = false, features = ["std"] } +futures-util = { version = "0.3.26", default-features = false, features = [ + "std", +] } [dev-dependencies] tempfile = "3.2.0" diff --git a/src/buf/fixed/handle.rs b/src/buf/fixed/handle.rs index 5e77c125..e8f910d6 100644 --- a/src/buf/fixed/handle.rs +++ b/src/buf/fixed/handle.rs @@ -82,9 +82,7 @@ unsafe impl IoBufMut for FixedBuf { } unsafe fn set_init(&mut self, pos: usize) { - if self.buf.init_len < pos { - self.buf.init_len = pos - } + self.buf.init_len = pos } } diff --git a/src/buf/io_buf_mut.rs b/src/buf/io_buf_mut.rs index ea1e19c8..c3b2acc4 100644 --- a/src/buf/io_buf_mut.rs +++ b/src/buf/io_buf_mut.rs @@ -40,9 +40,7 @@ unsafe impl IoBufMut for Vec { } unsafe fn set_init(&mut self, init_len: usize) { - if self.len() < init_len { - self.set_len(init_len); - } + self.set_len(init_len); } } @@ -53,8 +51,6 @@ unsafe impl IoBufMut for bytes::BytesMut { } unsafe fn set_init(&mut self, init_len: usize) { - if self.len() < init_len { - self.set_len(init_len); - } + self.set_len(init_len); } } diff --git a/src/fs/file.rs b/src/fs/file.rs index 9cd47f21..2d92dd5e 100644 --- a/src/fs/file.rs +++ b/src/fs/file.rs @@ -182,6 +182,27 @@ impl File { op.await } + /// A file/device-specific 16-byte command, akin (but not equivalent) to ioctl(2). + /// + /// Marked as unsafe, as it is the callers responsibility to ensure that any data which is required + /// to be stable is either passed in as the metadata argument, or otherwise stable through the life of the operation + pub async unsafe fn uring_cmd16( + &self, + cmd_op: u32, + cmd: [u8; 16], + metadata: T, + ) -> (io::Result, T) { + let op = Op::uring_cmd16(&self.fd, cmd_op, cmd, metadata).unwrap(); + op.await + } + + #[cfg(feature = "sqe128")] + /// A file/device-specific 80-byte command, akin (but not equivalent) to ioctl(2). + pub async fn uring_cmd80(&self, cmd_op: u32, cmd: [u8; 80]) -> io::Result { + let op = Op::uring_cmd80(&self.fd, cmd_op, cmd).unwrap(); + op.await + } + /// Read some bytes at the specified offset from the file into the specified /// array of buffers, returning how many bytes were read. /// diff --git a/src/io/fallocate.rs b/src/io/fallocate.rs index fa932124..382710d0 100644 --- a/src/io/fallocate.rs +++ b/src/io/fallocate.rs @@ -25,8 +25,8 @@ impl Op { x.handle().expect("not in a runtime context").submit_op( Fallocate { fd: fd.clone() }, |fallocate| { - opcode::Fallocate64::new(types::Fd(fallocate.fd.raw_fd()), len as _) - .offset64(offset as _) + opcode::Fallocate::new(types::Fd(fallocate.fd.raw_fd()), len as _) + .offset(offset as _) .mode(flags) .build() }, diff --git a/src/io/mod.rs b/src/io/mod.rs index 6985bdd3..1679928f 100644 --- a/src/io/mod.rs +++ b/src/io/mod.rs @@ -56,3 +56,5 @@ mod writev; mod writev_all; pub(crate) use writev_all::writev_at_all; + +mod uring_cmd; diff --git a/src/io/send_zc.rs b/src/io/send_zc.rs index df37722b..43afbf1e 100644 --- a/src/io/send_zc.rs +++ b/src/io/send_zc.rs @@ -53,6 +53,6 @@ impl Completable for SendZc { impl Updateable for SendZc { fn update(&mut self, cqe: CqeResult) { // uring send_zc promises there will be no error on CQE's marked more - self.bytes += *cqe.result.as_ref().unwrap() as usize; + self.bytes += cqe.result.unwrap_or_default() as usize; } } diff --git a/src/io/sendmsg_zc.rs b/src/io/sendmsg_zc.rs index e7da9fe5..719397c2 100644 --- a/src/io/sendmsg_zc.rs +++ b/src/io/sendmsg_zc.rs @@ -113,6 +113,6 @@ impl Completable for SendMsgZc { impl Updateable for SendMsgZc { fn update(&mut self, cqe: CqeResult) { // uring send_zc promises there will be no error on CQE's marked more - self.bytes += *cqe.result.as_ref().unwrap() as usize; + self.bytes += cqe.result.unwrap_or_default() as usize; } } diff --git a/src/io/uring_cmd.rs b/src/io/uring_cmd.rs new file mode 100644 index 00000000..904f6fb9 --- /dev/null +++ b/src/io/uring_cmd.rs @@ -0,0 +1,87 @@ +use crate::io::SharedFd; +use crate::runtime::driver::op::{Completable, CqeResult, Op}; +use crate::runtime::CONTEXT; +use std::io; + +pub(crate) struct UringCmd16 { + /// Holds a strong ref to the FD, preventing the file from being closed + /// while the operation is in-flight. + #[allow(dead_code)] + fd: SharedFd, + // Holds any required metadata stable for the lifetime of the operation + metadata: T, +} + +impl Op> { + /// A file/device-specific 16-byte command, akin (but not equivalent) to ioctl + pub(crate) fn uring_cmd16( + fd: &SharedFd, + cmd_op: u32, + cmd: [u8; 16], + metadata: T, + ) -> io::Result>> { + use io_uring::{opcode, types}; + + CONTEXT.with(|x| { + x.handle().expect("Not in a runtime context").submit_op( + UringCmd16 { + fd: fd.clone(), + metadata, + }, + |_| { + opcode::UringCmd16::new(types::Fd(fd.raw_fd()), cmd_op) + .cmd(cmd) + .build() + }, + ) + }) + } +} + +impl Completable for UringCmd16 { + type Output = (std::result::Result, T); + + fn complete(self, cqe: CqeResult) -> Self::Output { + (cqe.result, self.metadata) + } +} + +#[cfg(feature = "sqe128")] +pub(crate) struct UringCmd80 { + /// Holds a strong ref to the FD, preventing the file from being closed + /// while the operation is in-flight. + #[allow(dead_code)] + fd: SharedFd, +} + +#[cfg(feature = "sqe128")] +impl Op { + /// A file/device-specific 80-byte command, akin (but not equivalent) to ioctl + pub(crate) fn uring_cmd80( + fd: &SharedFd, + cmd_op: u32, + cmd: [u8; 80], + ) -> io::Result> { + use io_uring::{opcode, types}; + + CONTEXT.with(|x| { + x.handle().expect("Not in a runtime context").submit_op( + UringCmd80 { fd: fd.clone() }, + |_| { + opcode::UringCmd80::new(types::Fd(fd.raw_fd()), cmd_op) + .cmd(cmd) + .build() + }, + ) + }) + } +} + +#[cfg(feature = "sqe128")] +impl Completable for UringCmd80 { + type Output = io::Result; + + fn complete(self, cqe: CqeResult) -> Self::Output { + cqe.result + } +} diff --git a/src/io/writev_all.rs b/src/io/writev_all.rs index 38ee8a63..ef5b9d40 100644 --- a/src/io/writev_all.rs +++ b/src/io/writev_all.rs @@ -134,7 +134,7 @@ impl Op> { // So this wouldn't need to be a function. Just pass in the entry. |write| { opcode::Writev::new(types::Fd(write.fd.raw_fd()), iovs_ptr, iovs_len) - .offset64(offset as _) + .offset(offset as _) .build() }, ) diff --git a/src/lib.rs b/src/lib.rs index d1cc6e02..42b24987 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -84,6 +84,7 @@ pub use runtime::spawn; pub use runtime::Runtime; use crate::runtime::driver::op::Op; +use crate::runtime::driver::{CEntry, SEntry}; use std::future::Future; /// Starts an `io_uring` enabled Tokio runtime. @@ -154,7 +155,7 @@ pub fn start(future: F) -> F::Output { /// This function is provided to avoid requiring the user of this crate from /// having to use the io_uring crate as well. Refer to Builder::start example /// for its intended usage. -pub fn uring_builder() -> io_uring::Builder { +pub fn uring_builder() -> io_uring::Builder { io_uring::IoUring::builder() } @@ -163,7 +164,8 @@ pub fn uring_builder() -> io_uring::Builder { // #[derive(Clone, Default)] pub struct Builder { entries: u32, - urb: io_uring::Builder, + max_workers: [u32; 2], + urb: io_uring::Builder, } /// Constructs a [`Builder`] with default settings. @@ -175,6 +177,7 @@ pub struct Builder { pub fn builder() -> Builder { Builder { entries: 256, + max_workers: [0; 2], urb: io_uring::IoUring::builder(), } } @@ -191,11 +194,23 @@ impl Builder { self } + /// Get and/or set the limit for number of io_uring worker threads per NUMA + /// node. `bounded` holds the limit for bounded workers, which process I/O + /// operations expected to be bound in time, that is I/O on regular files or + /// block devices. While `unbounded` holds the limit for unbounded workers, + /// which carry out I/O operations that can never complete, for instance I/O + /// on sockets. Setting `None` leaves the default + pub fn max_workers(&mut self, bounded: Option, unbounded: Option) -> &mut Self { + self.max_workers = [bounded.unwrap_or_default(), unbounded.unwrap_or_default()]; + self + } + /// Replaces the default [`io_uring::Builder`], which controls the settings for the /// inner `io_uring` API. /// - /// Refer to the [`io_uring::Builder`] documentation for all the supported methods. - pub fn uring_builder(&mut self, b: &io_uring::Builder) -> &mut Self { + /// Refer to the Builder start method for an example. + /// Refer to the io_uring::builder documentation for all the supported methods. + pub fn uring_builder(&mut self, b: &io_uring::Builder) -> &mut Self { self.urb = b.clone(); self } diff --git a/src/runtime/driver/handle.rs b/src/runtime/driver/handle.rs index 115f780d..02e4e548 100644 --- a/src/runtime/driver/handle.rs +++ b/src/runtime/driver/handle.rs @@ -22,7 +22,7 @@ use std::task::{Context, Poll}; use crate::buf::fixed::FixedBuffers; use crate::runtime::driver::op::{Completable, MultiCQEFuture, Op, Updateable}; -use crate::runtime::driver::Driver; +use crate::runtime::driver::{Driver, SEntry}; #[derive(Clone)] pub(crate) struct Handle { @@ -67,10 +67,11 @@ impl Handle { self.inner.borrow_mut().submit_op_2(sqe) } - pub(crate) fn submit_op(&self, data: T, f: F) -> io::Result> + pub(crate) fn submit_op(&self, data: T, f: F) -> io::Result> where T: Completable, - F: FnOnce(&mut T) -> squeue::Entry, + A: Into, + F: FnOnce(&mut T) -> A, { self.inner.borrow_mut().submit_op(data, f, self.into()) } diff --git a/src/runtime/driver/mod.rs b/src/runtime/driver/mod.rs index 21d7de0b..d06d43b1 100644 --- a/src/runtime/driver/mod.rs +++ b/src/runtime/driver/mod.rs @@ -11,6 +11,16 @@ use std::{io, mem}; pub(crate) use handle::*; +#[cfg(not(feature = "sqe128"))] +pub(crate) type SEntry = squeue::Entry; +#[cfg(feature = "sqe128")] +pub(crate) type SEntry = squeue::Entry128; + +#[cfg(not(feature = "cqe32"))] +pub(crate) type CEntry = cqueue::Entry; +#[cfg(feature = "cqe32")] +pub(crate) type CEntry = cqueue::Entry32; + mod handle; pub(crate) mod op; @@ -19,7 +29,7 @@ pub(crate) struct Driver { ops: Ops, /// IoUring bindings - uring: IoUring, + uring: IoUring, /// Reference to the currently registered buffers. /// Ensures that the buffers are not dropped until @@ -40,6 +50,10 @@ impl Driver { pub(crate) fn new(b: &crate::Builder) -> io::Result { let uring = b.urb.build(b.entries)?; + uring + .submitter() + .register_iowq_max_workers(&mut b.max_workers.clone())?; + Ok(Driver { ops: Ops::new(), uring, @@ -97,9 +111,11 @@ impl Driver { &mut self, buffers: Rc>, ) -> io::Result<()> { - self.uring - .submitter() - .register_buffers(buffers.borrow().iovecs())?; + unsafe { + self.uring + .submitter() + .register_buffers(buffers.borrow().iovecs())?; + } self.fixed_buffers = Some(buffers); Ok(()) @@ -122,11 +138,15 @@ impl Driver { )) } - pub(crate) fn submit_op_2(&mut self, sqe: squeue::Entry) -> usize { + pub(crate) fn submit_op_2(&mut self, sqe: A) -> usize + where + A: Into, + { let index = self.ops.insert(); // Configure the SQE - let sqe = sqe.user_data(index as _); + let sqe: SEntry = sqe.into(); + let sqe: SEntry = sqe.user_data(index as _); // Push the new operation while unsafe { self.uring.submission().push(&sqe).is_err() } { @@ -137,7 +157,7 @@ impl Driver { index } - pub(crate) fn submit_op( + pub(crate) fn submit_op( &mut self, mut data: T, f: F, @@ -145,12 +165,14 @@ impl Driver { ) -> io::Result> where T: Completable, - F: FnOnce(&mut T) -> squeue::Entry, + A: Into, + F: FnOnce(&mut T) -> A, { let index = self.ops.insert(); // Configure the SQE - let sqe = f(&mut data).user_data(index as _); + let sqe: SEntry = f(&mut data).into(); + let sqe: SEntry = sqe.user_data(index as _); // Create the operation let op = Op::new(handle, data, index); @@ -413,7 +435,15 @@ impl Drop for Driver { while self .uring .submission() - .push(&AsyncCancel::new(id as u64).build().user_data(u64::MAX)) + .push( + // Conversion to configured squeue::EntryMarker is useless when + // sqe128 feature is disabled. + #[allow(clippy::useless_conversion)] + &AsyncCancel::new(id as u64) + .build() + .user_data(u64::MAX) + .into(), + ) .is_err() { self.uring @@ -484,9 +514,14 @@ impl Ops { self.lifecycle.remove(index); } - fn complete(&mut self, index: usize, cqe: cqueue::Entry) { + fn complete(&mut self, index: usize, cqe: A) + where + A: Into, + { let completions = &mut self.completions; - if self.lifecycle[index].complete(completions, cqe) { + let cqe: CEntry = cqe.into(); + + if self.lifecycle[index].complete(completions, cqe.into()) { self.lifecycle.remove(index); } } diff --git a/src/runtime/driver/op/mod.rs b/src/runtime/driver/op/mod.rs index 32ba3e7a..ec6cd592 100644 --- a/src/runtime/driver/op/mod.rs +++ b/src/runtime/driver/op/mod.rs @@ -194,6 +194,19 @@ impl From for CqeResult { } } +impl From for CqeResult { + fn from(cqe: cqueue::Entry32) -> Self { + let res = cqe.result(); + let flags = cqe.flags(); + let result = if res >= 0 { + Ok(res as u32) + } else { + Err(io::Error::from_raw_os_error(-res)) + }; + CqeResult { result, flags } + } +} + impl Op { /// Create a new operation pub(super) fn new(driver: driver::WeakHandle, data: T, index: usize) -> Self { diff --git a/tests/fs_file.rs b/tests/fs_file.rs index 6ec14d43..e0aa04cd 100644 --- a/tests/fs_file.rs +++ b/tests/fs_file.rs @@ -11,6 +11,8 @@ use tokio_uring::buf::fixed::FixedBufRegistry; use tokio_uring::buf::{BoundedBuf, BoundedBufMut}; use tokio_uring::fs::File; +use io_uring::{IoUring, Probe}; + #[path = "../src/future.rs"] #[allow(warnings)] mod future; @@ -26,6 +28,33 @@ async fn read_hello(file: &File) { assert_eq!(&buf[..n], HELLO); } +#[test] +fn uring_cmd16() { + // Check that io_uring::opcode::UringCmd16 is supported on the running kernel. + let entries = 8; + let ring = IoUring::new(entries).unwrap(); + let mut probe = Probe::new(); + ring.submitter().register_probe(&mut probe).unwrap(); + if probe.is_supported(io_uring::opcode::UringCmd16::CODE) { + tokio_uring::start(async { + let file = File::open("/dev/null").await.unwrap(); + let res = file.uring_cmd16(0, [0x00; 16]).await.unwrap(); + assert_eq!(res, 0); + }); + } +} + +#[cfg(feature = "sqe128")] +#[test] +fn uring_cmd80() { + tokio_uring::start(async { + let file = File::open("/dev/null").await.unwrap(); + let res = file.uring_cmd80(0, [0x00; 80]).await.unwrap(); + + assert_eq!(res, 0); + }); +} + #[test] fn basic_read() { tokio_uring::start(async {