Skip to content

Commit 37cf677

Browse files
committed
added RPC api logic
1 parent a5c8680 commit 37cf677

File tree

12 files changed

+127
-210
lines changed

12 files changed

+127
-210
lines changed

compute/src/lib.rs

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,5 @@ pub mod workers;
99
/// This value is attached within the published messages.
1010
pub const DRIA_COMPUTE_NODE_VERSION: &str = env!("CARGO_PKG_VERSION");
1111

12-
pub use utils::refresh_dria_nodes;
13-
1412
pub use config::DriaComputeNodeConfig;
1513
pub use node::DriaComputeNode;

compute/src/node/diagnostic.rs

Lines changed: 19 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,7 @@
11
use colored::Colorize;
2-
use dkn_p2p::libp2p::multiaddr::Protocol;
32
use std::time::Duration;
43

5-
use crate::{refresh_dria_nodes, utils::get_points, DriaComputeNode, DRIA_COMPUTE_NODE_VERSION};
4+
use crate::{utils::get_points, DriaComputeNode, DRIA_COMPUTE_NODE_VERSION};
65

76
/// Number of seconds such that if the last heartbeat ACK is older than this, the node is considered unreachable.
87
const HEARTBEAT_LIVENESS_SECS: Duration = Duration::from_secs(150);
@@ -81,51 +80,31 @@ impl DriaComputeNode {
8180
HEARTBEAT_LIVENESS_SECS.as_secs()
8281
);
8382
}
84-
85-
// added rpc nodes check, sometimes this happens when API is down / bugs for some reason
86-
if self.dria_nodes.rpc_peerids.is_empty() {
87-
log::error!("No RPC peer IDs were found to be available, please restart your node!",);
88-
}
8983
}
9084

9185
/// Updates the local list of available nodes by refreshing it.
9286
/// Dials the RPC nodes again for better connectivity.
9387
pub(crate) async fn handle_available_nodes_refresh(&mut self) {
9488
log::info!("Refreshing available Dria nodes.");
9589

96-
// refresh available nodes
97-
if let Err(e) = refresh_dria_nodes(&mut self.dria_nodes).await {
98-
log::error!("Error refreshing available nodes: {:?}", e);
90+
// FIXME: what to do for refreshing nodes
91+
// if let Err(e) = refresh_dria_nodes(&mut self.dria_nodes).await {
92+
// log::error!("Error refreshing available nodes: {:?}", e);
93+
// };
94+
95+
// TODO: check if we are connected to the node, and dial again if not
96+
97+
// dial the RPC
98+
log::info!("Dialling RPC at: {}", self.dria_nodes.addr);
99+
let fut = self
100+
.p2p
101+
.dial(self.dria_nodes.peer_id, self.dria_nodes.addr.clone());
102+
match tokio::time::timeout(Duration::from_secs(10), fut).await {
103+
Err(timeout) => log::error!("Timeout dialling RPC node: {:?}", timeout),
104+
Ok(res) => match res {
105+
Err(e) => log::warn!("Error dialling RPC node: {:?}", e),
106+
Ok(_) => log::info!("Successfully dialled RPC!"),
107+
},
99108
};
100-
101-
// dial all rpc nodes
102-
for addr in self.dria_nodes.rpc_addrs.iter() {
103-
log::info!("Dialling RPC node: {}", addr);
104-
105-
// get peer id from rpc address
106-
if let Some(peer_id) = addr.iter().find_map(|p| match p {
107-
Protocol::P2p(peer_id) => Some(peer_id),
108-
_ => None,
109-
}) {
110-
let fut = self.p2p.dial(peer_id, addr.clone());
111-
match tokio::time::timeout(Duration::from_secs(10), fut).await {
112-
Err(timeout) => {
113-
log::error!("Timeout dialling RPC node: {:?}", timeout);
114-
}
115-
Ok(res) => match res {
116-
Err(e) => {
117-
log::warn!("Error dialling RPC node: {:?}", e);
118-
}
119-
Ok(_) => {
120-
log::info!("Successfully dialled RPC node: {}", addr);
121-
}
122-
},
123-
};
124-
} else {
125-
log::warn!("Missing peerID in address: {}", addr);
126-
}
127-
}
128-
129-
log::info!("Finished refreshing!");
130109
}
131110
}

compute/src/node/mod.rs

Lines changed: 6 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,14 @@
11
use dkn_p2p::{
22
libp2p::{request_response, PeerId},
3-
DriaNodes, DriaP2PClient, DriaP2PCommander, DriaP2PProtocol,
3+
DriaP2PClient, DriaP2PCommander, DriaP2PProtocol,
44
};
55
use eyre::Result;
66
use std::collections::HashMap;
77
use tokio::sync::mpsc;
88

