Skip to content

Commit 2f37daf

Browse files
committed
[hyperactor] channel-level ping-pong benchmarks
This is an attempt to do an apples-to-apples comparison to P1903314366, to eliminate any non-channel related overheads. The results replicate previous findings: that our throughput is hampered by excess data copies (either outright or through growing buffers in the encoding stack), and of tokio-level network i/o overheads. Both are being addressed. These benchmarks should help to serve as validation as this work lands. Differential Revision: [D80260732](https://our.internmc.facebook.com/intern/diff/D80260732/) ghstack-source-id: 303699220 Pull Request resolved: #906
1 parent 3381a87 commit 2f37daf

File tree

4 files changed

+182
-4
lines changed

4 files changed

+182
-4
lines changed

hyperactor/Cargo.toml

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
# @generated by autocargo from //monarch/hyperactor:[channel_benchmarks,hyperactor,hyperactor-example-derive,hyperactor-example-stream]
1+
# @generated by autocargo from //monarch/hyperactor:[channel_benchmarks,hyperactor,hyperactor-example-channel,hyperactor-example-derive,hyperactor-example-stream]
22

33
[package]
44
name = "hyperactor"
@@ -13,6 +13,10 @@ license = "BSD-3-Clause"
1313
name = "channel_benchmarks"
1414
path = "benches/main.rs"
1515

16+
[[bin]]
17+
name = "hyperactor_example_channel"
18+
path = "example/channel.rs"
19+
1620
[[bin]]
1721
name = "hyperactor_example_derive"
1822
path = "example/derive.rs"
@@ -51,6 +55,7 @@ rustls-pemfile = "1.0.0"
5155
serde = { version = "1.0.219", features = ["derive", "rc"] }
5256
serde_bytes = "0.11"
5357
serde_json = { version = "1.0.140", features = ["alloc", "float_roundtrip", "raw_value", "unbounded_depth"] }
58+
serde_multipart = { version = "0.0.0", path = "../serde_multipart" }
5459
serde_with = { version = "3", features = ["hex", "json"] }
5560
serde_yaml = "0.9.25"
5661
signal-hook-tokio = { version = "0.3", features = ["futures-v0_3"] }

hyperactor/benches/main.rs

Lines changed: 80 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -11,13 +11,15 @@
1111
use std::time::Duration;
1212
use std::time::Instant;
1313

14+
use bytes::Bytes;
1415
use criterion::BenchmarkId;
1516
use criterion::Criterion;
1617
use criterion::Throughput;
1718
use criterion::criterion_group;
1819
use criterion::criterion_main;
1920
use futures::future::join_all;
2021
use hyperactor::Named;
22+
use hyperactor::channel;
2123
use hyperactor::channel::ChannelAddr;
2224
use hyperactor::channel::ChannelTransport;
2325
use hyperactor::channel::Rx;
@@ -26,10 +28,18 @@ use hyperactor::channel::dial;
2628
use hyperactor::channel::serve;
2729
use serde::Deserialize;
2830
use serde::Serialize;
31+
use tokio::runtime;
2932
use tokio::runtime::Runtime;
3033
use tokio::select;
3134
use tokio::sync::oneshot;
3235

36+
fn new_runtime() -> Runtime {
37+
runtime::Builder::new_current_thread()
38+
.enable_all()
39+
.build()
40+
.unwrap()
41+
}
42+
3343
#[derive(Debug, Clone, Serialize, Deserialize, Named, PartialEq)]
3444
struct Message {
3545
id: u64,
@@ -62,7 +72,7 @@ fn bench_message_sizes(c: &mut Criterion) {
6272
group.sampling_mode(criterion::SamplingMode::Flat);
6373
group.sample_size(10);
6474
group.bench_function(BenchmarkId::from_parameter(size), move |b| {
65-
let mut b = b.to_async(Runtime::new().unwrap());
75+
let mut b = b.to_async(new_runtime());
6676
let tt = &transport;
6777
b.iter_custom(|iters| async move {
6878
let addr = ChannelAddr::any(tt.clone());
@@ -106,7 +116,7 @@ fn bench_message_rates(c: &mut Criterion) {
106116
let rate = *rate;
107117

108118
group.bench_function(format!("rate_{}_{}mps", transport_name, rate), move |b| {
109-
let mut b = b.to_async(Runtime::new().unwrap());
119+
let mut b = b.to_async(new_runtime());
110120
b.iter_custom(|iters| async move {
111121
let total_msgs = iters * rate;
112122
let addr = ChannelAddr::any(transport.clone());
@@ -169,6 +179,73 @@ fn bench_message_rates(c: &mut Criterion) {
169179
group.finish();
170180
}
171181

172-
criterion_group!(benches, bench_message_sizes, bench_message_rates);
182+
// Try to replicate https://www.internalfb.com/phabricator/paste/view/P1903314366
183+
fn bench_channel_ping_pong(c: &mut Criterion) {
184+
let transport = ChannelTransport::Unix;
185+
186+
for size in [1usize, 1_000_000usize] {
187+
let mut group = c.benchmark_group("channel_ping_pong".to_string());
188+
let transport = transport.clone();
189+
group.throughput(Throughput::Bytes((size * 2) as u64)); // send and receive
190+
group.sampling_mode(criterion::SamplingMode::Flat);
191+
group.sample_size(100);
192+
group.bench_function(BenchmarkId::from_parameter(size), move |b| {
193+
let mut b = b.to_async(new_runtime());
194+
b.iter_custom(|iters| channel_ping_pong(transport.clone(), size, iters as usize));
195+
});
196+
group.finish();
197+
}
198+
}
199+
200+
async fn channel_ping_pong(
201+
transport: ChannelTransport,
202+
message_size: usize,
203+
num_iter: usize,
204+
) -> Duration {
205+
let (client_addr, mut client_rx) = channel::serve::<Bytes>(ChannelAddr::any(transport.clone()))
206+
.await
207+
.unwrap();
208+
let (server_addr, mut server_rx) = channel::serve::<Bytes>(ChannelAddr::any(transport.clone()))
209+
.await
210+
.unwrap();
211+
212+
let _server_handle: tokio::task::JoinHandle<Result<(), anyhow::Error>> =
213+
tokio::spawn(async move {
214+
let client_tx = channel::dial(client_addr)?;
215+
loop {
216+
let message = server_rx.recv().await?;
217+
client_tx.post(message);
218+
}
219+
});
220+
221+
let client_handle: tokio::task::JoinHandle<Result<(), anyhow::Error>> =
222+
tokio::spawn(async move {
223+
let server_tx = channel::dial(server_addr)?;
224+
let message = Bytes::from(vec![0u8; message_size]);
225+
if false && num_iter > 1 {
226+
loop {
227+
server_tx.post(message.clone() /*cheap */);
228+
client_rx.recv().await?;
229+
}
230+
} else {
231+
for _ in 0..num_iter {
232+
server_tx.post(message.clone() /*cheap */);
233+
client_rx.recv().await?;
234+
}
235+
}
236+
Ok(())
237+
});
238+
239+
let start = Instant::now();
240+
client_handle.await.unwrap();
241+
start.elapsed()
242+
}
243+
244+
criterion_group!(
245+
benches,
246+
bench_message_sizes,
247+
bench_message_rates,
248+
bench_channel_ping_pong
249+
);
173250

174251
criterion_main!(benches);

hyperactor/example/channel.rs

Lines changed: 90 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,90 @@
1+
use bytes::Bytes;
2+
use hyperactor::channel;
3+
use hyperactor::channel::ChannelAddr;
4+
use hyperactor::channel::ChannelRx;
5+
use hyperactor::channel::ChannelTransport;
6+
use hyperactor::channel::Rx;
7+
use hyperactor::channel::Tx;
8+
use tokio::time::Duration;
9+
use tokio::time::Instant;
10+
11+
async fn server(
12+
mut server_rx: ChannelRx<Bytes>,
13+
client_addr: ChannelAddr,
14+
) -> Result<(), anyhow::Error> {
15+
let client_tx = channel::dial(client_addr)?;
16+
loop {
17+
let message = server_rx.recv().await?;
18+
client_tx.post(message);
19+
}
20+
}
21+
22+
// Analog of https://www.internalfb.com/phabricator/paste/view/P1903314366, using Channel APIs.
23+
// Possibly we should create separate threads for the client and server to also make the OS-level
24+
// setup equivalent.
25+
#[tokio::main(flavor = "current_thread")]
26+
async fn main() -> Result<(), anyhow::Error> {
27+
let transport = ChannelTransport::Tcp;
28+
// let transport = ChannelTransport::Local;
29+
let message_size = 1_000_000;
30+
let num_iter = 100;
31+
32+
let (client_addr, mut client_rx) = channel::serve::<Bytes>(ChannelAddr::any(transport.clone()))
33+
.await
34+
.unwrap();
35+
let (server_addr, server_rx) = channel::serve::<Bytes>(ChannelAddr::any(transport.clone()))
36+
.await
37+
.unwrap();
38+
39+
let _server_handle = tokio::spawn(server(server_rx, client_addr));
40+
41+
let server_tx = channel::dial(server_addr)?;
42+
let message = Bytes::from(vec![0u8; message_size]);
43+
44+
for _ in 0..10 {
45+
// Warmup
46+
let t = Instant::now();
47+
server_tx.post(message.clone() /*cheap */);
48+
client_rx.recv().await?;
49+
}
50+
51+
let mut latencies = vec![];
52+
let mut total_bytes_sent = 0usize;
53+
let mut total_bytes_received = 0usize;
54+
55+
let start = Instant::now();
56+
for _ in 0..num_iter {
57+
total_bytes_sent += message.len();
58+
let start = Instant::now();
59+
server_tx.post(message.clone() /*cheap */);
60+
total_bytes_received += client_rx.recv().await?.len();
61+
latencies.push(start.elapsed());
62+
}
63+
let elapsed = start.elapsed();
64+
65+
let avg_latency = ((latencies.iter().sum::<Duration>().as_micros() as f64) / 1000f64)
66+
/ (latencies.len() as f64);
67+
let min_latency = (latencies.iter().min().unwrap().as_micros() as f64) / 1000f64;
68+
let max_latency = (latencies.iter().max().unwrap().as_micros() as f64) / 1000f64;
69+
70+
let total_bytes_transferred = total_bytes_sent + total_bytes_received;
71+
let bandwidth_bytes_per_sec =
72+
(total_bytes_transferred as f64) / ((elapsed.as_millis() as f64) / 1000f64);
73+
let bandwidth_mbps = (bandwidth_bytes_per_sec * 8f64) / (1024f64 * 1024f64);
74+
75+
println!("Results:");
76+
println!("Average latency: {} ms", avg_latency);
77+
println!("Min latency: {} ms", min_latency);
78+
println!("Max latency: {} ms", max_latency);
79+
println!("Total iterations: {}", latencies.len());
80+
println!("Total time: {} seconds", elapsed.as_secs());
81+
println!("Bytes sent: {} bytes", total_bytes_sent);
82+
println!("Bytes received: {} bytes", total_bytes_received);
83+
println!("Total bytes transferred: {} bytes", total_bytes_transferred);
84+
println!(
85+
"Bandwidth: {} bytes/sec ({} Mbps)",
86+
bandwidth_bytes_per_sec, bandwidth_mbps
87+
);
88+
89+
Ok(())
90+
}

hyperactor/src/data.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -105,6 +105,12 @@ impl Named for std::time::Duration {
105105
}
106106
}
107107

108+
impl Named for bytes::Bytes {
109+
fn typename() -> &'static str {
110+
"bytes::Bytes"
111+
}
112+
}
113+
108114
// A macro that implements type-keyed interning of typenames. This is useful
109115
// for implementing [`Named`] for generic types.
110116
#[doc(hidden)] // not part of the public API

0 commit comments

Comments
 (0)