Skip to content

Commit cc9c5f5

Browse files
authored
feat(portalnet): handle content that doesn't depend on distance to content (#1859)
1 parent c1d5492 commit cc9c5f5

File tree

7 files changed

+612
-314
lines changed

7 files changed

+612
-314
lines changed

crates/portalnet/src/gossip/mod.rs

Lines changed: 90 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,90 @@
1+
use std::collections::HashMap;
2+
3+
use discv5::Enr;
4+
use ethportal_api::{
5+
types::{distance::Metric, portal::MAX_CONTENT_KEYS_PER_OFFER},
6+
utils::bytes::hex_encode_compact,
7+
OverlayContentKey, RawContentKey, RawContentValue,
8+
};
9+
use itertools::{chain, Itertools};
10+
use tracing::{debug, warn};
11+
12+
use crate::types::kbucket::SharedKBucketsTable;
13+
14+
mod neighborhood;
15+
mod random;
16+
17+
/// Selects peers and content that should be gossiped to them.
18+
///
19+
/// Every peer will have at least 1 and at most `MAX_CONTENT_KEYS_PER_OFFER` content assigned to
20+
/// them. If more than `MAX_CONTENT_KEYS_PER_OFFER` is passed as an argument, it's possible that
21+
/// no peers will be selected to propagate that content (even if they exist). The order of content
22+
/// in the returned collection will be the same as the one that is passed (this might be important
23+
/// for some content types).
24+
///
25+
/// This function is designed such that either all content is affected by radius (in which case we
26+
/// use "neighborhood gossip" strategy) or not (in which case we use "random gossip" strategy).
27+
/// If content contains the mix of the two types, then content is split and each gossip strategy is
28+
/// applied to respective part. This can lead to suboptimal result when we select more peers than
29+
/// it is needed.
30+
pub fn gossip_recipients<TContentKey: OverlayContentKey, TMetric: Metric>(
31+
content: Vec<(TContentKey, RawContentValue)>,
32+
kbuckets: &SharedKBucketsTable,
33+
) -> HashMap<Enr, Vec<(RawContentKey, RawContentValue)>> {
34+
// Precalculate "content_id" and "raw_content_key" and use references going forward
35+
let content = content
36+
.into_iter()
37+
.map(|(key, raw_value)| {
38+
let id = key.content_id();
39+
let raw_key = key.to_bytes();
40+
(id, key, raw_key, raw_value)
41+
})
42+
.collect::<Vec<_>>();
43+
44+
debug!(
45+
ids = ?content.iter().map(|(id, _, _, _)| hex_encode_compact(id)),
46+
"selecing gossip recipients"
47+
);
48+
49+
// Split content id+key depending on whether they are affected by radius
50+
let (content_for_neighborhood_gossip, content_for_random_gossip) = content
51+
.iter()
52+
.map(|(id, key, _raw_key, _raw_value)| (id, key))
53+
.partition::<Vec<_>, _>(|(_content_id, content_key)| content_key.affected_by_radius());
54+
if !content_for_neighborhood_gossip.is_empty() && !content_for_random_gossip.is_empty() {
55+
warn!("Expected to gossip content with both neighborhood_gossip and random_gossip");
56+
}
57+
58+
// Combine results of "neighborhood gossip" and "random gossip".
59+
let peer_to_content_ids = chain!(
60+
neighborhood::gossip_recipients::<_, TMetric>(content_for_neighborhood_gossip, kbuckets),
61+
random::gossip_recipients(content_for_random_gossip, kbuckets)
62+
)
63+
.into_grouping_map()
64+
.reduce(|mut all_content_to_gossip, _enr, content_ids| {
65+
all_content_to_gossip.extend(content_ids);
66+
all_content_to_gossip
67+
});
68+
69+
// Extract raw content key/value in hash map for easier lookup.
70+
let raw_content = content
71+
.iter()
72+
.map(|(id, _key, raw_key, raw_value)| (id, (raw_key, raw_value)))
73+
.collect::<HashMap<_, _>>();
74+
75+
peer_to_content_ids
76+
.into_iter()
77+
.map(|(enr, content_ids)| {
78+
let raw_content_key_value = content_ids
79+
.into_iter()
80+
// Select at most `MAX_CONTENT_KEYS_PER_OFFER`
81+
.take(MAX_CONTENT_KEYS_PER_OFFER)
82+
.map(|content_id| {
83+
let (raw_content_key, raw_content_value) = raw_content[content_id];
84+
(raw_content_key.clone(), raw_content_value.clone())
85+
})
86+
.collect();
87+
(enr, raw_content_key_value)
88+
})
89+
.collect()
90+
}
Lines changed: 222 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,222 @@
1+
use std::collections::HashMap;
2+
3+
use alloy::hex::ToHexExt;
4+
use discv5::Enr;
5+
use ethportal_api::{types::distance::Metric, OverlayContentKey};
6+
use rand::{rng, seq::IteratorRandom};
7+
use tracing::{debug, error};
8+
9+
use crate::types::kbucket::SharedKBucketsTable;
10+
11+
pub fn gossip_recipients<'c, TContentKey: OverlayContentKey, TMetric: Metric>(
12+
content: Vec<(&'c [u8; 32], &TContentKey)>,
13+
kbuckets: &SharedKBucketsTable,
14+
) -> HashMap<Enr, Vec<&'c [u8; 32]>> {
15+
if content.is_empty() {
16+
return HashMap::new();
17+
}
18+
19+
let content_ids = content
20+
.iter()
21+
.map(|(content_id, _content_key)| *content_id)
22+
.collect::<Vec<_>>();
23+
24+
// Map from content_ids to interested ENRs
25+
let mut content_id_to_interested_enrs = kbuckets.batch_interested_enrs::<TMetric>(&content_ids);
26+
27+
// Map from ENRs to content they will put content
28+
let mut enrs_and_content: HashMap<Enr, Vec<&'c [u8; 32]>> = HashMap::new();
29+
for (content_id, content_key) in content {
30+
let interested_enrs = content_id_to_interested_enrs.remove(content_id).unwrap_or_else(|| {
31+
error!("interested_enrs should contain all content ids, even if there are no interested ENRs");
32+
vec![]
33+
});
34+
if interested_enrs.is_empty() {
35+
debug!(
36+
content.id = content_id.encode_hex_with_prefix(),
37+
content.key = %content_key.to_bytes(),
38+
"No peers eligible for neighborhood gossip"
39+
);
40+
continue;
41+
};
42+
43+
// Select content recipients
44+
for enr in select_content_recipients::<TMetric>(content_id, interested_enrs) {
45+
enrs_and_content.entry(enr).or_default().push(content_id);
46+
}
47+
}
48+
enrs_and_content
49+
}
50+
51+
const NUM_CLOSEST_PEERS: usize = 4;
52+
const NUM_FARTHER_PEERS: usize = 4;
53+
54+
/// Selects put content recipients from a vec of interested peers.
55+
///
56+
/// If number of peers is at most `NUM_CLOSEST_PEERS + NUM_FARTHER_PEERS`, then all are returned.
57+
/// Otherwise, peers are sorted by distance from `content_id` and then:
58+
///
59+
/// 1. Closest `NUM_CLOSEST_PEERS` ENRs are selected
60+
/// 2. Random `NUM_FARTHER_PEERS` ENRs are selected from the rest
61+
fn select_content_recipients<TMetric: Metric>(
62+
content_id: &[u8; 32],
63+
mut peers: Vec<Enr>,
64+
) -> Vec<Enr> {
65+
// Check if we need to do any selection
66+
if peers.len() <= NUM_CLOSEST_PEERS + NUM_FARTHER_PEERS {
67+
return peers;
68+
}
69+
70+
// Sort peers by distance
71+
peers.sort_by_cached_key(|peer| TMetric::distance(content_id, &peer.node_id().raw()));
72+
73+
// Split of at NUM_CLOSEST_PEERS
74+
let farther_peers = peers.split_off(NUM_CLOSEST_PEERS);
75+
76+
// Select random NUM_FARTHER_PEERS
77+
peers.extend(
78+
farther_peers
79+
.into_iter()
80+
.choose_multiple(&mut rng(), NUM_FARTHER_PEERS),
81+
);
82+
83+
peers
84+
}
85+
86+
#[cfg(test)]
mod tests {
    use std::iter;

