Skip to content

Commit 8e61c3d

Browse files
ararogGeorge-Miao
andauthored
feat(io): allow create a stream out of AsyncRead for BytesFramed (#767)
* fix(io): make also bytes method on AsyncReadExt/AsyncWriteExt feature gated * feat(io): allow create a stream out of AsyncRead for BytesFramed * test(io): removed file stream test * refactor(io): renamed methods to read_only and write_only * refactor(fs): removed unused imports * test(fs): added unit test for file streaming * feat: more generic {read,write}_only * fix: test --------- Co-authored-by: George Miao <gm@miao.dev>
1 parent b5ec94a commit 8e61c3d

File tree

3 files changed

+128
-3
lines changed

3 files changed

+128
-3
lines changed

compio-fs/tests/file.rs

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,8 @@
11
use std::io::prelude::*;
22

33
use compio_fs::{File, OpenOptions};
4-
use compio_io::{AsyncReadAtExt, AsyncWriteAt, AsyncWriteAtExt};
4+
use compio_io::{AsyncReadAtExt, AsyncReadExt, AsyncWriteAt, AsyncWriteAtExt};
5+
use futures_util::StreamExt;
56
use tempfile::NamedTempFile;
67

78
async fn setlen_run(file: &File, size: u64) {
@@ -43,6 +44,22 @@ async fn metadata() {
4344
assert_eq!(size, std_meta.len());
4445
}
4546

47+
#[compio_macros::test]
48+
async fn file_stream() {
49+
let mut tempfile = tempfile();
50+
tempfile.write_all(HELLO).unwrap();
51+
52+
let file = File::open(tempfile.path()).await.unwrap();
53+
let cursor = std::io::Cursor::new(file);
54+
let mut file = cursor.read_only().bytes();
55+
let mut s = String::new();
56+
while let Some(result) = file.next().await {
57+
let chunk = result.unwrap();
58+
s.push_str(str::from_utf8(&chunk).unwrap());
59+
}
60+
assert_eq!(s, String::from_utf8_lossy(HELLO));
61+
}
62+
4663
const HELLO: &[u8] = b"hello world...";
4764

4865
async fn read_hello(file: &File) {

compio-io/src/read/ext.rs

Lines changed: 54 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -201,16 +201,43 @@ pub trait AsyncReadExt: AsyncRead {
201201
Framed::new(codec, framer).with_duplex(self)
202202
}
203203

204-
#[cfg(feature = "bytes")]
205204
/// Convenience method to create a [`BytesFramed`] reader/writter
206205
/// out of a splittable.
206+
#[cfg(feature = "bytes")]
207207
fn bytes(self) -> BytesFramed<Self::ReadHalf, Self::WriteHalf>
208208
where
209209
Self: Splittable + Sized,
210210
{
211211
BytesFramed::new_bytes().with_duplex(self)
212212
}
213213

214+
/// Create a [`Splittable`] that uses `Self` as [`ReadHalf`] and `()` as
215+
/// [`WriteHalf`].
216+
///
217+
/// This is useful for creating framed sink with only a reader,
218+
/// using the [`AsyncReadExt::framed`] or [`AsyncReadExt::bytes`]
219+
/// method, which require a [`Splittable`] to work.
220+
///
221+
/// # Examples
222+
///
223+
/// ```rust,ignore
224+
/// use compio_io::{AsyncReadExt, framed::BytesFramed};
225+
///
226+
/// let mut file_bytes = file.read_only().bytes();
227+
/// while let Some(Ok(bytes)) = file_bytes.next().await {
228+
/// // process bytes
229+
/// }
230+
/// ```
231+
///
232+
/// [`ReadHalf`]: Splittable::ReadHalf
233+
/// [`WriteHalf`]: Splittable::WriteHalf
234+
fn read_only(self) -> ReadOnly<Self>
235+
where
236+
Self: Sized,
237+
{
238+
ReadOnly(self)
239+
}
240+
214241
/// Creates an adaptor which reads at most `limit` bytes from it.
215242
///
216243
/// This function returns a new instance of `AsyncRead` which will read
@@ -313,3 +340,29 @@ pub trait AsyncReadAtExt: AsyncReadAt {
313340
}
314341

315342
impl<A: AsyncReadAt + ?Sized> AsyncReadAtExt for A {}
343+
344+
/// An adaptor which implements [`Splittable`] for any [`AsyncRead`], with the
345+
/// write half being `()`.
346+
///
347+
/// This can be used to create a framed stream with only a reader, using
348+
/// the [`AsyncReadExt::framed`] or [`AsyncReadExt::bytes`] method.
349+
pub struct ReadOnly<R>(pub R);
350+
351+
impl<R: AsyncRead> AsyncRead for ReadOnly<R> {
352+
async fn read<T: IoBufMut>(&mut self, buf: T) -> BufResult<usize, T> {
353+
self.0.read(buf).await
354+
}
355+
356+
async fn read_vectored<T: IoVectoredBufMut>(&mut self, buf: T) -> BufResult<usize, T> {
357+
self.0.read_vectored(buf).await
358+
}
359+
}
360+
361+
impl<R> Splittable for ReadOnly<R> {
362+
type ReadHalf = R;
363+
type WriteHalf = ();
364+
365+
fn split(self) -> (Self::ReadHalf, Self::WriteHalf) {
366+
(self.0, ())
367+
}
368+
}

compio-io/src/write/ext.rs

Lines changed: 56 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -132,16 +132,41 @@ pub trait AsyncWriteExt: AsyncWrite {
132132
Framed::new(codec, framer).with_duplex(self)
133133
}
134134

135-
#[cfg(feature = "bytes")]
136135
/// Convenience method to create a [`BytesFramed`] reader/writer
137136
/// out of a splittable.
137+
#[cfg(feature = "bytes")]
138138
fn bytes(self) -> BytesFramed<Self::ReadHalf, Self::WriteHalf>
139139
where
140140
Self: Splittable + Sized,
141141
{
142142
BytesFramed::new_bytes().with_duplex(self)
143143
}
144144

145+
/// Create a [`Splittable`] that uses `Self` as [`WriteHalf`] and `()` as
146+
/// [`ReadHalf`].
147+
///
148+
/// This is useful for creating framed sink with only a writer,
149+
/// using the [`AsyncWriteExt::framed`] or [`AsyncWriteExt::bytes`]
150+
/// method, which require a [`Splittable`] to work.
151+
///
152+
/// # Examples
153+
///
154+
/// ```rust,ignore
155+
/// use compio_io::{AsyncWriteExt, framed::BytesFramed};
156+
///
157+
/// let mut file_bytes = file.write_only().bytes();
158+
/// file_bytes.send(Bytes::from("hello world")).await?;
159+
/// ```
160+
///
161+
/// [`ReadHalf`]: Splittable::ReadHalf
162+
/// [`WriteHalf`]: Splittable::WriteHalf
163+
fn write_only(self) -> WriteOnly<Self>
164+
where
165+
Self: Sized,
166+
{
167+
WriteOnly(self)
168+
}
169+
145170
write_scalar!(u8, to_be_bytes, to_le_bytes);
146171
write_scalar!(u16, to_be_bytes, to_le_bytes);
147172
write_scalar!(u32, to_be_bytes, to_le_bytes);
@@ -186,3 +211,33 @@ pub trait AsyncWriteAtExt: AsyncWriteAt {
186211
}
187212

188213
impl<A: AsyncWriteAt + ?Sized> AsyncWriteAtExt for A {}
214+
215+
/// An adaptor which implements [`Splittable`] for any [`AsyncWrite`], with the
216+
/// read half being `()`.
217+
///
218+
/// This can be used to create a framed sink with only a writer, using
219+
/// the [`AsyncWriteExt::framed`] or [`AsyncWriteExt::bytes`] method.
220+
pub struct WriteOnly<W>(pub W);
221+
222+
impl<W: AsyncWrite> AsyncWrite for WriteOnly<W> {
223+
async fn write<T: IoBuf>(&mut self, buf: T) -> BufResult<usize, T> {
224+
self.0.write(buf).await
225+
}
226+
227+
async fn flush(&mut self) -> IoResult<()> {
228+
self.0.flush().await
229+
}
230+
231+
async fn shutdown(&mut self) -> IoResult<()> {
232+
self.0.shutdown().await
233+
}
234+
}
235+
236+
impl<W> Splittable for WriteOnly<W> {
237+
type ReadHalf = ();
238+
type WriteHalf = W;
239+
240+
fn split(self) -> (Self::ReadHalf, Self::WriteHalf) {
241+
((), self.0)
242+
}
243+
}

0 commit comments

Comments
 (0)