Skip to content

Commit 0446642

Browse files
ararogGeorge-Miao
andauthored
feat(io): add BytesFramed support (#749)
* feat: add BytesCodec, CapacityDelimiter, BytesFramed and BytesFramedExt trait * docs: added BytesFramed documentation, renamed CapacityDelimiter to NoopFramer. * refactor(io): removed BytesFramed, added new_bytes as a Frame implementation. * docs(io): fix NoopFramer api doc description * docs(io): fix BytesCoded api doc text Co-authored-by: Pop <gm@miao.dev> * refactor(io): added BytesFramed type alias back for better readability Co-authored-by: Pop <gm@miao.dev> * refactor(io): return Self and make it more readable Co-authored-by: Pop <gm@miao.dev> * refactor(io): remove unnecessary reserve Co-authored-by: Pop <gm@miao.dev> * style(io): some code formatting * style(io): fixed code formatting on api doc example * style: fixed lint issue * style: ommit this function arg * fix: clippy * fix: compio-io/tests/framed.rs --------- Co-authored-by: Pop <gm@miao.dev>
1 parent 3f774e8 commit 0446642

File tree

5 files changed

+171
-3
lines changed

5 files changed

+171
-3
lines changed
Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
//! [`Encoder`]/[`Decoder`] implementation with Bytes
2+
//!
3+
//! This module provides a codec implementation for bytes serialization and
4+
//! deserialization (noop per se).
5+
//!
6+
//! # Examples
7+
//!
8+
//! ```
9+
//! use compio_buf::{IoBuf, bytes::Bytes};
10+
//! use compio_io::framed::codec::{Decoder, Encoder, bytes::BytesCodec};
11+
//!
12+
//! let mut codec = BytesCodec::new();
13+
//! let data = Bytes::from("Hello, world!");
14+
//!
15+
//! // Encoding
16+
//! let mut buffer = Vec::new();
17+
//! codec.encode(data.clone(), &mut buffer).unwrap();
18+
//!
19+
//! // Decoding
20+
//! let decoded = codec.decode(&buffer.as_slice()).unwrap();
21+
//! assert_eq!(decoded, data);
22+
//! ```
23+
use std::io::{self, Write};
24+
25+
use compio_buf::{IoBuf, IoBufMut, Slice, bytes::Bytes};
26+
27+
use crate::framed::codec::{Decoder, Encoder};
28+
29+
/// A codec for bytes serialization and deserialization.
30+
///
31+
/// This codec can be used to write into and read from [`Bytes`].
32+
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
33+
pub struct BytesCodec;
34+
35+
impl BytesCodec {
36+
/// Creates a new `BytesCodec`.
37+
pub fn new() -> Self {
38+
Self {}
39+
}
40+
}
41+
42+
impl Default for BytesCodec {
43+
fn default() -> Self {
44+
Self::new()
45+
}
46+
}
47+
48+
impl<B: IoBufMut> Encoder<Bytes, B> for BytesCodec {
49+
type Error = io::Error;
50+
51+
fn encode(&mut self, item: Bytes, buf: &mut B) -> Result<(), Self::Error> {
52+
let mut writer = buf.as_writer();
53+
writer.write_all(&item)?;
54+
Ok(())
55+
}
56+
}
57+
58+
impl<B: IoBuf> Decoder<Bytes, B> for BytesCodec {
59+
type Error = io::Error;
60+
61+
fn decode(&mut self, buf: &Slice<B>) -> Result<Bytes, Self::Error> {
62+
let inner = buf.as_ref().to_vec();
63+
Ok(Bytes::from(inner))
64+
}
65+
}

compio-io/src/framed/codec/mod.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,8 @@ use std::io;
55

66
use compio_buf::{IoBuf, IoBufMut, Slice};
77

8+
pub mod bytes;
9+
810
#[cfg(feature = "codec-serde-json")]
911
pub mod serde_json;
1012

compio-io/src/framed/frame.rs

Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -226,6 +226,50 @@ impl<B: IoBufMut> Framer<B> for AnyDelimited<'_> {
226226
/// Delimiter that uses newline characters (`\n`) as delimiters.
227227
pub type LineDelimited = CharDelimited<'\n'>;
228228

229+
/// A framer that does nothing.
230+
///
231+
/// It simply reserves space in the buffer without adding any framing.
232+
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
233+
pub struct NoopFramer {
234+
max_size: usize,
235+
}
236+
237+
impl Default for NoopFramer {
238+
fn default() -> Self {
239+
Self { max_size: 4096 }
240+
}
241+
}
242+
243+
impl NoopFramer {
244+
/// Creates a new `NoopFramer` framer.
245+
pub fn new() -> Self {
246+
Self::default()
247+
}
248+
249+
/// Returns the size of the capacity.
250+
pub fn max_size(&self) -> usize {
251+
self.max_size
252+
}
253+
}
254+
255+
impl<B: IoBufMut> Framer<B> for NoopFramer {
256+
fn enclose(&mut self, _: &mut B) {}
257+
258+
fn extract(&mut self, buf: &Slice<B>) -> io::Result<Option<Frame>> {
259+
if buf.is_empty() {
260+
return Ok(None);
261+
}
262+
263+
let len = if buf.len() < self.max_size {
264+
buf.len()
265+
} else {
266+
self.max_size
267+
};
268+
269+
Ok(Some(Frame::new(0, len, 0)))
270+
}
271+
}
272+
229273
#[cfg(test)]
230274
mod tests {
231275
use compio_buf::{IntoInner, IoBufMut};
@@ -248,6 +292,22 @@ mod tests {
248292
assert_eq!(payload.as_init(), b"hello");
249293
}
250294

295+
#[test]
296+
fn test_noop_framer() {
297+
let mut framer = NoopFramer::new();
298+
299+
let mut buf = Vec::from(b"hello");
300+
framer.enclose(&mut buf);
301+
assert_eq!(&buf.as_slice()[..5], b"hello");
302+
303+
let buf = buf.slice(..);
304+
let frame = framer.extract(&buf).unwrap().unwrap();
305+
let buf = buf.into_inner();
306+
assert_eq!(frame, Frame::new(0, 5, 0));
307+
let payload = frame.slice(buf);
308+
assert_eq!(payload.as_init(), b"hello");
309+
}
310+
251311
#[test]
252312
fn test_char_delimited() {
253313
let mut framer = CharDelimited::<'ℝ'>::new();

compio-io/src/framed/mod.rs

Lines changed: 25 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,10 +5,17 @@
55
66
use std::marker::PhantomData;
77

8-
use compio_buf::IoBufMut;
8+
use compio_buf::{IoBufMut, bytes::Bytes};
99
use futures_util::FutureExt;
1010

11-
use crate::{AsyncRead, framed::codec::Decoder, util::Splittable};
11+
use crate::{
12+
AsyncRead,
13+
framed::{
14+
codec::{Decoder, bytes::BytesCodec},
15+
frame::NoopFramer,
16+
},
17+
util::Splittable,
18+
};
1219

1320
pub mod codec;
1421
pub mod frame;
@@ -143,3 +150,19 @@ impl<C, F> Framed<(), (), C, F, (), (), ()> {
143150
}
144151
}
145152
}
153+
154+
type BytesFramed<R, W> = Framed<R, W, BytesCodec, NoopFramer, Bytes, Bytes>;
155+
156+
impl BytesFramed<(), ()> {
157+
/// Creates a new `Framed` with the given I/O object, codec, and framer with
158+
/// bytes as the input and output type.
159+
pub fn new_bytes() -> Self {
160+
Framed {
161+
read_state: read::State::empty(),
162+
write_state: write::State::empty(),
163+
codec: BytesCodec::new(),
164+
framer: NoopFramer::new(),
165+
types: PhantomData,
166+
}
167+
}
168+
}

compio-io/tests/framed.rs

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ use std::{
55

66
use compio_buf::{BufResult, IoBuf, IoBufMut};
77
use compio_io::{
8-
AsyncRead, AsyncReadAt, AsyncWrite, AsyncWriteAt,
8+
AsyncRead, AsyncReadAt, AsyncWrite, AsyncWriteAt, BufReader,
99
framed::{Framed, codec::serde_json::SerdeJsonCodec, frame::LengthDelimited},
1010
};
1111
use futures_executor::block_on;
@@ -104,3 +104,21 @@ fn test_framed() {
104104
assert!(res.is_none());
105105
})
106106
}
107+
108+
#[test]
109+
fn test_bytes_framed() {
110+
block_on(async {
111+
let buf = b"Hello, world!".to_vec();
112+
let r = BufReader::with_capacity(5, Cursor::new(buf));
113+
114+
let mut framed = Framed::new_bytes().with_reader(r);
115+
let mut s = String::new();
116+
while let Some(result) = framed.next().await {
117+
let bytes = result.unwrap();
118+
s.push_str(str::from_utf8(&bytes).unwrap());
119+
}
120+
assert_eq!(s, "Hello, world!");
121+
let res = framed.next().await;
122+
assert!(res.is_none());
123+
})
124+
}

0 commit comments

Comments
 (0)