Skip to content

Commit 6d5a1a8

Browse files
port v0 multi-process test to v1 (#1976)
Summary: Port undeliverable-message coverage from hyperactor_multiprocess into the v1 mesh in preparation for retiring hyperactor_multiprocess. This moves the undeliverable-message test from `hyperactor_multiprocess::proc_actor` into the v1 mesh layer and deletes the old version. The new test uses `testing::proc_meshes` with two replicas, runs `PingPongActor` on each, confirms ping→pong works, then shuts down the pong replica and sends a batch of messages. All undeliverables are routed back via the bound port, demonstrating that the sender stays alive and correctly handles delivery failures in the mesh-based setup. Reviewed By: mariusae Differential Revision: D87680283
1 parent c98cade commit 6d5a1a8

File tree

2 files changed

+115
-126
lines changed

2 files changed

+115
-126
lines changed

hyperactor_mesh/src/v1/actor_mesh.rs

Lines changed: 115 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -823,4 +823,119 @@ mod tests {
823823

824824
let _ = host_mesh.shutdown(&instance).await;
825825
}
826+
827+
/// Test that undeliverable messages are properly returned to the
/// sender when communication to a proc is broken.
///
/// This is the V1 version of the test from
/// hyperactor_multiprocess/src/proc_actor.rs::test_undeliverable_message_return.
///
/// Outline:
/// 1. Spawn a "ping" actor on replica 0 and a "pong" actor on replica 1.
/// 2. Check that one ping-pong exchange completes.
/// 3. Stop the pong actor mesh.
/// 4. Send `n` further messages from ping to pong and expect all `n` to
///    arrive on the undeliverable port that was bound into the ping
///    actor's params.
#[async_timed_test(timeout_secs = 30)]
#[cfg(fbcode_build)]
async fn test_undeliverable_message_return() {
    use hyperactor::mailbox::MessageEnvelope;
    use hyperactor::mailbox::Undeliverable;
    use hyperactor::test_utils::pingpong::PingPongActor;
    use hyperactor::test_utils::pingpong::PingPongActorParams;
    use hyperactor::test_utils::pingpong::PingPongMessage;

    hyperactor_telemetry::initialize_logging_for_test();

    // Set message delivery timeout for faster test. The override is
    // held by `_guard` for the rest of the function.
    let config = hyperactor::config::global::lock();
    let _guard = config.override_key(
        hyperactor::config::MESSAGE_DELIVERY_TIMEOUT,
        std::time::Duration::from_secs(1),
    );

    let instance = testing::instance().await;

    // Create a proc mesh with 2 replicas.
    let meshes = testing::proc_meshes(instance, extent!(replicas = 2)).await;
    let proc_mesh = &meshes[1]; // Use the ProcessAllocator version

    // Set up undeliverable message port for collecting undeliverables
    let (undeliverable_port, mut undeliverable_rx) =
        instance.open_port::<Undeliverable<MessageEnvelope>>();

    // Spawn PingPongActors across both replicas.
    // Only actors on replica 0 will forward undeliverable messages.
    let ping_params = PingPongActorParams::new(Some(undeliverable_port.bind()), None);
    let pong_params = PingPongActorParams::new(None, None);

    // Spawn actors individually on each replica by spawning separate actor meshes
    // with specific proc selections.
    let ping_proc_mesh = proc_mesh.range("replicas", 0..1).unwrap();
    let pong_proc_mesh = proc_mesh.range("replicas", 1..2).unwrap();

    let ping_mesh = ping_proc_mesh
        .spawn::<PingPongActor>(instance, "ping", &ping_params)
        .await
        .unwrap();

    let pong_mesh = pong_proc_mesh
        .spawn::<PingPongActor>(instance, "pong", &pong_params)
        .await
        .unwrap();

    // Get individual actor refs (each mesh holds exactly one actor).
    let ping_handle = ping_mesh.values().next().unwrap();
    let pong_handle = pong_mesh.values().next().unwrap();

    // Verify ping-pong works initially
    let (done_tx, done_rx) = instance.open_once_port();
    ping_handle
        .send(
            instance,
            PingPongMessage(2, pong_handle.clone(), done_tx.bind()),
        )
        .unwrap();
    assert!(
        done_rx.recv().await.unwrap(),
        "Initial ping-pong should work"
    );

    // Now stop the pong actor mesh to break communication
    pong_mesh.stop(instance).await.unwrap();

    // Give it a moment to fully stop
    RealClock.sleep(std::time::Duration::from_millis(200)).await;

    // Send multiple messages that will all fail to be delivered
    let n = 100usize;
    for i in 1..=n {
        let ttl = 66 + i as u64; // Avoid ttl = 66 (which would cause other test behavior)
        // The reply receiver is never read; no reply is expected once
        // pong has been stopped.
        let (once_tx, _once_rx) = instance.open_once_port();
        ping_handle
            .send(
                instance,
                PingPongMessage(ttl, pong_handle.clone(), once_tx.bind()),
            )
            .unwrap();
    }

    // Collect all undeliverable messages.
    // The fact that we successfully collect them proves the ping actor
    // is still running and handling undeliverables correctly (not crashing).
    //
    // Each `recv` is bounded by the time left until `deadline` rather
    // than by a fixed 1s timeout: a fixed per-receive timeout would end
    // the loop on the first slow message and make the overall deadline
    // ineffective.
    let deadline = RealClock.now() + std::time::Duration::from_secs(5);
    let mut count = 0;
    while count < n {
        let remaining = deadline.saturating_duration_since(RealClock.now());
        match RealClock
            .timeout(remaining, undeliverable_rx.recv())
            .await
        {
            Ok(Ok(Undeliverable(envelope))) => {
                // Every returned envelope must still decode as the
                // message type that was sent.
                let _: PingPongMessage = envelope.deserialized().unwrap();
                count += 1;
            }
            Ok(Err(_)) => break, // Channel closed
            Err(_) => break,     // Deadline reached
        }
    }

    assert_eq!(
        count, n,
        "Expected {n} undeliverable messages, got {count}"
    );
}
826941
}

