Skip to content

Commit 3d5dba9

Browse files
authored
refactor(driver): record multishot results in ops (#748)
* refactor(driver): record multishot results in ops * docs(driver,iour): fixme * docs(driver,iour): grammar * fix(driver,iour): rename
1 parent 98f63c8 commit 3d5dba9

File tree

13 files changed

+372
-110
lines changed

13 files changed

+372
-110
lines changed

compio-driver/src/buffer_pool/fallback.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@ use std::{
1515
};
1616

1717
use compio_buf::{IntoInner, IoBuf, IoBufMut, SetLen, Slice};
18+
#[cfg(not(fusion))]
19+
pub use {BufferPool as FallbackBufferPool, OwnedBuffer as FallbackOwnedBuffer};
1820

1921
struct BufferPoolInner {
2022
buffers: RefCell<VecDeque<Vec<u8>>>,

compio-driver/src/buffer_pool/fusion.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,9 @@ use std::{
55
};
66

77
pub use fallback::BufferPool as FallbackBufferPool;
8-
pub(crate) use fallback::OwnedBuffer;
8+
pub(crate) use fallback::OwnedBuffer as FallbackOwnedBuffer;
99
pub use iour::BufferPool as IoUringBufferPool;
10+
pub(crate) use iour::OwnedBuffer as IoUringOwnedBuffer;
1011

1112
use super::{fallback, iour};
1213

compio-driver/src/buffer_pool/iour.rs

Lines changed: 63 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -7,16 +7,32 @@ use std::{
77
fmt::{Debug, Formatter},
88
io,
99
ops::{Deref, DerefMut},
10+
rc::Rc,
1011
};
1112

1213
use io_uring_buf_ring::IoUringBufRing;
14+
#[cfg(not(fusion))]
15+
pub use {BufferPool as IoUringBufferPool, OwnedBuffer as IoUringOwnedBuffer};
16+
17+
struct BufferPoolInner {
18+
buf_ring: IoUringBufRing<Vec<u8>>,
19+
}
20+
21+
impl BufferPoolInner {
22+
fn reuse_buffer(&self, buffer_id: u16) {
23+
// SAFETY: 0 is always valid length. We just want to get the buffer once and
24+
// return it immediately.
25+
unsafe { self.buf_ring.get_buf(buffer_id, 0) };
26+
}
27+
}
1328

1429
/// Buffer pool
1530
///
1631
/// A buffer pool to allow user no need to specify a specific buffer to do the
1732
/// IO operation
33+
#[derive(Clone)]
1834
pub struct BufferPool {
19-
buf_ring: IoUringBufRing<Vec<u8>>,
35+
inner: Rc<BufferPoolInner>,
2036
}
2137

2238
impl Debug for BufferPool {
@@ -27,33 +43,67 @@ impl Debug for BufferPool {
2743

2844
impl BufferPool {
2945
pub(crate) fn new(buf_ring: IoUringBufRing<Vec<u8>>) -> Self {
30-
Self { buf_ring }
46+
Self {
47+
inner: Rc::new(BufferPoolInner { buf_ring }),
48+
}
3149
}
3250

3351
pub(crate) fn buffer_group(&self) -> u16 {
34-
self.buf_ring.buffer_group()
52+
self.inner.buf_ring.buffer_group()
53+
}
54+
55+
pub(crate) fn into_inner(self) -> Result<IoUringBufRing<Vec<u8>>, Self> {
56+
Rc::try_unwrap(self.inner)
57+
.map(|inner| inner.buf_ring)
58+
.map_err(|inner| Self { inner })
3559
}
3660

37-
pub(crate) fn into_inner(self) -> IoUringBufRing<Vec<u8>> {
38-
self.buf_ring
61+
#[doc(hidden)]
62+
pub unsafe fn get_buffer(&self, buffer_id: u16, available_len: usize) -> OwnedBuffer {
63+
OwnedBuffer {
64+
pool: self.inner.clone(),
65+
params: Some((buffer_id, available_len)),
66+
}
3967
}
4068

4169
/// ## Safety
42-
/// * `available_len` should be the returned value from the op.
43-
pub(crate) unsafe fn get_buffer(
70+
/// * `len` should be the returned value from the op.
71+
pub(crate) unsafe fn create_proxy(
4472
&self,
45-
buffer_id: u16,
46-
available_len: usize,
73+
slice: OwnedBuffer,
74+
len: usize,
4775
) -> io::Result<BorrowedBuffer<'_>> {
48-
unsafe { self.buf_ring.get_buf(buffer_id, available_len) }
76+
let Some((buffer_id, available_len)) = slice.leak() else {
77+
return Err(io::Error::other("no buffer selected"));
78+
};
79+
debug_assert_eq!(available_len, len);
80+
unsafe { self.inner.buf_ring.get_buf(buffer_id, available_len) }
4981
.map(BorrowedBuffer)
5082
.ok_or_else(|| io::Error::other(format!("cannot find buffer {buffer_id}")))
5183
}
5284

5385
pub(crate) fn reuse_buffer(&self, buffer_id: u16) {
54-
// SAFETY: 0 is always valid length. We just want to get the buffer once and
55-
// return it immediately.
56-
unsafe { self.buf_ring.get_buf(buffer_id, 0) };
86+
self.inner.reuse_buffer(buffer_id);
87+
}
88+
}
89+
90+
#[doc(hidden)]
91+
pub struct OwnedBuffer {
92+
pool: Rc<BufferPoolInner>,
93+
params: Option<(u16, usize)>,
94+
}
95+
96+
impl OwnedBuffer {
97+
pub fn leak(mut self) -> Option<(u16, usize)> {
98+
self.params.take()
99+
}
100+
}
101+
102+
impl Drop for OwnedBuffer {
103+
fn drop(&mut self) {
104+
if let Some((buffer_id, _)) = self.params {
105+
self.pool.reuse_buffer(buffer_id);
106+
}
57107
}
58108
}
59109

compio-driver/src/key.rs

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -279,11 +279,12 @@ impl ErasedKey {
279279
pub(crate) fn set_result(&self, res: io::Result<usize>) {
280280
let mut this = self.borrow();
281281
#[cfg(io_uring)]
282-
if let Ok(res) = res
283-
&& this.extra.is_iour()
284282
{
285-
unsafe {
286-
Pin::new_unchecked(&mut this.op).set_result(res);
283+
let this = &mut *this;
284+
if this.extra.is_iour() {
285+
unsafe {
286+
Pin::new_unchecked(&mut this.op).set_result(&res, &this.extra);
287+
}
287288
}
288289
}
289290
if let PushEntry::Pending(Some(w)) =

compio-driver/src/lib.rs

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -233,7 +233,6 @@ impl Proactor {
233233
instrument!(compio_log::Level::DEBUG, "pop", ?key);
234234
if key.has_result() {
235235
self.cancel.remove(&key);
236-
self.driver.cleanup_multishot(&key);
237236
PushEntry::Ready(key.take_result())
238237
} else {
239238
PushEntry::Pending(key)
@@ -253,7 +252,6 @@ impl Proactor {
253252
instrument!(compio_log::Level::DEBUG, "pop", ?key);
254253
if key.has_result() {
255254
self.cancel.remove(&key);
256-
self.driver.cleanup_multishot(&key);
257255
let extra = key.swap_extra(self.default_extra());
258256
let res = key.take_result();
259257
PushEntry::Ready((res, extra))

compio-driver/src/op.rs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -495,10 +495,10 @@ pub(crate) mod managed {
495495
use socket2::SockAddr;
496496

497497
use super::{Read, ReadAt, Recv, RecvFrom};
498-
use crate::{AsFd, BorrowedBuffer, BufferPool, OwnedBuffer, TakeBuffer};
498+
use crate::{AsFd, BorrowedBuffer, BufferPool, FallbackOwnedBuffer, TakeBuffer};
499499

500500
fn take_buffer(
501-
slice: OwnedBuffer,
501+
slice: FallbackOwnedBuffer,
502502
buffer_pool: &BufferPool,
503503
result: io::Result<usize>,
504504
) -> io::Result<BorrowedBuffer<'_>> {
@@ -516,7 +516,7 @@ pub(crate) mod managed {
516516
/// Read a file at specified position into managed buffer.
517517
pub struct ReadManagedAt<S> {
518518
#[pin]
519-
pub(crate) op: ReadAt<OwnedBuffer, S>,
519+
pub(crate) op: ReadAt<FallbackOwnedBuffer, S>,
520520
}
521521
}
522522

@@ -549,7 +549,7 @@ pub(crate) mod managed {
549549
/// Read a file into managed buffer.
550550
pub struct ReadManaged<S> {
551551
#[pin]
552-
pub(crate) op: Read<OwnedBuffer, S>,
552+
pub(crate) op: Read<FallbackOwnedBuffer, S>,
553553
}
554554
}
555555

@@ -585,7 +585,7 @@ pub(crate) mod managed {
585585
/// use [`ReadManaged`].
586586
pub struct RecvManaged<S> {
587587
#[pin]
588-
pub(crate) op: Recv<OwnedBuffer, S>,
588+
pub(crate) op: Recv<FallbackOwnedBuffer, S>,
589589
}
590590
}
591591

@@ -618,7 +618,7 @@ pub(crate) mod managed {
618618
/// Receive data and source address into managed buffer.
619619
pub struct RecvFromManaged<S: AsFd> {
620620
#[pin]
621-
pub(crate) op: RecvFrom<OwnedBuffer, S>,
621+
pub(crate) op: RecvFrom<FallbackOwnedBuffer, S>,
622622
}
623623
}
624624

compio-driver/src/sys/fusion/mod.rs

Lines changed: 0 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -157,13 +157,6 @@ impl Driver {
157157
FuseDriver::IoUring(driver) => driver.pop_multishot(key),
158158
}
159159
}
160-
161-
pub fn cleanup_multishot(&mut self, key: &ErasedKey) {
162-
match &mut self.fuse {
163-
FuseDriver::Poll(driver) => driver.cleanup_multishot(key),
164-
FuseDriver::IoUring(driver) => driver.cleanup_multishot(key),
165-
}
166-
}
167160
}
168161

169162
impl AsRawFd for Driver {

compio-driver/src/sys/fusion/op.rs

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -97,6 +97,26 @@ macro_rules! op {
9797
fn create_entry(self: std::pin::Pin<&mut Self>) -> OpEntry {
9898
unsafe { self.map_unchecked_mut(|x| x.inner.iour() ) }.create_entry()
9999
}
100+
101+
fn create_entry_fallback(self: std::pin::Pin<&mut Self>) -> OpEntry {
102+
unsafe { self.map_unchecked_mut(|x| x.inner.iour() ) }.create_entry_fallback()
103+
}
104+
105+
fn call_blocking(self: std::pin::Pin<&mut Self>) -> std::io::Result<usize> {
106+
unsafe { self.map_unchecked_mut(|x| x.inner.iour() ) }.call_blocking()
107+
}
108+
109+
unsafe fn set_result(self: std::pin::Pin<&mut Self>, result: &std::io::Result<usize>, extra: &crate::Extra) {
110+
unsafe { self.map_unchecked_mut(|x| x.inner.iour() ).set_result(result, extra) }
111+
}
112+
113+
unsafe fn push_multishot(self: std::pin::Pin<&mut Self>, result: std::io::Result<usize>, extra: crate::Extra) {
114+
unsafe { self.map_unchecked_mut(|x| x.inner.iour() ).push_multishot(result, extra) }
115+
}
116+
117+
fn pop_multishot(self: std::pin::Pin<&mut Self>) -> Option<BufResult<usize, crate::Extra>> {
118+
unsafe { self.map_unchecked_mut(|x| x.inner.iour() ) }.pop_multishot()
119+
}
100120
}
101121
};
102122
}
@@ -202,6 +222,26 @@ macro_rules! mop {
202222
fn create_entry(self: std::pin::Pin<&mut Self>) -> OpEntry {
203223
unsafe { self.map_unchecked_mut(|x| x.inner.iour() ) }.create_entry()
204224
}
225+
226+
fn create_entry_fallback(self: std::pin::Pin<&mut Self>) -> OpEntry {
227+
unsafe { self.map_unchecked_mut(|x| x.inner.iour() ) }.create_entry_fallback()
228+
}
229+
230+
fn call_blocking(self: std::pin::Pin<&mut Self>) -> std::io::Result<usize> {
231+
unsafe { self.map_unchecked_mut(|x| x.inner.iour() ) }.call_blocking()
232+
}
233+
234+
unsafe fn set_result(self: std::pin::Pin<&mut Self>, result: &std::io::Result<usize>, extra: &crate::Extra) {
235+
unsafe { self.map_unchecked_mut(|x| x.inner.iour() ).set_result(result, extra) }
236+
}
237+
238+
unsafe fn push_multishot(self: std::pin::Pin<&mut Self>, result: std::io::Result<usize>, extra: crate::Extra) {
239+
unsafe { self.map_unchecked_mut(|x| x.inner.iour() ).push_multishot(result, extra) }
240+
}
241+
242+
fn pop_multishot(self: std::pin::Pin<&mut Self>) -> Option<BufResult<usize, crate::Extra>> {
243+
unsafe { self.map_unchecked_mut(|x| x.inner.iour() ) }.pop_multishot()
244+
}
205245
}
206246
};
207247
}

compio-driver/src/sys/iocp/mod.rs

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -502,8 +502,6 @@ impl Driver {
502502
pub fn pop_multishot(&mut self, _: &ErasedKey) -> Option<BufResult<usize, crate::sys::Extra>> {
503503
None
504504
}
505-
506-
pub fn cleanup_multishot(&mut self, _: &ErasedKey) {}
507505
}
508506

509507
impl AsRawFd for Driver {

0 commit comments

Comments
 (0)