Skip to content

Commit a0bb5c7

Browse files
committed
[hyperactor] make all variants of spawn sync
Pull Request resolved: #1968 Building on the previous diff, separating out remote from local instantiation, this diff implements synchronous spawns throughout. This means we can always spawn an actor in a nonblocking way, regardless of context. Spawns should also become infallible, instead relying on supervision to handle errors. ghstack-source-id: 325462726 Differential Revision: [D87608082](https://our.internmc.facebook.com/intern/diff/D87608082/) **NOTE FOR REVIEWERS**: This PR has internal Meta-specific changes or comments, please review them on [Phabricator](https://our.internmc.facebook.com/intern/diff/D87608082/)!
1 parent 9d17635 commit a0bb5c7

File tree

30 files changed

+184
-310
lines changed

30 files changed

+184
-310
lines changed

hyper/src/commands/demo.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -246,7 +246,7 @@ impl DemoMessageHandler for DemoActor {
246246

247247
async fn spawn_child(&mut self, cx: &Context<Self>) -> Result<ActorRef<Self>, anyhow::Error> {
248248
tracing::info!("demo: spawn child");
249-
Ok(Self.spawn(cx).await?.bind())
249+
Ok(Self.spawn(cx)?.bind())
250250
}
251251

252252
async fn error(&mut self, _cx: &Context<Self>, message: String) -> Result<(), anyhow::Error> {

hyperactor/example/derive.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -134,7 +134,7 @@ async fn main() -> Result<(), anyhow::Error> {
134134

135135
// Spawn our actor, and get a handle for rank 0.
136136
let shopping_list_actor: hyperactor::ActorHandle<ShoppingListActor> =
137-
proc.spawn("shopping", ShoppingListActor::default()).await?;
137+
proc.spawn("shopping", ShoppingListActor::default())?;
138138
let shopping_api: hyperactor::ActorRef<ShoppingApi> = shopping_list_actor.bind();
139139
// We join the system, so that we can send messages to actors.
140140
let (client, _) = proc.instance("client").unwrap();

hyperactor/example/stream.rs

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -83,10 +83,8 @@ impl Handler<u64> for CountClient {
8383
async fn main() {
8484
let proc = Proc::local();
8585

86-
let counter_actor: ActorHandle<CounterActor> = proc
87-
.spawn("counter", CounterActor::default())
88-
.await
89-
.unwrap();
86+
let counter_actor: ActorHandle<CounterActor> =
87+
proc.spawn("counter", CounterActor::default()).unwrap();
9088

9189
for i in 0..10 {
9290
// Spawn new "countees". Every time each subscribes, the counter broadcasts
@@ -96,7 +94,6 @@ async fn main() {
9694
&format!("countee_{}", i),
9795
CountClient::new(counter_actor.port().bind()),
9896
)
99-
.await
10097
.unwrap();
10198
#[allow(clippy::disallowed_methods)]
10299
tokio::time::sleep(Duration::from_millis(100)).await;

hyperactor/src/actor.rs

Lines changed: 13 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -98,8 +98,8 @@ pub trait Actor: Sized + Send + Debug + 'static {
9898

9999
/// Spawn a child actor, given a spawning capability (usually given by [`Instance`]).
100100
/// The spawned actor will be supervised by the parent (spawning) actor.
101-
async fn spawn(self, cx: &impl context::Actor) -> anyhow::Result<ActorHandle<Self>> {
102-
cx.instance().spawn(self).await
101+
fn spawn(self, cx: &impl context::Actor) -> anyhow::Result<ActorHandle<Self>> {
102+
cx.instance().spawn(self)
103103
}
104104

105105
/// Spawns this actor in a detached state, handling its messages
@@ -108,8 +108,8 @@ pub trait Actor: Sized + Send + Debug + 'static {
108108
///
109109
/// Actors spawned through `spawn_detached` are not attached to a supervision
110110
/// hierarchy, and not managed by a [`Proc`].
111-
async fn spawn_detached(self) -> Result<ActorHandle<Self>, anyhow::Error> {
112-
Proc::local().spawn("anon", self).await
111+
fn spawn_detached(self) -> Result<ActorHandle<Self>, anyhow::Error> {
112+
Proc::local().spawn("anon", self)
113113
}
114114

115115
/// This method is used by the runtime to spawn the actor server. It can be
@@ -260,7 +260,7 @@ pub trait RemoteSpawn: Actor + Referable + Binds<Self> {
260260
Box::pin(async move {
261261
let params = bincode::deserialize(&serialized_params)?;
262262
let actor = Self::new(params).await?;
263-
let handle = proc.spawn(&name, actor).await?;
263+
let handle = proc.spawn(&name, actor)?;
264264
// We return only the ActorId, not a typed ActorRef.
265265
// Callers that hold this ID can interact with the actor
266266
// only via the serialized/opaque messaging path, which
@@ -792,7 +792,7 @@ mod tests {
792792
let client = proc.attach("client").unwrap();
793793
let (tx, mut rx) = client.open_port();
794794
let actor = EchoActor(tx.bind());
795-
let handle = proc.spawn::<EchoActor>("echo", actor).await.unwrap();
795+
let handle = proc.spawn::<EchoActor>("echo", actor).unwrap();
796796
handle.send(123u64).unwrap();
797797
handle.drain_and_stop().unwrap();
798798
handle.await;
@@ -808,14 +808,8 @@ mod tests {
808808

809809
let ping_actor = PingPongActor::new(Some(undeliverable_msg_tx.bind()), None, None);
810810
let pong_actor = PingPongActor::new(Some(undeliverable_msg_tx.bind()), None, None);
811-
let ping_handle = proc
812-
.spawn::<PingPongActor>("ping", ping_actor)
813-
.await
814-
.unwrap();
815-
let pong_handle = proc
816-
.spawn::<PingPongActor>("pong", pong_actor)
817-
.await
818-
.unwrap();
811+
let ping_handle = proc.spawn::<PingPongActor>("ping", ping_actor).unwrap();
812+
let pong_handle = proc.spawn::<PingPongActor>("pong", pong_actor).unwrap();
819813

820814
let (local_port, local_receiver) = client.open_once_port();
821815

@@ -842,14 +836,8 @@ mod tests {
842836
PingPongActor::new(Some(undeliverable_msg_tx.bind()), Some(error_ttl), None);
843837
let pong_actor =
844838
PingPongActor::new(Some(undeliverable_msg_tx.bind()), Some(error_ttl), None);
845-
let ping_handle = proc
846-
.spawn::<PingPongActor>("ping", ping_actor)
847-
.await
848-
.unwrap();
849-
let pong_handle = proc
850-
.spawn::<PingPongActor>("pong", pong_actor)
851-
.await
852-
.unwrap();
839+
let ping_handle = proc.spawn::<PingPongActor>("ping", ping_actor).unwrap();
840+
let pong_handle = proc.spawn::<PingPongActor>("pong", pong_actor).unwrap();
853841

854842
let (local_port, local_receiver) = client.open_once_port();
855843

@@ -895,7 +883,7 @@ mod tests {
895883
async fn test_init() {
896884
let proc = Proc::local();
897885
let actor = InitActor(false);
898-
let handle = proc.spawn::<InitActor>("init", actor).await.unwrap();
886+
let handle = proc.spawn::<InitActor>("init", actor).unwrap();
899887
let client = proc.attach("client").unwrap();
900888

901889
let (port, receiver) = client.open_once_port();
@@ -954,7 +942,7 @@ mod tests {
954942
let proc = Proc::local();
955943
let values: MultiValues = Arc::new(Mutex::new((0, "".to_string())));
956944
let actor = MultiActor(values.clone());
957-
let handle = proc.spawn::<MultiActor>("myactor", actor).await.unwrap();
945+
let handle = proc.spawn::<MultiActor>("myactor", actor).unwrap();
958946
let (client, client_handle) = proc.instance("client").unwrap();
959947
Self {
960948
proc,
@@ -1072,7 +1060,7 @@ mod tests {
10721060
// Just test that we can round-trip the handle through a downcast.
10731061

10741062
let proc = Proc::local();
1075-
let handle = proc.spawn("nothing", NothingActor).await.unwrap();
1063+
let handle = proc.spawn("nothing", NothingActor).unwrap();
10761064
let cell = handle.cell();
10771065

10781066
// Invalid actor doesn't succeed.

hyperactor/src/host.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1220,7 +1220,7 @@ mod tests {
12201220
#[tokio::test]
12211221
async fn test_basic() {
12221222
let proc_manager =
1223-
LocalProcManager::new(|proc: Proc| async move { proc.spawn::<()>("agent", ()).await });
1223+
LocalProcManager::new(|proc: Proc| async move { proc.spawn::<()>("agent", ()) });
12241224
let procs = Arc::clone(&proc_manager.procs);
12251225
let (mut host, _handle) =
12261226
Host::serve(proc_manager, ChannelAddr::any(ChannelTransport::Local))

hyperactor/src/mailbox.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3125,7 +3125,7 @@ mod tests {
31253125
let mut proc = Proc::new(proc_id.clone(), proc_forwarder);
31263126
ProcSupervisionCoordinator::set(&proc).await.unwrap();
31273127

3128-
let foo = proc.spawn("foo", Foo).await.unwrap();
3128+
let foo = proc.spawn("foo", Foo).unwrap();
31293129
let return_handle = foo.port::<Undeliverable<MessageEnvelope>>();
31303130
let message = MessageEnvelope::new(
31313131
foo.actor_id().clone(),

0 commit comments

Comments
 (0)