Skip to content

Commit 5b5a94b

Browse files
mariusaefacebook-github-bot
authored andcommitted
simple pub/sub example (#838)
Summary: Pull Request resolved: #838 This is a simple example to demonstrate how to implement pub/sub channels in Monarch. This example does not yet handle unsubscribes (through undeliverable message notifications). This will come as a follow-up. ghstack-source-id: 302711477 Reviewed By: highker Differential Revision: D80140121 fbshipit-source-id: d778bcac0c7bafb2f54670629127e6fff7af71cc
1 parent a01f0c6 commit 5b5a94b

File tree

2 files changed

+100
-1
lines changed

2 files changed

+100
-1
lines changed

hyperactor/Cargo.toml

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
# @generated by autocargo from //monarch/hyperactor:[channel_benchmarks,hyperactor,hyperactor-example-derive]
1+
# @generated by autocargo from //monarch/hyperactor:[channel_benchmarks,hyperactor,hyperactor-example-derive,hyperactor-example-stream]
22

33
[package]
44
name = "hyperactor"
@@ -17,6 +17,10 @@ path = "benches/main.rs"
1717
name = "hyperactor_example_derive"
1818
path = "example/derive.rs"
1919

20+
[[bin]]
21+
name = "hyperactor_example_stream"
22+
path = "example/stream.rs"
23+
2024
[dependencies]
2125
anyhow = "1.0.98"
2226
async-trait = "0.1.86"

hyperactor/example/stream.rs

Lines changed: 95 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,95 @@
1+
/*
2+
* Copyright (c) Meta Platforms, Inc. and affiliates.
3+
* All rights reserved.
4+
*
5+
* This source code is licensed under the BSD-style license found in the
6+
* LICENSE file in the root directory of this source tree.
7+
*/
8+
9+
//! This example implements a basic pub-sub pattern in Hyperactor.
10+
11+
use std::time::Duration;
12+
13+
use async_trait::async_trait;
14+
use hyperactor::Actor;
15+
use hyperactor::ActorHandle;
16+
use hyperactor::Context;
17+
use hyperactor::Handler;
18+
use hyperactor::Instance;
19+
use hyperactor::Named;
20+
use hyperactor::PortRef;
21+
use hyperactor::proc::Proc;
22+
use serde::Deserialize;
23+
use serde::Serialize;
24+
25+
#[derive(Debug, Actor, Default)]
26+
struct CounterActor {
27+
subscribers: Vec<PortRef<u64>>,
28+
n: u64,
29+
}
30+
31+
#[derive(Serialize, Deserialize, Debug, Named)]
32+
struct Subscribe(PortRef<u64>);
33+
34+
#[async_trait]
35+
impl Handler<Subscribe> for CounterActor {
36+
async fn handle(
37+
&mut self,
38+
cx: &Context<Self>,
39+
subscriber: Subscribe,
40+
) -> Result<(), anyhow::Error> {
41+
self.subscribers.push(subscriber.0);
42+
for port in &self.subscribers {
43+
port.send(cx, self.n)?;
44+
}
45+
self.n += 1;
46+
Ok(())
47+
}
48+
}
49+
50+
#[derive(Debug)]
51+
struct CountClient {
52+
counter: PortRef<Subscribe>,
53+
}
54+
55+
#[async_trait]
56+
impl Actor for CountClient {
57+
// Where to send subscribe messages.
58+
type Params = PortRef<Subscribe>;
59+
60+
async fn new(counter: PortRef<Subscribe>) -> Result<Self, anyhow::Error> {
61+
Ok(Self { counter })
62+
}
63+
64+
async fn init(&mut self, this: &Instance<Self>) -> Result<(), anyhow::Error> {
65+
// Subscribe to the counter on initialization. We give it our u64 port to report
66+
// messages back to.
67+
self.counter.send(this, Subscribe(this.port().bind()))?;
68+
Ok(())
69+
}
70+
}
71+
72+
#[async_trait]
73+
impl Handler<u64> for CountClient {
74+
async fn handle(&mut self, cx: &Context<Self>, count: u64) -> Result<(), anyhow::Error> {
75+
eprintln!("{}: count: {}", cx.self_id(), count);
76+
Ok(())
77+
}
78+
}
79+
80+
#[tokio::main]
81+
async fn main() {
82+
let proc = Proc::local();
83+
84+
let counter_actor: ActorHandle<CounterActor> = proc.spawn("counter", ()).await.unwrap();
85+
86+
for i in 0..10 {
87+
// Spawn new "countees". Every time each subscribes, the counter broadcasts
88+
// the count to everyone.
89+
let _countee_actor: ActorHandle<CountClient> = proc
90+
.spawn(&format!("countee_{}", i), counter_actor.port().bind())
91+
.await
92+
.unwrap();
93+
tokio::time::sleep(Duration::from_millis(100)).await;
94+
}
95+
}

0 commit comments

Comments
 (0)