Skip to content

Commit 4e33c3c

Browse files
committed
Upper bound full node selection pool at max_group_size
1 parent 041a342 commit 4e33c3c

File tree

11 files changed

+173
-49
lines changed

11 files changed

+173
-49
lines changed

monad-executor-glue/src/lib.rs

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,8 @@ pub enum RouterCommand<ST: CertificateSignatureRecoverable, OM> {
7878
GetPeers,
7979
UpdatePeers {
8080
peer_entries: Vec<PeerEntry<ST>>,
81-
pinned_nodes: Vec<NodeId<CertificateSignaturePubKey<ST>>>,
81+
dedicated_full_nodes: Vec<NodeId<CertificateSignaturePubKey<ST>>>,
82+
prioritized_full_nodes: Vec<NodeId<CertificateSignaturePubKey<ST>>>,
8283
},
8384
GetFullNodes,
8485
UpdateFullNodes {
@@ -122,11 +123,13 @@ impl<ST: CertificateSignatureRecoverable, OM> Debug for RouterCommand<ST, OM> {
122123
Self::GetPeers => write!(f, "GetPeers"),
123124
Self::UpdatePeers {
124125
peer_entries,
125-
pinned_nodes,
126+
dedicated_full_nodes,
127+
prioritized_full_nodes,
126128
} => f
127129
.debug_struct("UpdatePeers")
128130
.field("peer_entries", peer_entries)
129-
.field("pinned_nodes", pinned_nodes)
131+
.field("dedicated_full_nodes", dedicated_full_nodes)
132+
.field("prioritized_full_nodes", prioritized_full_nodes)
130133
.finish(),
131134
Self::GetFullNodes => write!(f, "GetFullNodes"),
132135
Self::UpdateFullNodes {
@@ -1926,7 +1929,8 @@ where
19261929
ST: CertificateSignatureRecoverable,
19271930
{
19281931
pub known_peers: Vec<PeerEntry<ST>>,
1929-
pub pinned_nodes: Vec<NodeId<CertificateSignaturePubKey<ST>>>,
1932+
pub dedicated_full_nodes: Vec<NodeId<CertificateSignaturePubKey<ST>>>,
1933+
pub prioritized_full_nodes: Vec<NodeId<CertificateSignaturePubKey<ST>>>,
19301934
}
19311935

19321936
#[derive(Debug, Clone, PartialEq, Eq, Serialize)]

monad-node/src/main.rs

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -680,17 +680,17 @@ where
680680
)
681681
})
682682
.collect();
683+
let prioritized_full_nodes: BTreeSet<_> = node_config
684+
.fullnode_raptorcast
685+
.full_nodes_prioritized
686+
.identities
687+
.iter()
688+
.map(|id| NodeId::new(id.secp256k1_pubkey))
689+
.collect();
683690
let pinned_full_nodes: BTreeSet<_> = full_nodes
684691
.iter()
685692
.map(|full_node| NodeId::new(full_node.secp256k1_pubkey))
686-
.chain(
687-
node_config
688-
.fullnode_raptorcast
689-
.full_nodes_prioritized
690-
.identities
691-
.iter()
692-
.map(|id| NodeId::new(id.secp256k1_pubkey)),
693-
)
693+
.chain(prioritized_full_nodes.clone())
694694
.chain(bootstrap_peers.keys().cloned())
695695
.collect();
696696

@@ -701,6 +701,7 @@ where
701701
current_epoch,
702702
epoch_validators: epoch_validators.clone(),
703703
pinned_full_nodes,
704+
prioritized_full_nodes,
704705
bootstrap_peers,
705706
refresh_period: Duration::from_secs(peer_discovery_config.refresh_period),
706707
request_timeout: Duration::from_secs(peer_discovery_config.request_timeout),
@@ -709,6 +710,7 @@ where
709710
.last_participation_prune_threshold,
710711
min_num_peers: peer_discovery_config.min_num_peers,
711712
max_num_peers: peer_discovery_config.max_num_peers,
713+
max_group_size: node_config.fullnode_raptorcast.max_group_size,
712714
enable_publisher: node_config.fullnode_raptorcast.enable_publisher,
713715
enable_client: node_config.fullnode_raptorcast.enable_client,
714716
rng: ChaCha8Rng::from_entropy(),

monad-peer-disc-swarm/src/driver.rs

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -187,9 +187,12 @@ where
187187
self.algo.update_validator_set(epoch, validators)
188188
}
189189
PeerDiscoveryEvent::UpdatePeers { peers } => self.algo.update_peers(peers),
190-
PeerDiscoveryEvent::UpdatePinnedNodes { pinned_full_nodes } => {
191-
self.algo.update_pinned_nodes(pinned_full_nodes)
192-
}
190+
PeerDiscoveryEvent::UpdatePinnedNodes {
191+
dedicated_full_nodes,
192+
prioritized_full_nodes,
193+
} => self
194+
.algo
195+
.update_pinned_nodes(dedicated_full_nodes, prioritized_full_nodes),
193196
PeerDiscoveryEvent::UpdateConfirmGroup { end_round, peers } => {
194197
self.algo.update_peer_participation(end_round, peers)
195198
}