    use discv5::enr::NodeId;
    use ethportal_api::{
        types::{
            distance::{Distance, XorMetric},
            enr::generate_random_remote_enr,
        },
        IdentityContentKey,
    };
    use rand::random;
    use rstest::rstest;

    use super::*;

    /// No content in means no recipients out, even when the routing table holds peers.
    #[test]
    fn empty() {
        let kbuckets = SharedKBucketsTable::new_for_tests(NodeId::random());

        (0..NUM_CLOSEST_PEERS).for_each(|_| {
            let (_, peer) = generate_random_remote_enr();
            let _ = kbuckets.insert_or_update_disconnected(&peer, Distance::MAX);
        });

        let recipients = gossip_recipients::<IdentityContentKey, XorMetric>(vec![], &kbuckets);
        assert!(recipients.is_empty());
    }

    mod select_content_recipients {
        use std::ops::RangeBounds;

        use itertools::chain;

        use super::*;

        /// Generates random ENRs until `count` of them have a log2 XOR distance to `content_id`
        /// that lies within `log2_distances`.
        fn create_peers_with_distance(
            count: usize,
            content_id: &[u8; 32],
            log2_distances: impl RangeBounds<usize>,
        ) -> Vec<Enr> {
            iter::repeat_with(|| generate_random_remote_enr().1)
                .filter(|peer| {
                    let log2_distance = XorMetric::distance(content_id, &peer.node_id().raw())
                        .log2()
                        .unwrap();
                    log2_distances.contains(&log2_distance)
                })
                .take(count)
                .collect()
        }

