Skip to content

Commit 4e00604

Browse files
port v0 multi-process test to v1
Summary: Port undeliverable-message coverage from hyperactor_multiprocess into the v1 mesh in preparation for retiring hyperactor_multiprocess. This moves the undeliverable-message test from `hyperactor_multiprocess::proc_actor` into the v1 mesh layer and deletes the old version. The new test uses `testing::proc_meshes` with two replicas, runs a `PingPongActor` on each, confirms ping→pong works, then stops the pong actor mesh and sends a batch of messages. All undeliverables are routed back via the bound port, demonstrating that the sender stays alive and correctly handles delivery failures in the mesh-based setup. Differential Revision: D87680283
1 parent 4d2e334 commit 4e00604

File tree

2 files changed

+116
-126
lines changed

2 files changed

+116
-126
lines changed

hyperactor_mesh/src/v1/actor_mesh.rs

Lines changed: 116 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -823,4 +823,120 @@ mod tests {
823823

824824
let _ = host_mesh.shutdown(&instance).await;
825825
}
826+
827+
/// Test that undeliverable messages are properly returned to the
828+
/// sender when communication to a proc is broken.
829+
///
830+
/// This is the V1 port of the test that previously lived at
831+
/// hyperactor_multiprocess/src/proc_actor.rs::test_undeliverable_message_return.
///
/// Flow: spawn `ping` on replica 0 and `pong` on replica 1, check that one
/// ping-pong exchange completes, stop the pong actor mesh, then send 100
/// messages through `ping` and expect each one to come back on the
/// undeliverable port that was bound into the ping actor's params.
832+
#[async_timed_test(timeout_secs = 30)]
833+
// Only compiled when the `fbcode_build` cfg is set.
#[cfg(fbcode_build)]
834+
async fn test_undeliverable_message_return() {
835+
use hyperactor::clock::RealClock;
836+
use hyperactor::mailbox::MessageEnvelope;
837+
use hyperactor::mailbox::Undeliverable;
838+
use hyperactor::test_utils::pingpong::PingPongActor;
839+
use hyperactor::test_utils::pingpong::PingPongActorParams;
840+
use hyperactor::test_utils::pingpong::PingPongMessage;
841+
842+
hyperactor_telemetry::initialize_logging_for_test();
843+
844+
// Override the message delivery timeout to 1 second so the test runs faster.
845+
let config = hyperactor::config::global::lock();
846+
let _guard = config.override_key(
847+
hyperactor::config::MESSAGE_DELIVERY_TIMEOUT,
848+
std::time::Duration::from_secs(1),
849+
);
850+
851+
let instance = testing::instance().await;
852+
853+
// Create a proc mesh with 2 replicas (simulates 2 separate processes)
854+
let meshes = testing::proc_meshes(instance, extent!(replicas = 2)).await;
855+
// Only the first mesh returned by `proc_meshes` is used by this test.
let proc_mesh = &meshes[0];
856+
857+
// Open the port on which returned (undeliverable) envelopes are collected.
858+
let (undeliverable_port, mut undeliverable_rx) =
859+
instance.open_port::<Undeliverable<MessageEnvelope>>();
860+
861+
// Build the actor params for both replicas.
862+
// Only the ping params (replica 0) carry the undeliverable port; pong gets `None`.
863+
let ping_params = PingPongActorParams::new(Some(undeliverable_port.bind()), None);
864+
let pong_params = PingPongActorParams::new(None, None);
865+
866+
// Spawn actors individually on each replica by spawning separate actor meshes
867+
// with specific proc selections: replica 0 hosts "ping", replica 1 hosts "pong".
868+
let ping_proc_mesh = proc_mesh.range("replicas", 0..1).unwrap();
869+
let pong_proc_mesh = proc_mesh.range("replicas", 1..2).unwrap();
870+
871+
let ping_mesh = ping_proc_mesh
872+
.spawn::<PingPongActor>(instance, "ping", &ping_params)
873+
.await
874+
.unwrap();
875+
876+
let pong_mesh = pong_proc_mesh
877+
.spawn::<PingPongActor>(instance, "pong", &pong_params)
878+
.await
879+
.unwrap();
880+
881+
// Take the first actor ref from each mesh.
882+
let ping_handle = ping_mesh.values().next().unwrap();
883+
let pong_handle = pong_mesh.values().next().unwrap();
884+
885+
// Verify ping-pong works initially: the once-port must report `true`.
886+
let (done_tx, done_rx) = instance.open_once_port();
887+
ping_handle
888+
.send(
889+
instance,
890+
PingPongMessage(2, pong_handle.clone(), done_tx.bind()),
891+
)
892+
.unwrap();
893+
assert!(
894+
done_rx.recv().await.unwrap(),
895+
"Initial ping-pong should work"
896+
);
897+
898+
// Now stop the pong actor mesh to break communication
899+
pong_mesh.stop(instance).await.unwrap();
900+
901+
// Give it a moment to fully stop.
// NOTE(review): a fixed 200 ms sleep is timing-based — confirm it is long enough on slow hosts.
902+
RealClock.sleep(std::time::Duration::from_millis(200)).await;
903+
904+
// Send multiple messages that will all fail to be delivered
905+
let n = 100usize;
906+
for i in 1..=n {
907+
let ttl = 66 + i as u64; // Starts at 67 to avoid ttl = 66 (which would cause other test behavior)
908+
// The receiving half is deliberately unused; only the returned envelopes are counted below.
let (once_tx, _once_rx) = instance.open_once_port();
909+
ping_handle
910+
.send(
911+
instance,
912+
PingPongMessage(ttl, pong_handle.clone(), once_tx.bind()),
913+
)
914+
.unwrap();
915+
}
916+
917+
// Collect all undeliverable messages.
918+
// The fact that we successfully collect them proves the ping actor
919+
// is still running and handling undeliverables correctly (not crashing).
// The loop ends at the first of: `n` collected, the 5 s overall deadline,
// 1 s without a message, or the channel closing.
920+
let mut count = 0;
921+
let deadline = RealClock.now() + std::time::Duration::from_secs(5);
922+
while count < n && RealClock.now() < deadline {
923+
match RealClock
924+
.timeout(std::time::Duration::from_secs(1), undeliverable_rx.recv())
925+
.await
926+
{
927+
Ok(Ok(Undeliverable(envelope))) => {
928+
// Each returned envelope must still deserialize as a `PingPongMessage`.
let _: PingPongMessage = envelope.deserialized().unwrap();
929+
count += 1;
930+
}
931+
Ok(Err(_)) => break, // Channel closed
932+
Err(_) => break, // Timeout
933+
}
934+
}
935+
936+
// Every one of the `n` sends must have been returned as undeliverable.
assert_eq!(
937+
count, n,
938+
"Expected {} undeliverable messages, got {}",
939+
n, count
940+
);
941+
}
826942
}

