Skip to content

Commit 1055522

Browse files
committed
add test to ensure FutureQueue can handle streams that can be pending
Previously, the test stream always returned items immediately. Change it to possibly return items after a delay. Also add a couple of example-based tests.
1 parent 5cfa204 commit 1055522

File tree

2 files changed

+53
-19
lines changed

2 files changed

+53
-19
lines changed

Cargo.toml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ pin-project-lite = "0.2.9"
1717

1818
[dev-dependencies]
1919
futures = "0.3.25"
20-
proptest = "1.0.0"
20+
proptest = { version = "1.0.0", features = ["timeout"] }
2121
proptest-derive = "0.3.0"
2222
tokio = { version = "1.21.2", features = ["macros", "sync", "test-util", "time"] }
23+
tokio-stream = { version = "0.1.11" }

tests/test_properties.rs

Lines changed: 51 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ use futures::{stream, StreamExt as _};
66
use proptest::prelude::*;
77
use proptest_derive::Arbitrary;
88
use std::time::Duration;
9+
use tokio_stream::wrappers::UnboundedReceiverStream;
910

1011
#[derive(Clone, Debug, Arbitrary)]
1112
struct TestState {
@@ -17,6 +18,8 @@ struct TestState {
1718

1819
#[derive(Copy, Clone, Debug, Arbitrary)]
1920
struct TestFutureDesc {
21+
#[proptest(strategy = "duration_strategy()")]
22+
start_delay: Duration,
2023
#[proptest(strategy = "duration_strategy()")]
2124
delay: Duration,
2225
#[proptest(strategy = "0usize..8")]
@@ -28,10 +31,23 @@ fn duration_strategy() -> BoxedStrategy<Duration> {
2831
(0u64..1000).prop_map(Duration::from_millis).boxed()
2932
}
3033

34+
#[test]
35+
fn test_examples() {
36+
let state = TestState {
37+
max_weight: 1,
38+
future_descriptions: vec![TestFutureDesc {
39+
start_delay: Duration::ZERO,
40+
delay: Duration::ZERO,
41+
weight: 0,
42+
}],
43+
};
44+
test_future_queue_impl(state);
45+
}
46+
3147
proptest! {
3248
#[test]
3349
fn proptest_future_queue(state: TestState) {
34-
proptest_future_queue_impl(state)
50+
test_future_queue_impl(state)
3551
}
3652
}
3753

@@ -41,33 +57,48 @@ enum FutureEvent {
4157
Finished(usize, TestFutureDesc),
4258
}
4359

44-
fn proptest_future_queue_impl(state: TestState) {
60+
fn test_future_queue_impl(state: TestState) {
4561
let runtime = tokio::runtime::Builder::new_current_thread()
4662
.enable_time()
4763
.start_paused(true)
4864
.build()
4965
.expect("tokio builder succeeded");
5066
let (sender, mut receiver) = tokio::sync::mpsc::unbounded_channel();
67+
let (future_sender, future_receiver) = tokio::sync::mpsc::unbounded_channel();
5168
let futures = state
5269
.future_descriptions
5370
.iter()
5471
.enumerate()
55-
.map(|(id, desc)| {
72+
.map(move |(id, desc)| {
73+
let desc = *desc;
5674
let sender = sender.clone();
57-
// For each description, create a future.
58-
let delay_fut = async move {
59-
// Send the fact that this future started to the mpsc queue.
60-
sender
61-
.send(FutureEvent::Started(id, *desc))
62-
.expect("receiver held open by loop");
63-
tokio::time::sleep(desc.delay).await;
64-
sender
65-
.send(FutureEvent::Finished(id, *desc))
66-
.expect("receiver held open by loop");
67-
};
68-
(desc.weight, delay_fut)
69-
});
70-
let stream = stream::iter(futures);
75+
let future_sender = future_sender.clone();
76+
async move {
77+
// First, sleep for this long.
78+
tokio::time::sleep(desc.start_delay).await;
79+
// For each description, create a future.
80+
let delay_fut = async move {
81+
// Send the fact that this future started to the mpsc queue.
82+
sender
83+
.send(FutureEvent::Started(id, desc))
84+
.expect("receiver held open by loop");
85+
tokio::time::sleep(desc.delay).await;
86+
sender
87+
.send(FutureEvent::Finished(id, desc))
88+
.expect("receiver held open by loop");
89+
};
90+
// Errors should never occur here.
91+
if let Err(err) = future_sender.send((desc.weight, delay_fut)) {
92+
panic!("future_receiver held open by loop: {}", err);
93+
}
94+
}
95+
})
96+
.collect::<Vec<_>>();
97+
let combined_future = stream::iter(futures).buffer_unordered(1).collect::<()>();
98+
runtime.spawn(combined_future);
99+
100+
// We're going to use future_receiver as a stream.
101+
let stream = UnboundedReceiverStream::new(future_receiver);
71102

72103
let mut completed_map = vec![false; state.future_descriptions.len()];
73104
let mut last_started_id: Option<usize> = None;
@@ -76,13 +107,14 @@ fn proptest_future_queue_impl(state: TestState) {
76107
runtime.block_on(async move {
77108
// Record values that have been completed in this map.
78109
let mut stream = stream.future_queue(state.max_weight);
110+
let mut receiver_done = false;
79111
loop {
80112
tokio::select! {
81113
// biased ensures that the receiver is drained before the stream is polled. Without
82114
// it, it's possible that we fail to record the completion of some futures in status_map.
83115
biased;
84116

85-
recv = receiver.recv() => {
117+
recv = receiver.recv(), if !receiver_done => {
86118
match recv {
87119
Some(FutureEvent::Started(id, desc)) => {
88120
// last_started_id must be 1 less than id.
@@ -106,6 +138,7 @@ fn proptest_future_queue_impl(state: TestState) {
106138
}
107139
None => {
108140
// All futures finished -- going to check for completion in stream.next() below.
141+
receiver_done = true;
109142
}
110143
}
111144
}

0 commit comments

Comments
 (0)