Skip to content

Commit 9e4446e

Browse files
authored
Fix incorrect termination of select_with_strategy streams (#2635)
1 parent 475a711 commit 9e4446e

File tree

2 files changed

+50
-3
lines changed

2 files changed

+50
-3
lines changed

futures-util/src/stream/select_with_strategy.rs

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -229,18 +229,23 @@ where
229229
St1: Stream,
230230
St2: Stream<Item = St1::Item>,
231231
{
232-
match poll_side(select, side, cx) {
232+
let first_done = match poll_side(select, side, cx) {
233233
Poll::Ready(Some(item)) => return Poll::Ready(Some(item)),
234234
Poll::Ready(None) => {
235235
select.internal_state.finish(side);
236+
true
236237
}
237-
Poll::Pending => (),
238+
Poll::Pending => false,
238239
};
239240
let other = side.other();
240241
match poll_side(select, other, cx) {
241242
Poll::Ready(None) => {
242243
select.internal_state.finish(other);
243-
Poll::Ready(None)
244+
if first_done {
245+
Poll::Ready(None)
246+
} else {
247+
Poll::Pending
248+
}
244249
}
245250
a => a,
246251
}

futures/tests/stream.rs

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,9 @@
1+
use std::cell::Cell;
12
use std::iter;
3+
use std::pin::Pin;
4+
use std::rc::Rc;
25
use std::sync::Arc;
6+
use std::task::Context;
37

48
use futures::channel::mpsc;
59
use futures::executor::block_on;
@@ -9,6 +13,7 @@ use futures::sink::SinkExt;
913
use futures::stream::{self, StreamExt};
1014
use futures::task::Poll;
1115
use futures::{ready, FutureExt};
16+
use futures_core::Stream;
1217
use futures_test::task::noop_context;
1318

1419
#[test]
@@ -436,3 +441,40 @@ fn ready_chunks() {
436441
assert_eq!(s.next().await.unwrap(), vec![4]);
437442
});
438443
}
444+
445+
struct SlowStream {
446+
times_should_poll: usize,
447+
times_polled: Rc<Cell<usize>>,
448+
}
449+
impl Stream for SlowStream {
450+
type Item = usize;
451+
452+
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
453+
self.times_polled.set(self.times_polled.get() + 1);
454+
if self.times_polled.get() % 2 == 0 {
455+
cx.waker().wake_by_ref();
456+
return Poll::Pending;
457+
}
458+
if self.times_polled.get() >= self.times_should_poll {
459+
return Poll::Ready(None);
460+
}
461+
Poll::Ready(Some(self.times_polled.get()))
462+
}
463+
}
464+
465+
#[test]
466+
fn select_with_strategy_doesnt_terminate_early() {
467+
for side in [stream::PollNext::Left, stream::PollNext::Right] {
468+
let times_should_poll = 10;
469+
let count = Rc::new(Cell::new(0));
470+
let b = stream::iter([10, 20]);
471+
472+
let mut selected = stream::select_with_strategy(
473+
SlowStream { times_should_poll, times_polled: count.clone() },
474+
b,
475+
|_: &mut ()| side,
476+
);
477+
block_on(async move { while selected.next().await.is_some() {} });
478+
assert_eq!(count.get(), times_should_poll + 1);
479+
}
480+
}

0 commit comments

Comments
 (0)