Skip to content

Commit 3105c56

Browse files
pablorfb-metafacebook-github-bot
authored andcommitted
Actor throuput regression test (#749)
Summary: Pull Request resolved: #749 Measures how long it takes to cast and reply to 1Kb to N hosts with 8 actors via local transport Added artificial processing time of 100ms ``` trainer_mesh .cast( all(true_()), StepMessage { step: i as usize, reply: tx.bind(), payload, }, ) .unwrap(); let mut msg_rcv = 0; while msg_rcv < actor_count { let _ = rx.recv().await.unwrap(); msg_rcv += 1; } ``` Reviewed By: moonli Differential Revision: D79406183 fbshipit-source-id: dad719a0fd2e4288341c0fd8f6bbea607cd123ac
1 parent 3768014 commit 3105c56

File tree

3 files changed

+158
-1
lines changed

3 files changed

+158
-1
lines changed

hyperactor_mesh/Cargo.toml

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
# @generated by autocargo from //monarch/hyperactor_mesh:[hyperactor_mesh,hyperactor_mesh_test_bootstrap,hyperactor_mesh_test_remote_process_alloc,hyperactor_mesh_test_remote_process_allocator,process_allocator_cleanup,process_allocator_test_bin,process_allocator_test_bootstrap]
1+
# @generated by autocargo from //monarch/hyperactor_mesh:[benchmarks,hyperactor_mesh,hyperactor_mesh_test_bootstrap,hyperactor_mesh_test_remote_process_alloc,hyperactor_mesh_test_remote_process_allocator,process_allocator_cleanup,process_allocator_test_bin,process_allocator_test_bootstrap]
22

33
[package]
44
name = "hyperactor_mesh"
@@ -7,6 +7,10 @@ authors = ["Meta"]
77
edition = "2021"
88
license = "BSD-3-Clause"
99

10+
[[bin]]
11+
name = "benchmarks"
12+
path = "benches/main.rs"
13+
1014
[[bin]]
1115
name = "hyperactor_mesh_test_bootstrap"
1216
path = "test/bootstrap.rs"
@@ -39,6 +43,7 @@ bitmaps = "3.2.1"
3943
buck-resources = "1"
4044
chrono = { version = "0.4.41", features = ["clock", "serde", "std"], default-features = false }
4145
clap = { version = "4.5.41", features = ["derive", "env", "string", "unicode", "wrap_help"] }
46+
criterion = { version = "0.5.1", features = ["async_tokio", "csv_output"] }
4247
dashmap = { version = "5.5.3", features = ["rayon", "serde"] }
4348
enum-as-inner = "0.6.0"
4449
erased-serde = "0.3.27"
Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
/*
2+
* Copyright (c) Meta Platforms, Inc. and affiliates.
3+
* All rights reserved.
4+
*
5+
* This source code is licensed under the BSD-style license found in the
6+
* LICENSE file in the root directory of this source tree.
7+
*/
8+
9+
use std::time::Duration;
10+
11+
use anyhow::Result;
12+
use async_trait::async_trait;
13+
use hyperactor::Actor;
14+
use hyperactor::Bind;
15+
use hyperactor::Context;
16+
use hyperactor::Handler;
17+
use hyperactor::Named;
18+
use hyperactor::PortRef;
19+
use hyperactor::Unbind;
20+
use hyperactor::clock::Clock;
21+
use serde::Deserialize;
22+
use serde::Serialize;
23+
24+
#[derive(Debug, Clone, Serialize, Deserialize, Named, Bind, Unbind)]
25+
pub struct BenchMessage {
26+
pub step: usize,
27+
pub reply: PortRef<usize>,
28+
#[serde(with = "serde_bytes")]
29+
pub payload: Vec<u8>,
30+
}
31+
32+
#[derive(Debug)]
33+
#[hyperactor::export(
34+
spawn = true,
35+
handlers = [
36+
BenchMessage { cast = true },
37+
],
38+
)]
39+
pub struct BenchActor {}
40+
41+
#[async_trait]
42+
impl Actor for BenchActor {
43+
type Params = ();
44+
45+
async fn new(_: Self::Params) -> Result<Self, anyhow::Error> {
46+
Ok(Self {})
47+
}
48+
}
49+
50+
#[async_trait]
51+
impl Handler<BenchMessage> for BenchActor {
52+
async fn handle(
53+
&mut self,
54+
ctx: &Context<Self>,
55+
msg: BenchMessage,
56+
) -> Result<(), anyhow::Error> {
57+
hyperactor::clock::ClockKind::default()
58+
.sleep(Duration::from_millis(100))
59+
.await;
60+
61+
let _ = msg.reply.send(ctx, msg.step);
62+
Ok(())
63+
}
64+
}

