Skip to content

Commit b35d7f4

Browse files
committed
some p2p refactors on versioning
1 parent a417b7b commit b35d7f4

File tree

15 files changed

+151
-73
lines changed

15 files changed

+151
-73
lines changed

Cargo.lock

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,18 +10,25 @@ license = "Apache-2.0"
1010
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
1111

1212
[workspace.dependencies]
13+
# async stuff
1314
tokio-util = { version = "0.7.10", features = ["rt"] }
1415
tokio = { version = "1", features = ["macros", "rt-multi-thread", "signal"] }
15-
parking_lot = "0.12.2"
16+
async-trait = "0.1.81"
17+
18+
# serialize & deserialize
1619
serde = { version = "1.0", features = ["derive"] }
1720
serde_json = "1.0"
18-
async-trait = "0.1.81"
21+
22+
# http client
1923
reqwest = "0.12.5"
2024

25+
# env reading
2126
dotenvy = "0.15.7"
2227

28+
# randomization
2329
rand = "0.8.5"
2430

31+
# logging & errors
2532
env_logger = "0.11.3"
2633
log = "0.4.21"
2734
eyre = "0.6.12"

compute/Cargo.toml

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@ profiling = []
1818
[dependencies]
1919
tokio-util = { version = "0.7.10", features = ["rt"] }
2020
tokio = { version = "1", features = ["macros", "rt-multi-thread", "signal"] }
21-
parking_lot = "0.12.2"
2221
serde = { version = "1.0", features = ["derive"] }
2322
serde_json = "1.0"
2423
async-trait = "0.1.81"

