Skip to content

Commit 97062ce

Browse files
committed
move BufferUnorderedWeighted into its own file
Going to introduce a variant of this to the file. Move it to `future_queue.rs` because we're going to rename this crate `future-queue`.
1 parent 9cf3bb8 commit 97062ce

File tree

2 files changed

+231
-221
lines changed

2 files changed

+231
-221
lines changed

src/future_queue.rs

Lines changed: 221 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,221 @@
1+
// Copyright (c) The buffer-unordered-weighted Contributors
2+
// SPDX-License-Identifier: MIT OR Apache-2.0
3+
4+
use futures_util::{
5+
stream::{Fuse, FuturesUnordered},
6+
Future, Stream, StreamExt as _,
7+
};
8+
use pin_project_lite::pin_project;
9+
use private::WeightedFuture;
10+
use std::{
11+
fmt,
12+
pin::Pin,
13+
task::{Context, Poll},
14+
};
15+
16+
pin_project! {
17+
/// Stream for the [`buffer_unordered_weighted`](StreamExt::buffer_unordered_weighted) method.
18+
#[must_use = "streams do nothing unless polled"]
19+
pub struct BufferUnorderedWeighted<St>
20+
where
21+
St: Stream,
22+
St::Item: WeightedFuture,
23+
{
24+
#[pin]
25+
stream: Fuse<St>,
26+
in_progress_queue: FuturesUnordered<FutureWithWeight<<St::Item as WeightedFuture>::Future>>,
27+
max_weight: usize,
28+
current_weight: usize,
29+
}
30+
}
31+
32+
impl<St> fmt::Debug for BufferUnorderedWeighted<St>
33+
where
34+
St: Stream + fmt::Debug,
35+
St::Item: WeightedFuture,
36+
{
37+
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
38+
f.debug_struct("BufferUnorderedWeighted")
39+
.field("stream", &self.stream)
40+
.field("in_progress_queue", &self.in_progress_queue)
41+
.field("max_weight", &self.max_weight)
42+
.field("current_weight", &self.current_weight)
43+
.finish()
44+
}
45+
}
46+
47+
impl<St> BufferUnorderedWeighted<St>
48+
where
49+
St: Stream,
50+
St::Item: WeightedFuture,
51+
{
52+
pub(crate) fn new(stream: St, max_weight: usize) -> Self {
53+
Self {
54+
stream: stream.fuse(),
55+
in_progress_queue: FuturesUnordered::new(),
56+
max_weight,
57+
current_weight: 0,
58+
}
59+
}
60+
61+
/// Returns the maximum weight of futures allowed to be run by this adaptor.
62+
pub fn max_weight(&self) -> usize {
63+
self.max_weight
64+
}
65+
66+
/// Returns the currently running weight of futures.
67+
pub fn current_weight(&self) -> usize {
68+
self.current_weight
69+
}
70+
71+
/// Acquires a reference to the underlying sink or stream that this combinator is
72+
/// pulling from.
73+
pub fn get_ref(&self) -> &St {
74+
self.stream.get_ref()
75+
}
76+
77+
/// Acquires a mutable reference to the underlying sink or stream that this
78+
/// combinator is pulling from.
79+
///
80+
/// Note that care must be taken to avoid tampering with the state of the
81+
/// sink or stream which may otherwise confuse this combinator.
82+
pub fn get_mut(&mut self) -> &mut St {
83+
self.stream.get_mut()
84+
}
85+
86+
/// Acquires a pinned mutable reference to the underlying sink or stream that this
87+
/// combinator is pulling from.
88+
///
89+
/// Note that care must be taken to avoid tampering with the state of the
90+
/// sink or stream which may otherwise confuse this combinator.
91+
pub fn get_pin_mut(self: Pin<&mut Self>) -> core::pin::Pin<&mut St> {
92+
self.project().stream.get_pin_mut()
93+
}
94+
95+
/// Consumes this combinator, returning the underlying sink or stream.
96+
///
97+
/// Note that this may discard intermediate state of this combinator, so
98+
/// care should be taken to avoid losing resources when this is called.
99+
pub fn into_inner(self) -> St {
100+
self.stream.into_inner()
101+
}
102+
}
103+
104+
impl<St> Stream for BufferUnorderedWeighted<St>
105+
where
106+
St: Stream,
107+
St::Item: WeightedFuture,
108+
{
109+
type Item = <<St::Item as WeightedFuture>::Future as Future>::Output;
110+
111+
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
112+
let mut this = self.project();
113+
114+
// First up, try to spawn off as many futures as possible by filling up
115+
// our queue of futures.
116+
while *this.current_weight < *this.max_weight {
117+
match this.stream.as_mut().poll_next(cx) {
118+
Poll::Ready(Some(weighted_future)) => {
119+
let (weight, future) = weighted_future.into_components();
120+
*this.current_weight =
121+
this.current_weight.checked_add(weight).unwrap_or_else(|| {
122+
panic!(
123+
"buffer_unordered_weighted: added weight {} to current {}, overflowed",
124+
weight,
125+
this.current_weight,
126+
)
127+
});
128+
this.in_progress_queue
129+
.push(FutureWithWeight::new(weight, future));
130+
}
131+
Poll::Ready(None) | Poll::Pending => break,
132+
}
133+
}
134+
135+
// Attempt to pull the next value from the in_progress_queue
136+
match this.in_progress_queue.poll_next_unpin(cx) {
137+
Poll::Pending => return Poll::Pending,
138+
Poll::Ready(Some((weight, output))) => {
139+
*this.current_weight = this.current_weight.checked_sub(weight).unwrap_or_else(|| {
140+
panic!(
141+
"buffer_unordered_weighted: subtracted weight {} from current {}, overflowed",
142+
weight,
143+
this.current_weight,
144+
)
145+
});
146+
return Poll::Ready(Some(output));
147+
}
148+
Poll::Ready(None) => {}
149+
}
150+
151+
// If more values are still coming from the stream, we're not done yet
152+
if this.stream.is_done() {
153+
Poll::Ready(None)
154+
} else {
155+
Poll::Pending
156+
}
157+
}
158+
159+
fn size_hint(&self) -> (usize, Option<usize>) {
160+
let queue_len = self.in_progress_queue.len();
161+
let (lower, upper) = self.stream.size_hint();
162+
let lower = lower.saturating_add(queue_len);
163+
let upper = match upper {
164+
Some(x) => x.checked_add(queue_len),
165+
None => None,
166+
};
167+
(lower, upper)
168+
}
169+
}
170+
171+
mod private {
172+
use futures_util::Future;
173+
174+
pub trait WeightedFuture {
175+
type Future: Future;
176+
177+
fn into_components(self) -> (usize, Self::Future);
178+
}
179+
180+
impl<Fut> WeightedFuture for (usize, Fut)
181+
where
182+
Fut: Future,
183+
{
184+
type Future = Fut;
185+
186+
#[inline]
187+
fn into_components(self) -> (usize, Self::Future) {
188+
self
189+
}
190+
}
191+
}
192+
193+
pin_project! {
194+
#[must_use = "futures do nothing unless polled"]
195+
struct FutureWithWeight<Fut> {
196+
#[pin]
197+
future: Fut,
198+
weight: usize,
199+
}
200+
}
201+
202+
impl<Fut> FutureWithWeight<Fut> {
203+
pub fn new(weight: usize, future: Fut) -> Self {
204+
Self { future, weight }
205+
}
206+
}
207+
208+
impl<Fut> Future for FutureWithWeight<Fut>
209+
where
210+
Fut: Future,
211+
{
212+
type Output = (usize, Fut::Output);
213+
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
214+
let this = self.project();
215+
216+
match this.future.poll(cx) {
217+
Poll::Pending => Poll::Pending,
218+
Poll::Ready(output) => Poll::Ready((*this.weight, output)),
219+
}
220+
}
221+
}

0 commit comments

Comments
 (0)