Skip to content

Commit 46a2152

Browse files
committed
add a future_queue_grouped method
This method adds a way to specify an optional "group" for each future. Running futures are limited to both the maximum global weight and the maximum group weight. The main use case is ensuring mutual exclusion between tests.
1 parent b40b45b commit 46a2152

File tree

8 files changed

+949
-44
lines changed

8 files changed

+949
-44
lines changed

.config/nextest.toml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
[[profile.default.overrides]]
2+
filter = 'test(test_empty)'
3+
slow-timeout = { period = "250ms", terminate-after = 1 }

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ pin-project-lite = "0.2.9"
1717

1818
[dev-dependencies]
1919
futures = "0.3.25"
20+
maplit = "1.0.2"
2021
proptest = { version = "1.0.0", features = ["timeout"] }
2122
proptest-derive = "0.3.0"
2223
tokio = { version = "1.21.2", features = ["macros", "sync", "test-util", "time"] }

README.md

Lines changed: 52 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -7,9 +7,11 @@
77
[![License](https://img.shields.io/badge/license-Apache-green.svg)](LICENSE-APACHE)
88
[![License](https://img.shields.io/badge/license-MIT-green.svg)](LICENSE-MIT)
99

10-
`future_queue` is a variant of
11-
[`buffer_unordered`](https://docs.rs/futures/latest/futures/stream/trait.StreamExt.html#method.buffer_unordered),
12-
where each future can be assigned a different weight.
10+
`future_queue` provides ways to run several futures:
11+
* concurrently
12+
* in the order they're spawned
13+
* with global limits
14+
* and with an optional group specified for each future, with its own limits.
1315

1416
This crate is part of the [nextest organization](https://github.com/nextest-rs) on GitHub, and is
1517
designed to serve the needs of [cargo-nextest](https://nexte.st).
@@ -33,17 +35,24 @@ Common use cases for `buffer_unordered` include:
3335

3436
`buffer_unordered` works well for many use cases. However, one issue with it is that it treats
3537
all futures as equally taxing: there's no way to say that some futures consume more resources
36-
than others. For nextest in particular, some tests can be much heavier than others, and fewer of
37-
those tests should be run simultaneously.
38+
than others, or that some subsets of futures should be mutually excluded from others.
39+
40+
For nextest in particular, some tests can be much heavier than others, and fewer of those tests
41+
should be run simultaneously. Also, some tests need to be mutually excluded from others, or
42+
other concurrency limits placed on them.
3843

3944
[^1]: This adaptor takes a stream of futures for maximum generality. In practice this is often
4045
an *iterator* of futures, converted over using
4146
[`stream::iter`](https://docs.rs/futures/latest/futures/stream/fn.iter.html).
4247

4348
## About this crate
4449

45-
This crate provides an adaptor on streams called `future_queue`, which can run
46-
several futures simultaneously, limiting the concurrency to a maximum *weight*.
50+
This crate provides two adaptors on streams.
51+
52+
### 1. The `future_queue` adaptor
53+
54+
The [`future_queue`](StreamExt::future_queue) adaptor can run several futures simultaneously,
55+
limiting the concurrency to a maximum *weight*.
4756

4857
Rather than taking a stream of futures, this adaptor takes a stream of `(usize, future)` pairs,
4958
where the `usize` indicates the weight of each future. This adaptor will schedule and buffer
@@ -57,16 +66,11 @@ Note that in some cases, the current weight may exceed the maximum weight. For e
5766
* If the next future has weight **6**, then it will be scheduled and the current weight will become **26**.
5867
* No new futures will be scheduled until the current weight falls to **23** or below.
5968

60-
It is possible to have a variant of this adaptor which always stays below the limit and holds
61-
the next future in abeyance; however, the implementation for that variant is a bit more
62-
complicated, and is also not the behavior desired by nextest. This variant may be provided in
63-
the future.
64-
6569
The weight of a future can be zero, in which case it doesn't count towards the maximum weight.
6670

6771
If all weights are 1, then `future_queue` is exactly the same as `buffer_unordered`.
6872

69-
## Examples
73+
#### Examples
7074

7175
```rust
7276
use futures::{channel::oneshot, stream, StreamExt as _};
@@ -87,6 +91,41 @@ assert_eq!(queue.next().await, Some(Ok("world")));
8791
assert_eq!(queue.next().await, None);
8892
```
8993

94+
### 2. The `future_queue_grouped` adaptor
95+
96+
The [`future_queue_grouped`](StreamExt::future_queue_grouped) adaptor is like `future_queue`,
97+
except it is possible to specify an optional *group* for each future. Each group has a maximum
98+
weight, and a future will only be scheduled if both the maximum weight and the group weight
99+
aren't exceeded.
100+
101+
The current weight for groups may exceed the maximum weight, similar to `future_queue`.
102+
103+
#### Examples
104+
105+
```rust
106+
use futures::{channel::oneshot, stream, StreamExt as _};
107+
use future_queue::{StreamExt as _};
108+
109+
let (send_one, recv_one) = oneshot::channel();
110+
let (send_two, recv_two) = oneshot::channel();
111+
112+
let stream_of_futures = stream::iter(
113+
vec![
114+
(1, Some("group1"), recv_one),
115+
(2, None, recv_two),
116+
],
117+
);
118+
let mut queue = stream_of_futures.future_queue_grouped(10, [("group1", 5)]);
119+
120+
send_two.send("hello")?;
121+
assert_eq!(queue.next().await, Some(Ok("hello")));
122+
123+
send_one.send("world")?;
124+
assert_eq!(queue.next().await, Some(Ok("world")));
125+
126+
assert_eq!(queue.next().await, None);
127+
```
128+
90129
## Minimum supported Rust version (MSRV)
91130

92131
The minimum supported Rust version is **Rust 1.56.**

src/future_queue.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -170,9 +170,10 @@ where
170170
///
171171
/// Provided in case it's necessary. This trait is only implemented for `(usize, impl Future)`.
172172
pub trait WeightedFuture: private::Sealed {
173+
/// The associated `Future` type.
173174
type Future: Future;
174175

175-
/// Turns this trait into its components.
176+
/// Turns self into its components.
176177
fn into_components(self) -> (usize, Self::Future);
177178
}
178179

0 commit comments

Comments
 (0)