Skip to content

Commit 79d6067

Browse files
committed
refactor available node checks, todo add network modes
1 parent ee8790f commit 79d6067

File tree

11 files changed

+317
-220
lines changed

11 files changed

+317
-220
lines changed

Cargo.lock

Lines changed: 4 additions & 4 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ default-members = ["compute"]
77

88
[workspace.package]
99
edition = "2021"
10-
version = "0.2.22"
10+
version = "0.2.23"
1111
license = "Apache-2.0"
1212
readme = "README.md"
1313

compute/src/node.rs

Lines changed: 22 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
1-
use dkn_p2p::{libp2p::gossipsub, DriaP2PClient};
1+
use dkn_p2p::{libp2p::gossipsub, DriaP2PClient, DriaP2PProtocol};
22
use eyre::{eyre, Result};
3-
use std::time::Duration;
43
use tokio_util::sync::CancellationToken;
54

65
use crate::{
@@ -9,9 +8,6 @@ use crate::{
98
utils::{crypto::secret_to_keypair, AvailableNodes, DKNMessage},
109
};
1110

12-
/// Number of seconds between refreshing the Admin RPC PeerIDs from Dria server.
13-
const RPC_PEER_ID_REFRESH_INTERVAL_SECS: u64 = 30;
14-
1511
/// **Dria Compute Node**
1612
///
1713
/// Internally, the node will create a new P2P client with the given secret key.
@@ -29,7 +25,6 @@ pub struct DriaComputeNode {
2925
pub config: DriaComputeNodeConfig,
3026
pub p2p: DriaP2PClient,
3127
pub available_nodes: AvailableNodes,
32-
pub available_nodes_last_refreshed: tokio::time::Instant,
3328
pub cancellation: CancellationToken,
3429
}
3530

@@ -41,31 +36,21 @@ impl DriaComputeNode {
4136
let keypair = secret_to_keypair(&config.secret_key);
4237

4338
// get available nodes (bootstrap, relay, rpc) for p2p
44-
let available_nodes = AvailableNodes::default()
45-
.join(AvailableNodes::new_from_statics())
46-
.join(AvailableNodes::new_from_env())
47-
.join(
48-
AvailableNodes::get_available_nodes()
49-
.await
50-
.unwrap_or_default(),
51-
)
52-
.sort_dedup();
39+
let mut available_nodes =
40+
AvailableNodes::new_from_statics().join(AvailableNodes::new_from_env());
41+
available_nodes.refresh().await;
5342

5443
// we are using the major.minor version as the P2P version
5544
// so that patch versions do not interfere with the protocol
56-
const P2P_VERSION: &str = concat!(
57-
env!("CARGO_PKG_VERSION_MAJOR"),
58-
".",
59-
env!("CARGO_PKG_VERSION_MINOR")
60-
);
45+
let protocol = DriaP2PProtocol::new_major_minor("dria");
6146

6247
// create p2p client
6348
let mut p2p = DriaP2PClient::new(
6449
keypair,
6550
config.p2p_listen_addr.clone(),
6651
&available_nodes.bootstrap_nodes,
6752
&available_nodes.relay_nodes,
68-
P2P_VERSION,
53+
protocol,
6954
)?;
7055

7156
// dial rpc nodes
@@ -83,7 +68,6 @@ impl DriaComputeNode {
8368
config,
8469
cancellation,
8570
available_nodes,
86-
available_nodes_last_refreshed: tokio::time::Instant::now(),
8771
})
8872
}
8973

