Skip to content

Commit 2320f51

Browse files
mariusaefacebook-github-bot
authored andcommitted
multipart channels (#948)
Summary: Pull Request resolved: #948 This change introduces multipart support to net channels in Hyperactor. It is currently configurable (disabled by default) until we fully roll it out. In addition to adding configurability, through a new key, CHANNEL_MULTIPART, this change makes two changes: First, the framer now write Buf-typed frames (requiring the framer to become generic in the type of buffer). By abstracting over the buffer type, the underlying buffer is allowed to be noncontiguous; the framer performs vectored writes. Accordingly, the multi-part message frame implements Buf, presenting each part as a chunk, which will be included in the vectored write from the net framer. Second, in order to reduce the complexity/divergence of this frame, we implement the standard bincode encoding by adding an "illegal" message. These are not valid multipart messages, but rather are a way for us to smuggle a (single) encoded blob into serde_multipart::Message (and serde_multipart::Frame), which allows us to support both encodings using the same data structures, simplifying the net channel implementation. Once we are confident in the multipart implementation, we will remove this illegal encoding, and also the configuration option to enable multipart encoding. This change will have follow-ups to: 1) Clean up naming: we now have several things called "frames" and "messages", confusingly overlapping concepts; and 2) Homogenize client-to-server and server-to-client messages: currently, client-to-server messaging uses a framed multipart encoding, while server-to-client messages (today these are used only for acks) have a different encoding. We should keep everything consistent except for the message types. ## Benchmarking Multipart encoding substantially improves the performance of the ping-pong benchmark, improving throughput by about **2.7x**. ``` # Baseline: $ buck run mode/opt :hyperactor-example-channel Average latency: 2.38006 ms Min latency: 1.854 ms Max latency: 2.789 ms Total iterations: 100 Total time: 0 seconds Bytes sent: 100000000 bytes Bytes received: 100000000 bytes Total bytes transferred: 200000000 bytes Bandwidth: 840336134.4537816 bytes/sec (6411.255908613446 Mbps) # Multipart: $ HYPERACTOR_DEFAULT_ENCODING=multipart HYPERACTOR_CHANNEL_MULTIPART=true buck run mode/opt :hyperactor-example-channel Average latency: 0.86634 ms Min latency: 0.826 ms Max latency: 0.935 ms Total iterations: 100 Total time: 0 seconds Bytes sent: 100000000 bytes Bytes received: 100000000 bytes Total bytes transferred: 200000000 bytes Bandwidth: 2325581395.3488374 bytes/sec (17742.777979651164 Mbps) ``` ghstack-source-id: 305028344 Reviewed By: shayne-fletcher, vidhyav Differential Revision: D80704703 fbshipit-source-id: 0722d5836ef9cbc08802bd1f3da5d2f8b4e14915
1 parent 47ce35f commit 2320f51

File tree

6 files changed

+277
-102
lines changed

6 files changed

+277
-102
lines changed

hyperactor/benches/main.rs

Lines changed: 13 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ use hyperactor::reference::ProcId;
3434
use hyperactor::reference::WorldId;
3535
use serde::Deserialize;
3636
use serde::Serialize;
37+
use serde_multipart::Part;
3738
use tokio::runtime;
3839
use tokio::runtime::Runtime;
3940
use tokio::select;
@@ -208,12 +209,17 @@ async fn channel_ping_pong(
208209
message_size: usize,
209210
num_iter: usize,
210211
) -> Duration {
211-
let (client_addr, mut client_rx) = channel::serve::<Bytes>(ChannelAddr::any(transport.clone()))
212-
.await
213-
.unwrap();
214-
let (server_addr, mut server_rx) = channel::serve::<Bytes>(ChannelAddr::any(transport.clone()))
215-
.await
216-
.unwrap();
212+
#[derive(Clone, Debug, Named, Serialize, Deserialize)]
213+
struct Message(Part);
214+
215+
let (client_addr, mut client_rx) =
216+
channel::serve::<Message>(ChannelAddr::any(transport.clone()))
217+
.await
218+
.unwrap();
219+
let (server_addr, mut server_rx) =
220+
channel::serve::<Message>(ChannelAddr::any(transport.clone()))
221+
.await
222+
.unwrap();
217223

218224
let _server_handle: tokio::task::JoinHandle<Result<(), anyhow::Error>> =
219225
tokio::spawn(async move {
@@ -227,7 +233,7 @@ async fn channel_ping_pong(
227233
let client_handle: tokio::task::JoinHandle<Result<(), anyhow::Error>> =
228234
tokio::spawn(async move {
229235
let server_tx = channel::dial(server_addr)?;
230-
let message = Bytes::from(vec![0u8; message_size]);
236+
let message = Message(Part::from(vec![0u8; message_size]));
231237
for _ in 0..num_iter {
232238
server_tx.post(message.clone() /*cheap */);
233239
client_rx.recv().await?;

hyperactor/example/channel.rs

Lines changed: 17 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -7,17 +7,23 @@
77
*/
88

99
use bytes::Bytes;
10+
use hyperactor::Named;
1011
use hyperactor::channel;
1112
use hyperactor::channel::ChannelAddr;
1213
use hyperactor::channel::ChannelRx;
1314
use hyperactor::channel::ChannelTransport;
1415
use hyperactor::channel::Rx;
1516
use hyperactor::channel::Tx;
17+
use serde::Deserialize;
18+
use serde::Serialize;
1619
use tokio::time::Duration;
1720
use tokio::time::Instant;
1821

22+
#[derive(Clone, Debug, Named, Serialize, Deserialize)]
23+
struct Message(serde_multipart::Part);
24+
1925
async fn server(
20-
mut server_rx: ChannelRx<Bytes>,
26+
mut server_rx: ChannelRx<Message>,
2127
client_addr: ChannelAddr,
2228
) -> Result<(), anyhow::Error> {
2329
let client_tx = channel::dial(client_addr)?;
@@ -35,19 +41,21 @@ async fn main() -> Result<(), anyhow::Error> {
3541
let transport = ChannelTransport::Tcp;
3642
// let transport = ChannelTransport::Local;
3743
let message_size = 1_000_000;
38-
let num_iter = 100;
44+
let num_iter = 1000;
3945

40-
let (client_addr, mut client_rx) = channel::serve::<Bytes>(ChannelAddr::any(transport.clone()))
41-
.await
42-
.unwrap();
43-
let (server_addr, server_rx) = channel::serve::<Bytes>(ChannelAddr::any(transport.clone()))
46+
let (client_addr, mut client_rx) =
47+
channel::serve::<Message>(ChannelAddr::any(transport.clone()))
48+
.await
49+
.unwrap();
50+
let (server_addr, server_rx) = channel::serve::<Message>(ChannelAddr::any(transport.clone()))
4451
.await
4552
.unwrap();
4653

4754
let _server_handle = tokio::spawn(server(server_rx, client_addr));
4855

4956
let server_tx = channel::dial(server_addr)?;
50-
let message = Bytes::from(vec![0u8; message_size]);
57+
58+
let message = Message(serde_multipart::Part::from(vec![0u8; message_size]));
5159

5260
for _ in 0..10 {
5361
// Warmup
@@ -62,10 +70,10 @@ async fn main() -> Result<(), anyhow::Error> {
6270

6371
let start = Instant::now();
6472
for _ in 0..num_iter {
65-
total_bytes_sent += message.len();
73+
total_bytes_sent += message.0.len();
6674
let start = Instant::now();
6775
server_tx.post(message.clone() /*cheap */);
68-
total_bytes_received += client_rx.recv().await?.len();
76+
total_bytes_received += client_rx.recv().await?.0.len();
6977
latencies.push(start.elapsed());
7078
}
7179
let elapsed = start.elapsed();

0 commit comments

Comments
 (0)