Skip to content

Commit a5c8680

Browse files
committed
few rfks, todo: rpc choice logic
1 parent 569cec3 commit a5c8680

File tree

5 files changed

+27
-25
lines changed

5 files changed

+27
-25
lines changed

compute/src/node/diagnostic.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -99,7 +99,7 @@ impl DriaComputeNode {
9999
};
100100

101101
// dial all rpc nodes
102-
for addr in self.dria_nodes.rpc_nodes.iter() {
102+
for addr in self.dria_nodes.rpc_addrs.iter() {
103103
log::info!("Dialling RPC node: {}", addr);
104104

105105
// get peer id from rpc address

compute/src/reqres/heartbeat.rs

Lines changed: 13 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -32,8 +32,11 @@ pub struct HeartbeatRequest {
3232
pub struct HeartbeatResponse {
3333
/// UUID as given in the request.
3434
pub(crate) heartbeat_id: Uuid,
35-
/// Acknowledgement of the heartbeat.
36-
pub(crate) ack: bool,
35+
/// An associated error with the response,
36+
///
37+
/// - `None` means that the heartbeat was acknowledged.
38+
/// - `Some` means that the heartbeat was not acknowledged for the given reason.
39+
pub(crate) error: Option<String>,
3740
}
3841

3942
impl IsResponder for HeartbeatRequester {
@@ -80,14 +83,18 @@ impl HeartbeatRequester {
8083
res: HeartbeatResponse,
8184
) -> Result<()> {
8285
if let Some(deadline) = node.heartbeats.remove(&res.heartbeat_id) {
83-
if !res.ack {
84-
Err(eyre!("Heartbeat was not acknowledged."))
85-
} else if chrono::Utc::now() > deadline {
86-
Err(eyre!("Acknowledged heartbeat was past the deadline."))
86+
if let Some(err) = res.error {
87+
Err(eyre!("Heartbeat was not acknowledged: {}", err))
8788
} else {
89+
// acknowledge heartbeat
8890
node.last_heartbeat_at = chrono::Utc::now();
8991
node.num_heartbeats += 1;
9092

93+
// for diagnostics, we can check if the heartbeat was past its deadline as well
94+
if chrono::Utc::now() > deadline {
95+
log::warn!("Acknowledged heartbeat was past its deadline.")
96+
}
97+
9198
Ok(())
9299
}
93100
} else {

compute/src/utils/nodes.rs

Lines changed: 8 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,7 @@
1-
use dkn_p2p::{libp2p::PeerId, DriaNetworkType, DriaNodes};
1+
use dkn_p2p::{
2+
libp2p::{Multiaddr, PeerId},
3+
DriaNetworkType, DriaNodes,
4+
};
25
use dkn_utils::parse_vec;
36
use eyre::Result;
47

@@ -8,7 +11,7 @@ pub async fn refresh_dria_nodes(nodes: &mut DriaNodes) -> Result<()> {
811
struct DriaNodesApiResponse {
912
pub rpcs: Vec<String>,
1013
#[serde(rename = "rpcAddrs")]
11-
pub rpc_addrs: Vec<String>,
14+
pub rpc_addrs: Vec<Multiaddr>,
1215
}
1316

1417
// url to be used is determined by the network type
@@ -22,12 +25,7 @@ pub async fn refresh_dria_nodes(nodes: &mut DriaNodes) -> Result<()> {
2225
let response = reqwest::get(url).await?;
2326
let response_body = response.json::<DriaNodesApiResponse>().await?;
2427

25-
nodes
26-
.rpc_nodes
27-
.extend(parse_vec(response_body.rpc_addrs).unwrap_or_else(|e| {
28-
log::error!("Failed to parse rpc nodes: {}", e);
29-
vec![]
30-
}));
28+
nodes.rpc_addrs.extend(response_body.rpc_addrs);
3129
nodes
3230
.rpc_peerids
3331
.extend(parse_vec::<PeerId>(response_body.rpcs).unwrap_or_else(|e| {
@@ -46,12 +44,12 @@ mod tests {
4644
async fn test_refresh_dria_nodes() {
4745
let mut nodes = DriaNodes::new(DriaNetworkType::Community);
4846
refresh_dria_nodes(&mut nodes).await.unwrap();
49-
assert!(!nodes.rpc_nodes.is_empty());
47+
assert!(!nodes.rpc_addrs.is_empty());
5048
assert!(!nodes.rpc_peerids.is_empty());
5149

5250
let mut nodes = DriaNodes::new(DriaNetworkType::Pro);
5351
refresh_dria_nodes(&mut nodes).await.unwrap();
54-
assert!(!nodes.rpc_nodes.is_empty());
52+
assert!(!nodes.rpc_addrs.is_empty());
5553
assert!(!nodes.rpc_peerids.is_empty());
5654
}
5755
}

p2p/src/client.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -83,7 +83,7 @@ impl DriaP2PClient {
8383

8484
// dial rpc nodes
8585
// this will cause `identify` event to be called on their side
86-
for rpc_addr in nodes.rpc_nodes.iter().cloned() {
86+
for rpc_addr in nodes.rpc_addrs.iter().cloned() {
8787
log::info!("Dialing RPC node: {}", rpc_addr);
8888
if let Err(e) = swarm.dial(rpc_addr) {
8989
log::error!("Could not dial RPC node: {:?}", e);

p2p/src/nodes.rs

Lines changed: 4 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -3,12 +3,9 @@ use std::{collections::HashSet, fmt::Debug};
33

44
use crate::DriaNetworkType;
55

6-
/// Dria-owned nodes within the hybrid P2P network.
7-
///
8-
/// - RPC: used for RPC nodes for task & ping messages.
96
#[derive(Debug, Clone)]
107
pub struct DriaNodes {
11-
pub rpc_nodes: HashSet<Multiaddr>,
8+
pub rpc_addrs: HashSet<Multiaddr>,
129
pub rpc_peerids: HashSet<PeerId>,
1310
pub network: DriaNetworkType,
1411
}
@@ -17,13 +14,13 @@ impl DriaNodes {
1714
/// Creates a new `AvailableNodes` struct for the given network type.
1815
pub fn new(network: DriaNetworkType) -> Self {
1916
Self {
20-
rpc_nodes: HashSet::new(),
17+
rpc_addrs: HashSet::new(),
2118
rpc_peerids: HashSet::new(),
2219
network,
2320
}
2421
}
2522
pub fn with_rpc_nodes(mut self, addresses: impl IntoIterator<Item = Multiaddr>) -> Self {
26-
self.rpc_nodes.extend(addresses);
23+
self.rpc_addrs.extend(addresses);
2724
self
2825
}
2926

@@ -34,7 +31,7 @@ impl DriaNodes {
3431

3532
/// Adds the static nodes to the struct, with respect to network type.
3633
pub fn with_statics(mut self) -> Self {
37-
self.rpc_nodes.extend(self.network.get_static_rpc_nodes());
34+
self.rpc_addrs.extend(self.network.get_static_rpc_nodes());
3835
self.rpc_peerids
3936
.extend(self.network.get_static_rpc_peer_ids());
4037

0 commit comments

Comments
 (0)