compute/src/node.rs

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
use dkn_p2p::{libp2p::gossipsub, DriaP2P};
1+
use dkn_p2p::{libp2p::gossipsub, DriaP2PClient};
22
use eyre::{eyre, Result};
33
use std::time::Duration;
44
use tokio_util::sync::CancellationToken;
@@ -27,7 +27,7 @@ const RPC_PEER_ID_REFRESH_INTERVAL_SECS: u64 = 30;
2727
/// ```
2828
pub struct DriaComputeNode {
2929
pub config: DriaComputeNodeConfig,
30-
pub p2p: DriaP2P,
30+
pub p2p: DriaP2PClient,
3131
pub available_nodes: AvailableNodes,
3232
pub available_nodes_last_refreshed: tokio::time::Instant,
3333
pub cancellation: CancellationToken,
@@ -51,11 +51,21 @@ impl DriaComputeNode {
5151
)
5252
.sort_dedup();
5353

54-
let p2p = DriaP2P::new(
54+
// we are using the major.minor version as the P2P version
55+
// so that patch versions do not interfere with the protocol
56+
const P2P_VERSION: &str = concat!(
57+
env!("CARGO_PKG_VERSION_MAJOR"),
58+
".",
59+
env!("CARGO_PKG_VERSION_MINOR")
60+
);
61+
62+
// create p2p client
63+
let p2p = DriaP2PClient::new(
5564
keypair,
5665
config.p2p_listen_addr.clone(),
5766
&available_nodes.bootstrap_nodes,
5867
&available_nodes.relay_nodes,
68+
P2P_VERSION,
5969
)?;
6070

6171
Ok(DriaComputeNode {

compute/src/utils/available_nodes.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,8 @@
11
use dkn_p2p::libp2p::{Multiaddr, PeerId};
2+
use dkn_workflows::split_csv_line;
23
use eyre::Result;
34
use std::{env, fmt::Debug, str::FromStr};
45

5-
use dkn_workflows::split_csv_line;
6-
76
/// Static bootstrap nodes for the Kademlia DHT bootstrap step.
87
const STATIC_BOOTSTRAP_NODES: [&str; 4] = [
98
"/ip4/44.206.245.139/tcp/4001/p2p/16Uiu2HAm4q3LZU2T9kgjKK4ysy6KZYKLq8KiXQyae4RHdF7uqSt4",

compute/src/utils/crypto.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
use dkn_p2p::libp2p_identity::Keypair;
1+
use dkn_p2p::libp2p_identity;
22
use ecies::PublicKey;
33
use eyre::{Context, Result};
44
use libsecp256k1::{Message, SecretKey};
@@ -55,12 +55,12 @@ pub fn encrypt_bytes(data: impl AsRef<[u8]>, public_key: &PublicKey) -> Result<S
5555
/// Converts a `libsecp256k1::SecretKey` to a `libp2p_identity::secp256k1::Keypair`.
5656
/// To do this, we serialize the secret key and create a new keypair from it.
5757
#[inline]
58-
pub fn secret_to_keypair(secret_key: &SecretKey) -> Keypair {
58+
pub fn secret_to_keypair(secret_key: &SecretKey) -> libp2p_identity::Keypair {
5959
let bytes = secret_key.serialize();
6060

6161
let secret_key = dkn_p2p::libp2p_identity::secp256k1::SecretKey::try_from_bytes(bytes)
6262
.expect("Failed to create secret key");
63-
dkn_p2p::libp2p_identity::secp256k1::Keypair::from(secret_key).into()
63+
libp2p_identity::secp256k1::Keypair::from(secret_key).into()
6464
}
6565

6666
#[cfg(test)]

p2p/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,3 +33,4 @@ eyre.workspace = true
3333

3434
[dev-dependencies]
3535
env_logger.workspace = true
36+
tokio.workspace = true

p2p/README.md

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
# DKN Peer-to-Peer Client
2+
3+
## Installation
4+
5+
Add the package via `git` within your Cargo dependencies:
6+
7+
```toml
8+
dkn-p2p = { git = "https://github.com/firstbatchxyz/dkn-compute-node" }
9+
```
10+
11+
## Usage
12+
13+
TODO: !!!

p2p/src/behaviour.rs

Lines changed: 20 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -4,9 +4,10 @@ use std::time::Duration;
44

55
use libp2p::identity::{Keypair, PeerId, PublicKey};
66
use libp2p::kad::store::MemoryStore;
7+
use libp2p::StreamProtocol;
78
use libp2p::{autonat, dcutr, gossipsub, identify, kad, relay, swarm::NetworkBehaviour};
89

9-
use crate::versioning::{P2P_KADEMLIA_PROTOCOL, P2P_PROTOCOL_STRING};
10+
// use crate::versioning::{P2P_KADEMLIA_PROTOCOL, P2P_PROTOCOL_STRING};
1011

1112
#[derive(NetworkBehaviour)]
1213
pub struct DriaBehaviour {
@@ -19,29 +20,37 @@ pub struct DriaBehaviour {
1920
}
2021

2122
impl DriaBehaviour {
22-
pub fn new(key: &Keypair, relay_behavior: relay::client::Behaviour) -> Self {
23+
pub fn new(
24+
key: &Keypair,
25+
relay_behavior: relay::client::Behaviour,
26+
identity_protocol: String,
27+
kademlia_protocol: StreamProtocol,
28+
) -> Self {
2329
let public_key = key.public();
2430
let peer_id = public_key.to_peer_id();
2531
Self {
2632
relay: relay_behavior,
2733
gossipsub: create_gossipsub_behavior(peer_id),
28-
kademlia: create_kademlia_behavior(peer_id),
34+
kademlia: create_kademlia_behavior(peer_id, kademlia_protocol),
2935
autonat: create_autonat_behavior(peer_id),
3036
dcutr: create_dcutr_behavior(peer_id),
31-
identify: create_identify_behavior(public_key),
37+
identify: create_identify_behavior(public_key, identity_protocol),
3238
}
3339
}
3440
}
3541

3642
/// Configures the Kademlia DHT behavior for the node.
3743
#[inline]
38-
fn create_kademlia_behavior(local_peer_id: PeerId) -> kad::Behaviour<MemoryStore> {
44+
fn create_kademlia_behavior(
45+
local_peer_id: PeerId,
46+
protocol_name: StreamProtocol,
47+
) -> kad::Behaviour<MemoryStore> {
3948
use kad::{Behaviour, Config};
4049

4150
const QUERY_TIMEOUT_SECS: u64 = 5 * 60;
4251
const RECORD_TTL_SECS: u64 = 30;
4352

44-
let mut cfg = Config::new(P2P_KADEMLIA_PROTOCOL);
53+
let mut cfg = Config::new(protocol_name);
4554
cfg.set_query_timeout(Duration::from_secs(QUERY_TIMEOUT_SECS))
4655
.set_record_ttl(Some(Duration::from_secs(RECORD_TTL_SECS)));
4756

@@ -50,10 +59,13 @@ fn create_kademlia_behavior(local_peer_id: PeerId) -> kad::Behaviour<MemoryStore
5059

5160
/// Configures the Identify behavior to allow nodes to exchange information like supported protocols.
5261
#[inline]
53-
fn create_identify_behavior(local_public_key: PublicKey) -> identify::Behaviour {
62+
fn create_identify_behavior(
63+
local_public_key: PublicKey,
64+
protocol_version: String,
65+
) -> identify::Behaviour {
5466
use identify::{Behaviour, Config};
5567

56-
let cfg = Config::new(P2P_PROTOCOL_STRING.to_string(), local_public_key);
68+
let cfg = Config::new(protocol_version, local_public_key);
5769

5870
Behaviour::new(cfg)
5971
}

p2p/src/client.rs

Lines changed: 32 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -8,14 +8,12 @@ use libp2p::kad::{GetClosestPeersError, GetClosestPeersOk, QueryResult};
88
use libp2p::{
99
autonat, gossipsub, identify, kad, multiaddr::Protocol, noise, swarm::SwarmEvent, tcp, yamux,
1010
};
11-
use libp2p::{Multiaddr, PeerId, Swarm, SwarmBuilder};
11+
use libp2p::{Multiaddr, PeerId, StreamProtocol, Swarm, SwarmBuilder};
1212
use libp2p_identity::Keypair;
13-
use std::time::Duration;
14-
use std::time::Instant;
15-
use versioning::{P2P_KADEMLIA_PREFIX, P2P_KADEMLIA_PROTOCOL, P2P_PROTOCOL_STRING};
13+
use std::time::{Duration, Instant};
1614

1715
/// P2P client, exposes a simple interface to handle P2P communication.
18-
pub struct DriaP2P {
16+
pub struct DriaP2PClient {
1917
/// `Swarm` instance, everything is accesses through this one.
2018
swarm: Swarm<DriaBehaviour>,
2119
/// Peer count for All and Mesh peers.
@@ -25,6 +23,10 @@ pub struct DriaP2P {
2523
peer_count: (usize, usize),
2624
/// Last time the peer count was refreshed.
2725
peer_last_refreshed: Instant,
26+
/// Identity protocol string to be used for the Identity behaviour.
27+
identity_protocol: String,
28+
/// Kademlia protocol, must match with other peers in the network.
29+
kademlia_protocol: StreamProtocol,
2830
}
2931

3032
/// Number of seconds before an idle connection is closed.
@@ -33,14 +35,24 @@ const IDLE_CONNECTION_TIMEOUT_SECS: u64 = 60;
3335
/// Number of seconds between refreshing the Kademlia DHT.
3436
const PEER_REFRESH_INTERVAL_SECS: u64 = 30;
3537

36-
impl DriaP2P {
38+
impl DriaP2PClient {
3739
/// Creates a new P2P client with the given keypair and listen address.
40+
///
41+
/// Can provide a list of bootstrap and relay nodes to connect to as well at the start.
42+
///
43+
/// The `version` is used to create the protocol strings for the client, and its very important that
44+
/// they match with the clients existing within the network.
3845
pub fn new(
3946
keypair: Keypair,
4047
listen_addr: Multiaddr,
4148
bootstraps: &[Multiaddr],
4249
relays: &[Multiaddr],
50+
version: &str,
4351
) -> Result<Self> {
52+
let identity_protocol = format!("{}{}", P2P_IDENTITY_PREFIX, version);
53+
let kademlia_protocol =
54+
StreamProtocol::try_from_owned(format!("{}{}", P2P_KADEMLIA_PREFIX, version))?;
55+
4456
// this is our peerId
4557
let node_peerid = keypair.public().to_peer_id();
4658
log::info!("Compute node peer address: {}", node_peerid);
@@ -54,7 +66,14 @@ impl DriaP2P {
5466
)?
5567
.with_quic()
5668
.with_relay_client(noise::Config::new, yamux::Config::default)?
57-
.with_behaviour(|key, relay_behavior| Ok(DriaBehaviour::new(key, relay_behavior)))?
69+
.with_behaviour(|key, relay_behavior| {
70+
Ok(DriaBehaviour::new(
71+
key,
72+
relay_behavior,
73+
identity_protocol.clone(),
74+
kademlia_protocol.clone(),
75+
))
76+
})?
5877
.with_swarm_config(|c| {
5978
c.with_idle_connection_timeout(Duration::from_secs(IDLE_CONNECTION_TIMEOUT_SECS))
6079
})
@@ -107,6 +126,8 @@ impl DriaP2P {
107126
swarm,
108127
peer_count: (0, 0),
109128
peer_last_refreshed: Instant::now(),
129+
identity_protocol,
130+
kademlia_protocol,
110131
})
111132
}
112133

@@ -231,12 +252,12 @@ impl DriaP2P {
231252
/// - For Kademlia, we check the kademlia protocol and then add the address to the Kademlia routing table.
232253
fn handle_identify_event(&mut self, peer_id: PeerId, info: identify::Info) {
233254
// check identify protocol string
234-
if info.protocol_version != P2P_PROTOCOL_STRING {
255+
if info.protocol_version != self.identity_protocol {
235256
log::warn!(
236257
"Identify: Peer {} has different Identify protocol: (them {}, you {})",
237258
peer_id,
238259
info.protocol_version,
239-
P2P_PROTOCOL_STRING
260+
self.identity_protocol
240261
);
241262
return;
242263
}
@@ -248,7 +269,7 @@ impl DriaP2P {
248269
.find(|p| p.to_string().starts_with(P2P_KADEMLIA_PREFIX))
249270
{
250271
// if it matches our protocol, add it to the Kademlia routing table
251-
if *kad_protocol == P2P_KADEMLIA_PROTOCOL {
272+
if *kad_protocol == self.kademlia_protocol {
252273
// filter listen addresses
253274
let addrs = info.listen_addrs.into_iter().filter(|listen_addr| {
254275
if let Some(Protocol::Ip4(ipv4_addr)) = listen_addr.iter().next() {
@@ -279,7 +300,7 @@ impl DriaP2P {
279300
"Identify: Peer {} has different Kademlia version: (them {}, you {})",
280301
peer_id,
281302
kad_protocol,
282-
P2P_KADEMLIA_PROTOCOL
303+
self.kademlia_protocol
283304
);
284305
}
285306
}

0 commit comments

Comments
 (0)