Skip to content

Commit 95f5027

Browse files
committed
added RPC refresh logic
1 parent 37cf677 commit 95f5027

File tree

13 files changed

+102
-262
lines changed

13 files changed

+102
-262
lines changed

compute/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ fastbloom-rs = "0.5.9"
4646
# machine diagnostics
4747
# system info
4848
sysinfo = "0.33.1"
49-
# gpu info TODO: this gives a build error on Windows
49+
# gpu info FIXME: this gives a build error on Windows
5050
# wgpu = { version = "23.0.1", features = [
5151
# "serde",
5252
# "dx12",

compute/src/node/core.rs

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
1-
use eyre::Result;
1+
use dkn_p2p::libp2p::{Multiaddr, PeerId};
2+
use eyre::{eyre, Result};
23
use std::time::Duration;
34
use tokio_util::sync::CancellationToken;
45

@@ -90,6 +91,18 @@ impl DriaComputeNode {
9091
DriaMessage::new(data, topic, self.p2p.protocol(), &self.config.secret_key)
9192
}
9293

94+
/// Dial the given peer at the given address.
95+
pub async fn dial_with_timeout(&mut self, peer_id: PeerId, addr: Multiaddr) -> Result<()> {
96+
// for reasons not yet known, some people get stuck during the dialling step,
97+
// this timeout prevents that.
98+
const DIAL_TIMEOUT: Duration = Duration::from_secs(10);
99+
100+
match tokio::time::timeout(DIAL_TIMEOUT, self.p2p.dial(peer_id, addr)).await {
101+
Err(timeout) => Err(eyre!("Timeout dialling RPC node: {}", timeout)),
102+
Ok(result) => result, // this is also a `Result` enum
103+
}
104+
}
105+
93106
/// Shutdown channels between p2p, worker and yourself.
94107
///
95108
/// Can be inlined as it is called only once from very few places.

compute/src/node/diagnostic.rs

Lines changed: 43 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,10 @@
11
use colored::Colorize;
22
use std::time::Duration;
33

4-
use crate::{utils::get_points, DriaComputeNode, DRIA_COMPUTE_NODE_VERSION};
4+
use crate::{
5+
utils::{get_points, DriaRPC},
6+
DriaComputeNode, DRIA_COMPUTE_NODE_VERSION,
7+
};
58

69
/// Number of seconds such that if the last heartbeat ACK is older than this, the node is considered unreachable.
710
const HEARTBEAT_LIVENESS_SECS: Duration = Duration::from_secs(150);
@@ -82,29 +85,47 @@ impl DriaComputeNode {
8285
}
8386
}
8487

85-
/// Updates the local list of available nodes by refreshing it.
86-
/// Dials the RPC nodes again for better connectivity.
88+
/// Dials the existing RPC node if we are not connected to it.
89+
///
90+
/// If there is an error while doing that,
91+
/// it will try to get a new RPC node and dial it.
8792
pub(crate) async fn handle_available_nodes_refresh(&mut self) {
88-
log::info!("Refreshing available Dria nodes.");
93+
log::debug!("Checking RPC connections for diagnostics.");
8994

90-
// FIXME: what to do for refreshing nodes
91-
// if let Err(e) = refresh_dria_nodes(&mut self.dria_nodes).await {
92-
// log::error!("Error refreshing available nodes: {:?}", e);
93-
// };
94-
95-
// TODO: check if we are connected to the node, and dial again if not
96-
97-
// dial the RPC
98-
log::info!("Dialling RPC at: {}", self.dria_nodes.addr);
99-
let fut = self
95+
// check if we are connected
96+
let is_connected = self
10097
.p2p
101-
.dial(self.dria_nodes.peer_id, self.dria_nodes.addr.clone());
102-
match tokio::time::timeout(Duration::from_secs(10), fut).await {
103-
Err(timeout) => log::error!("Timeout dialling RPC node: {:?}", timeout),
104-
Ok(res) => match res {
105-
Err(e) => log::warn!("Error dialling RPC node: {:?}", e),
106-
Ok(_) => log::info!("Successfully dialled RPC!"),
107-
},
108-
};
98+
.is_connected(self.dria_rpc.peer_id)
99+
.await
100+
.unwrap_or(false);
101+
102+
// if we are not connected, try to dial it again
103+
if !is_connected {
104+
log::info!("Dialling RPC at: {}", self.dria_rpc.addr);
105+
if let Err(err) = self
106+
.dial_with_timeout(self.dria_rpc.peer_id, self.dria_rpc.addr.clone())
107+
.await
108+
{
109+
// if we also cannot dial it, get a new RPC node
110+
log::warn!(
111+
"Could not dial to RPC at: {}: {err:?}\nWill get a new RPC node.",
112+
self.dria_rpc.addr,
113+
);
114+
self.dria_rpc = DriaRPC::new(self.dria_rpc.network).await;
115+
116+
// now dial this new RPC again
117+
if let Err(err) = self
118+
.dial_with_timeout(self.dria_rpc.peer_id, self.dria_rpc.addr.clone())
119+
.await
120+
{
121+
// worst case: we can't dial this one either, so just leave it for the next diagnostic
122+
log::error!("Could not dial the new RPC: {err:?}\nWill try again in the next diagnostic refresh.");
123+
}
124+
} else {
125+
log::info!("Successfully dialled to RPC at: {}", self.dria_rpc.addr);
126+
}
127+
} else {
128+
log::debug!("Connection with {} is intact.", self.dria_rpc.peer_id);
129+
}
109130
}
110131
}

