Skip to content

Commit 286ff05

Browse files
bors[bot]eeeeetaNemo157
authored
Merge #148
148: bufread::generic::Decoder: don't reinitialize on reader EOF r=Nemo157 a=eeeeeta If `multiple_members` is enabled, the `bufread::generic::Decoder` will attempt to reinitialise the decoder inside `State::Flushing`, even if the reason it entered that state was due to the reader returning an EOF. This will result in an attempt to read past EOF, which is highly undesirable to say the least. To fix this, force `multiple_members` to `false` when we get an EOF condition from the reader. Co-authored-by: eta <[email protected]> Co-authored-by: Wim Looman <[email protected]>
2 parents 6104f83 + e724673 commit 286ff05

File tree

7 files changed

+208
-9
lines changed

7 files changed

+208
-9
lines changed

src/futures/bufread/generic/decoder.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,9 @@ impl<R: AsyncBufRead, D: Decode> Decoder<R, D> {
7070
State::Decoding => {
7171
let input = ready!(this.reader.as_mut().poll_fill_buf(cx))?;
7272
if input.is_empty() {
73+
// Avoid attempting to reinitialise the decoder if the reader
74+
// has returned EOF.
75+
*this.multiple_members = false;
7376
State::Flushing
7477
} else {
7578
let mut input = PartialBuffer::new(input);

src/tokio/bufread/generic/decoder.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,9 @@ impl<R: AsyncBufRead, D: Decode> Decoder<R, D> {
7070
State::Decoding => {
7171
let input = ready!(this.reader.as_mut().poll_fill_buf(cx))?;
7272
if input.is_empty() {
73+
// Avoid attempting to reinitialise the decoder if the reader
74+
// has returned EOF.
75+
*this.multiple_members = false;
7376
State::Flushing
7477
} else {
7578
let mut input = PartialBuffer::new(input);

src/tokio_02/bufread/generic/decoder.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,9 @@ impl<R: AsyncBufRead, D: Decode> Decoder<R, D> {
7070
State::Decoding => {
7171
let input = ready!(this.reader.as_mut().poll_fill_buf(cx))?;
7272
if input.is_empty() {
73+
// Avoid attempting to reinitialise the decoder if the reader
74+
// has returned EOF.
75+
*this.multiple_members = false;
7376
State::Flushing
7477
} else {
7578
let mut input = PartialBuffer::new(input);

src/tokio_03/bufread/generic/decoder.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,9 @@ impl<R: AsyncBufRead, D: Decode> Decoder<R, D> {
7070
State::Decoding => {
7171
let input = ready!(this.reader.as_mut().poll_fill_buf(cx))?;
7272
if input.is_empty() {
73+
// Avoid attempting to reinitialise the decoder if the reader
74+
// has returned EOF.
75+
*this.multiple_members = false;
7376
State::Flushing
7477
} else {
7578
let mut input = PartialBuffer::new(input);

tests/utils/impls.rs

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -13,13 +13,13 @@ pub mod futures {
1313
pub mod bufread {
1414
pub use futures::io::AsyncBufRead;
1515

16-
use crate::utils::InputStream;
16+
use crate::utils::{InputStream, TrackEof};
1717
use futures::stream::{StreamExt as _, TryStreamExt as _};
1818

1919
pub fn from(input: &InputStream) -> impl AsyncBufRead {
2020
// By using the stream here we ensure that each chunk will require a separate
2121
// read/poll_fill_buf call to process to help test reading multiple chunks.
22-
input.stream().map(Ok).into_async_read()
22+
TrackEof::new(input.stream().map(Ok).into_async_read())
2323
}
2424
}
2525

@@ -100,13 +100,13 @@ pub mod tokio_02 {
100100
pub mod bufread {
101101
pub use tokio_02::io::AsyncBufRead;
102102

103-
use crate::utils::InputStream;
103+
use crate::utils::{InputStream, TrackEof};
104104
use tokio_02::io::stream_reader;
105105

106106
pub fn from(input: &InputStream) -> impl AsyncBufRead {
107107
// By using the stream here we ensure that each chunk will require a separate
108108
// read/poll_fill_buf call to process to help test reading multiple chunks.
109-
stream_reader(input.bytes_05_stream())
109+
TrackEof::new(stream_reader(input.bytes_05_stream()))
110110
}
111111
}
112112

@@ -169,14 +169,14 @@ pub mod tokio_02 {
169169
#[cfg(feature = "tokio-03")]
170170
pub mod tokio_03 {
171171
pub mod bufread {
172-
use crate::utils::InputStream;
172+
use crate::utils::{InputStream, TrackEof};
173173
pub use tokio_03::io::AsyncBufRead;
174174
use tokio_util_04::io::StreamReader;
175175

176176
pub fn from(input: &InputStream) -> impl AsyncBufRead {
177177
// By using the stream here we ensure that each chunk will require a separate
178178
// read/poll_fill_buf call to process to help test reading multiple chunks.
179-
StreamReader::new(input.bytes_05_stream())
179+
TrackEof::new(StreamReader::new(input.bytes_05_stream()))
180180
}
181181
}
182182

@@ -239,7 +239,7 @@ pub mod tokio_03 {
239239
#[cfg(feature = "tokio")]
240240
pub mod tokio {
241241
pub mod bufread {
242-
use crate::utils::InputStream;
242+
use crate::utils::{InputStream, TrackEof};
243243
use bytes::Bytes;
244244
use futures::stream::StreamExt;
245245
pub use tokio::io::AsyncBufRead;
@@ -248,7 +248,9 @@ pub mod tokio {
248248
pub fn from(input: &InputStream) -> impl AsyncBufRead {
249249
// By using the stream here we ensure that each chunk will require a separate
250250
// read/poll_fill_buf call to process to help test reading multiple chunks.
251-
StreamReader::new(input.stream().map(Bytes::from).map(std::io::Result::Ok))
251+
TrackEof::new(StreamReader::new(
252+
input.stream().map(Bytes::from).map(std::io::Result::Ok),
253+
))
252254
}
253255
}
254256

tests/utils/mod.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,13 +8,14 @@ mod tokio_03_ext;
88
#[cfg(feature = "tokio")]
99
mod tokio_ext;
1010
mod track_closed;
11+
mod track_eof;
1112
#[macro_use]
1213
mod test_cases;
1314

1415
pub mod algos;
1516
pub mod impls;
1617

17-
pub use self::{input_stream::InputStream, track_closed::TrackClosed};
18+
pub use self::{input_stream::InputStream, track_closed::TrackClosed, track_eof::TrackEof};
1819
pub use async_compression::Level;
1920
pub use futures::{executor::block_on, pin_mut, stream::Stream};
2021
pub use std::{future::Future, io::Result, iter::FromIterator, pin::Pin};

tests/utils/track_eof.rs

Lines changed: 184 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,184 @@
1+
#[cfg_attr(not(feature = "all-implementations"), allow(unused))]
2+
use std::{
3+
io::Result,
4+
pin::Pin,
5+
task::{Context, Poll},
6+
};
7+
8+
pub struct TrackEof<R> {
9+
inner: R,
10+
eof: bool,
11+
}
12+
13+
impl<R: Unpin> TrackEof<R> {
14+
pub fn new(inner: R) -> Self {
15+
Self { inner, eof: false }
16+
}
17+
18+
pub fn project(self: Pin<&mut Self>) -> (Pin<&mut R>, &mut bool) {
19+
let Self { inner, eof } = Pin::into_inner(self);
20+
(Pin::new(inner), eof)
21+
}
22+
}
23+
24+
#[cfg(feature = "futures-io")]
25+
impl<R: futures::io::AsyncRead + Unpin> futures::io::AsyncRead for TrackEof<R> {
26+
fn poll_read(self: Pin<&mut Self>, cx: &mut Context, buf: &mut [u8]) -> Poll<Result<usize>> {
27+
let (inner, eof) = self.project();
28+
assert!(!*eof);
29+
match inner.poll_read(cx, buf) {
30+
Poll::Ready(Ok(0)) => {
31+
if !buf.is_empty() {
32+
*eof = true;
33+
}
34+
Poll::Ready(Ok(0))
35+
}
36+
other => other,
37+
}
38+
}
39+
}
40+
41+
#[cfg(feature = "futures-io")]
42+
impl<R: futures::io::AsyncBufRead + Unpin> futures::io::AsyncBufRead for TrackEof<R> {
43+
fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<&[u8]>> {
44+
let (inner, eof) = self.project();
45+
assert!(!*eof);
46+
match inner.poll_fill_buf(cx) {
47+
Poll::Ready(Ok(buf)) => {
48+
if buf.is_empty() {
49+
*eof = true;
50+
}
51+
Poll::Ready(Ok(buf))
52+
}
53+
other => other,
54+
}
55+
}
56+
57+
fn consume(self: Pin<&mut Self>, amt: usize) {
58+
self.project().0.consume(amt)
59+
}
60+
}
61+
62+
#[cfg(feature = "tokio-02")]
63+
impl<R: tokio_02::io::AsyncRead + Unpin> tokio_02::io::AsyncRead for TrackEof<R> {
64+
fn poll_read(self: Pin<&mut Self>, cx: &mut Context, buf: &mut [u8]) -> Poll<Result<usize>> {
65+
let (inner, eof) = self.project();
66+
assert!(!*eof);
67+
match inner.poll_read(cx, buf) {
68+
Poll::Ready(Ok(0)) => {
69+
if !buf.is_empty() {
70+
*eof = true;
71+
}
72+
Poll::Ready(Ok(0))
73+
}
74+
other => other,
75+
}
76+
}
77+
}
78+
79+
#[cfg(feature = "tokio-02")]
80+
impl<R: tokio_02::io::AsyncBufRead + Unpin> tokio_02::io::AsyncBufRead for TrackEof<R> {
81+
fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<&[u8]>> {
82+
let (inner, eof) = self.project();
83+
assert!(!*eof);
84+
match inner.poll_fill_buf(cx) {
85+
Poll::Ready(Ok(buf)) => {
86+
if buf.is_empty() {
87+
*eof = true;
88+
}
89+
Poll::Ready(Ok(buf))
90+
}
91+
other => other,
92+
}
93+
}
94+
95+
fn consume(self: Pin<&mut Self>, amt: usize) {
96+
self.project().0.consume(amt)
97+
}
98+
}
99+
100+
#[cfg(feature = "tokio-03")]
101+
impl<R: tokio_03::io::AsyncRead + Unpin> tokio_03::io::AsyncRead for TrackEof<R> {
102+
fn poll_read(
103+
self: Pin<&mut Self>,
104+
cx: &mut Context,
105+
buf: &mut tokio_03::io::ReadBuf,
106+
) -> Poll<Result<()>> {
107+
let (inner, eof) = self.project();
108+
assert!(!*eof);
109+
let len = buf.filled().len();
110+
match inner.poll_read(cx, buf) {
111+
Poll::Ready(Ok(())) => {
112+
if buf.filled().len() == len && buf.remaining() > 0 {
113+
*eof = true;
114+
}
115+
Poll::Ready(Ok(()))
116+
}
117+
other => other,
118+
}
119+
}
120+
}
121+
122+
#[cfg(feature = "tokio-03")]
123+
impl<R: tokio_03::io::AsyncBufRead + Unpin> tokio_03::io::AsyncBufRead for TrackEof<R> {
124+
fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<&[u8]>> {
125+
let (inner, eof) = self.project();
126+
assert!(!*eof);
127+
match inner.poll_fill_buf(cx) {
128+
Poll::Ready(Ok(buf)) => {
129+
if buf.is_empty() {
130+
*eof = true;
131+
}
132+
Poll::Ready(Ok(buf))
133+
}
134+
other => other,
135+
}
136+
}
137+
138+
fn consume(self: Pin<&mut Self>, amt: usize) {
139+
self.project().0.consume(amt)
140+
}
141+
}
142+
143+
#[cfg(feature = "tokio")]
144+
impl<R: tokio::io::AsyncRead + Unpin> tokio::io::AsyncRead for TrackEof<R> {
145+
fn poll_read(
146+
self: Pin<&mut Self>,
147+
cx: &mut Context,
148+
buf: &mut tokio::io::ReadBuf,
149+
) -> Poll<Result<()>> {
150+
let (inner, eof) = self.project();
151+
assert!(!*eof);
152+
let len = buf.filled().len();
153+
match inner.poll_read(cx, buf) {
154+
Poll::Ready(Ok(())) => {
155+
if buf.filled().len() == len && buf.remaining() > 0 {
156+
*eof = true;
157+
}
158+
Poll::Ready(Ok(()))
159+
}
160+
other => other,
161+
}
162+
}
163+
}
164+
165+
#[cfg(feature = "tokio")]
166+
impl<R: tokio::io::AsyncBufRead + Unpin> tokio::io::AsyncBufRead for TrackEof<R> {
167+
fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<&[u8]>> {
168+
let (inner, eof) = self.project();
169+
assert!(!*eof);
170+
match inner.poll_fill_buf(cx) {
171+
Poll::Ready(Ok(buf)) => {
172+
if buf.is_empty() {
173+
*eof = true;
174+
}
175+
Poll::Ready(Ok(buf))
176+
}
177+
other => other,
178+
}
179+
}
180+
181+
fn consume(self: Pin<&mut Self>, amt: usize) {
182+
self.project().0.consume(amt)
183+
}
184+
}

0 commit comments

Comments
 (0)