Skip to content

Commit d8965f3

Browse files
committed
add the ability for futures to know about global and group slot numbers
With this change, called futures can now be assigned an integer that is unique for the lifetime of the future. The integer is always the smallest possible that could be assigned. Also expand our property-based testing to ensure uniqueness and compactness of slots.
1 parent fd4510a commit d8965f3

File tree

8 files changed

+586
-81
lines changed

8 files changed

+586
-81
lines changed

Cargo.lock

Lines changed: 7 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ categories = ["asynchronous"]
1313
keywords = ["stream", "futures", "async", "buffer_unordered"]
1414

1515
[dependencies]
16+
debug-ignore = "1.0.5"
1617
fnv = "1.0.7"
1718
futures-util = { version = "0.3.31", default-features = false, features = ["std"] }
1819
pin-project-lite = "0.2.15"

README.md

Lines changed: 14 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -55,9 +55,11 @@ This crate provides two adaptors on streams.
5555
The [`future_queue`](StreamExt::future_queue) adaptor can run several futures simultaneously,
5656
limiting the concurrency to a maximum *weight*.
5757

58-
Rather than taking a stream of futures, this adaptor takes a stream of `(usize, future)` pairs,
59-
where the `usize` indicates the weight of each future. This adaptor will schedule and buffer
60-
futures to be run until queueing the next future will exceed the maximum weight.
58+
Rather than taking a stream of futures, this adaptor takes a stream of
59+
`(usize, F)` pairs, where the `usize` indicates the weight of each future,
60+
and `F` is `FnOnce(FutureQueueContext) -> impl Future`. This adaptor will
61+
schedule and buffer futures to be run until queueing the next future will
62+
exceed the maximum weight.
6163

6264
* The maximum weight is never exceeded while futures are being run.
6365
* If the weight of an individual future is greater than the maximum weight, its weight will be
@@ -80,7 +82,11 @@ use future_queue::{StreamExt as _};
8082
let (send_one, recv_one) = oneshot::channel();
8183
let (send_two, recv_two) = oneshot::channel();
8284

83-
let stream_of_futures = stream::iter(vec![(1, recv_one), (2, recv_two)]);
85+
let stream_of_futures = stream::iter(
86+
vec![(1, recv_one), (2, recv_two)],
87+
).map(|(weight, future)| {
88+
(weight, move |_cx| future)
89+
});
8490
let mut queue = stream_of_futures.future_queue(10);
8591

8692
send_two.send("hello")?;
@@ -116,7 +122,7 @@ Like with [`future_queue`](StreamExt::future_queue):
116122

