Skip to content

Commit 97250d4

Browse files
return undeliverable messages received by servers (#781)
Summary: Pull Request resolved: #781 while working on D79478197, mariusae noticed that there are circumstances where undeliverable messages get returned to mailbox servers like routers for example. handling returns of such messages to their original senders has been a long standing gap (the stock handling until now has been to log and continue) which this diff goes towards addressing. this diff improves undeliverable message handling in `hyperactor_mesh`. the main change is replacing the router-specific return handler with a generic `server_return_handle` that works with any `MailboxServer`. when messages can't be delivered, the system now attempts to return them to the sender and tries also to deal with nested undeliverable messages should, for whatever reason, that come up. i use an environment variable `HYPERACTOR_MESH_ROUTER_NO_GLOBAL_FALLBACK` (eschewing the attempt at config in D79725191) for router failure injection in the newly added `test_router_undeliverable_return`. this test verifies supervision events are generated when routers fail. Reviewed By: mariusae Differential Revision: D79739939 fbshipit-source-id: 8f9d48954dd26b47fd54837220235008184000c3
1 parent d8213ae commit 97250d4

File tree

5 files changed

+151
-21
lines changed

5 files changed

+151
-21
lines changed

hyperactor/src/mailbox.rs

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2399,6 +2399,45 @@ impl MailboxSender for DialMailboxRouter {
23992399
}
24002400
}
24012401

2402+
/// A `MailboxServer` (such as a router) can can receive a message
2403+
/// that couldn't reach its destination. We can use the fact that
2404+
/// servers are `MailboxSender`s to attempt to forward them back to
2405+
/// their senders.
2406+
pub fn server_return_handle<T: MailboxServer>(
2407+
server: T,
2408+
) -> PortHandle<Undeliverable<MessageEnvelope>> {
2409+
let (return_handle, mut rx) = undeliverable::new_undeliverable_port();
2410+
2411+
tokio::task::spawn(async move {
2412+
while let Ok(Undeliverable(mut envelope)) = rx.recv().await {
2413+
if let Ok(Undeliverable(e)) = envelope.deserialized::<Undeliverable<MessageEnvelope>>()
2414+
{
2415+
// A non-returnable undeliverable.
2416+
UndeliverableMailboxSender.post(e, monitored_return_handle());
2417+
continue;
2418+
}
2419+
envelope.try_set_error(DeliveryError::BrokenLink(
2420+
"message was undeliverable".to_owned(),
2421+
));
2422+
server.post(
2423+
MessageEnvelope::new(
2424+
envelope.sender().clone(),
2425+
PortRef::<Undeliverable<MessageEnvelope>>::attest_message_port(
2426+
envelope.sender(),
2427+
)
2428+
.port_id()
2429+
.clone(),
2430+
Serialized::serialize(&Undeliverable(envelope)).unwrap(),
2431+
Attrs::new(),
2432+
),
2433+
monitored_return_handle(),
2434+
);
2435+
}
2436+
});
2437+
2438+
return_handle
2439+
}
2440+
24022441
/// A MailboxSender that reports any envelope as undeliverable due to
24032442
/// routing failure.
24042443
#[derive(Debug)]

hyperactor_mesh/src/actor_mesh.rs

