Skip to content

Commit 6159f93

Browse files
authored
Merge pull request #126 from firstbatchxyz/erhant/addr-fix-and-port-check
better peer addr, check listen addr in use
2 parents 6997c8f + 9a7c066 commit 6159f93

File tree

7 files changed

+94
-28
lines changed

7 files changed

+94
-28
lines changed

Cargo.lock

Lines changed: 8 additions & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[package]
22
name = "dkn-compute"
3-
version = "0.2.8"
3+
version = "0.2.9"
44
edition = "2021"
55
license = "Apache-2.0"
66
readme = "README.md"
@@ -70,6 +70,7 @@ libp2p = { git = "https://github.com/anilaltuner/rust-libp2p.git", rev = "7ce9f9
7070
libp2p-identity = { version = "0.2.9", features = ["secp256k1"] }
7171
tracing = { version = "0.1.40" }
7272
tracing-subscriber = { version = "0.3.18", features = ["env-filter"] }
73+
port_check = "0.2.1"
7374

7475
# Vendor OpenSSL so that its easier to build cross-platform packages
7576
[dependencies.openssl]

src/config/mod.rs

Lines changed: 20 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2,15 +2,16 @@ mod models;
22
mod ollama;
33
mod openai;
44

5-
use crate::utils::crypto::to_address;
5+
use crate::utils::{address_in_use, crypto::to_address};
66
use eyre::{eyre, Result};
7+
use libp2p::Multiaddr;
78
use libsecp256k1::{PublicKey, SecretKey};
89
use models::ModelConfig;
910
use ollama::OllamaConfig;
1011
use ollama_workflows::ModelProvider;
1112
use openai::OpenAIConfig;
1213

13-
use std::{env, time::Duration};
14+
use std::{env, str::FromStr, time::Duration};
1415

1516
/// Timeout duration for checking model performance during a generation.
1617
const CHECK_TIMEOUT_DURATION: Duration = Duration::from_secs(80);
@@ -28,8 +29,8 @@ pub struct DriaComputeNodeConfig {
2829
pub address: [u8; 20],
2930
/// Admin public key, used for message authenticity.
3031
pub admin_public_key: PublicKey,
31-
/// P2P listen address as a string, e.g. `/ip4/0.0.0.0/tcp/4001`.
32-
pub p2p_listen_addr: String,
32+
/// P2P listen address, e.g. `/ip4/0.0.0.0/tcp/4001`.
33+
pub p2p_listen_addr: Multiaddr,
3334
/// Available LLM models & providers for the node.
3435
pub model_config: ModelConfig,
3536
/// Even if Ollama is not used, we store the host & port here.
@@ -104,9 +105,11 @@ impl DriaComputeNodeConfig {
104105
}
105106
log::info!("Models: {:?}", model_config.models);
106107

107-
let p2p_listen_addr = env::var("DKN_P2P_LISTEN_ADDR")
108+
let p2p_listen_addr_str = env::var("DKN_P2P_LISTEN_ADDR")
108109
.map(|addr| addr.trim_matches('"').to_string())
109110
.unwrap_or(DEFAULT_P2P_LISTEN_ADDR.to_string());
111+
let p2p_listen_addr = Multiaddr::from_str(&p2p_listen_addr_str)
112+
.expect("Could not parse the given P2P listen address.");
110113

111114
Self {
112115
admin_public_key,
@@ -178,6 +181,18 @@ impl DriaComputeNodeConfig {
178181
Ok(())
179182
}
180183
}
184+
185+
// ensure that listen address is free
186+
pub fn check_address_in_use(&self) -> Result<()> {
187+
if address_in_use(&self.p2p_listen_addr) {
188+
return Err(eyre!(
189+
"Listen address {} is already in use.",
190+
self.p2p_listen_addr
191+
));
192+
}
193+
194+
Ok(())
195+
}
181196
}
182197

183198
#[cfg(test)]

src/main.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ async fn main() -> Result<()> {
4646

4747
// create configurations & check required services
4848
let config = DriaComputeNodeConfig::new();
49+
config.check_address_in_use()?;
4950
let service_check_token = token.clone();
5051
let mut config_clone = config.clone();
5152
let service_check_handle = tokio::spawn(async move {

src/node.rs

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
use eyre::{eyre, Result};
2-
use libp2p::{gossipsub, Multiaddr};
3-
use std::{str::FromStr, time::Duration};
2+
use libp2p::gossipsub;
3+
use std::time::Duration;
44
use tokio_util::sync::CancellationToken;
55

66
use crate::{
@@ -40,7 +40,6 @@ impl DriaComputeNode {
4040
cancellation: CancellationToken,
4141
) -> Result<Self> {
4242
let keypair = secret_to_keypair(&config.secret_key);
43-
let listen_addr = Multiaddr::from_str(config.p2p_listen_addr.as_str())?;
4443

4544
// get available nodes (bootstrap, relay, rpc) for p2p
4645
let available_nodes = AvailableNodes::default()
@@ -53,7 +52,7 @@ impl DriaComputeNode {
5352
)
5453
.sort_dedup();
5554

56-
let p2p = P2PClient::new(keypair, listen_addr, &available_nodes)?;
55+
let p2p = P2PClient::new(keypair, config.p2p_listen_addr.clone(), &available_nodes)?;
5756

5857
Ok(DriaComputeNode {
5958
p2p,

src/p2p/client.rs

Lines changed: 26 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -236,11 +236,7 @@ impl P2PClient {
236236
///
237237
/// - For Kademlia, we check the kademlia protocol and then add the address to the Kademlia routing table.
238238
fn handle_identify_event(&mut self, peer_id: PeerId, info: identify::Info) {
239-
// we only care about the observed address, although there may be other addresses at `info.listen_addrs`
240-
// TODO: this may be wrong
241-
let addr = info.observed_addr;
242-
243-
// check protocol string
239+
// check identify protocol string
244240
if info.protocol_version != P2P_PROTOCOL_STRING {
245241
log::warn!(
246242
"Identify: Peer {} has different Identify protocol: (them {}, you {})",
@@ -259,17 +255,31 @@ impl P2PClient {
259255
{
260256
// if it matches our protocol, add it to the Kademlia routing table
261257
if *kad_protocol == P2P_KADEMLIA_PROTOCOL {
262-
log::info!(
263-
"Identify: {} peer {} identified at {}",
264-
kad_protocol,
265-
peer_id,
266-
addr
267-
);
268-
269-
self.swarm
270-
.behaviour_mut()
271-
.kademlia
272-
.add_address(&peer_id, addr);
258+
// filter listen addresses
259+
let addrs = info.listen_addrs.into_iter().filter(|listen_addr| {
260+
if let Some(Protocol::Ip4(ipv4_addr)) = listen_addr.iter().next() {
261+
// ignore private & localhost addresses
262+
!(ipv4_addr.is_private() || ipv4_addr.is_loopback())
263+
} else {
264+
// ignore non ipv4 addresses
265+
false
266+
}
267+
});
268+
269+
// add them to kademlia
270+
for addr in addrs {
271+
log::info!(
272+
"Identify: {} peer {} identified at {}",
273+
kad_protocol,
274+
peer_id,
275+
addr
276+
);
277+
278+
self.swarm
279+
.behaviour_mut()
280+
.kademlia
281+
.add_address(&peer_id, addr);
282+
}
273283
} else {
274284
log::warn!(
275285
"Identify: Peer {} has different Kademlia version: (them {}, you {})",

src/utils/mod.rs

Lines changed: 34 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,12 @@ pub use message::DKNMessage;
77
mod available_nodes;
88
pub use available_nodes::AvailableNodes;
99

10-
use std::time::{Duration, SystemTime};
10+
use libp2p::{multiaddr::Protocol, Multiaddr};
11+
use port_check::is_port_reachable;
12+
use std::{
13+
net::{Ipv4Addr, SocketAddrV4},
14+
time::{Duration, SystemTime},
15+
};
1116

1217
/// Returns the current time in nanoseconds since the Unix epoch.
1318
///
@@ -23,6 +28,34 @@ pub fn get_current_time_nanos() -> u128 {
2328
.as_nanos()
2429
}
2530

31+
/// Checks if a given address is already in use locally.
32+
/// This is mostly used to see if the P2P address is already in use.
33+
///
34+
/// Simply tries to connect with TCP to the given address.
35+
#[inline]
36+
pub fn address_in_use(addr: &Multiaddr) -> bool {
37+
addr.iter()
38+
// find the port within our multiaddr
39+
.find_map(|p| {
40+
if let Protocol::Tcp(port) = p {
41+
Some(port)
42+
} else {
43+
None
44+
}
45+
46+
// }
47+
})
48+
// check if its reachable or not
49+
.map(|port| is_port_reachable(SocketAddrV4::new(Ipv4Addr::LOCALHOST, port)))
50+
.unwrap_or_else(|| {
51+
log::error!(
52+
"Could not find any TCP port in the given address: {:?}",
53+
addr
54+
);
55+
false
56+
})
57+
}
58+
2659
/// Utility to parse comma-separated string values, mostly read from the environment.
2760
/// - Trims `"` from both ends at the start
2861
/// - For each item, trims whitespace from both ends

0 commit comments

Comments
 (0)