diff --git a/hyperactor_mesh/src/v1/actor_mesh.rs b/hyperactor_mesh/src/v1/actor_mesh.rs index fb663a925..1fb9e76ff 100644 --- a/hyperactor_mesh/src/v1/actor_mesh.rs +++ b/hyperactor_mesh/src/v1/actor_mesh.rs @@ -823,4 +823,119 @@ mod tests { let _ = host_mesh.shutdown(&instance).await; } + + /// Test that undeliverable messages are properly returned to the + /// sender when communication to a proc is broken. + /// + /// This is the V1 version of the test from + /// hyperactor_multiprocess/src/proc_actor.rs::test_undeliverable_message_return. + #[async_timed_test(timeout_secs = 30)] + #[cfg(fbcode_build)] + async fn test_undeliverable_message_return() { + use hyperactor::mailbox::MessageEnvelope; + use hyperactor::mailbox::Undeliverable; + use hyperactor::test_utils::pingpong::PingPongActor; + use hyperactor::test_utils::pingpong::PingPongActorParams; + use hyperactor::test_utils::pingpong::PingPongMessage; + + hyperactor_telemetry::initialize_logging_for_test(); + + // Set message delivery timeout for faster test + let config = hyperactor::config::global::lock(); + let _guard = config.override_key( + hyperactor::config::MESSAGE_DELIVERY_TIMEOUT, + std::time::Duration::from_secs(1), + ); + + let instance = testing::instance().await; + + // Create a proc mesh with 2 replicas. + let meshes = testing::proc_meshes(instance, extent!(replicas = 2)).await; + let proc_mesh = &meshes[1]; // Use the ProcessAllocator version + + // Set up undeliverable message port for collecting undeliverables + let (undeliverable_port, mut undeliverable_rx) = + instance.open_port::>(); + + // Spawn PingPongActors across both replicas. + // Only actors on replica 0 will forward undeliverable messages. + let ping_params = PingPongActorParams::new(Some(undeliverable_port.bind()), None); + let pong_params = PingPongActorParams::new(None, None); + + // Spawn actors individually on each replica by spawning separate actor meshes + // with specific proc selections. + let ping_proc_mesh = proc_mesh.range("replicas", 0..1).unwrap(); + let pong_proc_mesh = proc_mesh.range("replicas", 1..2).unwrap(); + + let ping_mesh = ping_proc_mesh + .spawn::(instance, "ping", &ping_params) + .await + .unwrap(); + + let pong_mesh = pong_proc_mesh + .spawn::(instance, "pong", &pong_params) + .await + .unwrap(); + + // Get individual actor refs + let ping_handle = ping_mesh.values().next().unwrap(); + let pong_handle = pong_mesh.values().next().unwrap(); + + // Verify ping-pong works initially + let (done_tx, done_rx) = instance.open_once_port(); + ping_handle + .send( + instance, + PingPongMessage(2, pong_handle.clone(), done_tx.bind()), + ) + .unwrap(); + assert!( + done_rx.recv().await.unwrap(), + "Initial ping-pong should work" + ); + + // Now stop the pong actor mesh to break communication + pong_mesh.stop(instance).await.unwrap(); + + // Give it a moment to fully stop + RealClock.sleep(std::time::Duration::from_millis(200)).await; + + // Send multiple messages that will all fail to be delivered + let n = 100usize; + for i in 1..=n { + let ttl = 66 + i as u64; // Avoid ttl = 66 (which would cause other test behavior) + let (once_tx, _once_rx) = instance.open_once_port(); + ping_handle + .send( + instance, + PingPongMessage(ttl, pong_handle.clone(), once_tx.bind()), + ) + .unwrap(); + } + + // Collect all undeliverable messages. + // The fact that we successfully collect them proves the ping actor + // is still running and handling undeliverables correctly (not crashing). + let mut count = 0; + let deadline = RealClock.now() + std::time::Duration::from_secs(5); + while count < n && RealClock.now() < deadline { + match RealClock + .timeout(std::time::Duration::from_secs(1), undeliverable_rx.recv()) + .await + { + Ok(Ok(Undeliverable(envelope))) => { + let _: PingPongMessage = envelope.deserialized().unwrap(); + count += 1; + } + Ok(Err(_)) => break, // Channel closed + Err(_) => break, // Timeout + } + } + + assert_eq!( + count, n, + "Expected {} undeliverable messages, got {}", + n, count + ); + } } diff --git a/hyperactor_multiprocess/src/proc_actor.rs b/hyperactor_multiprocess/src/proc_actor.rs index 26b6b607a..3d3cf1edf 100644 --- a/hyperactor_multiprocess/src/proc_actor.rs +++ b/hyperactor_multiprocess/src/proc_actor.rs @@ -1369,132 +1369,6 @@ mod tests { server_handle.await; } - #[tokio::test] - async fn test_undeliverable_message_return() { - // Proc can't send a message to a remote actor because the - // system connection is lost. - use hyperactor::mailbox::Undeliverable; - use hyperactor::test_utils::pingpong::PingPongActor; - use hyperactor::test_utils::pingpong::PingPongMessage; - - // Use temporary config for this test - let config = hyperactor::config::global::lock(); - let _guard = config.override_key( - hyperactor::config::MESSAGE_DELIVERY_TIMEOUT, - Duration::from_secs(1), - ); - - // Serve a system. - let server_handle = System::serve( - ChannelAddr::any(ChannelTransport::Tcp(TcpMode::Hostname)), - Duration::from_secs(120), - Duration::from_secs(120), - ) - .await - .unwrap(); - let mut system = System::new(server_handle.local_addr().clone()); - - // Build a supervisor. - let supervisor = system.attach().await.unwrap(); - let (_sup_tx, _sup_rx) = supervisor.bind_actor_port::(); - let sup_ref = ActorRef::::attest(supervisor.self_id().clone()); - - // Construct a system sender. - let system_sender = BoxedMailboxSender::new(MailboxClient::new( - channel::dial(server_handle.local_addr().clone()).unwrap(), - )); - - // Construct a proc forwarder in terms of the system sender. - let listen_addr = ChannelAddr::any(ChannelTransport::Tcp(TcpMode::Hostname)); - let proc_forwarder = - BoxedMailboxSender::new(DialMailboxRouter::new_with_default(system_sender)); - - // Bootstrap proc 'world[0]', join the system. - let world_id = id!(world); - let proc_0 = Proc::new(world_id.proc_id(0), proc_forwarder.clone()); - let _proc_actor_0 = ProcActor::bootstrap_for_proc( - proc_0.clone(), - world_id.clone(), - listen_addr, - server_handle.local_addr().clone(), - sup_ref.clone(), - Duration::from_secs(120), - HashMap::new(), - ProcLifecycleMode::ManagedBySystem, - ) - .await - .unwrap(); - let proc_0_client = proc_0.attach("client").unwrap(); - let (proc_0_undeliverable_tx, mut proc_0_undeliverable_rx) = proc_0_client.open_port(); - - // Bootstrap a second proc 'world[1]', join the system. - let proc_1 = Proc::new(world_id.proc_id(1), proc_forwarder.clone()); - let _proc_actor_1 = ProcActor::bootstrap_for_proc( - proc_1.clone(), - world_id.clone(), - ChannelAddr::any(ChannelTransport::Tcp(TcpMode::Hostname)), - server_handle.local_addr().clone(), - sup_ref.clone(), - Duration::from_secs(120), - HashMap::new(), - ProcLifecycleMode::ManagedBySystem, - ) - .await - .unwrap(); - let proc_1_client = proc_1.attach("client").unwrap(); - let (proc_1_undeliverable_tx, mut _proc_1_undeliverable_rx) = proc_1_client.open_port(); - - let ping_params = PingPongActorParams::new(Some(proc_0_undeliverable_tx.bind()), None); - // Spawn two actors 'ping' and 'pong' where 'ping' runs on - // 'world[0]' and 'pong' on 'world[1]' (that is, not on the - // same proc). - let ping_handle = proc_0 - .spawn::("ping", ping_params) - .await - .unwrap(); - let pong_params = PingPongActorParams::new(Some(proc_1_undeliverable_tx.bind()), None); - let pong_handle = proc_1 - .spawn::("pong", pong_params) - .await - .unwrap(); - - // Now kill the system server making message delivery between - // procs impossible. - server_handle.stop().await.unwrap(); - server_handle.await; - - let n = 100usize; - for i in 1..(n + 1) { - // Have 'ping' send 'pong' a message. - let ttl = 66 + i as u64; // Avoid ttl = 66! - let (once_handle, _) = proc_0_client.open_once_port::(); - ping_handle - .send(PingPongMessage(ttl, pong_handle.bind(), once_handle.bind())) - .unwrap(); - } - - // `PingPongActor`s do not exit their message loop (a - // non-default actor behavior) when they have an undelivered - // message sent back to them (the reason being this very - // test). - assert!(matches!(*ping_handle.status().borrow(), ActorStatus::Idle)); - - // We expect n undelivered messages. - let Ok(Undeliverable(envelope)) = proc_0_undeliverable_rx.recv().await else { - unreachable!() - }; - let PingPongMessage(_, _, _) = envelope.deserialized().unwrap(); - let mut count = 1; - while let Ok(Some(Undeliverable(envelope))) = proc_0_undeliverable_rx.try_recv() { - // We care that every undeliverable message was accounted - // for. We can't assume anything about their arrival - // order. - count += 1; - let PingPongMessage(_, _, _) = envelope.deserialized().unwrap(); - } - assert!(count == n); - } - #[tracing_test::traced_test] #[tokio::test] #[cfg_attr(not(fbcode_build), ignore)]