Lines changed: 94 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -623,16 +623,39 @@ pub(crate) mod test_util {
623623
#[async_trait]
624624
impl Handler<Echo> for ProxyActor {
625625
async fn handle(&mut self, cx: &Context<Self>, message: Echo) -> Result<(), anyhow::Error> {
626-
let actor = self.actor_mesh.get(0).unwrap();
627-
628-
// For now, we reply directly to the client.
629-
// We will support directly wiring up the meshes later.
630-
let (tx, mut rx) = cx.open_port();
631-
632-
actor.send(cx, Echo(message.0, tx.bind()))?;
633-
message.1.send(cx, rx.recv().await.unwrap())?;
626+
if std::env::var("HYPERACTOR_MESH_ROUTER_NO_GLOBAL_FALLBACK").is_err() {
627+
// test_proxy_mesh
628+
629+
let actor = self.actor_mesh.get(0).unwrap();
630+
631+
// For now, we reply directly to the client.
632+
// We will support directly wiring up the meshes later.
633+
let (tx, mut rx) = cx.open_port();
634+
635+
actor.send(cx, Echo(message.0, tx.bind()))?;
636+
message.1.send(cx, rx.recv().await.unwrap())?;
637+
638+
Ok(())
639+
} else {
640+
// test_router_undeliverable_return
641+
642+
let actor: ActorRef<_> = self.actor_mesh.get(0).unwrap();
643+
let (tx, mut rx) = cx.open_port::<String>();
644+
actor.send(cx, Echo(message.0, tx.bind()))?;
645+
646+
use tokio::time::Duration;
647+
use tokio::time::timeout;
648+
#[allow(clippy::disallowed_methods)]
649+
match timeout(Duration::from_secs(1), rx.recv()).await {
650+
Ok(_) => message
651+
.1
652+
.send(cx, "the impossible happened".to_owned())
653+
.unwrap(),
654+
_ => (),
655+
}
634656

635-
Ok(())
657+
Ok(())
658+
}
636659
}
637660
}
638661
}
@@ -677,8 +700,6 @@ mod tests {
677700
use $crate::alloc::AllocSpec;
678701
use $crate::alloc::Allocator;
679702

680-
hyperactor_telemetry::initialize_logging(hyperactor::clock::ClockKind::default());
681-
682703
use ndslice::extent;
683704

684705
let alloc = $allocator
@@ -693,7 +714,12 @@ mod tests {
693714
let proxy_actor = actor_mesh.get(0).unwrap();
694715
let (tx, mut rx) = actor_mesh.open_port::<String>();
695716
proxy_actor.send(proc_mesh.client(), Echo("hello!".to_owned(), tx.bind())).unwrap();
696-
assert_eq!(rx.recv().await.unwrap(), "hello!");
717+
718+
#[allow(clippy::disallowed_methods)]
719+
match tokio::time::timeout(tokio::time::Duration::from_secs(3), rx.recv()).await {
720+
Ok(msg) => assert_eq!(&msg.unwrap(), "hello!"),
721+
Err(_) => assert!(false),
722+
}
697723
}
698724

699725
#[tokio::test]
@@ -1287,6 +1313,62 @@ mod tests {
12871313
assert_eq!(event.actor_id.name(), &actor_mesh.name);
12881314
}
12891315
}
1316+
1317+
// Set this test only for `mod process` because it relies on a
1318+
// trick to emulate router failure that only works when using
1319+
// non-local allocators.
1320+
#[cfg(fbcode_build)]
1321+
#[tokio::test]
1322+
async fn test_router_undeliverable_return() {
1323+
// Test that an undeliverable message received by a
1324+
// router results in actor mesh supervision events.
1325+
use ndslice::extent;
1326+
1327+
use super::test_util::*;
1328+
use crate::alloc::AllocSpec;
1329+
use crate::alloc::Allocator;
1330+
1331+
let alloc = process_allocator()
1332+
.allocate(AllocSpec {
1333+
extent: extent! { replica = 1 },
1334+
constraints: Default::default(),
1335+
})
1336+
.await
1337+
.unwrap();
1338+
1339+
// SAFETY: Not multithread safe.
1340+
unsafe { std::env::set_var("HYPERACTOR_MESH_ROUTER_NO_GLOBAL_FALLBACK", "1") };
1341+
1342+
let mut proc_mesh = ProcMesh::allocate(alloc).await.unwrap();
1343+
let mut proc_events = proc_mesh.events().unwrap();
1344+
let mut actor_mesh: RootActorMesh<'_, ProxyActor> =
1345+
{ proc_mesh.spawn("proxy", &()).await.unwrap() };
1346+
let mut actor_events = actor_mesh.events().unwrap();
1347+
1348+
let proxy_actor = actor_mesh.get(0).unwrap();
1349+
let (tx, mut rx) = actor_mesh.open_port::<String>();
1350+
proxy_actor
1351+
.send(proc_mesh.client(), Echo("hello!".to_owned(), tx.bind()))
1352+
.unwrap();
1353+
1354+
#[allow(clippy::disallowed_methods)]
1355+
match tokio::time::timeout(tokio::time::Duration::from_secs(3), rx.recv()).await {
1356+
Ok(_) => panic!("the impossible happened"),
1357+
Err(_) => {
1358+
assert_matches!(
1359+
proc_events.next().await.unwrap(),
1360+
ProcEvent::Crashed(0, reason) if reason.contains("undeliverable")
1361+
);
1362+
assert_eq!(
1363+
actor_events.next().await.unwrap().actor_id.name(),
1364+
&actor_mesh.name
1365+
);
1366+
}
1367+
}
1368+
1369+
// SAFETY: Not multithread safe.
1370+
unsafe { std::env::remove_var("HYPERACTOR_MESH_ROUTER_NO_GLOBAL_FALLBACK") };
1371+
}
12901372
}
12911373

