Skip to content

Commit e6b4d8c

Browse files
committed
switch to not exceeding weights
I think this is easier to explain to people, and it opens up more use cases for weights (e.g. nextest-rs/nextest#831).
1 parent ded8751 commit e6b4d8c

File tree

8 files changed

+238
-166
lines changed

8 files changed

+238
-166
lines changed

CHANGELOG.md

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,15 @@
11
# Changelog
22

3+
## Unreleased
4+
5+
### Changed
6+
7+
- Changed definitions of `future_queue` and `future_queue_grouped` such that weights can no longer be
8+
exceeded. This enables certain additional use cases.
9+
310
## [0.2.2] - 2022-12-27
411

5-
## Added
12+
### Added
613

714
- Added documentation link to Cargo.toml metadata.
815

README.md

Lines changed: 14 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -57,15 +57,15 @@ limiting the concurrency to a maximum *weight*.
5757

5858
Rather than taking a stream of futures, this adaptor takes a stream of `(usize, future)` pairs,
5959
where the `usize` indicates the weight of each future. This adaptor will schedule and buffer
60-
futures to be run until the maximum weight is exceeded. Once that happens, this adaptor will
61-
wait until some of the currently executing futures complete, and the current weight of running
62-
futures drops below the maximum weight, before scheduling new futures.
60+
futures to be run until queueing the next future will exceed the maximum weight.
6361

64-
Note that in some cases, the current weight may exceed the maximum weight. For example:
62+
* The maximum weight is never exceeded while futures are being run.
63+
* If the weight of an individual future is greater than the maximum weight, its weight will be
64+
set to the maximum weight.
6565

66-
* Let's say the maximum weight is **24**, and the current weight is **20**.
67-
* If the next future has weight **6**, then it will be scheduled and the current weight will become **26**.
68-
* No new futures will be scheduled until the current weight falls to **23** or below.
66+
Once all possible futures are scheduled, this adaptor will wait until some of the currently
67+
executing futures complete, and the current weight of running futures drops below the maximum
68+
weight, before scheduling new futures.
6969

7070
The weight of a future can be zero, in which case it doesn't count towards the maximum weight.
7171

@@ -104,7 +104,13 @@ the order they're returned by the stream, without doing any reordering based on
104104
a future from a group completes, queued up futures in this group will be preferentially
105105
scheduled before any other futures from the provided stream.
106106

107-
The current weight for groups may exceed the maximum weight, similar to `future_queue`.
107+
Like with [`future_queue`](StreamExt::future_queue):
108+
109+
* The maximum global and group weights is never exceeded while futures are being run.
110+
* While accounting against global weights, if the weight of an individual future is greater than
111+
the maximum weight, its weight will be set to the maximum weight.
112+
* *If a future belongs to a group:* While accounting against the group weight, if its weight is
113+
greater than the maximum group weight, its weight will be set to the maximum group weight.
108114

109115
#### Examples
110116

src/future_queue.rs

Lines changed: 35 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
// Copyright (c) The future-queue Contributors
22
// SPDX-License-Identifier: MIT OR Apache-2.0
33

4+
use crate::{global_weight::GlobalWeight, peekable_fused::PeekableFused};
45
use futures_util::{
56
stream::{Fuse, FuturesUnordered},
67
Future, Stream, StreamExt as _,
@@ -21,10 +22,9 @@ pin_project! {
2122
St::Item: WeightedFuture,
2223
{
2324
#[pin]
24-
stream: Fuse<St>,
25+
stream: PeekableFused<Fuse<St>>,
2526
in_progress_queue: FuturesUnordered<FutureWithWeight<<St::Item as WeightedFuture>::Future>>,
26-
max_weight: usize,
27-
current_weight: usize,
27+
global_weight: GlobalWeight,
2828
}
2929
}
3030

@@ -37,8 +37,7 @@ where
3737
f.debug_struct("FutureQueue")
3838
.field("stream", &self.stream)
3939
.field("in_progress_queue", &self.in_progress_queue)
40-
.field("max_weight", &self.max_weight)
41-
.field("current_weight", &self.current_weight)
40+
.field("global_weight", &self.global_weight)
4241
.finish()
4342
}
4443
}
@@ -50,27 +49,26 @@ where
5049
{
5150
pub(crate) fn new(stream: St, max_weight: usize) -> Self {
5251
Self {
53-
stream: stream.fuse(),
52+
stream: PeekableFused::new(stream.fuse()),
5453
in_progress_queue: FuturesUnordered::new(),
55-
max_weight,
56-
current_weight: 0,
54+
global_weight: GlobalWeight::new(max_weight),
5755
}
5856
}
5957

6058
/// Returns the maximum weight of futures allowed to be run by this adaptor.
6159
pub fn max_weight(&self) -> usize {
62-
self.max_weight
60+
self.global_weight.max()
6361
}
6462

6563
/// Returns the currently running weight of futures.
6664
pub fn current_weight(&self) -> usize {
67-
self.current_weight
65+
self.global_weight.current()
6866
}
6967

7068
/// Acquires a reference to the underlying sink or stream that this combinator is
7169
/// pulling from.
7270
pub fn get_ref(&self) -> &St {
73-
self.stream.get_ref()
71+
self.stream.get_ref().get_ref()
7472
}
7573

7674
/// Acquires a mutable reference to the underlying sink or stream that this
@@ -79,7 +77,7 @@ where
7977
/// Note that care must be taken to avoid tampering with the state of the
8078
/// sink or stream which may otherwise confuse this combinator.
8179
pub fn get_mut(&mut self) -> &mut St {
82-
self.stream.get_mut()
80+
self.stream.get_mut().get_mut()
8381
}
8482

8583
/// Acquires a pinned mutable reference to the underlying sink or stream that this
@@ -88,15 +86,15 @@ where
8886
/// Note that care must be taken to avoid tampering with the state of the
8987
/// sink or stream which may otherwise confuse this combinator.
9088
pub fn get_pin_mut(self: Pin<&mut Self>) -> core::pin::Pin<&mut St> {
91-
self.project().stream.get_pin_mut()
89+
self.project().stream.get_pin_mut().get_pin_mut()
9290
}
9391

9492
/// Consumes this combinator, returning the underlying sink or stream.
9593
///
9694
/// Note that this may discard intermediate state of this combinator, so
9795
/// care should be taken to avoid losing resources when this is called.
9896
pub fn into_inner(self) -> St {
99-
self.stream.into_inner()
97+
self.stream.into_inner().into_inner()
10098
}
10199
}
102100

