diff --git a/monad-executor-glue/src/lib.rs b/monad-executor-glue/src/lib.rs index 8d920277df..4dbc9201a0 100644 --- a/monad-executor-glue/src/lib.rs +++ b/monad-executor-glue/src/lib.rs @@ -86,6 +86,9 @@ pub enum RouterCommand { dedicated_full_nodes: Vec>>, prioritized_full_nodes: Vec>>, }, + UpdateUpstreamValidators { + prioritized_upstream: Vec>>, + }, } impl Debug for RouterCommand { @@ -140,6 +143,12 @@ impl Debug for RouterCommand { .field("dedicated_full_nodes", dedicated_full_nodes) .field("prioritized_full_nodes", prioritized_full_nodes) .finish(), + Self::UpdateUpstreamValidators { + prioritized_upstream, + } => f + .debug_struct("UpdateUpstreamValidators") + .field("prioritized_upstream", prioritized_upstream) + .finish(), } } } @@ -1926,6 +1935,7 @@ where { pub dedicated_full_nodes: Vec>, pub prioritized_full_nodes: Vec>, + pub prioritized_upstream: Vec>, pub blocksync_override_peers: Vec>, } diff --git a/monad-mock-swarm/src/mock.rs b/monad-mock-swarm/src/mock.rs index af4f688cb1..f05254203c 100644 --- a/monad-mock-swarm/src/mock.rs +++ b/monad-mock-swarm/src/mock.rs @@ -397,6 +397,9 @@ impl Executor for MockExecutor { RouterCommand::UpdateFullNodes { .. } => { // TODO } + RouterCommand::UpdateUpstreamValidators { .. } => { + // TODO + } RouterCommand::PublishToFullNodes { .. } => { // TODO } diff --git a/monad-node-config/src/fullnode_raptorcast.rs b/monad-node-config/src/fullnode_raptorcast.rs index db1010e817..dba38ba2fa 100644 --- a/monad-node-config/src/fullnode_raptorcast.rs +++ b/monad-node-config/src/fullnode_raptorcast.rs @@ -14,7 +14,7 @@ // along with this program. If not, see . use monad_crypto::certificate_signature::PubKey; -use monad_types::Round; +use monad_types::{NodeId, Round}; use serde::{Deserialize, Serialize}; use super::fullnode::FullNodeConfig; @@ -44,4 +44,7 @@ pub struct FullNodeRaptorCastConfig { pub invite_future_dist_min: Round, pub invite_future_dist_max: Round, pub invite_accept_heartbeat_ms: u64, + + #[serde(bound = "P:PubKey", default = "Vec::new")] + pub prioritized_upstream: Vec>, } diff --git a/monad-peer-disc-swarm/src/lib.rs b/monad-peer-disc-swarm/src/lib.rs index 2cfdcbea82..6923073d0d 100644 --- a/monad-peer-disc-swarm/src/lib.rs +++ b/monad-peer-disc-swarm/src/lib.rs @@ -159,6 +159,7 @@ impl Executor for MockPeerDiscExecutor { RouterCommand::UpdatePeers { .. } => {} RouterCommand::GetFullNodes => {} RouterCommand::UpdateFullNodes { .. } => {} + RouterCommand::UpdateUpstreamValidators { .. } => {} RouterCommand::PublishToFullNodes { .. } => {} } } diff --git a/monad-raptorcast/src/config.rs b/monad-raptorcast/src/config.rs index 1fdd8e2d4d..6c80d54bc6 100644 --- a/monad-raptorcast/src/config.rs +++ b/monad-raptorcast/src/config.rs @@ -130,12 +130,15 @@ where ST: CertificateSignatureRecoverable, { None, // Not participating in any raptor-casting to full-nodes. - Client(RaptorCastConfigSecondaryClient), // i.e. we are a full-node + Client(RaptorCastConfigSecondaryClient), // i.e. we are a full-node Publisher(RaptorCastConfigSecondaryPublisher), // we are a validator } #[derive(Clone)] -pub struct RaptorCastConfigSecondaryClient { +pub struct RaptorCastConfigSecondaryClient +where + ST: CertificateSignatureRecoverable, +{ // Maximum number of groups a full node will join at a time pub max_num_group: usize, // Maximum number of full nodes in a group @@ -146,16 +149,22 @@ pub struct RaptorCastConfigSecondaryClient { pub invite_future_dist_min: Round, pub invite_future_dist_max: Round, pub invite_accept_heartbeat: Duration, + // Prioritized upstream validators that this full node prefers to connect to + pub prioritized_upstream: Vec>>, } -impl Default for RaptorCastConfigSecondaryClient { - fn default() -> RaptorCastConfigSecondaryClient { +impl Default for RaptorCastConfigSecondaryClient +where + ST: CertificateSignatureRecoverable, +{ + fn default() -> RaptorCastConfigSecondaryClient { RaptorCastConfigSecondaryClient { max_num_group: 3, max_group_size: 50, invite_future_dist_min: Round(1), invite_future_dist_max: Round(600), // ~5 minutes into the future, with current round length of 500ms invite_accept_heartbeat: Duration::from_secs(10), + prioritized_upstream: Default::default(), } } } diff --git a/monad-raptorcast/src/lib.rs b/monad-raptorcast/src/lib.rs index 375bcc57d7..7dcaae0956 100644 --- a/monad-raptorcast/src/lib.rs +++ b/monad-raptorcast/src/lib.rs @@ -422,6 +422,7 @@ where invite_future_dist_min: Round(1), invite_future_dist_max: Round(5), invite_accept_heartbeat_ms: 100, + prioritized_upstream: vec![], }, }; let pd = PeerDiscoveryDriver::new(peer_discovery_builder); @@ -636,6 +637,10 @@ where } => { self.dedicated_full_nodes.list = dedicated_full_nodes; } + RouterCommand::UpdateUpstreamValidators { .. } => { + // Primary RaptorCast doesn't need upstream validators config + // This command is consumed by secondary RaptorCast only + } } } } diff --git a/monad-raptorcast/src/raptorcast_secondary/client.rs b/monad-raptorcast/src/raptorcast_secondary/client.rs index ae393f0736..da4fc9cf49 100644 --- a/monad-raptorcast/src/raptorcast_secondary/client.rs +++ b/monad-raptorcast/src/raptorcast_secondary/client.rs @@ -22,7 +22,7 @@ use monad_crypto::certificate_signature::{ use monad_executor::ExecutorMetrics; use monad_types::{NodeId, Round, RoundSpan, GENESIS_ROUND}; use tokio::sync::mpsc::UnboundedSender; -use tracing::{debug, error, warn}; +use tracing::{debug, error, info, warn}; use super::{ super::{config::RaptorCastConfigSecondaryClient, util::Group}, @@ -46,9 +46,9 @@ where { client_node_id: NodeId>, // Our (full-node) node_id as an invitee - // Full nodes may choose to reject a request if it doesn’t have enough + // Full nodes may choose to reject a request if it doesn't have enough // upload bandwidth to broadcast chunk to a large group. - config: RaptorCastConfigSecondaryClient, + config: RaptorCastConfigSecondaryClient, // [start_round, end_round) -> GroupAsClient // Represents all raptorcast groups that we have accepted and haven't expired @@ -83,7 +83,7 @@ where pub fn new( client_node_id: NodeId>, group_sink_channel: UnboundedSender>, - config: RaptorCastConfigSecondaryClient, + config: RaptorCastConfigSecondaryClient, ) -> Self { assert!( config.max_num_group > 0, @@ -196,60 +196,68 @@ where return false; } - let log_exceed_max_num_group = || { - debug!( - "RaptorCastSecondary rejected invite for rounds \ - [{:?}, {:?}) from validator {:?} due to exceeding number of active groups", - invite_msg.start_round, invite_msg.end_round, invite_msg.validator_id - ); - }; - let mut num_current_groups = 0; - - // Check confirmed groups - for group in self - .confirmed_groups - .values(invite_msg.start_round..invite_msg.end_round) + // If validator is not in prioritized upstream, check that we won't + // exceed max_num_group limit in the entire service span + if !self + .config + .prioritized_upstream + .contains(&invite_msg.validator_id) { - // Check if we already have an overlapping invite from same - // validator, e.g. [30, 40)->validator3 but we already - // have [25, 35)->validator3 - // Note that we accept overlaps across different validators, - // e.g. [30, 40)->validator3 + [25, 35)->validator4 - if group.get_validator_id() == &invite_msg.validator_id { - warn!( - "RaptorCastSecondary received self-overlapping \ - invite for rounds [{:?}, {:?}) from validator {:?}", + let log_exceed_max_num_group = || { + debug!( + "RaptorCastSecondary rejected invite for rounds \ + [{:?}, {:?}) from validator {:?} due to exceeding number of active groups", invite_msg.start_round, invite_msg.end_round, invite_msg.validator_id ); - return false; - } - - // Check that it doesn't exceed max number of groups during round span - num_current_groups += 1; - if num_current_groups + 1 > self.config.max_num_group { - log_exceed_max_num_group(); - return false; - } - } - - // Check groups we were invited to but are still unconfirmed - for (&key, other_invites) in self.pending_confirms.iter() { - if key >= invite_msg.end_round { - // Remaining keys are outside the invite range - break; - } - - for other in other_invites.values() { - if !Self::overlaps(other.start_round, other.end_round, invite_msg) { - continue; + }; + let mut num_current_groups = 0; + + // Check confirmed groups + for group in self + .confirmed_groups + .values(invite_msg.start_round..invite_msg.end_round) + { + // Check if we already have an overlapping invite from same + // validator, e.g. [30, 40)->validator3 but we already + // have [25, 35)->validator3 + // Note that we accept overlaps across different validators, + // e.g. [30, 40)->validator3 + [25, 35)->validator4 + if group.get_validator_id() == &invite_msg.validator_id { + warn!( + "RaptorCastSecondary received self-overlapping \ + invite for rounds [{:?}, {:?}) from validator {:?}", + invite_msg.start_round, invite_msg.end_round, invite_msg.validator_id + ); + return false; } + // Check that it doesn't exceed max number of groups during round span num_current_groups += 1; if num_current_groups + 1 > self.config.max_num_group { log_exceed_max_num_group(); return false; } } + + // Check groups we were invited to but are still unconfirmed + for (&key, other_invites) in self.pending_confirms.iter() { + if key >= invite_msg.end_round { + // Remaining keys are outside the invite range + break; + } + + for other in other_invites.values() { + if !Self::overlaps(other.start_round, other.end_round, invite_msg) { + continue; + } + + num_current_groups += 1; + if num_current_groups + 1 > self.config.max_num_group { + log_exceed_max_num_group(); + return false; + } + } + } } true @@ -433,6 +441,18 @@ where } } + pub fn update_prioritized_upstream( + &mut self, + prioritized_upstream: Vec>>, + ) { + info!( + old_prioritized_upstream = ?self.config.prioritized_upstream, + new_prioritized_upstream = ?prioritized_upstream, + "RaptorCastSecondary Client updating prioritized_upstreams", + ); + self.config.prioritized_upstream = prioritized_upstream; + } + fn overlaps(begin: Round, end: Round, group: &PrepareGroup) -> bool { assert!(begin <= end); assert!(group.start_round <= group.end_round); @@ -492,6 +512,7 @@ mod tests { invite_future_dist_min: Round(1), invite_future_dist_max: Round(100), invite_accept_heartbeat: Duration::from_secs(10), + prioritized_upstream: vec![], }, ); @@ -542,6 +563,7 @@ mod tests { invite_future_dist_min: Round(1), invite_future_dist_max: Round(100), invite_accept_heartbeat: Duration::from_secs(10), + prioritized_upstream: vec![], }, ); @@ -578,6 +600,60 @@ mod tests { assert!(!resp.accept); } + #[test] + fn test_prioritized_upstream_overrides_max_num_group() { + let (clt_tx, _clt_rx): RcToRcChannelGrp = unbounded_channel(); + let self_id = nid(1); + let mut clt = Client::::new( + self_id, + clt_tx, + RaptorCastConfigSecondaryClient { + max_num_group: 1, + max_group_size: 50, + invite_future_dist_min: Round(1), + invite_future_dist_max: Round(100), + invite_accept_heartbeat: Duration::from_secs(10), + prioritized_upstream: vec![nid(4)], + }, + ); + + // PrepareGroup is accepted when client has slots + let response = clt.handle_prepare_group_message(PrepareGroup { + validator_id: nid(2), + max_group_size: 50, + start_round: Round(1), + end_round: Round(5), + }); + assert!(response.accept); + + // PrepareGroup is rejected because it exceeds max_num_group + let response = clt.handle_prepare_group_message(PrepareGroup { + validator_id: nid(3), + max_group_size: 50, + start_round: Round(4), + end_round: Round(8), + }); + assert!(!response.accept); + + // same PrepareGroup from prioritized upstream is accepted + let response = clt.handle_prepare_group_message(PrepareGroup { + validator_id: nid(4), + max_group_size: 50, + start_round: Round(4), + end_round: Round(8), + }); + assert!(response.accept); + + // prioritized PrepareGroup occupies slots. Regular PrepareGroup is rejected + let response = clt.handle_prepare_group_message(PrepareGroup { + validator_id: nid(3), + max_group_size: 50, + start_round: Round(7), + end_round: Round(10), + }); + assert!(!response.accept); + } + #[test] fn test_get_current_group_count() { let (clt_tx, _clt_rx): RcToRcChannelGrp = unbounded_channel(); @@ -591,6 +667,7 @@ mod tests { invite_future_dist_min: Round(1), invite_future_dist_max: Round(100), invite_accept_heartbeat: Duration::from_secs(10), + prioritized_upstream: vec![], }, ); diff --git a/monad-raptorcast/src/raptorcast_secondary/mod.rs b/monad-raptorcast/src/raptorcast_secondary/mod.rs index f8d19960e6..692749b3c3 100644 --- a/monad-raptorcast/src/raptorcast_secondary/mod.rs +++ b/monad-raptorcast/src/raptorcast_secondary/mod.rs @@ -293,6 +293,22 @@ where publisher.update_always_ask_full_nodes(prioritized_full_nodes); } }, + Self::Command::UpdateUpstreamValidators { + prioritized_upstream, + } => match &mut self.role { + Role::Client(client) => { + debug!( + ?prioritized_upstream, + "RaptorCastSecondary Client updating prioritized_upstream validators" + ); + client.update_prioritized_upstream(prioritized_upstream); + } + Role::Publisher(_) => { + debug!( + "RaptorCastSecondary Publisher ignoring UpdateUpstreamValidators command" + ); + } + }, Self::Command::UpdateCurrentRound(epoch, round) => match &mut self.role { Role::Client(client) => { diff --git a/monad-raptorcast/src/raptorcast_secondary/publisher.rs b/monad-raptorcast/src/raptorcast_secondary/publisher.rs index ab74b68782..4b7397a576 100644 --- a/monad-raptorcast/src/raptorcast_secondary/publisher.rs +++ b/monad-raptorcast/src/raptorcast_secondary/publisher.rs @@ -1496,6 +1496,7 @@ mod tests { invite_future_dist_min: Round(1), invite_future_dist_max: Round(100), invite_accept_heartbeat: Duration::from_secs(10), + prioritized_upstream: vec![], }, ); let mut group_map = MockGroupMap::new(nid(10), clt_rx); @@ -1625,6 +1626,7 @@ mod tests { invite_future_dist_min: Round(1), invite_future_dist_max: Round(100), invite_accept_heartbeat: Duration::from_secs(10), + prioritized_upstream: vec![], }, ); let mut group_map = MockGroupMap::new(nid(10), clt_rx); @@ -1768,6 +1770,7 @@ mod tests { invite_future_dist_min: Round(1), invite_future_dist_max: Round(100), invite_accept_heartbeat: Duration::from_secs(10), + prioritized_upstream: vec![], }, ); let mut group_map = MockGroupMap::new(nid(me), clt_rx); diff --git a/monad-router-multi/src/lib.rs b/monad-router-multi/src/lib.rs index e607721270..60c5e5b356 100644 --- a/monad-router-multi/src/lib.rs +++ b/monad-router-multi/src/lib.rs @@ -208,6 +208,7 @@ where invite_accept_heartbeat: Duration::from_millis( cfg.secondary_instance.invite_accept_heartbeat_ms, ), + prioritized_upstream: cfg.secondary_instance.prioritized_upstream.clone(), }), } } @@ -345,6 +346,14 @@ where validator_cmds.push(cmd_cpy); fullnodes_cmds.push(cmd); } + RouterCommand::UpdateUpstreamValidators { + ref prioritized_upstream, + } => { + // This command is only relevant for full-nodes + self.rc_config.secondary_instance.prioritized_upstream = + prioritized_upstream.clone(); + fullnodes_cmds.push(cmd); + } RouterCommand::UpdateCurrentRound(epoch, round) => { let cmd_cpy = RouterCommand::UpdateCurrentRound(epoch, round); if epoch > self.current_epoch { diff --git a/monad-state/src/lib.rs b/monad-state/src/lib.rs index ba6d38e0ab..a0e5fe704e 100644 --- a/monad-state/src/lib.rs +++ b/monad-state/src/lib.rs @@ -1143,6 +1143,12 @@ where prioritized_full_nodes: config_update.prioritized_full_nodes, })); + cmds.push(Command::RouterCommand( + RouterCommand::UpdateUpstreamValidators { + prioritized_upstream: config_update.prioritized_upstream, + }, + )); + cmds.push(Command::ControlPanelCommand(ControlPanelCommand::Write( WriteCommand::ReloadConfig(ReloadConfig::Response("Success".to_string())), ))); diff --git a/monad-testground/src/main.rs b/monad-testground/src/main.rs index 8cf21aff45..42d0175880 100644 --- a/monad-testground/src/main.rs +++ b/monad-testground/src/main.rs @@ -351,6 +351,7 @@ where invite_future_dist_min: Round(1), invite_future_dist_max: Round(5), invite_accept_heartbeat_ms: 100, + prioritized_upstream: vec![], }, }), }, diff --git a/monad-updaters/src/config_loader.rs b/monad-updaters/src/config_loader.rs index a22a476c9c..18e2f8668f 100644 --- a/monad-updaters/src/config_loader.rs +++ b/monad-updaters/src/config_loader.rs @@ -177,6 +177,7 @@ where ConfigEvent::ConfigUpdate(ConfigUpdate { dedicated_full_nodes, prioritized_full_nodes, + prioritized_upstream: node_config.fullnode_raptorcast.prioritized_upstream, blocksync_override_peers, }) } diff --git a/monad-updaters/src/local_router.rs b/monad-updaters/src/local_router.rs index e7fb59b8f7..d8e13af4c2 100644 --- a/monad-updaters/src/local_router.rs +++ b/monad-updaters/src/local_router.rs @@ -196,6 +196,7 @@ where RouterCommand::UpdatePeers { .. } => {} RouterCommand::GetFullNodes => {} RouterCommand::UpdateFullNodes { .. } => {} + RouterCommand::UpdateUpstreamValidators { .. } => {} } } }