Skip to content

Commit 000ccf3

Browse files
committed
added autonat, some log rfks
1 parent 25f0d32 commit 000ccf3

File tree

10 files changed

+93
-31
lines changed

10 files changed

+93
-31
lines changed

Cargo.lock

Lines changed: 27 additions & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@ debug = true
2020
# async stuff
2121
tokio-util = { version = "0.7.10", features = ["rt"] }
2222
tokio = { version = "1", features = ["macros", "rt-multi-thread", "signal"] }
23-
async-trait = "0.1.81"
2423

2524
# serialize & deserialize
2625
serde = { version = "1.0", features = ["derive"] }
@@ -31,8 +30,6 @@ reqwest = "0.12.5"
3130

3231
# utilities
3332
dotenvy = "0.15.7"
34-
35-
# randomization
3633
rand = "0.8.5"
3734

3835
# logging & errors

compute/Cargo.toml

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,6 @@ authors = ["Erhan Tezcan <[email protected]>"]
1010
# async stuff
1111
tokio-util.workspace = true
1212
tokio.workspace = true
13-
async-trait.workspace = true
1413

1514
# serialize & deserialize
1615
serde.workspace = true

compute/src/node/core.rs

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -19,16 +19,13 @@ impl DriaComputeNode {
1919
// prepare durations for sleeps
2020
let mut diagnostic_refresh_interval =
2121
tokio::time::interval(Duration::from_secs(DIAGNOSTIC_REFRESH_INTERVAL_SECS));
22+
diagnostic_refresh_interval.tick().await; // move each one tick
2223
let mut available_node_refresh_interval =
2324
tokio::time::interval(Duration::from_secs(AVAILABLE_NODES_REFRESH_INTERVAL_SECS));
25+
available_node_refresh_interval.tick().await; // move each one tick
2426
let mut heartbeat_interval =
2527
tokio::time::interval(Duration::from_secs(HEARTBEAT_INTERVAL_SECS));
2628

27-
// move each one tick
28-
available_node_refresh_interval.tick().await;
29-
diagnostic_refresh_interval.tick().await;
30-
heartbeat_interval.tick().await;
31-
3229
loop {
3330
tokio::select! {
3431
// a task is completed by the worker & should be responded to the requesting peer

compute/src/node/reqres.rs

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ impl DriaComputeNode {
2727
request_id,
2828
channel,
2929
} => {
30-
log::info!("Received a request ({}) from {}", request_id, peer_id);
30+
log::debug!("Received a request ({}) from {}", request_id, peer_id);
3131

3232
// ensure that message is from the known RPCs
3333
if self.dria_rpc.peer_id != peer_id {
@@ -42,7 +42,7 @@ impl DriaComputeNode {
4242
response,
4343
request_id,
4444
} => {
45-
log::info!("Received a response ({}) from {}", request_id, peer_id);
45+
log::debug!("Received a response ({}) from {}", request_id, peer_id);
4646
if let Err(e) = self.handle_response(peer_id, response).await {
4747
log::error!("Error handling response: {:?}", e);
4848
}
@@ -57,6 +57,11 @@ impl DriaComputeNode {
5757
#[inline]
5858
async fn handle_response(&mut self, peer_id: PeerId, data: Vec<u8>) -> Result<()> {
5959
if let Ok(heartbeat_response) = HeartbeatRequester::try_parse_response(&data) {
60+
log::info!(
61+
"Received a {} response from {}",
62+
"heartbeat".blue(),
63+
peer_id
64+
);
6065
HeartbeatRequester::handle_ack(self, heartbeat_response).await
6166
} else {
6267
Err(eyre::eyre!("Received unhandled request from {}", peer_id))
@@ -196,7 +201,12 @@ impl DriaComputeNode {
196201
pub(crate) async fn send_heartbeat(&mut self) -> Result<()> {
197202
let peer_id = self.dria_rpc.peer_id;
198203
let request_id = HeartbeatRequester::send_heartbeat(self, peer_id).await?;
199-
log::info!("Sent heartbeat request ({}) to {}", request_id, peer_id);
204+
log::info!(
205+
"Sent {} request ({}) to {}",
206+
"heartbeat".blue(),
207+
request_id,
208+
peer_id
209+
);
200210

201211
Ok(())
202212
}

compute/src/reqres/heartbeat.rs

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ pub struct HeartbeatResponse {
3838

3939
impl IsResponder for HeartbeatRequester {
4040
type Request = DriaMessage; // TODO: HeartbeatRequest;
41-
type Response = DriaMessage; // TODO: HeartbeatResponse;
41+
type Response = HeartbeatResponse;
4242
}
4343

4444
/// Any acknowledged heartbeat that is older than this duration is considered dead.
@@ -80,10 +80,8 @@ impl HeartbeatRequester {
8080
/// Handles the heartbeat request received from the network.
8181
pub(crate) async fn handle_ack(
8282
node: &mut DriaComputeNode,
83-
ack_message: DriaMessage,
83+
res: HeartbeatResponse,
8484
) -> Result<()> {
85-
let res = ack_message.parse_payload::<HeartbeatResponse>()?;
86-
8785
if let Some(deadline) = node.heartbeats.remove(&res.heartbeat_id) {
8886
if let Some(err) = res.error {
8987
Err(eyre!("Heartbeat was not acknowledged: {}", err))

monitor/Cargo.toml

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,6 @@ authors = ["Erhan Tezcan <[email protected]>"]
1010
# async stuff
1111
tokio-util.workspace = true
1212
tokio.workspace = true
13-
async-trait.workspace = true
1413

1514
# serialize & deserialize
1615
serde.workspace = true

p2p/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ libp2p = { git = "https://github.com/anilaltuner/rust-libp2p.git", rev = "7ce9f9
1515
"identify",
1616
"tokio",
1717
"noise",
18+
"autonat",
1819
"macros",
1920
"request-response",
2021
"cbor",

p2p/src/behaviour.rs

Lines changed: 20 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,22 +1,24 @@
11
use eyre::Result;
22
use libp2p::identity::{Keypair, PublicKey};
3-
use libp2p::StreamProtocol;
4-
use libp2p::{identify, request_response};
3+
use libp2p::{autonat, identify, request_response, PeerId, StreamProtocol};
54
use std::time::Duration;
65

76
#[derive(libp2p::swarm::NetworkBehaviour)]
87
pub struct DriaBehaviour {
98
pub identify: identify::Behaviour,
9+
pub autonat: autonat::Behaviour,
1010
pub request_response: request_response::cbor::Behaviour<Vec<u8>, Vec<u8>>,
1111
}
1212

1313
impl DriaBehaviour {
1414
pub fn new(key: &Keypair, identity_protocol: String, reqres_protocol: StreamProtocol) -> Self {
1515
let public_key = key.public();
16+
let peer_id = public_key.to_peer_id();
1617

1718
Self {
1819
identify: create_identify_behaviour(public_key, identity_protocol),
1920
request_response: create_request_response_behaviour(reqres_protocol),
21+
autonat: create_autonat_behaviour(peer_id),
2022
}
2123
}
2224
}
@@ -30,11 +32,11 @@ fn create_request_response_behaviour(
3032
) -> request_response::cbor::Behaviour<Vec<u8>, Vec<u8>> {
3133
use request_response::{Behaviour, Config, ProtocolSupport};
3234

33-
const REQUEST_RESPONSE_TIMEOUT_SECS: u64 = 180;
35+
const REQUEST_RESPONSE_TIMEOUT: Duration = Duration::from_secs(180);
3436

3537
Behaviour::new(
3638
[(protocol_name, ProtocolSupport::Full)],
37-
Config::default().with_request_timeout(Duration::from_secs(REQUEST_RESPONSE_TIMEOUT_SECS)),
39+
Config::default().with_request_timeout(REQUEST_RESPONSE_TIMEOUT),
3840
)
3941
}
4042

@@ -48,3 +50,17 @@ fn create_identify_behaviour(
4850

4951
Behaviour::new(Config::new(protocol_version, local_public_key))
5052
}
53+
54+
/// Configures the Autonat behavior to assist in network address translation detection.
55+
#[inline]
56+
fn create_autonat_behaviour(local_peer_id: PeerId) -> autonat::Behaviour {
57+
use autonat::{Behaviour, Config};
58+
59+
Behaviour::new(
60+
local_peer_id,
61+
Config {
62+
// only_global_ips: false,
63+
..Default::default()
64+
},
65+
)
66+
}

p2p/src/client.rs

Lines changed: 28 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ use libp2p::swarm::{
44
dial_opts::{DialOpts, PeerCondition},
55
SwarmEvent,
66
};
7-
use libp2p::{identify, noise, request_response, tcp, yamux};
7+
use libp2p::{autonat, identify, noise, request_response, tcp, yamux};
88
use libp2p::{Multiaddr, PeerId, Swarm, SwarmBuilder};
99
use libp2p_identity::Keypair;
1010
use std::time::Duration;
@@ -90,6 +90,7 @@ impl DriaP2PClient {
9090

9191
// create p2p client itself
9292
let (reqres_tx, reqres_rx) = mpsc::channel(MSG_CHANNEL_BUFSIZE);
93+
9394
let client = Self {
9495
peer_id,
9596
swarm,
@@ -241,17 +242,23 @@ impl DriaP2PClient {
241242
})) => self.handle_identify_event(peer_id, info),
242243

243244
SwarmEvent::NewListenAddr { address, .. } => {
244-
log::warn!("Local node is listening on {}", address);
245+
log::warn!("Local node is listening on {address}");
245246
}
246247
SwarmEvent::NewExternalAddrOfPeer { peer_id, address } => {
247-
log::info!(
248-
"External address of peer {} confirmed: {}",
249-
peer_id,
250-
address
251-
);
248+
log::info!("External address of peer {peer_id} confirmed: {address}");
252249
}
253250
SwarmEvent::ExternalAddrConfirmed { address } => {
254-
log::info!("External address confirmed: {}", address);
251+
log::info!("External address confirmed: {address}");
252+
}
253+
254+
/*****************************************
255+
* AutoNAT stuff *
256+
*****************************************/
257+
SwarmEvent::Behaviour(DriaBehaviourEvent::Autonat(autonat::Event::StatusChanged {
258+
old,
259+
new,
260+
})) => {
261+
log::info!("AutoNAT status changed from {old:?} to {new:?}");
255262
}
256263

257264
/*****************************************
@@ -303,7 +310,18 @@ impl DriaP2PClient {
303310
);
304311
}
305312

306-
event => log::trace!("Unhandled Swarm Event: {:?}", event),
313+
SwarmEvent::ExpiredListenAddr {
314+
address,
315+
listener_id,
316+
} => {
317+
log::warn!("Listener {listener_id} expired: {address}");
318+
}
319+
320+
SwarmEvent::ListenerError { listener_id, error } => {
321+
log::error!("Listener {listener_id} error: {error}");
322+
}
323+
324+
event => log::debug!("Unhandled Swarm Event: {:?}", event),
307325
}
308326
}
309327

@@ -313,6 +331,7 @@ impl DriaP2PClient {
313331
///
314332
/// - For Kademlia, we check the kademlia protocol and then add the address to the Kademlia routing table.
315333
fn handle_identify_event(&mut self, peer_id: PeerId, info: identify::Info) {
334+
println!("{}: {:?}", peer_id, info.protocols);
316335
// check identify protocol string
317336
if info.protocol_version != self.protocol.identity {
318337
log::warn!(

0 commit comments

Comments
 (0)