Skip to content

Commit e0e6037

Browse files
authored
fix(gossisub): prevent mesh exceeding mesh_n_high
Split off from #6183, to quote: >I noticed two issues during testing. > >We allow our mesh to grow greater than mesh_n_high, intentionally >This looks like its intentional but I can't recall why we would have added it. I think its counter-intuitive to allow our mesh to grow larger than the specified parameter. I suspect we added it to prevent our mesh from being filled with inbound peers and potentially being eclipsed. I suspect the best approach here is to remove inbound peers in the mesh maintenance rather than exceeding the mesh_n_high configuration. Pull-Request: #6184.
1 parent dd3fb5d commit e0e6037

File tree

3 files changed

+85
-105
lines changed

3 files changed

+85
-105
lines changed

protocols/gossipsub/CHANGELOG.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,7 @@
11
## 0.50.0
2+
- Prevent mesh exceeding mesh_n_high.
3+
See [PR 6184](https://github.com/libp2p/rust-libp2p/pull/6184)
4+
25
- Fix underflow when shuffling peers after prunning.
36
See [PR 6183](https://github.com/libp2p/rust-libp2p/pull/6183)
47

protocols/gossipsub/src/behaviour.rs

Lines changed: 2 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1366,8 +1366,6 @@ where
13661366
tracing::error!(peer_id = %peer_id, "Peer non-existent when handling graft");
13671367
return;
13681368
};
1369-
// Needs to be here to comply with the borrow checker.
1370-
let is_outbound = connected_peer.outbound;
13711369

13721370
// For each topic, if a peer has grafted us, then we necessarily must be in their mesh
13731371
// and they must be subscribed to the topic. Ensure we have recorded the mapping.
@@ -1419,8 +1417,6 @@ where
14191417
peer_score.add_penalty(peer_id, 1);
14201418

14211419
// check the flood cutoff
1422-
// See: https://github.com/rust-lang/rust-clippy/issues/10061
1423-
#[allow(unknown_lints, clippy::unchecked_duration_subtraction)]
14241420
let flood_cutoff = (backoff_time
14251421
+ self.config.graft_flood_threshold())
14261422
- self.config.prune_backoff();
@@ -1455,10 +1451,9 @@ where
14551451
}
14561452

14571453
// check mesh upper bound and only allow graft if the upper bound is not reached
1458-
// or if it is an outbound peer
14591454
let mesh_n_high = self.config.mesh_n_high_for_topic(&topic_hash);
14601455

1461-
if peers.len() >= mesh_n_high && !is_outbound {
1456+
if peers.len() >= mesh_n_high {
14621457
to_prune_topics.insert(topic_hash.clone());
14631458
continue;
14641459
}
@@ -2208,7 +2203,7 @@ where
22082203
}
22092204

