Skip to content

Commit e54f601

Browse files
mailbox: undeliverable: custom_monitored_return_handle (#754)
Summary: Pull Request resolved: #754 new function `custom_monitored_return_handle` to replace the few remaining `monitored_return_handle` in order that we get better insight into where they still exist. Reviewed By: mariusae Differential Revision: D79593009 fbshipit-source-id: eca4b267333e0f1201508135a9710441ba10beda
1 parent c1cf115 commit e54f601

File tree

3 files changed

+24
-4
lines changed

3 files changed

+24
-4
lines changed

hyperactor/src/mailbox.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -127,6 +127,7 @@ mod undeliverable;
127127
/// For [`Undeliverable`], a message type for delivery failures.
128128
pub use undeliverable::Undeliverable;
129129
pub use undeliverable::UndeliverableMessageError;
130+
pub use undeliverable::custom_monitored_return_handle;
130131
pub use undeliverable::monitored_return_handle; // TODO: Audit
131132
pub use undeliverable::supervise_undeliverable_messages;
132133
/// For [`MailboxAdminMessage`], a message type for mailbox administration.

hyperactor/src/mailbox/undeliverable.rs

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,24 @@ pub fn monitored_return_handle() -> PortHandle<Undeliverable<MessageEnvelope>> {
7070
return_handle.clone()
7171
}
7272

73+
/// Now that monitored return handles are rare, it's becoming helpful
74+
/// to get insights into where they are getting used (so that they can
75+
/// be eliminated and replaced with something better).
76+
#[track_caller]
77+
pub fn custom_monitored_return_handle(caller: &str) -> PortHandle<Undeliverable<MessageEnvelope>> {
78+
let caller = caller.to_owned();
79+
let (return_handle, mut rx) = new_undeliverable_port();
80+
tokio::task::spawn(async move {
81+
while let Ok(Undeliverable(mut envelope)) = rx.recv().await {
82+
envelope.try_set_error(DeliveryError::BrokenLink(
83+
"message returned to undeliverable port".to_string(),
84+
));
85+
tracing::error!("{caller} took back an undeliverable message: {}", envelope);
86+
}
87+
});
88+
return_handle
89+
}
90+
7391
/// Returns a message envelope to its original sender.
7492
pub(crate) fn return_undeliverable(
7593
return_handle: PortHandle<Undeliverable<MessageEnvelope>>,

hyperactor_mesh/src/proc_mesh.rs

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -193,7 +193,7 @@ impl ProcMesh {
193193
}
194194
router
195195
.clone()
196-
.serve(router_rx, mailbox::monitored_return_handle());
196+
.serve(router_rx, mailbox::custom_monitored_return_handle("router"));
197197

198198
// Set up a client proc for the mesh itself, so that we can attach ourselves
199199
// to it, and communicate with the agents. We wire it into the same router as
@@ -207,9 +207,10 @@ impl ProcMesh {
207207
client_proc_id.clone(),
208208
BoxedMailboxSender::new(router.clone()),
209209
);
210-
client_proc
211-
.clone()
212-
.serve(client_rx, mailbox::monitored_return_handle());
210+
client_proc.clone().serve(
211+
client_rx,
212+
mailbox::custom_monitored_return_handle("client proc"),
213+
);
213214
router.bind(client_proc_id.clone().into(), client_proc_addr.clone());
214215

215216
// Bind this router to the global router, to enable cross-mesh routing.

0 commit comments

Comments
 (0)