Skip to content

Commit d960f75

Browse files
committed
add new benchmark for FutureGroup
This benchmark measures the latency between futures becoming ready and the stream producing their outputs.
1 parent ee74871 commit d960f75

File tree

2 files changed

+121
-3
lines changed

2 files changed

+121
-3
lines changed

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@ criterion = { version = "0.3", features = [
4949
] }
5050
futures = "0.3.25"
5151
futures-time = "3.0.0"
52+
itertools = "0.12.1"
5253
lending-stream = "1.0.0"
5354
rand = "0.8.5"
5455
tokio = { version = "1.32.0", features = ["macros", "time", "rt-multi-thread"] }

benches/bench.rs

Lines changed: 120 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -68,22 +68,30 @@ mod stream_group {
6868
}
6969
}
7070
mod future_group {
71+
use std::fmt::{Debug, Display};
72+
use std::time::{Duration, Instant};
73+
7174
use criterion::async_executor::FuturesExecutor;
7275
use criterion::{black_box, criterion_group, BatchSize, BenchmarkId, Criterion};
76+
use futures::channel::oneshot;
77+
use futures::never::Never;
7378
use futures::stream::FuturesUnordered;
7479
use futures_concurrency::future::FutureGroup;
80+
use futures_lite::future::yield_now;
7581
use futures_lite::prelude::*;
82+
use itertools::Itertools;
83+
use rand::{seq::SliceRandom, SeedableRng};
7684

7785
use crate::utils::{make_future_group, make_futures_unordered};
7886
criterion_group! {
7987
name = future_group_benches;
8088
// This can be any expression that returns a `Criterion` object.
8189
config = Criterion::default();
82-
targets = future_group_bench
90+
targets = future_group_throughput_bench, future_group_latency_bench
8391
}
8492

85-
fn future_group_bench(c: &mut Criterion) {
86-
let mut group = c.benchmark_group("future_group");
93+
fn future_group_throughput_bench(c: &mut Criterion) {
94+
let mut group = c.benchmark_group("future_group_poll_throughput");
8795
for i in [10, 100, 1000].iter() {
8896
group.bench_with_input(BenchmarkId::new("FutureGroup", i), i, |b, i| {
8997
let setup = || make_future_group(*i);
@@ -116,6 +124,115 @@ mod future_group {
116124
}
117125
group.finish();
118126
}
127+
128+
/// This benchmark measures the latency between when futures become ready
129+
/// and when their outputs appear on the [`FutureGroup`] stream.
130+
///
131+
/// To test this, we:
132+
/// - insert N pending futures to the [`FutureGroup`].
133+
/// - until the [`FutureGroup`] is empty, we set some fraction of the
134+
/// pending futures to ready, then record how long it takes for their
135+
/// outputs to be produced from [`FutureGroup`]'s `Stream` impl.
136+
/// - we sum the recorded durations for each of these rounds.
137+
fn future_group_latency_bench(c: &mut Criterion) {
138+
#[derive(Debug, Clone, Copy)]
139+
struct Params {
140+
init_size: usize,
141+
pct_ready_per_round: f64,
142+
}
143+
144+
impl Display for Params {
145+
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
146+
Debug::fmt(&self, f)
147+
}
148+
}
149+
150+
async fn routine<G>(
151+
iters: u64,
152+
Params {
153+
init_size,
154+
pct_ready_per_round,
155+
}: Params,
156+
) -> Duration
157+
where
158+
G: Default + Stream + Unpin + Extend<oneshot::Receiver<Never>>,
159+
{
160+
let ready_per_round = ((init_size as f64) * pct_ready_per_round).max(1.) as usize;
161+
162+
let mut total_runtime = Duration::ZERO;
163+
164+
for _ in 0..iters {
165+
// construct a set of oneshot::Receiver futures. These become
166+
// ready once their Sender ends are dropped.
167+
let (mut senders, mut group) = (0..init_size)
168+
.map(|_| oneshot::channel())
169+
.unzip::<_, _, Vec<_>, G>();
170+
171+
// shuffle our senders so from the FutureGroup's perspective,
172+
// futures become ready in arbitrary order.
173+
let mut rng = rand::rngs::StdRng::seed_from_u64(42);
174+
senders.shuffle(&mut rng);
175+
176+
// poll once to set up all the wakers
177+
assert!(futures::poll!(group.next()).is_pending());
178+
179+
while !senders.is_empty() {
180+
let num_completing = ready_per_round.min(senders.len());
181+
182+
// drop some Senders. The corresponding Receiver futures
183+
// will become ready in the FutureGroup
184+
assert_eq!(senders.drain(..num_completing).count(), num_completing);
185+
186+
// this isn't necessary now, but if we were using the tokio
187+
// runtime/oneshots, coming up for air regularly prevents
188+
// the budget system from inserting false pendings. this
189+
// also more closely emulates what would happen in a real
190+
// system (sender task yields to let executor poll receiver
191+
// task). though that shouldn't make a difference.
192+
yield_now().await;
193+
194+
// measure the time it takes for all newly ready futures to
195+
// be produced from the FutureGroup stream.
196+
let recv_start = Instant::now();
197+
assert_eq!(
198+
(&mut group).take(num_completing).count().await,
199+
num_completing
200+
);
201+
total_runtime += recv_start.elapsed();
202+
}
203+
}
204+
205+
total_runtime
206+
}
207+
208+
let mut group = c.benchmark_group("future_group_poll_latency");
209+
for params in [10, 100, 1000]
210+
.into_iter()
211+
.cartesian_product([0.0001, 0.2, 1.0])
212+
.map(|(init_size, pct_ready_per_round)| Params {
213+
init_size,
214+
pct_ready_per_round,
215+
})
216+
{
217+
group.bench_with_input(
218+
BenchmarkId::new("FutureGroup", params),
219+
&params,
220+
|b, &params| {
221+
b.to_async(FuturesExecutor)
222+
.iter_custom(|iters| routine::<FutureGroup<_>>(iters, params))
223+
},
224+
);
225+
group.bench_with_input(
226+
BenchmarkId::new("FuturesUnordered", params),
227+
&params,
228+
|b, &params| {
229+
b.to_async(FuturesExecutor)
230+
.iter_custom(|iters| routine::<FuturesUnordered<_>>(iters, params))
231+
},
232+
);
233+
}
234+
group.finish();
235+
}
119236
}
120237

121238
mod merge {

0 commit comments

Comments
 (0)