Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions compio-io/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ license = { workspace = true }
repository = { workspace = true }

[dependencies]
compio-buf = { workspace = true, features = ["arrayvec", "bytes"] }
compio-buf = { workspace = true, features = ["arrayvec"] }
futures-util = { workspace = true, features = ["sink"] }
paste = { workspace = true }
pin-project-lite = { workspace = true, optional = true }
Expand All @@ -36,10 +36,11 @@ serde = { version = "1.0.219", features = ["derive"] }
futures-executor = "0.3.30"

[features]
default = []
default = ["bytes"]
compat = ["futures-util/io", "dep:pin-project-lite"]
sync = []
ancillary = ["dep:cfg-if", "dep:libc", "dep:windows-sys"]
bytes = ["compio-buf/bytes"]

# Codecs
# Serde json codec
Expand Down
1 change: 1 addition & 0 deletions compio-io/src/framed/codec/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use std::io;

use compio_buf::{IoBuf, IoBufMut, Slice};

#[cfg(feature = "bytes")]
pub mod bytes;

#[cfg(feature = "codec-serde-json")]
Expand Down
39 changes: 27 additions & 12 deletions compio-io/src/framed/frame.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,7 @@

use std::io;

use compio_buf::{
IoBuf, IoBufMut, Slice,
bytes::{Buf, BufMut},
};
use compio_buf::{IoBuf, IoBufMut, Slice};

/// An extracted frame
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
Expand Down Expand Up @@ -87,6 +84,9 @@ impl Default for LengthDelimited {
}

impl LengthDelimited {
/// Max allowed length of `len` field
const MAX_LFL: usize = 8;

/// Creates a new `LengthDelimited` framer.
pub fn new() -> Self {
Self::default()
Expand All @@ -98,7 +98,16 @@ impl LengthDelimited {
}

/// Sets the length of the length field in bytes.
///
/// # Panics
///
/// This will panic if `len_field_len` is too long (8 bytes is the maximum
/// allowed for now).
pub fn set_length_field_len(mut self, len_field_len: usize) -> Self {
assert!(
len_field_len <= Self::MAX_LFL,
"Length field cannot take over 8 bytes"
);
self.length_field_len = len_field_len;
self
}
Expand All @@ -124,29 +133,35 @@ impl<B: IoBufMut> Framer<B> for LengthDelimited {
unsafe { buf.advance_to(len + self.length_field_len) };

let slice = buf.as_mut_slice();
let lfl = self.length_field_len;

// Write the length at the beginning
if self.length_field_is_big_endian {
(&mut slice[0..self.length_field_len]).put_uint(len as _, self.length_field_len);
let len_bytes = if self.length_field_is_big_endian {
&len.to_be_bytes()[Self::MAX_LFL - lfl..]
} else {
(&mut slice[0..self.length_field_len]).put_uint_le(len as _, self.length_field_len);
}
&len.to_le_bytes()[..lfl]
};
slice[..lfl].copy_from_slice(len_bytes);
}

fn extract(&mut self, buf: &Slice<B>) -> io::Result<Option<Frame>> {
if buf.len() < self.length_field_len {
return Ok(None);
}

let mut buf = buf.as_init();
let buf = buf.as_init();
let lfl = self.length_field_len;
let mut len_bytes = [0; Self::MAX_LFL];

let len = if self.length_field_is_big_endian {
buf.get_uint(self.length_field_len)
len_bytes[Self::MAX_LFL - lfl..].copy_from_slice(&buf[..lfl]);
u64::from_be_bytes(len_bytes)
} else {
buf.get_uint_le(self.length_field_len)
len_bytes[..lfl].copy_from_slice(&buf[..lfl]);
u64::from_le_bytes(len_bytes)
} as usize;

if buf.len() < len {
if buf.len() < self.length_field_len + len {
return Ok(None);
}

Expand Down
45 changes: 34 additions & 11 deletions compio-io/src/framed/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,12 @@

use std::marker::PhantomData;

use compio_buf::{IoBufMut, bytes::Bytes};
use compio_buf::IoBufMut;
use futures_util::FutureExt;

use crate::{
AsyncRead,
framed::{
codec::{Decoder, bytes::BytesCodec},
frame::NoopFramer,
},
framed::{codec::Decoder, frame::NoopFramer},
util::Splittable,
};

Expand Down Expand Up @@ -151,17 +148,43 @@ impl<C, F> Framed<(), (), C, F, (), (), ()> {
}
}

/// A type alias for a `Framed` with bytes as the input and output type.
pub type BytesFramed<R, W> = Framed<R, W, BytesCodec, NoopFramer, Bytes, Bytes>;

/// [`Framed`] that bridges [`AsyncRead`]/[`AsyncWrite`] with [`Bytes`].
///
/// This is useful when you want to read/write raw bytes into/from [`Bytes`]
/// without any additional framing or de/encoding.
///
/// See also: [`ReaderStream`] and [`ReaderStream`].
///
/// [`Bytes`]: compio_buf::bytes::Bytes
/// [`AsyncWrite`]: crate::AsyncWrite
/// [`ReaderStream`]: https://docs.rs/tokio-util/latest/tokio_util/io/struct.ReaderStream.html
/// [`StreamReader`]: https://docs.rs/tokio-util/latest/tokio_util/io/struct.StreamReader.html
#[cfg(feature = "bytes")]
pub type BytesFramed<R, W> = Framed<
R,
W,
codec::bytes::BytesCodec,
NoopFramer,
compio_buf::bytes::Bytes,
compio_buf::bytes::Bytes,
>;

#[cfg(feature = "bytes")]
impl BytesFramed<(), ()> {
/// Creates a new `Framed` with the given I/O object, codec, and framer with
/// bytes as the input and output type.
/// Creates a new [`BytesFramed`] that bridges [`AsyncRead`]/[`AsyncWrite`]
/// with [`Bytes`].
///
/// See also: [`ReaderStream`] and [`StreamReader`].
///
/// [`Bytes`]: compio_buf::bytes::Bytes
/// [`AsyncWrite`]: crate::AsyncWrite
/// [`ReaderStream`]: https://docs.rs/tokio-util/latest/tokio_util/io/struct.ReaderStream.html
/// [`StreamReader`]: https://docs.rs/tokio-util/latest/tokio_util/io/struct.StreamReader.html
pub fn new_bytes() -> Self {
Framed {
read_state: read::State::empty(),
write_state: write::State::empty(),
codec: BytesCodec::new(),
codec: codec::bytes::BytesCodec::new(),
framer: NoopFramer::new(),
types: PhantomData,
}
Expand Down
Loading