diff --git a/hyper/src/commands/demo.rs b/hyper/src/commands/demo.rs index fee3e947e..7be94018f 100644 --- a/hyper/src/commands/demo.rs +++ b/hyper/src/commands/demo.rs @@ -246,7 +246,7 @@ impl DemoMessageHandler for DemoActor { async fn spawn_child(&mut self, cx: &Context) -> Result, anyhow::Error> { tracing::info!("demo: spawn child"); - Ok(Self.spawn(cx).await?.bind()) + Ok(Self.spawn(cx)?.bind()) } async fn error(&mut self, _cx: &Context, message: String) -> Result<(), anyhow::Error> { diff --git a/hyperactor/example/derive.rs b/hyperactor/example/derive.rs index dfa9c7d1c..cce8380d7 100644 --- a/hyperactor/example/derive.rs +++ b/hyperactor/example/derive.rs @@ -134,7 +134,7 @@ async fn main() -> Result<(), anyhow::Error> { // Spawn our actor, and get a handle for rank 0. let shopping_list_actor: hyperactor::ActorHandle = - proc.spawn("shopping", ShoppingListActor::default()).await?; + proc.spawn("shopping", ShoppingListActor::default())?; let shopping_api: hyperactor::ActorRef = shopping_list_actor.bind(); // We join the system, so that we can send messages to actors. let (client, _) = proc.instance("client").unwrap(); diff --git a/hyperactor/example/stream.rs b/hyperactor/example/stream.rs index 27c7e3e68..729500ffd 100644 --- a/hyperactor/example/stream.rs +++ b/hyperactor/example/stream.rs @@ -83,10 +83,8 @@ impl Handler for CountClient { async fn main() { let proc = Proc::local(); - let counter_actor: ActorHandle = proc - .spawn("counter", CounterActor::default()) - .await - .unwrap(); + let counter_actor: ActorHandle = + proc.spawn("counter", CounterActor::default()).unwrap(); for i in 0..10 { // Spawn new "countees". Every time each subscribes, the counter broadcasts @@ -96,7 +94,6 @@ async fn main() { &format!("countee_{}", i), CountClient::new(counter_actor.port().bind()), ) - .await .unwrap(); #[allow(clippy::disallowed_methods)] tokio::time::sleep(Duration::from_millis(100)).await; diff --git a/hyperactor/src/actor.rs b/hyperactor/src/actor.rs index 0c03753f5..c574cdfdb 100644 --- a/hyperactor/src/actor.rs +++ b/hyperactor/src/actor.rs @@ -98,8 +98,8 @@ pub trait Actor: Sized + Send + Debug + 'static { /// Spawn a child actor, given a spawning capability (usually given by [`Instance`]). /// The spawned actor will be supervised by the parent (spawning) actor. - async fn spawn(self, cx: &impl context::Actor) -> anyhow::Result> { - cx.instance().spawn(self).await + fn spawn(self, cx: &impl context::Actor) -> anyhow::Result> { + cx.instance().spawn(self) } /// Spawns this actor in a detached state, handling its messages @@ -108,8 +108,8 @@ pub trait Actor: Sized + Send + Debug + 'static { /// /// Actors spawned through `spawn_detached` are not attached to a supervision /// hierarchy, and not managed by a [`Proc`]. - async fn spawn_detached(self) -> Result, anyhow::Error> { - Proc::local().spawn("anon", self).await + fn spawn_detached(self) -> Result, anyhow::Error> { + Proc::local().spawn("anon", self) } /// This method is used by the runtime to spawn the actor server. It can be @@ -260,7 +260,7 @@ pub trait RemoteSpawn: Actor + Referable + Binds { Box::pin(async move { let params = bincode::deserialize(&serialized_params)?; let actor = Self::new(params).await?; - let handle = proc.spawn(&name, actor).await?; + let handle = proc.spawn(&name, actor)?; // We return only the ActorId, not a typed ActorRef. // Callers that hold this ID can interact with the actor // only via the serialized/opaque messaging path, which @@ -792,7 +792,7 @@ mod tests { let client = proc.attach("client").unwrap(); let (tx, mut rx) = client.open_port(); let actor = EchoActor(tx.bind()); - let handle = proc.spawn::("echo", actor).await.unwrap(); + let handle = proc.spawn::("echo", actor).unwrap(); handle.send(123u64).unwrap(); handle.drain_and_stop().unwrap(); handle.await; @@ -808,14 +808,8 @@ mod tests { let ping_actor = PingPongActor::new(Some(undeliverable_msg_tx.bind()), None, None); let pong_actor = PingPongActor::new(Some(undeliverable_msg_tx.bind()), None, None); - let ping_handle = proc - .spawn::("ping", ping_actor) - .await - .unwrap(); - let pong_handle = proc - .spawn::("pong", pong_actor) - .await - .unwrap(); + let ping_handle = proc.spawn::("ping", ping_actor).unwrap(); + let pong_handle = proc.spawn::("pong", pong_actor).unwrap(); let (local_port, local_receiver) = client.open_once_port(); @@ -842,14 +836,8 @@ mod tests { PingPongActor::new(Some(undeliverable_msg_tx.bind()), Some(error_ttl), None); let pong_actor = PingPongActor::new(Some(undeliverable_msg_tx.bind()), Some(error_ttl), None); - let ping_handle = proc - .spawn::("ping", ping_actor) - .await - .unwrap(); - let pong_handle = proc - .spawn::("pong", pong_actor) - .await - .unwrap(); + let ping_handle = proc.spawn::("ping", ping_actor).unwrap(); + let pong_handle = proc.spawn::("pong", pong_actor).unwrap(); let (local_port, local_receiver) = client.open_once_port(); @@ -895,7 +883,7 @@ mod tests { async fn test_init() { let proc = Proc::local(); let actor = InitActor(false); - let handle = proc.spawn::("init", actor).await.unwrap(); + let handle = proc.spawn::("init", actor).unwrap(); let client = proc.attach("client").unwrap(); let (port, receiver) = client.open_once_port(); @@ -954,7 +942,7 @@ mod tests { let proc = Proc::local(); let values: MultiValues = Arc::new(Mutex::new((0, "".to_string()))); let actor = MultiActor(values.clone()); - let handle = proc.spawn::("myactor", actor).await.unwrap(); + let handle = proc.spawn::("myactor", actor).unwrap(); let (client, client_handle) = proc.instance("client").unwrap(); Self { proc, @@ -1072,7 +1060,7 @@ mod tests { // Just test that we can round-trip the handle through a downcast. let proc = Proc::local(); - let handle = proc.spawn("nothing", NothingActor).await.unwrap(); + let handle = proc.spawn("nothing", NothingActor).unwrap(); let cell = handle.cell(); // Invalid actor doesn't succeed. diff --git a/hyperactor/src/host.rs b/hyperactor/src/host.rs index 8e1c4766e..f6c4c9df1 100644 --- a/hyperactor/src/host.rs +++ b/hyperactor/src/host.rs @@ -1220,7 +1220,7 @@ mod tests { #[tokio::test] async fn test_basic() { let proc_manager = - LocalProcManager::new(|proc: Proc| async move { proc.spawn::<()>("agent", ()).await }); + LocalProcManager::new(|proc: Proc| async move { proc.spawn::<()>("agent", ()) }); let procs = Arc::clone(&proc_manager.procs); let (mut host, _handle) = Host::serve(proc_manager, ChannelAddr::any(ChannelTransport::Local)) diff --git a/hyperactor/src/mailbox.rs b/hyperactor/src/mailbox.rs index fb0f011f6..93d4ad2b5 100644 --- a/hyperactor/src/mailbox.rs +++ b/hyperactor/src/mailbox.rs @@ -3125,7 +3125,7 @@ mod tests { let mut proc = Proc::new(proc_id.clone(), proc_forwarder); ProcSupervisionCoordinator::set(&proc).await.unwrap(); - let foo = proc.spawn("foo", Foo).await.unwrap(); + let foo = proc.spawn("foo", Foo).unwrap(); let return_handle = foo.port::>(); let message = MessageEnvelope::new( foo.actor_id().clone(), diff --git a/hyperactor/src/proc.rs b/hyperactor/src/proc.rs index a520568c9..093cf5476 100644 --- a/hyperactor/src/proc.rs +++ b/hyperactor/src/proc.rs @@ -507,12 +507,7 @@ impl Proc { /// Spawn a named (root) actor on this proc. The name of the actor must be /// unique. - #[hyperactor::observe_result("Proc")] - pub async fn spawn( - &self, - name: &str, - actor: A, - ) -> Result, anyhow::Error> { + pub fn spawn(&self, name: &str, actor: A) -> Result, anyhow::Error> { let actor_id = self.allocate_root_id(name)?; let span = tracing::span!( Level::INFO, @@ -532,10 +527,7 @@ impl Proc { .ledger .insert(actor_id.clone(), instance.inner.cell.downgrade())?; - Ok(instance - .start(actor, actor_loop_receivers.take().unwrap(), work_rx) - .instrument(span) - .await) + Ok(instance.start(actor, actor_loop_receivers.take().unwrap(), work_rx)) } /// Create and return an actor instance and its corresponding handle. This allows actors to be @@ -584,7 +576,7 @@ impl Proc { /// /// When spawn_child returns, the child has an associated cell and is linked /// with its parent. - async fn spawn_child( + fn spawn_child( &self, parent: InstanceCell, actor: A, @@ -592,9 +584,7 @@ impl Proc { let actor_id = self.allocate_child_id(parent.actor_id())?; let (instance, mut actor_loop_receivers, work_rx) = Instance::new(self.clone(), actor_id, false, Some(parent.clone())); - Ok(instance - .start(actor, actor_loop_receivers.take().unwrap(), work_rx) - .await) + Ok(instance.start(actor, actor_loop_receivers.take().unwrap(), work_rx)) } /// Call `abort` on the `JoinHandle` associated with the given @@ -1166,8 +1156,7 @@ impl Instance { /// Start an A-typed actor onto this instance with the provided params. When spawn returns, /// the actor has been linked with its parent, if it has one. - #[hyperactor::instrument_infallible(fields(actor_id=self.inner.cell.actor_id().clone().to_string(), actor_name=self.inner.cell.actor_id().name()))] - async fn start( + fn start( self, actor: A, actor_loop_receivers: (PortReceiver, PortReceiver), @@ -1531,11 +1520,8 @@ impl Instance { } /// Spawn on child on this instance. Currently used only by cap::CanSpawn. - pub(crate) async fn spawn(&self, actor: C) -> anyhow::Result> { - self.inner - .proc - .spawn_child(self.inner.cell.clone(), actor) - .await + pub(crate) fn spawn(&self, actor: C) -> anyhow::Result> { + self.inner.proc.spawn_child(self.inner.cell.clone(), actor) } /// Create a new direct child instance. @@ -2273,7 +2259,7 @@ mod tests { cx: &crate::Context, reply: oneshot::Sender>, ) -> Result<(), anyhow::Error> { - let handle = TestActor::default().spawn(cx).await?; + let handle = TestActor::default().spawn(cx)?; reply.send(handle).unwrap(); Ok(()) } @@ -2283,7 +2269,7 @@ mod tests { #[async_timed_test(timeout_secs = 30)] async fn test_spawn_actor() { let proc = Proc::local(); - let handle = proc.spawn("test", TestActor::default()).await.unwrap(); + let handle = proc.spawn("test", TestActor::default()).unwrap(); // Check on the join handle. assert!(logs_contain( @@ -2334,11 +2320,9 @@ mod tests { let proc = Proc::local(); let first = proc .spawn::("first", TestActor::default()) - .await .unwrap(); let second = proc .spawn::("second", TestActor::default()) - .await .unwrap(); let (tx, rx) = oneshot::channel::<()>(); let reply_message = TestActorMessage::Reply(tx); @@ -2378,12 +2362,10 @@ mod tests { let target_actor = proc .spawn::("target", TestActor::default()) - .await .unwrap(); let target_actor_ref = target_actor.bind(); let lookup_actor = proc .spawn::("lookup", LookupTestActor::default()) - .await .unwrap(); assert!( @@ -2444,7 +2426,6 @@ mod tests { let first = proc .spawn::("first", TestActor::default()) - .await .unwrap(); let second = TestActor::spawn_child(&first).await; let third = TestActor::spawn_child(&second).await; @@ -2515,7 +2496,6 @@ mod tests { let root = proc .spawn::("root", TestActor::default()) - .await .unwrap(); let root_1 = TestActor::spawn_child(&root).await; let root_2 = TestActor::spawn_child(&root).await; @@ -2539,7 +2519,6 @@ mod tests { let root = proc .spawn::("root", TestActor::default()) - .await .unwrap(); let root_1 = TestActor::spawn_child(&root).await; let root_2 = TestActor::spawn_child(&root).await; @@ -2580,10 +2559,7 @@ mod tests { let proc = Proc::local(); // Add the 1st root. This root will remain active until the end of the test. - let root: ActorHandle = proc - .spawn::("root", TestActor::default()) - .await - .unwrap(); + let root: ActorHandle = proc.spawn("root", TestActor::default()).unwrap(); wait_until_idle(&root).await; { let snapshot = proc.state().ledger.snapshot(); @@ -2597,10 +2573,8 @@ mod tests { } // Add the 2nd root. - let another_root: ActorHandle = proc - .spawn::("another_root", TestActor::default()) - .await - .unwrap(); + let another_root: ActorHandle = + proc.spawn("another_root", TestActor::default()).unwrap(); wait_until_idle(&another_root).await; { let snapshot = proc.state().ledger.snapshot(); @@ -2837,7 +2811,7 @@ mod tests { let proc = Proc::local(); let state = Arc::new(AtomicUsize::new(0)); let actor = TestActor(state.clone()); - let handle = proc.spawn::("test", actor).await.unwrap(); + let handle = proc.spawn::("test", actor).unwrap(); let client = proc.attach("client").unwrap(); let (tx, rx) = client.open_once_port(); handle.send(tx).unwrap(); @@ -2861,10 +2835,7 @@ mod tests { ProcSupervisionCoordinator::set(&proc).await.unwrap(); let (client, _handle) = proc.instance("client").unwrap(); - let actor_handle = proc - .spawn::("test", TestActor::default()) - .await - .unwrap(); + let actor_handle = proc.spawn("test", TestActor::default()).unwrap(); actor_handle .panic(&client, "some random failure".to_string()) .await @@ -2888,6 +2859,8 @@ mod tests { #[async_timed_test(timeout_secs = 30)] async fn test_local_supervision_propagation() { + hyperactor_telemetry::initialize_logging_for_test(); + #[derive(Debug)] struct TestActor(Arc, bool); @@ -2936,7 +2909,6 @@ mod tests { let root = proc .spawn::("root", TestActor(root_state.clone(), false)) - .await .unwrap(); let root_1 = proc .spawn_child::( @@ -2946,32 +2918,27 @@ mod tests { true, /* set true so children's event stops here */ ), ) - .await .unwrap(); let root_1_1 = proc .spawn_child::( root_1.cell().clone(), TestActor(root_1_1_state.clone(), false), ) - .await .unwrap(); let root_1_1_1 = proc .spawn_child::( root_1_1.cell().clone(), TestActor(root_1_1_1_state.clone(), false), ) - .await .unwrap(); let root_2 = proc .spawn_child::(root.cell().clone(), TestActor(root_2_state.clone(), false)) - .await .unwrap(); let root_2_1 = proc .spawn_child::( root_2.cell().clone(), TestActor(root_2_1_state.clone(), false), ) - .await .unwrap(); // fail `root_1_1_1`, the supervision msg should be propagated to @@ -3023,7 +2990,7 @@ mod tests { let (instance, handle) = proc.instance("my_test_actor").unwrap(); - let child_actor = TestActor::default().spawn(&instance).await.unwrap(); + let child_actor = TestActor::default().spawn(&instance).unwrap(); let (port, mut receiver) = instance.open_port(); child_actor @@ -3054,10 +3021,7 @@ mod tests { // Intentionally not setting a proc supervison coordinator. This // should cause the process to terminate. // ProcSupervisionCoordinator::set(&proc).await.unwrap(); - let root = proc - .spawn::("root", TestActor::default()) - .await - .unwrap(); + let root = proc.spawn("root", TestActor::default()).unwrap(); let (client, _handle) = proc.instance("client").unwrap(); root.fail(&client, anyhow::anyhow!("some random failure")) .await @@ -3152,7 +3116,7 @@ mod tests { } trace_and_block(async { - let handle = LoggingActor::default().spawn_detached().await.unwrap(); + let handle = LoggingActor::default().spawn_detached().unwrap(); handle.send("hello world".to_string()).unwrap(); handle.send("hello world again".to_string()).unwrap(); handle.send(123u64).unwrap(); diff --git a/hyperactor/src/test_utils/proc_supervison.rs b/hyperactor/src/test_utils/proc_supervison.rs index e63a1b883..cc8136f75 100644 --- a/hyperactor/src/test_utils/proc_supervison.rs +++ b/hyperactor/src/test_utils/proc_supervison.rs @@ -43,9 +43,7 @@ impl ProcSupervisionCoordinator { pub async fn set(proc: &Proc) -> Result { let state = ReportedEvent::new(); let actor = ProcSupervisionCoordinator(state.clone()); - let coordinator = proc - .spawn::("coordinator", actor) - .await?; + let coordinator = proc.spawn::("coordinator", actor)?; proc.set_supervision_coordinator(coordinator.port())?; Ok(state) } diff --git a/hyperactor/test/host_bootstrap.rs b/hyperactor/test/host_bootstrap.rs index d15dfef78..ce6b9b203 100644 --- a/hyperactor/test/host_bootstrap.rs +++ b/hyperactor/test/host_bootstrap.rs @@ -19,7 +19,6 @@ async fn main() { let proc = ProcessProcManager::::boot_proc(|proc| async move { proc.spawn("echo", hyperactor::host::testing::EchoActor) - .await }) .await .unwrap(); diff --git a/hyperactor_macros/tests/basic.rs b/hyperactor_macros/tests/basic.rs index 93351a196..8ae3882ea 100644 --- a/hyperactor_macros/tests/basic.rs +++ b/hyperactor_macros/tests/basic.rs @@ -182,7 +182,7 @@ mod tests { async fn test_client_macros() { let proc = Proc::local(); let (client, _) = proc.instance("client").unwrap(); - let actor_handle = proc.spawn("foo", TestVariantFormsActor {}).await.unwrap(); + let actor_handle = proc.spawn("foo", TestVariantFormsActor {}).unwrap(); assert_eq!(actor_handle.call_struct(&client, 10).await.unwrap(), 10,); diff --git a/hyperactor_macros/tests/export.rs b/hyperactor_macros/tests/export.rs index 78db578b5..ea40ea62e 100644 --- a/hyperactor_macros/tests/export.rs +++ b/hyperactor_macros/tests/export.rs @@ -107,7 +107,7 @@ mod tests { let proc = Proc::local(); let (client, _) = proc.instance("client").unwrap(); let (tx, mut rx) = client.open_port(); - let actor_handle = proc.spawn("test", TestActor::new(tx.bind())).await.unwrap(); + let actor_handle = proc.spawn("test", TestActor::new(tx.bind())).unwrap(); // This will call binds actor_handle.bind::(); // Verify that the ports can be gotten successfully. @@ -185,7 +185,7 @@ mod tests { let proc = Proc::local(); let (client, _) = proc.instance("client").unwrap(); let (tx, mut rx) = client.open_port(); - let actor_handle = proc.spawn("test", TestActor::new(tx.bind())).await.unwrap(); + let actor_handle = proc.spawn("test", TestActor::new(tx.bind())).unwrap(); actor_handle.send(123u64).unwrap(); actor_handle.send(TestMessage("foo".to_string())).unwrap(); diff --git a/hyperactor_mesh/examples/sieve.rs b/hyperactor_mesh/examples/sieve.rs index 9cbb5f57f..bb52015e1 100644 --- a/hyperactor_mesh/examples/sieve.rs +++ b/hyperactor_mesh/examples/sieve.rs @@ -87,8 +87,7 @@ impl Handler for SieveActor { self.next = Some( SieveActor::new(SieveParams { prime: msg.number }) .await? - .spawn(cx) - .await?, + .spawn(cx)?, ); } } diff --git a/hyperactor_mesh/src/bootstrap.rs b/hyperactor_mesh/src/bootstrap.rs index 4056a7b5e..782ff4f87 100644 --- a/hyperactor_mesh/src/bootstrap.rs +++ b/hyperactor_mesh/src/bootstrap.rs @@ -491,12 +491,10 @@ impl Bootstrap { let (host, _handle) = ok!(Host::serve(manager, addr).await); let addr = host.addr().clone(); let system_proc = host.system_proc().clone(); - let host_mesh_agent = ok!(system_proc - .spawn::( - "agent", - HostMeshAgent::new(HostAgentMode::Process(host)), - ) - .await); + let host_mesh_agent = ok!(system_proc.spawn::( + "agent", + HostMeshAgent::new(HostAgentMode::Process(host)), + )); tracing::info!( "serving host at {}, agent: {}", @@ -2618,11 +2616,8 @@ mod tests { // Spawn the log client and disable aggregation (immediate // print + tap push). let log_client_actor = LogClientActor::new(()).await.unwrap(); - let log_client: ActorRef = proc - .spawn("log_client", log_client_actor) - .await - .unwrap() - .bind(); + let log_client: ActorRef = + proc.spawn("log_client", log_client_actor).unwrap().bind(); log_client.set_aggregate(&client, None).await.unwrap(); // Spawn the forwarder in this proc (it will serve @@ -2630,7 +2625,6 @@ mod tests { let log_forwarder_actor = LogForwardActor::new(log_client.clone()).await.unwrap(); let _log_forwarder: ActorRef = proc .spawn("log_forwarder", log_forwarder_actor) - .await .unwrap() .bind(); diff --git a/hyperactor_mesh/src/connect.rs b/hyperactor_mesh/src/connect.rs index 62f3ec987..ad4ae7fa4 100644 --- a/hyperactor_mesh/src/connect.rs +++ b/hyperactor_mesh/src/connect.rs @@ -430,7 +430,7 @@ mod tests { let proc = Proc::local(); let (client, _client_handle) = proc.instance("client")?; let (connect, completer) = Connect::allocate(client.self_id().clone(), client); - let actor = proc.spawn("actor", EchoActor {}).await?; + let actor = proc.spawn("actor", EchoActor {})?; actor.send(connect)?; let (mut rd, mut wr) = completer.complete().await?.into_split(); let send = [3u8, 4u8, 5u8, 6u8]; diff --git a/hyperactor_mesh/src/logging.rs b/hyperactor_mesh/src/logging.rs index e1865df35..832026f30 100644 --- a/hyperactor_mesh/src/logging.rs +++ b/hyperactor_mesh/src/logging.rs @@ -1603,15 +1603,11 @@ mod tests { std::env::set_var(BOOTSTRAP_LOG_CHANNEL, log_channel.to_string()); } let log_client_actor = LogClientActor::new(()).await.unwrap(); - let log_client: ActorRef = proc - .spawn("log_client", log_client_actor) - .await - .unwrap() - .bind(); + let log_client: ActorRef = + proc.spawn("log_client", log_client_actor).unwrap().bind(); let log_forwarder_actor = LogForwardActor::new(log_client.clone()).await.unwrap(); let log_forwarder: ActorRef = proc .spawn("log_forwarder", log_forwarder_actor) - .await .unwrap() .bind(); diff --git a/hyperactor_mesh/src/proc_mesh/mesh_agent.rs b/hyperactor_mesh/src/proc_mesh/mesh_agent.rs index 07a527ee8..98cadacdf 100644 --- a/hyperactor_mesh/src/proc_mesh/mesh_agent.rs +++ b/hyperactor_mesh/src/proc_mesh/mesh_agent.rs @@ -258,7 +258,7 @@ impl ProcMeshAgent { record_supervision_events: false, supervision_events: HashMap::new(), }; - let handle = proc.spawn::("mesh", agent).await?; + let handle = proc.spawn::("mesh", agent)?; Ok((proc, handle)) } @@ -271,7 +271,7 @@ impl ProcMeshAgent { record_supervision_events: true, supervision_events: HashMap::new(), }; - proc.spawn::("agent", agent).await + proc.spawn::("agent", agent) } async fn destroy_and_wait_except_current<'a>( diff --git a/hyperactor_mesh/src/v1/host_mesh.rs b/hyperactor_mesh/src/v1/host_mesh.rs index fb5e15955..ae973e9cb 100644 --- a/hyperactor_mesh/src/v1/host_mesh.rs +++ b/hyperactor_mesh/src/v1/host_mesh.rs @@ -268,8 +268,7 @@ impl HostMesh { let addr = host.addr().clone(); let system_proc = host.system_proc().clone(); let host_mesh_agent = system_proc - .spawn::("agent", HostMeshAgent::new(HostAgentMode::Process(host))) - .await + .spawn("agent", HostMeshAgent::new(HostAgentMode::Process(host))) .map_err(v1::Error::SingletonActorSpawnError)?; host_mesh_agent.bind::(); @@ -438,7 +437,6 @@ impl HostMesh { let controller = HostMeshController::new(mesh.deref().clone()); controller .spawn(cx) - .await .map_err(|e| v1::Error::ControllerActorSpawnError(mesh.name().clone(), e))?; tracing::info!(name = "HostMeshStatus", status = "Allocate::Created"); @@ -952,7 +950,6 @@ impl HostMeshRef { let controller = ProcMeshController::new(mesh.deref().clone()); controller .spawn(cx) - .await .map_err(|e| v1::Error::ControllerActorSpawnError(mesh.name().clone(), e))?; } mesh diff --git a/hyperactor_mesh/src/v1/host_mesh/mesh_agent.rs b/hyperactor_mesh/src/v1/host_mesh/mesh_agent.rs index d316b9369..09518d1dc 100644 --- a/hyperactor_mesh/src/v1/host_mesh/mesh_agent.rs +++ b/hyperactor_mesh/src/v1/host_mesh/mesh_agent.rs @@ -476,7 +476,7 @@ impl hyperactor::RemoteSpawn for HostMeshAgentProcMeshTrampoline { let system_proc = host.system_proc().clone(); let actor = HostMeshAgent::new(host); - let host_mesh_agent = system_proc.spawn::("agent", actor).await?; + let host_mesh_agent = system_proc.spawn("agent", actor)?; Ok(Self { host_mesh_agent, @@ -537,7 +537,6 @@ mod tests { created: HashMap::new(), }, ) - .await .unwrap(); let client_proc = Proc::direct(ChannelTransport::Unix.any(), "client".to_string()) diff --git a/hyperactor_mesh/src/v1/proc_mesh.rs b/hyperactor_mesh/src/v1/proc_mesh.rs index b7e7c5354..f62fe3c10 100644 --- a/hyperactor_mesh/src/v1/proc_mesh.rs +++ b/hyperactor_mesh/src/v1/proc_mesh.rs @@ -1031,7 +1031,6 @@ impl ProcMeshRef { let controller = ActorMeshController::::new(mesh.deref().clone()); controller .spawn(cx) - .await .map_err(|e| Error::ControllerActorSpawnError(mesh.name().clone(), e))?; Ok(mesh) } diff --git a/hyperactor_multiprocess/src/proc_actor.rs b/hyperactor_multiprocess/src/proc_actor.rs index 6d70ee279..b951dd13c 100644 --- a/hyperactor_multiprocess/src/proc_actor.rs +++ b/hyperactor_multiprocess/src/proc_actor.rs @@ -422,25 +422,21 @@ impl ProcActor { let mailbox_handle = proc.clone().serve(rx); let (state_tx, mut state_rx) = watch::channel(ProcState::AwaitingJoin); - let handle = match proc - .clone() - .spawn( - "proc", - ProcActor::new(ProcActorParams { - proc: proc.clone(), - world_id: world_id.clone(), - system_actor_ref: SYSTEM_ACTOR_REF.clone(), - bootstrap_channel_addr: bootstrap_addr, - local_addr, - state_watch: state_tx, - supervisor_actor_ref, - supervision_update_interval, - labels, - lifecycle_mode, - }), - ) - .await - { + let handle = match proc.clone().spawn( + "proc", + ProcActor::new(ProcActorParams { + proc: proc.clone(), + world_id: world_id.clone(), + system_actor_ref: SYSTEM_ACTOR_REF.clone(), + bootstrap_channel_addr: bootstrap_addr, + local_addr, + state_watch: state_tx, + supervisor_actor_ref, + supervision_update_interval, + labels, + lifecycle_mode, + }), + ) { Ok(handle) => handle, Err(e) => { Self::failed_proc_bootstrap_cleanup(mailbox_handle).await; @@ -448,7 +444,7 @@ impl ProcActor { } }; - let comm_actor = match proc.clone().spawn("comm", CommActor::default()).await { + let comm_actor = match proc.clone().spawn("comm", CommActor::default()) { Ok(handle) => handle, Err(e) => { Self::failed_proc_bootstrap_cleanup(mailbox_handle).await; @@ -1446,14 +1442,12 @@ mod tests { "ping", PingPongActor::new(Some(proc_0_undeliverable_tx.bind()), None, None), ) - .await .unwrap(); let pong_handle = proc_1 .spawn( "pong", PingPongActor::new(Some(proc_1_undeliverable_tx.bind()), None, None), ) - .await .unwrap(); // Now kill the system server making message delivery between @@ -1574,14 +1568,12 @@ mod tests { "ping", PingPongActor::new(Some(proc_0_undeliverable_tx.bind()), None, None), ) - .await .unwrap(); let pong_handle = proc_1 .spawn( "pong", PingPongActor::new(Some(proc_1_undeliverable_tx.bind()), None, None), ) - .await .unwrap(); // Have 'ping' send 'pong' a message. diff --git a/hyperactor_multiprocess/src/system_actor.rs b/hyperactor_multiprocess/src/system_actor.rs index c790042aa..1266c0e6d 100644 --- a/hyperactor_multiprocess/src/system_actor.rs +++ b/hyperactor_multiprocess/src/system_actor.rs @@ -1191,9 +1191,7 @@ impl SystemActor { BoxedMailboxSender::new(params.mailbox_router.clone()), clock, ); - let actor_handle = system_proc - .spawn(SYSTEM_ACTOR_ID.name(), SystemActor::new(params)) - .await?; + let actor_handle = system_proc.spawn(SYSTEM_ACTOR_ID.name(), SystemActor::new(params))?; Ok((actor_handle, system_proc)) } @@ -2294,14 +2292,12 @@ mod tests { "ping", PingPongActor::new(Some(proc_0_undeliverable_tx.bind()), None, None), ) - .await .unwrap(); let pong_handle = proc_1 .spawn( "pong", PingPongActor::new(Some(proc_1_undeliverable_tx.bind()), None, None), ) - .await .unwrap(); // Now kill pong's mailbox server making message delivery diff --git a/monarch_extension/src/logging.rs b/monarch_extension/src/logging.rs index f2b5fe13f..9cd5ca51c 100644 --- a/monarch_extension/src/logging.rs +++ b/monarch_extension/src/logging.rs @@ -92,8 +92,7 @@ impl LoggingMeshClient { PyPythonTask::new(async move { let client_actor = proc_mesh .client_proc() - .spawn("log_client", LogClientActor::default()) - .await?; + .spawn("log_client", LogClientActor::default())?; let client_actor_ref = client_actor.bind(); let forwarder_mesh = instance_dispatch!(instance, |cx| { proc_mesh diff --git a/monarch_extension/src/mesh_controller.rs b/monarch_extension/src/mesh_controller.rs index 201679b67..952defac8 100644 --- a/monarch_extension/src/mesh_controller.rs +++ b/monarch_extension/src/mesh_controller.rs @@ -141,18 +141,15 @@ impl _Controller { let controller_handle: Arc>> = signal_safe_block_on(py, async move { let controller_handle = instance_dispatch!(client, |instance| { - instance - .proc() - .spawn( - &Name::new("mesh_controller").to_string(), - MeshControllerActor::new(MeshControllerActorParams { - proc_mesh, - id, - rank_map, - }) - .await, - ) - .await? + instance.proc().spawn( + &Name::new("mesh_controller").to_string(), + MeshControllerActor::new(MeshControllerActorParams { + proc_mesh, + id, + rank_map, + }) + .await, + )? }); Ok::<_, anyhow::Error>(Arc::new(Mutex::new(controller_handle))) })??; diff --git a/monarch_hyperactor/src/code_sync/manager.rs b/monarch_hyperactor/src/code_sync/manager.rs index 4f14a3d62..424ef92d9 100644 --- a/monarch_hyperactor/src/code_sync/manager.rs +++ b/monarch_hyperactor/src/code_sync/manager.rs @@ -233,7 +233,7 @@ impl CodeSyncManager { cx: &Context<'a, Self>, ) -> Result<&'a ActorHandle> { self.rsync - .get_or_try_init(RsyncActor::default().spawn(cx)) + .get_or_try_init(async move { RsyncActor::default().spawn(cx) }) .await } @@ -242,7 +242,7 @@ impl CodeSyncManager { cx: &Context<'a, Self>, ) -> Result<&'a ActorHandle> { self.auto_reload - .get_or_try_init(async move { AutoReloadActor::new().await?.spawn(cx).await }) + .get_or_try_init(async move { AutoReloadActor::new().await?.spawn(cx) }) .await } @@ -251,7 +251,7 @@ impl CodeSyncManager { cx: &Context<'a, Self>, ) -> Result<&'a ActorHandle> { self.conda_sync - .get_or_try_init(CondaSyncActor::default().spawn(cx)) + .get_or_try_init(async move { CondaSyncActor::default().spawn(cx) }) .await } } diff --git a/monarch_hyperactor/src/proc.rs b/monarch_hyperactor/src/proc.rs index cc2cb2f32..542fbc50b 100644 --- a/monarch_hyperactor/src/proc.rs +++ b/monarch_hyperactor/src/proc.rs @@ -147,12 +147,10 @@ impl PyProc { let pickled_type = PickledPyObject::pickle(actor.as_any())?; crate::runtime::future_into_py(py, async move { Ok(PythonActorHandle { - inner: proc - .spawn( - name.as_deref().unwrap_or("anon"), - PythonActor::new(pickled_type).await?, - ) - .await?, + inner: proc.spawn( + name.as_deref().unwrap_or("anon"), + PythonActor::new(pickled_type).await?, + )?, }) }) } @@ -172,7 +170,6 @@ impl PyProc { name.as_deref().unwrap_or("anon"), PythonActor::new(pickled_type).await?, ) - .await }) .map_err(|e| PyRuntimeError::new_err(e.to_string()))??, }) diff --git a/monarch_hyperactor/src/v1/logging.rs b/monarch_hyperactor/src/v1/logging.rs index ceb1de624..e7e1129a9 100644 --- a/monarch_hyperactor/src/v1/logging.rs +++ b/monarch_hyperactor/src/v1/logging.rs @@ -203,13 +203,10 @@ impl LoggingMeshClient { // the caller's process). let client_actor: ActorHandle = instance_dispatch!(instance, async move |cx_instance| { - cx_instance - .proc() - .spawn( - &Name::new("log_client").to_string(), - LogClientActor::default(), - ) - .await + cx_instance.proc().spawn( + &Name::new("log_client").to_string(), + LogClientActor::default(), + ) })?; let client_actor_ref = client_actor.bind(); diff --git a/monarch_tensor_worker/src/borrow.rs b/monarch_tensor_worker/src/borrow.rs index 9af653348..4569f21fd 100644 --- a/monarch_tensor_worker/src/borrow.rs +++ b/monarch_tensor_worker/src/borrow.rs @@ -209,7 +209,6 @@ mod tests { }) .await?, ) - .await .unwrap(); worker_handle @@ -348,7 +347,6 @@ mod tests { .await .unwrap(), ) - .await .unwrap(); worker_handle diff --git a/monarch_tensor_worker/src/comm.rs b/monarch_tensor_worker/src/comm.rs index 14831ab19..bee93e5c5 100644 --- a/monarch_tensor_worker/src/comm.rs +++ b/monarch_tensor_worker/src/comm.rs @@ -254,7 +254,6 @@ impl CommMessageHandler for NcclCommActor { NcclCommActor::new(CommParams::FromComm(Arc::new(Mutex::new(split_comm)))) .await? .spawn(cx) - .await } async fn split_from( @@ -273,8 +272,7 @@ impl CommMessageHandler for NcclCommActor { Some(split_comm) => Ok(Some( NcclCommActor::new(CommParams::FromComm(Arc::new(Mutex::new(split_comm)))) .await? - .spawn(cx) - .await?, + .spawn(cx)?, )), None => Ok(None), } @@ -1082,8 +1080,8 @@ mod tests { let (actor0, actor1) = tokio::join!(actor0, actor1); let (actor0, actor1) = (actor0.unwrap(), actor1.unwrap()); - let handle0 = actor0.spawn_detached().await.unwrap(); - let handle1 = actor1.spawn_detached().await.unwrap(); + let handle0 = actor0.spawn_detached().unwrap(); + let handle1 = actor1.spawn_detached().unwrap(); let cell0 = TensorCell::new(factory_float_tensor(&[1.0], device0.into())); @@ -1149,8 +1147,8 @@ mod tests { let (actor0, actor1) = tokio::join!(actor0, actor1); let (actor0, actor1) = (actor0.unwrap(), actor1.unwrap()); - let handle0 = actor0.spawn_detached().await.unwrap(); - let handle1 = actor1.spawn_detached().await.unwrap(); + let handle0 = actor0.spawn_detached().unwrap(); + let handle1 = actor1.spawn_detached().unwrap(); let cell0 = TensorCell::new(factory_float_tensor(&[1.0], device0.into())); @@ -1222,8 +1220,8 @@ mod tests { let (actor0, actor1) = tokio::join!(actor0, actor1); let (actor0, actor1) = (actor0.unwrap(), actor1.unwrap()); - let handle0 = proc.spawn("comm0", actor0).await.unwrap(); - let handle1 = proc.spawn("comm1", actor1).await.unwrap(); + let handle0 = proc.spawn("comm0", actor0).unwrap(); + let handle1 = proc.spawn("comm1", actor1).unwrap(); let cell0 = TensorCell::new(factory_float_tensor(&[1.0], device0.into())); let dest_rank = 0; @@ -1283,7 +1281,6 @@ mod tests { .await .unwrap(), ) - .await })) .await?; @@ -1459,7 +1456,6 @@ mod tests { .await .unwrap(), ) - .await .unwrap(); let handle2 = proc .spawn( @@ -1473,7 +1469,6 @@ mod tests { .await .unwrap(), ) - .await .unwrap(); let unique_id = UniqueId::new().unwrap(); @@ -1631,7 +1626,6 @@ mod tests { .await .unwrap(), ) - .await .unwrap(); let unique_id = UniqueId::new().unwrap(); @@ -1766,8 +1760,8 @@ mod tests { let (actor0, actor1) = tokio::join!(actor0, actor1); let (actor0, actor1) = (actor0?, actor1?); - let handle0 = actor0.spawn_detached().await.unwrap(); - let handle1 = actor1.spawn_detached().await.unwrap(); + let handle0 = actor0.spawn_detached().unwrap(); + let handle1 = actor1.spawn_detached().unwrap(); let cell0 = TensorCell::new(factory_float_tensor(&[1.0], device0.into())); let port0 = client.open_once_port(); @@ -1786,11 +1780,11 @@ mod tests { Stream::get_current_stream_on_device(device1), port1.0, ))?; - let (work0, work1) = tokio::join!( + let (work0, work1) = tokio::try_join!( CommWork::from(vec![cell0.clone()], port0.1), CommWork::from(vec![cell1.clone()], port1.1) - ); - let (work0, work1) = (work0?, work1?); + ) + .unwrap(); // Wait for the work to enqueue onto the stream. work0.wait().await?; work1.wait().await?; @@ -1800,11 +1794,9 @@ mod tests { while !work0.is_completed().await? { // No need to sleep or yield, because the await on each iteration // will give other tasks a chance to make progress. - tracing::debug!("waiting for work0..."); } while !work1.is_completed().await? { // Same as above. - tracing::debug!("waiting for work1..."); } // Check that the tensors are correct after the work completes. diff --git a/monarch_tensor_worker/src/lib.rs b/monarch_tensor_worker/src/lib.rs index cf7fe90b5..7f467a411 100644 --- a/monarch_tensor_worker/src/lib.rs +++ b/monarch_tensor_worker/src/lib.rs @@ -307,8 +307,7 @@ impl WorkerMessageHandler for WorkerActor { rank: self.rank.try_into().unwrap(), }) .await? - .spawn(cx) - .await?; + .spawn(cx)?; let tensor = factory_zeros(&[1], ScalarType::Float, Layout::Strided, device.into()); let cell = TensorCell::new(tensor); @@ -451,8 +450,7 @@ impl WorkerMessageHandler for WorkerActor { controller_actor: self.controller_actor.clone(), respond_with_python_message: self.respond_with_python_message, }) - .spawn(cx) - .await?; + .spawn(cx)?; self.streams.insert(result, Arc::new(handle)); Ok(()) } @@ -1174,7 +1172,6 @@ mod tests { .await .unwrap(), ) - .await .unwrap(); worker_handle .command_group( @@ -1269,7 +1266,6 @@ mod tests { .await .unwrap(), ) - .await .unwrap(); worker_handle .command_group( @@ -1320,7 +1316,7 @@ mod tests { let (client, controller_ref, mut controller_rx) = proc.attach_actor("controller").unwrap(); let worker_handle = proc - .spawn::( + .spawn( "worker", WorkerActor::new(WorkerParams { world_size: 1, @@ -1331,7 +1327,6 @@ mod tests { .await .unwrap(), ) - .await .unwrap(); worker_handle .command_group( @@ -1404,7 +1399,6 @@ mod tests { .await .unwrap(), ) - .await .unwrap(); worker_handle .command_group( @@ -1479,7 +1473,6 @@ mod tests { .await .unwrap(), ) - .await .unwrap(); let (split_arg, sort_list, mesh_ref, dim, layout, none, scalar, device, memory_format) = Python::with_gil(|py| { @@ -1769,7 +1762,6 @@ mod tests { .await .unwrap(), ) - .await .unwrap(); worker_handle .command_group( @@ -1845,7 +1837,6 @@ mod tests { .await .unwrap(), ) - .await .unwrap(); worker_handle .command_group( @@ -1928,7 +1919,6 @@ mod tests { .await .unwrap(), ) - .await .unwrap(); let worker_handle2 = proc .spawn( @@ -1942,7 +1932,6 @@ mod tests { .await .unwrap(), ) - .await .unwrap(); let unique_id = UniqueId::new().unwrap(); @@ -1980,7 +1969,6 @@ mod tests { .await .unwrap(), ) - .await .unwrap(); worker_handle .command_group( @@ -2071,7 +2059,6 @@ mod tests { .await .unwrap(), ) - .await .unwrap(); let ref_arg: PickledPyObject = diff --git a/monarch_tensor_worker/src/stream.rs b/monarch_tensor_worker/src/stream.rs index 6cc08a193..a949f9e6c 100644 --- a/monarch_tensor_worker/src/stream.rs +++ b/monarch_tensor_worker/src/stream.rs @@ -2095,20 +2095,18 @@ mod tests { let (client, _handle) = proc.instance("client")?; let (supervision_tx, supervision_rx) = client.open_port(); proc.set_supervision_coordinator(supervision_tx)?; - let stream_actor = proc - .spawn( - "stream", - StreamActor::new(StreamParams { - world_size, - rank: 0, - creation_mode: StreamCreationMode::UseDefaultStream, - id: 0.into(), - device: Some(CudaDevice::new(0.into())), - controller_actor: controller_actor.clone(), - respond_with_python_message: false, - }), - ) - .await?; + let stream_actor = proc.spawn( + "stream", + StreamActor::new(StreamParams { + world_size, + rank: 0, + creation_mode: StreamCreationMode::UseDefaultStream, + id: 0.into(), + device: Some(CudaDevice::new(0.into())), + controller_actor: controller_actor.clone(), + respond_with_python_message: false, + }), + )?; Ok(Self { proc, @@ -2577,20 +2575,17 @@ mod tests { .define_recording(&test_setup.client, 0.into()) .await?; - let dummy_comm = test_setup - .proc - .spawn( - "comm", - NcclCommActor::new(CommParams::New { - device: CudaDevice::new(0.into()), - unique_id: UniqueId::new()?, - world_size: 1, - rank: 0, - }) - .await - .unwrap(), - ) - .await?; + let dummy_comm = test_setup.proc.spawn( + "comm", + NcclCommActor::new(CommParams::New { + device: CudaDevice::new(0.into()), + unique_id: UniqueId::new()?, + world_size: 1, + rank: 0, + }) + .await + .unwrap(), + )?; test_setup .stream_actor @@ -2972,21 +2967,18 @@ mod tests { async fn test_borrow_in_recording() -> Result<()> { let mut test_setup = TestSetup::new().await?; - let borrower_stream = test_setup - .proc - .spawn( - "stream1", - StreamActor::new(StreamParams { - world_size: 1, - rank: 0, - creation_mode: StreamCreationMode::CreateNewStream, - id: 1.into(), - device: Some(CudaDevice::new(0.into())), - controller_actor: test_setup.controller_actor.clone(), - respond_with_python_message: false, - }), - ) - .await?; + let borrower_stream = test_setup.proc.spawn( + "stream1", + StreamActor::new(StreamParams { + world_size: 1, + rank: 0, + creation_mode: StreamCreationMode::CreateNewStream, + id: 1.into(), + device: Some(CudaDevice::new(0.into())), + controller_actor: test_setup.controller_actor.clone(), + respond_with_python_message: false, + }), + )?; let lender_stream = test_setup.stream_actor.clone(); @@ -3262,20 +3254,17 @@ mod tests { let recording_ref = test_setup.next_ref(); let comm = Arc::new( - test_setup - .proc - .spawn( - "comm", - NcclCommActor::new(CommParams::New { - device: CudaDevice::new(0.into()), - unique_id: UniqueId::new()?, - world_size: 1, - rank: 0, - }) - .await - .unwrap(), - ) - .await?, + test_setup.proc.spawn( + "comm", + NcclCommActor::new(CommParams::New { + device: CudaDevice::new(0.into()), + unique_id: UniqueId::new()?, + world_size: 1, + rank: 0, + }) + .await + .unwrap(), + )?, ); let factory = Factory { @@ -3589,8 +3578,8 @@ mod tests { let (actor0, actor1) = tokio::join!(actor0, actor1); let (actor0, actor1) = (actor0.unwrap(), actor1.unwrap()); - let comm0 = test_setup.proc.spawn("comm0", actor0).await.unwrap(); - let comm1 = test_setup.proc.spawn("comm1", actor1).await.unwrap(); + let comm0 = test_setup.proc.spawn("comm0", actor0).unwrap(); + let comm1 = test_setup.proc.spawn("comm1", actor1).unwrap(); let comm0 = Arc::new(comm0); let comm1 = Arc::new(comm1); @@ -3602,21 +3591,18 @@ mod tests { }; let send_stream = test_setup.stream_actor.clone(); - let recv_stream = test_setup - .proc - .spawn( - "recv_stream", - StreamActor::new(StreamParams { - world_size: 2, - rank: 1, - creation_mode: StreamCreationMode::CreateNewStream, - id: 1.into(), - device: Some(CudaDevice::new(1.into())), - controller_actor: test_setup.controller_actor.clone(), - respond_with_python_message: false, - }), - ) - .await?; + let recv_stream = test_setup.proc.spawn( + "recv_stream", + StreamActor::new(StreamParams { + world_size: 2, + rank: 1, + creation_mode: StreamCreationMode::CreateNewStream, + id: 1.into(), + device: Some(CudaDevice::new(1.into())), + controller_actor: test_setup.controller_actor.clone(), + respond_with_python_message: false, + }), + )?; send_stream .define_recording(&test_setup.client, recording_ref)