Skip to content

Commit 16bc756

Browse files
committed
[hyperactor] make all variants of spawn sync
Pull Request resolved: #1968 Building on the previous diff, separating out remote from local instantiation, this diff implements synchronous spawns throughout. This means we can always spawn an actor in a nonblocking way, regardless of context. Spawns should also become infallible, instead relying on supervision to handle errors. ghstack-source-id: 325005475 Differential Revision: [D87608082](https://our.internmc.facebook.com/intern/diff/D87608082/) **NOTE FOR REVIEWERS**: This PR has internal Meta-specific changes or comments, please review them on [Phabricator](https://our.internmc.facebook.com/intern/diff/D87608082/)!
1 parent daa1a38 commit 16bc756

File tree

14 files changed

+65
-158
lines changed

14 files changed

+65
-158
lines changed

hyperactor/src/actor.rs

Lines changed: 13 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -98,8 +98,8 @@ pub trait Actor: Sized + Send + Debug + 'static {
9898

9999
/// Spawn a child actor, given a spawning capability (usually given by [`Instance`]).
100100
/// The spawned actor will be supervised by the parent (spawning) actor.
101-
async fn spawn(self, cx: &impl context::Actor) -> anyhow::Result<ActorHandle<Self>> {
102-
cx.instance().spawn(self).await
101+
fn spawn(self, cx: &impl context::Actor) -> anyhow::Result<ActorHandle<Self>> {
102+
cx.instance().spawn(self)
103103
}
104104

105105
/// Spawns this actor in a detached state, handling its messages
@@ -108,8 +108,8 @@ pub trait Actor: Sized + Send + Debug + 'static {
108108
///
109109
/// Actors spawned through `spawn_detached` are not attached to a supervision
110110
/// hierarchy, and not managed by a [`Proc`].
111-
async fn spawn_detached(self) -> Result<ActorHandle<Self>, anyhow::Error> {
112-
Proc::local().spawn("anon", self).await
111+
fn spawn_detached(self) -> Result<ActorHandle<Self>, anyhow::Error> {
112+
Proc::local().spawn("anon", self)
113113
}
114114

115115
/// This method is used by the runtime to spawn the actor server. It can be
@@ -260,7 +260,7 @@ pub trait RemoteSpawn: Actor + Referable + Binds<Self> {
260260
Box::pin(async move {
261261
let params = bincode::deserialize(&serialized_params)?;
262262
let actor = Self::new(params).await?;
263-
let handle = proc.spawn(&name, actor).await?;
263+
let handle = proc.spawn(&name, actor)?;
264264
// We return only the ActorId, not a typed ActorRef.
265265
// Callers that hold this ID can interact with the actor
266266
// only via the serialized/opaque messaging path, which
@@ -792,7 +792,7 @@ mod tests {
792792
let client = proc.attach("client").unwrap();
793793
let (tx, mut rx) = client.open_port();
794794
let actor = EchoActor(tx.bind());
795-
let handle = proc.spawn::<EchoActor>("echo", actor).await.unwrap();
795+
let handle = proc.spawn::<EchoActor>("echo", actor).unwrap();
796796
handle.send(123u64).unwrap();
797797
handle.drain_and_stop().unwrap();
798798
handle.await;
@@ -808,14 +808,8 @@ mod tests {
808808

809809
let ping_actor = PingPongActor::new(Some(undeliverable_msg_tx.bind()), None, None);
810810
let pong_actor = PingPongActor::new(Some(undeliverable_msg_tx.bind()), None, None);
811-
let ping_handle = proc
812-
.spawn::<PingPongActor>("ping", ping_actor)
813-
.await
814-
.unwrap();
815-
let pong_handle = proc
816-
.spawn::<PingPongActor>("pong", pong_actor)
817-
.await
818-
.unwrap();
811+
let ping_handle = proc.spawn::<PingPongActor>("ping", ping_actor).unwrap();
812+
let pong_handle = proc.spawn::<PingPongActor>("pong", pong_actor).unwrap();
819813

820814
let (local_port, local_receiver) = client.open_once_port();
821815

@@ -842,14 +836,8 @@ mod tests {
842836
PingPongActor::new(Some(undeliverable_msg_tx.bind()), Some(error_ttl), None);
843837
let pong_actor =
844838
PingPongActor::new(Some(undeliverable_msg_tx.bind()), Some(error_ttl), None);
845-
let ping_handle = proc
846-
.spawn::<PingPongActor>("ping", ping_actor)
847-
.await
848-
.unwrap();
849-
let pong_handle = proc
850-
.spawn::<PingPongActor>("pong", pong_actor)
851-
.await
852-
.unwrap();
839+
let ping_handle = proc.spawn::<PingPongActor>("ping", ping_actor).unwrap();
840+
let pong_handle = proc.spawn::<PingPongActor>("pong", pong_actor).unwrap();
853841

854842
let (local_port, local_receiver) = client.open_once_port();
855843

@@ -895,7 +883,7 @@ mod tests {
895883
async fn test_init() {
896884
let proc = Proc::local();
897885
let actor = InitActor(false);
898-
let handle = proc.spawn::<InitActor>("init", actor).await.unwrap();
886+
let handle = proc.spawn::<InitActor>("init", actor).unwrap();
899887
let client = proc.attach("client").unwrap();
900888

901889
let (port, receiver) = client.open_once_port();
@@ -954,7 +942,7 @@ mod tests {
954942
let proc = Proc::local();
955943
let values: MultiValues = Arc::new(Mutex::new((0, "".to_string())));
956944
let actor = MultiActor(values.clone());
957-
let handle = proc.spawn::<MultiActor>("myactor", actor).await.unwrap();
945+
let handle = proc.spawn::<MultiActor>("myactor", actor).unwrap();
958946
let (client, client_handle) = proc.instance("client").unwrap();
959947
Self {
960948
proc,
@@ -1072,7 +1060,7 @@ mod tests {
10721060
// Just test that we can round-trip the handle through a downcast.
10731061

10741062
let proc = Proc::local();
1075-
let handle = proc.spawn("nothing", NothingActor).await.unwrap();
1063+
let handle = proc.spawn("nothing", NothingActor).unwrap();
10761064
let cell = handle.cell();
10771065

10781066
// Invalid actor doesn't succeed.

hyperactor/src/host.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1220,7 +1220,7 @@ mod tests {
12201220
#[tokio::test]
12211221
async fn test_basic() {
12221222
let proc_manager =
1223-
LocalProcManager::new(|proc: Proc| async move { proc.spawn::<()>("agent", ()).await });
1223+
LocalProcManager::new(|proc: Proc| async move { proc.spawn::<()>("agent", ()) });
12241224
let procs = Arc::clone(&proc_manager.procs);
12251225
let (mut host, _handle) =
12261226
Host::serve(proc_manager, ChannelAddr::any(ChannelTransport::Local))

hyperactor/src/mailbox.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3125,7 +3125,7 @@ mod tests {
31253125
let mut proc = Proc::new(proc_id.clone(), proc_forwarder);
31263126
ProcSupervisionCoordinator::set(&proc).await.unwrap();
31273127

3128-
let foo = proc.spawn("foo", Foo).await.unwrap();
3128+
let foo = proc.spawn("foo", Foo).unwrap();
31293129
let return_handle = foo.port::<Undeliverable<MessageEnvelope>>();
31303130
let message = MessageEnvelope::new(
31313131
foo.actor_id().clone(),

hyperactor/src/proc.rs

Lines changed: 19 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -507,12 +507,7 @@ impl Proc {
507507

508508
/// Spawn a named (root) actor on this proc. The name of the actor must be
509509
/// unique.
510-
#[hyperactor::observe_result("Proc")]
511-
pub async fn spawn<A: Actor>(
512-
&self,
513-
name: &str,
514-
actor: A,
515-
) -> Result<ActorHandle<A>, anyhow::Error> {
510+
pub fn spawn<A: Actor>(&self, name: &str, actor: A) -> Result<ActorHandle<A>, anyhow::Error> {
516511
let actor_id = self.allocate_root_id(name)?;
517512
let span = tracing::span!(
518513
Level::INFO,
@@ -532,10 +527,7 @@ impl Proc {
532527
.ledger
533528
.insert(actor_id.clone(), instance.inner.cell.downgrade())?;
534529

535-
Ok(instance
536-
.start(actor, actor_loop_receivers.take().unwrap(), work_rx)
537-
.instrument(span)
538-
.await)
530+
Ok(instance.start(actor, actor_loop_receivers.take().unwrap(), work_rx))
539531
}
540532

541533
/// Create and return an actor instance and its corresponding handle. This allows actors to be
@@ -584,17 +576,15 @@ impl Proc {
584576
///
585577
/// When spawn_child returns, the child has an associated cell and is linked
586578
/// with its parent.
587-
async fn spawn_child<A: Actor>(
579+
fn spawn_child<A: Actor>(
588580
&self,
589581
parent: InstanceCell,
590582
actor: A,
591583
) -> Result<ActorHandle<A>, anyhow::Error> {
592584
let actor_id = self.allocate_child_id(parent.actor_id())?;
593585
let (instance, mut actor_loop_receivers, work_rx) =
594586
Instance::new(self.clone(), actor_id, false, Some(parent.clone()));
595-
Ok(instance
596-
.start(actor, actor_loop_receivers.take().unwrap(), work_rx)
597-
.await)
587+
Ok(instance.start(actor, actor_loop_receivers.take().unwrap(), work_rx))
598588
}
599589

600590
/// Call `abort` on the `JoinHandle` associated with the given
@@ -1166,8 +1156,7 @@ impl<A: Actor> Instance<A> {
11661156

11671157
/// Start an A-typed actor onto this instance with the provided params. When spawn returns,
11681158
/// the actor has been linked with its parent, if it has one.
1169-
#[hyperactor::instrument_infallible(fields(actor_id=self.inner.cell.actor_id().clone().to_string(), actor_name=self.inner.cell.actor_id().name()))]
1170-
async fn start(
1159+
fn start(
11711160
self,
11721161
actor: A,
11731162
actor_loop_receivers: (PortReceiver<Signal>, PortReceiver<ActorSupervisionEvent>),
@@ -1531,11 +1520,8 @@ impl<A: Actor> Instance<A> {
15311520
}
15321521

15331522
/// Spawn on child on this instance. Currently used only by cap::CanSpawn.
1534-
pub(crate) async fn spawn<C: Actor>(&self, actor: C) -> anyhow::Result<ActorHandle<C>> {
1535-
self.inner
1536-
.proc
1537-
.spawn_child(self.inner.cell.clone(), actor)
1538-
.await
1523+
pub(crate) fn spawn<C: Actor>(&self, actor: C) -> anyhow::Result<ActorHandle<C>> {
1524+
self.inner.proc.spawn_child(self.inner.cell.clone(), actor)
15391525
}
15401526

15411527
/// Create a new direct child instance.
@@ -2273,7 +2259,7 @@ mod tests {
22732259
cx: &crate::Context<Self>,
22742260
reply: oneshot::Sender<ActorHandle<TestActor>>,
22752261
) -> Result<(), anyhow::Error> {
2276-
let handle = TestActor::default().spawn(cx).await?;
2262+
let handle = TestActor::default().spawn(cx)?;
22772263
reply.send(handle).unwrap();
22782264
Ok(())
22792265
}
@@ -2283,7 +2269,7 @@ mod tests {
22832269
#[async_timed_test(timeout_secs = 30)]
22842270
async fn test_spawn_actor() {
22852271
let proc = Proc::local();
2286-
let handle = proc.spawn("test", TestActor::default()).await.unwrap();
2272+
let handle = proc.spawn("test", TestActor::default()).unwrap();
22872273

22882274
// Check on the join handle.
22892275
assert!(logs_contain(
@@ -2334,11 +2320,9 @@ mod tests {
23342320
let proc = Proc::local();
23352321
let first = proc
23362322
.spawn::<TestActor>("first", TestActor::default())
2337-
.await
23382323
.unwrap();
23392324
let second = proc
23402325
.spawn::<TestActor>("second", TestActor::default())
2341-
.await
23422326
.unwrap();
23432327
let (tx, rx) = oneshot::channel::<()>();
23442328
let reply_message = TestActorMessage::Reply(tx);
@@ -2378,12 +2362,10 @@ mod tests {
23782362

23792363
let target_actor = proc
23802364
.spawn::<TestActor>("target", TestActor::default())
2381-
.await
23822365
.unwrap();
23832366
let target_actor_ref = target_actor.bind();
23842367
let lookup_actor = proc
23852368
.spawn::<LookupTestActor>("lookup", LookupTestActor::default())
2386-
.await
23872369
.unwrap();
23882370

23892371
assert!(
@@ -2444,7 +2426,6 @@ mod tests {
24442426

24452427
let first = proc
24462428
.spawn::<TestActor>("first", TestActor::default())
2447-
.await
24482429
.unwrap();
24492430
let second = TestActor::spawn_child(&first).await;
24502431
let third = TestActor::spawn_child(&second).await;
@@ -2515,7 +2496,6 @@ mod tests {
25152496

25162497
let root = proc
25172498
.spawn::<TestActor>("root", TestActor::default())
2518-
.await
25192499
.unwrap();
25202500
let root_1 = TestActor::spawn_child(&root).await;
25212501
let root_2 = TestActor::spawn_child(&root).await;
@@ -2539,7 +2519,6 @@ mod tests {
25392519

25402520
let root = proc
25412521
.spawn::<TestActor>("root", TestActor::default())
2542-
.await
25432522
.unwrap();
25442523
let root_1 = TestActor::spawn_child(&root).await;
25452524
let root_2 = TestActor::spawn_child(&root).await;
@@ -2580,10 +2559,7 @@ mod tests {
25802559
let proc = Proc::local();
25812560

25822561
// Add the 1st root. This root will remain active until the end of the test.
2583-
let root: ActorHandle<TestActor> = proc
2584-
.spawn::<TestActor>("root", TestActor::default())
2585-
.await
2586-
.unwrap();
2562+
let root: ActorHandle<TestActor> = proc.spawn("root", TestActor::default()).unwrap();
25872563
wait_until_idle(&root).await;
25882564
{
25892565
let snapshot = proc.state().ledger.snapshot();
@@ -2597,10 +2573,8 @@ mod tests {
25972573
}
25982574

25992575
// Add the 2nd root.
2600-
let another_root: ActorHandle<TestActor> = proc
2601-
.spawn::<TestActor>("another_root", TestActor::default())
2602-
.await
2603-
.unwrap();
2576+
let another_root: ActorHandle<TestActor> =
2577+
proc.spawn("another_root", TestActor::default()).unwrap();
26042578
wait_until_idle(&another_root).await;
26052579
{
26062580
let snapshot = proc.state().ledger.snapshot();
@@ -2837,7 +2811,7 @@ mod tests {
28372811
let proc = Proc::local();
28382812
let state = Arc::new(AtomicUsize::new(0));
28392813
let actor = TestActor(state.clone());
2840-
let handle = proc.spawn::<TestActor>("test", actor).await.unwrap();
2814+
let handle = proc.spawn::<TestActor>("test", actor).unwrap();
28412815
let client = proc.attach("client").unwrap();
28422816
let (tx, rx) = client.open_once_port();
28432817
handle.send(tx).unwrap();
@@ -2861,10 +2835,7 @@ mod tests {
28612835
ProcSupervisionCoordinator::set(&proc).await.unwrap();
28622836

28632837
let (client, _handle) = proc.instance("client").unwrap();
2864-
let actor_handle = proc
2865-
.spawn::<TestActor>("test", TestActor::default())
2866-
.await
2867-
.unwrap();
2838+
let actor_handle = proc.spawn("test", TestActor::default()).unwrap();
28682839
actor_handle
28692840
.panic(&client, "some random failure".to_string())
28702841
.await
@@ -2888,6 +2859,8 @@ mod tests {
28882859

28892860
#[async_timed_test(timeout_secs = 30)]
28902861
async fn test_local_supervision_propagation() {
2862+
hyperactor_telemetry::initialize_logging_for_test();
2863+
28912864
#[derive(Debug)]
28922865
struct TestActor(Arc<AtomicBool>, bool);
28932866

@@ -2936,7 +2909,6 @@ mod tests {
29362909

29372910
let root = proc
29382911
.spawn::<TestActor>("root", TestActor(root_state.clone(), false))
2939-
.await
29402912
.unwrap();
29412913
let root_1 = proc
29422914
.spawn_child::<TestActor>(
@@ -2946,32 +2918,27 @@ mod tests {
29462918
true, /* set true so children's event stops here */
29472919
),
29482920
)
2949-
.await
29502921
.unwrap();
29512922
let root_1_1 = proc
29522923
.spawn_child::<TestActor>(
29532924
root_1.cell().clone(),
29542925
TestActor(root_1_1_state.clone(), false),
29552926
)
2956-
.await
29572927
.unwrap();
29582928
let root_1_1_1 = proc
29592929
.spawn_child::<TestActor>(
29602930
root_1_1.cell().clone(),
29612931
TestActor(root_1_1_1_state.clone(), false),
29622932
)
2963-
.await
29642933
.unwrap();
29652934
let root_2 = proc
29662935
.spawn_child::<TestActor>(root.cell().clone(), TestActor(root_2_state.clone(), false))
2967-
.await
29682936
.unwrap();
29692937
let root_2_1 = proc
29702938
.spawn_child::<TestActor>(
29712939
root_2.cell().clone(),
29722940
TestActor(root_2_1_state.clone(), false),
29732941
)
2974-
.await
29752942
.unwrap();
29762943

29772944
// fail `root_1_1_1`, the supervision msg should be propagated to
@@ -3023,7 +2990,7 @@ mod tests {
30232990

30242991
let (instance, handle) = proc.instance("my_test_actor").unwrap();
30252992

3026-
let child_actor = TestActor::default().spawn(&instance).await.unwrap();
2993+
let child_actor = TestActor::default().spawn(&instance).unwrap();
30272994

30282995
let (port, mut receiver) = instance.open_port();
30292996
child_actor
@@ -3054,10 +3021,7 @@ mod tests {
30543021
// Intentionally not setting a proc supervison coordinator. This
30553022
// should cause the process to terminate.
30563023
// ProcSupervisionCoordinator::set(&proc).await.unwrap();
3057-
let root = proc
3058-
.spawn::<TestActor>("root", TestActor::default())
3059-
.await
3060-
.unwrap();
3024+
let root = proc.spawn("root", TestActor::default()).unwrap();
30613025
let (client, _handle) = proc.instance("client").unwrap();
30623026
root.fail(&client, anyhow::anyhow!("some random failure"))
30633027
.await
@@ -3152,7 +3116,7 @@ mod tests {
31523116
}
31533117

31543118
trace_and_block(async {
3155-
let handle = LoggingActor::default().spawn_detached().await.unwrap();
3119+
let handle = LoggingActor::default().spawn_detached().unwrap();
31563120
handle.send("hello world".to_string()).unwrap();
31573121
handle.send("hello world again".to_string()).unwrap();
31583122
handle.send(123u64).unwrap();

hyperactor/src/test_utils/proc_supervison.rs

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -43,9 +43,7 @@ impl ProcSupervisionCoordinator {
4343
pub async fn set(proc: &Proc) -> Result<ReportedEvent, anyhow::Error> {
4444
let state = ReportedEvent::new();
4545
let actor = ProcSupervisionCoordinator(state.clone());
46-
let coordinator = proc
47-
.spawn::<ProcSupervisionCoordinator>("coordinator", actor)
48-
.await?;
46+
let coordinator = proc.spawn::<ProcSupervisionCoordinator>("coordinator", actor)?;
4947
proc.set_supervision_coordinator(coordinator.port())?;
5048
Ok(state)
5149
}

0 commit comments

Comments
 (0)