Skip to content

Commit 6e87ff0

Browse files
pzhan9meta-codesync[bot]
authored andcommitted
Plumbing: Use context::Actor for Rust PortRef and Port (#1370)
Summary: Pull Request resolved: #1370 This diff is part of the effect to adding sequencing logic to sender actor. See D83371710 for details. This diff updates the `send` methods in Rust `PortRef` and `Port` to use `context::Actor`. Reviewed By: pablorfb-meta, mariusae Differential Revision: D83507394 fbshipit-source-id: fae0a83192628a33092526d06ef27b7e6daf9308
1 parent a79fdd5 commit 6e87ff0

File tree

4 files changed

+30
-28
lines changed

4 files changed

+30
-28
lines changed

hyperactor/src/mailbox.rs

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1164,27 +1164,27 @@ impl MailboxSender for MailboxClient {
11641164
}
11651165

11661166
/// Wrapper to turn `PortRef` into a `Sink`.
1167-
pub struct PortSink<C: context::Mailbox, M: RemoteMessage> {
1168-
caps: C,
1167+
pub struct PortSink<C: context::Actor, M: RemoteMessage> {
1168+
cx: C,
11691169
port: PortRef<M>,
11701170
}
11711171

1172-
impl<C: context::Mailbox, M: RemoteMessage> PortSink<C, M> {
1172+
impl<C: context::Actor, M: RemoteMessage> PortSink<C, M> {
11731173
/// Create new PortSink
1174-
pub fn new(caps: C, port: PortRef<M>) -> Self {
1175-
Self { caps, port }
1174+
pub fn new(cx: C, port: PortRef<M>) -> Self {
1175+
Self { cx, port }
11761176
}
11771177
}
11781178

1179-
impl<C: context::Mailbox, M: RemoteMessage> Sink<M> for PortSink<C, M> {
1179+
impl<C: context::Actor, M: RemoteMessage> Sink<M> for PortSink<C, M> {
11801180
type Error = MailboxSenderError;
11811181

11821182
fn poll_ready(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
11831183
Poll::Ready(Ok(()))
11841184
}
11851185

11861186
fn start_send(self: Pin<&mut Self>, item: M) -> Result<(), Self::Error> {
1187-
self.port.send(&self.caps, item)
1187+
self.port.send(&self.cx, item)
11881188
}
11891189

11901190
fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {

hyperactor/src/reference.rs

Lines changed: 10 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -72,9 +72,6 @@ use parse::ParseError;
7272
use parse::Token;
7373
use parse::parse;
7474

75-
use crate::proc::SEQ_INFO;
76-
use crate::proc::SeqInfo;
77-
7875
/// A universal reference to hierarchical identifiers in Hyperactor.
7976
///
8077
/// References implement a concrete syntax which can be parsed via
@@ -723,7 +720,7 @@ impl<A: RemoteActor> ActorRef<A> {
723720
/// Send an [`M`]-typed message to the referenced actor.
724721
pub fn send<M: RemoteMessage>(
725722
&self,
726-
cx: &impl context::Mailbox,
723+
cx: &impl context::Actor,
727724
message: M,
728725
) -> Result<(), MailboxSenderError>
729726
where
@@ -736,7 +733,7 @@ impl<A: RemoteActor> ActorRef<A> {
736733
/// headers.
737734
pub fn send_with_headers<M: RemoteMessage>(
738735
&self,
739-
cx: &impl context::Mailbox,
736+
cx: &impl context::Actor,
740737
headers: Attrs,
741738
message: M,
742739
) -> Result<(), MailboxSenderError>
@@ -883,7 +880,7 @@ impl PortId {
883880
/// Send a serialized message to this port, provided a sending capability,
884881
/// such as [`crate::actor::Instance`]. It is the sender's responsibility
885882
/// to ensure that the provided message is well-typed.
886-
pub fn send(&self, cx: &impl context::Mailbox, serialized: Serialized) {
883+
pub fn send(&self, cx: &impl context::Actor, serialized: Serialized) {
887884
let mut headers = Attrs::new();
888885
crate::mailbox::headers::set_send_timestamp(&mut headers);
889886
cx.post(self.clone(), headers, serialized);
@@ -894,7 +891,7 @@ impl PortId {
894891
/// It is the sender's responsibility to ensure that the provided message is well-typed.
895892
pub fn send_with_headers(
896893
&self,
897-
cx: &impl context::Mailbox,
894+
cx: &impl context::Actor,
898895
serialized: Serialized,
899896
mut headers: Attrs,
900897
) {
@@ -1014,7 +1011,7 @@ impl<M: RemoteMessage> PortRef<M> {
10141011

10151012
/// Send a message to this port, provided a sending capability, such as
10161013
/// [`crate::actor::Instance`].
1017-
pub fn send(&self, cx: &impl context::Mailbox, message: M) -> Result<(), MailboxSenderError> {
1014+
pub fn send(&self, cx: &impl context::Actor, message: M) -> Result<(), MailboxSenderError> {
10181015
self.send_with_headers(cx, Attrs::new(), message)
10191016
}
10201017

@@ -1023,7 +1020,7 @@ impl<M: RemoteMessage> PortRef<M> {
10231020
/// headers.
10241021
pub fn send_with_headers(
10251022
&self,
1026-
cx: &impl context::Mailbox,
1023+
cx: &impl context::Actor,
10271024
headers: Attrs,
10281025
message: M,
10291026
) -> Result<(), MailboxSenderError> {
@@ -1041,7 +1038,7 @@ impl<M: RemoteMessage> PortRef<M> {
10411038
/// [`crate::actor::Instance`].
10421039
pub fn send_serialized(
10431040
&self,
1044-
cx: &impl context::Mailbox,
1041+
cx: &impl context::Actor,
10451042
mut headers: Attrs,
10461043
message: Serialized,
10471044
) {
@@ -1050,7 +1047,7 @@ impl<M: RemoteMessage> PortRef<M> {
10501047
}
10511048

10521049
/// Convert this port into a sink that can be used to send messages using the given capability.
1053-
pub fn into_sink<C: context::Mailbox>(self, cx: C) -> PortSink<C, M> {
1050+
pub fn into_sink<C: context::Actor>(self, cx: C) -> PortSink<C, M> {
10541051
PortSink::new(cx, self)
10551052
}
10561053
}
@@ -1138,15 +1135,15 @@ impl<M: RemoteMessage> OncePortRef<M> {
11381135

11391136
/// Send a message to this port, provided a sending capability, such as
11401137
/// [`crate::actor::Instance`].
1141-
pub fn send(self, cx: &impl context::Mailbox, message: M) -> Result<(), MailboxSenderError> {
1138+
pub fn send(self, cx: &impl context::Actor, message: M) -> Result<(), MailboxSenderError> {
11421139
self.send_with_headers(cx, Attrs::new(), message)
11431140
}
11441141

11451142
/// Send a message to this port, provided a sending capability, such as
11461143
/// [`crate::actor::Instance`]. Additional context can be provided in the form of headers.
11471144
pub fn send_with_headers(
11481145
self,
1149-
cx: &impl context::Mailbox,
1146+
cx: &impl context::Actor,
11501147
mut headers: Attrs,
11511148
message: M,
11521149
) -> Result<(), MailboxSenderError> {

hyperactor_macros/tests/export.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -111,7 +111,7 @@ mod tests {
111111
#[async_timed_test(timeout_secs = 30)]
112112
async fn test_binds() {
113113
let proc = Proc::local();
114-
let client = proc.attach("client").unwrap();
114+
let (client, _) = proc.instance("client").unwrap();
115115
let (tx, mut rx) = client.open_port();
116116
let params = TestActorParams {
117117
forward_port: tx.bind(),
@@ -192,7 +192,7 @@ mod tests {
192192
#[async_timed_test(timeout_secs = 30)]
193193
async fn test_ref_alias() {
194194
let proc = Proc::local();
195-
let client = proc.attach("client").unwrap();
195+
let (client, _) = proc.instance("client").unwrap();
196196
let (tx, mut rx) = client.open_port();
197197
let params = TestActorParams {
198198
forward_port: tx.bind(),

monarch_hyperactor/src/mailbox.rs

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@ use serde::Serialize;
4848
use crate::actor::PythonMessage;
4949
use crate::actor::PythonMessageKind;
5050
use crate::context::PyInstance;
51+
use crate::instance_dispatch;
5152
use crate::proc::PyActorId;
5253
use crate::pytokio::PyPythonTask;
5354
use crate::pytokio::PythonTask;
@@ -321,9 +322,11 @@ impl PythonPortRef {
321322
}
322323

323324
fn send(&self, instance: &PyInstance, message: PythonMessage) -> PyResult<()> {
324-
self.inner
325-
.send(&instance._mailbox().inner, message)
326-
.map_err(|err| PyErr::new::<PyEOFError, _>(format!("Port closed: {}", err)))?;
325+
instance_dispatch!(instance, |cx_instance| {
326+
self.inner
327+
.send(cx_instance, message)
328+
.map_err(|err| PyErr::new::<PyEOFError, _>(format!("Port closed: {}", err)))?;
329+
});
327330
Ok(())
328331
}
329332

@@ -537,9 +540,11 @@ impl PythonOncePortRef {
537540
return Err(PyErr::new::<PyValueError, _>("OncePortRef is already used"));
538541
};
539542

540-
port_ref
541-
.send(&instance._mailbox().inner, message)
542-
.map_err(|err| PyErr::new::<PyEOFError, _>(format!("Port closed: {}", err)))?;
543+
instance_dispatch!(instance, |cx_instance| {
544+
port_ref
545+
.send(cx_instance, message)
546+
.map_err(|err| PyErr::new::<PyEOFError, _>(format!("Port closed: {}", err)))?;
547+
});
543548
Ok(())
544549
}
545550

0 commit comments

Comments
 (0)