Skip to content

Commit f6958d9

Browse files
committed
[hyperactor] channel-level ping-pong benchmarks
Pull Request resolved: #906 This is an attempt to do an apples-to-apples comparison to P1903314366, to eliminate any non-channel related overheads. The results replicate previous findings: that our throughput is hampered by excess data copies (either outright or through growing buffers in the encoding stack), and of tokio-level network i/o overheads. Both are being addressed. These benchmarks should help to serve as validation as this work lands. ghstack-source-id: 303752471 @exported-using-ghexport Differential Revision: [D80260732](https://our.internmc.facebook.com/intern/diff/D80260732/)
1 parent 8ef0c50 commit f6958d9

File tree

4 files changed

+183
-4
lines changed

4 files changed

+183
-4
lines changed

hyperactor/Cargo.toml

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
# @generated by autocargo from //monarch/hyperactor:[channel_benchmarks,hyperactor,hyperactor-example-derive,hyperactor-example-stream]
1+
# @generated by autocargo from //monarch/hyperactor:[channel_benchmarks,hyperactor,hyperactor-example-channel,hyperactor-example-derive,hyperactor-example-stream]
22

33
[package]
44
name = "hyperactor"
@@ -13,6 +13,10 @@ license = "BSD-3-Clause"
1313
name = "channel_benchmarks"
1414
path = "benches/main.rs"
1515

16+
[[bin]]
17+
name = "hyperactor_example_channel"
18+
path = "example/channel.rs"
19+
1620
[[bin]]
1721
name = "hyperactor_example_derive"
1822
path = "example/derive.rs"
@@ -51,6 +55,7 @@ rustls-pemfile = "1.0.0"
5155
serde = { version = "1.0.219", features = ["derive", "rc"] }
5256
serde_bytes = "0.11"
5357
serde_json = { version = "1.0.140", features = ["alloc", "float_roundtrip", "raw_value", "unbounded_depth"] }
58+
serde_multipart = { version = "0.0.0", path = "../serde_multipart" }
5459
serde_with = { version = "3", features = ["hex", "json"] }
5560
serde_yaml = "0.9.25"
5661
signal-hook-tokio = { version = "0.3", features = ["futures-v0_3"] }

hyperactor/benches/main.rs

Lines changed: 73 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -11,13 +11,15 @@
1111
use std::time::Duration;
1212
use std::time::Instant;
1313

14+
use bytes::Bytes;
1415
use criterion::BenchmarkId;
1516
use criterion::Criterion;
1617
use criterion::Throughput;
1718
use criterion::criterion_group;
1819
use criterion::criterion_main;
1920
use futures::future::join_all;
2021
use hyperactor::Named;
22+
use hyperactor::channel;
2123
use hyperactor::channel::ChannelAddr;
2224
use hyperactor::channel::ChannelTransport;
2325
use hyperactor::channel::Rx;
@@ -26,10 +28,18 @@ use hyperactor::channel::dial;
2628
use hyperactor::channel::serve;
2729
use serde::Deserialize;
2830
use serde::Serialize;
31+
use tokio::runtime;
2932
use tokio::runtime::Runtime;
3033
use tokio::select;
3134
use tokio::sync::oneshot;
3235

