Skip to content

Commit b734284

Browse files
committed
better available node logic & network selections
1 parent 79d6067 commit b734284

File tree

8 files changed

+226
-104
lines changed

8 files changed

+226
-104
lines changed

compute/src/config.rs

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,33 @@ use libsecp256k1::{PublicKey, SecretKey};
99

1010
use std::{env, str::FromStr};
1111

12+
/// Network type.
13+
#[derive(Default, Debug, Clone, Copy)]
14+
pub enum DriaNetworkType {
15+
#[default]
16+
Community,
17+
Pro,
18+
}
19+
20+
impl From<&str> for DriaNetworkType {
21+
fn from(s: &str) -> Self {
22+
match s {
23+
"community" => DriaNetworkType::Community,
24+
"pro" => DriaNetworkType::Pro,
25+
_ => Default::default(),
26+
}
27+
}
28+
}
29+
30+
impl DriaNetworkType {
31+
pub fn protocol_name(&self) -> &str {
32+
match self {
33+
DriaNetworkType::Community => "dria",
34+
DriaNetworkType::Pro => "dria-sdk",
35+
}
36+
}
37+
}
38+
1239
#[derive(Debug, Clone)]
1340
pub struct DriaComputeNodeConfig {
1441
/// Wallet secret/private key.
@@ -23,6 +50,8 @@ pub struct DriaComputeNodeConfig {
2350
pub p2p_listen_addr: Multiaddr,
2451
/// Workflow configurations, e.g. models and providers.
2552
pub workflows: DriaWorkflowsConfig,
53+
/// Network type of the node.
54+
pub network_type: DriaNetworkType,
2655
}
2756

2857
/// The default P2P network listen address.
@@ -104,13 +133,19 @@ impl DriaComputeNodeConfig {
104133
let p2p_listen_addr = Multiaddr::from_str(&p2p_listen_addr_str)
105134
.expect("Could not parse the given P2P listen address.");
106135

136+
// parse network type
137+
let network_type = env::var("DKN_NETWORK")
138+
.map(|s| DriaNetworkType::from(s.as_str()))
139+
.unwrap_or_default();
140+
107141
Self {
108142
admin_public_key,
109143
secret_key,
110144
public_key,
111145
address,
112146
workflows,
113147
p2p_listen_addr,
148+
network_type,
114149
}
115150
}
116151

compute/src/lib.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,5 +11,5 @@ pub(crate) mod utils;
1111
/// This value is attached within the published messages.
1212
pub const DRIA_COMPUTE_NODE_VERSION: &str = env!("CARGO_PKG_VERSION");
1313

14-
pub use config::DriaComputeNodeConfig;
14+
pub use config::{DriaComputeNodeConfig, DriaNetworkType};
1515
pub use node::DriaComputeNode;

compute/src/node.rs

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -33,16 +33,21 @@ impl DriaComputeNode {
3333
config: DriaComputeNodeConfig,
3434
cancellation: CancellationToken,
3535
) -> Result<Self> {
36+
// create the keypair from secret key
3637
let keypair = secret_to_keypair(&config.secret_key);
3738

3839
// get available nodes (bootstrap, relay, rpc) for p2p
39-
let mut available_nodes =
40-
AvailableNodes::new_from_statics().join(AvailableNodes::new_from_env());
41-
available_nodes.refresh().await;
40+
let mut available_nodes = AvailableNodes::new(config.network_type);
41+
available_nodes.populate_with_statics();
42+
available_nodes.populate_with_env();
43+
if let Err(e) = available_nodes.populate_with_api().await {
44+
log::error!("Error populating available nodes: {:?}", e);
45+
};
4246

4347
// we are using the major.minor version as the P2P version
4448
// so that patch versions do not interfere with the protocol
45-
let protocol = DriaP2PProtocol::new_major_minor("dria");
49+
let protocol = DriaP2PProtocol::new_major_minor(config.network_type.protocol_name());
50+
log::info!("Using identity: {}", protocol);
4651

4752
// create p2p client
4853
let mut p2p = DriaP2PClient::new(
@@ -134,7 +139,9 @@ impl DriaComputeNode {
134139
if self.available_nodes.can_refresh() {
135140
log::info!("Refreshing available nodes.");
136141

137-
self.available_nodes.refresh().await;
142+
if let Err(e) = self.available_nodes.populate_with_api().await {
143+
log::error!("Error refreshing available nodes: {:?}", e);
144+
};
138145

139146
// dial all rpc nodes for better connectivity
140147
for rpc_addr in self.available_nodes.rpc_addrs.iter() {

compute/src/utils/available_nodes.rs renamed to compute/src/utils/available_nodes/mod.rs

Lines changed: 59 additions & 59 deletions
Original file line numberDiff line numberDiff line change
@@ -2,31 +2,14 @@ use dkn_p2p::libp2p::{Multiaddr, PeerId};
22
use dkn_workflows::split_csv_line;
33
use eyre::Result;
44
use std::{env, fmt::Debug, str::FromStr};
5+
use tokio::time::Instant;
56

6-
/// Static bootstrap nodes for the Kademlia DHT bootstrap step.
7-
const STATIC_BOOTSTRAP_NODES: [&str; 4] = [
8-
"/ip4/44.206.245.139/tcp/4001/p2p/16Uiu2HAm4q3LZU2T9kgjKK4ysy6KZYKLq8KiXQyae4RHdF7uqSt4",
9-
"/ip4/18.234.39.91/tcp/4001/p2p/16Uiu2HAmJqegPzwuGKWzmb5m3RdSUJ7NhEGWB5jNCd3ca9zdQ9dU",
10-
"/ip4/54.242.44.217/tcp/4001/p2p/16Uiu2HAmR2sAoh9F8jT9AZup9y79Mi6NEFVUbwRvahqtWamfabkz",
11-
"/ip4/52.201.242.227/tcp/4001/p2p/16Uiu2HAmFEUCy1s1gjyHfc8jey4Wd9i5bSDnyFDbWTnbrF2J3KFb",
12-
];
7+
mod statics;
138

14-
/// Static relay nodes for the `P2pCircuit`.
15-
const STATIC_RELAY_NODES: [&str; 4] = [
16-
"/ip4/34.201.33.141/tcp/4001/p2p/16Uiu2HAkuXiV2CQkC9eJgU6cMnJ9SMARa85FZ6miTkvn5fuHNufa",
17-
"/ip4/18.232.93.227/tcp/4001/p2p/16Uiu2HAmHeGKhWkXTweHJTA97qwP81ww1W2ntGaebeZ25ikDhd4z",
18-
"/ip4/54.157.219.194/tcp/4001/p2p/16Uiu2HAm7A5QVSy5FwrXAJdNNsdfNAcaYahEavyjnFouaEi22dcq",
19-
"/ip4/54.88.171.104/tcp/4001/p2p/16Uiu2HAm5WP1J6bZC3aHxd7XCUumMt9txAystmbZSaMS2omHepXa",
20-
];
9+
use crate::DriaNetworkType;
2110

22-
/// Static RPC Peer IDs for the Admin RPC.
23-
const STATIC_RPC_PEER_IDS: [&str; 0] = [];
24-
25-
/// API URL for refreshing the Admin RPC PeerIDs from Dria server.
26-
const RPC_PEER_ID_REFRESH_API_URL: &str = "https://dkn.dria.co/available-nodes";
27-
28-
/// Number of seconds between refreshing the Admin RPC PeerIDs from Dria server.
29-
const RPC_PEER_ID_REFRESH_INTERVAL_SECS: u64 = 30;
11+
/// Number of seconds between refreshing the available nodes.
12+
const DEFAULT_REFRESH_INTERVAL_SECS: u64 = 30;
3013

3114
/// Available nodes within the hybrid P2P network.
3215
///
@@ -42,17 +25,37 @@ pub struct AvailableNodes {
4225
pub relay_nodes: Vec<Multiaddr>,
4326
pub rpc_nodes: Vec<PeerId>,
4427
pub rpc_addrs: Vec<Multiaddr>,
45-
// refreshing
46-
pub last_refreshed: tokio::time::Instant,
28+
pub last_refreshed: Instant,
29+
pub network_type: DriaNetworkType,
30+
pub refresh_interval_secs: u64,
4731
}
4832

4933
impl AvailableNodes {
34+
/// Creates a new `AvailableNodes` struct for the given network type.
35+
pub fn new(network: DriaNetworkType) -> Self {
36+
Self {
37+
bootstrap_nodes: vec![],
38+
relay_nodes: vec![],
39+
rpc_nodes: vec![],
40+
rpc_addrs: vec![],
41+
last_refreshed: Instant::now(),
42+
network_type: network,
43+
refresh_interval_secs: DEFAULT_REFRESH_INTERVAL_SECS,
44+
}
45+
}
46+
47+
/// Sets the refresh interval in seconds.
48+
pub fn with_refresh_interval(mut self, interval_secs: u64) -> Self {
49+
self.refresh_interval_secs = interval_secs;
50+
self
51+
}
52+
5053
/// Parses static bootstrap & relay nodes from environment variables.
5154
///
5255
/// The environment variables are:
5356
/// - `DRIA_BOOTSTRAP_NODES`: comma-separated list of bootstrap nodes
5457
/// - `DRIA_RELAY_NODES`: comma-separated list of relay nodes
55-
pub fn new_from_env() -> Self {
58+
pub fn populate_with_env(&mut self) {
5659
// parse bootstrap nodes
5760
let bootstrap_nodes = split_csv_line(&env::var("DKN_BOOTSTRAP_NODES").unwrap_or_default());
5861
if bootstrap_nodes.is_empty() {
@@ -69,24 +72,15 @@ impl AvailableNodes {
6972
log::debug!("Using additional relay nodes: {:#?}", relay_nodes);
7073
}
7174

72-
Self {
73-
bootstrap_nodes: parse_vec(bootstrap_nodes),
74-
relay_nodes: parse_vec(relay_nodes),
75-
rpc_nodes: vec![],
76-
rpc_addrs: vec![],
77-
last_refreshed: tokio::time::Instant::now(),
78-
}
75+
self.bootstrap_nodes = parse_vec(bootstrap_nodes);
76+
self.relay_nodes = parse_vec(relay_nodes);
7977
}
8078

81-
/// Creates a new `AvailableNodes` struct from the static nodes, hardcoded within the code.
82-
pub fn new_from_statics() -> Self {
83-
Self {
84-
bootstrap_nodes: parse_vec(STATIC_BOOTSTRAP_NODES.to_vec()),
85-
relay_nodes: parse_vec(STATIC_RELAY_NODES.to_vec()),
86-
rpc_nodes: parse_vec(STATIC_RPC_PEER_IDS.to_vec()),
87-
rpc_addrs: vec![],
88-
last_refreshed: tokio::time::Instant::now(),
89-
}
79+
/// Adds the static nodes to the struct, with respect to network type.
80+
pub fn populate_with_statics(&mut self) {
81+
self.bootstrap_nodes = self.network_type.get_static_bootstrap_nodes();
82+
self.relay_nodes = self.network_type.get_static_relay_nodes();
83+
self.rpc_nodes = self.network_type.get_static_rpc_peer_ids();
9084
}
9185

9286
/// Joins the struct with another `AvailableNodes` struct.
@@ -115,13 +109,14 @@ impl AvailableNodes {
115109
self
116110
}
117111

112+
/// Returns whether enough time has passed since the last refresh.
118113
pub fn can_refresh(&self) -> bool {
119-
self.last_refreshed.elapsed().as_secs() > RPC_PEER_ID_REFRESH_INTERVAL_SECS
114+
self.last_refreshed.elapsed().as_secs() > self.refresh_interval_secs
120115
}
121116

122-
/// Refreshes the available nodes for Bootstrap, Relay and RPC nodes.
123-
pub async fn get_available_nodes() -> Result<Self> {
124-
#[derive(serde::Deserialize, Debug)]
117+
/// Refresh available nodes using the API.
118+
pub async fn populate_with_api(&mut self) -> Result<()> {
119+
#[derive(serde::Deserialize, Default, Debug)]
125120
struct AvailableNodesApiResponse {
126121
pub bootstraps: Vec<String>,
127122
pub relays: Vec<String>,
@@ -130,21 +125,21 @@ impl AvailableNodes {
130125
pub rpc_addrs: Vec<String>,
131126
}
132127

133-
let response = reqwest::get(RPC_PEER_ID_REFRESH_API_URL).await?;
128+
// make the request w.r.t network type
129+
let url = match self.network_type {
130+
DriaNetworkType::Community => "https://dkn.dria.co/available-nodes",
131+
DriaNetworkType::Pro => "https://dkn.dria.co/sdk/available-nodes",
132+
};
133+
let response = reqwest::get(url).await?;
134134
let response_body = response.json::<AvailableNodesApiResponse>().await?;
135135

136-
Ok(Self {
137-
bootstrap_nodes: parse_vec(response_body.bootstraps),
138-
relay_nodes: parse_vec(response_body.relays),
139-
rpc_nodes: parse_vec(response_body.rpcs),
140-
rpc_addrs: parse_vec(response_body.rpc_addrs),
141-
last_refreshed: tokio::time::Instant::now(),
142-
})
143-
}
136+
self.bootstrap_nodes = parse_vec(response_body.bootstraps);
137+
self.relay_nodes = parse_vec(response_body.relays);
138+
self.rpc_nodes = parse_vec(response_body.rpcs);
139+
self.rpc_addrs = parse_vec(response_body.rpc_addrs);
140+
self.last_refreshed = Instant::now();
144141

145-
/// Refresh available nodes using the API.
146-
pub async fn refresh(&mut self) {
147-
todo!("TODO: refresh the available nodes")
142+
Ok(())
148143
}
149144
}
150145

@@ -171,7 +166,12 @@ mod tests {
171166
#[tokio::test]
172167
#[ignore = "run this manually"]
173168
async fn test_get_available_nodes() {
174-
let available_nodes = AvailableNodes::get_available_nodes().await.unwrap();
175-
println!("{:#?}", available_nodes);
169+
let mut available_nodes = AvailableNodes::new(DriaNetworkType::Community);
170+
available_nodes.populate_with_api().await.unwrap();
171+
println!("Community: {:#?}", available_nodes);
172+
173+
let mut available_nodes = AvailableNodes::new(DriaNetworkType::Pro);
174+
available_nodes.populate_with_api().await.unwrap();
175+
println!("Pro: {:#?}", available_nodes);
176176
}
177177
}
Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,68 @@
1+
use crate::DriaNetworkType;
2+
use dkn_p2p::libp2p::{Multiaddr, PeerId};
3+
4+
/// Static bootstrap nodes for the Kademlia DHT bootstrap step.
5+
const STATIC_BOOTSTRAP_NODES: ([&str; 4], [&str; 0]) = (
6+
// community
7+
[
8+
"/ip4/44.206.245.139/tcp/4001/p2p/16Uiu2HAm4q3LZU2T9kgjKK4ysy6KZYKLq8KiXQyae4RHdF7uqSt4",
9+
"/ip4/18.234.39.91/tcp/4001/p2p/16Uiu2HAmJqegPzwuGKWzmb5m3RdSUJ7NhEGWB5jNCd3ca9zdQ9dU",
10+
"/ip4/54.242.44.217/tcp/4001/p2p/16Uiu2HAmR2sAoh9F8jT9AZup9y79Mi6NEFVUbwRvahqtWamfabkz",
11+
"/ip4/52.201.242.227/tcp/4001/p2p/16Uiu2HAmFEUCy1s1gjyHfc8jey4Wd9i5bSDnyFDbWTnbrF2J3KFb",
12+
],
13+
// pro
14+
[],
15+
);
16+
17+
/// Static relay nodes for the `P2pCircuit`.
18+
const STATIC_RELAY_NODES: ([&str; 4], [&str; 0]) = (
19+
// community
20+
[
21+
"/ip4/34.201.33.141/tcp/4001/p2p/16Uiu2HAkuXiV2CQkC9eJgU6cMnJ9SMARa85FZ6miTkvn5fuHNufa",
22+
"/ip4/18.232.93.227/tcp/4001/p2p/16Uiu2HAmHeGKhWkXTweHJTA97qwP81ww1W2ntGaebeZ25ikDhd4z",
23+
"/ip4/54.157.219.194/tcp/4001/p2p/16Uiu2HAm7A5QVSy5FwrXAJdNNsdfNAcaYahEavyjnFouaEi22dcq",
24+
"/ip4/54.88.171.104/tcp/4001/p2p/16Uiu2HAm5WP1J6bZC3aHxd7XCUumMt9txAystmbZSaMS2omHepXa",
25+
],
26+
// pro
27+
[],
28+
);
29+
30+
/// Static RPC Peer IDs for the Admin RPC.
31+
const STATIC_RPC_PEER_IDS: ([&str; 0], [&str; 0]) = (
32+
// community
33+
[],
34+
// pro
35+
[],
36+
);
37+
38+
impl DriaNetworkType {
39+
// TODO: kind of smelly code here
40+
pub fn get_static_bootstrap_nodes(&self) -> Vec<Multiaddr> {
41+
match self {
42+
DriaNetworkType::Community => STATIC_BOOTSTRAP_NODES.0.iter(),
43+
DriaNetworkType::Pro => STATIC_BOOTSTRAP_NODES.1.iter(),
44+
}
45+
.filter_map(|s| s.parse().ok())
46+
.collect()
47+
}
48+
49+
pub fn get_static_relay_nodes(&self) -> Vec<Multiaddr> {
50+
match self {
51+
DriaNetworkType::Community => STATIC_RELAY_NODES.0.iter(),
52+
DriaNetworkType::Pro => STATIC_RELAY_NODES.1.iter(),
53+
}
54+
.filter_map(|s| s.parse().ok())
55+
.collect()
56+
}
57+
58+
pub fn get_static_rpc_peer_ids(&self) -> Vec<PeerId> {
59+
match self {
60+
DriaNetworkType::Community => STATIC_RPC_PEER_IDS.0.iter(),
61+
DriaNetworkType::Pro => STATIC_RPC_PEER_IDS.1.iter(),
62+
}
63+
.filter_map(|s| s.parse().ok())
64+
.collect()
65+
}
66+
}
67+
68+
// help me

p2p/src/behaviour.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ fn create_connection_limits_behaviour() -> connection_limits::Behaviour {
4949

5050
/// Number of established outgoing connections limit, this is directly correlated to peer count
5151
/// so limiting this will cause a limitation on peers as well.
52-
const EST_OUTGOING_LIMIT: u32 = 450;
52+
const EST_OUTGOING_LIMIT: u32 = 300;
5353

5454
let limits =
5555
ConnectionLimits::default().with_max_established_outgoing(Some(EST_OUTGOING_LIMIT));

0 commit comments

Comments
 (0)