Skip to content

Commit 7667251

Browse files
authored
Fix panic in some TryStreamExt combinators (#2250)
1 parent 310fd0d commit 7667251

File tree

4 files changed

+46
-5
lines changed

4 files changed

+46
-5
lines changed

futures-util/src/stream/try_stream/try_filter_map.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -66,8 +66,9 @@ impl<St, Fut, F, T> Stream for TryFilterMap<St, Fut, F>
6666
Poll::Ready(loop {
6767
if let Some(p) = this.pending.as_mut().as_pin_mut() {
6868
// We have an item in progress, poll that until it's done
69-
let item = ready!(p.try_poll(cx)?);
69+
let res = ready!(p.try_poll(cx));
7070
this.pending.set(None);
71+
let item = res?;
7172
if item.is_some() {
7273
break item.map(Ok);
7374
}

futures-util/src/stream/try_stream/try_skip_while.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -74,9 +74,10 @@ impl<St, Fut, F> Stream for TrySkipWhile<St, Fut, F>
7474

7575
Poll::Ready(loop {
7676
if let Some(fut) = this.pending_fut.as_mut().as_pin_mut() {
77-
let skipped = ready!(fut.try_poll(cx)?);
78-
let item = this.pending_item.take();
77+
let res = ready!(fut.try_poll(cx));
7978
this.pending_fut.set(None);
79+
let skipped = res?;
80+
let item = this.pending_item.take();
8081
if !skipped {
8182
*this.done_skipping = true;
8283
break item.map(Ok);

futures-util/src/stream/try_stream/try_take_while.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -76,9 +76,10 @@ where
7676

7777
Poll::Ready(loop {
7878
if let Some(fut) = this.pending_fut.as_mut().as_pin_mut() {
79-
let take = ready!(fut.try_poll(cx)?);
80-
let item = this.pending_item.take();
79+
let res = ready!(fut.try_poll(cx));
8180
this.pending_fut.set(None);
81+
let take = res?;
82+
let item = this.pending_item.take();
8283
if take {
8384
break item.map(Ok);
8485
} else {

futures/tests/try_stream.rs

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
use futures::{
2+
stream::{self, StreamExt, TryStreamExt},
3+
task::Poll,
4+
};
5+
use futures_test::task::noop_context;
6+
7+
#[test]
8+
fn try_filter_map_after_err() {
9+
let cx = &mut noop_context();
10+
let mut s = stream::iter(1..=3)
11+
.map(Ok)
12+
.try_filter_map(|v| async move { Err::<Option<()>, _>(v) })
13+
.filter_map(|r| async move { r.ok() })
14+
.boxed();
15+
assert_eq!(Poll::Ready(None), s.poll_next_unpin(cx));
16+
}
17+
18+
#[test]
19+
fn try_skip_while_after_err() {
20+
let cx = &mut noop_context();
21+
let mut s = stream::iter(1..=3)
22+
.map(Ok)
23+
.try_skip_while(|_| async move { Err::<_, ()>(()) })
24+
.filter_map(|r| async move { r.ok() })
25+
.boxed();
26+
assert_eq!(Poll::Ready(None), s.poll_next_unpin(cx));
27+
}
28+
29+
#[test]
30+
fn try_take_while_after_err() {
31+
let cx = &mut noop_context();
32+
let mut s = stream::iter(1..=3)
33+
.map(Ok)
34+
.try_take_while(|_| async move { Err::<_, ()>(()) })
35+
.filter_map(|r| async move { r.ok() })
36+
.boxed();
37+
assert_eq!(Poll::Ready(None), s.poll_next_unpin(cx));
38+
}

0 commit comments

Comments
 (0)