Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
116 changes: 116 additions & 0 deletions hyperactor_mesh/src/v1/actor_mesh.rs
Original file line number Diff line number Diff line change
Expand Up @@ -823,4 +823,120 @@ mod tests {

let _ = host_mesh.shutdown(&instance).await;
}

/// Test that undeliverable messages are properly returned to the
/// sender when communication to a proc is broken.
///
/// This is the V1 version of the test from
/// hyperactor_multiprocess/src/proc_actor.rs::test_undeliverable_message_return.
#[async_timed_test(timeout_secs = 30)]
#[cfg(fbcode_build)]
async fn test_undeliverable_message_return() {
use hyperactor::clock::RealClock;
use hyperactor::mailbox::MessageEnvelope;
use hyperactor::mailbox::Undeliverable;
use hyperactor::test_utils::pingpong::PingPongActor;
use hyperactor::test_utils::pingpong::PingPongActorParams;
use hyperactor::test_utils::pingpong::PingPongMessage;

hyperactor_telemetry::initialize_logging_for_test();

// Set message delivery timeout for faster test
let config = hyperactor::config::global::lock();
let _guard = config.override_key(
hyperactor::config::MESSAGE_DELIVERY_TIMEOUT,
std::time::Duration::from_secs(1),
);

let instance = testing::instance().await;

// Create a proc mesh with 2 replicas (simulates 2 separate processes)
let meshes = testing::proc_meshes(instance, extent!(replicas = 2)).await;
let proc_mesh = &meshes[0];

// Set up undeliverable message port for collecting undeliverables
let (undeliverable_port, mut undeliverable_rx) =
instance.open_port::<Undeliverable<MessageEnvelope>>();

// Spawn PingPongActors across both replicas.
// Only actors on replica 0 will forward undeliverable messages.
let ping_params = PingPongActorParams::new(Some(undeliverable_port.bind()), None);
let pong_params = PingPongActorParams::new(None, None);

// Spawn actors individually on each replica by spawning separate actor meshes
// with specific proc selections.
let ping_proc_mesh = proc_mesh.range("replicas", 0..1).unwrap();
let pong_proc_mesh = proc_mesh.range("replicas", 1..2).unwrap();

let ping_mesh = ping_proc_mesh
.spawn::<PingPongActor>(instance, "ping", &ping_params)
.await
.unwrap();

let pong_mesh = pong_proc_mesh
.spawn::<PingPongActor>(instance, "pong", &pong_params)
.await
.unwrap();

// Get individual actor refs
let ping_handle = ping_mesh.values().next().unwrap();
let pong_handle = pong_mesh.values().next().unwrap();

// Verify ping-pong works initially
let (done_tx, done_rx) = instance.open_once_port();
ping_handle
.send(
instance,
PingPongMessage(2, pong_handle.clone(), done_tx.bind()),
)
.unwrap();
assert!(
done_rx.recv().await.unwrap(),
"Initial ping-pong should work"
);

// Now stop the pong actor mesh to break communication
pong_mesh.stop(instance).await.unwrap();

// Give it a moment to fully stop
RealClock.sleep(std::time::Duration::from_millis(200)).await;

// Send multiple messages that will all fail to be delivered
let n = 100usize;
for i in 1..=n {
let ttl = 66 + i as u64; // Avoid ttl = 66 (which would cause other test behavior)
let (once_tx, _once_rx) = instance.open_once_port();
ping_handle
.send(
instance,
PingPongMessage(ttl, pong_handle.clone(), once_tx.bind()),
)
.unwrap();
}

// Collect all undeliverable messages.
// The fact that we successfully collect them proves the ping actor
// is still running and handling undeliverables correctly (not crashing).
let mut count = 0;
let deadline = RealClock.now() + std::time::Duration::from_secs(5);
while count < n && RealClock.now() < deadline {
match RealClock
.timeout(std::time::Duration::from_secs(1), undeliverable_rx.recv())
.await
{
Ok(Ok(Undeliverable(envelope))) => {
let _: PingPongMessage = envelope.deserialized().unwrap();
count += 1;
}
Ok(Err(_)) => break, // Channel closed
Err(_) => break, // Timeout
}
}

assert_eq!(
count, n,
"Expected {} undeliverable messages, got {}",
n, count
);
}
}
126 changes: 0 additions & 126 deletions hyperactor_multiprocess/src/proc_actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1369,132 +1369,6 @@ mod tests {
server_handle.await;
}