22102205
// too many peers - remove some
2211-
if peers.len() > mesh_n_high {
2206+
if peers.len() >= mesh_n_high {
22122207
tracing::debug!(
22132208
topic=%topic_hash,
22142209
"HEARTBEAT: Mesh high. Topic contains: {} will reduce to: {}",

protocols/gossipsub/src/behaviour/tests.rs

Lines changed: 80 additions & 98 deletions
Original file line numberDiff line numberDiff line change
@@ -2340,59 +2340,23 @@ fn test_gossip_to_at_most_gossip_factor_peers() {
23402340
);
23412341
}
23422342

2343-
#[test]
2344-
fn test_accept_only_outbound_peer_grafts_when_mesh_full() {
2345-
let config: Config = Config::default();
2346-
2347-
// enough peers to fill the mesh
2348-
let (mut gs, peers, _, topics) = inject_nodes1()
2349-
.peer_no(config.mesh_n_high())
2350-
.topics(vec!["test".into()])
2351-
.to_subscribe(true)
2352-
.create_network();
2353-
2354-
// graft all the peers => this will fill the mesh
2355-
for peer in peers {
2356-
gs.handle_graft(&peer, topics.clone());
2357-
}
2358-
2359-
// assert current mesh size
2360-
assert_eq!(gs.mesh[&topics[0]].len(), config.mesh_n_high());
2361-
2362-
// create an outbound and an inbound peer
2363-
let (inbound, _in_queue) = add_peer(&mut gs, &topics, false, false);
2364-
let (outbound, _out_queue) = add_peer(&mut gs, &topics, true, false);
2365-
2366-
// send grafts
2367-
gs.handle_graft(&inbound, vec![topics[0].clone()]);
2368-
gs.handle_graft(&outbound, vec![topics[0].clone()]);
2369-
2370-
// assert mesh size
2371-
assert_eq!(gs.mesh[&topics[0]].len(), config.mesh_n_high() + 1);
2372-
2373-
// inbound is not in mesh
2374-
assert!(!gs.mesh[&topics[0]].contains(&inbound));
2375-
2376-
// outbound is in mesh
2377-
assert!(gs.mesh[&topics[0]].contains(&outbound));
2378-
}
2379-
23802343
#[test]
23812344
fn test_do_not_remove_too_many_outbound_peers() {
23822345
// use an extreme case to catch errors with high probability
2383-
let m = 50;
2384-
let n = 2 * m;
2346+
let mesh_n = 50;
2347+
let mesh_n_high = 2 * mesh_n;
23852348
let config = ConfigBuilder::default()
2386-
.mesh_n_high(n)
2387-
.mesh_n(n)
2388-
.mesh_n_low(n)
2389-
.mesh_outbound_min(m)
2349+
.mesh_n_high(mesh_n_high)
2350+
.mesh_n(mesh_n)
2351+
// Irrelevant for this test.
2352+
.mesh_n_low(mesh_n)
2353+
.mesh_outbound_min(mesh_n)
23902354
.build()
23912355
.unwrap();
23922356

23932357
// fill the mesh with inbound connections
23942358
let (mut gs, peers, _queues, topics) = inject_nodes1()
2395-
.peer_no(n)
2359+
.peer_no(mesh_n)
23962360
.topics(vec!["test".into()])
23972361
.to_subscribe(true)
23982362
.gs_config(config)
@@ -2405,60 +2369,26 @@ fn test_do_not_remove_too_many_outbound_peers() {
24052369

24062370
// create m outbound connections and graft (we will accept the graft)
24072371
let mut outbound = HashSet::new();
2408-
for _ in 0..m {
2372+
// Go from 50 (mesh_n) to 100 (mesh_n_high) to trigger prunning.
2373+
for _ in 0..mesh_n {
24092374
let (peer, _) = add_peer(&mut gs, &topics, true, false);
24102375
outbound.insert(peer);
24112376
gs.handle_graft(&peer, topics.clone());
24122377
}
24132378

24142379
// mesh is overly full
2415-
assert_eq!(gs.mesh.get(&topics[0]).unwrap().len(), n + m);
2380+
assert_eq!(gs.mesh.get(&topics[0]).unwrap().len(), mesh_n_high);
24162381

24172382
// run a heartbeat
24182383
gs.heartbeat();
24192384

2420-
// Peers should be removed to reach n
2421-
assert_eq!(gs.mesh.get(&topics[0]).unwrap().len(), n);
2385+
// Peers should be removed to reach `mesh_n`
2386+
assert_eq!(gs.mesh.get(&topics[0]).unwrap().len(), mesh_n);
24222387

24232388
// all outbound peers are still in the mesh
24242389
assert!(outbound.iter().all(|p| gs.mesh[&topics[0]].contains(p)));
24252390
}
24262391

2427-
#[test]
2428-
fn test_add_outbound_peers_if_min_is_not_satisfied() {
2429-
let config: Config = Config::default();
2430-
2431-
// Fill full mesh with inbound peers
2432-
let (mut gs, peers, _, topics) = inject_nodes1()
2433-
.peer_no(config.mesh_n_high())
2434-
.topics(vec!["test".into()])
2435-
.to_subscribe(true)
2436-
.create_network();
2437-
2438-
// graft all the peers
2439-
for peer in peers {
2440-
gs.handle_graft(&peer, topics.clone());
2441-
}
2442-
2443-
// create config.mesh_outbound_min() many outbound connections without grafting
2444-
let mut peers = vec![];
2445-
for _ in 0..config.mesh_outbound_min() {
2446-
peers.push(add_peer(&mut gs, &topics, true, false));
2447-
}
2448-
2449-
// Nothing changed in the mesh yet
2450-
assert_eq!(gs.mesh[&topics[0]].len(), config.mesh_n_high());
2451-
2452-
// run a heartbeat
2453-
gs.heartbeat();
2454-
2455-
// The outbound peers got additionally added
2456-
assert_eq!(
2457-
gs.mesh[&topics[0]].len(),
2458-
config.mesh_n_high() + config.mesh_outbound_min()
2459-
);
2460-
}
2461-
24622392
#[test]
24632393
fn test_prune_negative_scored_peers() {
24642394
let config = Config::default();
@@ -3205,22 +3135,20 @@ fn test_keep_best_scoring_peers_on_oversubscription() {
32053135
.build()
32063136
.unwrap();
32073137

3208-
// build mesh with more peers than mesh can hold
3209-
let n = config.mesh_n_high() + 1;
3138+
let mesh_n_high = config.mesh_n_high();
3139+
32103140
let (mut gs, peers, _queues, topics) = inject_nodes1()
3211-
.peer_no(n)
3141+
.peer_no(mesh_n_high)
32123142
.topics(vec!["test".into()])
32133143
.to_subscribe(true)
32143144
.gs_config(config.clone())
32153145
.explicit(0)
3216-
.outbound(n)
32173146
.scoring(Some((
32183147
PeerScoreParams::default(),
32193148
PeerScoreThresholds::default(),
32203149
)))
32213150
.create_network();
32223151

3223-
// graft all, will be accepted since the are outbound
32243152
for peer in &peers {
32253153
gs.handle_graft(peer, topics.clone());
32263154
}
@@ -3232,7 +3160,7 @@ fn test_keep_best_scoring_peers_on_oversubscription() {
32323160
gs.set_application_score(peer, index as f64);
32333161
}
32343162

3235-
assert_eq!(gs.mesh[&topics[0]].len(), n);
3163+
assert_eq!(gs.mesh[&topics[0]].len(), mesh_n_high);
32363164

32373165
// heartbeat to prune some peers
32383166
gs.heartbeat();
@@ -3241,7 +3169,7 @@ fn test_keep_best_scoring_peers_on_oversubscription() {
32413169

32423170
// mesh contains retain_scores best peers
32433171
assert!(gs.mesh[&topics[0]].is_superset(
3244-
&peers[(n - config.retain_scores())..]
3172+
&peers[(mesh_n_high - config.retain_scores())..]
32453173
.iter()
32463174
.cloned()
32473175
.collect()
@@ -6118,10 +6046,13 @@ fn test_mesh_subtraction_with_topic_config() {
61186046
let topic = String::from("topic1");
61196047
let topic_hash = TopicHash::from_raw(topic.clone());
61206048

6049+
let mesh_n = 5;
6050+
let mesh_n_high = 7;
6051+
61216052
let topic_config = TopicMeshConfig {
6122-
mesh_n: 5,
6053+
mesh_n,
6054+
mesh_n_high,
61236055
mesh_n_low: 3,
6124-
mesh_n_high: 7,
61256056
mesh_outbound_min: 2,
61266057
};
61276058

@@ -6130,15 +6061,12 @@ fn test_mesh_subtraction_with_topic_config() {
61306061
.build()
61316062
.unwrap();
61326063

6133-
let peer_no = 12;
6134-
6135-
// make all outbound connections so grafting to all will be allowed
61366064
let (mut gs, peers, _, topics) = inject_nodes1()
6137-
.peer_no(peer_no)
6065+
.peer_no(mesh_n_high)
61386066
.topics(vec![topic])
61396067
.to_subscribe(true)
61406068
.gs_config(config.clone())
6141-
.outbound(peer_no)
6069+
.outbound(mesh_n_high)
61426070
.create_network();
61436071

61446072
// graft all peers
@@ -6148,7 +6076,7 @@ fn test_mesh_subtraction_with_topic_config() {
61486076

61496077
assert_eq!(
61506078
gs.mesh.get(&topics[0]).unwrap().len(),
6151-
peer_no,
6079+
mesh_n_high,
61526080
"Initially all peers should be in the mesh"
61536081
);
61546082

@@ -6163,6 +6091,60 @@ fn test_mesh_subtraction_with_topic_config() {
61636091
);
61646092
}
61656093

6094+
/// Tests that if a mesh reaches `mesh_n_high`,
6095+
/// but is only composed of outbound peers, it is not reduced to `mesh_n`.
6096+
#[test]
6097+
fn test_mesh_subtraction_with_topic_config_min_outbound() {
6098+
let topic = String::from("topic1");
6099+
let topic_hash = TopicHash::from_raw(topic.clone());
6100+
6101+
let mesh_n = 5;
6102+
let mesh_n_high = 7;
6103+
6104+
let topic_config = TopicMeshConfig {
6105+
mesh_n,
6106+
mesh_n_high,
6107+
mesh_n_low: 3,
6108+
mesh_outbound_min: 7,
6109+
};
6110+
6111+
let config = ConfigBuilder::default()
6112+
.set_topic_config(topic_hash.clone(), topic_config)
6113+
.build()
6114+
.unwrap();
6115+
6116+
let peer_no = 12;
6117+
6118+
// make all outbound connections.
6119+
let (mut gs, peers, _, topics) = inject_nodes1()
6120+
.peer_no(peer_no)
6121+
.topics(vec![topic])
6122+
.to_subscribe(true)
6123+
.gs_config(config.clone())
6124+
.outbound(peer_no)
6125+
.create_network();
6126+
6127+
// graft all peers
6128+
for peer in peers {
6129+
gs.handle_graft(&peer, topics.clone());
6130+
}
6131+
6132+
assert_eq!(
6133+
gs.mesh.get(&topics[0]).unwrap().len(),
6134+
mesh_n_high,
6135+
"Initially mesh should be {mesh_n_high}"
6136+
);
6137+
6138+
// run a heartbeat
6139+
gs.heartbeat();
6140+
6141+
assert_eq!(
6142+
gs.mesh.get(&topics[0]).unwrap().len(),
6143+
mesh_n_high,
6144+
"After heartbeat, mesh should still be {mesh_n_high} as these are all outbound peers"
6145+
);
6146+
}
6147+
61666148
/// Test behavior with multiple topics having different configs
61676149
#[test]
61686150
fn test_multiple_topics_with_different_configs() {

0 commit comments

Comments
 (0)