hyperactor_multiprocess/src/proc_actor.rs

Lines changed: 0 additions & 126 deletions
Original file line numberDiff line numberDiff line change
@@ -1369,132 +1369,6 @@ mod tests {
13691369
server_handle.await;
13701370
}
13711371

1372-
/// Checks that once the system server is stopped, the messages `ping`
/// (on 'world[0]') tries to send to `pong` (on 'world[1]') come back as
/// `Undeliverable` on the port bound into ping's params, and that `ping`
/// is still `Idle` afterwards.
#[tokio::test]
1373-
async fn test_undeliverable_message_return() {
1374-
// Proc can't send a message to a remote actor because the
1375-
// system connection is lost.
1376-
use hyperactor::mailbox::Undeliverable;
1377-
use hyperactor::test_utils::pingpong::PingPongActor;
1378-
use hyperactor::test_utils::pingpong::PingPongMessage;
1379-
1380-
// Override the message delivery timeout to 1 second for this test.
1381-
let config = hyperactor::config::global::lock();
1382-
let _guard = config.override_key(
1383-
hyperactor::config::MESSAGE_DELIVERY_TIMEOUT,
1384-
Duration::from_secs(1),
1385-
);
1386-
1387-
// Serve a system.
1388-
let server_handle = System::serve(
1389-
ChannelAddr::any(ChannelTransport::Tcp(TcpMode::Hostname)),
1390-
Duration::from_secs(120),
1391-
Duration::from_secs(120),
1392-
)
1393-
.await
1394-
.unwrap();
1395-
let mut system = System::new(server_handle.local_addr().clone());
1396-
1397-
// Build a supervisor.
1398-
let supervisor = system.attach().await.unwrap();
1399-
let (_sup_tx, _sup_rx) = supervisor.bind_actor_port::<ProcSupervisionMessage>();
1400-
let sup_ref = ActorRef::<ProcSupervisor>::attest(supervisor.self_id().clone());
1401-
1402-
// Construct a system sender.
1403-
let system_sender = BoxedMailboxSender::new(MailboxClient::new(
1404-
channel::dial(server_handle.local_addr().clone()).unwrap(),
1405-
));
1406-
1407-
// Construct a proc forwarder in terms of the system sender.
1408-
let listen_addr = ChannelAddr::any(ChannelTransport::Tcp(TcpMode::Hostname));
1409-
let proc_forwarder =
1410-
BoxedMailboxSender::new(DialMailboxRouter::new_with_default(system_sender));
1411-
1412-
// Bootstrap proc 'world[0]', join the system.
1413-
let world_id = id!(world);
1414-
let proc_0 = Proc::new(world_id.proc_id(0), proc_forwarder.clone());
1415-
let _proc_actor_0 = ProcActor::bootstrap_for_proc(
1416-
proc_0.clone(),
1417-
world_id.clone(),
1418-
listen_addr,
1419-
server_handle.local_addr().clone(),
1420-
sup_ref.clone(),
1421-
Duration::from_secs(120),
1422-
HashMap::new(),
1423-
ProcLifecycleMode::ManagedBySystem,
1424-
)
1425-
.await
1426-
.unwrap();
1427-
let proc_0_client = proc_0.attach("client").unwrap();
1428-
// Port on which ping's undeliverable messages are collected and counted below.
let (proc_0_undeliverable_tx, mut proc_0_undeliverable_rx) = proc_0_client.open_port();
1429-
1430-
// Bootstrap a second proc 'world[1]', join the system.
1431-
let proc_1 = Proc::new(world_id.proc_id(1), proc_forwarder.clone());
1432-
let _proc_actor_1 = ProcActor::bootstrap_for_proc(
1433-
proc_1.clone(),
1434-
world_id.clone(),
1435-
ChannelAddr::any(ChannelTransport::Tcp(TcpMode::Hostname)),
1436-
server_handle.local_addr().clone(),
1437-
sup_ref.clone(),
1438-
Duration::from_secs(120),
1439-
HashMap::new(),
1440-
ProcLifecycleMode::ManagedBySystem,
1441-
)
1442-
.await
1443-
.unwrap();
1444-
let proc_1_client = proc_1.attach("client").unwrap();
1445-
// Pong's undeliverable receiver is never read by this test.
let (proc_1_undeliverable_tx, mut _proc_1_undeliverable_rx) = proc_1_client.open_port();
1446-
1447-
let ping_params = PingPongActorParams::new(Some(proc_0_undeliverable_tx.bind()), None);
1448-
// Spawn two actors 'ping' and 'pong' where 'ping' runs on
1449-
// 'world[0]' and 'pong' on 'world[1]' (that is, not on the
1450-
// same proc).
1451-
let ping_handle = proc_0
1452-
.spawn::<PingPongActor>("ping", ping_params)
1453-
.await
1454-
.unwrap();
1455-
let pong_params = PingPongActorParams::new(Some(proc_1_undeliverable_tx.bind()), None);
1456-
let pong_handle = proc_1
1457-
.spawn::<PingPongActor>("pong", pong_params)
1458-
.await
1459-
.unwrap();
1460-
1461-
// Now kill the system server making message delivery between
1462-
// procs impossible.
1463-
server_handle.stop().await.unwrap();
1464-
server_handle.await;
1465-
1466-
let n = 100usize;
1467-
for i in 1..(n + 1) {
1468-
// Have 'ping' send 'pong' a message.
1469-
let ttl = 66 + i as u64; // Avoid ttl = 66!
1470-
// The receiving half of the once-port is discarded immediately.
let (once_handle, _) = proc_0_client.open_once_port::<bool>();
1471-
ping_handle
1472-
.send(PingPongMessage(ttl, pong_handle.bind(), once_handle.bind()))
1473-
.unwrap();
1474-
}
1475-
1476-
// `PingPongActor`s do not exit their message loop (a
1477-
// non-default actor behavior) when they have an undelivered
1478-
// message sent back to them (the reason being this very
1479-
// test).
1480-
assert!(matches!(*ping_handle.status().borrow(), ActorStatus::Idle));
1481-
1482-
// We expect n undelivered messages. Wait for the first one, then
// drain the rest without blocking.
// NOTE(review): the `try_recv` drain below stops at the first empty poll, so
// this presumably relies on all n returns already being queued — confirm.
1483-
let Ok(Undeliverable(envelope)) = proc_0_undeliverable_rx.recv().await else {
1484-
unreachable!()
1485-
};
1486-
let PingPongMessage(_, _, _) = envelope.deserialized().unwrap();
1487-
// Starts at 1 because the first undeliverable was received above.
let mut count = 1;
1488-
while let Ok(Some(Undeliverable(envelope))) = proc_0_undeliverable_rx.try_recv() {
1489-
// We care that every undeliverable message was accounted
1490-
// for. We can't assume anything about their arrival
1491-
// order.
1492-
count += 1;
1493-
let PingPongMessage(_, _, _) = envelope.deserialized().unwrap();
1494-
}
1495-
assert!(count == n);
1496-
}
1497-
14981372
#[tracing_test::traced_test]
14991373
#[tokio::test]
15001374
#[cfg_attr(not(fbcode_build), ignore)]

0 commit comments

Comments
 (0)