From a85a25cf413f996d1ba53499d642807b065d6aea Mon Sep 17 00:00:00 2001 From: Marius Eriksen Date: Thu, 20 Nov 2025 19:46:29 -0800 Subject: [PATCH] [hyperactor] make all variants of `spawn` sync Building on the previous diff, separating out remote from local instantiation, this diff implements synchronous spawns throughout. This means we can always spawn an actor in a nonblocking way, regardless of context. Spawns should also become infallible, instead relying on supervision to handle errors. Differential Revision: [D87608082](https://our.internmc.facebook.com/intern/diff/D87608082/) **NOTE FOR REVIEWERS**: This PR has internal Meta-specific changes or comments, please review them on [Phabricator](https://our.internmc.facebook.com/intern/diff/D87608082/)! [ghstack-poisoned] --- hyperactor/src/actor.rs | 38 ++++------ hyperactor/src/host.rs | 2 +- hyperactor/src/mailbox.rs | 2 +- hyperactor/src/proc.rs | 74 +++++-------------- hyperactor/src/test_utils/proc_supervison.rs | 4 +- hyperactor/test/host_bootstrap.rs | 1 - hyperactor_mesh/src/bootstrap.rs | 18 ++--- hyperactor_mesh/src/connect.rs | 2 +- hyperactor_mesh/src/logging.rs | 8 +- hyperactor_mesh/src/proc_mesh/mesh_agent.rs | 4 +- hyperactor_mesh/src/v1/host_mesh.rs | 3 +- .../src/v1/host_mesh/mesh_agent.rs | 3 +- hyperactor_multiprocess/src/proc_actor.rs | 58 ++++----------- hyperactor_multiprocess/src/system_actor.rs | 6 +- 14 files changed, 65 insertions(+), 158 deletions(-) diff --git a/hyperactor/src/actor.rs b/hyperactor/src/actor.rs index 0c03753f5..c574cdfdb 100644 --- a/hyperactor/src/actor.rs +++ b/hyperactor/src/actor.rs @@ -98,8 +98,8 @@ pub trait Actor: Sized + Send + Debug + 'static { /// Spawn a child actor, given a spawning capability (usually given by [`Instance`]). /// The spawned actor will be supervised by the parent (spawning) actor. - async fn spawn(self, cx: &impl context::Actor) -> anyhow::Result> { - cx.instance().spawn(self).await + fn spawn(self, cx: &impl context::Actor) -> anyhow::Result> { + cx.instance().spawn(self) } /// Spawns this actor in a detached state, handling its messages @@ -108,8 +108,8 @@ pub trait Actor: Sized + Send + Debug + 'static { /// /// Actors spawned through `spawn_detached` are not attached to a supervision /// hierarchy, and not managed by a [`Proc`]. - async fn spawn_detached(self) -> Result, anyhow::Error> { - Proc::local().spawn("anon", self).await + fn spawn_detached(self) -> Result, anyhow::Error> { + Proc::local().spawn("anon", self) } /// This method is used by the runtime to spawn the actor server. It can be @@ -260,7 +260,7 @@ pub trait RemoteSpawn: Actor + Referable + Binds { Box::pin(async move { let params = bincode::deserialize(&serialized_params)?; let actor = Self::new(params).await?; - let handle = proc.spawn(&name, actor).await?; + let handle = proc.spawn(&name, actor)?; // We return only the ActorId, not a typed ActorRef. // Callers that hold this ID can interact with the actor // only via the serialized/opaque messaging path, which @@ -792,7 +792,7 @@ mod tests { let client = proc.attach("client").unwrap(); let (tx, mut rx) = client.open_port(); let actor = EchoActor(tx.bind()); - let handle = proc.spawn::("echo", actor).await.unwrap(); + let handle = proc.spawn::("echo", actor).unwrap(); handle.send(123u64).unwrap(); handle.drain_and_stop().unwrap(); handle.await; @@ -808,14 +808,8 @@ mod tests { let ping_actor = PingPongActor::new(Some(undeliverable_msg_tx.bind()), None, None); let pong_actor = PingPongActor::new(Some(undeliverable_msg_tx.bind()), None, None); - let ping_handle = proc - .spawn::("ping", ping_actor) - .await - .unwrap(); - let pong_handle = proc - .spawn::("pong", pong_actor) - .await - .unwrap(); + let ping_handle = proc.spawn::("ping", ping_actor).unwrap(); + let pong_handle = proc.spawn::("pong", pong_actor).unwrap(); let (local_port, local_receiver) = client.open_once_port(); @@ -842,14 +836,8 @@ mod tests { PingPongActor::new(Some(undeliverable_msg_tx.bind()), Some(error_ttl), None); let pong_actor = PingPongActor::new(Some(undeliverable_msg_tx.bind()), Some(error_ttl), None); - let ping_handle = proc - .spawn::("ping", ping_actor) - .await - .unwrap(); - let pong_handle = proc - .spawn::("pong", pong_actor) - .await - .unwrap(); + let ping_handle = proc.spawn::("ping", ping_actor).unwrap(); + let pong_handle = proc.spawn::("pong", pong_actor).unwrap(); let (local_port, local_receiver) = client.open_once_port(); @@ -895,7 +883,7 @@ mod tests { async fn test_init() { let proc = Proc::local(); let actor = InitActor(false); - let handle = proc.spawn::("init", actor).await.unwrap(); + let handle = proc.spawn::("init", actor).unwrap(); let client = proc.attach("client").unwrap(); let (port, receiver) = client.open_once_port(); @@ -954,7 +942,7 @@ mod tests { let proc = Proc::local(); let values: MultiValues = Arc::new(Mutex::new((0, "".to_string()))); let actor = MultiActor(values.clone()); - let handle = proc.spawn::("myactor", actor).await.unwrap(); + let handle = proc.spawn::("myactor", actor).unwrap(); let (client, client_handle) = proc.instance("client").unwrap(); Self { proc, @@ -1072,7 +1060,7 @@ mod tests { // Just test that we can round-trip the handle through a downcast. let proc = Proc::local(); - let handle = proc.spawn("nothing", NothingActor).await.unwrap(); + let handle = proc.spawn("nothing", NothingActor).unwrap(); let cell = handle.cell(); // Invalid actor doesn't succeed. diff --git a/hyperactor/src/host.rs b/hyperactor/src/host.rs index 8e1c4766e..f6c4c9df1 100644 --- a/hyperactor/src/host.rs +++ b/hyperactor/src/host.rs @@ -1220,7 +1220,7 @@ mod tests { #[tokio::test] async fn test_basic() { let proc_manager = - LocalProcManager::new(|proc: Proc| async move { proc.spawn::<()>("agent", ()).await }); + LocalProcManager::new(|proc: Proc| async move { proc.spawn::<()>("agent", ()) }); let procs = Arc::clone(&proc_manager.procs); let (mut host, _handle) = Host::serve(proc_manager, ChannelAddr::any(ChannelTransport::Local)) diff --git a/hyperactor/src/mailbox.rs b/hyperactor/src/mailbox.rs index fb0f011f6..93d4ad2b5 100644 --- a/hyperactor/src/mailbox.rs +++ b/hyperactor/src/mailbox.rs @@ -3125,7 +3125,7 @@ mod tests { let mut proc = Proc::new(proc_id.clone(), proc_forwarder); ProcSupervisionCoordinator::set(&proc).await.unwrap(); - let foo = proc.spawn("foo", Foo).await.unwrap(); + let foo = proc.spawn("foo", Foo).unwrap(); let return_handle = foo.port::>(); let message = MessageEnvelope::new( foo.actor_id().clone(), diff --git a/hyperactor/src/proc.rs b/hyperactor/src/proc.rs index 93e6cf6e5..65fb5346d 100644 --- a/hyperactor/src/proc.rs +++ b/hyperactor/src/proc.rs @@ -507,12 +507,7 @@ impl Proc { /// Spawn a named (root) actor on this proc. The name of the actor must be /// unique. - #[hyperactor::observe_result("Proc")] - pub async fn spawn( - &self, - name: &str, - actor: A, - ) -> Result, anyhow::Error> { + pub fn spawn(&self, name: &str, actor: A) -> Result, anyhow::Error> { let actor_id = self.allocate_root_id(name)?; let span = tracing::span!( Level::INFO, @@ -532,10 +527,7 @@ impl Proc { .ledger .insert(actor_id.clone(), instance.inner.cell.downgrade())?; - Ok(instance - .start(actor, actor_loop_receivers.take().unwrap(), work_rx) - .instrument(span) - .await) + Ok(instance.start(actor, actor_loop_receivers.take().unwrap(), work_rx)) } /// Create and return an actor instance and its corresponding handle. This allows actors to be @@ -584,7 +576,7 @@ impl Proc { /// /// When spawn_child returns, the child has an associated cell and is linked /// with its parent. - async fn spawn_child( + fn spawn_child( &self, parent: InstanceCell, actor: A, @@ -592,9 +584,7 @@ impl Proc { let actor_id = self.allocate_child_id(parent.actor_id())?; let (instance, mut actor_loop_receivers, work_rx) = Instance::new(self.clone(), actor_id, false, Some(parent.clone())); - Ok(instance - .start(actor, actor_loop_receivers.take().unwrap(), work_rx) - .await) + Ok(instance.start(actor, actor_loop_receivers.take().unwrap(), work_rx)) } /// Call `abort` on the `JoinHandle` associated with the given @@ -1166,8 +1156,7 @@ impl Instance { /// Start an A-typed actor onto this instance with the provided params. When spawn returns, /// the actor has been linked with its parent, if it has one. - #[hyperactor::instrument_infallible(fields(actor_id=self.inner.cell.actor_id().clone().to_string(), actor_name=self.inner.cell.actor_id().name()))] - async fn start( + fn start( self, actor: A, actor_loop_receivers: (PortReceiver, PortReceiver), @@ -1540,11 +1529,8 @@ impl Instance { } /// Spawn on child on this instance. Currently used only by cap::CanSpawn. - pub(crate) async fn spawn(&self, actor: C) -> anyhow::Result> { - self.inner - .proc - .spawn_child(self.inner.cell.clone(), actor) - .await + pub(crate) fn spawn(&self, actor: C) -> anyhow::Result> { + self.inner.proc.spawn_child(self.inner.cell.clone(), actor) } /// Create a new direct child instance. @@ -2282,7 +2268,7 @@ mod tests { cx: &crate::Context, reply: oneshot::Sender>, ) -> Result<(), anyhow::Error> { - let handle = TestActor::default().spawn(cx).await?; + let handle = TestActor::default().spawn(cx)?; reply.send(handle).unwrap(); Ok(()) } @@ -2292,7 +2278,7 @@ mod tests { #[async_timed_test(timeout_secs = 30)] async fn test_spawn_actor() { let proc = Proc::local(); - let handle = proc.spawn("test", TestActor::default()).await.unwrap(); + let handle = proc.spawn("test", TestActor::default()).unwrap(); // Check on the join handle. assert!(logs_contain( @@ -2343,11 +2329,9 @@ mod tests { let proc = Proc::local(); let first = proc .spawn::("first", TestActor::default()) - .await .unwrap(); let second = proc .spawn::("second", TestActor::default()) - .await .unwrap(); let (tx, rx) = oneshot::channel::<()>(); let reply_message = TestActorMessage::Reply(tx); @@ -2387,12 +2371,10 @@ mod tests { let target_actor = proc .spawn::("target", TestActor::default()) - .await .unwrap(); let target_actor_ref = target_actor.bind(); let lookup_actor = proc .spawn::("lookup", LookupTestActor::default()) - .await .unwrap(); assert!( @@ -2453,7 +2435,6 @@ mod tests { let first = proc .spawn::("first", TestActor::default()) - .await .unwrap(); let second = TestActor::spawn_child(&first).await; let third = TestActor::spawn_child(&second).await; @@ -2524,7 +2505,6 @@ mod tests { let root = proc .spawn::("root", TestActor::default()) - .await .unwrap(); let root_1 = TestActor::spawn_child(&root).await; let root_2 = TestActor::spawn_child(&root).await; @@ -2548,7 +2528,6 @@ mod tests { let root = proc .spawn::("root", TestActor::default()) - .await .unwrap(); let root_1 = TestActor::spawn_child(&root).await; let root_2 = TestActor::spawn_child(&root).await; @@ -2589,10 +2568,7 @@ mod tests { let proc = Proc::local(); // Add the 1st root. This root will remain active until the end of the test. - let root: ActorHandle = proc - .spawn::("root", TestActor::default()) - .await - .unwrap(); + let root: ActorHandle = proc.spawn("root", TestActor::default()).unwrap(); wait_until_idle(&root).await; { let snapshot = proc.state().ledger.snapshot(); @@ -2606,10 +2582,8 @@ mod tests { } // Add the 2nd root. - let another_root: ActorHandle = proc - .spawn::("another_root", TestActor::default()) - .await - .unwrap(); + let another_root: ActorHandle = + proc.spawn("another_root", TestActor::default()).unwrap(); wait_until_idle(&another_root).await; { let snapshot = proc.state().ledger.snapshot(); @@ -2844,7 +2818,7 @@ mod tests { let proc = Proc::local(); let state = Arc::new(AtomicUsize::new(0)); let actor = TestActor(state.clone()); - let handle = proc.spawn::("test", actor).await.unwrap(); + let handle = proc.spawn::("test", actor).unwrap(); let client = proc.attach("client").unwrap(); let (tx, rx) = client.open_once_port(); handle.send(tx).unwrap(); @@ -2868,10 +2842,7 @@ mod tests { ProcSupervisionCoordinator::set(&proc).await.unwrap(); let (client, _handle) = proc.instance("client").unwrap(); - let actor_handle = proc - .spawn::("test", TestActor::default()) - .await - .unwrap(); + let actor_handle = proc.spawn("test", TestActor::default()).unwrap(); actor_handle .panic(&client, "some random failure".to_string()) .await @@ -2895,6 +2866,8 @@ mod tests { #[async_timed_test(timeout_secs = 30)] async fn test_local_supervision_propagation() { + hyperactor_telemetry::initialize_logging_for_test(); + #[derive(Debug)] struct TestActor(Arc, bool); @@ -2943,7 +2916,6 @@ mod tests { let root = proc .spawn::("root", TestActor(root_state.clone(), false)) - .await .unwrap(); let root_1 = proc .spawn_child::( @@ -2953,32 +2925,27 @@ mod tests { true, /* set true so children's event stops here */ ), ) - .await .unwrap(); let root_1_1 = proc .spawn_child::( root_1.cell().clone(), TestActor(root_1_1_state.clone(), false), ) - .await .unwrap(); let root_1_1_1 = proc .spawn_child::( root_1_1.cell().clone(), TestActor(root_1_1_1_state.clone(), false), ) - .await .unwrap(); let root_2 = proc .spawn_child::(root.cell().clone(), TestActor(root_2_state.clone(), false)) - .await .unwrap(); let root_2_1 = proc .spawn_child::( root_2.cell().clone(), TestActor(root_2_1_state.clone(), false), ) - .await .unwrap(); // fail `root_1_1_1`, the supervision msg should be propagated to @@ -3030,7 +2997,7 @@ mod tests { let (instance, handle) = proc.instance("my_test_actor").unwrap(); - let child_actor = TestActor::default().spawn(&instance).await.unwrap(); + let child_actor = TestActor::default().spawn(&instance).unwrap(); let (port, mut receiver) = instance.open_port(); child_actor @@ -3061,10 +3028,7 @@ mod tests { // Intentionally not setting a proc supervison coordinator. This // should cause the process to terminate. // ProcSupervisionCoordinator::set(&proc).await.unwrap(); - let root = proc - .spawn::("root", TestActor::default()) - .await - .unwrap(); + let root = proc.spawn("root", TestActor::default()).unwrap(); let (client, _handle) = proc.instance("client").unwrap(); root.fail(&client, anyhow::anyhow!("some random failure")) .await @@ -3159,7 +3123,7 @@ mod tests { } trace_and_block(async { - let handle = LoggingActor::default().spawn_detached().await.unwrap(); + let handle = LoggingActor::default().spawn_detached().unwrap(); handle.send("hello world".to_string()).unwrap(); handle.send("hello world again".to_string()).unwrap(); handle.send(123u64).unwrap(); diff --git a/hyperactor/src/test_utils/proc_supervison.rs b/hyperactor/src/test_utils/proc_supervison.rs index e63a1b883..cc8136f75 100644 --- a/hyperactor/src/test_utils/proc_supervison.rs +++ b/hyperactor/src/test_utils/proc_supervison.rs @@ -43,9 +43,7 @@ impl ProcSupervisionCoordinator { pub async fn set(proc: &Proc) -> Result { let state = ReportedEvent::new(); let actor = ProcSupervisionCoordinator(state.clone()); - let coordinator = proc - .spawn::("coordinator", actor) - .await?; + let coordinator = proc.spawn::("coordinator", actor)?; proc.set_supervision_coordinator(coordinator.port())?; Ok(state) } diff --git a/hyperactor/test/host_bootstrap.rs b/hyperactor/test/host_bootstrap.rs index d15dfef78..ce6b9b203 100644 --- a/hyperactor/test/host_bootstrap.rs +++ b/hyperactor/test/host_bootstrap.rs @@ -19,7 +19,6 @@ async fn main() { let proc = ProcessProcManager::::boot_proc(|proc| async move { proc.spawn("echo", hyperactor::host::testing::EchoActor) - .await }) .await .unwrap(); diff --git a/hyperactor_mesh/src/bootstrap.rs b/hyperactor_mesh/src/bootstrap.rs index 35969f177..d55e746d1 100644 --- a/hyperactor_mesh/src/bootstrap.rs +++ b/hyperactor_mesh/src/bootstrap.rs @@ -490,12 +490,10 @@ impl Bootstrap { let (host, _handle) = ok!(Host::serve(manager, addr).await); let addr = host.addr().clone(); let system_proc = host.system_proc().clone(); - let host_mesh_agent = ok!(system_proc - .spawn::( - "agent", - HostMeshAgent::new(HostAgentMode::Process(host)), - ) - .await); + let host_mesh_agent = ok!(system_proc.spawn::( + "agent", + HostMeshAgent::new(HostAgentMode::Process(host)), + )); tracing::info!( "serving host at {}, agent: {}", @@ -2612,11 +2610,8 @@ mod tests { // Spawn the log client and disable aggregation (immediate // print + tap push). let log_client_actor = LogClientActor::new(()).await.unwrap(); - let log_client: ActorRef = proc - .spawn("log_client", log_client_actor) - .await - .unwrap() - .bind(); + let log_client: ActorRef = + proc.spawn("log_client", log_client_actor).unwrap().bind(); log_client.set_aggregate(&client, None).await.unwrap(); // Spawn the forwarder in this proc (it will serve @@ -2624,7 +2619,6 @@ mod tests { let log_forwarder_actor = LogForwardActor::new(log_client.clone()).await.unwrap(); let _log_forwarder: ActorRef = proc .spawn("log_forwarder", log_forwarder_actor) - .await .unwrap() .bind(); diff --git a/hyperactor_mesh/src/connect.rs b/hyperactor_mesh/src/connect.rs index 62f3ec987..ad4ae7fa4 100644 --- a/hyperactor_mesh/src/connect.rs +++ b/hyperactor_mesh/src/connect.rs @@ -430,7 +430,7 @@ mod tests { let proc = Proc::local(); let (client, _client_handle) = proc.instance("client")?; let (connect, completer) = Connect::allocate(client.self_id().clone(), client); - let actor = proc.spawn("actor", EchoActor {}).await?; + let actor = proc.spawn("actor", EchoActor {})?; actor.send(connect)?; let (mut rd, mut wr) = completer.complete().await?.into_split(); let send = [3u8, 4u8, 5u8, 6u8]; diff --git a/hyperactor_mesh/src/logging.rs b/hyperactor_mesh/src/logging.rs index e1865df35..832026f30 100644 --- a/hyperactor_mesh/src/logging.rs +++ b/hyperactor_mesh/src/logging.rs @@ -1603,15 +1603,11 @@ mod tests { std::env::set_var(BOOTSTRAP_LOG_CHANNEL, log_channel.to_string()); } let log_client_actor = LogClientActor::new(()).await.unwrap(); - let log_client: ActorRef = proc - .spawn("log_client", log_client_actor) - .await - .unwrap() - .bind(); + let log_client: ActorRef = + proc.spawn("log_client", log_client_actor).unwrap().bind(); let log_forwarder_actor = LogForwardActor::new(log_client.clone()).await.unwrap(); let log_forwarder: ActorRef = proc .spawn("log_forwarder", log_forwarder_actor) - .await .unwrap() .bind(); diff --git a/hyperactor_mesh/src/proc_mesh/mesh_agent.rs b/hyperactor_mesh/src/proc_mesh/mesh_agent.rs index 07a527ee8..98cadacdf 100644 --- a/hyperactor_mesh/src/proc_mesh/mesh_agent.rs +++ b/hyperactor_mesh/src/proc_mesh/mesh_agent.rs @@ -258,7 +258,7 @@ impl ProcMeshAgent { record_supervision_events: false, supervision_events: HashMap::new(), }; - let handle = proc.spawn::("mesh", agent).await?; + let handle = proc.spawn::("mesh", agent)?; Ok((proc, handle)) } @@ -271,7 +271,7 @@ impl ProcMeshAgent { record_supervision_events: true, supervision_events: HashMap::new(), }; - proc.spawn::("agent", agent).await + proc.spawn::("agent", agent) } async fn destroy_and_wait_except_current<'a>( diff --git a/hyperactor_mesh/src/v1/host_mesh.rs b/hyperactor_mesh/src/v1/host_mesh.rs index 7872049d0..203d48213 100644 --- a/hyperactor_mesh/src/v1/host_mesh.rs +++ b/hyperactor_mesh/src/v1/host_mesh.rs @@ -261,8 +261,7 @@ impl HostMesh { let addr = host.addr().clone(); let system_proc = host.system_proc().clone(); let host_mesh_agent = system_proc - .spawn::("agent", HostMeshAgent::new(HostAgentMode::Process(host))) - .await + .spawn("agent", HostMeshAgent::new(HostAgentMode::Process(host))) .map_err(v1::Error::SingletonActorSpawnError)?; host_mesh_agent.bind::(); diff --git a/hyperactor_mesh/src/v1/host_mesh/mesh_agent.rs b/hyperactor_mesh/src/v1/host_mesh/mesh_agent.rs index d316b9369..09518d1dc 100644 --- a/hyperactor_mesh/src/v1/host_mesh/mesh_agent.rs +++ b/hyperactor_mesh/src/v1/host_mesh/mesh_agent.rs @@ -476,7 +476,7 @@ impl hyperactor::RemoteSpawn for HostMeshAgentProcMeshTrampoline { let system_proc = host.system_proc().clone(); let actor = HostMeshAgent::new(host); - let host_mesh_agent = system_proc.spawn::("agent", actor).await?; + let host_mesh_agent = system_proc.spawn("agent", actor)?; Ok(Self { host_mesh_agent, @@ -537,7 +537,6 @@ mod tests { created: HashMap::new(), }, ) - .await .unwrap(); let client_proc = Proc::direct(ChannelTransport::Unix.any(), "client".to_string()) diff --git a/hyperactor_multiprocess/src/proc_actor.rs b/hyperactor_multiprocess/src/proc_actor.rs index 082c6f634..b951dd13c 100644 --- a/hyperactor_multiprocess/src/proc_actor.rs +++ b/hyperactor_multiprocess/src/proc_actor.rs @@ -422,25 +422,21 @@ impl ProcActor { let mailbox_handle = proc.clone().serve(rx); let (state_tx, mut state_rx) = watch::channel(ProcState::AwaitingJoin); - let handle = match proc - .clone() - .spawn( - "proc", - ProcActor::new(ProcActorParams { - proc: proc.clone(), - world_id: world_id.clone(), - system_actor_ref: SYSTEM_ACTOR_REF.clone(), - bootstrap_channel_addr: bootstrap_addr, - local_addr, - state_watch: state_tx, - supervisor_actor_ref, - supervision_update_interval, - labels, - lifecycle_mode, - }), - ) - .await - { + let handle = match proc.clone().spawn( + "proc", + ProcActor::new(ProcActorParams { + proc: proc.clone(), + world_id: world_id.clone(), + system_actor_ref: SYSTEM_ACTOR_REF.clone(), + bootstrap_channel_addr: bootstrap_addr, + local_addr, + state_watch: state_tx, + supervisor_actor_ref, + supervision_update_interval, + labels, + lifecycle_mode, + }), + ) { Ok(handle) => handle, Err(e) => { Self::failed_proc_bootstrap_cleanup(mailbox_handle).await; @@ -448,7 +444,7 @@ impl ProcActor { } }; - let comm_actor = match proc.clone().spawn("comm", CommActor::default()).await { + let comm_actor = match proc.clone().spawn("comm", CommActor::default()) { Ok(handle) => handle, Err(e) => { Self::failed_proc_bootstrap_cleanup(mailbox_handle).await; @@ -971,15 +967,6 @@ mod tests { impl Actor for TestActor {} - #[async_trait] - impl RemoteSpawn for TestActor { - type Params = (); - - async fn new(_: ()) -> anyhow::Result { - Ok(Self) - } - } - #[derive(Handler, HandleClient, RefClient, Serialize, Deserialize, Debug, Named)] enum TestActorMessage { Increment(u64, #[reply] OncePortRef), @@ -1043,15 +1030,6 @@ mod tests { impl Actor for SleepActor {} - #[async_trait] - impl RemoteSpawn for SleepActor { - type Params = (); - - async fn new(_: ()) -> anyhow::Result { - Ok(Default::default()) - } - } - #[async_trait] impl Handler for SleepActor { async fn handle(&mut self, _cx: &Context, message: u64) -> anyhow::Result<()> { @@ -1464,14 +1442,12 @@ mod tests { "ping", PingPongActor::new(Some(proc_0_undeliverable_tx.bind()), None, None), ) - .await .unwrap(); let pong_handle = proc_1 .spawn( "pong", PingPongActor::new(Some(proc_1_undeliverable_tx.bind()), None, None), ) - .await .unwrap(); // Now kill the system server making message delivery between @@ -1592,14 +1568,12 @@ mod tests { "ping", PingPongActor::new(Some(proc_0_undeliverable_tx.bind()), None, None), ) - .await .unwrap(); let pong_handle = proc_1 .spawn( "pong", PingPongActor::new(Some(proc_1_undeliverable_tx.bind()), None, None), ) - .await .unwrap(); // Have 'ping' send 'pong' a message. diff --git a/hyperactor_multiprocess/src/system_actor.rs b/hyperactor_multiprocess/src/system_actor.rs index c790042aa..1266c0e6d 100644 --- a/hyperactor_multiprocess/src/system_actor.rs +++ b/hyperactor_multiprocess/src/system_actor.rs @@ -1191,9 +1191,7 @@ impl SystemActor { BoxedMailboxSender::new(params.mailbox_router.clone()), clock, ); - let actor_handle = system_proc - .spawn(SYSTEM_ACTOR_ID.name(), SystemActor::new(params)) - .await?; + let actor_handle = system_proc.spawn(SYSTEM_ACTOR_ID.name(), SystemActor::new(params))?; Ok((actor_handle, system_proc)) } @@ -2294,14 +2292,12 @@ mod tests { "ping", PingPongActor::new(Some(proc_0_undeliverable_tx.bind()), None, None), ) - .await .unwrap(); let pong_handle = proc_1 .spawn( "pong", PingPongActor::new(Some(proc_1_undeliverable_tx.bind()), None, None), ) - .await .unwrap(); // Now kill pong's mailbox server making message delivery