Skip to content

Commit e7da297

Browse files
committed
use observed_addr, rfk cancellations, smol fixes
1 parent 7fe02d2 commit e7da297

File tree

4 files changed

+65
-60
lines changed

4 files changed

+65
-60
lines changed

Makefile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ debug:
1919

2020
.PHONY: trace # | Run with crate-level TRACE logging
2121
trace:
22-
RUST_LOG=none,dkn_compute=trace cargo run
22+
RUST_LOG=none,dkn_compute=trace,libp2p=debug cargo run
2323

2424
.PHONY: build # | Build
2525
build:

src/node.rs

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ impl DriaComputeNode {
5454
)
5555
.sort_dedup();
5656

57-
let p2p = P2PClient::new(keypair, listen_addr, &available_nodes, cancellation.clone())?;
57+
let p2p = P2PClient::new(keypair, listen_addr, &available_nodes)?;
5858

5959
Ok(DriaComputeNode {
6060
config,
@@ -187,11 +187,7 @@ impl DriaComputeNode {
187187
// validate the message based on the result
188188
match handle_result {
189189
Ok(acceptance) => {
190-
log::debug!(
191-
"Validating message ({}): {:?}",
192-
message_id,
193-
acceptance
194-
);
190+
195191
self.p2p.validate_message(&message_id, &peer_id, acceptance)?;
196192
},
197193
Err(err) => {

src/p2p/behaviour.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -26,8 +26,8 @@ impl DriaBehaviour {
2626
relay: relay_behavior,
2727
gossipsub: create_gossipsub_behavior(key.clone()),
2828
kademlia: create_kademlia_behavior(peer_id),
29-
identify: create_identify_behavior(public_key.clone()),
30-
autonat: create_autonat_behavior(public_key),
29+
autonat: create_autonat_behavior(peer_id),
30+
identify: create_identify_behavior(public_key),
3131
dcutr: create_dcutr_behavior(peer_id),
3232
}
3333
}
@@ -71,11 +71,11 @@ fn create_dcutr_behavior(local_peer_id: PeerId) -> dcutr::Behaviour {
7171

7272
/// Configures the Autonat behavior to assist in network address translation detection.
7373
#[inline]
74-
fn create_autonat_behavior(key: PublicKey) -> autonat::Behaviour {
74+
fn create_autonat_behavior(local_peer_id: PeerId) -> autonat::Behaviour {
7575
use autonat::{Behaviour, Config};
7676

7777
Behaviour::new(
78-
key.to_peer_id(),
78+
local_peer_id,
7979
Config {
8080
only_global_ips: false,
8181
..Default::default()

src/p2p/client.rs

Lines changed: 58 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -1,25 +1,25 @@
1+
use crate::p2p::AvailableNodes;
12
use libp2p::futures::StreamExt;
23
use libp2p::gossipsub::{
34
Message, MessageAcceptance, MessageId, PublishError, SubscriptionError, TopicHash,
45
};
56
use libp2p::kad::{GetClosestPeersError, GetClosestPeersOk, QueryResult};
6-
use libp2p::{gossipsub, identify, kad, multiaddr::Protocol, noise, swarm::SwarmEvent, tcp, yamux};
7+
use libp2p::{
8+
autonat, gossipsub, identify, kad, multiaddr::Protocol, noise, swarm::SwarmEvent, tcp, yamux,
9+
};
710
use libp2p::{Multiaddr, PeerId, Swarm, SwarmBuilder};
811
use libp2p_identity::Keypair;
912
use tokio::time::Duration;
10-
use tokio_util::sync::CancellationToken;
11-
12-
use crate::p2p::AvailableNodes;
13+
use tokio::time::Instant;
1314

1415
use super::{DriaBehaviour, DriaBehaviourEvent, DRIA_PROTO_NAME};
1516

1617
/// Underlying libp2p client.
1718
pub struct P2PClient {
1819
swarm: Swarm<DriaBehaviour>,
19-
cancellation: CancellationToken,
2020
/// Peer count for (All, Mesh).
2121
peer_count: (usize, usize),
22-
peer_last_refreshed: tokio::time::Instant,
22+
peer_last_refreshed: Instant,
2323
}
2424

2525
/// Number of seconds before an idle connection is closed.
@@ -34,7 +34,6 @@ impl P2PClient {
3434
keypair: Keypair,
3535
listen_addr: Multiaddr,
3636
available_nodes: &AvailableNodes,
37-
cancellation: CancellationToken,
3837
) -> Result<Self, String> {
3938
// this is our peerId
4039
let node_peerid = keypair.public().to_peer_id();
@@ -115,9 +114,8 @@ impl P2PClient {
115114

116115
Ok(Self {
117116
swarm,
118-
cancellation,
119117
peer_count: (0, 0),
120-
peer_last_refreshed: tokio::time::Instant::now(),
118+
peer_last_refreshed: Instant::now(),
121119
})
122120
}
123121

@@ -169,6 +167,8 @@ impl P2PClient {
169167
propagation_source: &PeerId,
170168
acceptance: MessageAcceptance,
171169
) -> Result<(), PublishError> {
170+
log::debug!("Validating message ({}): {:?}", msg_id, acceptance);
171+
172172
let msg_was_in_cache = self
173173
.swarm
174174
.behaviour_mut()
@@ -191,65 +191,74 @@ impl P2PClient {
191191
}
192192

193193
/// Listens to the Swarm for incoming messages.
194+
///
194195
/// This method should be called in a loop to keep the client running.
195-
/// When a message is received, it will be returned.
196+
/// When a GossipSub message is received, it will be returned.
196197
pub async fn process_events(&mut self) -> Option<(PeerId, MessageId, Message)> {
197198
loop {
198199
// refresh peers
199200
self.refresh_peer_counts().await;
200201

201202
// wait for next event
202-
tokio::select! {
203-
event = self.swarm.select_next_some() => match event {
204-
SwarmEvent::Behaviour(DriaBehaviourEvent::Kademlia(
205-
kad::Event::OutboundQueryProgressed {
206-
result: QueryResult::GetClosestPeers(result),
207-
..
208-
},
209-
)) => self.handle_closest_peers_result(result),
210-
SwarmEvent::Behaviour(DriaBehaviourEvent::Identify(identify::Event::Received {
211-
peer_id,
212-
info,
203+
match self.swarm.select_next_some().await {
204+
SwarmEvent::Behaviour(DriaBehaviourEvent::Kademlia(
205+
kad::Event::OutboundQueryProgressed {
206+
result: QueryResult::GetClosestPeers(result),
213207
..
214-
})) => self.handle_identify_event(peer_id, info),
215-
SwarmEvent::Behaviour(DriaBehaviourEvent::Gossipsub(gossipsub::Event::Message {
208+
},
209+
)) => self.handle_closest_peers_result(result),
210+
SwarmEvent::Behaviour(DriaBehaviourEvent::Identify(
211+
identify::Event::Received { peer_id, info, .. },
212+
)) => self.handle_identify_event(peer_id, info),
213+
SwarmEvent::Behaviour(DriaBehaviourEvent::Gossipsub(
214+
gossipsub::Event::Message {
216215
propagation_source: peer_id,
217216
message_id,
218217
message,
219-
})) => {
220-
return Some((peer_id, message_id, message));
221-
}
222-
SwarmEvent::NewListenAddr { address, .. } => {
223-
log::info!("Local node is listening on {}", address);
224-
}
225-
_ => log::trace!("Unhandled Swarm Event: {:?}", event),
226-
},
227-
_ = self.cancellation.cancelled() => {
228-
return None;
218+
},
219+
)) => {
220+
return Some((peer_id, message_id, message));
221+
}
222+
SwarmEvent::Behaviour(DriaBehaviourEvent::Autonat(
223+
autonat::Event::StatusChanged { old, new },
224+
)) => {
225+
log::warn!("AutoNAT status changed from {:?} to {:?}", old, new);
226+
}
227+
SwarmEvent::NewListenAddr { address, .. } => {
228+
log::warn!("Local node is listening on {}", address);
229229
}
230+
SwarmEvent::ExternalAddrConfirmed { address } => {
231+
log::warn!("External address confirmed: {}", address);
232+
}
233+
event => log::trace!("Unhandled Swarm Event: {:?}", event),
230234
}
231235
}
232236
}
233237

234238
/// Handles identify events to add peer addresses to Kademlia, if protocols match.
235239
fn handle_identify_event(&mut self, peer_id: PeerId, info: identify::Info) {
236240
let protocol_match = info.protocols.iter().any(|p| *p == DRIA_PROTO_NAME);
237-
for addr in info.listen_addrs {
238-
if protocol_match {
239-
// if it matches our protocol, add it to the Kademlia routing table
240-
log::info!("Identify: Peer {} identified at {}", peer_id, addr);
241+
let addr = info.observed_addr;
241242

242-
self.swarm
243-
.behaviour_mut()
244-
.kademlia
245-
.add_address(&peer_id, addr);
246-
} else {
247-
log::trace!(
248-
"Identify: Incoming from different protocol, address {}. PeerID is {}",
249-
addr,
250-
peer_id
251-
);
252-
}
243+
if protocol_match {
244+
// if it matches our protocol, add it to the Kademlia routing table
245+
log::info!(
246+
"Identify: {} peer {} identified at {}",
247+
info.protocol_version,
248+
peer_id,
249+
addr
250+
);
251+
252+
self.swarm
253+
.behaviour_mut()
254+
.kademlia
255+
.add_address(&peer_id, addr);
256+
} else {
257+
log::trace!(
258+
"Identify: Incoming from different protocol, address {}. PeerID is {}",
259+
addr,
260+
peer_id
261+
);
253262
}
254263
}
255264

@@ -309,7 +318,7 @@ impl P2PClient {
309318
.behaviour_mut()
310319
.kademlia
311320
.get_closest_peers(random_peer);
312-
self.peer_last_refreshed = tokio::time::Instant::now();
321+
self.peer_last_refreshed = Instant::now();
313322

314323
// get peer count
315324
let gossipsub = &self.swarm.behaviour().gossipsub;

0 commit comments

Comments
 (0)