        /// The number of recipients is capped at `NUM_CLOSEST_PEERS + NUM_FARTHER_PEERS`.
        #[rstest]
        #[case(0, 0)]
        #[case(NUM_CLOSEST_PEERS - 1, NUM_CLOSEST_PEERS - 1)]
        #[case(NUM_CLOSEST_PEERS, NUM_CLOSEST_PEERS)]
        #[case(NUM_CLOSEST_PEERS + 1, NUM_CLOSEST_PEERS + 1)]
        #[case(NUM_CLOSEST_PEERS + NUM_FARTHER_PEERS, NUM_CLOSEST_PEERS + NUM_FARTHER_PEERS)]
        #[case(NUM_CLOSEST_PEERS + NUM_FARTHER_PEERS + 1, NUM_CLOSEST_PEERS + NUM_FARTHER_PEERS)]
        #[case(256, NUM_CLOSEST_PEERS + NUM_FARTHER_PEERS)]
        fn count(#[case] peers_count: usize, #[case] expected_content_recipients_count: usize) {
            let content_id = random();
            let peers = create_peers_with_distance(peers_count, &content_id, ..);

            let recipients = select_content_recipients::<XorMetric>(&content_id, peers);
            assert_eq!(recipients.len(), expected_content_recipients_count);
        }

        /// All of the closest peers must be among the recipients.
        #[test]
        fn closest() {
            const CLOSE_PEER_LOG2_DISTANCE: usize = 253;

            let content_id = random();

            // NUM_CLOSEST_PEERS peers with log2 distance below CLOSE_PEER_LOG2_DISTANCE
            let near = create_peers_with_distance(
                NUM_CLOSEST_PEERS,
                &content_id,
                ..CLOSE_PEER_LOG2_DISTANCE,
            );

            // 1000 peers with log2 distance of at least CLOSE_PEER_LOG2_DISTANCE
            let far = create_peers_with_distance(1000, &content_id, CLOSE_PEER_LOG2_DISTANCE..);

            let candidates = chain!(near.clone(), far).collect();
            let recipients = select_content_recipients::<XorMetric>(&content_id, candidates);

            // Exactly `NUM_CLOSEST_PEERS + NUM_FARTHER_PEERS` peers are selected
            assert_eq!(recipients.len(), NUM_CLOSEST_PEERS + NUM_FARTHER_PEERS);

            // Every close peer is selected
            for near_peer in &near {
                assert!(recipients.contains(near_peer));
            }
        }

        /// The closest peer outside of the "closest" group gets no preferential treatment.
        #[test]
        fn closest_far_peer_is_not_selected() {
            const TARGET_PEER_DISTANCE: usize = 253;

            let content_id = random();

            // NUM_CLOSEST_PEERS peers with log2 distance below TARGET_PEER_DISTANCE
            let near =
                create_peers_with_distance(NUM_CLOSEST_PEERS, &content_id, ..TARGET_PEER_DISTANCE);

            // A single peer with log2 distance of exactly TARGET_PEER_DISTANCE
            let target_peer = create_peers_with_distance(
                1,
                &content_id,
                TARGET_PEER_DISTANCE..=TARGET_PEER_DISTANCE,
            )
            .remove(0);

            // 1000 peers with log2 distance above TARGET_PEER_DISTANCE
            let far = create_peers_with_distance(1000, &content_id, TARGET_PEER_DISTANCE + 1..);

            let all_peers = chain!(near, [target_peer.clone()], far).collect::<Vec<_>>();

            // The goal is to check that "target_peer" isn't selected. Because the farther peers
            // are sampled randomly, it can still be picked by chance (0.4%), so the selection is
            // attempted up to 10 times: being picked in every single attempt is extremely
            // unlikely.
            let target_avoided_at_least_once = (0..10).any(|_| {
                let recipients =
                    select_content_recipients::<XorMetric>(&content_id, all_peers.clone());
                !recipients.contains(&target_peer)
            });
            assert!(target_avoided_at_least_once);
        }
    }
}

0 commit comments

Comments
 (0)