Skip to content

Commit b0936b4

Browse files
committed
feat: filter out lower-height peers from PeerResponse messages
Signed-off-by: ljedrz <ljedrz@users.noreply.github.com>
1 parent 2b08609 commit b0936b4

File tree

7 files changed

+126
-35
lines changed

7 files changed

+126
-35
lines changed

node/bft/src/gateway.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -963,10 +963,10 @@ impl<N: Network> Gateway<N> {
963963
// The trusted ones are already handled by `handle_trusted_validators`.
964964
let trusted_validators = self.trusted_peers();
965965
if self.number_of_connected_peers() < N::LATEST_MAX_CERTIFICATES().unwrap() as usize {
966-
for candidate_addr in self.candidate_peers() {
967-
if !trusted_validators.contains(&candidate_addr) {
966+
for peer in self.get_candidate_peers() {
967+
if !trusted_validators.contains(&peer.listener_addr) {
968968
// Attempt to connect to unconnected validators.
969-
self.connect(candidate_addr);
969+
self.connect(peer.listener_addr);
970970
}
971971
}
972972

node/router/messages/src/peer_response.rs

Lines changed: 63 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ use std::borrow::Cow;
2121

2222
#[derive(Clone, Debug, PartialEq, Eq)]
2323
pub struct PeerResponse {
24-
pub peers: Vec<SocketAddr>,
24+
pub peers: Vec<(SocketAddr, Option<u32>)>,
2525
}
2626

2727
impl MessageTrait for PeerResponse {
@@ -39,20 +39,59 @@ impl ToBytes for PeerResponse {
3939
return Err(io::Error::new(io::ErrorKind::InvalidInput, format!("Too many peers: {}", self.peers.len())));
4040
}
4141

42+
// A version indicator; we don't expect empty peer responses, so a zero value can serve
43+
// as an indicator that this message is to be processed differently. The version value
44+
// can be changed to a 2 in the future, once everyone expects it there.
45+
0u8.write_le(&mut writer)?;
46+
4247
(self.peers.len() as u8).write_le(&mut writer)?;
43-
for peer in self.peers.iter() {
44-
peer.write_le(&mut writer)?;
48+
for (addr, height) in self.peers.iter() {
49+
addr.write_le(&mut writer)?;
50+
if let Some(h) = height {
51+
1u8.write_le(&mut writer)?;
52+
h.write_le(&mut writer)?;
53+
} else {
54+
0u8.write_le(&mut writer)?;
55+
}
4556
}
4657
Ok(())
4758
}
4859
}
4960

5061
impl FromBytes for PeerResponse {
5162
fn read_le<R: io::Read>(mut reader: R) -> io::Result<Self> {
52-
let count = u8::read_le(&mut reader)?;
63+
// Read the peer count if their heights aren't present; otherwise, interpret this value
64+
// as the message version. It is a workaround for a currently missing version value.
65+
// The worst-case scenario is if a node hasn't updated, and it gets a `PeerRequest` from
66+
// its only peer who has; this would cause it to return a message that appears as if it
67+
// contains heights (due to a leading `0`), but it would end up failing to deserialize.
68+
// TODO: after a release or two, we should always be expecting the version to be present,
69+
// simplifying the deserialization; also, remove the `empty_old_peerlist_handling` test.
70+
let mut contains_heights = false;
71+
let count_or_version = u8::read_le(&mut reader)?;
72+
let count = if count_or_version == 0 {
73+
// Version indicator found; this message will contain optional heights.
74+
contains_heights = true;
75+
// If the first value is a zero, the next u8 is the peer count.
76+
u8::read_le(&mut reader)?
77+
} else {
78+
// A non-zero value indicates that this is the "old" PeerResponse without heights.
79+
count_or_version
80+
};
81+
5382
let mut peers = Vec::with_capacity(count as usize);
5483
for _ in 0..count {
55-
peers.push(SocketAddr::read_le(&mut reader)?);
84+
let addr = SocketAddr::read_le(&mut reader)?;
85+
let height = if contains_heights {
86+
match u8::read_le(&mut reader)? {
87+
1 => Some(u32::read_le(&mut reader)?),
88+
0 => None,
89+
_ => return Err(io::Error::new(io::ErrorKind::InvalidInput, "Invalid peer height".to_string())),
90+
}
91+
} else {
92+
None
93+
};
94+
peers.push((addr, height));
5695
}
5796

5897
Ok(Self { peers })
@@ -69,14 +108,19 @@ pub mod prop_tests {
69108
collection::vec,
70109
prelude::{BoxedStrategy, Strategy, any},
71110
};
72-
use std::net::{IpAddr, SocketAddr};
111+
use std::{
112+
io,
113+
net::{IpAddr, SocketAddr},
114+
};
73115
use test_strategy::proptest;
74116

75-
pub fn any_valid_socket_addr() -> BoxedStrategy<SocketAddr> {
76-
any::<(IpAddr, u16)>().prop_map(|(ip_addr, port)| SocketAddr::new(ip_addr, port)).boxed()
117+
pub fn any_valid_socket_addr() -> BoxedStrategy<(SocketAddr, Option<u32>)> {
118+
any::<(IpAddr, u16, Option<u32>)>()
119+
.prop_map(|(ip_addr, port, height)| (SocketAddr::new(ip_addr, port), height))
120+
.boxed()
77121
}
78122

79-
pub fn any_vec() -> BoxedStrategy<Vec<SocketAddr>> {
123+
pub fn any_vec() -> BoxedStrategy<Vec<(SocketAddr, Option<u32>)>> {
80124
vec(any_valid_socket_addr(), 0..50).prop_map(|v| v).boxed()
81125
}
82126

@@ -91,4 +135,14 @@ pub mod prop_tests {
91135
let decoded = PeerResponse::read_le(&mut bytes.into_inner().reader()).unwrap();
92136
assert_eq!(decoded, peer_response);
93137
}
138+
139+
// The following test will be obsolete once all the nodes handle heights in the `PeerResponse`.
140+
#[test]
141+
fn empty_old_peerlist_handling() {
142+
// An empty `PeerResponse` without heights contains a single 0u8.
143+
let serialized = &[0u8];
144+
let deserialized = PeerResponse::read_le(&serialized[..]).unwrap_err();
145+
// Check for the expected error.
146+
assert_eq!(deserialized.kind(), io::ErrorKind::UnexpectedEof);
147+
}
94148
}

node/router/src/heartbeat.rs

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -239,9 +239,21 @@ pub trait Heartbeat<N: Network>: Outbound<N> {
239239
// Initialize an RNG.
240240
let rng = &mut OsRng;
241241

242-
// Attempt to connect to more peers.
243-
for peer_ip in self.router().candidate_peers().into_iter().choose_multiple(rng, num_deficient) {
244-
self.router().connect(peer_ip);
242+
// Attempt to connect to more peers, separately choosing from those at a greater block
243+
// height, and those whose height is lower or unknown to us.
244+
let own_height = self.router().ledger.latest_block_height();
245+
let (higher_peers, other_peers): (Vec<_>, Vec<_>) = self
246+
.router()
247+
.get_candidate_peers()
248+
.into_iter()
249+
.partition(|peer| peer.last_height_seen.map(|h| h > own_height).unwrap_or(false));
250+
// We may not know of half of `num_deficient` candidates; account for it using `min`.
251+
let num_higher_peers = num_deficient.div_ceil(2).min(higher_peers.len());
252+
for peer in higher_peers.into_iter().choose_multiple(rng, num_higher_peers) {
253+
self.router().connect(peer.listener_addr);
254+
}
255+
for peer in other_peers.into_iter().choose_multiple(rng, num_deficient - num_higher_peers) {
256+
self.router().connect(peer.listener_addr);
245257
}
246258

247259
if self.router().allow_external_peers() {

node/router/src/helpers/peer.rs

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,8 @@ pub struct CandidatePeer {
4545
pub listener_addr: SocketAddr,
4646
/// Indicates whether the peer is considered trusted.
4747
pub trusted: bool,
48+
/// The latest block height known to be associated with the peer.
49+
pub last_height_seen: Option<u32>,
4850
}
4951

5052
/// A fully connected peer.
@@ -73,7 +75,7 @@ pub struct ConnectedPeer<N: Network> {
7375
impl<N: Network> Peer<N> {
7476
/// Create a candidate peer.
7577
pub const fn new_candidate(listener_addr: SocketAddr, trusted: bool) -> Self {
76-
Self::Candidate(CandidatePeer { listener_addr, trusted })
78+
Self::Candidate(CandidatePeer { listener_addr, trusted, last_height_seen: None })
7779
}
7880

7981
/// Create a connecting peer.
@@ -114,7 +116,11 @@ impl<N: Network> Peer<N> {
114116

115117
/// Demote a peer to candidate status, marking it as disconnected.
116118
pub fn downgrade_to_candidate(&mut self, listener_addr: SocketAddr) {
117-
*self = Self::new_candidate(listener_addr, self.is_trusted());
119+
*self = Self::Candidate(CandidatePeer {
120+
listener_addr,
121+
trusted: self.is_trusted(),
122+
last_height_seen: self.last_height_seen(),
123+
});
118124
}
119125

120126
/// Returns the type of the node (only applicable to connected peers).

node/router/src/inbound.rs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -341,25 +341,25 @@ pub trait Inbound<N: Network>: Reading + Outbound<N> {
341341

342342
// Truncate and convert to socket addrs.
343343
peers.truncate(MAX_PEERS_TO_SEND);
344-
let peers = peers.into_iter().map(|peer| peer.listener_addr).collect();
344+
let peers = peers.into_iter().map(|peer| (peer.listener_addr, peer.last_height_seen)).collect();
345345

346346
// Send a `PeerResponse` message to the peer.
347347
self.router().send(peer_ip, Message::PeerResponse(PeerResponse { peers }));
348348
true
349349
}
350350

351351
/// Handles a `PeerResponse` message.
352-
fn peer_response(&self, _peer_ip: SocketAddr, peers: &[SocketAddr]) -> bool {
352+
fn peer_response(&self, _peer_ip: SocketAddr, peers: &[(SocketAddr, Option<u32>)]) -> bool {
353353
// Check if the number of peers received is less than MAX_PEERS_TO_SEND.
354354
if peers.len() > MAX_PEERS_TO_SEND {
355355
return false;
356356
}
357-
// Filter out invalid addresses.
357+
// Filter out invalid addresses and peers with a lower known block height.
358358
let peers = match self.router().is_dev() {
359359
// In development mode, relax the validity requirements to make operating devnets more flexible.
360-
true => peers.iter().copied().filter(|ip| !is_bogon_ip(ip.ip())).collect::<Vec<_>>(),
360+
true => peers.iter().copied().filter(|(ip, _)| !is_bogon_ip(ip.ip())).collect::<Vec<_>>(),
361361
// In production mode, ensure the peer IPs are valid.
362-
false => peers.iter().copied().filter(|ip| self.router().is_valid_peer_ip(*ip)).collect(),
362+
false => peers.iter().copied().filter(|(ip, _)| self.router().is_valid_peer_ip(*ip)).collect(),
363363
};
364364
// Adds the given peer IPs to the list of candidate peers.
365365
self.router().insert_candidate_peers(&peers);

node/router/src/lib.rs

Lines changed: 25 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -294,13 +294,19 @@ pub trait PeerPoolHandling<N: Network>: P2P {
294294
}
295295

296296
/// Returns the list of candidate peers.
297-
fn candidate_peers(&self) -> HashSet<SocketAddr> {
297+
fn get_candidate_peers(&self) -> Vec<CandidatePeer> {
298298
let banned_ips = self.tcp().banned_peers().get_banned_ips();
299299
self.peer_pool()
300300
.read()
301301
.iter()
302302
.filter_map(|(addr, peer)| {
303-
(matches!(peer, Peer::Candidate(_)) && !banned_ips.contains(&addr.ip())).then_some(*addr)
303+
if let Peer::Candidate(peer) = peer
304+
&& !banned_ips.contains(&addr.ip())
305+
{
306+
Some(peer.clone())
307+
} else {
308+
None
309+
}
304310
})
305311
.collect()
306312
}
@@ -607,24 +613,33 @@ impl<N: Network> Router<N> {
607613
///
608614
/// This method skips adding any given peers if the combined size exceeds the threshold,
609615
/// as the peer providing this list could be subverting the protocol.
610-
pub fn insert_candidate_peers(&self, peers: &[SocketAddr]) {
616+
pub fn insert_candidate_peers(&self, peers: &[(SocketAddr, Option<u32>)]) {
611617
// Compute the maximum number of candidate peers.
612618
let max_candidate_peers = Self::MAXIMUM_CANDIDATE_PEERS.saturating_sub(self.number_of_candidate_peers());
613619
{
614620
let mut peer_pool = self.peer_pool.write();
615621
// Ensure the combined number of peers does not surpass the threshold.
616622
let eligible_peers = peers
617623
.iter()
618-
.filter(|&peer_ip| {
619-
// Ensure the peer is not itself, and is not already known.
620-
!self.is_local_ip(*peer_ip) && !peer_pool.contains_key(peer_ip)
624+
.filter(|(peer_ip, _)| {
625+
// Ensure the peer is not itself or connected.
626+
!self.is_local_ip(*peer_ip) && !self.is_connected(*peer_ip)
621627
})
622-
.take(max_candidate_peers)
623-
.map(|addr| (*addr, Peer::new_candidate(*addr, false)))
624-
.collect::<Vec<_>>();
628+
.take(max_candidate_peers);
625629

626630
// Proceed to insert the eligible candidate peer IPs.
627-
peer_pool.extend(eligible_peers);
631+
for (addr, height) in eligible_peers {
632+
match peer_pool.entry(*addr) {
633+
Entry::Vacant(entry) => {
634+
entry.insert(Peer::new_candidate(*addr, false));
635+
}
636+
Entry::Occupied(mut entry) => {
637+
if let Peer::Candidate(peer) = entry.get_mut() {
638+
peer.last_height_seen = *height;
639+
}
640+
}
641+
}
642+
}
628643
}
629644
#[cfg(feature = "metrics")]
630645
self.update_metrics();

node/tests/peering.rs

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,10 @@ macro_rules! test_reject_unsolicited_peer_response {
5555
// Check the candidate peers.
5656
assert_eq!(node.router().number_of_candidate_peers(), 0);
5757

58-
let peers = vec!["1.1.1.1:1111".parse().unwrap(), "2.2.2.2:2222".parse().unwrap()];
58+
let peers = vec![
59+
("1.1.1.1:1111".parse().unwrap(), None),
60+
("2.2.2.2:2222".parse().unwrap(), None),
61+
];
5962

6063
// Send a `PeerResponse` to the node.
6164
assert!(
@@ -71,8 +74,9 @@ macro_rules! test_reject_unsolicited_peer_response {
7174
deadline!(Duration::from_secs(5), move || node_clone.router().number_of_connected_peers() == 0);
7275

7376
// Make sure the sent addresses weren't inserted in the candidate peers.
74-
for peer in peers {
75-
assert!(!node.router().candidate_peers().contains(&peer));
77+
let candidate_peer_addrs = node.router().get_candidate_peers().into_iter().map(|peer| peer.listener_addr).collect::<Vec<_>>();
78+
for (peer, _) in peers {
79+
assert!(!candidate_peer_addrs.contains(&peer));
7680
}
7781
}
7882
}

0 commit comments

Comments
 (0)