Skip to content

Commit 73fd339

Browse files
authored
feat: collect addresses from connected peers (#421)
* use `HashMap` for `AddrV2Handler.known_peers` This way from the beginning we only add new peers or update existing ones in instead of adding everything with a valid timestamp and then later in the flow drop all duplicates. * send `GetAddr` after handshake and process `Addr` Makes sure we send `GetAddr` after handshake to request `Addr`/`AddrV2` messages and convert `Addr` messages to `AddrV2` if we receive them, which shouldn't really happen since we signal v2 support during the handshake. * allow a wider range of last-seen timestamps With the current validation rules we drop most of the addresses while they still might be useful since it doesn't necessarily mean they are offline. We should still keep more addresses to have at least some addresses to try to connect to.
1 parent b7ee2e4 commit 73fd339

File tree

2 files changed

+71
-42
lines changed

2 files changed

+71
-42
lines changed

dash-spv/src/network/addrv2.rs

Lines changed: 39 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
//! AddrV2 message handling for modern peer exchange protocol
22
33
use rand::prelude::*;
4-
use std::collections::HashSet;
4+
use std::collections::{HashMap, HashSet};
55
use std::net::SocketAddr;
66
use std::sync::Arc;
77
use std::time::{Duration, SystemTime, UNIX_EPOCH};
@@ -13,10 +13,23 @@ use dashcore::network::message::NetworkMessage;
1313

1414
use crate::network::constants::{MAX_ADDR_TO_SEND, MAX_ADDR_TO_STORE};
1515

16+
const ONE_WEEK: u32 = 7 * 24 * 60 * 60;
17+
const TEN_MINUTES: u32 = 600;
18+
19+
/// Evict oldest entries if the map exceeds capacity, keeping the freshest addresses.
20+
fn evict_if_needed(peers: &mut HashMap<SocketAddr, AddrV2Message>) {
21+
if peers.len() > MAX_ADDR_TO_STORE {
22+
let mut entries: Vec<_> = peers.drain().collect();
23+
entries.sort_by_key(|(_, msg)| std::cmp::Reverse(msg.time));
24+
entries.truncate(MAX_ADDR_TO_STORE);
25+
peers.extend(entries);
26+
}
27+
}
28+
1629
/// Handler for AddrV2 peer exchange protocol
1730
pub struct AddrV2Handler {
1831
/// Known peer addresses from AddrV2 messages
19-
known_peers: Arc<RwLock<Vec<AddrV2Message>>>,
32+
known_peers: Arc<RwLock<HashMap<SocketAddr, AddrV2Message>>>,
2033
/// Peers that support AddrV2
2134
supports_addrv2: Arc<RwLock<HashSet<SocketAddr>>>,
2235
}
@@ -25,7 +38,7 @@ impl AddrV2Handler {
2538
/// Create a new AddrV2 handler
2639
pub fn new() -> Self {
2740
Self {
28-
known_peers: Arc::new(RwLock::new(Vec::new())),
41+
known_peers: Arc::new(RwLock::new(HashMap::new())),
2942
supports_addrv2: Arc::new(RwLock::new(HashSet::new())),
3043
}
3144
}
@@ -47,44 +60,38 @@ impl AddrV2Handler {
4760
})
4861
.as_secs() as u32;
4962

50-
let _initial_count = known_peers.len();
63+
let received = messages.len();
5164
let mut added = 0;
65+
let mut updated = 0;
5266

5367
for msg in messages {
54-
// Validate timestamp
55-
// Accept addresses from up to 3 hours ago and up to 10 minutes in the future
56-
if msg.time <= now.saturating_sub(10800) || msg.time > now + 600 {
68+
// Accept addresses seen within the last week. Older addresses are likely stale.
69+
// Also, reject timestamps more than 10 minutes in the future which are invalid.
70+
if msg.time < now.saturating_sub(ONE_WEEK) || msg.time > now + TEN_MINUTES {
5771
log::trace!("Ignoring AddrV2 with invalid timestamp: {}", msg.time);
5872
continue;
5973
}
6074

61-
// Only store if we can convert to socket address
62-
if msg.socket_addr().is_ok() {
63-
known_peers.push(msg);
64-
added += 1;
65-
}
66-
}
67-
68-
// Sort by timestamp (newest first) and deduplicate
69-
known_peers.sort_by_key(|a| std::cmp::Reverse(a.time));
75+
let Ok(socket_addr) = msg.socket_addr() else {
76+
continue;
77+
};
7078

71-
// Deduplicate by socket address
72-
let mut seen = HashSet::new();
73-
known_peers.retain(|addr| {
74-
if let Ok(socket_addr) = addr.socket_addr() {
75-
seen.insert(socket_addr)
76-
} else {
77-
false
79+
// Only update if new or has fresher timestamp
80+
match known_peers.get(&socket_addr) {
81+
Some(existing) if existing.time >= msg.time => continue,
82+
Some(_) => updated += 1,
83+
None => added += 1,
7884
}
79-
});
85+
known_peers.insert(socket_addr, msg);
86+
}
8087

81-
// Keep only the most recent addresses
82-
known_peers.truncate(MAX_ADDR_TO_STORE);
88+
evict_if_needed(&mut known_peers);
8389

84-
let _processed_count = added;
8590
log::info!(
86-
"Processed AddrV2 messages: added {}, total known peers: {}",
91+
"Processed AddrV2 messages: received {}, added {}, updated {}, total known peers: {}",
92+
received,
8793
added,
94+
updated,
8895
known_peers.len()
8996
);
9097
}
@@ -102,9 +109,8 @@ impl AddrV2Handler {
102109
let count = count.min(MAX_ADDR_TO_SEND).min(known_peers.len());
103110

104111
let addresses: Vec<AddrV2Message> =
105-
known_peers.choose_multiple(&mut rng, count).cloned().collect();
112+
known_peers.values().choose_multiple(&mut rng, count).into_iter().cloned().collect();
106113

107-
log::debug!("Sharing {} addresses with peer", addresses.len());
108114
addresses
109115
}
110116

@@ -115,7 +121,7 @@ impl AddrV2Handler {
115121

116122
/// Get all known socket addresses
117123
pub async fn get_known_addresses(&self) -> Vec<AddrV2Message> {
118-
self.known_peers.read().await.clone()
124+
self.known_peers.read().await.values().cloned().collect()
119125
}
120126

121127
/// Add a known peer address
@@ -141,13 +147,8 @@ impl AddrV2Handler {
141147
};
142148

143149
let mut known_peers = self.known_peers.write().await;
144-
known_peers.push(addr_msg);
145-
146-
// Keep size under control
147-
if known_peers.len() > MAX_ADDR_TO_STORE {
148-
known_peers.sort_by_key(|a| std::cmp::Reverse(a.time));
149-
known_peers.truncate(MAX_ADDR_TO_STORE);
150-
}
150+
known_peers.insert(addr, addr_msg);
151+
evict_if_needed(&mut known_peers);
151152
}
152153

153154
/// Build a GetAddr response message

dash-spv/src/network/manager.rs

Lines changed: 32 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ use crate::network::{
2626
};
2727
use crate::storage::{PeerStorage, PersistentPeerStorage, PersistentStorage};
2828
use async_trait::async_trait;
29-
use dashcore::network::address::AddrV2Message;
29+
use dashcore::network::address::{AddrV2, AddrV2Message};
3030
use dashcore::network::constants::ServiceFlags;
3131
use dashcore::network::message::NetworkMessage;
3232
use dashcore::network::message_headers2::CompressionState;
@@ -257,6 +257,11 @@ impl PeerNetworkManager {
257257
Ok(_) => {
258258
log::info!("Successfully connected to {}", addr);
259259

260+
// Request addresses from the peer for discovery
261+
if let Err(e) = peer.send_message(NetworkMessage::GetAddr).await {
262+
log::warn!("Failed to send GetAddr to {}: {}", addr, e);
263+
}
264+
260265
// Record successful connection
261266
reputation_manager.record_successful_connection(addr).await;
262267

@@ -463,9 +468,32 @@ impl PeerNetworkManager {
463468
);
464469
continue;
465470
}
466-
NetworkMessage::Addr(_) => {
467-
// Handle legacy addr messages (convert to AddrV2 if needed)
468-
log::trace!("Received legacy addr message from {}", addr);
471+
NetworkMessage::Addr(addresses) => {
472+
// Convert legacy addr messages to AddrV2 format
473+
let converted: Vec<AddrV2Message> = addresses
474+
.iter()
475+
.filter_map(|(time, a)| {
476+
let socket = a.socket_addr().ok()?;
477+
let addr_v2 = match socket.ip() {
478+
std::net::IpAddr::V4(v4) => AddrV2::Ipv4(v4),
479+
std::net::IpAddr::V6(v6) => AddrV2::Ipv6(v6),
480+
};
481+
Some(AddrV2Message {
482+
time: *time,
483+
services: a.services,
484+
addr: addr_v2,
485+
port: socket.port(),
486+
})
487+
})
488+
.collect();
489+
if !converted.is_empty() {
490+
log::debug!(
491+
"Converted {} legacy addr entries from {}",
492+
converted.len(),
493+
addr
494+
);
495+
addrv2_handler.handle_addrv2(converted).await;
496+
}
469497
continue;
470498
}
471499
NetworkMessage::Headers(headers) => {

0 commit comments

Comments
 (0)