@@ -112,35 +110,27 @@ where
112110

113111
// First up, try to spawn off as many futures as possible by filling up
114112
// our queue of futures.
115-
while *this.current_weight < *this.max_weight {
116-
match this.stream.as_mut().poll_next(cx) {
117-
Poll::Ready(Some(weighted_future)) => {
118-
let (weight, future) = weighted_future.into_components();
119-
*this.current_weight =
120-
this.current_weight.checked_add(weight).unwrap_or_else(|| {
121-
panic!(
122-
"future_queue: added weight {} to current {}, overflowed",
123-
weight, this.current_weight,
124-
)
125-
});
126-
this.in_progress_queue
127-
.push(FutureWithWeight::new(weight, future));
128-
}
129-
Poll::Ready(None) | Poll::Pending => break,
113+
while let Poll::Ready(Some(weighted_future)) = this.stream.as_mut().poll_peek(cx) {
114+
if !this.global_weight.has_space_for(weighted_future.weight()) {
115+
// Global limits would be exceeded, break out of the loop. Consider this
116+
// item next time.
117+
break;
130118
}
119+
120+
let (weight, future) = match this.stream.as_mut().poll_next(cx) {
121+
Poll::Ready(Some(weighted_future)) => weighted_future.into_components(),
122+
_ => unreachable!("we just peeked at this item"),
123+
};
124+
this.global_weight.add_weight(weight);
125+
this.in_progress_queue
126+
.push(FutureWithWeight::new(weight, future));
131127
}
132128

133-
// Attempt to pull the next value from the in_progress_queue
129+
// Attempt to pull the next value from the in_progress_queue.
134130
match this.in_progress_queue.poll_next_unpin(cx) {
135131
Poll::Pending => return Poll::Pending,
136132
Poll::Ready(Some((weight, output))) => {
137-
*this.current_weight =
138-
this.current_weight.checked_sub(weight).unwrap_or_else(|| {
139-
panic!(
140-
"future_queue: subtracted weight {} from current {}, overflowed",
141-
weight, this.current_weight,
142-
)
143-
});
133+
this.global_weight.sub_weight(weight);
144134
return Poll::Ready(Some(output));
145135
}
146136
Poll::Ready(None) => {}
@@ -173,6 +163,9 @@ pub trait WeightedFuture: private::Sealed {
173163
/// The associated `Future` type.
174164
type Future: Future;
175165

166+
/// The weight of the future.
167+
fn weight(&self) -> usize;
168+
176169
/// Turns self into its components.
177170
fn into_components(self) -> (usize, Self::Future);
178171
}
@@ -189,6 +182,11 @@ where
189182
{
190183
type Future = Fut;
191184

185+
#[inline]
186+
fn weight(&self) -> usize {
187+
self.0
188+
}
189+
192190
#[inline]
193191
fn into_components(self) -> (usize, Self::Future) {
194192
self

0 commit comments

Comments
 (0)