diff --git a/compio-io/Cargo.toml b/compio-io/Cargo.toml index b781e394..c607851a 100644 --- a/compio-io/Cargo.toml +++ b/compio-io/Cargo.toml @@ -10,7 +10,7 @@ license = { workspace = true } repository = { workspace = true } [dependencies] -compio-buf = { workspace = true, features = ["arrayvec", "bytes"] } +compio-buf = { workspace = true, features = ["arrayvec"] } futures-util = { workspace = true, features = ["sink"] } paste = { workspace = true } pin-project-lite = { workspace = true, optional = true } @@ -36,10 +36,11 @@ serde = { version = "1.0.219", features = ["derive"] } futures-executor = "0.3.30" [features] -default = [] +default = ["bytes"] compat = ["futures-util/io", "dep:pin-project-lite"] sync = [] ancillary = ["dep:cfg-if", "dep:libc", "dep:windows-sys"] +bytes = ["compio-buf/bytes"] # Codecs # Serde json codec diff --git a/compio-io/src/framed/codec/mod.rs b/compio-io/src/framed/codec/mod.rs index 81637925..2429e9d0 100644 --- a/compio-io/src/framed/codec/mod.rs +++ b/compio-io/src/framed/codec/mod.rs @@ -5,6 +5,7 @@ use std::io; use compio_buf::{IoBuf, IoBufMut, Slice}; +#[cfg(feature = "bytes")] pub mod bytes; #[cfg(feature = "codec-serde-json")] diff --git a/compio-io/src/framed/frame.rs b/compio-io/src/framed/frame.rs index a4a78713..bcc56895 100644 --- a/compio-io/src/framed/frame.rs +++ b/compio-io/src/framed/frame.rs @@ -2,10 +2,7 @@ use std::io; -use compio_buf::{ - IoBuf, IoBufMut, Slice, - bytes::{Buf, BufMut}, -}; +use compio_buf::{IoBuf, IoBufMut, Slice}; /// An extracted frame #[derive(Debug, Clone, Copy, PartialEq, Eq)] @@ -87,6 +84,9 @@ impl Default for LengthDelimited { } impl LengthDelimited { + /// Max allowed length of `len` field + const MAX_LFL: usize = 8; + /// Creates a new `LengthDelimited` framer. pub fn new() -> Self { Self::default() @@ -98,7 +98,16 @@ impl LengthDelimited { } /// Sets the length of the length field in bytes. + /// + /// # Panics + /// + /// This will panic if `len_field_len` is too long (8 bytes is the maximum + /// allowed for now). pub fn set_length_field_len(mut self, len_field_len: usize) -> Self { + assert!( + len_field_len <= Self::MAX_LFL, + "Length field cannot take over 8 bytes" + ); self.length_field_len = len_field_len; self } @@ -124,13 +133,16 @@ impl Framer for LengthDelimited { unsafe { buf.advance_to(len + self.length_field_len) }; let slice = buf.as_mut_slice(); + let lfl = self.length_field_len; // Write the length at the beginning - if self.length_field_is_big_endian { - (&mut slice[0..self.length_field_len]).put_uint(len as _, self.length_field_len); + let len = len as u64; + let len_bytes = if self.length_field_is_big_endian { + &len.to_be_bytes()[Self::MAX_LFL - lfl..] } else { - (&mut slice[0..self.length_field_len]).put_uint_le(len as _, self.length_field_len); - } + &len.to_le_bytes()[..lfl] + }; + slice[..lfl].copy_from_slice(len_bytes); } fn extract(&mut self, buf: &Slice) -> io::Result> { @@ -138,15 +150,19 @@ impl Framer for LengthDelimited { return Ok(None); } - let mut buf = buf.as_init(); + let buf = buf.as_init(); + let lfl = self.length_field_len; + let mut len_bytes = [0; Self::MAX_LFL]; let len = if self.length_field_is_big_endian { - buf.get_uint(self.length_field_len) + len_bytes[Self::MAX_LFL - lfl..].copy_from_slice(&buf[..lfl]); + u64::from_be_bytes(len_bytes) } else { - buf.get_uint_le(self.length_field_len) + len_bytes[..lfl].copy_from_slice(&buf[..lfl]); + u64::from_le_bytes(len_bytes) } as usize; - if buf.len() < len { + if buf.len() < self.length_field_len + len { return Ok(None); } diff --git a/compio-io/src/framed/mod.rs b/compio-io/src/framed/mod.rs index 7005204d..48b67d5e 100644 --- a/compio-io/src/framed/mod.rs +++ b/compio-io/src/framed/mod.rs @@ -5,15 +5,12 @@ use std::marker::PhantomData; -use compio_buf::{IoBufMut, bytes::Bytes}; +use compio_buf::IoBufMut; use futures_util::FutureExt; use crate::{ AsyncRead, - framed::{ - codec::{Decoder, bytes::BytesCodec}, - frame::NoopFramer, - }, + framed::{codec::Decoder, frame::NoopFramer}, util::Splittable, }; @@ -151,17 +148,43 @@ impl Framed<(), (), C, F, (), (), ()> { } } -/// A type alias for a `Framed` with bytes as the input and output type. -pub type BytesFramed = Framed; - +/// [`Framed`] that bridges [`AsyncRead`]/[`AsyncWrite`] with [`Bytes`]. +/// +/// This is useful when you want to read/write raw bytes into/from [`Bytes`] +/// without any additional framing or de/encoding. +/// +/// See also: [`ReaderStream`] and [`ReaderStream`]. +/// +/// [`Bytes`]: compio_buf::bytes::Bytes +/// [`AsyncWrite`]: crate::AsyncWrite +/// [`ReaderStream`]: https://docs.rs/tokio-util/latest/tokio_util/io/struct.ReaderStream.html +/// [`StreamReader`]: https://docs.rs/tokio-util/latest/tokio_util/io/struct.StreamReader.html +#[cfg(feature = "bytes")] +pub type BytesFramed = Framed< + R, + W, + codec::bytes::BytesCodec, + NoopFramer, + compio_buf::bytes::Bytes, + compio_buf::bytes::Bytes, +>; + +#[cfg(feature = "bytes")] impl BytesFramed<(), ()> { - /// Creates a new `Framed` with the given I/O object, codec, and framer with - /// bytes as the input and output type. + /// Creates a new [`BytesFramed`] that bridges [`AsyncRead`]/[`AsyncWrite`] + /// with [`Bytes`]. + /// + /// See also: [`ReaderStream`] and [`StreamReader`]. + /// + /// [`Bytes`]: compio_buf::bytes::Bytes + /// [`AsyncWrite`]: crate::AsyncWrite + /// [`ReaderStream`]: https://docs.rs/tokio-util/latest/tokio_util/io/struct.ReaderStream.html + /// [`StreamReader`]: https://docs.rs/tokio-util/latest/tokio_util/io/struct.StreamReader.html pub fn new_bytes() -> Self { Framed { read_state: read::State::empty(), write_state: write::State::empty(), - codec: BytesCodec::new(), + codec: codec::bytes::BytesCodec::new(), framer: NoopFramer::new(), types: PhantomData, }