Skip to content

Commit 701a3a3

Browse files
committed
Add assertion that we do not read past EOF
1 parent eb1d0cc commit 701a3a3

File tree

3 files changed

+196
-9
lines changed

3 files changed

+196
-9
lines changed

tests/utils/impls.rs

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -13,13 +13,13 @@ pub mod futures {
1313
pub mod bufread {
1414
pub use futures::io::AsyncBufRead;
1515

16-
use crate::utils::InputStream;
16+
use crate::utils::{InputStream, TrackEof};
1717
use futures::stream::{StreamExt as _, TryStreamExt as _};
1818

1919
pub fn from(input: &InputStream) -> impl AsyncBufRead {
2020
// By using the stream here we ensure that each chunk will require a separate
2121
// read/poll_fill_buf call to process to help test reading multiple chunks.
22-
input.stream().map(Ok).into_async_read()
22+
TrackEof::new(input.stream().map(Ok).into_async_read())
2323
}
2424
}
2525

@@ -100,13 +100,13 @@ pub mod tokio_02 {
100100
pub mod bufread {
101101
pub use tokio_02::io::AsyncBufRead;
102102

103-
use crate::utils::InputStream;
103+
use crate::utils::{InputStream, TrackEof};
104104
use tokio_02::io::stream_reader;
105105

106106
pub fn from(input: &InputStream) -> impl AsyncBufRead {
107107
// By using the stream here we ensure that each chunk will require a separate
108108
// read/poll_fill_buf call to process to help test reading multiple chunks.
109-
stream_reader(input.bytes_05_stream())
109+
TrackEof::new(stream_reader(input.bytes_05_stream()))
110110
}
111111
}
112112

@@ -169,14 +169,14 @@ pub mod tokio_02 {
169169
#[cfg(feature = "tokio-03")]
170170
pub mod tokio_03 {
171171
pub mod bufread {
172-
use crate::utils::InputStream;
172+
use crate::utils::{InputStream, TrackEof};
173173
pub use tokio_03::io::AsyncBufRead;
174174
use tokio_util_04::io::StreamReader;
175175

176176
pub fn from(input: &InputStream) -> impl AsyncBufRead {
177177
// By using the stream here we ensure that each chunk will require a separate
178178
// read/poll_fill_buf call to process to help test reading multiple chunks.
179-
StreamReader::new(input.bytes_05_stream())
179+
TrackEof::new(StreamReader::new(input.bytes_05_stream()))
180180
}
181181
}
182182

@@ -239,7 +239,7 @@ pub mod tokio_03 {
239239
#[cfg(feature = "tokio")]
240240
pub mod tokio {
241241
pub mod bufread {
242-
use crate::utils::InputStream;
242+
use crate::utils::{InputStream, TrackEof};
243243
use bytes::Bytes;
244244
use futures::stream::StreamExt;
245245
pub use tokio::io::AsyncBufRead;
@@ -248,7 +248,9 @@ pub mod tokio {
248248
pub fn from(input: &InputStream) -> impl AsyncBufRead {
249249
// By using the stream here we ensure that each chunk will require a separate
250250
// read/poll_fill_buf call to process to help test reading multiple chunks.
251-
StreamReader::new(input.stream().map(Bytes::from).map(std::io::Result::Ok))
251+
TrackEof::new(StreamReader::new(
252+
input.stream().map(Bytes::from).map(std::io::Result::Ok),
253+
))
252254
}
253255
}
254256

tests/utils/mod.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,13 +8,14 @@ mod tokio_03_ext;
88
#[cfg(feature = "tokio")]
99
mod tokio_ext;
1010
mod track_closed;
11+
mod track_eof;
1112
#[macro_use]
1213
mod test_cases;
1314

1415
pub mod algos;
1516
pub mod impls;
1617

17-
pub use self::{input_stream::InputStream, track_closed::TrackClosed};
18+
pub use self::{input_stream::InputStream, track_closed::TrackClosed, track_eof::TrackEof};
1819
pub use async_compression::Level;
1920
pub use futures::{executor::block_on, pin_mut, stream::Stream};
2021
pub use std::{future::Future, io::Result, iter::FromIterator, pin::Pin};

tests/utils/track_eof.rs

Lines changed: 184 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,184 @@
1+
#[cfg_attr(not(feature = "all-implementations"), allow(unused))]
2+
use std::{
3+
io::Result,
4+
pin::Pin,
5+
task::{Context, Poll},
6+
};
7+
8+
pub struct TrackEof<R> {
9+
inner: R,
10+
eof: bool,
11+
}
12+
13+
impl<R: Unpin> TrackEof<R> {
14+
pub fn new(inner: R) -> Self {
15+
Self { inner, eof: false }
16+
}
17+
18+
pub fn project(self: Pin<&mut Self>) -> (Pin<&mut R>, &mut bool) {
19+
let Self { inner, eof } = Pin::into_inner(self);
20+
(Pin::new(inner), eof)
21+
}
22+
}
23+
24+
#[cfg(feature = "futures-io")]
25+
impl<R: futures::io::AsyncRead + Unpin> futures::io::AsyncRead for TrackEof<R> {
26+
fn poll_read(self: Pin<&mut Self>, cx: &mut Context, buf: &mut [u8]) -> Poll<Result<usize>> {
27+
let (inner, eof) = self.project();
28+
assert!(!*eof);
29+
match inner.poll_read(cx, buf) {
30+
Poll::Ready(Ok(0)) => {
31+
if !buf.is_empty() {
32+
*eof = true;
33+
}
34+
Poll::Ready(Ok(0))
35+
}
36+
other => other,
37+
}
38+
}
39+
}
40+
41+
#[cfg(feature = "futures-io")]
42+
impl<R: futures::io::AsyncBufRead + Unpin> futures::io::AsyncBufRead for TrackEof<R> {
43+
fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<&[u8]>> {
44+
let (inner, eof) = self.project();
45+
assert!(!*eof);
46+
match inner.poll_fill_buf(cx) {
47+
Poll::Ready(Ok(buf)) => {
48+
if buf.is_empty() {
49+
*eof = true;
50+
}
51+
Poll::Ready(Ok(buf))
52+
}
53+
other => other,
54+
}
55+
}
56+
57+
fn consume(self: Pin<&mut Self>, amt: usize) {
58+
self.project().0.consume(amt)
59+
}
60+
}
61+
62+
#[cfg(feature = "tokio-02")]
63+
impl<R: tokio_02::io::AsyncRead + Unpin> tokio_02::io::AsyncRead for TrackEof<R> {
64+
fn poll_read(self: Pin<&mut Self>, cx: &mut Context, buf: &mut [u8]) -> Poll<Result<usize>> {
65+
let (inner, eof) = self.project();
66+
assert!(!*eof);
67+
match inner.poll_read(cx, buf) {
68+
Poll::Ready(Ok(0)) => {
69+
if !buf.is_empty() {
70+
*eof = true;
71+
}
72+
Poll::Ready(Ok(0))
73+
}
74+
other => other,
75+
}
76+
}
77+
}
78+
79+
#[cfg(feature = "tokio-02")]
80+
impl<R: tokio_02::io::AsyncBufRead + Unpin> tokio_02::io::AsyncBufRead for TrackEof<R> {
81+
fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<&[u8]>> {
82+
let (inner, eof) = self.project();
83+
assert!(!*eof);
84+
match inner.poll_fill_buf(cx) {
85+
Poll::Ready(Ok(buf)) => {
86+
if buf.is_empty() {
87+
*eof = true;
88+
}
89+
Poll::Ready(Ok(buf))
90+
}
91+
other => other,
92+
}
93+
}
94+
95+
fn consume(self: Pin<&mut Self>, amt: usize) {
96+
self.project().0.consume(amt)
97+
}
98+
}
99+
100+
#[cfg(feature = "tokio-03")]
101+
impl<R: tokio_03::io::AsyncRead + Unpin> tokio_03::io::AsyncRead for TrackEof<R> {
102+
fn poll_read(
103+
self: Pin<&mut Self>,
104+
cx: &mut Context,
105+
buf: &mut tokio_03::io::ReadBuf,
106+
) -> Poll<Result<()>> {
107+
let (inner, eof) = self.project();
108+
assert!(!*eof);
109+
let len = buf.filled().len();
110+
match inner.poll_read(cx, buf) {
111+
Poll::Ready(Ok(())) => {
112+
if buf.filled().len() == len && buf.remaining() > 0 {
113+
*eof = true;
114+
}
115+
Poll::Ready(Ok(()))
116+
}
117+
other => other,
118+
}
119+
}
120+
}
121+
122+
#[cfg(feature = "tokio-03")]
123+
impl<R: tokio_03::io::AsyncBufRead + Unpin> tokio_03::io::AsyncBufRead for TrackEof<R> {
124+
fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<&[u8]>> {
125+
let (inner, eof) = self.project();
126+
assert!(!*eof);
127+
match inner.poll_fill_buf(cx) {
128+
Poll::Ready(Ok(buf)) => {
129+
if buf.is_empty() {
130+
*eof = true;
131+
}
132+
Poll::Ready(Ok(buf))
133+
}
134+
other => other,
135+
}
136+
}
137+
138+
fn consume(self: Pin<&mut Self>, amt: usize) {
139+
self.project().0.consume(amt)
140+
}
141+
}
142+
143+
#[cfg(feature = "tokio")]
144+
impl<R: tokio::io::AsyncRead + Unpin> tokio::io::AsyncRead for TrackEof<R> {
145+
fn poll_read(
146+
self: Pin<&mut Self>,
147+
cx: &mut Context,
148+
buf: &mut tokio::io::ReadBuf,
149+
) -> Poll<Result<()>> {
150+
let (inner, eof) = self.project();
151+
assert!(!*eof);
152+
let len = buf.filled().len();
153+
match inner.poll_read(cx, buf) {
154+
Poll::Ready(Ok(())) => {
155+
if buf.filled().len() == len && buf.remaining() > 0 {
156+
*eof = true;
157+
}
158+
Poll::Ready(Ok(()))
159+
}
160+
other => other,
161+
}
162+
}
163+
}
164+
165+
#[cfg(feature = "tokio")]
166+
impl<R: tokio::io::AsyncBufRead + Unpin> tokio::io::AsyncBufRead for TrackEof<R> {
167+
fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<&[u8]>> {
168+
let (inner, eof) = self.project();
169+
assert!(!*eof);
170+
match inner.poll_fill_buf(cx) {
171+
Poll::Ready(Ok(buf)) => {
172+
if buf.is_empty() {
173+
*eof = true;
174+
}
175+
Poll::Ready(Ok(buf))
176+
}
177+
other => other,
178+
}
179+
}
180+
181+
fn consume(self: Pin<&mut Self>, amt: usize) {
182+
self.project().0.consume(amt)
183+
}
184+
}

0 commit comments

Comments
 (0)