-
-
Notifications
You must be signed in to change notification settings - Fork 2.8k
Implement uring in AsyncWrite for fs::File
#7713
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from 7 commits
d9d2ddc
093070a
6856207
863a69c
ea4040d
226fcf8
937da2d
c30a4dd
b91836c
58e9b0b
9504e53
daa0532
8cf4ca3
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -107,7 +107,60 @@ struct Inner { | |
| #[derive(Debug)] | ||
| enum State { | ||
| Idle(Option<Buf>), | ||
| Busy(JoinHandle<(Operation, Buf)>), | ||
| Busy(JoinHandleInner<(Operation, Buf)>), | ||
| } | ||
|
|
||
| #[derive(Debug)] | ||
| enum JoinHandleInner<T> { | ||
| Blocking(JoinHandle<T>), | ||
| #[cfg(all( | ||
| tokio_unstable, | ||
| feature = "io-uring", | ||
| feature = "rt", | ||
| feature = "fs", | ||
| target_os = "linux" | ||
| ))] | ||
| Async(BoxedOp<T>), | ||
| } | ||
|
|
||
| cfg_io_uring! { | ||
| struct BoxedOp<T>(Pin<Box<dyn Future<Output = T> + Send + Sync + 'static>>); | ||
|
||
|
|
||
| impl<T> std::fmt::Debug for BoxedOp<T> { | ||
| fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { | ||
| // format of BoxedFuture(T::type_name()) | ||
| f.debug_tuple("BoxedFuture") | ||
| .field(&std::any::type_name::<T>()) | ||
| .finish() | ||
| } | ||
| } | ||
|
|
||
| impl<T> Future for BoxedOp<T> { | ||
| type Output = T; | ||
| fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { | ||
| self.0.as_mut().poll(cx) | ||
| } | ||
| } | ||
| } | ||
|
|
||
| impl Future for JoinHandleInner<(Operation, Buf)> { | ||
| type Output = io::Result<(Operation, Buf)>; | ||
|
|
||
| fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { | ||
| match self.get_mut() { | ||
| JoinHandleInner::Blocking(ref mut jh) => Pin::new(jh) | ||
| .poll(cx) | ||
| .map_err(|_| io::Error::new(io::ErrorKind::Other, "background task failed")), | ||
| #[cfg(all( | ||
| tokio_unstable, | ||
| feature = "io-uring", | ||
| feature = "rt", | ||
| feature = "fs", | ||
| target_os = "linux" | ||
| ))] | ||
| JoinHandleInner::Async(ref mut jh) => Pin::new(jh).poll(cx).map(Ok), | ||
| } | ||
| } | ||
| } | ||
|
|
||
| #[derive(Debug)] | ||
|
|
@@ -399,7 +452,7 @@ impl File { | |
|
|
||
| let std = self.std.clone(); | ||
|
|
||
| inner.state = State::Busy(spawn_blocking(move || { | ||
| inner.state = State::Busy(JoinHandleInner::Blocking(spawn_blocking(move || { | ||
| let res = if let Some(seek) = seek { | ||
| (&*std).seek(seek).and_then(|_| std.set_len(size)) | ||
| } else { | ||
|
|
@@ -409,7 +462,7 @@ impl File { | |
|
|
||
| // Return the result as a seek | ||
| (Operation::Seek(res), buf) | ||
| })); | ||
| }))); | ||
|
|
||
| let (op, buf) = match inner.state { | ||
| State::Idle(_) => unreachable!(), | ||
|
|
@@ -613,13 +666,14 @@ impl AsyncRead for File { | |
| let std = me.std.clone(); | ||
|
|
||
| let max_buf_size = cmp::min(dst.remaining(), me.max_buf_size); | ||
| inner.state = State::Busy(spawn_blocking(move || { | ||
| // SAFETY: the `Read` implementation of `std` does not | ||
| // read from the buffer it is borrowing and correctly | ||
| // reports the length of the data written into the buffer. | ||
| let res = unsafe { buf.read_from(&mut &*std, max_buf_size) }; | ||
| (Operation::Read(res), buf) | ||
| })); | ||
| inner.state = | ||
| State::Busy(JoinHandleInner::Blocking(spawn_blocking(move || { | ||
| // SAFETY: the `Read` implementation of `std` does not | ||
| // read from the buffer it is borrowing and correctly | ||
| // reports the length of the data written into the buffer. | ||
| let res = unsafe { buf.read_from(&mut &*std, max_buf_size) }; | ||
| (Operation::Read(res), buf) | ||
| }))); | ||
| } | ||
| State::Busy(ref mut rx) => { | ||
| let (op, mut buf) = ready!(Pin::new(rx).poll(cx))?; | ||
|
|
@@ -685,10 +739,10 @@ impl AsyncSeek for File { | |
|
|
||
| let std = me.std.clone(); | ||
|
|
||
| inner.state = State::Busy(spawn_blocking(move || { | ||
| inner.state = State::Busy(JoinHandleInner::Blocking(spawn_blocking(move || { | ||
| let res = (&*std).seek(pos); | ||
| (Operation::Seek(res), buf) | ||
| })); | ||
| }))); | ||
| Ok(()) | ||
| } | ||
| } | ||
|
|
@@ -753,20 +807,10 @@ impl AsyncWrite for File { | |
| let n = buf.copy_from(src, me.max_buf_size); | ||
| let std = me.std.clone(); | ||
|
|
||
| let blocking_task_join_handle = spawn_mandatory_blocking(move || { | ||
| let res = if let Some(seek) = seek { | ||
| (&*std).seek(seek).and_then(|_| buf.write_to(&mut &*std)) | ||
| } else { | ||
| buf.write_to(&mut &*std) | ||
| }; | ||
| #[allow(unused_mut)] | ||
| let mut task_join_handle = inner.poll_write_inner((std, buf), seek)?; | ||
|
|
||
| (Operation::Write(res), buf) | ||
| }) | ||
| .ok_or_else(|| { | ||
| io::Error::new(io::ErrorKind::Other, "background task failed") | ||
| })?; | ||
|
|
||
| inner.state = State::Busy(blocking_task_join_handle); | ||
| inner.state = State::Busy(task_join_handle); | ||
|
|
||
| return Poll::Ready(Ok(n)); | ||
| } | ||
|
|
@@ -824,20 +868,88 @@ impl AsyncWrite for File { | |
| let n = buf.copy_from_bufs(bufs, me.max_buf_size); | ||
| let std = me.std.clone(); | ||
|
|
||
| let blocking_task_join_handle = spawn_mandatory_blocking(move || { | ||
| let res = if let Some(seek) = seek { | ||
| (&*std).seek(seek).and_then(|_| buf.write_to(&mut &*std)) | ||
| } else { | ||
| buf.write_to(&mut &*std) | ||
| }; | ||
| #[allow(unused_mut)] | ||
| let mut data = Some((std, buf)); | ||
|
|
||
| let mut task_join_handle = None; | ||
|
|
||
| #[cfg(all( | ||
| tokio_unstable, | ||
| feature = "io-uring", | ||
| feature = "rt", | ||
| feature = "fs", | ||
| target_os = "linux" | ||
| ))] | ||
| { | ||
| use crate::runtime::Handle; | ||
|
|
||
| // Handle not present in some tests? | ||
| if let Ok(handle) = Handle::try_current() { | ||
| if handle.inner.driver().io().check_and_init()? { | ||
| task_join_handle = { | ||
| use crate::{io::uring::utils::ArcFd, runtime::driver::op::Op}; | ||
|
|
||
| let (std, mut buf) = data.take().unwrap(); | ||
| if let Some(seek) = seek { | ||
| // we do std seek before a write, so we can always use u64::MAX (current cursor) for the file offset | ||
| // seeking only modifies kernel metadata and does not block, so we can do it here | ||
| (&*std).seek(seek).map_err(|e| { | ||
| io::Error::new( | ||
| e.kind(), | ||
| format!("failed to seek before write: {e}"), | ||
| ) | ||
| })?; | ||
| } | ||
|
|
||
| let mut fd: ArcFd = std; | ||
| let handle = BoxedOp(Box::pin(async move { | ||
| loop { | ||
| let op = Op::write_at(fd, buf, u64::MAX); | ||
| let (r, _buf, _fd) = op.await; | ||
| buf = _buf; | ||
| fd = _fd; | ||
| match r { | ||
| Ok(0) => { | ||
| break ( | ||
| Operation::Write(Err( | ||
| io::ErrorKind::WriteZero.into(), | ||
| )), | ||
| buf, | ||
| ); | ||
| } | ||
| Ok(_) if buf.is_empty() => { | ||
|
||
| break (Operation::Write(Ok(())), buf); | ||
| } | ||
| Ok(_) => continue, // more to write | ||
| Err(e) => break (Operation::Write(Err(e)), buf), | ||
| } | ||
| } | ||
| })); | ||
|
|
||
| Some(JoinHandleInner::Async(handle)) | ||
| }; | ||
| } | ||
| } | ||
| } | ||
|
|
||
| (Operation::Write(res), buf) | ||
| }) | ||
| .ok_or_else(|| { | ||
| io::Error::new(io::ErrorKind::Other, "background task failed") | ||
| })?; | ||
| if let Some((std, mut buf)) = data { | ||
| task_join_handle = Some(JoinHandleInner::Blocking( | ||
| spawn_mandatory_blocking(move || { | ||
| let res = if let Some(seek) = seek { | ||
| (&*std).seek(seek).and_then(|_| buf.write_to(&mut &*std)) | ||
| } else { | ||
| buf.write_to(&mut &*std) | ||
| }; | ||
|
|
||
| (Operation::Write(res), buf) | ||
| }) | ||
| .ok_or_else(|| { | ||
| io::Error::new(io::ErrorKind::Other, "background task failed") | ||
| })?, | ||
| )); | ||
| } | ||
|
|
||
| inner.state = State::Busy(blocking_task_join_handle); | ||
| inner.state = State::Busy(task_join_handle.unwrap()); | ||
|
|
||
| return Poll::Ready(Ok(n)); | ||
| } | ||
|
|
@@ -985,6 +1097,92 @@ impl Inner { | |
| Operation::Seek(_) => Poll::Ready(Ok(())), | ||
| } | ||
| } | ||
|
|
||
| fn poll_write_inner( | ||
| &self, | ||
| data: (Arc<StdFile>, Buf), | ||
| seek: Option<SeekFrom>, | ||
| ) -> io::Result<JoinHandleInner<(Operation, Buf)>> { | ||
| #[allow(unused_mut)] | ||
| let mut data = Some(data); | ||
| let mut task_join_handle = None; | ||
|
|
||
| #[cfg(all( | ||
| tokio_unstable, | ||
| feature = "io-uring", | ||
| feature = "rt", | ||
| feature = "fs", | ||
| target_os = "linux" | ||
| ))] | ||
| { | ||
| use crate::runtime::Handle; | ||
|
|
||
| // Handle not present in some tests? | ||
| if let Ok(handle) = Handle::try_current() { | ||
| if handle.inner.driver().io().check_and_init()? { | ||
| task_join_handle = { | ||
| use crate::{io::uring::utils::ArcFd, runtime::driver::op::Op}; | ||
|
|
||
| let (std, mut buf) = data.take().unwrap(); | ||
| if let Some(seek) = seek { | ||
| // we do std seek before a write, so we can always use u64::MAX (current cursor) for the file offset | ||
| // seeking only modifies kernel metadata and does not block, so we can do it here | ||
| (&*std).seek(seek).map_err(|e| { | ||
| io::Error::new( | ||
| e.kind(), | ||
| format!("failed to seek before write: {e}"), | ||
| ) | ||
| })?; | ||
| } | ||
|
|
||
| let mut fd: ArcFd = std; | ||
| let handle = BoxedOp(Box::pin(async move { | ||
| loop { | ||
| let op = Op::write_at(fd, buf, u64::MAX); | ||
| let (r, _buf, _fd) = op.await; | ||
| buf = _buf; | ||
| fd = _fd; | ||
| match r { | ||
| Ok(0) => { | ||
| break ( | ||
| Operation::Write(Err(io::ErrorKind::WriteZero.into())), | ||
| buf, | ||
| ); | ||
| } | ||
| Ok(_) if buf.is_empty() => { | ||
| break (Operation::Write(Ok(())), buf); | ||
| } | ||
| Ok(_) => continue, // more to write | ||
| Err(e) => break (Operation::Write(Err(e)), buf), | ||
| } | ||
| } | ||
| })); | ||
|
|
||
| Some(JoinHandleInner::Async(handle)) | ||
| }; | ||
| } | ||
| } | ||
| } | ||
|
|
||
| if let Some((std, mut buf)) = data { | ||
| task_join_handle = { | ||
| let handle = spawn_mandatory_blocking(move || { | ||
| let res = if let Some(seek) = seek { | ||
| (&*std).seek(seek).and_then(|_| buf.write_to(&mut &*std)) | ||
| } else { | ||
| buf.write_to(&mut &*std) | ||
| }; | ||
|
|
||
| (Operation::Write(res), buf) | ||
| }) | ||
| .ok_or_else(|| io::Error::new(io::ErrorKind::Other, "background task failed"))?; | ||
|
|
||
| Some(JoinHandleInner::Blocking(handle)) | ||
| }; | ||
| } | ||
|
|
||
| Ok(task_join_handle.unwrap()) | ||
| } | ||
| } | ||
|
|
||
| #[cfg(test)] | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Here is an idea:
What if you perform the io-uring logic in a
tokio::spawntask? That way, you can use JoinHandle in both cases.I think it will simply code a lot.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The
Blocking(JoinHandle)iscrate::blocking::JoinHandlenottokio::runtime::task::join::JoinHandleso some enum would be required anyway, and at that point might as well just have this kind of a boxed futureThere was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That is a re-export of the same thing.
tokio/tokio/src/blocking.rs
Line 9 in 4714ca1
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
One reason that I suggest this is that the io-uring logic performs multiple writes in a loop to write the entire buffer, which requires the end-user to await flush or write on the file to make progress. However, the implementation we have today does not require the user to interact with the file to make progress - it happens in the background automatically.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ah got it, I'll change it.
Out of curiosity, general rule is that for a future to keep making progress, user needs to keep polling it. I feel like not having to do that specifically for
fs::Fileis just an implementation detail, and users shouldn't rely on that always being the case. So i guess my question is how much is it worth preserving old behavior like this when implementing new features, if it comes at the cost of (small) performance overhead? relevant XKCDThere was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Had to adjust mocks to accommodate non-blocking spawning https://github.com/tokio-rs/tokio/actions/runs/19464918802/job/55697554687 b91836c - can you doublecheck im doing it right?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This kind of thing is always a hard question. Though I would say that I'm pretty sure people rely on writes continuing even if the file is dropped, and it would be very hard to change that.