Skip to content

Commit 906620a

Browse files
mailbox: MailboxServers now handle undeliverables intrinsically (#791)
Summary: Pull Request resolved: #791 D79739939 established that mailbox servers can handle undeliverable messages by forwarding them back to the sender. that diff improved return-handling, but the `return_handle` was still passed explicitly to `.serve()`. however, inspecting how these return handles are constructed reveals that their creation can-and should-be handled internally by `.serve()` itself. this eliminates the need for `return_handle: PortHandle<Undeliverable<MessageEnvelope>>` in the `MailboxServer` interface altogether. this change simplifies the interface and unlocks widespread cleanup and better default handling of undeliverable messages throughout the system. Reviewed By: mariusae Differential Revision: D79812085 fbshipit-source-id: c6c6a24cd48db1e662cc572f0f7069f8ed59b290
1 parent ddfb022 commit 906620a

File tree

12 files changed

+88
-114
lines changed

12 files changed

+88
-114
lines changed

controller/src/lib.rs

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -638,7 +638,6 @@ mod tests {
638638
use hyperactor::mailbox::MailboxServer;
639639
use hyperactor::mailbox::PortHandle;
640640
use hyperactor::mailbox::PortReceiver;
641-
use hyperactor::mailbox::monitored_return_handle;
642641
use hyperactor::message::IndexedErasedUnbound;
643642
use hyperactor::proc::Proc;
644643
use hyperactor::reference::GangId;
@@ -1545,9 +1544,7 @@ mod tests {
15451544
let (local_proc_message_port, local_proc_message_receiver) = local_proc_mbox.open_port();
15461545
local_proc_message_port.bind();
15471546

1548-
let _local_proc_serve_handle = local_proc_mbox
1549-
.clone()
1550-
.serve(local_proc_rx, monitored_return_handle());
1547+
let _local_proc_serve_handle = local_proc_mbox.clone().serve(local_proc_rx);
15511548
(
15521549
world_id,
15531550
local_proc_id,

hyperactor/src/mailbox.rs

Lines changed: 74 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -838,6 +838,43 @@ impl Future for MailboxServerHandle {
838838
}
839839
}
840840

841+
// A `MailboxServer` (such as a router) can can receive a message
842+
// that couldn't reach its destination. We can use the fact that
843+
// servers are `MailboxSender`s to attempt to forward them back to
844+
// their senders.
845+
fn server_return_handle<T: MailboxServer>(server: T) -> PortHandle<Undeliverable<MessageEnvelope>> {
846+
let (return_handle, mut rx) = undeliverable::new_undeliverable_port();
847+
848+
tokio::task::spawn(async move {
849+
while let Ok(Undeliverable(mut envelope)) = rx.recv().await {
850+
if let Ok(Undeliverable(e)) = envelope.deserialized::<Undeliverable<MessageEnvelope>>()
851+
{
852+
// A non-returnable undeliverable.
853+
UndeliverableMailboxSender.post(e, monitored_return_handle());
854+
continue;
855+
}
856+
envelope.try_set_error(DeliveryError::BrokenLink(
857+
"message was undeliverable".to_owned(),
858+
));
859+
server.post(
860+
MessageEnvelope::new(
861+
envelope.sender().clone(),
862+
PortRef::<Undeliverable<MessageEnvelope>>::attest_message_port(
863+
envelope.sender(),
864+
)
865+
.port_id()
866+
.clone(),
867+
Serialized::serialize(&Undeliverable(envelope)).unwrap(),
868+
Attrs::new(),
869+
),
870+
monitored_return_handle(),
871+
);
872+
}
873+
});
874+
875+
return_handle
876+
}
877+
841878
/// Serve a port on the provided [`channel::Rx`]. This dispatches all
842879
/// channel messages directly to the port.
843880
pub trait MailboxServer: MailboxSender + Clone + Sized + 'static {
@@ -847,8 +884,41 @@ pub trait MailboxServer: MailboxSender + Clone + Sized + 'static {
847884
fn serve(
848885
self,
849886
mut rx: impl channel::Rx<MessageEnvelope> + Send + 'static,
850-
return_handle: PortHandle<Undeliverable<MessageEnvelope>>,
851887
) -> MailboxServerHandle {
888+
// A `MailboxServer` can can receive a message that couldn't
889+
// reach its destination. We can use the fact that servers are
890+
// `MailboxSender`s to attempt to forward them back to their
891+
// senders.
892+
let (return_handle, mut undeliverable_rx) = undeliverable::new_undeliverable_port();
893+
let server = self.clone();
894+
tokio::task::spawn(async move {
895+
while let Ok(Undeliverable(mut envelope)) = undeliverable_rx.recv().await {
896+
if let Ok(Undeliverable(e)) =
897+
envelope.deserialized::<Undeliverable<MessageEnvelope>>()
898+
{
899+
// A non-returnable undeliverable.
900+
UndeliverableMailboxSender.post(e, monitored_return_handle());
901+
continue;
902+
}
903+
envelope.try_set_error(DeliveryError::BrokenLink(
904+
"message was undeliverable".to_owned(),
905+
));
906+
server.post(
907+
MessageEnvelope::new(
908+
envelope.sender().clone(),
909+
PortRef::<Undeliverable<MessageEnvelope>>::attest_message_port(
910+
envelope.sender(),
911+
)
912+
.port_id()
913+
.clone(),
914+
Serialized::serialize(&Undeliverable(envelope)).unwrap(),
915+
Attrs::new(),
916+
),
917+
monitored_return_handle(),
918+
);
919+
}
920+
});
921+
852922
let (stopped_tx, mut stopped_rx) = watch::channel(false);
853923
let join_handle = tokio::spawn(async move {
854924
let mut detached = false;
@@ -2399,45 +2469,6 @@ impl MailboxSender for DialMailboxRouter {
23992469
}
24002470
}
24012471

2402-
/// A `MailboxServer` (such as a router) can can receive a message
2403-
/// that couldn't reach its destination. We can use the fact that
2404-
/// servers are `MailboxSender`s to attempt to forward them back to
2405-
/// their senders.
2406-
pub fn server_return_handle<T: MailboxServer>(
2407-
server: T,
2408-
) -> PortHandle<Undeliverable<MessageEnvelope>> {
2409-
let (return_handle, mut rx) = undeliverable::new_undeliverable_port();
2410-
2411-
tokio::task::spawn(async move {
2412-
while let Ok(Undeliverable(mut envelope)) = rx.recv().await {
2413-
if let Ok(Undeliverable(e)) = envelope.deserialized::<Undeliverable<MessageEnvelope>>()
2414-
{
2415-
// A non-returnable undeliverable.
2416-
UndeliverableMailboxSender.post(e, monitored_return_handle());
2417-
continue;
2418-
}
2419-
envelope.try_set_error(DeliveryError::BrokenLink(
2420-
"message was undeliverable".to_owned(),
2421-
));
2422-
server.post(
2423-
MessageEnvelope::new(
2424-
envelope.sender().clone(),
2425-
PortRef::<Undeliverable<MessageEnvelope>>::attest_message_port(
2426-
envelope.sender(),
2427-
)
2428-
.port_id()
2429-
.clone(),
2430-
Serialized::serialize(&Undeliverable(envelope)).unwrap(),
2431-
Attrs::new(),
2432-
),
2433-
monitored_return_handle(),
2434-
);
2435-
}
2436-
});
2437-
2438-
return_handle
2439-
}
2440-
24412472
/// A MailboxSender that reports any envelope as undeliverable due to
24422473
/// routing failure.
24432474
#[derive(Debug)]
@@ -2657,7 +2688,7 @@ mod tests {
26572688
async fn test_local_client_server() {
26582689
let mbox = Mailbox::new_detached(id!(test[0].actor0));
26592690
let (tx, rx) = channel::local::new();
2660-
let serve_handle = mbox.clone().serve(rx, monitored_return_handle());
2691+
let serve_handle = mbox.clone().serve(rx);
26612692
let client = MailboxClient::new(tx);
26622693

26632694
let (port, receiver) = mbox.open_once_port::<u64>();
@@ -2688,7 +2719,7 @@ mod tests {
26882719
.unwrap();
26892720
let tx = dial::<MessageEnvelope>(src_to_dst).unwrap();
26902721
let mbox = Mailbox::new_detached(id!(test[0].actor0));
2691-
let serve_handle = mbox.clone().serve(rx, monitored_return_handle());
2722+
let serve_handle = mbox.clone().serve(rx);
26922723
let client = MailboxClient::new(tx);
26932724
let (port, receiver) = mbox.open_once_port::<u64>();
26942725
let port = port.bind();
@@ -2798,7 +2829,7 @@ mod tests {
27982829
let (addr, rx) = channel::serve(ChannelAddr::any(ChannelTransport::Local))
27992830
.await
28002831
.unwrap();
2801-
let handle = (*mbox).clone().serve(rx, monitored_return_handle());
2832+
let handle = (*mbox).clone().serve(rx);
28022833
handles.push(handle);
28032834

28042835
eprintln!("{}: {}", mbox.actor_id(), addr);

hyperactor_mesh/src/alloc.rs

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -459,9 +459,7 @@ pub(crate) mod testing {
459459
.unwrap();
460460
let router =
461461
DialMailboxRouter::new_with_default((UndeliverableMailboxSender {}).into_boxed());
462-
router
463-
.clone()
464-
.serve(router_rx, mailbox::monitored_return_handle());
462+
router.clone().serve(router_rx);
465463

466464
let client_proc_id = ProcId(WorldId("test_stuck".to_string()), 0);
467465
let (client_proc_addr, client_rx) =
@@ -470,9 +468,7 @@ pub(crate) mod testing {
470468
client_proc_id.clone(),
471469
BoxedMailboxSender::new(router.clone()),
472470
);
473-
client_proc
474-
.clone()
475-
.serve(client_rx, mailbox::monitored_return_handle());
471+
client_proc.clone().serve(client_rx);
476472
router.bind(client_proc_id.clone().into(), client_proc_addr);
477473
(
478474
router,

hyperactor_mesh/src/alloc/local.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -187,7 +187,7 @@ impl Alloc for LocalAlloc {
187187
};
188188

189189
// Undeliverable messages get forwarded to the mesh agent.
190-
let handle = proc.clone().serve(proc_rx, mesh_agent.port());
190+
let handle = proc.clone().serve(proc_rx);
191191

192192
self.procs.insert(
193193
rank,

hyperactor_mesh/src/alloc/remoteprocess.rs

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,6 @@ use hyperactor::clock::RealClock;
3535
use hyperactor::config;
3636
use hyperactor::mailbox::DialMailboxRouter;
3737
use hyperactor::mailbox::MailboxServer;
38-
use hyperactor::mailbox::server_return_handle;
3938
use hyperactor::reference::Reference;
4039
use hyperactor::serde_json;
4140
use mockall::automock;
@@ -284,9 +283,7 @@ impl RemoteProcessAllocator {
284283
}
285284
};
286285
let router = DialMailboxRouter::new();
287-
let mailbox_handle = router
288-
.clone()
289-
.serve(forwarder_rx, server_return_handle(router.clone()));
286+
let mailbox_handle = router.clone().serve(forwarder_rx);
290287
tracing::info!("started forwarder on: {}", forwarder_addr);
291288

292289
// Check if we need to write TORCH_ELASTIC_CUSTOM_HOSTNAMES_LIST_FILE

hyperactor_mesh/src/bootstrap.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -168,8 +168,7 @@ pub async fn bootstrap() -> anyhow::Error {
168168
let (proc, mesh_agent) = MeshAgent::bootstrap(proc_id.clone()).await?;
169169
let (proc_addr, proc_rx) =
170170
channel::serve(ChannelAddr::any(listen_transport)).await?;
171-
// Undeliverable messages get forwarded to the mesh agent.
172-
let handle = proc.clone().serve(proc_rx, mesh_agent.port());
171+
let handle = proc.clone().serve(proc_rx);
173172
drop(handle); // linter appeasement; it is safe to drop this future
174173
tx.send(Process2Allocator(
175174
bootstrap_index,

hyperactor_mesh/src/logging.rs

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -840,7 +840,6 @@ mod tests {
840840
use hyperactor::channel::ChannelTx;
841841
use hyperactor::channel::Tx;
842842
use hyperactor::id;
843-
use hyperactor::mailbox;
844843
use hyperactor::mailbox::BoxedMailboxSender;
845844
use hyperactor::mailbox::DialMailboxRouter;
846845
use hyperactor::mailbox::MailboxServer;
@@ -858,8 +857,7 @@ mod tests {
858857
.await
859858
.unwrap();
860859
let proc = Proc::new(id!(client[0]), BoxedMailboxSender::new(router.clone()));
861-
proc.clone()
862-
.serve(client_rx, mailbox::monitored_return_handle());
860+
proc.clone().serve(client_rx);
863861
router.bind(id!(client[0]).into(), proc_addr.clone());
864862
let client = proc.attach("client").unwrap();
865863

hyperactor_mesh/src/proc_mesh.rs

Lines changed: 2 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,6 @@ use hyperactor::mailbox::MailboxServer;
3737
use hyperactor::mailbox::MessageEnvelope;
3838
use hyperactor::mailbox::PortReceiver;
3939
use hyperactor::mailbox::Undeliverable;
40-
use hyperactor::mailbox::server_return_handle;
4140
use hyperactor::metrics;
4241
use hyperactor::proc::Proc;
4342
use hyperactor::reference::ProcId;
@@ -210,9 +209,7 @@ impl ProcMesh {
210209
// Work around for Allocs that have more than one world.
211210
world_ids.insert(proc_id.world_id().clone());
212211
}
213-
router
214-
.clone()
215-
.serve(router_rx, server_return_handle(router.clone()));
212+
router.clone().serve(router_rx);
216213

217214
// Set up a client proc for the mesh itself, so that we can attach ourselves
218215
// to it, and communicate with the agents. We wire it into the same router as
@@ -226,9 +223,7 @@ impl ProcMesh {
226223
client_proc_id.clone(),
227224
BoxedMailboxSender::new(router.clone()),
228225
);
229-
client_proc
230-
.clone()
231-
.serve(client_rx, server_return_handle(client_proc.clone()));
226+
client_proc.clone().serve(client_rx);
232227
router.bind(client_proc_id.clone().into(), client_proc_addr.clone());
233228

234229
// Bind this router to the global router, to enable cross-mesh routing.

hyperactor_mesh/src/proc_mesh/mesh_agent.rs

Lines changed: 0 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -37,14 +37,12 @@ use hyperactor::channel::ChannelAddr;
3737
use hyperactor::clock::Clock;
3838
use hyperactor::clock::RealClock;
3939
use hyperactor::mailbox::BoxedMailboxSender;
40-
use hyperactor::mailbox::DeliveryError;
4140
use hyperactor::mailbox::DialMailboxRouter;
4241
use hyperactor::mailbox::IntoBoxedMailboxSender;
4342
use hyperactor::mailbox::MailboxClient;
4443
use hyperactor::mailbox::MailboxSender;
4544
use hyperactor::mailbox::MessageEnvelope;
4645
use hyperactor::mailbox::Undeliverable;
47-
use hyperactor::mailbox::UndeliverableMessageError;
4846
use hyperactor::proc::Proc;
4947
use hyperactor::supervision::ActorSupervisionEvent;
5048
use serde::Deserialize;
@@ -162,30 +160,6 @@ impl Actor for MeshAgent {
162160
self.proc.set_supervision_coordinator(this.port())?;
163161
Ok(())
164162
}
165-
166-
// This is an override of the default actor behavior.
167-
async fn handle_undeliverable_message(
168-
&mut self,
169-
cx: &Instance<Self>,
170-
undelivered: Undeliverable<MessageEnvelope>,
171-
) -> Result<(), anyhow::Error> {
172-
let Undeliverable(ref envelope) = undelivered;
173-
tracing::debug!("took charge of a message not delivered: {}", envelope);
174-
175-
let sender = envelope.sender().clone();
176-
if cx.self_id() == &sender {
177-
anyhow::bail!(UndeliverableMessageError::delivery_failure(envelope));
178-
}
179-
180-
let mut envelope = envelope.clone();
181-
let return_port = PortRef::attest_message_port(&sender);
182-
return_port.send(cx, undelivered).map_err(|err| {
183-
envelope.try_set_error(DeliveryError::BrokenLink(format!("send failure: {err}")));
184-
UndeliverableMessageError::return_failure(&envelope)
185-
})?;
186-
187-
Ok(())
188-
}
189163
}
190164

191165
#[async_trait]

hyperactor_multiprocess/src/proc_actor.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,6 @@ use hyperactor::mailbox::MailboxAdminMessageHandler;
4444
use hyperactor::mailbox::MailboxClient;
4545
use hyperactor::mailbox::MailboxServer;
4646
use hyperactor::mailbox::MailboxServerHandle;
47-
use hyperactor::mailbox::monitored_return_handle;
4847
use hyperactor::mailbox::open_port;
4948
use hyperactor::proc::ActorLedgerSnapshot;
5049
use hyperactor::proc::Proc;
@@ -419,7 +418,7 @@ impl ProcActor {
419418
lifecycle_mode: ProcLifecycleMode,
420419
) -> Result<BootstrappedProc, anyhow::Error> {
421420
let (local_addr, rx) = channel::serve(listen_addr).await?;
422-
let mailbox_handle = proc.clone().serve(rx, monitored_return_handle());
421+
let mailbox_handle = proc.clone().serve(rx);
423422
let (state_tx, mut state_rx) = watch::channel(ProcState::AwaitingJoin);
424423

425424
let handle = match proc

0 commit comments

Comments
 (0)