Skip to content

Commit 28c31d9

Browse files
authored
Merge pull request #612 from openmina/feat/node/web
Webnode: expose some rpcs for FE
2 parents dd3bca3 + 0e0080f commit 28c31d9

File tree

7 files changed

+157
-61
lines changed

7 files changed

+157
-61
lines changed

Cargo.lock

Lines changed: 4 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

node/Cargo.toml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,9 @@ vergen = { version = "8.2.4", features = ["build", "cargo", "git", "gitcl", "rus
4141
linkme = { workspace = true }
4242
redux = { workspace = true, features=["serializable_callbacks"] }
4343

44+
[target.'cfg(target_family = "wasm")'.dependencies]
45+
wasm-bindgen = "0.2"
46+
4447
[features]
4548
replay = []
4649
p2p-webrtc = ["p2p/p2p-webrtc"]

node/common/Cargo.toml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,8 +23,12 @@ node = { path = "../../node", features = ["replay"] }
2323

2424
[target.'cfg(target_family = "wasm")'.dependencies]
2525
redux = { workspace = true }
26+
wasm-bindgen = "0.2"
27+
wasm-bindgen-futures = "0.4.42"
2628
gloo-timers = { version = "0.3", features = ["futures"] }
29+
gloo-utils = "0.2"
2730
tracing-wasm = "0.2"
31+
2832
[target.'cfg(not(target_family = "wasm"))'.dependencies]
2933
redux = { workspace = true, features=["serializable_callbacks"] }
3034
tracing-subscriber = { version = "0.3.17", features = ["json", "env-filter"] }
Lines changed: 5 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,8 @@
1-
use node::p2p::connection::outgoing::P2pConnectionOutgoingInitOpts;
1+
mod sender;
2+
pub use sender::RpcSender;
3+
4+
pub mod stats;
5+
26
use node::rpc::{
37
RpcBlockProducerStatsGetResponse, RpcDiscoveryBoostrapStatsResponse,
48
RpcDiscoveryRoutingTableResponse, RpcHealthCheckResponse, RpcLedgerAccountsResponse,
@@ -33,11 +37,6 @@ pub struct NodeRpcRequest {
3337
pub responder: Box<dyn Send + std::any::Any>,
3438
}
3539

36-
#[derive(Clone)]
37-
pub struct RpcSender {
38-
tx: mpsc::Sender<NodeRpcRequest>,
39-
}
40-
4140
pub type RpcReceiver = mpsc::Receiver<NodeRpcRequest>;
4241

4342
pub struct RpcService {
@@ -47,53 +46,6 @@ pub struct RpcService {
4746
req_receiver: mpsc::Receiver<NodeRpcRequest>,
4847
}
4948

50-
impl RpcSender {
51-
pub fn new(tx: mpsc::Sender<NodeRpcRequest>) -> Self {
52-
Self { tx }
53-
}
54-
55-
pub async fn oneshot_request<T>(&self, req: RpcRequest) -> Option<T>
56-
where
57-
T: 'static + Send + Serialize,
58-
{
59-
let (tx, rx) = oneshot::channel::<T>();
60-
let responder = Box::new(tx);
61-
let sender = self.tx.clone();
62-
let _ = sender.send(NodeRpcRequest { req, responder }).await;
63-
64-
rx.await.ok()
65-
}
66-
67-
pub async fn multishot_request<T>(
68-
&self,
69-
expected_messages: usize,
70-
req: RpcRequest,
71-
) -> mpsc::Receiver<T>
72-
where
73-
T: 'static + Send + Serialize,
74-
{
75-
let (tx, rx) = mpsc::channel::<T>(expected_messages);
76-
let responder = Box::new(tx);
77-
let sender = self.tx.clone();
78-
let _ = sender.send(NodeRpcRequest { req, responder }).await;
79-
80-
rx
81-
}
82-
83-
pub async fn peer_connect(
84-
&self,
85-
opts: P2pConnectionOutgoingInitOpts,
86-
) -> Result<String, String> {
87-
let peer_id = opts.peer_id().to_string();
88-
let req = RpcRequest::P2pConnectionOutgoing(opts);
89-
self.oneshot_request::<RpcP2pConnectionOutgoingResponse>(req)
90-
.await
91-
.ok_or_else(|| "state machine shut down".to_owned())??;
92-
93-
Ok(peer_id)
94-
}
95-
}
96-
9749
impl Default for RpcService {
9850
fn default() -> Self {
9951
Self::new()
Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,86 @@
1+
#[cfg(target_family = "wasm")]
2+
use gloo_utils::format::JsValueSerdeExt;
3+
use serde::Serialize;
4+
#[cfg(target_family = "wasm")]
5+
use wasm_bindgen::prelude::*;
6+
7+
use node::core::channels::{mpsc, oneshot};
8+
use node::p2p::connection::outgoing::P2pConnectionOutgoingInitOpts;
9+
use node::rpc::*;
10+
11+
use super::stats::Stats;
12+
use super::NodeRpcRequest;
13+
14+
#[derive(Clone)]
15+
#[cfg_attr(target_family = "wasm", wasm_bindgen)]
16+
pub struct RpcSender {
17+
tx: mpsc::Sender<NodeRpcRequest>,
18+
}
19+
20+
impl RpcSender {
21+
pub fn new(tx: mpsc::Sender<NodeRpcRequest>) -> Self {
22+
Self { tx }
23+
}
24+
25+
pub async fn oneshot_request<T>(&self, req: RpcRequest) -> Option<T>
26+
where
27+
T: 'static + Send + Serialize,
28+
{
29+
let (tx, rx) = oneshot::channel::<T>();
30+
let responder = Box::new(tx);
31+
let sender = self.tx.clone();
32+
let _ = sender.send(NodeRpcRequest { req, responder }).await;
33+
34+
rx.await.ok()
35+
}
36+
37+
pub async fn multishot_request<T>(
38+
&self,
39+
expected_messages: usize,
40+
req: RpcRequest,
41+
) -> mpsc::Receiver<T>
42+
where
43+
T: 'static + Send + Serialize,
44+
{
45+
let (tx, rx) = mpsc::channel::<T>(expected_messages);
46+
let responder = Box::new(tx);
47+
let sender = self.tx.clone();
48+
let _ = sender.send(NodeRpcRequest { req, responder }).await;
49+
50+
rx
51+
}
52+
}
53+
54+
impl RpcSender {
55+
pub async fn peer_connect(
56+
&self,
57+
opts: P2pConnectionOutgoingInitOpts,
58+
) -> Result<String, String> {
59+
let peer_id = opts.peer_id().to_string();
60+
let req = RpcRequest::P2pConnectionOutgoing(opts);
61+
self.oneshot_request::<RpcP2pConnectionOutgoingResponse>(req)
62+
.await
63+
.ok_or_else(|| "state machine shut down".to_owned())??;
64+
65+
Ok(peer_id)
66+
}
67+
}
68+
69+
#[cfg_attr(target_family = "wasm", wasm_bindgen)]
70+
impl RpcSender {
71+
pub fn stats(&self) -> Stats {
72+
Stats::new(self.clone())
73+
}
74+
}
75+
76+
#[cfg(target_family = "wasm")]
77+
#[cfg_attr(target_family = "wasm", wasm_bindgen)]
78+
impl RpcSender {
79+
pub async fn status(&self) -> JsValue {
80+
let res = self
81+
.oneshot_request::<RpcStatusGetResponse>(RpcRequest::StatusGet)
82+
.await
83+
.flatten();
84+
JsValue::from_serde(&res).unwrap_or_default()
85+
}
86+
}
Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
#[cfg(target_family = "wasm")]
2+
use gloo_utils::format::JsValueSerdeExt;
3+
#[cfg(target_family = "wasm")]
4+
use node::rpc::{RpcBlockProducerStatsGetResponse, RpcRequest};
5+
#[cfg(target_family = "wasm")]
6+
use wasm_bindgen::prelude::*;
7+
8+
use super::RpcSender;
9+
10+
#[derive(Clone)]
11+
#[cfg_attr(target_family = "wasm", wasm_bindgen)]
12+
pub struct Stats {
13+
#[allow(unused)]
14+
sender: RpcSender,
15+
}
16+
17+
impl Stats {
18+
pub fn new(sender: RpcSender) -> Self {
19+
Self { sender }
20+
}
21+
}
22+
23+
#[cfg(target_family = "wasm")]
24+
#[cfg_attr(target_family = "wasm", wasm_bindgen)]
25+
impl Stats {
26+
pub async fn block_producer(&self) -> JsValue {
27+
let res = self
28+
.sender
29+
.oneshot_request::<RpcBlockProducerStatsGetResponse>(RpcRequest::BlockProducerStatsGet)
30+
.await
31+
.flatten();
32+
JsValue::from_serde(&res).unwrap_or_default()
33+
}
34+
}

node/web/src/lib.rs

Lines changed: 21 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
#![cfg(target_family = "wasm")]
22

3+
use openmina_node_common::rpc::RpcSender;
34
pub use openmina_node_common::*;
45

56
mod rayon;
@@ -15,15 +16,24 @@ use wasm_bindgen::prelude::*;
1516

1617
use crate::node::P2pTaskRemoteSpawner;
1718

18-
#[wasm_bindgen]
19-
pub async fn start() {
20-
console_error_panic_hook::set_once();
21-
tracing::initialize(tracing::Level::INFO);
19+
/// Automatically run after wasm is loaded.
20+
#[wasm_bindgen(start)]
21+
fn main() {
22+
wasm_bindgen_futures::spawn_local(async {
23+
console_error_panic_hook::set_once();
24+
tracing::initialize(tracing::Level::INFO);
2225

23-
init_rayon().await.unwrap();
26+
init_rayon().await.unwrap();
27+
});
28+
}
2429

30+
#[wasm_bindgen]
31+
pub async fn run() -> RpcSender {
32+
// TODO(binier): what if above init in the `main` function isn't done
33+
// and this function gets called.
2534
let p2p_task_spawner = P2pTaskRemoteSpawner::create();
2635

36+
let (rpc_sender_tx, rpc_sender_rx) = ::node::core::channels::oneshot::channel();
2737
let _ = thread::spawn(move || {
2838
let block_verifier_index = get_verifier_index(VerifierKind::Blockchain).into();
2939
let work_verifier_index = get_verifier_index(VerifierKind::Transaction).into();
@@ -32,16 +42,19 @@ pub async fn start() {
3242
node_builder
3343
.block_verifier_index(block_verifier_index)
3444
.work_verifier_index(work_verifier_index);
35-
node_builder.p2p_no_discovery()
45+
node_builder
46+
.p2p_no_discovery()
3647
.p2p_custom_task_spawner(p2p_task_spawner)
3748
.unwrap();
3849
node_builder.gather_stats();
3950
let mut node = node_builder.build().context("node build failed!").unwrap();
51+
let _ = rpc_sender_tx.send(node.rpc());
4052

4153
wasm_bindgen_futures::spawn_local(async move {
4254
node.run_forever().await;
4355
});
4456
wasm_bindgen::throw_str("Cursed hack to keep workers alive. See https://github.com/rustwasm/wasm-bindgen/issues/2945");
45-
}).join_async().await.unwrap();
46-
::node::core::log::info!(redux::Timestamp::global_now(); "node shut down");
57+
});
58+
59+
rpc_sender_rx.await.unwrap()
4760
}

0 commit comments

Comments
 (0)