Skip to content

Commit 319da74

Browse files
pzhan9meta-codesync[bot]
authored andcommitted
Add cx: context::Actor to ActorHandle
Differential Revision: D83533433
1 parent 8a7e009 commit 319da74

File tree

16 files changed

+408
-274
lines changed

16 files changed

+408
-274
lines changed

controller/src/lib.rs

Lines changed: 22 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1678,14 +1678,17 @@ mod tests {
16781678
// Join the world.
16791679
server_handle
16801680
.system_actor_handle()
1681-
.send(SystemMessage::Join {
1682-
proc_id: local_proc_id.clone(),
1683-
world_id,
1684-
proc_message_port: local_proc_message_port.bind(),
1685-
proc_addr: local_proc_addr,
1686-
labels: HashMap::new(),
1687-
lifecycle_mode: ProcLifecycleMode::ManagedBySystem,
1688-
})
1681+
.send(
1682+
&client_mailbox,
1683+
SystemMessage::Join {
1684+
proc_id: local_proc_id.clone(),
1685+
world_id,
1686+
proc_message_port: local_proc_message_port.bind(),
1687+
proc_addr: local_proc_addr,
1688+
labels: HashMap::new(),
1689+
lifecycle_mode: ProcLifecycleMode::ManagedBySystem,
1690+
},
1691+
)
16891692
.unwrap();
16901693

16911694
assert_matches!(
@@ -1779,14 +1782,17 @@ mod tests {
17791782
// Join the world.
17801783
server_handle
17811784
.system_actor_handle()
1782-
.send(SystemMessage::Join {
1783-
proc_id: local_proc_id.clone(),
1784-
world_id,
1785-
proc_message_port: local_proc_message_port.bind(),
1786-
proc_addr: local_proc_addr,
1787-
labels: HashMap::new(),
1788-
lifecycle_mode: ProcLifecycleMode::ManagedBySystem,
1789-
})
1785+
.send(
1786+
&client_mailbox,
1787+
SystemMessage::Join {
1788+
proc_id: local_proc_id.clone(),
1789+
world_id,
1790+
proc_message_port: local_proc_message_port.bind(),
1791+
proc_addr: local_proc_addr,
1792+
labels: HashMap::new(),
1793+
lifecycle_mode: ProcLifecycleMode::ManagedBySystem,
1794+
},
1795+
)
17901796
.unwrap();
17911797

17921798
assert_matches!(

hyperactor/src/actor.rs

Lines changed: 26 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -622,7 +622,12 @@ impl<A: Actor> ActorHandle<A> {
622622

623623
/// Send a message to the actor. Messages sent through the handle
624624
/// are always queued in process, and do not require serialization.
625-
pub fn send<M: Message>(&self, message: M) -> Result<(), MailboxSenderError>
625+
pub fn send<M: Message>(
626+
&self,
627+
// TODO(pzhang): use this parameter to generate sequence number.
628+
_cx: &impl context::Actor,
629+
message: M,
630+
) -> Result<(), MailboxSenderError>
626631
where
627632
A: Handler<M>,
628633
{
@@ -753,10 +758,10 @@ mod tests {
753758
#[tokio::test]
754759
async fn test_server_basic() {
755760
let proc = Proc::local();
756-
let client = proc.attach("client").unwrap();
761+
let (client, _) = proc.instance("client").unwrap();
757762
let (tx, mut rx) = client.open_port();
758763
let handle = proc.spawn::<EchoActor>("echo", tx.bind()).await.unwrap();
759-
handle.send(123u64).unwrap();
764+
handle.send(&client, 123u64).unwrap();
760765
handle.drain_and_stop().unwrap();
761766
handle.await;
762767

@@ -766,7 +771,7 @@ mod tests {
766771
#[tokio::test]
767772
async fn test_ping_pong() {
768773
let proc = Proc::local();
769-
let client = proc.attach("client").unwrap();
774+
let (client, _) = proc.instance("client").unwrap();
770775
let (undeliverable_msg_tx, _) = client.open_port();
771776

772777
let ping_pong_actor_params =
@@ -783,7 +788,10 @@ mod tests {
783788
let (local_port, local_receiver) = client.open_once_port();
784789

785790
ping_handle
786-
.send(PingPongMessage(10, pong_handle.bind(), local_port.bind()))
791+
.send(
792+
&client,
793+
PingPongMessage(10, pong_handle.bind(), local_port.bind()),
794+
)
787795
.unwrap();
788796

789797
assert!(local_receiver.recv().await.unwrap());
@@ -792,7 +800,7 @@ mod tests {
792800
#[tokio::test]
793801
async fn test_ping_pong_on_handler_error() {
794802
let proc = Proc::local();
795-
let client = proc.attach("client").unwrap();
803+
let (client, _) = proc.instance("client").unwrap();
796804
let (undeliverable_msg_tx, _) = client.open_port();
797805

798806
// Need to set a supervison coordinator for this Proc because there will
@@ -814,11 +822,14 @@ mod tests {
814822
let (local_port, local_receiver) = client.open_once_port();
815823

816824
ping_handle
817-
.send(PingPongMessage(
818-
error_ttl + 1, // will encounter an error at TTL=66
819-
pong_handle.bind(),
820-
local_port.bind(),
821-
))
825+
.send(
826+
&client,
827+
PingPongMessage(
828+
error_ttl + 1, // will encounter an error at TTL=66
829+
pong_handle.bind(),
830+
local_port.bind(),
831+
),
832+
)
822833
.unwrap();
823834

824835
// TODO: Fix this receiver hanging issue in T200423722.
@@ -861,10 +872,10 @@ mod tests {
861872
async fn test_init() {
862873
let proc = Proc::local();
863874
let handle = proc.spawn::<InitActor>("init", ()).await.unwrap();
864-
let client = proc.attach("client").unwrap();
875+
let (client, _) = proc.instance("client").unwrap();
865876

866877
let (port, receiver) = client.open_once_port();
867-
handle.send(port).unwrap();
878+
handle.send(&client, port).unwrap();
868879
assert!(receiver.recv().await.unwrap());
869880

870881
handle.drain_and_stop().unwrap();
@@ -946,12 +957,12 @@ mod tests {
946957
M: RemoteMessage,
947958
MultiActor: Handler<M>,
948959
{
949-
self.handle.send(message).unwrap()
960+
self.handle.send(&self.client, message).unwrap()
950961
}
951962

952963
async fn sync(&self) {
953964
let (port, done) = self.client.open_once_port::<bool>();
954-
self.handle.send(port).unwrap();
965+
self.handle.send(&self.client, port).unwrap();
955966
assert!(done.recv().await.unwrap());
956967
}
957968

0 commit comments

Comments
 (0)