Skip to content

Commit 4d5743a

Browse files
taiki-ecramertj
authored andcommitted
Add AsyncReadExt::read_to_string
1 parent b9ba5d4 commit 4d5743a

File tree

5 files changed

+141
-3
lines changed

5 files changed

+141
-3
lines changed

futures-util/src/io/mod.rs

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,9 @@ pub use self::read_line::ReadLine;
6060
mod read_to_end;
6161
pub use self::read_to_end::ReadToEnd;
6262

63+
mod read_to_string;
64+
pub use self::read_to_string::ReadToString;
65+
6366
mod read_until;
6467
pub use self::read_until::ReadUntil;
6568

@@ -241,6 +244,33 @@ pub trait AsyncReadExt: AsyncRead {
241244
ReadToEnd::new(self, buf)
242245
}
243246

247+
/// Creates a future which will read all the bytes from this `AsyncRead`.
248+
///
249+
/// # Examples
250+
///
251+
/// ```
252+
/// #![feature(async_await)]
253+
/// # futures::executor::block_on(async {
254+
/// use futures::io::AsyncReadExt;
255+
/// use std::io::Cursor;
256+
///
257+
/// let mut reader = Cursor::new(&b"1234"[..]);
258+
/// let mut buffer = String::with_capacity(4);
259+
///
260+
/// reader.read_to_string(&mut buffer).await?;
261+
///
262+
/// assert_eq!(buffer, String::from("1234"));
263+
/// # Ok::<(), Box<dyn std::error::Error>>(()) }).unwrap();
264+
/// ```
265+
fn read_to_string<'a>(
266+
&'a mut self,
267+
buf: &'a mut String,
268+
) -> ReadToString<'a, Self>
269+
where Self: Unpin,
270+
{
271+
ReadToString::new(self, buf)
272+
}
273+
244274
/// Helper method for splitting this read/write object into two halves.
245275
///
246276
/// The two halves returned implement the `AsyncRead` and `AsyncWrite`

futures-util/src/io/read_to_end.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ impl Drop for Guard<'_> {
3838
//
3939
// Because we're extending the buffer with uninitialized data for trusted
4040
// readers, we need to make sure to truncate that if any of this panics.
41-
fn read_to_end_internal<R: AsyncRead + ?Sized>(
41+
pub(super) fn read_to_end_internal<R: AsyncRead + ?Sized>(
4242
mut rd: Pin<&mut R>,
4343
cx: &mut Context<'_>,
4444
buf: &mut Vec<u8>,

futures-util/src/io/read_to_string.rs

Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
use super::read_to_end::read_to_end_internal;
2+
use futures_core::future::Future;
3+
use futures_core::task::{Context, Poll};
4+
use futures_io::AsyncRead;
5+
use std::pin::Pin;
6+
use std::vec::Vec;
7+
use std::{io, mem, str};
8+
9+
/// Future for the [`read_to_string`](super::AsyncReadExt::read_to_string) method.
10+
#[derive(Debug)]
11+
#[must_use = "futures do nothing unless you `.await` or poll them"]
12+
pub struct ReadToString<'a, R: ?Sized + Unpin> {
13+
reader: &'a mut R,
14+
buf: &'a mut String,
15+
bytes: Vec<u8>,
16+
}
17+
18+
impl<R: ?Sized + Unpin> Unpin for ReadToString<'_, R> {}
19+
20+
impl<'a, R: AsyncRead + ?Sized + Unpin> ReadToString<'a, R> {
21+
pub(super) fn new(reader: &'a mut R, buf: &'a mut String) -> Self {
22+
Self {
23+
reader,
24+
bytes: unsafe { mem::replace(buf.as_mut_vec(), Vec::new()) },
25+
buf,
26+
}
27+
}
28+
}
29+
30+
fn read_to_string_internal<R: AsyncRead + ?Sized>(
31+
reader: Pin<&mut R>,
32+
cx: &mut Context<'_>,
33+
buf: &mut String,
34+
bytes: &mut Vec<u8>,
35+
) -> Poll<io::Result<()>> {
36+
let ret = ready!(read_to_end_internal(reader, cx, bytes));
37+
if str::from_utf8(&bytes).is_err() {
38+
Poll::Ready(ret.and_then(|_| {
39+
Err(io::Error::new(
40+
io::ErrorKind::InvalidData,
41+
"stream did not contain valid UTF-8",
42+
))
43+
}))
44+
} else {
45+
debug_assert!(buf.is_empty());
46+
// Safety: `bytes` is a valid UTF-8 because `str::from_utf8` returned `Ok`.
47+
mem::swap(unsafe { buf.as_mut_vec() }, bytes);
48+
Poll::Ready(ret)
49+
}
50+
}
51+
52+
impl<A> Future for ReadToString<'_, A>
53+
where
54+
A: AsyncRead + ?Sized + Unpin,
55+
{
56+
type Output = io::Result<()>;
57+
58+
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
59+
let Self { reader, buf, bytes } = &mut *self;
60+
read_to_string_internal(Pin::new(reader), cx, buf, bytes)
61+
}
62+
}