monad-peer-disc-swarm/tests/peer_discovery.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -98,6 +98,7 @@ struct TestConfig {
9898
pub last_participation_prune_threshold: Round,
9999
pub min_num_peers: usize,
100100
pub max_num_peers: usize,
101+
pub max_group_size: usize,
101102
pub outbound_pipeline: Vec<GenericTransformer<PubKeyType, PeerDiscoveryMessage<SignatureType>>>,
102103
}
103104

@@ -118,6 +119,7 @@ impl Default for TestConfig {
118119
last_participation_prune_threshold: Round(5000),
119120
min_num_peers: 5,
120121
max_num_peers: 50,
122+
max_group_size: 50,
121123
outbound_pipeline: vec![],
122124
}
123125
}
@@ -206,6 +208,7 @@ fn setup_keys_and_swarm_builder(
206208
current_epoch: config.current_epoch,
207209
epoch_validators: epoch_validators.clone(),
208210
pinned_full_nodes,
211+
prioritized_full_nodes: BTreeSet::new(),
209212
bootstrap_peers,
210213
refresh_period: config.refresh_period,
211214
request_timeout: config.request_timeout,
@@ -214,6 +217,7 @@ fn setup_keys_and_swarm_builder(
214217
.last_participation_prune_threshold,
215218
min_num_peers: config.min_num_peers,
216219
max_num_peers: config.max_num_peers,
220+
max_group_size: config.max_group_size,
217221
enable_publisher: secondary_raptorcast_enabled,
218222
enable_client: secondary_raptorcast_enabled,
219223
rng: ChaCha8Rng::seed_from_u64(123456), // fixed seed for reproducibility
@@ -382,13 +386,15 @@ fn test_update_name_record() {
382386
current_epoch: config.current_epoch,
383387
epoch_validators: BTreeMap::new(),
384388
pinned_full_nodes: BTreeSet::new(),
389+
prioritized_full_nodes: BTreeSet::new(),
385390
bootstrap_peers: BTreeMap::from([(node_1, generate_name_record(node_1_key))]),
386391
refresh_period: config.refresh_period,
387392
request_timeout: config.request_timeout,
388393
unresponsive_prune_threshold: config.unresponsive_prune_threshold,
389394
last_participation_prune_threshold: config.last_participation_prune_threshold,
390395
min_num_peers: config.min_num_peers,
391396
max_num_peers: config.max_num_peers,
397+
max_group_size: config.max_group_size,
392398
enable_publisher: false,
393399
enable_client: false,
394400
rng: ChaCha8Rng::seed_from_u64(123456),

monad-peer-discovery/src/discovery.rs

Lines changed: 109 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -129,7 +129,9 @@ pub struct PeerDiscovery<ST: CertificateSignatureRecoverable> {
129129
pub epoch_validators: BTreeMap<Epoch, BTreeSet<NodeId<CertificateSignaturePubKey<ST>>>>,
130130
// initial bootstrap peers set in config file
131131
pub initial_bootstrap_peers: BTreeSet<NodeId<CertificateSignaturePubKey<ST>>>,
132-
// pinned full nodes are dedicated and prioritized full nodes passed in from config that will not be pruned
132+
// prioritized full nodes for secondary raptorcast, set in config file
133+
pub prioritized_full_nodes: BTreeSet<NodeId<CertificateSignaturePubKey<ST>>>,
134+
// pinned full nodes contains dedicated and prioritized full nodes, and initial bootstrap peers that will not be pruned
133135
pub pinned_full_nodes: BTreeSet<NodeId<CertificateSignaturePubKey<ST>>>,
134136
// mapping of node IDs to their corresponding name records
135137
pub routing_info: BTreeMap<NodeId<CertificateSignaturePubKey<ST>>, MonadNameRecord<ST>>,
@@ -156,6 +158,8 @@ pub struct PeerDiscovery<ST: CertificateSignatureRecoverable> {
156158
pub min_num_peers: usize,
157159
// maximum number of peers before pruning
158160
pub max_num_peers: usize,
161+
// maximum number of peers in a raptorcast group
162+
pub max_group_size: usize,
159163
// secondary raptorcast setting: enable publisher mode when self is a validator
160164
pub enable_publisher: bool,
161165
// secondary raptorcast setting: enable client mode when self is a full node
@@ -170,13 +174,15 @@ pub struct PeerDiscoveryBuilder<ST: CertificateSignatureRecoverable> {
170174
pub current_epoch: Epoch,
171175
pub epoch_validators: BTreeMap<Epoch, BTreeSet<NodeId<CertificateSignaturePubKey<ST>>>>,
172176
pub pinned_full_nodes: BTreeSet<NodeId<CertificateSignaturePubKey<ST>>>,
177+
pub prioritized_full_nodes: BTreeSet<NodeId<CertificateSignaturePubKey<ST>>>,
173178
pub bootstrap_peers: BTreeMap<NodeId<CertificateSignaturePubKey<ST>>, MonadNameRecord<ST>>,
174179
pub refresh_period: Duration,
175180
pub request_timeout: Duration,
176181
pub unresponsive_prune_threshold: u32,
177182
pub last_participation_prune_threshold: Round,
178183
pub min_num_peers: usize,
179184
pub max_num_peers: usize,
185+
pub max_group_size: usize,
180186
pub enable_publisher: bool,
181187
pub enable_client: bool,
182188
pub rng: ChaCha8Rng,
@@ -235,6 +241,7 @@ impl<ST: CertificateSignatureRecoverable> PeerDiscoveryAlgoBuilder for PeerDisco
235241
.keys()
236242
.cloned()
237243
.collect::<BTreeSet<_>>(),
244+
prioritized_full_nodes: self.prioritized_full_nodes,
238245
pinned_full_nodes: self.pinned_full_nodes,
239246
routing_info: Default::default(),
240247
participation_info: Default::default(),
@@ -247,6 +254,7 @@ impl<ST: CertificateSignatureRecoverable> PeerDiscoveryAlgoBuilder for PeerDisco
247254
last_participation_prune_threshold: self.last_participation_prune_threshold,
248255
min_num_peers: self.min_num_peers,
249256
max_num_peers: self.max_num_peers,
257+
max_group_size: self.max_group_size,
250258
enable_publisher: self.enable_publisher,
251259
enable_client: self.enable_client,
252260
rng: self.rng,
@@ -643,8 +651,7 @@ where
643651
// we still respond with pong even if peer list is full
644652
let mut peer_list_full = false;
645653
if self.routing_info.len() + self.pending_queue.len() >= self.max_num_peers
646-
&& !self.check_validator_membership(&from)
647-
&& !self.pinned_full_nodes.contains(&from)
654+
&& !self.is_pinned_node(&from)
648655
&& !self.routing_info.contains_key(&from)
649656
{
650657
debug!(
@@ -1089,6 +1096,28 @@ where
10891096
return cmds;
10901097
}
10911098

1099+
// drop request if incoming request is a public full node and max_group_size is already reached
1100+
if !self.prioritized_full_nodes.contains(&from) {
1101+
let connected_public_full_nodes = self
1102+
.participation_info
1103+
.iter()
1104+
.filter(|(node_id, info)| {
1105+
info.status == SecondaryRaptorcastConnectionStatus::Connected
1106+
&& !self.prioritized_full_nodes.contains(node_id)
1107+
})
1108+
.count();
1109+
1110+
if connected_public_full_nodes + self.prioritized_full_nodes.len()
1111+
>= self.max_group_size
1112+
{
1113+
debug!(
1114+
?from,
1115+
"connected full nodes already exceeds max group size, dropping request"
1116+
);
1117+
return cmds;
1118+
}
1119+
}
1120+
10921121
if let Some(info) = self.participation_info.get_mut(&from) {
10931122
info.status = SecondaryRaptorcastConnectionStatus::Connected;
10941123
} else {
@@ -1154,7 +1183,6 @@ where
11541183
.filter_map(|(node_id, info)| {
11551184
if self.current_round.max(info.last_active) - info.last_active
11561185
>= self.last_participation_prune_threshold
1157-
&& !self.is_pinned_node(node_id)
11581186
{
11591187
Some(*node_id)
11601188
} else {
@@ -1164,9 +1192,16 @@ where
11641192
.collect();
11651193

11661194
for node_id in non_participating_nodes {
1167-
debug!(?node_id, "removing non-participating peer");
1168-
self.participation_info.remove(&node_id);
1169-
self.routing_info.remove(&node_id);
1195+
if self.is_pinned_node(&node_id) {
1196+
debug!(?node_id, "clearing participation info for pinned node");
1197+
if let Some(info) = self.participation_info.get_mut(&node_id) {
1198+
info.status = SecondaryRaptorcastConnectionStatus::None;
1199+
}
1200+
} else {
1201+
debug!(?node_id, "removing non-participating peer");
1202+
self.participation_info.remove(&node_id);
1203+
self.routing_info.remove(&node_id);
1204+
}
11701205
}
11711206

11721207
// if number of peers above max number of peers, randomly choose a few full nodes and prune them from routing_info
@@ -1406,11 +1441,21 @@ where
14061441

14071442
fn update_pinned_nodes(
14081443
&mut self,
1409-
pinned_nodes: BTreeSet<NodeId<CertificateSignaturePubKey<ST>>>,
1444+
dedicated_full_nodes: BTreeSet<NodeId<CertificateSignaturePubKey<ST>>>,
1445+
prioritized_full_nodes: BTreeSet<NodeId<CertificateSignaturePubKey<ST>>>,
14101446
) -> Vec<PeerDiscoveryCommand<ST>> {
1411-
debug!(?pinned_nodes, "updating pinned nodes");
1447+
debug!(
1448+
?dedicated_full_nodes,
1449+
?prioritized_full_nodes,
1450+
"updating pinned nodes"
1451+
);
14121452

1413-
self.pinned_full_nodes = pinned_nodes;
1453+
self.pinned_full_nodes = dedicated_full_nodes
1454+
.iter()
1455+
.chain(prioritized_full_nodes.iter())
1456+
.cloned()
1457+
.collect();
1458+
self.prioritized_full_nodes = prioritized_full_nodes;
14141459

14151460
Vec::new()
14161461
}
@@ -1565,6 +1610,7 @@ mod tests {
15651610
epoch_validators: BTreeMap::new(),
15661611
initial_bootstrap_peers: routing_info.keys().cloned().collect(),
15671612
pinned_full_nodes: BTreeSet::new(),
1613+
prioritized_full_nodes: BTreeSet::new(),
15681614
routing_info,
15691615
participation_info,
15701616
pending_queue: BTreeMap::new(),
@@ -1576,6 +1622,7 @@ mod tests {
15761622
last_participation_prune_threshold: Round(5000),
15771623
min_num_peers: 5,
15781624
max_num_peers: 50,
1625+
max_group_size: 50,
15791626
enable_publisher: false,
15801627
enable_client: false,
15811628
rng: ChaCha8Rng::seed_from_u64(123456),
@@ -2234,12 +2281,16 @@ mod tests {
22342281

22352282
#[test]
22362283
fn test_publisher_participation_info() {
2237-
let keys = create_keys::<SignatureType>(2);
2284+
let keys = create_keys::<SignatureType>(4);
22382285
let peer0 = &keys[0];
22392286
let peer1 = &keys[1];
2287+
let peer2 = &keys[2];
2288+
let peer3 = &keys[3];
22402289
let peer1_pubkey = NodeId::new(peer1.pubkey());
2290+
let peer2_pubkey = NodeId::new(peer2.pubkey());
2291+
let peer3_pubkey = NodeId::new(peer3.pubkey());
22412292

2242-
let mut state = generate_test_state(peer0, vec![peer1]);
2293+
let mut state = generate_test_state(peer0, vec![peer1, peer2, peer3]);
22432294

22442295
// do not respond to full node raptorcast request if self is not a validator publisher
22452296
state.self_role = PeerDiscoveryRole::ValidatorNone;
@@ -2262,6 +2313,52 @@ mod tests {
22622313
message: PeerDiscoveryMessage::FullNodeRaptorcastResponse
22632314
}));
22642315
assert_eq!(state.get_secondary_fullnodes(), Vec::from([peer1_pubkey]));
2316+
2317+
// do not respond to full node raptorcast request if already exceeding max group size
2318+
state.max_group_size = 1;
2319+
let cmds = state.handle_full_node_raptorcast_request(peer2_pubkey);
2320+
assert_eq!(
2321+
state.participation_info.get(&peer2_pubkey).unwrap().status,
2322+
SecondaryRaptorcastConnectionStatus::None
2323+
);
2324+
assert_eq!(cmds.len(), 0);
2325+
assert_eq!(state.get_secondary_fullnodes(), Vec::from([peer1_pubkey]));
2326+
2327+
// prioritized full nodes are reserved a slot even if unconnected
2328+
state.prioritized_full_nodes.insert(peer3_pubkey);
2329+
state.max_group_size = 2;
2330+
let cmds = state.handle_full_node_raptorcast_request(peer2_pubkey);
2331+
// peer1 (connected) and peer3 (prioritized) took up the slots
2332+
// peer2 is still rejected
2333+
assert_eq!(
2334+
state.participation_info.get(&peer1_pubkey).unwrap().status,
2335+
SecondaryRaptorcastConnectionStatus::Connected
2336+
);
2337+
assert_eq!(
2338+
state.participation_info.get(&peer2_pubkey).unwrap().status,
2339+
SecondaryRaptorcastConnectionStatus::None
2340+
);
2341+
assert_eq!(
2342+
state.participation_info.get(&peer3_pubkey).unwrap().status,
2343+
SecondaryRaptorcastConnectionStatus::None
2344+
);
2345+
assert_eq!(cmds.len(), 0);
2346+
assert_eq!(state.get_secondary_fullnodes(), Vec::from([peer1_pubkey]));
2347+
2348+
// a connected prioritized full node is only counted once towards max_group_size
2349+
state.handle_full_node_raptorcast_request(peer3_pubkey);
2350+
assert_eq!(
2351+
state.participation_info.get(&peer3_pubkey).unwrap().status,
2352+
SecondaryRaptorcastConnectionStatus::Connected
2353+
);
2354+
state.max_group_size = 3;
2355+
let cmds = state.handle_full_node_raptorcast_request(peer2_pubkey);
2356+
// now peer2 is accepted as there are only 2 connections
2357+
assert_eq!(
2358+
state.participation_info.get(&peer2_pubkey).unwrap().status,
2359+
SecondaryRaptorcastConnectionStatus::Connected
2360+
);
2361+
assert_eq!(cmds.len(), 1);
22652362
}
22662363

22672364
#[test]

monad-peer-discovery/src/driver.rs

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -214,9 +214,12 @@ impl<PD: PeerDiscoveryAlgo> PeerDiscoveryDriver<PD> {
214214
self.pd.update_validator_set(epoch, validators)
215215
}
216216
PeerDiscoveryEvent::UpdatePeers { peers } => self.pd.update_peers(peers),
217-
PeerDiscoveryEvent::UpdatePinnedNodes { pinned_full_nodes } => {
218-
self.pd.update_pinned_nodes(pinned_full_nodes)
219-
}
217+
PeerDiscoveryEvent::UpdatePinnedNodes {
218+
dedicated_full_nodes,
219+
prioritized_full_nodes,
220+
} => self
221+
.pd
222+
.update_pinned_nodes(dedicated_full_nodes, prioritized_full_nodes),
220223
PeerDiscoveryEvent::UpdateConfirmGroup { end_round, peers } => {
221224
self.pd.update_peer_participation(end_round, peers)
222225
}

0 commit comments

Comments (0)