36+
fn new_runtime() -> Runtime {
37+
runtime::Builder::new_current_thread()
38+
.enable_all()
39+
.build()
40+
.unwrap()
41+
}
42+
3343
#[derive(Debug, Clone, Serialize, Deserialize, Named, PartialEq)]
3444
struct Message {
3545
id: u64,
@@ -62,7 +72,7 @@ fn bench_message_sizes(c: &mut Criterion) {
6272
group.sampling_mode(criterion::SamplingMode::Flat);
6373
group.sample_size(10);
6474
group.bench_function(BenchmarkId::from_parameter(size), move |b| {
65-
let mut b = b.to_async(Runtime::new().unwrap());
75+
let mut b = b.to_async(new_runtime());
6676
let tt = &transport;
6777
b.iter_custom(|iters| async move {
6878
let addr = ChannelAddr::any(tt.clone());
@@ -106,7 +116,7 @@ fn bench_message_rates(c: &mut Criterion) {
106116
let rate = *rate;
107117

108118
group.bench_function(format!("rate_{}_{}mps", transport_name, rate), move |b| {
109-
let mut b = b.to_async(Runtime::new().unwrap());
119+
let mut b = b.to_async(new_runtime());
110120
b.iter_custom(|iters| async move {
111121
let total_msgs = iters * rate;
112122
let addr = ChannelAddr::any(transport.clone());
@@ -169,6 +179,66 @@ fn bench_message_rates(c: &mut Criterion) {
169179
group.finish();
170180
}
171181

172-
criterion_group!(benches, bench_message_sizes, bench_message_rates);
182+
// Try to replicate https://www.internalfb.com/phabricator/paste/view/P1903314366
183+
fn bench_channel_ping_pong(c: &mut Criterion) {
184+
let transport = ChannelTransport::Unix;
185+
186+
for size in [1usize, 1_000_000usize] {
187+
let mut group = c.benchmark_group("channel_ping_pong".to_string());
188+
let transport = transport.clone();
189+
group.throughput(Throughput::Bytes((size * 2) as u64)); // send and receive
190+
group.sampling_mode(criterion::SamplingMode::Flat);
191+
group.sample_size(100);
192+
group.bench_function(BenchmarkId::from_parameter(size), move |b| {
193+
let mut b = b.to_async(new_runtime());
194+
b.iter_custom(|iters| channel_ping_pong(transport.clone(), size, iters as usize));
195+
});
196+
group.finish();
197+
}
198+
}
199+
200+
async fn channel_ping_pong(
201+
transport: ChannelTransport,
202+
message_size: usize,
203+
num_iter: usize,
204+
) -> Duration {
205+
let (client_addr, mut client_rx) = channel::serve::<Bytes>(ChannelAddr::any(transport.clone()))
206+
.await
207+
.unwrap();
208+
let (server_addr, mut server_rx) = channel::serve::<Bytes>(ChannelAddr::any(transport.clone()))
209+
.await
210+
.unwrap();
211+
212+
let _server_handle: tokio::task::JoinHandle<Result<(), anyhow::Error>> =
213+
tokio::spawn(async move {
214+
let client_tx = channel::dial(client_addr)?;
215+
loop {
216+
let message = server_rx.recv().await?;
217+
client_tx.post(message);
218+
}
219+
});
220+
221+
let client_handle: tokio::task::JoinHandle<Result<(), anyhow::Error>> =
222+
tokio::spawn(async move {
223+
let server_tx = channel::dial(server_addr)?;
224+
let message = Bytes::from(vec![0u8; message_size]);
225+
for _ in 0..num_iter {
226+
server_tx.post(message.clone() /*cheap */);
227+
client_rx.recv().await?;
228+
}
229+
Ok(())
230+
});
231+
232+
let start = Instant::now();
233+
client_handle.await.unwrap();
234+
start.elapsed()
235+
}
236+
237+
criterion_group!(
238+
benches,
239+
bench_message_sizes,
240+
bench_message_rates,
241+
bench_channel_ping_pong
242+
);
173243

174244
criterion_main!(benches);

hyperactor/example/channel.rs

Lines changed: 98 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,98 @@
1+
/*
2+
* Copyright (c) Meta Platforms, Inc. and affiliates.
3+
* All rights reserved.
4+
*
5+
* This source code is licensed under the BSD-style license found in the
6+
* LICENSE file in the root directory of this source tree.
7+
*/
8+
9+
use bytes::Bytes;
10+
use hyperactor::channel;
11+
use hyperactor::channel::ChannelAddr;
12+
use hyperactor::channel::ChannelRx;
13+
use hyperactor::channel::ChannelTransport;
14+
use hyperactor::channel::Rx;
15+
use hyperactor::channel::Tx;
16+
use tokio::time::Duration;
17+
use tokio::time::Instant;
18+
19+
async fn server(
20+
mut server_rx: ChannelRx<Bytes>,
21+
client_addr: ChannelAddr,
22+
) -> Result<(), anyhow::Error> {
23+
let client_tx = channel::dial(client_addr)?;
24+
loop {
25+
let message = server_rx.recv().await?;
26+
client_tx.post(message);
27+
}
28+
}
29+
30+
// Analog of https://www.internalfb.com/phabricator/paste/view/P1903314366, using Channel APIs.
31+
// Possibly we should create separate threads for the client and server to also make the OS-level
32+
// setup equivalent.
33+
#[tokio::main(flavor = "current_thread")]
34+
async fn main() -> Result<(), anyhow::Error> {
35+
let transport = ChannelTransport::Tcp;
36+
// let transport = ChannelTransport::Local;
37+
let message_size = 1_000_000;
38+
let num_iter = 100;
39+
40+
let (client_addr, mut client_rx) = channel::serve::<Bytes>(ChannelAddr::any(transport.clone()))
41+
.await
42+
.unwrap();
43+
let (server_addr, server_rx) = channel::serve::<Bytes>(ChannelAddr::any(transport.clone()))
44+
.await
45+
.unwrap();
46+
47+
let _server_handle = tokio::spawn(server(server_rx, client_addr));
48+
49+
let server_tx = channel::dial(server_addr)?;
50+
let message = Bytes::from(vec![0u8; message_size]);
51+
52+
for _ in 0..10 {
53+
// Warmup
54+
let t = Instant::now();
55+
server_tx.post(message.clone() /*cheap */);
56+
client_rx.recv().await?;
57+
}
58+
59+
let mut latencies = vec![];
60+
let mut total_bytes_sent = 0usize;
61+
let mut total_bytes_received = 0usize;
62+
63+
let start = Instant::now();
64+
for _ in 0..num_iter {
65+
total_bytes_sent += message.len();
66+
let start = Instant::now();
67+
server_tx.post(message.clone() /*cheap */);
68+
total_bytes_received += client_rx.recv().await?.len();
69+
latencies.push(start.elapsed());
70+
}
71+
let elapsed = start.elapsed();
72+
73+
let avg_latency = ((latencies.iter().sum::<Duration>().as_micros() as f64) / 1000f64)
74+
/ (latencies.len() as f64);
75+
let min_latency = (latencies.iter().min().unwrap().as_micros() as f64) / 1000f64;
76+
let max_latency = (latencies.iter().max().unwrap().as_micros() as f64) / 1000f64;
77+
78+
let total_bytes_transferred = total_bytes_sent + total_bytes_received;
79+
let bandwidth_bytes_per_sec =
80+
(total_bytes_transferred as f64) / ((elapsed.as_millis() as f64) / 1000f64);
81+
let bandwidth_mbps = (bandwidth_bytes_per_sec * 8f64) / (1024f64 * 1024f64);
82+
83+
println!("Results:");
84+
println!("Average latency: {} ms", avg_latency);
85+
println!("Min latency: {} ms", min_latency);
86+
println!("Max latency: {} ms", max_latency);
87+
println!("Total iterations: {}", latencies.len());
88+
println!("Total time: {} seconds", elapsed.as_secs());
89+
println!("Bytes sent: {} bytes", total_bytes_sent);
90+
println!("Bytes received: {} bytes", total_bytes_received);
91+
println!("Total bytes transferred: {} bytes", total_bytes_transferred);
92+
println!(
93+
"Bandwidth: {} bytes/sec ({} Mbps)",
94+
bandwidth_bytes_per_sec, bandwidth_mbps
95+
);
96+
97+
Ok(())
98+
}

hyperactor/src/data.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -105,6 +105,12 @@ impl Named for std::time::Duration {
105105
}
106106
}
107107

108+
impl Named for bytes::Bytes {
109+
fn typename() -> &'static str {
110+
"bytes::Bytes"
111+
}
112+
}
113+
108114
// A macro that implements type-keyed interning of typenames. This is useful
109115
// for implementing [`Named`] for generic types.
110116
#[doc(hidden)] // not part of the public API

0 commit comments

Comments
 (0)