From be596a956aeebfd5830ba882e0c57771d197acfe Mon Sep 17 00:00:00 2001 From: Denis Davydov Date: Sat, 13 Sep 2025 22:44:38 +0100 Subject: [PATCH] fs: support io_uring in fs::metadata (statx) --- tokio/Cargo.toml | 4 ++ tokio/src/fs/metadata.rs | 20 ++++++++ tokio/src/io/uring/metadata.rs | 92 ++++++++++++++++++++++++++++++++++ tokio/src/io/uring/mod.rs | 2 + tokio/src/runtime/driver/op.rs | 13 +++-- tokio/tests/fs_metadata.rs | 32 ++++++++++++ tokio/tests/fs_uring.rs | 5 ++ tokio/tokio-shim/Cargo.toml | 13 +++++ tokio/tokio-shim/LICENSE | 21 ++++++++ tokio/tokio-shim/src/lib.rs | 56 +++++++++++++++++++++ 10 files changed, 253 insertions(+), 5 deletions(-) create mode 100644 tokio/src/io/uring/metadata.rs create mode 100644 tokio/tests/fs_metadata.rs create mode 100644 tokio/tokio-shim/Cargo.toml create mode 100644 tokio/tokio-shim/LICENSE create mode 100644 tokio/tokio-shim/src/lib.rs diff --git a/tokio/Cargo.toml b/tokio/Cargo.toml index 52085e9e3a1..588c9bd615a 100644 --- a/tokio/Cargo.toml +++ b/tokio/Cargo.toml @@ -108,6 +108,10 @@ io-uring = { version = "0.7.6", default-features = false } libc = { version = "0.2.168" } mio = { version = "1.0.1", default-features = false, features = ["os-poll", "os-ext"] } slab = "0.4.9" +tokio_shim = { version = "0.0.0", path = "tokio-shim" } + +[target.'cfg(all(tokio_uring, target_os = "linux"))'.dev-dependencies] +serial_test = "2" # Currently unstable. The API exposed by these features may be broken at any time. # Requires `--cfg tokio_unstable` to enable. diff --git a/tokio/src/fs/metadata.rs b/tokio/src/fs/metadata.rs index 21ebe4f786b..37c425f0550 100644 --- a/tokio/src/fs/metadata.rs +++ b/tokio/src/fs/metadata.rs @@ -4,6 +4,11 @@ use std::fs::Metadata; use std::io; use std::path::Path; +cfg_tokio_uring! { + #[cfg(target_env = "gnu")] + use crate::runtime::driver::op::Op; +} + /// Given a path, queries the file system to get information about a file, /// directory, etc. /// @@ -42,5 +47,20 @@ use std::path::Path; /// ``` pub async fn metadata(path: impl AsRef) -> io::Result { let path = path.as_ref().to_owned(); + #[cfg(all( + tokio_uring, + feature = "rt", + feature = "fs", + target_os = "linux", + target_env = "gnu" + ))] + { + let handle = crate::runtime::Handle::current(); + let driver_handle = handle.inner.driver().io(); + if driver_handle.check_and_init()? { + return Op::metadata(path.as_ref())?.await; + } + } + asyncify(|| std::fs::metadata(path)).await } diff --git a/tokio/src/io/uring/metadata.rs b/tokio/src/io/uring/metadata.rs new file mode 100644 index 00000000000..7735b6dcc1d --- /dev/null +++ b/tokio/src/io/uring/metadata.rs @@ -0,0 +1,92 @@ +use super::utils::cstr; +use crate::runtime::driver::op::{CancelData, Cancellable, Completable, CqeResult, Op}; +use io_uring::{opcode, types}; +use std::fmt::Formatter; +use std::{ + ffi::{CString, OsStr}, + fmt, io, + os::unix::ffi::OsStrExt, + path::Path, +}; +use tokio_shim::metadata_with_statx; + +#[derive(Debug)] +pub(crate) struct Metadata { + /// This fields will be read by the kernel during the operation, so we + /// need to ensure it is valid for the entire duration of the operation. + path: CString, + file_attr: HeapStatx, +} + +struct HeapStatx(Box); + +impl HeapStatx { + fn fmt_ts(ts: libc::statx_timestamp) -> String { + let sec = ts.tv_sec; + let nsec = ts.tv_nsec; + format!("{sec}.{nsec:09}") + } +} + +impl fmt::Debug for HeapStatx { + fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { + let s: &libc::statx = &self.0; + + f.debug_struct("HeapStatx") + .field("stx_mask", &format_args!("0x{:x}", s.stx_mask)) + .field("stx_blksize", &s.stx_blksize) + .field("stx_attributes", &format_args!("0x{:x}", s.stx_attributes)) + .field("stx_nlink", &s.stx_nlink) + .field("stx_uid", &s.stx_uid) + .field("stx_gid", &s.stx_gid) + .field("stx_mode", &format_args!("0o{:o}", s.stx_mode)) + .field("stx_ino", &s.stx_ino) + .field("stx_size", &s.stx_size) + .field("stx_blocks", &s.stx_blocks) + .field( + "stx_attributes_mask", + &format_args!("0x{:x}", s.stx_attributes_mask), + ) + .field("stx_atime", &HeapStatx::fmt_ts(s.stx_atime)) + .field("stx_btime", &HeapStatx::fmt_ts(s.stx_btime)) + .field("stx_ctime", &HeapStatx::fmt_ts(s.stx_ctime)) + .finish() + } +} + +impl Completable for Metadata { + type Output = std::fs::Metadata; + fn complete(self, cqe: CqeResult) -> io::Result { + let path = Path::new(OsStr::from_bytes(self.path.as_bytes())); + + metadata_with_statx(path, cqe.result? as i32, self.file_attr.0.as_ref()) + } +} + +impl Cancellable for Metadata { + fn cancel(self) -> CancelData { + CancelData::Metadata(self) + } +} + +impl Op { + pub(crate) fn metadata(path: &Path) -> io::Result { + let path = cstr(path)?; + let mut file_attr = HeapStatx(Box::new(unsafe { std::mem::zeroed() })); + let statx_buffer: *mut libc::statx = file_attr.0.as_mut(); + + let sqe = opcode::Statx::new( + types::Fd(libc::AT_FDCWD), + path.as_ptr(), + statx_buffer.cast(), + ) + .flags(libc::AT_STATX_SYNC_AS_STAT) + .mask(libc::STATX_BASIC_STATS | libc::STATX_BTIME) + .build(); + + // SAFETY: parameters of the entry, such as `path` and `file_attr`, are valid + // until this operation completes. + let op = unsafe { Op::new(sqe, Metadata { path, file_attr }) }; + Ok(op) + } +} diff --git a/tokio/src/io/uring/mod.rs b/tokio/src/io/uring/mod.rs index e5ac85af604..77c5d1f92df 100644 --- a/tokio/src/io/uring/mod.rs +++ b/tokio/src/io/uring/mod.rs @@ -1,2 +1,4 @@ +#[cfg(target_env = "gnu")] +pub(crate) mod metadata; pub(crate) mod open; pub(crate) mod utils; diff --git a/tokio/src/runtime/driver/op.rs b/tokio/src/runtime/driver/op.rs index 94afe163a13..aa6c886c5a1 100644 --- a/tokio/src/runtime/driver/op.rs +++ b/tokio/src/runtime/driver/op.rs @@ -1,3 +1,5 @@ +#[cfg(target_env = "gnu")] +use crate::io::uring::metadata::Metadata; use crate::io::uring::open::Open; use crate::runtime::Handle; use io_uring::cqueue; @@ -9,13 +11,14 @@ use std::task::Poll; use std::task::Waker; use std::{io, mem}; +// This field isn't accessed directly, but it holds cancellation data, +// so `#[allow(dead_code)]` is needed. +#[allow(dead_code)] #[derive(Debug)] pub(crate) enum CancelData { - Open( - // This field isn't accessed directly, but it holds cancellation data, - // so `#[allow(dead_code)]` is needed. - #[allow(dead_code)] Open, - ), + #[cfg(target_env = "gnu")] + Metadata(Metadata), + Open(Open), } #[derive(Debug)] diff --git a/tokio/tests/fs_metadata.rs b/tokio/tests/fs_metadata.rs new file mode 100644 index 00000000000..4bf906983a6 --- /dev/null +++ b/tokio/tests/fs_metadata.rs @@ -0,0 +1,32 @@ +#![warn(rust_2018_idioms)] +#![cfg(all( + tokio_uring, + feature = "rt", + feature = "fs", + target_os = "linux", + target_env = "gnu" +))] + +use std::io::Write; +use tempfile::NamedTempFile; + +use tokio::fs; + +const HELLO: &[u8] = b"hello world..."; + +use std::os::linux::fs::MetadataExt; + +#[tokio::test] +async fn metadata() { + let mut tempfile = NamedTempFile::new().unwrap(); + tempfile.write_all(HELLO).unwrap(); + + let got = fs::metadata(tempfile.path()).await.unwrap(); + let expected = std::fs::metadata(tempfile.path()).unwrap(); + + assert_eq!(got.len(), expected.len()); + assert_eq!(got.created().unwrap(), expected.created().unwrap()); + assert_eq!(got.modified().unwrap(), expected.modified().unwrap()); + assert_eq!(got.st_size(), expected.st_size()); + assert_eq!(got.st_uid(), expected.st_uid()); +} diff --git a/tokio/tests/fs_uring.rs b/tokio/tests/fs_uring.rs index 04ec62ca35e..11cecae7e49 100644 --- a/tokio/tests/fs_uring.rs +++ b/tokio/tests/fs_uring.rs @@ -40,6 +40,7 @@ fn rt_combinations() -> Vec Runtime>> { } #[test] +#[serial_test::serial] fn shutdown_runtime_while_performing_io_uring_ops() { fn run(rt: Runtime) { let (tx, rx) = mpsc::channel(); @@ -78,7 +79,10 @@ fn shutdown_runtime_while_performing_io_uring_ops() { } } +// This test opens too many temp files (that's the point of it) +// but this interfier with other tests which need open temp files as well #[test] +#[serial_test::serial] fn open_many_files() { fn run(rt: Runtime) { const NUM_FILES: usize = 512; @@ -105,6 +109,7 @@ fn open_many_files() { } #[tokio::test] +#[serial_test::serial] async fn cancel_op_future() { let (_tmp_file, path): (Vec, Vec) = create_tmp_files(1); diff --git a/tokio/tokio-shim/Cargo.toml b/tokio/tokio-shim/Cargo.toml new file mode 100644 index 00000000000..d083c79cf6a --- /dev/null +++ b/tokio/tokio-shim/Cargo.toml @@ -0,0 +1,13 @@ +[package] +name = "tokio_shim" +version = "0.0.0" +publish = false +edition = "2021" + +[dependencies] +libc = { version = "0.2.168" } + +[lints.rust] +unexpected_cfgs = { level = "warn", check-cfg = [ + 'cfg(tokio_uring)', +] } diff --git a/tokio/tokio-shim/LICENSE b/tokio/tokio-shim/LICENSE new file mode 100644 index 00000000000..f0dbcf4b45d --- /dev/null +++ b/tokio/tokio-shim/LICENSE @@ -0,0 +1,21 @@ +MIT License + +Copyright (c) Tokio Contributors + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. diff --git a/tokio/tokio-shim/src/lib.rs b/tokio/tokio-shim/src/lib.rs new file mode 100644 index 00000000000..60b5a56a985 --- /dev/null +++ b/tokio/tokio-shim/src/lib.rs @@ -0,0 +1,56 @@ +#![cfg(all(tokio_uring, target_os = "linux", target_env = "gnu"))] + +use libc::{c_char, c_int, c_uint, c_void, dlsym, RTLD_NEXT}; +use std::cell::Cell; +use std::path::Path; +use std::ptr; +use std::sync::Once; + +thread_local! { + static STATX_VALUE: Cell> = const { Cell::new(None) }; +} + +pub fn metadata_with_statx( + path: &Path, + rcode: i32, + statx: *const libc::statx, +) -> std::io::Result { + STATX_VALUE.set(Some((rcode, statx))); + std::fs::metadata(path) +} + +type StatxFn = unsafe extern "C" fn(c_int, *const c_char, c_int, c_uint, *mut libc::statx) -> c_int; + +fn real_statx() -> StatxFn { + static mut REAL: *mut c_void = ptr::null_mut(); + static INIT: Once = Once::new(); + + #[allow(clippy::manual_c_str_literals)] + INIT.call_once(|| unsafe { + REAL = dlsym(RTLD_NEXT, b"statx\0".as_ptr() as *const _); + if REAL.is_null() { + panic!("could not find statx symbol via dlsym"); + } + }); + + unsafe { std::mem::transmute::<*mut c_void, StatxFn>(REAL) } +} + +// ugly hack to get real std::fs::Metadata type +// we override statx() symbol +// so that std::fs::metadata will call our version instead of performing actual syscall +#[no_mangle] +pub(crate) unsafe extern "C" fn statx( + fd: c_int, + pathname: *const c_char, + flags: c_int, + mask: c_uint, + statxbuf: *mut libc::statx, +) -> c_int { + if let Some((result, buffer)) = STATX_VALUE.take() { + *statxbuf = *buffer; + return result; + } + + real_statx()(fd, pathname, flags, mask, statxbuf) +}