Skip to content
Merged
Show file tree
Hide file tree
Changes from 11 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .github/workflows/ci_test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,14 @@ jobs:
toolchain: ["nightly", "beta", "stable"]
setup:
- os: "ubuntu-20.04"
- os: "ubuntu-20.04"
features: "polling" # fusion
- os: "ubuntu-22.04"
features: "io-uring-sqe128,io-uring-cqe32,io-uring-socket"
- os: "ubuntu-22.04"
features: "io-uring-buf-ring"
- os: "ubuntu-22.04"
features: "polling,io-uring-buf-ring" # fusion & buf-ring
- os: "ubuntu-20.04"
features: "polling,ring"
no_default_features: true
Expand Down
4 changes: 4 additions & 0 deletions compio-driver/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,10 @@ windows-sys = { workspace = true, features = [
# Linux specific dependencies
[target.'cfg(target_os = "linux")'.dependencies]
io-uring = { version = "0.7.0", optional = true }
io_uring_buf_ring = { version = "0.2.0", optional = true }
polling = { version = "3.3.0", optional = true }
paste = { workspace = true }
slab = { workspace = true, optional = true }

# Other platform dependencies
[target.'cfg(all(not(target_os = "linux"), unix))'.dependencies]
Expand All @@ -75,11 +77,13 @@ cfg_aliases = { workspace = true }

[features]
default = ["io-uring"]
io-uring = ["dep:io-uring", "dep:io_uring_buf_ring", "dep:slab"]
polling = ["dep:polling"]

io-uring-sqe128 = []
io-uring-cqe32 = []
io-uring-socket = []
io-uring-buf-ring = []

iocp-global = []
iocp-wait-packet = []
Expand Down
2 changes: 2 additions & 0 deletions compio-driver/build.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,5 +14,7 @@ fn main() {
freebsd: { target_os = "freebsd" },
solarish: { any(target_os = "illumos", target_os = "solaris") },
aio: { any(freebsd, solarish) },
buf_ring: { all(target_os = "linux", feature = "io-uring", feature = "io-uring-buf-ring") },
fusion: { all(target_os = "linux", feature = "io-uring", feature = "polling") }
}
}
221 changes: 221 additions & 0 deletions compio-driver/src/buffer_pool/fallback.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,221 @@
//! The fallback buffer pool. It is backed by a [`VecDeque`] of [`Vec<u8>`].
//! An [`OwnedBuffer`] is selected when the op is created. It keeps a strong
//! reference to the buffer pool. The [`BorrowedBuffer`] is created after the op
//! returns successfully.

use std::{
borrow::{Borrow, BorrowMut},
cell::RefCell,
collections::VecDeque,
fmt::{Debug, Formatter},
io,
mem::ManuallyDrop,
ops::{Deref, DerefMut},
rc::Rc,
};

use compio_buf::{IntoInner, IoBuf, IoBufMut, SetBufInit, Slice};

struct BufferPoolInner {
buffers: RefCell<VecDeque<Vec<u8>>>,
}

impl BufferPoolInner {
pub(crate) fn add_buffer(&self, mut buffer: Vec<u8>) {
buffer.clear();
self.buffers.borrow_mut().push_back(buffer)
}
}

/// Buffer pool
///
/// A buffer pool to allow user no need to specify a specific buffer to do the
/// IO operation
pub struct BufferPool {
inner: Rc<BufferPoolInner>,
}

impl Debug for BufferPool {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
f.debug_struct("BufferPool").finish_non_exhaustive()
}
}

impl BufferPool {
pub(crate) fn new(buffer_len: u16, buffer_size: usize) -> Self {
// To match the behavior of io-uring, extend the number of buffers.
let buffers = (0..buffer_len.next_power_of_two())
.map(|_| Vec::with_capacity(buffer_size))
.collect();

Self {
inner: Rc::new(BufferPoolInner {
buffers: RefCell::new(buffers),
}),
}
}

/// Select an [`OwnedBuffer`] when the op creates.
pub(crate) fn get_buffer(&self, len: usize) -> io::Result<OwnedBuffer> {
let buffer = self
.inner
.buffers
.borrow_mut()
.pop_front()
.ok_or_else(|| io::Error::other("buffer ring has no available buffer"))?;
let len = if len == 0 {
buffer.capacity()
} else {
buffer.capacity().min(len)
};
Ok(OwnedBuffer::new(buffer.slice(..len), self.inner.clone()))
}

/// Return the buffer to the pool.
pub(crate) fn add_buffer(&self, buffer: Vec<u8>) {
self.inner.add_buffer(buffer);
}

/// ## Safety
/// * `len` should be valid.
pub(crate) unsafe fn create_proxy(&self, mut slice: OwnedBuffer, len: usize) -> BorrowedBuffer {
unsafe {
slice.set_buf_init(len);
}
BorrowedBuffer::new(slice.into_inner(), self)
}
}

