Skip to content

Commit 9892359

Browse files
committed
use HashSet in available nodes
1 parent b734284 commit 9892359

File tree

4 files changed

+53
-70
lines changed

4 files changed

+53
-70
lines changed

compute/src/node.rs

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -53,8 +53,8 @@ impl DriaComputeNode {
5353
let mut p2p = DriaP2PClient::new(
5454
keypair,
5555
config.p2p_listen_addr.clone(),
56-
&available_nodes.bootstrap_nodes,
57-
&available_nodes.relay_nodes,
56+
available_nodes.bootstrap_nodes.clone().into_iter(),
57+
available_nodes.relay_nodes.clone().into_iter(),
5858
protocol,
5959
)?;
6060

@@ -144,13 +144,13 @@ impl DriaComputeNode {
144144
};
145145

146146
// dial all rpc nodes for better connectivity
147-
for rpc_addr in self.available_nodes.rpc_addrs.iter() {
148-
log::debug!("Dialling RPC node: {}", rpc_addr);
149-
// TODO: does this cause resource issues?
150-
if let Err(e) = self.p2p.dial(rpc_addr.clone()) {
151-
log::warn!("Error dialling RPC node: {:?}", e);
152-
};
153-
}
147+
// for rpc_addr in self.available_nodes.rpc_addrs.iter() {
148+
// log::debug!("Dialling RPC node: {}", rpc_addr);
149+
// // TODO: does this cause resource issues?
150+
// if let Err(e) = self.p2p.dial(rpc_addr.clone()) {
151+
// log::warn!("Error dialling RPC node: {:?}", e);
152+
// };
153+
// }
154154

155155
// also print network info
156156
log::debug!("{:?}", self.p2p.network_info().connection_counters());

compute/src/utils/available_nodes/mod.rs

Lines changed: 32 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,15 @@
11
use dkn_p2p::libp2p::{Multiaddr, PeerId};
22
use dkn_workflows::split_csv_line;
33
use eyre::Result;
4-
use std::{env, fmt::Debug, str::FromStr};
4+
use std::{collections::HashSet, env, fmt::Debug, str::FromStr};
55
use tokio::time::Instant;
66

77
mod statics;
88

99
use crate::DriaNetworkType;
1010

1111
/// Number of seconds between refreshing the available nodes.
12-
const DEFAULT_REFRESH_INTERVAL_SECS: u64 = 30;
12+
const DEFAULT_REFRESH_INTERVAL_SECS: u64 = 5;
1313

1414
/// Available nodes within the hybrid P2P network.
1515
///
@@ -21,10 +21,10 @@ const DEFAULT_REFRESH_INTERVAL_SECS: u64 = 30;
2121
/// with them via GossipSub only.
2222
#[derive(Debug, Clone)]
2323
pub struct AvailableNodes {
24-
pub bootstrap_nodes: Vec<Multiaddr>,
25-
pub relay_nodes: Vec<Multiaddr>,
26-
pub rpc_nodes: Vec<PeerId>,
27-
pub rpc_addrs: Vec<Multiaddr>,
24+
pub bootstrap_nodes: HashSet<Multiaddr>,
25+
pub relay_nodes: HashSet<Multiaddr>,
26+
pub rpc_nodes: HashSet<PeerId>,
27+
pub rpc_addrs: HashSet<Multiaddr>,
2828
pub last_refreshed: Instant,
2929
pub network_type: DriaNetworkType,
3030
pub refresh_interval_secs: u64,
@@ -34,10 +34,10 @@ impl AvailableNodes {
3434
/// Creates a new `AvailableNodes` struct for the given network type.
3535
pub fn new(network: DriaNetworkType) -> Self {
3636
Self {
37-
bootstrap_nodes: vec![],
38-
relay_nodes: vec![],
39-
rpc_nodes: vec![],
40-
rpc_addrs: vec![],
37+
bootstrap_nodes: HashSet::new(),
38+
relay_nodes: HashSet::new(),
39+
rpc_nodes: HashSet::new(),
40+
rpc_addrs: HashSet::new(),
4141
last_refreshed: Instant::now(),
4242
network_type: network,
4343
refresh_interval_secs: DEFAULT_REFRESH_INTERVAL_SECS,
@@ -63,6 +63,8 @@ impl AvailableNodes {
6363
} else {
6464
log::debug!("Using additional bootstrap nodes: {:#?}", bootstrap_nodes);
6565
}
66+
self.bootstrap_nodes
67+
.extend(parse_vec(bootstrap_nodes).into_iter());
6668

6769
// parse relay nodes
6870
let relay_nodes = split_csv_line(&env::var("DKN_RELAY_NODES").unwrap_or_default());
@@ -71,47 +73,17 @@ impl AvailableNodes {
7173
} else {
7274
log::debug!("Using additional relay nodes: {:#?}", relay_nodes);
7375
}
74-
75-
self.bootstrap_nodes = parse_vec(bootstrap_nodes);
76-
self.relay_nodes = parse_vec(relay_nodes);
76+
self.relay_nodes.extend(parse_vec(relay_nodes).into_iter());
7777
}
7878

7979
/// Adds the static nodes to the struct, with respect to network type.
8080
pub fn populate_with_statics(&mut self) {
81-
self.bootstrap_nodes = self.network_type.get_static_bootstrap_nodes();
82-
self.relay_nodes = self.network_type.get_static_relay_nodes();
83-
self.rpc_nodes = self.network_type.get_static_rpc_peer_ids();
84-
}
85-
86-
/// Joins the struct with another `AvailableNodes` struct.
87-
pub fn join(mut self, other: Self) -> Self {
88-
self.bootstrap_nodes.extend(other.bootstrap_nodes);
89-
self.relay_nodes.extend(other.relay_nodes);
90-
self.rpc_nodes.extend(other.rpc_nodes);
91-
self.rpc_addrs.extend(other.rpc_addrs);
92-
self
93-
}
94-
95-
/// Removes duplicates within all fields.
96-
pub fn sort_dedup(mut self) -> Self {
97-
self.bootstrap_nodes.sort_unstable();
98-
self.bootstrap_nodes.dedup();
99-
100-
self.relay_nodes.sort_unstable();
101-
self.relay_nodes.dedup();
102-
103-
self.rpc_nodes.sort_unstable();
104-
self.rpc_nodes.dedup();
105-
106-
self.rpc_addrs.sort_unstable();
107-
self.rpc_addrs.dedup();
108-
109-
self
110-
}
111-
112-
/// Returns whether enough time has passed since the last refresh.
113-
pub fn can_refresh(&self) -> bool {
114-
self.last_refreshed.elapsed().as_secs() > self.refresh_interval_secs
81+
self.bootstrap_nodes
82+
.extend(self.network_type.get_static_bootstrap_nodes().into_iter());
83+
self.relay_nodes
84+
.extend(self.network_type.get_static_relay_nodes().into_iter());
85+
self.rpc_nodes
86+
.extend(self.network_type.get_static_rpc_peer_ids().into_iter());
11587
}
11688

11789
/// Refresh available nodes using the API.
@@ -132,15 +104,23 @@ impl AvailableNodes {
132104
};
133105
let response = reqwest::get(url).await?;
134106
let response_body = response.json::<AvailableNodesApiResponse>().await?;
135-
136-
self.bootstrap_nodes = parse_vec(response_body.bootstraps);
137-
self.relay_nodes = parse_vec(response_body.relays);
138-
self.rpc_nodes = parse_vec(response_body.rpcs);
139-
self.rpc_addrs = parse_vec(response_body.rpc_addrs);
107+
self.bootstrap_nodes
108+
.extend(parse_vec(response_body.bootstraps).into_iter());
109+
self.relay_nodes
110+
.extend(parse_vec(response_body.relays).into_iter());
111+
self.rpc_addrs
112+
.extend(parse_vec(response_body.rpc_addrs).into_iter());
113+
self.rpc_nodes
114+
.extend(parse_vec::<PeerId>(response_body.rpcs).into_iter());
140115
self.last_refreshed = Instant::now();
141116

142117
Ok(())
143118
}
119+
120+
/// Returns whether enough time has passed since the last refresh.
121+
pub fn can_refresh(&self) -> bool {
122+
self.last_refreshed.elapsed().as_secs() > self.refresh_interval_secs
123+
}
144124
}
145125

146126
/// Like `parse` of `str` but for vectors.

p2p/src/client.rs

Lines changed: 5 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -42,8 +42,8 @@ impl DriaP2PClient {
4242
pub fn new(
4343
keypair: Keypair,
4444
listen_addr: Multiaddr,
45-
bootstraps: &[Multiaddr],
46-
relays: &[Multiaddr],
45+
bootstraps: impl Iterator<Item = Multiaddr>,
46+
relays: impl Iterator<Item = Multiaddr>,
4747
protocol: DriaP2PProtocol,
4848
) -> Result<Self> {
4949
// this is our peerId
@@ -80,19 +80,16 @@ impl DriaP2PClient {
8080
.set_mode(Some(libp2p::kad::Mode::Server));
8181

8282
// initiate bootstrap
83-
log::info!("Initiating bootstrap: {:#?}", bootstraps);
8483
for addr in bootstraps {
84+
log::info!("Dialling bootstrap: {:#?}", addr);
8585
if let Some(peer_id) = addr.iter().find_map(|p| match p {
8686
Protocol::P2p(peer_id) => Some(peer_id),
8787
_ => None,
8888
}) {
8989
log::info!("Dialling peer: {}", addr);
9090
swarm.dial(addr.clone())?;
9191
log::info!("Adding {} to Kademlia routing table", addr);
92-
swarm
93-
.behaviour_mut()
94-
.kademlia
95-
.add_address(&peer_id, addr.clone());
92+
swarm.behaviour_mut().kademlia.add_address(&peer_id, addr);
9693
} else {
9794
log::warn!("Missing peerID in address: {}", addr);
9895
}
@@ -111,8 +108,8 @@ impl DriaP2PClient {
111108
log::info!("Listening p2p network on: {}", listen_addr);
112109
swarm.listen_on(listen_addr)?;
113110

114-
log::info!("Listening to relay nodes: {:#?}", relays);
115111
for addr in relays {
112+
log::info!("Listening to relay: {}", addr);
116113
swarm.listen_on(addr.clone().with(Protocol::P2pCircuit))?;
117114
}
118115

p2p/tests/listen_test.rs

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,13 @@ async fn test_listen_topic_once() -> Result<()> {
2525
"/ip4/34.201.33.141/tcp/4001/p2p/16Uiu2HAkuXiV2CQkC9eJgU6cMnJ9SMARa85FZ6miTkvn5fuHNufa",
2626
)?];
2727
let protocol = DriaP2PProtocol::new_major_minor("dria");
28-
let mut client = DriaP2PClient::new(keypair, addr, &bootstraps, &relays, protocol)?;
28+
let mut client = DriaP2PClient::new(
29+
keypair,
30+
addr,
31+
bootstraps.into_iter(),
32+
relays.into_iter(),
33+
protocol,
34+
)?;
2935

3036
// subscribe to the given topic
3137
client.subscribe(TOPIC)?;

0 commit comments

Comments
 (0)