@@ -17,7 +17,7 @@ use std::{
     time::{Duration, Instant},
 };
 use tracing::{debug, error, trace, warn};
-use types::{DataColumnSubnetId, EthSpec, SyncSubnetId};
+use types::{DataColumnSubnetId, EthSpec, SubnetId, SyncSubnetId};
 
 pub use libp2p::core::Multiaddr;
 pub use libp2p::identity::Keypair;
@@ -52,6 +52,11 @@ pub const PEER_RECONNECTION_TIMEOUT: Duration = Duration::from_secs(600);
 /// lower our peer count below this number. Instead we favour a non-uniform distribution of subnet
 /// peers.
 pub const MIN_SYNC_COMMITTEE_PEERS: u64 = 2;
+/// Avoid pruning sampling peers if subnet peer count is <= TARGET_SUBNET_PEERS.
+pub const MIN_SAMPLING_COLUMN_SUBNET_PEERS: u64 = TARGET_SUBNET_PEERS as u64;
+/// For non-sampling columns, we need to ensure there is at least one peer for
+/// publishing during proposals.
+pub const MIN_NON_SAMPLING_COLUMN_SUBNET_PEERS: u64 = 1;
 /// A fraction of `PeerManager::target_peers` that we allow to connect to us in excess of
 /// `PeerManager::target_peers`. For clarity, if `PeerManager::target_peers` is 50 and
 /// PEER_EXCESS_FACTOR = 0.1 we allow 10% more nodes, i.e 55.
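A minimal sketch of the rule these two thresholds encode (hypothetical `column_subnet_is_protected` helper; the real check is the `should_protect` block added in the pruning hunk further down):

```rust
/// Hypothetical helper sketching how the thresholds above are intended to be used:
/// a column subnet's peers are protected from pruning while the subnet's peer count
/// is at or below its threshold.
fn column_subnet_is_protected(subnet_peer_count: u64, is_sampling_subnet: bool) -> bool {
    let threshold = if is_sampling_subnet {
        MIN_SAMPLING_COLUMN_SUBNET_PEERS
    } else {
        MIN_NON_SAMPLING_COLUMN_SUBNET_PEERS
    };
    subnet_peer_count <= threshold
}
```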
@@ -729,7 +734,7 @@ impl<E: EthSpec> PeerManager<E> {
                 }
             } else {
                 // we have no meta-data for this peer, update
-                debug!(%peer_id, new_seq_no = meta_data.seq_number(), "Obtained peer's metadata");
+                debug!(%peer_id, new_seq_no = meta_data.seq_number(), cgc = ?meta_data.custody_group_count().ok(), "Obtained peer's metadata");
             }
 
             let known_custody_group_count = peer_info
@@ -949,6 +954,43 @@ impl<E: EthSpec> PeerManager<E> {
         }
     }
 
+    /// Run a discovery query for additional custody peers if any sampling subnet falls below `MIN_SAMPLING_COLUMN_SUBNET_PEERS`.
+    fn maintain_custody_peers(&mut self) {
+        let subnets_to_discover: Vec<SubnetDiscovery> = self
+            .network_globals
+            .sampling_subnets()
+            .iter()
+            .filter_map(|custody_subnet| {
+                if self
+                    .network_globals
+                    .peers
+                    .read()
+                    .has_good_peers_in_custody_subnet(
+                        custody_subnet,
+                        MIN_SAMPLING_COLUMN_SUBNET_PEERS as usize,
+                    )
+                {
+                    None
+                } else {
+                    Some(SubnetDiscovery {
+                        subnet: Subnet::DataColumn(*custody_subnet),
+                        min_ttl: None,
+                    })
+                }
+            })
+            .collect();
+
+        // Request the subnet queries from discovery.
+        if !subnets_to_discover.is_empty() {
+            debug!(
+                subnets = ?subnets_to_discover.iter().map(|s| s.subnet).collect::<Vec<_>>(),
+                "Making subnet queries for maintaining custody peers"
+            );
+            self.events
+                .push(PeerManagerEvent::DiscoverSubnetPeers(subnets_to_discover));
+        }
+    }
+
     fn maintain_trusted_peers(&mut self) {
         let trusted_peers = self.trusted_peers.clone();
         for trusted_peer in trusted_peers {
@@ -1091,14 +1133,17 @@ impl<E: EthSpec> PeerManager<E> {
         // uniformly distributed, remove random peers.
         if peers_to_prune.len() < connected_peer_count.saturating_sub(self.target_peers) {
             // Of our connected peers, build a map from subnet_id -> Vec<(PeerId, PeerInfo)>
-            let mut subnet_to_peer: HashMap<Subnet, Vec<(PeerId, PeerInfo<E>)>> = HashMap::new();
+            let mut att_subnet_to_peer: HashMap<SubnetId, Vec<(PeerId, PeerInfo<E>)>> =
+                HashMap::new();
             // These variables are used to track if a peer is in a long-lived sync-committee as we
             // may wish to retain this peer over others when pruning.
             let mut sync_committee_peer_count: HashMap<SyncSubnetId, u64> = HashMap::new();
-            let mut peer_to_sync_committee: HashMap<
-                PeerId,
-                std::collections::HashSet<SyncSubnetId>,
-            > = HashMap::new();
+            let mut peer_to_sync_committee: HashMap<PeerId, HashSet<SyncSubnetId>> = HashMap::new();
+
+            let mut custody_subnet_peer_count: HashMap<DataColumnSubnetId, u64> = HashMap::new();
+            let mut peer_to_custody_subnet: HashMap<PeerId, HashSet<DataColumnSubnetId>> =
+                HashMap::new();
+            let sampling_subnets = self.network_globals.sampling_subnets();
 
             for (peer_id, info) in self.network_globals.peers.read().connected_peers() {
                 // Ignore peers we trust or that we are already pruning
@@ -1112,9 +1157,9 @@ impl<E: EthSpec> PeerManager<E> {
                 // the dense sync committees.
                 for subnet in info.long_lived_subnets() {
                     match subnet {
-                        Subnet::Attestation(_) => {
-                            subnet_to_peer
-                                .entry(subnet)
+                        Subnet::Attestation(subnet_id) => {
+                            att_subnet_to_peer
+                                .entry(subnet_id)
                                 .or_default()
                                 .push((*peer_id, info.clone()));
                         }
@@ -1125,26 +1170,31 @@ impl<E: EthSpec> PeerManager<E> {
                             .or_default()
                             .insert(id);
                         }
-                        // TODO(das) to be implemented. We're not pruning data column peers yet
-                        // because data column topics are subscribed as core topics until we
-                        // implement recomputing data column subnets.
-                        Subnet::DataColumn(_) => {}
+                        Subnet::DataColumn(id) => {
+                            *custody_subnet_peer_count.entry(id).or_default() += 1;
+                            peer_to_custody_subnet
+                                .entry(*peer_id)
+                                .or_default()
+                                .insert(id);
+                        }
                     }
                 }
             }
 
             // Add to the peers to prune mapping
             while peers_to_prune.len() < connected_peer_count.saturating_sub(self.target_peers) {
-                if let Some((_, peers_on_subnet)) = subnet_to_peer
+                if let Some((_, peers_on_subnet)) = att_subnet_to_peer
                     .iter_mut()
                     .max_by_key(|(_, peers)| peers.len())
                 {
                     // and the subnet still contains peers
                     if !peers_on_subnet.is_empty() {
                         // Order the peers by the number of subnets they are long-lived
-                        // subscribed too, shuffle equal peers.
+                        // subscribed to, shuffle equal peers. Prioritize unsynced peers for pruning.
                         peers_on_subnet.shuffle(&mut rand::rng());
-                        peers_on_subnet.sort_by_key(|(_, info)| info.long_lived_subnet_count());
+                        peers_on_subnet.sort_by_key(|(_, info)| {
+                            (info.long_lived_attnet_count(), info.is_synced_or_advanced())
+                        });
 
                         // Try and find a candidate peer to remove from the subnet.
                         // We ignore peers that would put us below our target outbound peers
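The replacement sort key is a `(u64, bool)` tuple; Rust orders tuples lexicographically and `false < true`, so peers on few long-lived attestation subnets that are not synced sort to the front, where the candidate loop that follows considers them first. A small self-contained sketch of that ordering (peer names and counts are made up):

```rust
fn main() {
    // (long_lived_attnet_count, is_synced_or_advanced) for three hypothetical peers.
    let mut peers = vec![("a", (3u64, true)), ("b", (1, false)), ("c", (1, true))];
    // Tuples compare lexicographically and `false < true`, so the unsynced,
    // low-attnet peer "b" sorts to the front and is considered for pruning first.
    peers.sort_by_key(|(_, key)| *key);
    assert_eq!(
        peers.iter().map(|(name, _)| *name).collect::<Vec<_>>(),
        vec!["b", "c", "a"]
    );
    println!("{peers:?}");
}
```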
@@ -1187,6 +1237,32 @@ impl<E: EthSpec> PeerManager<E> {
                                 }
                             }
 
+                            // Ensure custody subnet peers are protected based on subnet type and peer count.
+                            if let Some(subnets) = peer_to_custody_subnet.get(candidate_peer) {
+                                let mut should_protect = false;
+                                for subnet_id in subnets {
+                                    if let Some(subnet_count) =
+                                        custody_subnet_peer_count.get(subnet_id).copied()
+                                    {
+                                        let threshold = if sampling_subnets.contains(subnet_id) {
+                                            MIN_SAMPLING_COLUMN_SUBNET_PEERS
+                                        } else {
+                                            MIN_NON_SAMPLING_COLUMN_SUBNET_PEERS
+                                        };
+
+                                        if subnet_count <= threshold {
+                                            should_protect = true;
+                                            break;
+                                        }
+                                    }
+                                }
+
+                                if should_protect {
+                                    // Do not drop this peer in this pruning interval
+                                    continue;
+                                }
+                            }
+
                             if info.is_outbound_only() {
                                 outbound_peers_pruned += 1;
                             }
@@ -1202,7 +1278,7 @@ impl<E: EthSpec> PeerManager<E> {
                         if let Some(index) = removed_peer_index {
                             let (candidate_peer, _) = peers_on_subnet.remove(index);
                             // Remove pruned peers from other subnet counts
-                            for subnet_peers in subnet_to_peer.values_mut() {
+                            for subnet_peers in att_subnet_to_peer.values_mut() {
                                 subnet_peers.retain(|(peer_id, _)| peer_id != &candidate_peer);
                             }
                             // Remove pruned peers from all sync-committee counts
@@ -1218,6 +1294,19 @@ impl<E: EthSpec> PeerManager<E> {
                                     }
                                 }
                             }
+                            // Remove pruned peers from all custody subnet counts
+                            if let Some(known_custody_subnets) =
+                                peer_to_custody_subnet.get(&candidate_peer)
+                            {
+                                for custody_subnet in known_custody_subnets {
+                                    if let Some(custody_subnet_count) =
+                                        custody_subnet_peer_count.get_mut(custody_subnet)
+                                    {
+                                        *custody_subnet_count =
+                                            custody_subnet_count.saturating_sub(1);
+                                    }
+                                }
+                            }
                             peers_to_prune.insert(candidate_peer);
                         } else {
                             peers_on_subnet.clear();
@@ -1271,6 +1360,9 @@ impl<E: EthSpec> PeerManager<E> {
         // Update peer score metrics;
         self.update_peer_score_metrics();
 
+        // Maintain minimum count for custody peers.
+        self.maintain_custody_peers();
+
         // Maintain minimum count for sync committee peers.
         self.maintain_sync_committee_peers();
 
@@ -2153,6 +2245,83 @@ mod tests {
         assert!(!connected_peers.contains(&peers[5]));
     }
 
+    /// Test that custody subnet peers below threshold are protected from pruning.
+    /// Creates 3 peers: 2 on a sampling subnet (below MIN_SAMPLING_COLUMN_SUBNET_PEERS = 3) and
+    /// 1 with no subnet. The peer with no subnet should be pruned and the custody subnet peers kept.
+    #[tokio::test]
+    async fn test_peer_manager_protect_custody_subnet_peers_below_threshold() {
+        let target = 2;
+        let mut peer_manager = build_peer_manager(target).await;
+
+        // Set up sampling subnets
+        let mut sampling_subnets = HashSet::new();
+        sampling_subnets.insert(0.into());
+        *peer_manager.network_globals.sampling_subnets.write() = sampling_subnets;
+
+        let mut peers = Vec::new();
+
+        // Create 3 peers
+        for i in 0..3 {
+            let peer_id = PeerId::random();
+            peer_manager.inject_connect_ingoing(&peer_id, "/ip4/0.0.0.0".parse().unwrap(), None);
+
+            let custody_subnets = if i < 2 {
+                // First 2 peers on sampling subnet 0
+                [0.into()].into_iter().collect()
+            } else {
+                // Last peer has no custody subnets
+                HashSet::new()
+            };
+
+            // Set custody subnets for the peer
+            peer_manager
+                .network_globals
+                .peers
+                .write()
+                .peer_info_mut(&peer_id)
+                .unwrap()
+                .set_custody_subnets(custody_subnets.clone());
+
+            // Add subscriptions for custody subnets
+            for subnet_id in custody_subnets {
+                peer_manager
+                    .network_globals
+                    .peers
+                    .write()
+                    .add_subscription(&peer_id, Subnet::DataColumn(subnet_id));
+            }
+
+            peers.push(peer_id);
+        }
+
+        // Verify initial setup
+        assert_eq!(peer_manager.network_globals.connected_or_dialing_peers(), 3);
+
+        // Perform the heartbeat to trigger pruning
+        peer_manager.heartbeat();
+
+        // Should prune down to target of 2 peers
+        assert_eq!(
+            peer_manager.network_globals.connected_or_dialing_peers(),
+            target
+        );
+
+        let connected_peers: std::collections::HashSet<_> = peer_manager
+            .network_globals
+            .peers
+            .read()
+            .connected_or_dialing_peers()
+            .cloned()
+            .collect();
+
+        // The 2 custody subnet peers should be protected
+        assert!(connected_peers.contains(&peers[0]));
+        assert!(connected_peers.contains(&peers[1]));
+
+        // The peer with no custody subnets should be pruned
+        assert!(!connected_peers.contains(&peers[2]));
+    }
+
     /// Test the pruning logic to prioritise peers with the most subnets, but not at the expense of
     /// removing our few sync-committee subnets.
     ///
@@ -2265,7 +2434,7 @@ mod tests {
     /// This test is for reproducing the issue:
     /// https://github.com/sigp/lighthouse/pull/3236#issue-1256432659
     ///
-    /// Whether the issue happens depends on `subnet_to_peer` (HashMap), since HashMap doesn't
+    /// Whether the issue happens depends on `att_subnet_to_peer` (HashMap), since HashMap doesn't
     /// guarantee a particular order of iteration. So we repeat the test case to try to reproduce
     /// the issue.
     #[tokio::test]