pub(crate) struct OwnedBuffer {
buffer: ManuallyDrop<Slice<Vec<u8>>>,
pool: ManuallyDrop<Rc<BufferPoolInner>>,
}

impl OwnedBuffer {
fn new(buffer: Slice<Vec<u8>>, pool: Rc<BufferPoolInner>) -> Self {
Self {
buffer: ManuallyDrop::new(buffer),
pool: ManuallyDrop::new(pool),
}
}
}

unsafe impl IoBuf for OwnedBuffer {
fn as_buf_ptr(&self) -> *const u8 {
self.buffer.as_buf_ptr()
}

fn buf_len(&self) -> usize {
self.buffer.buf_len()
}

fn buf_capacity(&self) -> usize {
self.buffer.buf_capacity()
}
}

unsafe impl IoBufMut for OwnedBuffer {
fn as_buf_mut_ptr(&mut self) -> *mut u8 {
self.buffer.as_buf_mut_ptr()
}
}

impl SetBufInit for OwnedBuffer {
unsafe fn set_buf_init(&mut self, len: usize) {
self.buffer.set_buf_init(len);
}
}

impl Drop for OwnedBuffer {
fn drop(&mut self) {
// Safety: `take` is called only once here.
self.pool
.add_buffer(unsafe { ManuallyDrop::take(&mut self.buffer) }.into_inner());
// Safety: `drop` is called only once here.
unsafe { ManuallyDrop::drop(&mut self.pool) };
}
}

impl IntoInner for OwnedBuffer {
type Inner = Slice<Vec<u8>>;

fn into_inner(mut self) -> Self::Inner {
// Safety: `self` is forgotten in this method.
let buffer = unsafe { ManuallyDrop::take(&mut self.buffer) };
// The buffer is taken, we only need to drop the Rc.
// Safety: `self` is forgotten in this method.
unsafe { ManuallyDrop::drop(&mut self.pool) };
std::mem::forget(self);
buffer
}
}

/// Buffer borrowed from buffer pool
///
/// When IO operation finish, user will obtain a `BorrowedBuffer` to access the
/// filled data
pub struct BorrowedBuffer<'a> {
buffer: ManuallyDrop<Slice<Vec<u8>>>,
pool: &'a BufferPool,
}

impl<'a> BorrowedBuffer<'a> {
pub(crate) fn new(buffer: Slice<Vec<u8>>, pool: &'a BufferPool) -> Self {
Self {
buffer: ManuallyDrop::new(buffer),
pool,
}
}
}

impl Debug for BorrowedBuffer<'_> {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
f.debug_struct("BorrowedBuffer").finish_non_exhaustive()
}
}

impl Drop for BorrowedBuffer<'_> {
fn drop(&mut self) {
// Safety: `take` is called only once here.
let buffer = unsafe { ManuallyDrop::take(&mut self.buffer) };
self.pool.add_buffer(buffer.into_inner());
}
}

impl Deref for BorrowedBuffer<'_> {
type Target = [u8];

fn deref(&self) -> &Self::Target {
self.buffer.deref()
}
}

impl DerefMut for BorrowedBuffer<'_> {
fn deref_mut(&mut self) -> &mut Self::Target {
self.buffer.deref_mut()
}
}

impl AsRef<[u8]> for BorrowedBuffer<'_> {
fn as_ref(&self) -> &[u8] {
self.deref()
}
}

impl AsMut<[u8]> for BorrowedBuffer<'_> {
fn as_mut(&mut self) -> &mut [u8] {
self.deref_mut()
}
}

impl Borrow<[u8]> for BorrowedBuffer<'_> {
fn borrow(&self) -> &[u8] {
self.deref()
}
}

impl BorrowMut<[u8]> for BorrowedBuffer<'_> {
fn borrow_mut(&mut self) -> &mut [u8] {
self.deref_mut()
}
}
Loading
Loading