hyperactor_mesh/benches/main.rs

Lines changed: 88 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,88 @@
1+
/*
2+
* Copyright (c) Meta Platforms, Inc. and affiliates.
3+
* All rights reserved.
4+
*
5+
* This source code is licensed under the BSD-style license found in the
6+
* LICENSE file in the root directory of this source tree.
7+
*/
8+
9+
use std::time::Instant;
10+
11+
use criterion::BenchmarkId;
12+
use criterion::Criterion;
13+
use criterion::criterion_group;
14+
use criterion::criterion_main;
15+
use hyperactor_mesh::ProcMesh;
16+
use hyperactor_mesh::actor_mesh::ActorMesh;
17+
use hyperactor_mesh::actor_mesh::RootActorMesh;
18+
use hyperactor_mesh::alloc::AllocSpec;
19+
use hyperactor_mesh::alloc::Allocator;
20+
use hyperactor_mesh::alloc::LocalAllocator;
21+
use hyperactor_mesh::selection::dsl::all;
22+
use hyperactor_mesh::selection::dsl::true_;
23+
use hyperactor_mesh::shape;
24+
25+
mod bench_actor;
26+
use bench_actor::BenchActor;
27+
use bench_actor::BenchMessage;
28+
use tokio::runtime::Runtime;
29+
30+
// Benchmark how long does it take to process 1KB message on 1, 10, 100, 1K hosts with 8 GPUs each
31+
fn bench_actor_scaling(c: &mut Criterion) {
32+
let mut group = c.benchmark_group("actor_scaling");
33+
let host_counts = vec![1, 10, 100, 1000];
34+
let message_size = 1024; // Fixed message size (1KB)
35+
group.sample_size(10);
36+
group.sampling_mode(criterion::SamplingMode::Flat);
37+
38+
for host_count in host_counts {
39+
group.bench_function(BenchmarkId::from_parameter(host_count), |b| {
40+
let mut b = b.to_async(Runtime::new().unwrap());
41+
b.iter_custom(|iters| async move {
42+
let shape = shape! { hosts=host_count, gpus=8 };
43+
let alloc = LocalAllocator
44+
.allocate(AllocSpec {
45+
shape: shape.clone(),
46+
constraints: Default::default(),
47+
})
48+
.await
49+
.unwrap();
50+
51+
let proc_mesh = ProcMesh::allocate(alloc).await.unwrap();
52+
let trainer_mesh: RootActorMesh<BenchActor> =
53+
proc_mesh.spawn("trainer", &()).await.unwrap();
54+
let client = proc_mesh.client();
55+
56+
let start = Instant::now();
57+
for i in 0..iters {
58+
let (tx, mut rx) = client.open_port();
59+
let payload = vec![0u8; message_size];
60+
61+
trainer_mesh
62+
.cast(
63+
all(true_()),
64+
BenchMessage {
65+
step: i as usize,
66+
reply: tx.bind(),
67+
payload,
68+
},
69+
)
70+
.unwrap();
71+
72+
let mut msg_rcv = 0;
73+
while msg_rcv < host_count {
74+
let _ = rx.recv().await.unwrap();
75+
msg_rcv += 1;
76+
}
77+
}
78+
79+
start.elapsed()
80+
});
81+
});
82+
}
83+
84+
group.finish();
85+
}
86+
87+
criterion_group!(benches, bench_actor_scaling);
88+
criterion_main!(benches);

0 commit comments

Comments
 (0)