207 changes: 188 additions & 19 deletions beacon_node/lighthouse_network/src/peer_manager/mod.rs
@@ -17,7 +17,7 @@ use std::{
time::{Duration, Instant},
};
use tracing::{debug, error, trace, warn};
use types::{DataColumnSubnetId, EthSpec, SyncSubnetId};
use types::{DataColumnSubnetId, EthSpec, SubnetId, SyncSubnetId};

pub use libp2p::core::Multiaddr;
pub use libp2p::identity::Keypair;
@@ -52,6 +52,11 @@ pub const PEER_RECONNECTION_TIMEOUT: Duration = Duration::from_secs(600);
/// lower our peer count below this number. Instead we favour a non-uniform distribution of subnet
/// peers.
pub const MIN_SYNC_COMMITTEE_PEERS: u64 = 2;
/// Avoid pruning peers on sampling column subnets if the subnet's peer count is <= `TARGET_SUBNET_PEERS`.
pub const MIN_SAMPLING_COLUMN_SUBNET_PEERS: u64 = TARGET_SUBNET_PEERS as u64;
/// For non-sampling column subnets, we need to ensure there is at least one peer for
/// publishing during proposals.
pub const MIN_NON_SAMPLING_COLUMN_SUBNET_PEERS: u64 = 1;
/// A fraction of `PeerManager::target_peers` that we allow to connect to us in excess of
/// `PeerManager::target_peers`. For clarity, if `PeerManager::target_peers` is 50 and
/// PEER_EXCESS_FACTOR = 0.1 we allow 10% more nodes, i.e 55.
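As a quick illustration of how the two new constants are meant to be used (a standalone sketch, not code from this PR; `min_column_subnet_peers` is a hypothetical helper, and the `TARGET_SUBNET_PEERS` value is only assumed here, following the "= 3" mentioned in the new test's doc comment):

// Illustrative sketch only: how the constants select a per-subnet minimum,
// mirroring the pruning-protection logic added further down in this file.
const TARGET_SUBNET_PEERS: usize = 3; // value assumed for this sketch
const MIN_SAMPLING_COLUMN_SUBNET_PEERS: u64 = TARGET_SUBNET_PEERS as u64;
const MIN_NON_SAMPLING_COLUMN_SUBNET_PEERS: u64 = 1;

fn min_column_subnet_peers(is_sampling_subnet: bool) -> u64 {
    if is_sampling_subnet {
        MIN_SAMPLING_COLUMN_SUBNET_PEERS
    } else {
        MIN_NON_SAMPLING_COLUMN_SUBNET_PEERS
    }
}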
@@ -729,7 +734,7 @@ impl<E: EthSpec> PeerManager<E> {
}
} else {
// we have no meta-data for this peer, update
debug!(%peer_id, new_seq_no = meta_data.seq_number(), "Obtained peer's metadata");
debug!(%peer_id, new_seq_no = meta_data.seq_number(), cgc=?meta_data.custody_group_count().ok(), "Obtained peer's metadata");
}

let known_custody_group_count = peer_info
@@ -949,6 +954,43 @@ impl<E: EthSpec> PeerManager<E> {
}
}

/// Run discovery queries for additional custody peers on any sampling subnet with fewer than `MIN_SAMPLING_COLUMN_SUBNET_PEERS` good peers.
fn maintain_custody_peers(&mut self) {
let subnets_to_discover: Vec<SubnetDiscovery> = self
.network_globals
.sampling_subnets()
.iter()
.filter_map(|custody_subnet| {
if self
.network_globals
.peers
.read()
.has_good_peers_in_custody_subnet(
custody_subnet,
MIN_SAMPLING_COLUMN_SUBNET_PEERS as usize,
)
{
None
} else {
Some(SubnetDiscovery {
subnet: Subnet::DataColumn(*custody_subnet),
min_ttl: None,
})
}
})
.collect();

// request the subnet query from discovery
if !subnets_to_discover.is_empty() {
debug!(
subnets = ?subnets_to_discover.iter().map(|s| s.subnet).collect::<Vec<_>>(),
"Making subnet queries for maintaining custody peers"
);
self.events
.push(PeerManagerEvent::DiscoverSubnetPeers(subnets_to_discover));
}
}

fn maintain_trusted_peers(&mut self) {
let trusted_peers = self.trusted_peers.clone();
for trusted_peer in trusted_peers {
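A note on `has_good_peers_in_custody_subnet`, used by `maintain_custody_peers` above: the method itself isn't shown in this file's diff. A minimal standalone sketch of the semantics it presumably has — count the usable peers that custody the subnet and compare against the minimum — over simplified placeholder types (`SketchPeer` and subnets as `u64` are stand-ins, not the real PeerDB API):

use std::collections::HashSet;

// Simplified stand-ins for the real PeerDB / PeerInfo types.
struct SketchPeer {
    custody_subnets: HashSet<u64>, // DataColumnSubnetId reduced to a u64
    is_good: bool,                 // e.g. connected, not banned, acceptable score
}

/// Presumed semantics: are at least `min_peers` good peers custodying `subnet`?
fn has_good_peers_in_custody_subnet(peers: &[SketchPeer], subnet: u64, min_peers: usize) -> bool {
    peers
        .iter()
        .filter(|p| p.is_good && p.custody_subnets.contains(&subnet))
        .count()
        >= min_peers
}

If the check fails for any sampling subnet, `maintain_custody_peers` queues a `SubnetDiscovery` for it, as shown in the function above.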
@@ -1091,14 +1133,17 @@ impl<E: EthSpec> PeerManager<E> {
// uniformly distributed, remove random peers.
if peers_to_prune.len() < connected_peer_count.saturating_sub(self.target_peers) {
// Of our connected peers, build a map from subnet_id -> Vec<(PeerId, PeerInfo)>
let mut subnet_to_peer: HashMap<Subnet, Vec<(PeerId, PeerInfo<E>)>> = HashMap::new();
let mut att_subnet_to_peer: HashMap<SubnetId, Vec<(PeerId, PeerInfo<E>)>> =
HashMap::new();
// These variables are used to track if a peer is in a long-lived sync-committee as we
// may wish to retain this peer over others when pruning.
let mut sync_committee_peer_count: HashMap<SyncSubnetId, u64> = HashMap::new();
let mut peer_to_sync_committee: HashMap<
PeerId,
std::collections::HashSet<SyncSubnetId>,
> = HashMap::new();
let mut peer_to_sync_committee: HashMap<PeerId, HashSet<SyncSubnetId>> = HashMap::new();

let mut custody_subnet_peer_count: HashMap<DataColumnSubnetId, u64> = HashMap::new();
let mut peer_to_custody_subnet: HashMap<PeerId, HashSet<DataColumnSubnetId>> =
HashMap::new();
let sampling_subnets = self.network_globals.sampling_subnets();

for (peer_id, info) in self.network_globals.peers.read().connected_peers() {
// Ignore peers we trust or that we are already pruning
@@ -1112,9 +1157,9 @@
// the dense sync committees.
for subnet in info.long_lived_subnets() {
match subnet {
Subnet::Attestation(_) => {
subnet_to_peer
.entry(subnet)
Subnet::Attestation(subnet_id) => {
att_subnet_to_peer
.entry(subnet_id)
.or_default()
.push((*peer_id, info.clone()));
}
@@ -1125,26 +1170,31 @@
.or_default()
.insert(id);
}
// TODO(das) to be implemented. We're not pruning data column peers yet
// because data column topics are subscribed as core topics until we
// implement recomputing data column subnets.
Subnet::DataColumn(_) => {}
Subnet::DataColumn(id) => {
*custody_subnet_peer_count.entry(id).or_default() += 1;
peer_to_custody_subnet
.entry(*peer_id)
.or_default()
.insert(id);
}
}
}
}

// Add to the peers to prune mapping
while peers_to_prune.len() < connected_peer_count.saturating_sub(self.target_peers) {
if let Some((_, peers_on_subnet)) = subnet_to_peer
if let Some((_, peers_on_subnet)) = att_subnet_to_peer
.iter_mut()
.max_by_key(|(_, peers)| peers.len())
{
// and the subnet still contains peers
if !peers_on_subnet.is_empty() {
// Order the peers by the number of subnets they are long-lived
// subscribed to, shuffle equal peers.
// subscribed to, shuffle equal peers. Prioritize unsynced peers for pruning.
peers_on_subnet.shuffle(&mut rand::rng());
peers_on_subnet.sort_by_key(|(_, info)| info.long_lived_subnet_count());
peers_on_subnet.sort_by_key(|(_, info)| {
(info.long_lived_attnet_count(), info.is_synced_or_advanced())
});

// Try and find a candidate peer to remove from the subnet.
// We ignore peers that would put us below our target outbound peers
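The new sort key leans on Rust tuple ordering and on `sort_by_key` being a stable sort: peers with fewer long-lived attestation subnets come first, ties are broken by sync status (`false` orders before `true`, so unsynced peers are considered for pruning first), and the preceding shuffle only randomises exact ties. A tiny standalone demonstration with made-up data:

fn main() {
    // (peer, long_lived_attnet_count, is_synced_or_advanced)
    let mut peers = vec![("a", 2, true), ("b", 1, true), ("c", 1, false)];
    // Stable sort: among peers with the same attnet count, the unsynced
    // peer "c" sorts ahead of "b" and is considered for pruning first.
    peers.sort_by_key(|&(_, attnets, synced)| (attnets, synced));
    let order: Vec<_> = peers.iter().map(|p| p.0).collect();
    assert_eq!(order, ["c", "b", "a"]);
}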
@@ -1187,6 +1237,32 @@
}
}

// Ensure custody subnet peers are protected based on subnet type and peer count.
if let Some(subnets) = peer_to_custody_subnet.get(candidate_peer) {
let mut should_protect = false;
for subnet_id in subnets {
if let Some(subnet_count) =
custody_subnet_peer_count.get(subnet_id).copied()
{
let threshold = if sampling_subnets.contains(subnet_id) {
MIN_SAMPLING_COLUMN_SUBNET_PEERS
} else {
MIN_NON_SAMPLING_COLUMN_SUBNET_PEERS
};

if subnet_count <= threshold {
should_protect = true;
break;
}
}
}

if should_protect {
// Do not drop this peer in this pruning interval
continue;
}
}

if info.is_outbound_only() {
outbound_peers_pruned += 1;
}
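The nested loop above can be read as a single predicate: protect the candidate if any of its custody subnets currently has no more than the minimum number of peers. A standalone sketch over simplified types (subnets as `u64`; `should_protect` is a hypothetical name, not code from this PR), reusing the same threshold rule as the constants sketch earlier:

use std::collections::{HashMap, HashSet};

/// A peer is protected if any of its custody subnets is already at or
/// below its minimum peer count (sampling subnets get the higher minimum).
fn should_protect(
    peer_subnets: &HashSet<u64>,
    subnet_peer_count: &HashMap<u64, u64>,
    sampling_subnets: &HashSet<u64>,
    min_sampling: u64,
    min_non_sampling: u64,
) -> bool {
    peer_subnets.iter().any(|subnet| {
        let threshold = if sampling_subnets.contains(subnet) {
            min_sampling
        } else {
            min_non_sampling
        };
        subnet_peer_count
            .get(subnet)
            .is_some_and(|count| *count <= threshold)
    })
}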
@@ -1202,7 +1278,7 @@
if let Some(index) = removed_peer_index {
let (candidate_peer, _) = peers_on_subnet.remove(index);
// Remove pruned peers from other subnet counts
for subnet_peers in subnet_to_peer.values_mut() {
for subnet_peers in att_subnet_to_peer.values_mut() {
subnet_peers.retain(|(peer_id, _)| peer_id != &candidate_peer);
}
// Remove pruned peers from all sync-committee counts
@@ -1218,6 +1294,19 @@
}
}
}
// Remove pruned peers from all custody subnet counts
if let Some(known_custody_subnets) =
peer_to_custody_subnet.get(&candidate_peer)
{
for custody_subnet in known_custody_subnets {
if let Some(custody_subnet_count) =
custody_subnet_peer_count.get_mut(custody_subnet)
{
*custody_subnet_count =
custody_subnet_count.saturating_sub(1);
}
}
}
peers_to_prune.insert(candidate_peer);
} else {
peers_on_subnet.clear();
@@ -1271,6 +1360,9 @@ impl<E: EthSpec> PeerManager<E> {
// Update peer score metrics;
self.update_peer_score_metrics();

// Maintain minimum count for custody peers.
self.maintain_custody_peers();

// Maintain minimum count for sync committee peers.
self.maintain_sync_committee_peers();

@@ -2153,6 +2245,83 @@ mod tests {
assert!(!connected_peers.contains(&peers[5]));
}

/// Test that custody subnet peers below the protection threshold are protected from pruning.
/// Creates 3 peers: 2 on a sampling subnet (below `MIN_SAMPLING_COLUMN_SUBNET_PEERS` = 3) and
/// 1 with no custody subnets. The peer with no subnets should be pruned; the custody subnet peers should be kept.
#[tokio::test]
async fn test_peer_manager_protect_custody_subnet_peers_below_threshold() {
let target = 2;
let mut peer_manager = build_peer_manager(target).await;

// Set up sampling subnets
let mut sampling_subnets = HashSet::new();
sampling_subnets.insert(0.into());
*peer_manager.network_globals.sampling_subnets.write() = sampling_subnets;

let mut peers = Vec::new();

// Create 3 peers
for i in 0..3 {
let peer_id = PeerId::random();
peer_manager.inject_connect_ingoing(&peer_id, "/ip4/0.0.0.0".parse().unwrap(), None);

let custody_subnets = if i < 2 {
// First 2 peers on sampling subnet 0
[0.into()].into_iter().collect()
} else {
// Last peer has no custody subnets
HashSet::new()
};

// Set custody subnets for the peer
peer_manager
.network_globals
.peers
.write()
.peer_info_mut(&peer_id)
.unwrap()
.set_custody_subnets(custody_subnets.clone());

// Add subscriptions for custody subnets
for subnet_id in custody_subnets {
peer_manager
.network_globals
.peers
.write()
.add_subscription(&peer_id, Subnet::DataColumn(subnet_id));
}

peers.push(peer_id);
}

// Verify initial setup
assert_eq!(peer_manager.network_globals.connected_or_dialing_peers(), 3);

// Perform the heartbeat to trigger pruning
peer_manager.heartbeat();

// Should prune down to target of 2 peers
assert_eq!(
peer_manager.network_globals.connected_or_dialing_peers(),
target
);

let connected_peers: std::collections::HashSet<_> = peer_manager
.network_globals
.peers
.read()
.connected_or_dialing_peers()
.cloned()
.collect();

// The 2 custody subnet peers should be protected
assert!(connected_peers.contains(&peers[0]));
assert!(connected_peers.contains(&peers[1]));

// The peer with no custody subnets should be pruned
assert!(!connected_peers.contains(&peers[2]));
}

/// Test the pruning logic to prioritise peers with the most subnets, but not at the expense of
/// removing our few sync-committee subnets.
///
@@ -2265,7 +2434,7 @@ mod tests {
/// This test is for reproducing the issue:
/// https://github.com/sigp/lighthouse/pull/3236#issue-1256432659
///
/// Whether the issue happens depends on `subnet_to_peer` (HashMap), since HashMap doesn't
/// Whether the issue happens depends on `att_subnet_to_peer` (HashMap), since HashMap doesn't
/// guarantee a particular order of iteration. So we repeat the test case to try to reproduce
/// the issue.
#[tokio::test]