Skip to content

Commit 852d387

Browse files
authored
Don't ignore empty state polling (#2728)
* Don't ignore empty state polling * Test case * Start polling in a loop to ensure we don't wait for an outdated waker
1 parent cc0e342 commit 852d387

File tree

2 files changed

+69
-39
lines changed

2 files changed

+69
-39
lines changed

futures-util/src/stream/stream/flatten_unordered.rs

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -61,14 +61,14 @@ impl SharedPollState {
6161
}
6262

6363
/// Attempts to start polling, returning stored state in case of success.
64-
/// Returns `None` if either waker is waking at the moment or state is empty.
64+
/// Returns `None` if either waker is waking at the moment.
6565
fn start_polling(
6666
&self,
6767
) -> Option<(u8, PollStateBomb<'_, impl FnOnce(&SharedPollState) -> u8>)> {
6868
let value = self
6969
.state
7070
.fetch_update(Ordering::SeqCst, Ordering::SeqCst, |value| {
71-
if value & WAKING == NONE && value & NEED_TO_POLL_ALL != NONE {
71+
if value & WAKING == NONE {
7272
Some(POLLING)
7373
} else {
7474
None
@@ -405,11 +405,10 @@ where
405405

406406
let mut this = self.as_mut().project();
407407

408-
let (mut poll_state_value, state_bomb) = match this.poll_state.start_polling() {
409-
Some(value) => value,
410-
_ => {
411-
// Waker was called, just wait for the next poll
412-
return Poll::Pending;
408+
// Attempt to start polling, in case some waker is holding the lock, wait in loop
409+
let (mut poll_state_value, state_bomb) = loop {
410+
if let Some(value) = this.poll_state.start_polling() {
411+
break value;
413412
}
414413
};
415414

futures/tests/stream.rs

Lines changed: 63 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -342,46 +342,77 @@ fn flatten_unordered() {
342342
});
343343
}
344344

345+
fn timeout<I: Clone>(time: Duration, value: I) -> impl Future<Output = I> {
346+
let ready = Arc::new(AtomicBool::new(false));
347+
let mut spawned = false;
348+
349+
future::poll_fn(move |cx| {
350+
if !spawned {
351+
let waker = cx.waker().clone();
352+
let ready = ready.clone();
353+
354+
std::thread::spawn(move || {
355+
std::thread::sleep(time);
356+
ready.store(true, Ordering::Release);
357+
358+
waker.wake_by_ref()
359+
});
360+
spawned = true;
361+
}
362+
363+
if ready.load(Ordering::Acquire) {
364+
Poll::Ready(value.clone())
365+
} else {
366+
Poll::Pending
367+
}
368+
})
369+
}
370+
371+
fn build_nested_fu<S: Stream + Unpin>(st: S) -> impl Stream<Item = S::Item> + Unpin
372+
where
373+
S::Item: Clone,
374+
{
375+
let inner = st
376+
.then(|item| timeout(Duration::from_millis(50), item))
377+
.enumerate()
378+
.map(|(idx, value)| {
379+
stream::once(if idx % 2 == 0 {
380+
future::ready(value).left_future()
381+
} else {
382+
timeout(Duration::from_millis(100), value).right_future()
383+
})
384+
})
385+
.flatten_unordered(None);
386+
387+
stream::once(future::ready(inner)).flatten_unordered(None)
388+
}
389+
345390
// nested `flatten_unordered`
346391
let te = ThreadPool::new().unwrap();
347-
let handle = te
392+
let base_handle = te
348393
.spawn_with_handle(async move {
349-
let inner = stream::iter(0..10)
350-
.then(|_| {
351-
let task = Arc::new(AtomicBool::new(false));
352-
let mut spawned = false;
353-
354-
future::poll_fn(move |cx| {
355-
if !spawned {
356-
let waker = cx.waker().clone();
357-
let task = task.clone();
358-
359-
std::thread::spawn(move || {
360-
std::thread::sleep(Duration::from_millis(500));
361-
task.store(true, Ordering::Release);
362-
363-
waker.wake_by_ref()
364-
});
365-
spawned = true;
366-
}
367-
368-
if task.load(Ordering::Acquire) {
369-
Poll::Ready(Some(()))
370-
} else {
371-
Poll::Pending
372-
}
373-
})
374-
})
375-
.map(|_| stream::once(future::ready(())))
376-
.flatten_unordered(None);
394+
let fu = build_nested_fu(stream::iter(1..=10));
377395

378-
let stream = stream::once(future::ready(inner)).flatten_unordered(None);
396+
assert_eq!(fu.count().await, 10);
397+
})
398+
.unwrap();
399+
400+
block_on(base_handle);
401+
402+
let empty_state_move_handle = te
403+
.spawn_with_handle(async move {
404+
let mut fu = build_nested_fu(stream::iter(1..10));
405+
{
406+
let mut cx = noop_context();
407+
let _ = fu.poll_next_unpin(&mut cx);
408+
let _ = fu.poll_next_unpin(&mut cx);
409+
}
379410

380-
assert_eq!(stream.count().await, 10);
411+
assert_eq!(fu.count().await, 9);
381412
})
382413
.unwrap();
383414

384-
block_on(handle);
415+
block_on(empty_state_move_handle);
385416
}
386417

387418
#[test]

0 commit comments

Comments
 (0)