12921374
mod sim {

hyperactor_mesh/src/alloc/remoteprocess.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ use hyperactor::clock::RealClock;
3535
use hyperactor::config;
3636
use hyperactor::mailbox::DialMailboxRouter;
3737
use hyperactor::mailbox::MailboxServer;
38-
use hyperactor::mailbox::monitored_return_handle;
38+
use hyperactor::mailbox::server_return_handle;
3939
use hyperactor::reference::Reference;
4040
use hyperactor::serde_json;
4141
use mockall::automock;
@@ -286,7 +286,7 @@ impl RemoteProcessAllocator {
286286
let router = DialMailboxRouter::new();
287287
let mailbox_handle = router
288288
.clone()
289-
.serve(forwarder_rx, monitored_return_handle());
289+
.serve(forwarder_rx, server_return_handle(router.clone()));
290290
tracing::info!("started forwarder on: {}", forwarder_addr);
291291

292292
// Check if we need to write TORCH_ELASTIC_CUSTOM_HOSTNAMES_LIST_FILE

hyperactor_mesh/src/proc_mesh.rs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ use hyperactor::mailbox::MailboxServer;
3737
use hyperactor::mailbox::MessageEnvelope;
3838
use hyperactor::mailbox::PortReceiver;
3939
use hyperactor::mailbox::Undeliverable;
40+
use hyperactor::mailbox::server_return_handle;
4041
use hyperactor::metrics;
4142
use hyperactor::proc::Proc;
4243
use hyperactor::reference::ProcId;
@@ -211,7 +212,7 @@ impl ProcMesh {
211212
}
212213
router
213214
.clone()
214-
.serve(router_rx, mailbox::custom_monitored_return_handle("router"));
215+
.serve(router_rx, server_return_handle(router.clone()));
215216

216217
// Set up a client proc for the mesh itself, so that we can attach ourselves
217218
// to it, and communicate with the agents. We wire it into the same router as
@@ -225,10 +226,9 @@ impl ProcMesh {
225226
client_proc_id.clone(),
226227
BoxedMailboxSender::new(router.clone()),
227228
);
228-
client_proc.clone().serve(
229-
client_rx,
230-
mailbox::custom_monitored_return_handle("client proc"),
231-
);
229+
client_proc
230+
.clone()
231+
.serve(client_rx, server_return_handle(client_proc.clone()));
232232
router.bind(client_proc_id.clone().into(), client_proc_addr.clone());
233233

234234
// Bind this router to the global router, to enable cross-mesh routing.

hyperactor_mesh/src/proc_mesh/mesh_agent.rs

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -208,8 +208,17 @@ impl MeshAgentMessageHandler for MeshAgent {
208208
// Wire up the local proc to the global (process) router. This ensures that child
209209
// meshes are reachable from any actor created by this mesh.
210210
let client = MailboxClient::new(channel::dial(forwarder)?);
211-
let default = super::global_router().fallback(client.into_boxed());
212-
let router = DialMailboxRouter::new_with_default(default.into_boxed());
211+
212+
// `HYPERACTOR_MESH_ROUTER_CONFIG_NO_GLOBAL_FALLBACK` may be
213+
// set as a means of failure injection in the testing of
214+
// supervision codepaths.
215+
let router = if std::env::var("HYPERACTOR_MESH_ROUTER_NO_GLOBAL_FALLBACK").is_err() {
216+
let default = super::global_router().fallback(client.into_boxed());
217+
DialMailboxRouter::new_with_default(default.into_boxed())
218+
} else {
219+
DialMailboxRouter::new_with_default(client.into_boxed())
220+
};
221+
213222
for (proc_id, addr) in address_book {
214223
router.bind(proc_id.into(), addr);
215224
}

0 commit comments

Comments
 (0)