diff --git a/hyperactor_mesh/benches/main.rs b/hyperactor_mesh/benches/main.rs index d7ea66e01..ffd19986a 100644 --- a/hyperactor_mesh/benches/main.rs +++ b/hyperactor_mesh/benches/main.rs @@ -74,8 +74,13 @@ fn bench_actor_scaling(c: &mut Criterion) { .unwrap(); let mut msg_rcv = 0; - while msg_rcv < host_count { - let _ = rx.recv().await.unwrap(); + while msg_rcv < host_count * 8 { + tokio::select! { + _ = rx.recv() => {} + _ = tokio::time::sleep(Duration::from_secs(10)) => { + panic!("Timed out. Expected {} messages but got {}", host_count, msg_rcv); + } + } msg_rcv += 1; } } @@ -89,7 +94,7 @@ fn bench_actor_scaling(c: &mut Criterion) { .await .expect("Failed to stop allocator"); elapsed - }); + }) }); } @@ -154,6 +159,7 @@ fn bench_actor_mesh_message_sizes(c: &mut Criterion) { let payload = vec![0u8; message_size]; actor_mesh + .cast( client, all(true_()), @@ -167,7 +173,12 @@ fn bench_actor_mesh_message_sizes(c: &mut Criterion) { let mut msg_rcv = 0; while msg_rcv < actor_count { - let _ = rx.recv().await.unwrap(); + tokio::select! { + _ = rx.recv() => {} + _ = tokio::time::sleep(Duration::from_secs(10)) => { + panic!("Timed out. Expected {} messages but got {}", actor_count, msg_rcv); + } + } msg_rcv += 1; } }