futures/src/lib.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -299,8 +299,8 @@ pub mod io {
299299
pub use futures_util::io::{
300300
AsyncReadExt, AsyncWriteExt, AsyncSeekExt, AsyncBufReadExt, AllowStdIo,
301301
BufReader, BufWriter, Close, CopyInto, CopyBufInto, Flush, Lines, Read,
302-
ReadExact, ReadHalf, ReadLine, ReadToEnd, ReadUntil, ReadVectored, Seek,
303-
Window, Write, WriteAll, WriteHalf, WriteVectored,
302+
ReadExact, ReadHalf, ReadLine, ReadToEnd, ReadToString, ReadUntil,
303+
ReadVectored, Seek, Window, Write, WriteAll, WriteHalf, WriteVectored,
304304
};
305305
}
306306

futures/tests/io_read_to_string.rs

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
use futures::executor::block_on;
2+
use futures::future::{Future, FutureExt};
3+
use futures::stream::{self, StreamExt, TryStreamExt};
4+
use futures::io::AsyncReadExt;
5+
use futures::task::Poll;
6+
use futures_test::io::AsyncReadTestExt;
7+
use futures_test::task::noop_context;
8+
use std::io::Cursor;
9+
10+
#[test]
11+
fn read_to_string() {
12+
let mut c = Cursor::new(&b""[..]);
13+
let mut v = String::new();
14+
assert!(block_on(c.read_to_string(&mut v)).is_ok());
15+
assert_eq!(v, "");
16+
17+
let mut c = Cursor::new(&b"1"[..]);
18+
let mut v = String::new();
19+
assert!(block_on(c.read_to_string(&mut v)).is_ok());
20+
assert_eq!(v, "1");
21+
22+
let mut c = Cursor::new(&b"\xff"[..]);
23+
let mut v = String::new();
24+
assert!(block_on(c.read_to_string(&mut v)).is_err());
25+
}
26+
27+
fn run<F: Future + Unpin>(mut f: F) -> F::Output {
28+
let mut cx = noop_context();
29+
loop {
30+
if let Poll::Ready(x) = f.poll_unpin(&mut cx) {
31+
return x;
32+
}
33+
}
34+
}
35+
36+
#[test]
37+
fn interleave_pending() {
38+
let mut buf = stream::iter(vec![&b"12"[..], &b"33"[..], &b"3"[..]])
39+
.map(Ok)
40+
.into_async_read()
41+
.interleave_pending();
42+
43+
let mut v = String::new();
44+
assert!(run(buf.read_to_string(&mut v)).is_ok());
45+
assert_eq!(v, "12333");
46+
}

0 commit comments

Comments
 (0)