Skip to content

Commit f94a491

Browse files
committed
[hyperactor] make all variants of spawn sync
Building on the previous diff, separating out remote from local instantiation, this diff implements synchronous spawns throughout. This means we can always spawn an actor in a nonblocking way, regardless of context. Spawns should also become infallible, instead relying on supervision to handle errors. Differential Revision: [D87608082](https://our.internmc.facebook.com/intern/diff/D87608082/) **NOTE FOR REVIEWERS**: This PR has internal Meta-specific changes or comments, please review them on [Phabricator](https://our.internmc.facebook.com/intern/diff/D87608082/)! ghstack-source-id: 324888978 Pull Request resolved: #1968
1 parent 961336a commit f94a491

File tree

14 files changed

+65
-158
lines changed

14 files changed

+65
-158
lines changed

hyperactor/src/actor.rs

Lines changed: 13 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -98,8 +98,8 @@ pub trait Actor: Sized + Send + Debug + 'static {
9898

9999
/// Spawn a child actor, given a spawning capability (usually given by [`Instance`]).
100100
/// The spawned actor will be supervised by the parent (spawning) actor.
101-
async fn spawn(self, cx: &impl context::Actor) -> anyhow::Result<ActorHandle<Self>> {
102-
cx.instance().spawn(self).await
101+
fn spawn(self, cx: &impl context::Actor) -> anyhow::Result<ActorHandle<Self>> {
102+
cx.instance().spawn(self)
103103
}
104104

105105
/// Spawns this actor in a detached state, handling its messages
@@ -108,8 +108,8 @@ pub trait Actor: Sized + Send + Debug + 'static {
108108
///
109109
/// Actors spawned through `spawn_detached` are not attached to a supervision
110110
/// hierarchy, and not managed by a [`Proc`].
111-
async fn spawn_detached(self) -> Result<ActorHandle<Self>, anyhow::Error> {
112-
Proc::local().spawn("anon", self).await
111+
fn spawn_detached(self) -> Result<ActorHandle<Self>, anyhow::Error> {
112+
Proc::local().spawn("anon", self)
113113
}
114114

115115
/// This method is used by the runtime to spawn the actor server. It can be
@@ -260,7 +260,7 @@ pub trait RemoteSpawn: Actor + Referable + Binds<Self> {
260260
Box::pin(async move {
261261
let params = bincode::deserialize(&serialized_params)?;
262262
let actor = Self::new(params).await?;
263-
let handle = proc.spawn(&name, actor).await?;
263+
let handle = proc.spawn(&name, actor)?;
264264
// We return only the ActorId, not a typed ActorRef.
265265
// Callers that hold this ID can interact with the actor
266266
// only via the serialized/opaque messaging path, which
@@ -792,7 +792,7 @@ mod tests {
792792
let client = proc.attach("client").unwrap();
793793
let (tx, mut rx) = client.open_port();
794794
let actor = EchoActor(tx.bind());
795-
let handle = proc.spawn::<EchoActor>("echo", actor).await.unwrap();
795+
let handle = proc.spawn::<EchoActor>("echo", actor).unwrap();
796796
handle.send(123u64).unwrap();
797797
handle.drain_and_stop().unwrap();
798798
handle.await;
@@ -808,14 +808,8 @@ mod tests {
808808

809809
let ping_actor = PingPongActor::new(Some(undeliverable_msg_tx.bind()), None, None);
810810
let pong_actor = PingPongActor::new(Some(undeliverable_msg_tx.bind()), None, None);
811-
let ping_handle = proc
812-
.spawn::<PingPongActor>("ping", ping_actor)
813-
.await
814-
.unwrap();
815-
let pong_handle = proc
816-
.spawn::<PingPongActor>("pong", pong_actor)
817-
.await
818-
.unwrap();
811+
let ping_handle = proc.spawn::<PingPongActor>("ping", ping_actor).unwrap();
812+
let pong_handle = proc.spawn::<PingPongActor>("pong", pong_actor).unwrap();
819813

820814
let (local_port, local_receiver) = client.open_once_port();
821815

@@ -842,14 +836,8 @@ mod tests {
842836
PingPongActor::new(Some(undeliverable_msg_tx.bind()), Some(error_ttl), None);
843837
let pong_actor =
844838
PingPongActor::new(Some(undeliverable_msg_tx.bind()), Some(error_ttl), None);
845-
let ping_handle = proc
846-
.spawn::<PingPongActor>("ping", ping_actor)
847-
.await
848-
.unwrap();
849-
let pong_handle = proc
850-
.spawn::<PingPongActor>("pong", pong_actor)
851-
.await
852-
.unwrap();
839+
let ping_handle = proc.spawn::<PingPongActor>("ping", ping_actor).unwrap();
840+
let pong_handle = proc.spawn::<PingPongActor>("pong", pong_actor).unwrap();
853841

854842
let (local_port, local_receiver) = client.open_once_port();
855843

@@ -895,7 +883,7 @@ mod tests {
895883
async fn test_init() {
896884
let proc = Proc::local();
897885
let actor = InitActor(false);
898-
let handle = proc.spawn::<InitActor>("init", actor).await.unwrap();
886+
let handle = proc.spawn::<InitActor>("init", actor).unwrap();
899887
let client = proc.attach("client").unwrap();
900888

901889
let (port, receiver) = client.open_once_port();
@@ -954,7 +942,7 @@ mod tests {
954942
let proc = Proc::local();
955943
let values: MultiValues = Arc::new(Mutex::new((0, "".to_string())));
956944
let actor = MultiActor(values.clone());
957-
let handle = proc.spawn::<MultiActor>("myactor", actor).await.unwrap();
945+
let handle = proc.spawn::<MultiActor>("myactor", actor).unwrap();
958946
let (client, client_handle) = proc.instance("client").unwrap();
959947
Self {
960948
proc,
@@ -1072,7 +1060,7 @@ mod tests {
10721060
// Just test that we can round-trip the handle through a downcast.
10731061

10741062
let proc = Proc::local();
1075-
let handle = proc.spawn("nothing", NothingActor).await.unwrap();
1063+
let handle = proc.spawn("nothing", NothingActor).unwrap();
10761064
let cell = handle.cell();
10771065

10781066
// Invalid actor doesn't succeed.

hyperactor/src/host.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1220,7 +1220,7 @@ mod tests {
12201220
#[tokio::test]
12211221
async fn test_basic() {
12221222
let proc_manager =
1223-
LocalProcManager::new(|proc: Proc| async move { proc.spawn::<()>("agent", ()).await });
1223+
LocalProcManager::new(|proc: Proc| async move { proc.spawn::<()>("agent", ()) });
12241224
let procs = Arc::clone(&proc_manager.procs);
12251225
let (mut host, _handle) =
12261226
Host::serve(proc_manager, ChannelAddr::any(ChannelTransport::Local))

hyperactor/src/mailbox.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3125,7 +3125,7 @@ mod tests {
31253125
let mut proc = Proc::new(proc_id.clone(), proc_forwarder);
31263126
ProcSupervisionCoordinator::set(&proc).await.unwrap();
31273127

3128-
let foo = proc.spawn("foo", Foo).await.unwrap();
3128+
let foo = proc.spawn("foo", Foo).unwrap();
31293129
let return_handle = foo.port::<Undeliverable<MessageEnvelope>>();
31303130
let message = MessageEnvelope::new(
31313131
foo.actor_id().clone(),

hyperactor/src/proc.rs

Lines changed: 19 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -507,12 +507,7 @@ impl Proc {
507507

508508
/// Spawn a named (root) actor on this proc. The name of the actor must be
509509
/// unique.
510-
#[hyperactor::observe_result("Proc")]
511-
pub async fn spawn<A: Actor>(
512-
&self,
513-
name: &str,
514-
actor: A,
515-
) -> Result<ActorHandle<A>, anyhow::Error> {
510+
pub fn spawn<A: Actor>(&self, name: &str, actor: A) -> Result<ActorHandle<A>, anyhow::Error> {
516511
let actor_id = self.allocate_root_id(name)?;
517512
let span = tracing::span!(
518513
Level::INFO,
@@ -532,10 +527,7 @@ impl Proc {
532527
.ledger
533528
.insert(actor_id.clone(), instance.inner.cell.downgrade())?;
534529

535-
Ok(instance
536-
.start(actor, actor_loop_receivers.take().unwrap(), work_rx)
537-
.instrument(span)
538-
.await)
530+
Ok(instance.start(actor, actor_loop_receivers.take().unwrap(), work_rx))
539531
}
540532

541533
/// Create and return an actor instance and its corresponding handle. This allows actors to be
@@ -584,17 +576,15 @@ impl Proc {
584576
///
585577
/// When spawn_child returns, the child has an associated cell and is linked
586578
/// with its parent.
587-
async fn spawn_child<A: Actor>(
579+
fn spawn_child<A: Actor>(
588580
&self,
589581
parent: InstanceCell,
590582
actor: A,
591583
) -> Result<ActorHandle<A>, anyhow::Error> {
592584
let actor_id = self.allocate_child_id(parent.actor_id())?;
593585
let (instance, mut actor_loop_receivers, work_rx) =
594586
Instance::new(self.clone(), actor_id, false, Some(parent.clone()));
595-
Ok(instance
596-
.start(actor, actor_loop_receivers.take().unwrap(), work_rx)
597-
.await)
587+
Ok(instance.start(actor, actor_loop_receivers.take().unwrap(), work_rx))
598588
}
599589

600590
/// Call `abort` on the `JoinHandle` associated with the given
@@ -1166,8 +1156,7 @@ impl<A: Actor> Instance<A> {
11661156

11671157
/// Start an A-typed actor onto this instance with the provided params. When spawn returns,
11681158
/// the actor has been linked with its parent, if it has one.
1169-
#[hyperactor::instrument_infallible(fields(actor_id=self.inner.cell.actor_id().clone().to_string(), actor_name=self.inner.cell.actor_id().name()))]
1170-
async fn start(
1159+
fn start(
11711160
self,
11721161
actor: A,
11731162
actor_loop_receivers: (PortReceiver<Signal>, PortReceiver<ActorSupervisionEvent>),
@@ -1540,11 +1529,8 @@ impl<A: Actor> Instance<A> {
15401529
}
15411530

15421531
/// Spawn on child on this instance. Currently used only by cap::CanSpawn.
1543-
pub(crate) async fn spawn<C: Actor>(&self, actor: C) -> anyhow::Result<ActorHandle<C>> {
1544-
self.inner
1545-
.proc
1546-
.spawn_child(self.inner.cell.clone(), actor)
1547-
.await
1532+
pub(crate) fn spawn<C: Actor>(&self, actor: C) -> anyhow::Result<ActorHandle<C>> {
1533+
self.inner.proc.spawn_child(self.inner.cell.clone(), actor)
15481534
}
15491535

15501536
/// Create a new direct child instance.
@@ -2282,7 +2268,7 @@ mod tests {
22822268
cx: &crate::Context<Self>,
22832269
reply: oneshot::Sender<ActorHandle<TestActor>>,
22842270
) -> Result<(), anyhow::Error> {
2285-
let handle = TestActor::default().spawn(cx).await?;
2271+
let handle = TestActor::default().spawn(cx)?;
22862272
reply.send(handle).unwrap();
22872273
Ok(())
22882274
}
@@ -2292,7 +2278,7 @@ mod tests {
22922278
#[async_timed_test(timeout_secs = 30)]
22932279
async fn test_spawn_actor() {
22942280
let proc = Proc::local();
2295-
let handle = proc.spawn("test", TestActor::default()).await.unwrap();
2281+
let handle = proc.spawn("test", TestActor::default()).unwrap();
22962282

22972283
// Check on the join handle.
22982284
assert!(logs_contain(
@@ -2343,11 +2329,9 @@ mod tests {
23432329
let proc = Proc::local();
23442330
let first = proc
23452331
.spawn::<TestActor>("first", TestActor::default())
2346-
.await
23472332
.unwrap();
23482333
let second = proc
23492334
.spawn::<TestActor>("second", TestActor::default())
2350-
.await
23512335
.unwrap();
23522336
let (tx, rx) = oneshot::channel::<()>();
23532337
let reply_message = TestActorMessage::Reply(tx);
@@ -2387,12 +2371,10 @@ mod tests {
23872371

23882372
let target_actor = proc
23892373
.spawn::<TestActor>("target", TestActor::default())
2390-
.await
23912374
.unwrap();
23922375
let target_actor_ref = target_actor.bind();
23932376
let lookup_actor = proc
23942377
.spawn::<LookupTestActor>("lookup", LookupTestActor::default())
2395-
.await
23962378
.unwrap();
23972379

23982380
assert!(
@@ -2453,7 +2435,6 @@ mod tests {
24532435

24542436
let first = proc
24552437
.spawn::<TestActor>("first", TestActor::default())
2456-
.await
24572438
.unwrap();
24582439
let second = TestActor::spawn_child(&first).await;
24592440
let third = TestActor::spawn_child(&second).await;
@@ -2524,7 +2505,6 @@ mod tests {
25242505

25252506
let root = proc
25262507
.spawn::<TestActor>("root", TestActor::default())
2527-
.await
25282508
.unwrap();
25292509
let root_1 = TestActor::spawn_child(&root).await;
25302510
let root_2 = TestActor::spawn_child(&root).await;
@@ -2548,7 +2528,6 @@ mod tests {
25482528

25492529
let root = proc
25502530
.spawn::<TestActor>("root", TestActor::default())
2551-
.await
25522531
.unwrap();
25532532
let root_1 = TestActor::spawn_child(&root).await;
25542533
let root_2 = TestActor::spawn_child(&root).await;
@@ -2589,10 +2568,7 @@ mod tests {
25892568
let proc = Proc::local();
25902569

25912570
// Add the 1st root. This root will remain active until the end of the test.
2592-
let root: ActorHandle<TestActor> = proc
2593-
.spawn::<TestActor>("root", TestActor::default())
2594-
.await
2595-
.unwrap();
2571+
let root: ActorHandle<TestActor> = proc.spawn("root", TestActor::default()).unwrap();
25962572
wait_until_idle(&root).await;
25972573
{
25982574
let snapshot = proc.state().ledger.snapshot();
@@ -2606,10 +2582,8 @@ mod tests {
26062582
}
26072583

26082584
// Add the 2nd root.
2609-
let another_root: ActorHandle<TestActor> = proc
2610-
.spawn::<TestActor>("another_root", TestActor::default())
2611-
.await
2612-
.unwrap();
2585+
let another_root: ActorHandle<TestActor> =
2586+
proc.spawn("another_root", TestActor::default()).unwrap();
26132587
wait_until_idle(&another_root).await;
26142588
{
26152589
let snapshot = proc.state().ledger.snapshot();
@@ -2844,7 +2818,7 @@ mod tests {
28442818
let proc = Proc::local();
28452819
let state = Arc::new(AtomicUsize::new(0));
28462820
let actor = TestActor(state.clone());
2847-
let handle = proc.spawn::<TestActor>("test", actor).await.unwrap();
2821+
let handle = proc.spawn::<TestActor>("test", actor).unwrap();
28482822
let client = proc.attach("client").unwrap();
28492823
let (tx, rx) = client.open_once_port();
28502824
handle.send(tx).unwrap();
@@ -2868,10 +2842,7 @@ mod tests {
28682842
ProcSupervisionCoordinator::set(&proc).await.unwrap();
28692843

28702844
let (client, _handle) = proc.instance("client").unwrap();
2871-
let actor_handle = proc
2872-
.spawn::<TestActor>("test", TestActor::default())
2873-
.await
2874-
.unwrap();
2845+
let actor_handle = proc.spawn("test", TestActor::default()).unwrap();
28752846
actor_handle
28762847
.panic(&client, "some random failure".to_string())
28772848
.await
@@ -2895,6 +2866,8 @@ mod tests {
28952866

28962867
#[async_timed_test(timeout_secs = 30)]
28972868
async fn test_local_supervision_propagation() {
2869+
hyperactor_telemetry::initialize_logging_for_test();
2870+
28982871
#[derive(Debug)]
28992872
struct TestActor(Arc<AtomicBool>, bool);
29002873

@@ -2943,7 +2916,6 @@ mod tests {
29432916

29442917
let root = proc
29452918
.spawn::<TestActor>("root", TestActor(root_state.clone(), false))
2946-
.await
29472919
.unwrap();
29482920
let root_1 = proc
29492921
.spawn_child::<TestActor>(
@@ -2953,32 +2925,27 @@ mod tests {
29532925
true, /* set true so children's event stops here */
29542926
),
29552927
)
2956-
.await
29572928
.unwrap();
29582929
let root_1_1 = proc
29592930
.spawn_child::<TestActor>(
29602931
root_1.cell().clone(),
29612932
TestActor(root_1_1_state.clone(), false),
29622933
)
2963-
.await
29642934
.unwrap();
29652935
let root_1_1_1 = proc
29662936
.spawn_child::<TestActor>(
29672937
root_1_1.cell().clone(),
29682938
TestActor(root_1_1_1_state.clone(), false),
29692939
)
2970-
.await
29712940
.unwrap();
29722941
let root_2 = proc
29732942
.spawn_child::<TestActor>(root.cell().clone(), TestActor(root_2_state.clone(), false))
2974-
.await
29752943
.unwrap();
29762944
let root_2_1 = proc
29772945
.spawn_child::<TestActor>(
29782946
root_2.cell().clone(),
29792947
TestActor(root_2_1_state.clone(), false),
29802948
)
2981-
.await
29822949
.unwrap();
29832950

29842951
// fail `root_1_1_1`, the supervision msg should be propagated to
@@ -3030,7 +2997,7 @@ mod tests {
30302997

30312998
let (instance, handle) = proc.instance("my_test_actor").unwrap();
30322999

3033-
let child_actor = TestActor::default().spawn(&instance).await.unwrap();
3000+
let child_actor = TestActor::default().spawn(&instance).unwrap();
30343001

30353002
let (port, mut receiver) = instance.open_port();
30363003
child_actor
@@ -3061,10 +3028,7 @@ mod tests {
30613028
// Intentionally not setting a proc supervison coordinator. This
30623029
// should cause the process to terminate.
30633030
// ProcSupervisionCoordinator::set(&proc).await.unwrap();
3064-
let root = proc
3065-
.spawn::<TestActor>("root", TestActor::default())
3066-
.await
3067-
.unwrap();
3031+
let root = proc.spawn("root", TestActor::default()).unwrap();
30683032
let (client, _handle) = proc.instance("client").unwrap();
30693033
root.fail(&client, anyhow::anyhow!("some random failure"))
30703034
.await
@@ -3159,7 +3123,7 @@ mod tests {
31593123
}
31603124

31613125
trace_and_block(async {
3162-
let handle = LoggingActor::default().spawn_detached().await.unwrap();
3126+
let handle = LoggingActor::default().spawn_detached().unwrap();
31633127
handle.send("hello world".to_string()).unwrap();
31643128
handle.send("hello world again".to_string()).unwrap();
31653129
handle.send(123u64).unwrap();

hyperactor/src/test_utils/proc_supervison.rs

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -43,9 +43,7 @@ impl ProcSupervisionCoordinator {
4343
pub async fn set(proc: &Proc) -> Result<ReportedEvent, anyhow::Error> {
4444
let state = ReportedEvent::new();
4545
let actor = ProcSupervisionCoordinator(state.clone());
46-
let coordinator = proc
47-
.spawn::<ProcSupervisionCoordinator>("coordinator", actor)
48-
.await?;
46+
let coordinator = proc.spawn::<ProcSupervisionCoordinator>("coordinator", actor)?;
4947
proc.set_supervision_coordinator(coordinator.port())?;
5048
Ok(state)
5149
}

0 commit comments

Comments
 (0)