@@ -111,8 +95,10 @@ impl DriaComputeNode {
11195

11296
/// Publishes a given message to the network w.r.t the topic of it.
11397
///
114-
/// Internally, the message is JSON serialized to bytes and then published to the network as is.
115-
pub fn publish(&mut self, message: DKNMessage) -> Result<()> {
98+
/// Internally, identity is attached to the the message which is then JSON serialized to bytes
99+
/// and then published to the network as is.
100+
pub fn publish(&mut self, mut message: DKNMessage) -> Result<()> {
101+
message = message.with_identity(self.p2p.protocol().identity());
116102
let message_bytes = serde_json::to_vec(&message)?;
117103
let message_id = self.p2p.publish(&message.topic, message_bytes)?;
118104
log::info!("Published message ({}) to {}", message_id, message.topic);
@@ -145,11 +131,19 @@ impl DriaComputeNode {
145131
tokio::select! {
146132
event = self.p2p.process_events() => {
147133
// refresh admin rpc peer ids
148-
if self.available_nodes_last_refreshed.elapsed() > Duration::from_secs(RPC_PEER_ID_REFRESH_INTERVAL_SECS) {
134+
if self.available_nodes.can_refresh() {
149135
log::info!("Refreshing available nodes.");
150136

151-
self.available_nodes = AvailableNodes::get_available_nodes().await.unwrap_or_default().join(self.available_nodes.clone()).sort_dedup();
152-
self.available_nodes_last_refreshed = tokio::time::Instant::now();
137+
self.available_nodes.refresh().await;
138+
139+
// dial all rpc nodes for better connectivity
140+
for rpc_addr in self.available_nodes.rpc_addrs.iter() {
141+
log::debug!("Dialling RPC node: {}", rpc_addr);
142+
// TODO: does this cause resource issues?
143+
if let Err(e) = self.p2p.dial(rpc_addr.clone()) {
144+
log::warn!("Error dialling RPC node: {:?}", e);
145+
};
146+
}
153147

154148
// also print network info
155149
log::debug!("{:?}", self.p2p.network_info().connection_counters());
@@ -173,12 +167,7 @@ impl DriaComputeNode {
173167
}
174168
};
175169

176-
// log::info!(
177-
// "Received {} message ({})\nFrom: {}\nSource: {}",
178-
// topic_str,
179-
// message_id,
180-
// peer_id,
181-
// );
170+
// log the received message
182171
log::info!(
183172
"Received {} message ({}) from {}",
184173
topic_str,

compute/src/utils/available_nodes.rs

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,9 @@ const STATIC_RPC_PEER_IDS: [&str; 0] = [];
2525
/// API URL for refreshing the Admin RPC PeerIDs from Dria server.
2626
const RPC_PEER_ID_REFRESH_API_URL: &str = "https://dkn.dria.co/available-nodes";
2727

28+
/// Number of seconds between refreshing the Admin RPC PeerIDs from Dria server.
29+
const RPC_PEER_ID_REFRESH_INTERVAL_SECS: u64 = 30;
30+
2831
/// Available nodes within the hybrid P2P network.
2932
///
3033
/// - Bootstrap: used for Kademlia DHT bootstrap.
@@ -33,12 +36,14 @@ const RPC_PEER_ID_REFRESH_API_URL: &str = "https://dkn.dria.co/available-nodes";
3336
///
3437
/// Note that while bootstrap & relay nodes are `Multiaddr`, RPC nodes are `PeerId` because we communicate
3538
/// with them via GossipSub only.
36-
#[derive(Debug, Default, Clone)]
39+
#[derive(Debug, Clone)]
3740
pub struct AvailableNodes {
3841
pub bootstrap_nodes: Vec<Multiaddr>,
3942
pub relay_nodes: Vec<Multiaddr>,
4043
pub rpc_nodes: Vec<PeerId>,
4144
pub rpc_addrs: Vec<Multiaddr>,
45+
// refreshing
46+
pub last_refreshed: tokio::time::Instant,
4247
}
4348

4449
impl AvailableNodes {
@@ -69,6 +74,7 @@ impl AvailableNodes {
6974
relay_nodes: parse_vec(relay_nodes),
7075
rpc_nodes: vec![],
7176
rpc_addrs: vec![],
77+
last_refreshed: tokio::time::Instant::now(),
7278
}
7379
}
7480

@@ -79,6 +85,7 @@ impl AvailableNodes {
7985
relay_nodes: parse_vec(STATIC_RELAY_NODES.to_vec()),
8086
rpc_nodes: parse_vec(STATIC_RPC_PEER_IDS.to_vec()),
8187
rpc_addrs: vec![],
88+
last_refreshed: tokio::time::Instant::now(),
8289
}
8390
}
8491

@@ -108,6 +115,10 @@ impl AvailableNodes {
108115
self
109116
}
110117

118+
pub fn can_refresh(&self) -> bool {
119+
self.last_refreshed.elapsed().as_secs() > RPC_PEER_ID_REFRESH_INTERVAL_SECS
120+
}
121+
111122
/// Refreshes the available nodes for Bootstrap, Relay and RPC nodes.
112123
pub async fn get_available_nodes() -> Result<Self> {
113124
#[derive(serde::Deserialize, Debug)]
@@ -127,8 +138,14 @@ impl AvailableNodes {
127138
relay_nodes: parse_vec(response_body.relays),
128139
rpc_nodes: parse_vec(response_body.rpcs),
129140
rpc_addrs: parse_vec(response_body.rpc_addrs),
141+
last_refreshed: tokio::time::Instant::now(),
130142
})
131143
}
144+
145+
/// Refresh available nodes using the API.
146+
pub async fn refresh(&mut self) {
147+
todo!("TODO: refresh the available nodes")
148+
}
132149
}
133150

134151
/// Like `parse` of `str` but for vectors.

compute/src/utils/message.rs

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -49,9 +49,7 @@ impl DKNMessage {
4949
payload: BASE64_STANDARD.encode(data),
5050
topic: topic.to_string(),
5151
version: DRIA_COMPUTE_NODE_VERSION.to_string(),
52-
identity: dkn_p2p::P2P_IDENTITY_PREFIX
53-
.trim_end_matches('/')
54-
.to_string(),
52+
identity: String::default(),
5553
timestamp: get_current_time_nanos(),
5654
}
5755
}
@@ -70,6 +68,12 @@ impl DKNMessage {
7068
Self::new(signed_data, topic)
7169
}
7270

71+
/// Sets the identity of the message.
72+
pub fn with_identity(mut self, identity: String) -> Self {
73+
self.identity = identity;
74+
self
75+
}
76+
7377
/// Decodes the base64 payload into bytes.
7478
#[inline(always)]
7579
pub fn decode_payload(&self) -> Result<Vec<u8>, base64::DecodeError> {
@@ -102,11 +106,11 @@ impl DKNMessage {
102106
let (signature_hex_bytes, body) =
103107
(&data[..SIGNATURE_SIZE_HEX - 2], &data[SIGNATURE_SIZE_HEX..]);
104108
let signature_bytes =
105-
hex::decode(signature_hex_bytes).wrap_err("Could not decode signature hex")?;
109+
hex::decode(signature_hex_bytes).wrap_err("could not decode signature hex")?;
106110

107111
// now obtain the signature itself
108112
let signature = Signature::parse_standard_slice(&signature_bytes)
109-
.wrap_err("Could not parse signature bytes")?;
113+
.wrap_err("could not parse signature bytes")?;
110114

111115
// verify signature w.r.t the body and the given public key
112116
let digest = Message::parse(&sha256hash(body));

0 commit comments

Comments
 (0)