Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion hyper/src/commands/demo.rs
Original file line number Diff line number Diff line change
Expand Up @@ -246,7 +246,7 @@ impl DemoMessageHandler for DemoActor {

async fn spawn_child(&mut self, cx: &Context<Self>) -> Result<ActorRef<Self>, anyhow::Error> {
tracing::info!("demo: spawn child");
Ok(Self.spawn(cx).await?.bind())
Ok(Self.spawn(cx)?.bind())
}

async fn error(&mut self, _cx: &Context<Self>, message: String) -> Result<(), anyhow::Error> {
Expand Down
2 changes: 1 addition & 1 deletion hyperactor/example/derive.rs
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ async fn main() -> Result<(), anyhow::Error> {

// Spawn our actor, and get a handle for rank 0.
let shopping_list_actor: hyperactor::ActorHandle<ShoppingListActor> =
proc.spawn("shopping", ShoppingListActor::default()).await?;
proc.spawn("shopping", ShoppingListActor::default())?;
let shopping_api: hyperactor::ActorRef<ShoppingApi> = shopping_list_actor.bind();
// We join the system, so that we can send messages to actors.
let (client, _) = proc.instance("client").unwrap();
Expand Down
7 changes: 2 additions & 5 deletions hyperactor/example/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,10 +83,8 @@ impl Handler<u64> for CountClient {
async fn main() {
let proc = Proc::local();

let counter_actor: ActorHandle<CounterActor> = proc
.spawn("counter", CounterActor::default())
.await
.unwrap();
let counter_actor: ActorHandle<CounterActor> =
proc.spawn("counter", CounterActor::default()).unwrap();

for i in 0..10 {
// Spawn new "countees". Every time each subscribes, the counter broadcasts
Expand All @@ -96,7 +94,6 @@ async fn main() {
&format!("countee_{}", i),
CountClient::new(counter_actor.port().bind()),
)
.await
.unwrap();
#[allow(clippy::disallowed_methods)]
tokio::time::sleep(Duration::from_millis(100)).await;
Expand Down
38 changes: 13 additions & 25 deletions hyperactor/src/actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,8 +98,8 @@ pub trait Actor: Sized + Send + Debug + 'static {

/// Spawn a child actor, given a spawning capability (usually given by [`Instance`]).
/// The spawned actor will be supervised by the parent (spawning) actor.
async fn spawn(self, cx: &impl context::Actor) -> anyhow::Result<ActorHandle<Self>> {
cx.instance().spawn(self).await
fn spawn(self, cx: &impl context::Actor) -> anyhow::Result<ActorHandle<Self>> {
cx.instance().spawn(self)
}

/// Spawns this actor in a detached state, handling its messages
Expand All @@ -108,8 +108,8 @@ pub trait Actor: Sized + Send + Debug + 'static {
///
/// Actors spawned through `spawn_detached` are not attached to a supervision
/// hierarchy, and not managed by a [`Proc`].
async fn spawn_detached(self) -> Result<ActorHandle<Self>, anyhow::Error> {
Proc::local().spawn("anon", self).await
fn spawn_detached(self) -> Result<ActorHandle<Self>, anyhow::Error> {
Proc::local().spawn("anon", self)
}

/// This method is used by the runtime to spawn the actor server. It can be
Expand Down Expand Up @@ -260,7 +260,7 @@ pub trait RemoteSpawn: Actor + Referable + Binds<Self> {
Box::pin(async move {
let params = bincode::deserialize(&serialized_params)?;
let actor = Self::new(params).await?;
let handle = proc.spawn(&name, actor).await?;
let handle = proc.spawn(&name, actor)?;
// We return only the ActorId, not a typed ActorRef.
// Callers that hold this ID can interact with the actor
// only via the serialized/opaque messaging path, which
Expand Down Expand Up @@ -792,7 +792,7 @@ mod tests {
let client = proc.attach("client").unwrap();
let (tx, mut rx) = client.open_port();
let actor = EchoActor(tx.bind());
let handle = proc.spawn::<EchoActor>("echo", actor).await.unwrap();
let handle = proc.spawn::<EchoActor>("echo", actor).unwrap();
handle.send(123u64).unwrap();
handle.drain_and_stop().unwrap();
handle.await;
Expand All @@ -808,14 +808,8 @@ mod tests {

let ping_actor = PingPongActor::new(Some(undeliverable_msg_tx.bind()), None, None);
let pong_actor = PingPongActor::new(Some(undeliverable_msg_tx.bind()), None, None);
let ping_handle = proc
.spawn::<PingPongActor>("ping", ping_actor)
.await
.unwrap();
let pong_handle = proc
.spawn::<PingPongActor>("pong", pong_actor)
.await
.unwrap();
let ping_handle = proc.spawn::<PingPongActor>("ping", ping_actor).unwrap();
let pong_handle = proc.spawn::<PingPongActor>("pong", pong_actor).unwrap();

let (local_port, local_receiver) = client.open_once_port();

Expand All @@ -842,14 +836,8 @@ mod tests {
PingPongActor::new(Some(undeliverable_msg_tx.bind()), Some(error_ttl), None);
let pong_actor =
PingPongActor::new(Some(undeliverable_msg_tx.bind()), Some(error_ttl), None);
let ping_handle = proc
.spawn::<PingPongActor>("ping", ping_actor)
.await
.unwrap();
let pong_handle = proc
.spawn::<PingPongActor>("pong", pong_actor)
.await
.unwrap();
let ping_handle = proc.spawn::<PingPongActor>("ping", ping_actor).unwrap();
let pong_handle = proc.spawn::<PingPongActor>("pong", pong_actor).unwrap();

let (local_port, local_receiver) = client.open_once_port();

Expand Down Expand Up @@ -895,7 +883,7 @@ mod tests {
async fn test_init() {
let proc = Proc::local();
let actor = InitActor(false);
let handle = proc.spawn::<InitActor>("init", actor).await.unwrap();
let handle = proc.spawn::<InitActor>("init", actor).unwrap();
let client = proc.attach("client").unwrap();

let (port, receiver) = client.open_once_port();
Expand Down Expand Up @@ -954,7 +942,7 @@ mod tests {
let proc = Proc::local();
let values: MultiValues = Arc::new(Mutex::new((0, "".to_string())));
let actor = MultiActor(values.clone());
let handle = proc.spawn::<MultiActor>("myactor", actor).await.unwrap();
let handle = proc.spawn::<MultiActor>("myactor", actor).unwrap();
let (client, client_handle) = proc.instance("client").unwrap();
Self {
proc,
Expand Down Expand Up @@ -1072,7 +1060,7 @@ mod tests {
// Just test that we can round-trip the handle through a downcast.

let proc = Proc::local();
let handle = proc.spawn("nothing", NothingActor).await.unwrap();
let handle = proc.spawn("nothing", NothingActor).unwrap();
let cell = handle.cell();

// Invalid actor doesn't succeed.
Expand Down
2 changes: 1 addition & 1 deletion hyperactor/src/host.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1220,7 +1220,7 @@ mod tests {
#[tokio::test]
async fn test_basic() {
let proc_manager =
LocalProcManager::new(|proc: Proc| async move { proc.spawn::<()>("agent", ()).await });
LocalProcManager::new(|proc: Proc| async move { proc.spawn::<()>("agent", ()) });
let procs = Arc::clone(&proc_manager.procs);
let (mut host, _handle) =
Host::serve(proc_manager, ChannelAddr::any(ChannelTransport::Local))
Expand Down
2 changes: 1 addition & 1 deletion hyperactor/src/mailbox.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3125,7 +3125,7 @@ mod tests {
let mut proc = Proc::new(proc_id.clone(), proc_forwarder);
ProcSupervisionCoordinator::set(&proc).await.unwrap();

let foo = proc.spawn("foo", Foo).await.unwrap();
let foo = proc.spawn("foo", Foo).unwrap();
let return_handle = foo.port::<Undeliverable<MessageEnvelope>>();
let message = MessageEnvelope::new(
foo.actor_id().clone(),
Expand Down
Loading
Loading