99
use crate::{
1010
config::*,
11-
utils::{crypto::secret_to_keypair, get_points, refresh_dria_nodes, SpecCollector},
11+
utils::{crypto::secret_to_keypair, get_points, DriaRPC, SpecCollector},
1212
workers::task::{TaskWorker, TaskWorkerInput, TaskWorkerMetadata, TaskWorkerOutput},
1313
};
1414

@@ -22,7 +22,7 @@ const PUBLISH_CHANNEL_BUFSIZE: usize = 1024;
2222
pub struct DriaComputeNode {
2323
pub config: DriaComputeNodeConfig,
2424
/// Pre-defined nodes that belong to Dria, e.g. bootstraps, relays and RPCs.
25-
pub dria_nodes: DriaNodes,
25+
pub dria_nodes: DriaRPC,
2626
/// Peer-to-peer client commander to interact with the network.
2727
pub p2p: DriaP2PCommander,
2828
/// The last time the node had an acknowledged heartbeat.
@@ -70,11 +70,8 @@ impl DriaComputeNode {
7070
// create the keypair from secret key
7171
let keypair = secret_to_keypair(&config.secret_key);
7272

73-
// get available nodes (bootstrap, relay, rpc) for p2p
74-
let mut dria_nodes = DriaNodes::new(config.network_type).with_statics();
75-
if let Err(e) = refresh_dria_nodes(&mut dria_nodes).await {
76-
log::error!("Error populating available nodes: {:?}", e);
77-
};
73+
// get available rpc node
74+
let dria_nodes = DriaRPC::new(config.network_type).await;
7875

7976
// we are using the major.minor version as the P2P version
8077
// so that patch versions do not interfere with the protocol
@@ -85,7 +82,7 @@ impl DriaComputeNode {
8582
let (p2p_client, p2p_commander, request_rx) = DriaP2PClient::new(
8683
keypair,
8784
config.p2p_listen_addr.clone(),
88-
&dria_nodes,
85+
&dria_nodes.addr,
8986
protocol,
9087
)?;
9188

compute/src/node/reqres.rs

Lines changed: 5 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -30,9 +30,9 @@ impl DriaComputeNode {
3030
log::info!("Received a request ({}) from {}", request_id, peer_id);
3131

3232
// ensure that message is from the known RPCs
33-
if !self.dria_nodes.rpc_peerids.contains(&peer_id) {
33+
if self.dria_nodes.peer_id != peer_id {
3434
log::warn!("Received request from unauthorized source: {}", peer_id);
35-
log::debug!("Allowed sources: {:#?}", self.dria_nodes.rpc_peerids);
35+
log::debug!("Allowed source: {}", self.dria_nodes.peer_id);
3636
} else if let Err(e) = self.handle_request(peer_id, request, channel).await {
3737
log::error!("Error handling request: {:?}", e);
3838
}
@@ -191,16 +191,10 @@ impl DriaComputeNode {
191191
Ok(())
192192
}
193193

194+
/// Sends a heartbeat request to the configured RPC node.
195+
#[inline]
194196
pub(crate) async fn send_heartbeat(&mut self) -> Result<()> {
195-
// FIXME: how to decide the peer here?
196-
let peer_id = self
197-
.dria_nodes
198-
.rpc_peerids
199-
.iter()
200-
.last()
201-
.expect("TODO: !!!")
202-
.to_owned();
203-
197+
let peer_id = self.dria_nodes.peer_id;
204198
let request_id = HeartbeatRequester::send_heartbeat(self, peer_id).await?;
205199
log::info!("Sent heartbeat request ({}) to {}", request_id, peer_id);
206200

compute/src/utils/mod.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,8 @@ pub mod crypto;
33
mod message;
44
pub use message::DriaMessage;
55

6-
mod nodes;
7-
pub use nodes::*;
6+
mod rpc;
7+
pub use rpc::*;
88

99
mod specs;
1010
pub use specs::*;

compute/src/utils/nodes.rs

Lines changed: 0 additions & 55 deletions
This file was deleted.

compute/src/utils/rpc.rs

Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,84 @@
1+
use dkn_p2p::libp2p::{multiaddr::Protocol, Multiaddr, PeerId};
2+
use dkn_p2p::DriaNetworkType;
3+
use eyre::Result;
4+
use std::fmt::Debug;
5+
6+
#[derive(Debug, Clone)]
7+
pub struct DriaRPC {
8+
pub addr: Multiaddr,
9+
pub peer_id: PeerId,
10+
pub network: DriaNetworkType,
11+
}
12+
13+
impl DriaRPC {
14+
/// Creates a new `AvailableNodes` struct for the given network type.
15+
pub async fn new(network: DriaNetworkType) -> Self {
16+
let addr = refresh_rpc_addr(&network).await.expect("TODO: !!!");
17+
let peer_id = addr
18+
.iter()
19+
.find_map(|p| match p {
20+
Protocol::P2p(peer_id) => Some(peer_id),
21+
_ => None,
22+
})
23+
.expect("TODO: !!!");
24+
25+
Self {
26+
addr,
27+
peer_id,
28+
network,
29+
}
30+
}
31+
}
32+
33+
/// Calls the DKN API to get an RPC address for the given network type.
34+
///
35+
/// The peer id is expected to be within the multi-address.
36+
async fn refresh_rpc_addr(network: &DriaNetworkType) -> Result<Multiaddr> {
37+
#[derive(serde::Deserialize, Debug)]
38+
struct DriaNodesApiResponse {
39+
pub rpc: Multiaddr,
40+
}
41+
42+
// url to be used is determined by the network type
43+
let url = match network {
44+
DriaNetworkType::Community => "https://dkn.dria.co/v4/available-nodes",
45+
DriaNetworkType::Pro => "https://dkn.dria.co/v4/sdk/available-nodes",
46+
DriaNetworkType::Test => "https://dkn.dria.co/v4/test/available-nodes",
47+
};
48+
49+
// make the request
50+
let response = reqwest::get(url).await?;
51+
let response_body = response.json::<DriaNodesApiResponse>().await?;
52+
53+
Ok(response_body.rpc)
54+
}
55+
56+
#[cfg(test)]
57+
mod tests {
58+
59+
use super::*;
60+
61+
#[tokio::test]
62+
async fn test_dria_nodes() {
63+
let node = DriaRPC::new(DriaNetworkType::Community).await;
64+
println!("{:?}", node);
65+
}
66+
67+
#[tokio::test]
68+
async fn test_extract_peer_id() {
69+
let addr: Multiaddr =
70+
"/ip4/98.85.74.179/tcp/4001/p2p/16Uiu2HAmH4YGRWuJSvo5bxdShozKSve1WaZMGzAr3GiNNzadsdaN"
71+
.parse()
72+
.unwrap();
73+
let expected_peer_id: PeerId = "16Uiu2HAmH4YGRWuJSvo5bxdShozKSve1WaZMGzAr3GiNNzadsdaN"
74+
.parse()
75+
.unwrap();
76+
77+
let peer_id = addr.iter().find_map(|p| match p {
78+
Protocol::P2p(peer_id) => Some(peer_id),
79+
_ => None,
80+
});
81+
82+
assert_eq!(Some(expected_peer_id), peer_id);
83+
}
84+
}

p2p/src/client.rs

Lines changed: 7 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ use std::time::Duration;
1111
use tokio::sync::mpsc;
1212

1313
use crate::behaviour::{DriaBehaviour, DriaBehaviourEvent};
14-
use crate::{DriaNodes, DriaP2PProtocol};
14+
use crate::DriaP2PProtocol;
1515

1616
use super::commands::DriaP2PCommand;
1717
use super::DriaP2PCommander;
@@ -48,7 +48,7 @@ impl DriaP2PClient {
4848
pub fn new(
4949
keypair: Keypair,
5050
listen_addr: Multiaddr,
51-
nodes: &DriaNodes,
51+
rpc_addr: &Multiaddr,
5252
protocol: DriaP2PProtocol,
5353
) -> Result<(
5454
DriaP2PClient,
@@ -81,14 +81,11 @@ impl DriaP2PClient {
8181
swarm.listen_on("/ip4/127.0.0.1/tcp/0".parse().unwrap())?;
8282
}
8383

84-
// dial rpc nodes
85-
// this will cause `identify` event to be called on their side
86-
for rpc_addr in nodes.rpc_addrs.iter().cloned() {
87-
log::info!("Dialing RPC node: {}", rpc_addr);
88-
if let Err(e) = swarm.dial(rpc_addr) {
89-
log::error!("Could not dial RPC node: {:?}", e);
90-
};
91-
}
84+
// dial rpc node, this will cause `identify` event to be called on their side
85+
log::info!("Dialing RPC node: {}", rpc_addr);
86+
if let Err(e) = swarm.dial(rpc_addr.clone()) {
87+
log::error!("Could not dial RPC node: {:?}", e);
88+
};
9289

9390
// create commander
9491
let (cmd_tx, cmd_rx) = mpsc::channel(COMMAND_CHANNEL_BUFSIZE);

p2p/src/lib.rs

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -12,9 +12,6 @@ pub use protocol::DriaP2PProtocol;
1212
mod network;
1313
pub use network::DriaNetworkType;
1414

15-
mod nodes;
16-
pub use nodes::DriaNodes;
17-
1815
// re-exports
1916
pub use libp2p;
2017
pub use libp2p_identity;

0 commit comments

Comments
 (0)