compute/src/node/mod.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,8 @@ const PUBLISH_CHANNEL_BUFSIZE: usize = 1024;
2121

2222
pub struct DriaComputeNode {
2323
pub config: DriaComputeNodeConfig,
24-
/// Pre-defined nodes that belong to Dria, e.g. bootstraps, relays and RPCs.
25-
pub dria_nodes: DriaRPC,
24+
/// Chosen RPC node.
25+
pub dria_rpc: DriaRPC,
2626
/// Peer-to-peer client commander to interact with the network.
2727
pub p2p: DriaP2PCommander,
2828
/// The last time the node had an acknowledged heartbeat.
@@ -116,7 +116,7 @@ impl DriaComputeNode {
116116
DriaComputeNode {
117117
config,
118118
p2p: p2p_commander,
119-
dria_nodes,
119+
dria_rpc: dria_nodes,
120120
// receivers
121121
task_output_rx: publish_rx,
122122
reqres_rx: request_rx,

compute/src/node/reqres.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -30,9 +30,9 @@ impl DriaComputeNode {
3030
log::info!("Received a request ({}) from {}", request_id, peer_id);
3131

3232
// ensure that the message is from the known RPC node
33-
if self.dria_nodes.peer_id != peer_id {
33+
if self.dria_rpc.peer_id != peer_id {
3434
log::warn!("Received request from unauthorized source: {}", peer_id);
35-
log::debug!("Allowed source: {}", self.dria_nodes.peer_id);
35+
log::debug!("Allowed source: {}", self.dria_rpc.peer_id);
3636
} else if let Err(e) = self.handle_request(peer_id, request, channel).await {
3737
log::error!("Error handling request: {:?}", e);
3838
}
@@ -194,7 +194,7 @@ impl DriaComputeNode {
194194
/// Sends a heartbeat request to the configured RPC node.
195195
#[inline]
196196
pub(crate) async fn send_heartbeat(&mut self) -> Result<()> {
197-
let peer_id = self.dria_nodes.peer_id;
197+
let peer_id = self.dria_rpc.peer_id;
198198
let request_id = HeartbeatRequester::send_heartbeat(self, peer_id).await?;
199199
log::info!("Sent heartbeat request ({}) to {}", request_id, peer_id);
200200

compute/src/reqres/heartbeat.rs

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -32,8 +32,7 @@ pub struct HeartbeatRequest {
3232
pub struct HeartbeatResponse {
3333
/// UUID as given in the request.
3434
pub(crate) heartbeat_id: Uuid,
35-
/// An associated error with the response,
36-
///
35+
/// An associated error with the response:
3736
/// - `None` means that the heartbeat was acknowledged.
3837
/// - `Some` means that the heartbeat was not acknowledged for the given reason.
3938
pub(crate) error: Option<String>,
@@ -66,10 +65,9 @@ impl HeartbeatRequester {
6665
.p2p
6766
.request(
6867
peer_id,
69-
serde_json::to_vec(&heartbeat_request).expect("TODO: !!!"),
68+
serde_json::to_vec(&heartbeat_request).expect("should be serializable"),
7069
)
71-
.await
72-
.expect("TODO: !!!");
70+
.await?;
7371

7472
// add it to local heartbeats set
7573
node.heartbeats.insert(uuid, deadline);

compute/src/utils/rpc.rs

Lines changed: 5 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ use dkn_p2p::DriaNetworkType;
33
use eyre::Result;
44
use std::fmt::Debug;
55

6+
/// The connected RPC node, as per the Star network topology.
67
#[derive(Debug, Clone)]
78
pub struct DriaRPC {
89
pub addr: Multiaddr,
@@ -13,14 +14,16 @@ pub struct DriaRPC {
1314
impl DriaRPC {
1415
/// Creates a new `DriaRPC` struct for the given network type.
1516
pub async fn new(network: DriaNetworkType) -> Self {
16-
let addr = refresh_rpc_addr(&network).await.expect("TODO: !!!");
17+
let addr = refresh_rpc_addr(&network)
18+
.await
19+
.expect("could not get RPC address");
1720
let peer_id = addr
1821
.iter()
1922
.find_map(|p| match p {
2023
Protocol::P2p(peer_id) => Some(peer_id),
2124
_ => None,
2225
})
23-
.expect("TODO: !!!");
26+
.expect("returned address does not contain a peer id");
2427

2528
Self {
2629
addr,
@@ -63,22 +66,4 @@ mod tests {
6366
let node = DriaRPC::new(DriaNetworkType::Community).await;
6467
println!("{:?}", node);
6568
}
66-
67-
#[tokio::test]
68-
async fn test_extract_peer_id() {
69-
let addr: Multiaddr =
70-
"/ip4/98.85.74.179/tcp/4001/p2p/16Uiu2HAmH4YGRWuJSvo5bxdShozKSve1WaZMGzAr3GiNNzadsdaN"
71-
.parse()
72-
.unwrap();
73-
let expected_peer_id: PeerId = "16Uiu2HAmH4YGRWuJSvo5bxdShozKSve1WaZMGzAr3GiNNzadsdaN"
74-
.parse()
75-
.unwrap();
76-
77-
let peer_id = addr.iter().find_map(|p| match p {
78-
Protocol::P2p(peer_id) => Some(peer_id),
79-
_ => None,
80-
});
81-
82-
assert_eq!(Some(expected_peer_id), peer_id);
83-
}
8469
}

compute/src/workers/task.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -332,7 +332,7 @@ mod tests {
332332
log::info!("Got all results, closing channel.");
333333
publish_rx.close();
334334

335-
// TODO: this bugs out
335+
// FIXME: this bugs out
336336
worker_handle.await.unwrap();
337337
log::info!("Done.");
338338
}

p2p/README.md

Lines changed: 6 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -25,35 +25,19 @@ Here is an example where we create the said entities:
2525
```rs
2626
use dkn_p2p::{DriaP2PClient, DriaP2PProtocol};
2727

28-
// your wallet, or something random maybe
29-
let keypair = Keypair::generate_secp256k1();
30-
31-
// your listen address
32-
let addr = Multiaddr::from_str("/ip4/0.0.0.0/tcp/4001")?;
33-
34-
// static bootstrap & relay & rpc addresses
35-
let bootstraps = vec![Multiaddr::from_str(
36-
"some-multiaddrs-here"
37-
)?];
38-
let relays = vec![Multiaddr::from_str(
39-
"some-multiaddrs-here"
40-
)?];
41-
let rpcs = vec![Multiaddr::from_str(
42-
"some-multiaddrs-here"
43-
)?];
44-
45-
let protocol = "0.2";
28+
let keypair = Keypair::generate_secp256k1(); // or your wallet
29+
let listen_addr = Multiaddr::from_str("/ip4/0.0.0.0/tcp/4001")?;
30+
let rpc_addr = Multiaddr::from_str("some-multiaddr-here")?;
31+
let protocol = "0.4"; // DKN protocol version
4632

4733
// `new` returns 3 things:
4834
// - p2p client itself, to be given to a thread
4935
// - p2p commander, a small client to be able to speak with the p2p in another thread
5036
// - `msg_rx`, the channel to listen for gossipsub messages
5137
let (client, mut commander, mut msg_rx) = DriaP2PClient::new(
5238
keypair,
53-
addr,
54-
bootstraps,
55-
relays,
56-
rpc,
39+
listen_addr,
40+
rpc_addr,
5741
protocol
5842
)?;
5943
```

p2p/src/behaviour.rs

Lines changed: 3 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -11,17 +11,13 @@ pub struct DriaBehaviour {
1111
}
1212

1313
impl DriaBehaviour {
14-
pub fn new(
15-
key: &Keypair,
16-
identity_protocol: String,
17-
reqres_protocol: StreamProtocol,
18-
) -> Result<Self> {
14+
pub fn new(key: &Keypair, identity_protocol: String, reqres_protocol: StreamProtocol) -> Self {
1915
let public_key = key.public();
2016

21-
Ok(Self {
17+
Self {
2218
identify: create_identify_behaviour(public_key, identity_protocol),
2319
request_response: create_request_response_behaviour(reqres_protocol),
24-
})
20+
}
2521
}
2622
}
2723

0 commit comments

Comments
 (0)