hyperactor_multiprocess/src/proc_actor.rs

Lines changed: 0 additions & 126 deletions
Original file line numberDiff line numberDiff line change
@@ -1369,132 +1369,6 @@ mod tests {
13691369
server_handle.await;
13701370
}
13711371

1372-
#[tokio::test]
1373-
async fn test_undeliverable_message_return() {
1374-
// Proc can't send a message to a remote actor because the
1375-
// system connection is lost.
1376-
use hyperactor::mailbox::Undeliverable;
1377-
use hyperactor::test_utils::pingpong::PingPongActor;
1378-
use hyperactor::test_utils::pingpong::PingPongMessage;
1379-
1380-
// Use temporary config for this test
1381-
let config = hyperactor::config::global::lock();
1382-
let _guard = config.override_key(
1383-
hyperactor::config::MESSAGE_DELIVERY_TIMEOUT,
1384-
Duration::from_secs(1),
1385-
);
1386-
1387-
// Serve a system.
1388-
let server_handle = System::serve(
1389-
ChannelAddr::any(ChannelTransport::Tcp(TcpMode::Hostname)),
1390-
Duration::from_secs(120),
1391-
Duration::from_secs(120),
1392-
)
1393-
.await
1394-
.unwrap();
1395-
let mut system = System::new(server_handle.local_addr().clone());
1396-
1397-
// Build a supervisor.
1398-
let supervisor = system.attach().await.unwrap();
1399-
let (_sup_tx, _sup_rx) = supervisor.bind_actor_port::<ProcSupervisionMessage>();
1400-
let sup_ref = ActorRef::<ProcSupervisor>::attest(supervisor.self_id().clone());
1401-
1402-
// Construct a system sender.
1403-
let system_sender = BoxedMailboxSender::new(MailboxClient::new(
1404-
channel::dial(server_handle.local_addr().clone()).unwrap(),
1405-
));
1406-
1407-
// Construct a proc forwarder in terms of the system sender.
1408-
let listen_addr = ChannelAddr::any(ChannelTransport::Tcp(TcpMode::Hostname));
1409-
let proc_forwarder =
1410-
BoxedMailboxSender::new(DialMailboxRouter::new_with_default(system_sender));
1411-
1412-
// Bootstrap proc 'world[0]', join the system.
1413-
let world_id = id!(world);
1414-
let proc_0 = Proc::new(world_id.proc_id(0), proc_forwarder.clone());
1415-
let _proc_actor_0 = ProcActor::bootstrap_for_proc(
1416-
proc_0.clone(),
1417-
world_id.clone(),
1418-
listen_addr,
1419-
server_handle.local_addr().clone(),
1420-
sup_ref.clone(),
1421-
Duration::from_secs(120),
1422-
HashMap::new(),
1423-
ProcLifecycleMode::ManagedBySystem,
1424-
)
1425-
.await
1426-
.unwrap();
1427-
let proc_0_client = proc_0.attach("client").unwrap();
1428-
let (proc_0_undeliverable_tx, mut proc_0_undeliverable_rx) = proc_0_client.open_port();
1429-
1430-
// Bootstrap a second proc 'world[1]', join the system.
1431-
let proc_1 = Proc::new(world_id.proc_id(1), proc_forwarder.clone());
1432-
let _proc_actor_1 = ProcActor::bootstrap_for_proc(
1433-
proc_1.clone(),
1434-
world_id.clone(),
1435-
ChannelAddr::any(ChannelTransport::Tcp(TcpMode::Hostname)),
1436-
server_handle.local_addr().clone(),
1437-
sup_ref.clone(),
1438-
Duration::from_secs(120),
1439-
HashMap::new(),
1440-
ProcLifecycleMode::ManagedBySystem,
1441-
)
1442-
.await
1443-
.unwrap();
1444-
let proc_1_client = proc_1.attach("client").unwrap();
1445-
let (proc_1_undeliverable_tx, mut _proc_1_undeliverable_rx) = proc_1_client.open_port();
1446-
1447-
let ping_params = PingPongActorParams::new(Some(proc_0_undeliverable_tx.bind()), None);
1448-
// Spawn two actors 'ping' and 'pong' where 'ping' runs on
1449-
// 'world[0]' and 'pong' on 'world[1]' (that is, not on the
1450-
// same proc).
1451-
let ping_handle = proc_0
1452-
.spawn::<PingPongActor>("ping", ping_params)
1453-
.await
1454-
.unwrap();
1455-
let pong_params = PingPongActorParams::new(Some(proc_1_undeliverable_tx.bind()), None);
1456-
let pong_handle = proc_1
1457-
.spawn::<PingPongActor>("pong", pong_params)
1458-
.await
1459-
.unwrap();
1460-
1461-
// Now kill the system server making message delivery between
1462-
// procs impossible.
1463-
server_handle.stop().await.unwrap();
1464-
server_handle.await;
1465-
1466-
let n = 100usize;
1467-
for i in 1..(n + 1) {
1468-
// Have 'ping' send 'pong' a message.
1469-
let ttl = 66 + i as u64; // Avoid ttl = 66!
1470-
let (once_handle, _) = proc_0_client.open_once_port::<bool>();
1471-
ping_handle
1472-
.send(PingPongMessage(ttl, pong_handle.bind(), once_handle.bind()))
1473-
.unwrap();
1474-
}
1475-
1476-
// `PingPongActor`s do not exit their message loop (a
1477-
// non-default actor behavior) when they have an undelivered
1478-
// message sent back to them (the reason being this very
1479-
// test).
1480-
assert!(matches!(*ping_handle.status().borrow(), ActorStatus::Idle));
1481-
1482-
// We expect n undelivered messages.
1483-
let Ok(Undeliverable(envelope)) = proc_0_undeliverable_rx.recv().await else {
1484-
unreachable!()
1485-
};
1486-
let PingPongMessage(_, _, _) = envelope.deserialized().unwrap();
1487-
let mut count = 1;
1488-
while let Ok(Some(Undeliverable(envelope))) = proc_0_undeliverable_rx.try_recv() {
1489-
// We care that every undeliverable message was accounted
1490-
// for. We can't assume anything about their arrival
1491-
// order.
1492-
count += 1;
1493-
let PingPongMessage(_, _, _) = envelope.deserialized().unwrap();
1494-
}
1495-
assert!(count == n);
1496-
}
1497-
14981372
#[tracing_test::traced_test]
14991373
#[tokio::test]
15001374
#[cfg_attr(not(fbcode_build), ignore)]

0 commit comments

Comments
 (0)