117123
```rust
118124
use futures::{channel::oneshot, stream, StreamExt as _};
119-
use future_queue::{StreamExt as _};
125+
use future_queue::{FutureQueueContext, StreamExt as _};
120126

121127
let (send_one, recv_one) = oneshot::channel();
122128
let (send_two, recv_two) = oneshot::channel();
@@ -126,7 +132,9 @@ let stream_of_futures = stream::iter(
126132
(1, Some("group1"), recv_one),
127133
(2, None, recv_two),
128134
],
129-
);
135+
).map(|(weight, group, future)| {
136+
(weight, group, move |_cx| future)
137+
});
130138
let mut queue = stream_of_futures.future_queue_grouped(10, [("group1", 5)]);
131139

132140
send_two.send("hello")?;

src/future_queue.rs

Lines changed: 48 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,10 @@
11
// Copyright (c) The future-queue Contributors
22
// SPDX-License-Identifier: MIT OR Apache-2.0
33

4-
use crate::{global_weight::GlobalWeight, peekable_fused::PeekableFused};
4+
use crate::{
5+
global_weight::GlobalWeight, peekable_fused::PeekableFused, slots::SlotReservations,
6+
FutureQueueContext,
7+
};
58
use futures_util::{
69
stream::{Fuse, FuturesUnordered},
710
Future, Stream, StreamExt as _,
@@ -24,6 +27,7 @@ pin_project! {
2427
#[pin]
2528
stream: PeekableFused<Fuse<St>>,
2629
in_progress_queue: FuturesUnordered<FutureWithWeight<<St::Item as WeightedFuture>::Future>>,
30+
slots: SlotReservations,
2731
global_weight: GlobalWeight,
2832
}
2933
}
@@ -37,6 +41,7 @@ where
3741
f.debug_struct("FutureQueue")
3842
.field("stream", &self.stream)
3943
.field("in_progress_queue", &self.in_progress_queue)
44+
.field("slots", &self.slots)
4045
.field("global_weight", &self.global_weight)
4146
.finish()
4247
}
@@ -51,10 +56,17 @@ where
5156
Self {
5257
stream: PeekableFused::new(stream.fuse()),
5358
in_progress_queue: FuturesUnordered::new(),
59+
slots: SlotReservations::with_capacity(max_weight),
5460
global_weight: GlobalWeight::new(max_weight),
5561
}
5662
}
5763

64+
/// Sets a mode where extra internal verifications are performed.
65+
#[doc(hidden)]
66+
pub fn set_extra_verify(&mut self, verify: bool) {
67+
self.slots.set_check_reserved(verify);
68+
}
69+
5870
/// Returns the maximum weight of futures allowed to be run by this adaptor.
5971
pub fn max_weight(&self) -> usize {
6072
self.global_weight.max()
@@ -117,20 +129,29 @@ where
117129
break;
118130
}
119131

120-
let (weight, future) = match this.stream.as_mut().poll_next(cx) {
132+
let (weight, future_fn) = match this.stream.as_mut().poll_next(cx) {
121133
Poll::Ready(Some(weighted_future)) => weighted_future.into_components(),
122134
_ => unreachable!("we just peeked at this item"),
123135
};
124136
this.global_weight.add_weight(weight);
137+
let global_slot = this.slots.reserve();
138+
139+
let cx = FutureQueueContext {
140+
global_slot,
141+
group_slot: None,
142+
};
143+
let future = future_fn(cx);
144+
125145
this.in_progress_queue
126-
.push(FutureWithWeight::new(weight, future));
146+
.push(FutureWithWeight::new(weight, global_slot, future));
127147
}
128148

129149
// Attempt to pull the next value from the in_progress_queue.
130150
match this.in_progress_queue.poll_next_unpin(cx) {
131151
Poll::Pending => return Poll::Pending,
132-
Poll::Ready(Some((weight, output))) => {
152+
Poll::Ready(Some((weight, slot, output))) => {
133153
this.global_weight.sub_weight(weight);
154+
this.slots.release(slot);
134155
return Poll::Ready(Some(output));
135156
}
136157
Poll::Ready(None) => {}
@@ -160,26 +181,36 @@ where
160181
///
161182
/// Provided in case it's necessary. This trait is only implemented for `(usize, impl Future)`.
162183
pub trait WeightedFuture: private::Sealed {
184+
/// The function to obtain the future from
185+
type F: FnOnce(FutureQueueContext) -> Self::Future;
186+
163187
/// The associated `Future` type.
164188
type Future: Future;
165189

166190
/// The weight of the future.
167191
fn weight(&self) -> usize;
168192

169193
/// Turns self into its components.
170-
fn into_components(self) -> (usize, Self::Future);
194+
fn into_components(self) -> (usize, Self::F);
171195
}
172196

173197
mod private {
174198
pub trait Sealed {}
175199
}
176200

177-
impl<Fut> private::Sealed for (usize, Fut) where Fut: Future {}
201+
impl<F, Fut> private::Sealed for (usize, F)
202+
where
203+
F: FnOnce(FutureQueueContext) -> Fut,
204+
Fut: Future,
205+
{
206+
}
178207

179-
impl<Fut> WeightedFuture for (usize, Fut)
208+
impl<F, Fut> WeightedFuture for (usize, F)
180209
where
210+
F: FnOnce(FutureQueueContext) -> Fut,
181211
Fut: Future,
182212
{
213+
type F = F;
183214
type Future = Fut;
184215

185216
#[inline]
@@ -188,7 +219,7 @@ where
188219
}
189220

190221
#[inline]
191-
fn into_components(self) -> (usize, Self::Future) {
222+
fn into_components(self) -> (usize, Self::F) {
192223
self
193224
}
194225
}
@@ -199,26 +230,31 @@ pin_project! {
199230
#[pin]
200231
future: Fut,
201232
weight: usize,
233+
slot: u64,
202234
}
203235
}
204236

205237
impl<Fut> FutureWithWeight<Fut> {
206-
pub fn new(weight: usize, future: Fut) -> Self {
207-
Self { future, weight }
238+
pub fn new(weight: usize, slot: u64, future: Fut) -> Self {
239+
Self {
240+
future,
241+
weight,
242+
slot,
243+
}
208244
}
209245
}
210246

211247
impl<Fut> Future for FutureWithWeight<Fut>
212248
where
213249
Fut: Future,
214250
{
215-
type Output = (usize, Fut::Output);
251+
type Output = (usize, u64, Fut::Output);
216252
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
217253
let this = self.project();
218254

219255
match this.future.poll(cx) {
220256
Poll::Pending => Poll::Pending,
221-
Poll::Ready(output) => Poll::Ready((*this.weight, output)),
257+
Poll::Ready(output) => Poll::Ready((*this.weight, *this.slot, output)),
222258
}
223259
}
224260
}

0 commit comments

Comments
 (0)