Skip to content

Commit 7981b63

Browse files
pzhan9 authored and meta-codesync[bot] committed
Plumbing: Add cx: context::Actor to PortHandle (#1350)
Summary: Pull Request resolved: #1350 Differential Revision: D83385432
1 parent 319da74 commit 7981b63

File tree

8 files changed

+107
-61
lines changed

8 files changed

+107
-61
lines changed

hyperactor/src/actor.rs

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -624,14 +624,13 @@ impl<A: Actor> ActorHandle<A> {
624624
/// are always queued in process, and do not require serialization.
625625
pub fn send<M: Message>(
626626
&self,
627-
// TODO(pzhang): use this parameter to generate sequence number.
628-
_cx: &impl context::Actor,
627+
cx: &impl context::Actor,
629628
message: M,
630629
) -> Result<(), MailboxSenderError>
631630
where
632631
A: Handler<M>,
633632
{
634-
self.ports.get().send(message)
633+
self.ports.get().send(cx, message)
635634
}
636635

637636
/// Return a port for the provided message type handled by the actor.

hyperactor/src/mailbox.rs

Lines changed: 53 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -17,14 +17,17 @@
1717
//!
1818
//! ```
1919
//! # use hyperactor::mailbox::Mailbox;
20+
//! # use hyperactor::Proc;
2021
//! # use hyperactor::reference::{ActorId, ProcId, WorldId};
2122
//! # tokio_test::block_on(async {
23+
//! # let proc = Proc::local();
24+
//! # let (client, _) = proc.instance("client").unwrap();
2225
//! # let proc_id = ProcId::Ranked(WorldId("world".to_string()), 0);
2326
//! # let actor_id = ActorId(proc_id, "actor".to_string(), 0);
2427
//! let mbox = Mailbox::new_detached(actor_id);
2528
//! let (port, mut receiver) = mbox.open_port::<u64>();
2629
//!
27-
//! port.send(123).unwrap();
30+
//! port.send(&client, 123).unwrap();
2831
//! assert_eq!(receiver.recv().await.unwrap(), 123u64);
2932
//! # })
3033
//! ```
@@ -1088,7 +1091,9 @@ impl MailboxClient {
10881091
tokio::spawn(async move {
10891092
let result = return_receiver.await;
10901093
if let Ok(message) = result {
1091-
let _ = return_handle_0.send(Undeliverable(message));
1094+
// When returning messages, we do not care whether the messages are delivered
1095+
// out of order.
1096+
let _ = return_handle_0.anon_send(Undeliverable(message));
10921097
} else {
10931098
// Sender dropped, this task can end.
10941099
}
@@ -1578,9 +1583,24 @@ impl<M: Message> PortHandle<M> {
15781583
}
15791584

15801585
/// Send a message to this port.
1581-
pub fn send(&self, message: M) -> Result<(), MailboxSenderError> {
1586+
pub fn send(&self, _cx: &impl context::Actor, message: M) -> Result<(), MailboxSenderError> {
15821587
let mut headers = Attrs::new();
15831588

1589+
crate::mailbox::headers::set_send_timestamp(&mut headers);
1590+
// TODO(pzhang) Use cx to add SEQ_INFO header.
1591+
1592+
self.sender.send(headers, message).map_err(|err| {
1593+
MailboxSenderError::new_unbound::<M>(
1594+
self.mailbox.actor_id().clone(),
1595+
MailboxSenderErrorKind::Other(err),
1596+
)
1597+
})
1598+
}
1599+
1600+
/// Send a message to this port without a known sender. This method should
1601+
/// only be used if you do not care about out-of-order delivery.
1602+
pub fn anon_send(&self, message: M) -> Result<(), MailboxSenderError> {
1603+
let mut headers = Attrs::new();
15841604
crate::mailbox::headers::set_send_timestamp(&mut headers);
15851605

15861606
self.sender.send(headers, message).map_err(|err| {
@@ -2610,6 +2630,7 @@ mod tests {
26102630
use crate::channel::sim::SimAddr;
26112631
use crate::clock::Clock;
26122632
use crate::clock::RealClock;
2633+
use crate::context::Mailbox as _;
26132634
use crate::data::Serialized;
26142635
use crate::id;
26152636
use crate::proc::Proc;
@@ -2653,32 +2674,33 @@ mod tests {
26532674

26542675
#[tokio::test]
26552676
async fn test_mailbox_accum() {
2656-
let mbox = Mailbox::new_detached(id!(test[0].test));
2657-
let (port, mut receiver) = mbox.open_accum_port(accum::max::<i64>());
2677+
let proc = Proc::local();
2678+
let (client, _) = proc.instance("client").unwrap();
2679+
let (port, mut receiver) = client.mailbox().open_accum_port(accum::max::<i64>());
26582680

26592681
for i in -3..4 {
2660-
port.send(i).unwrap();
2682+
port.send(&client, i).unwrap();
26612683
let received: accum::Max<i64> = receiver.recv().await.unwrap();
26622684
let msg = received.get();
26632685
assert_eq!(msg, &i);
26642686
}
26652687
// Send a smaller or same value. Should still receive the previous max.
26662688
for i in -3..4 {
2667-
port.send(i).unwrap();
2689+
port.send(&client, i).unwrap();
26682690
assert_eq!(receiver.recv().await.unwrap().get(), &3);
26692691
}
26702692
// send a larger value. Should receive the new max.
2671-
port.send(4).unwrap();
2693+
port.send(&client, 4).unwrap();
26722694
assert_eq!(receiver.recv().await.unwrap().get(), &4);
26732695

26742696
// Send multiple updates. Should only receive the final change.
26752697
for i in 5..10 {
2676-
port.send(i).unwrap();
2698+
port.send(&client, i).unwrap();
26772699
}
26782700
assert_eq!(receiver.recv().await.unwrap().get(), &9);
2679-
port.send(1).unwrap();
2680-
port.send(3).unwrap();
2681-
port.send(2).unwrap();
2701+
port.send(&client, 1).unwrap();
2702+
port.send(&client, 3).unwrap();
2703+
port.send(&client, 2).unwrap();
26822704
assert_eq!(receiver.recv().await.unwrap().get(), &9);
26832705
}
26842706

@@ -2706,9 +2728,10 @@ mod tests {
27062728
#[tokio::test]
27072729
#[ignore] // error behavior changed, but we will bring it back
27082730
async fn test_mailbox_once() {
2709-
let mbox = Mailbox::new_detached(id!(test[0].test));
2731+
let proc = Proc::local();
2732+
let (client, _) = proc.instance("client").unwrap();
27102733

2711-
let (port, receiver) = mbox.open_once_port::<u64>();
2734+
let (port, receiver) = client.open_once_port::<u64>();
27122735

27132736
// let port_id = port.port_id().clone();
27142737

@@ -2977,19 +3000,20 @@ mod tests {
29773000

29783001
#[tokio::test]
29793002
async fn test_enqueue_port() {
2980-
let mbox = Mailbox::new_detached(id!(test[0].test));
3003+
let proc = Proc::local();
3004+
let (client, _) = proc.instance("client").unwrap();
29813005

29823006
let count = Arc::new(AtomicUsize::new(0));
29833007
let count_clone = count.clone();
2984-
let port = mbox.open_enqueue_port(move |_, n| {
3008+
let port = client.mailbox().open_enqueue_port(move |_, n| {
29853009
count_clone.fetch_add(n, Ordering::SeqCst);
29863010
Ok(())
29873011
});
29883012

2989-
port.send(10).unwrap();
2990-
port.send(5).unwrap();
2991-
port.send(1).unwrap();
2992-
port.send(0).unwrap();
3013+
port.send(&client, 10).unwrap();
3014+
port.send(&client, 5).unwrap();
3015+
port.send(&client, 1).unwrap();
3016+
port.send(&client, 0).unwrap();
29933017

29943018
assert_eq!(count.load(Ordering::SeqCst), 16);
29953019
}
@@ -3054,6 +3078,7 @@ mod tests {
30543078
let proc_id = id!(quux[0]);
30553079
let mut proc = Proc::new(proc_id.clone(), proc_forwarder);
30563080
ProcSupervisionCoordinator::set(&proc).await.unwrap();
3081+
let (client, _) = proc.instance("client").unwrap();
30573082

30583083
let foo = proc.spawn::<Foo>("foo", ()).await.unwrap();
30593084
let return_handle = foo.port::<Undeliverable<MessageEnvelope>>();
@@ -3063,7 +3088,7 @@ mod tests {
30633088
Serialized::serialize(&1u64).unwrap(),
30643089
Attrs::new(),
30653090
);
3066-
return_handle.send(Undeliverable(message)).unwrap();
3091+
return_handle.send(&client, Undeliverable(message)).unwrap();
30673092

30683093
RealClock
30693094
.sleep(tokio::time::Duration::from_millis(100))
@@ -3095,7 +3120,9 @@ mod tests {
30953120
Serialized::serialize(&1u64).unwrap(),
30963121
Attrs::new(),
30973122
);
3098-
return_handle.send(Undeliverable(envelope.clone())).unwrap();
3123+
return_handle
3124+
.anon_send(Undeliverable(envelope.clone()))
3125+
.unwrap();
30993126
// Check we receive the undelivered message.
31003127
assert!(
31013128
RealClock
@@ -3747,12 +3774,13 @@ mod tests {
37473774

37483775
#[tokio::test]
37493776
async fn test_port_contramap() {
3750-
let mbox = Mailbox::new_detached(id!(test[0].test));
3751-
let (handle, mut rx) = mbox.open_port();
3777+
let proc = Proc::local();
3778+
let (client, _) = proc.instance("client").unwrap();
3779+
let (handle, mut rx) = client.open_port();
37523780

37533781
handle
37543782
.contramap(|m| (1, m))
3755-
.send("hello".to_string())
3783+
.send(&client, "hello".to_string())
37563784
.unwrap();
37573785
assert_eq!(rx.recv().await.unwrap(), (1, "hello".to_string()));
37583786
}

hyperactor/src/mailbox/undeliverable.rs

Lines changed: 14 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ use crate::ActorId;
1717
use crate::Message;
1818
use crate::Named;
1919
use crate::PortId;
20+
use crate::Proc;
2021
use crate::actor::ActorStatus;
2122
use crate::id;
2223
use crate::mailbox::DeliveryError;
@@ -94,7 +95,7 @@ pub(crate) fn return_undeliverable(
9495
envelope: MessageEnvelope,
9596
) {
9697
let envelope_copy = envelope.clone();
97-
if (return_handle.send(Undeliverable(envelope))).is_err() {
98+
if (return_handle.anon_send(Undeliverable(envelope))).is_err() {
9899
UndeliverableMailboxSender.post(envelope_copy, /*unused*/ return_handle)
99100
}
100101
}
@@ -157,6 +158,9 @@ pub fn supervise_undeliverable_messages_with<R, F>(
157158
F: Fn(&MessageEnvelope) + Send + Sync + 'static,
158159
{
159160
crate::init::get_runtime().spawn(async move {
161+
// Create a local client for this task.
162+
let proc = Proc::local();
163+
let (client, _) = proc.instance("undeliverable_supervisor").unwrap();
160164
while let Ok(Undeliverable(mut env)) = rx.recv().await {
161165
// Let caller log/trace before we mutate.
162166
on_undeliverable(&env);
@@ -173,12 +177,15 @@ pub fn supervise_undeliverable_messages_with<R, F>(
173177
let actor_id = env.dest().actor_id().clone();
174178
let headers = env.headers().clone();
175179

176-
if let Err(e) = sink.send(ActorSupervisionEvent::new(
177-
actor_id,
178-
ActorStatus::Failed(format!("message not delivered: {}", env)),
179-
Some(headers),
180-
None,
181-
)) {
180+
if let Err(e) = sink.send(
181+
&client,
182+
ActorSupervisionEvent::new(
183+
actor_id,
184+
ActorStatus::Failed(format!("message not delivered: {}", env)),
185+
Some(headers),
186+
None,
187+
),
188+
) {
182189
tracing::warn!(
183190
%e,
184191
actor=%env.dest().actor_id(),

hyperactor/src/proc.rs

Lines changed: 17 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -389,9 +389,9 @@ impl Proc {
389389
.map_err(|existing| anyhow::anyhow!("coordinator port is already set to {existing}"))
390390
}
391391

392-
fn handle_supervision_event(&self, event: ActorSupervisionEvent) {
392+
fn handle_supervision_event(&self, cx: &impl context::Actor, event: ActorSupervisionEvent) {
393393
let result = match self.state().supervision_coordinator_port.get() {
394-
Some(port) => port.send(event).map_err(anyhow::Error::from),
394+
Some(port) => port.send(cx, event).map_err(anyhow::Error::from),
395395
None => Err(anyhow::anyhow!(
396396
"coordinator port is not set for proc {}",
397397
self.proc_id()
@@ -1044,7 +1044,9 @@ impl<A: Actor> Instance<A> {
10441044
let clock = self.proc.state().clock.clone();
10451045
tokio::spawn(async move {
10461046
clock.non_advancing_sleep(delay).await;
1047-
if let Err(e) = port.send(message) {
1047+
// There is only one message from this context, so there is no need
1048+
// to worry about out-of-order delivery.
1049+
if let Err(e) = port.anon_send(message) {
10481050
// TODO: this is a fire-n-forget thread. We need to
10491051
// handle errors in a better way.
10501052
tracing::info!("{}: error sending delayed message: {}", self_id, e);
@@ -1113,7 +1115,7 @@ impl<A: Actor> Instance<A> {
11131115
if let Some(parent) = self.cell.maybe_unlink_parent() {
11141116
if let Some(event) = event {
11151117
// Parent exists, failure should be propagated to the parent.
1116-
parent.send_supervision_event_or_crash(event);
1118+
parent.send_supervision_event_or_crash(&self, event);
11171119
}
11181120
// TODO: we should get rid of this signal, and use *only* supervision events for
11191121
// the purpose of conveying lifecycle changes
@@ -1132,7 +1134,7 @@ impl<A: Actor> Instance<A> {
11321134
// Note that orphaned actor is unexpected and would only happen if
11331135
// there is a bug.
11341136
if let Some(event) = event {
1135-
self.proc.handle_supervision_event(event);
1137+
self.proc.handle_supervision_event(&self, event);
11361138
}
11371139
}
11381140
self.change_status(actor_status);
@@ -1667,7 +1669,9 @@ impl InstanceCell {
16671669
#[allow(clippy::result_large_err)] // TODO: Consider reducing the size of `ActorError`.
16681670
pub fn signal(&self, signal: Signal) -> Result<(), ActorError> {
16691671
if let Some((signal_port, _)) = &self.inner.actor_loop {
1670-
signal_port.send(signal).map_err(ActorError::from)
1672+
// The owner of InstanceCell would not use PortRef to send signal.
1673+
// So we do not need to worry about out-of-order delivery here.
1674+
signal_port.anon_send(signal).map_err(ActorError::from)
16711675
} else {
16721676
tracing::warn!(
16731677
"{}: attempted to send signal {} to detached actor",
@@ -1686,10 +1690,14 @@ impl InstanceCell {
16861690
/// Note that "let it crash" is the default behavior when a supervision event
16871691
/// cannot be delivered upstream. It is the upstream's responsibility to
16881692
/// detect and handle crashes.
1689-
pub fn send_supervision_event_or_crash(&self, event: ActorSupervisionEvent) {
1693+
pub fn send_supervision_event_or_crash(
1694+
&self,
1695+
cx: &impl context::Actor,
1696+
event: ActorSupervisionEvent,
1697+
) {
16901698
match &self.inner.actor_loop {
16911699
Some((_, supervision_port)) => {
1692-
if let Err(err) = supervision_port.send(event) {
1700+
if let Err(err) = supervision_port.send(cx, event) {
16931701
tracing::error!(
16941702
"{}: failed to send supervision event to actor: {:?}. Crash the process.",
16951703
self.actor_id(),
@@ -2662,7 +2670,7 @@ mod tests {
26622670
let (tx, rx) = client.open_once_port();
26632671
handle.send(&client, tx).unwrap();
26642672
let usize_handle = rx.recv().await.unwrap();
2665-
usize_handle.send(123).unwrap();
2673+
usize_handle.send(&client, 123).unwrap();
26662674

26672675
handle.drain_and_stop().unwrap();
26682676
handle.await;

hyperactor_multiprocess/src/system_actor.rs

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1652,11 +1652,14 @@ impl Handler<MaintainWorldHealth> for SystemActor {
16521652

16531653
// The proc's heartbeat has expired and it manages the lifecycle of the system, so schedule a system stop.
16541654
let (tx, _) = cx.open_once_port::<()>();
1655-
cx.port().send(SystemMessage::Stop {
1656-
worlds: None,
1657-
proc_timeout: Duration::from_secs(5),
1658-
reply_port: tx.bind(),
1659-
})?;
1655+
cx.port().send(
1656+
&cx,
1657+
SystemMessage::Stop {
1658+
worlds: None,
1659+
proc_timeout: Duration::from_secs(5),
1660+
reply_port: tx.bind(),
1661+
},
1662+
)?;
16601663
}
16611664

16621665
if world.state.status == WorldStatus::Live {

monarch_hyperactor/src/mailbox.rs

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -249,11 +249,12 @@ pub(super) struct PythonPortHandle {
249249

250250
#[pymethods]
251251
impl PythonPortHandle {
252-
// TODO(pzhang) Use instance after its required by PortHandle.
253-
fn send(&self, _instance: &PyInstance, message: PythonMessage) -> PyResult<()> {
254-
self.inner
255-
.send(message)
256-
.map_err(|err| PyErr::new::<PyEOFError, _>(format!("Port closed: {}", err)))?;
252+
fn send(&self, instance: &PyInstance, message: PythonMessage) -> PyResult<()> {
253+
instance_dispatch!(instance, |cx_instance| {
254+
self.inner
255+
.send(cx_instance, message)
256+
.map_err(|err| PyErr::new::<PyEOFError, _>(format!("Port closed: {}", err)))?;
257+
});
257258
Ok(())
258259
}
259260

monarch_hyperactor/src/proc.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -680,7 +680,7 @@ async fn check_actor_supervision_state(
680680
// TODO: should allow for multiple attempts
681681
tracing::error!("system actor is not alive, aborting!");
682682
// Send a signal to the client to abort.
683-
signal_port.send(Signal::Stop).unwrap();
683+
signal_port.send(&instance, Signal::Stop).unwrap();
684684
}
685685
}
686686
Ok(())

0 commit comments

Comments
 (0)