#[tokio::test]
async fn test_undeliverable_message_return() {
// Proc can't send a message to a remote actor because the
// system connection is lost.
use hyperactor::mailbox::Undeliverable;
use hyperactor::test_utils::pingpong::PingPongActor;
use hyperactor::test_utils::pingpong::PingPongMessage;

// Use temporary config for this test
let config = hyperactor::config::global::lock();
let _guard = config.override_key(
hyperactor::config::MESSAGE_DELIVERY_TIMEOUT,
Duration::from_secs(1),
);

// Serve a system.
let server_handle = System::serve(
ChannelAddr::any(ChannelTransport::Tcp(TcpMode::Hostname)),
Duration::from_secs(120),
Duration::from_secs(120),
)
.await
.unwrap();
let mut system = System::new(server_handle.local_addr().clone());

// Build a supervisor.
let supervisor = system.attach().await.unwrap();
let (_sup_tx, _sup_rx) = supervisor.bind_actor_port::<ProcSupervisionMessage>();
let sup_ref = ActorRef::<ProcSupervisor>::attest(supervisor.self_id().clone());

// Construct a system sender.
let system_sender = BoxedMailboxSender::new(MailboxClient::new(
channel::dial(server_handle.local_addr().clone()).unwrap(),
));

// Construct a proc forwarder in terms of the system sender.
let listen_addr = ChannelAddr::any(ChannelTransport::Tcp(TcpMode::Hostname));
let proc_forwarder =
BoxedMailboxSender::new(DialMailboxRouter::new_with_default(system_sender));

// Bootstrap proc 'world[0]', join the system.
let world_id = id!(world);
let proc_0 = Proc::new(world_id.proc_id(0), proc_forwarder.clone());
let _proc_actor_0 = ProcActor::bootstrap_for_proc(
proc_0.clone(),
world_id.clone(),
listen_addr,
server_handle.local_addr().clone(),
sup_ref.clone(),
Duration::from_secs(120),
HashMap::new(),
ProcLifecycleMode::ManagedBySystem,
)
.await
.unwrap();
let proc_0_client = proc_0.attach("client").unwrap();
let (proc_0_undeliverable_tx, mut proc_0_undeliverable_rx) = proc_0_client.open_port();

// Bootstrap a second proc 'world[1]', join the system.
let proc_1 = Proc::new(world_id.proc_id(1), proc_forwarder.clone());
let _proc_actor_1 = ProcActor::bootstrap_for_proc(
proc_1.clone(),
world_id.clone(),
ChannelAddr::any(ChannelTransport::Tcp(TcpMode::Hostname)),
server_handle.local_addr().clone(),
sup_ref.clone(),
Duration::from_secs(120),
HashMap::new(),
ProcLifecycleMode::ManagedBySystem,
)
.await
.unwrap();
let proc_1_client = proc_1.attach("client").unwrap();
let (proc_1_undeliverable_tx, mut _proc_1_undeliverable_rx) = proc_1_client.open_port();

let ping_params = PingPongActorParams::new(Some(proc_0_undeliverable_tx.bind()), None);
// Spawn two actors 'ping' and 'pong' where 'ping' runs on
// 'world[0]' and 'pong' on 'world[1]' (that is, not on the
// same proc).
let ping_handle = proc_0
.spawn::<PingPongActor>("ping", ping_params)
.await
.unwrap();
let pong_params = PingPongActorParams::new(Some(proc_1_undeliverable_tx.bind()), None);
let pong_handle = proc_1
.spawn::<PingPongActor>("pong", pong_params)
.await
.unwrap();

// Now kill the system server making message delivery between
// procs impossible.
server_handle.stop().await.unwrap();
server_handle.await;

let n = 100usize;
for i in 1..(n + 1) {
// Have 'ping' send 'pong' a message.
let ttl = 66 + i as u64; // Avoid ttl = 66!
let (once_handle, _) = proc_0_client.open_once_port::<bool>();
ping_handle
.send(PingPongMessage(ttl, pong_handle.bind(), once_handle.bind()))
.unwrap();
}

// `PingPongActor`s do not exit their message loop (a
// non-default actor behavior) when they have an undelivered
// message sent back to them (the reason being this very
// test).
assert!(matches!(*ping_handle.status().borrow(), ActorStatus::Idle));

// We expect n undelivered messages.
let Ok(Undeliverable(envelope)) = proc_0_undeliverable_rx.recv().await else {
unreachable!()
};
let PingPongMessage(_, _, _) = envelope.deserialized().unwrap();
let mut count = 1;
while let Ok(Some(Undeliverable(envelope))) = proc_0_undeliverable_rx.try_recv() {
// We care that every undeliverable message was accounted
// for. We can't assume anything about their arrival
// order.
count += 1;
let PingPongMessage(_, _, _) = envelope.deserialized().unwrap();
}
assert!(count == n);
}

#[tracing_test::traced_test]
#[tokio::test]
#[cfg_attr(not(fbcode_build), ignore)]
Expand Down
Loading