Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions monad-executor-glue/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,9 @@ pub enum RouterCommand<ST: CertificateSignatureRecoverable, OM> {
dedicated_full_nodes: Vec<NodeId<CertificateSignaturePubKey<ST>>>,
prioritized_full_nodes: Vec<NodeId<CertificateSignaturePubKey<ST>>>,
},
UpdateUpstreamValidators {
prioritized_upstream: Vec<NodeId<CertificateSignaturePubKey<ST>>>,
},
}

impl<ST: CertificateSignatureRecoverable, OM> Debug for RouterCommand<ST, OM> {
Expand Down Expand Up @@ -140,6 +143,12 @@ impl<ST: CertificateSignatureRecoverable, OM> Debug for RouterCommand<ST, OM> {
.field("dedicated_full_nodes", dedicated_full_nodes)
.field("prioritized_full_nodes", prioritized_full_nodes)
.finish(),
Self::UpdateUpstreamValidators {
prioritized_upstream,
} => f
.debug_struct("UpdateUpstreamValidators")
.field("prioritized_upstream", prioritized_upstream)
.finish(),
}
}
}
Expand Down Expand Up @@ -1926,6 +1935,7 @@ where
{
pub dedicated_full_nodes: Vec<NodeId<SCT::NodeIdPubKey>>,
pub prioritized_full_nodes: Vec<NodeId<SCT::NodeIdPubKey>>,
pub prioritized_upstream: Vec<NodeId<SCT::NodeIdPubKey>>,
pub blocksync_override_peers: Vec<NodeId<SCT::NodeIdPubKey>>,
}

Expand Down
3 changes: 3 additions & 0 deletions monad-mock-swarm/src/mock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -397,6 +397,9 @@ impl<S: SwarmRelation> Executor for MockExecutor<S> {
RouterCommand::UpdateFullNodes { .. } => {
// TODO
}
RouterCommand::UpdateUpstreamValidators { .. } => {
// TODO
}
RouterCommand::PublishToFullNodes { .. } => {
// TODO
}
Expand Down
5 changes: 4 additions & 1 deletion monad-node-config/src/fullnode_raptorcast.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
// along with this program. If not, see <http://www.gnu.org/licenses/>.

use monad_crypto::certificate_signature::PubKey;
use monad_types::Round;
use monad_types::{NodeId, Round};
use serde::{Deserialize, Serialize};

use super::fullnode::FullNodeConfig;
Expand Down Expand Up @@ -44,4 +44,7 @@ pub struct FullNodeRaptorCastConfig<P: PubKey> {
pub invite_future_dist_min: Round,
pub invite_future_dist_max: Round,
pub invite_accept_heartbeat_ms: u64,

#[serde(bound = "P:PubKey", default = "Vec::new")]
pub prioritized_upstream: Vec<NodeId<P>>,
}
1 change: 1 addition & 0 deletions monad-peer-disc-swarm/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,7 @@ impl<S: PeerDiscSwarmRelation> Executor for MockPeerDiscExecutor<S> {
RouterCommand::UpdatePeers { .. } => {}
RouterCommand::GetFullNodes => {}
RouterCommand::UpdateFullNodes { .. } => {}
RouterCommand::UpdateUpstreamValidators { .. } => {}
RouterCommand::PublishToFullNodes { .. } => {}
}
}
Expand Down
17 changes: 13 additions & 4 deletions monad-raptorcast/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -130,12 +130,15 @@ where
ST: CertificateSignatureRecoverable,
{
None, // Not participating in any raptor-casting to full-nodes.
Client(RaptorCastConfigSecondaryClient), // i.e. we are a full-node
Client(RaptorCastConfigSecondaryClient<ST>), // i.e. we are a full-node
Publisher(RaptorCastConfigSecondaryPublisher<ST>), // we are a validator
}

#[derive(Clone)]
pub struct RaptorCastConfigSecondaryClient {
pub struct RaptorCastConfigSecondaryClient<ST>
where
ST: CertificateSignatureRecoverable,
{
// Maximum number of groups a full node will join at a time
pub max_num_group: usize,
// Maximum number of full nodes in a group
Expand All @@ -146,16 +149,22 @@ pub struct RaptorCastConfigSecondaryClient {
pub invite_future_dist_min: Round,
pub invite_future_dist_max: Round,
pub invite_accept_heartbeat: Duration,
// Prioritized upstream validators that this full node prefers to connect to
pub prioritized_upstream: Vec<NodeId<CertificateSignaturePubKey<ST>>>,
}

impl Default for RaptorCastConfigSecondaryClient {
fn default() -> RaptorCastConfigSecondaryClient {
/// Default tuning for a full node acting as a secondary-RaptorCast client:
/// join at most 3 groups of up to 50 peers, accept invites up to ~5 minutes
/// ahead, and start with no prioritized upstream validators.
impl<ST> Default for RaptorCastConfigSecondaryClient<ST>
where
    ST: CertificateSignatureRecoverable,
{
    fn default() -> Self {
        Self {
            max_num_group: 3,
            max_group_size: 50,
            invite_future_dist_min: Round(1),
            // ~5 minutes into the future, with current round length of 500ms
            invite_future_dist_max: Round(600),
            invite_accept_heartbeat: Duration::from_secs(10),
            prioritized_upstream: Vec::new(),
        }
    }
}
Expand Down
5 changes: 5 additions & 0 deletions monad-raptorcast/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -422,6 +422,7 @@ where
invite_future_dist_min: Round(1),
invite_future_dist_max: Round(5),
invite_accept_heartbeat_ms: 100,
prioritized_upstream: vec![],
},
};
let pd = PeerDiscoveryDriver::new(peer_discovery_builder);
Expand Down Expand Up @@ -636,6 +637,10 @@ where
} => {
self.dedicated_full_nodes.list = dedicated_full_nodes;
}
RouterCommand::UpdateUpstreamValidators { .. } => {
// Primary RaptorCast doesn't need upstream validators config
// This command is consumed by secondary RaptorCast only
}
}
}
}
Expand Down
171 changes: 124 additions & 47 deletions monad-raptorcast/src/raptorcast_secondary/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use monad_crypto::certificate_signature::{
use monad_executor::ExecutorMetrics;
use monad_types::{NodeId, Round, RoundSpan, GENESIS_ROUND};
use tokio::sync::mpsc::UnboundedSender;
use tracing::{debug, error, warn};
use tracing::{debug, error, info, warn};

use super::{
super::{config::RaptorCastConfigSecondaryClient, util::Group},
Expand All @@ -46,9 +46,9 @@ where
{
client_node_id: NodeId<CertificateSignaturePubKey<ST>>, // Our (full-node) node_id as an invitee

// Full nodes may choose to reject a request if it doesnt have enough
// Full nodes may choose to reject a request if it doesn't have enough
// upload bandwidth to broadcast chunk to a large group.
config: RaptorCastConfigSecondaryClient,
config: RaptorCastConfigSecondaryClient<ST>,

// [start_round, end_round) -> GroupAsClient
// Represents all raptorcast groups that we have accepted and haven't expired
Expand Down Expand Up @@ -83,7 +83,7 @@ where
pub fn new(
client_node_id: NodeId<CertificateSignaturePubKey<ST>>,
group_sink_channel: UnboundedSender<GroupAsClient<ST>>,
config: RaptorCastConfigSecondaryClient,
config: RaptorCastConfigSecondaryClient<ST>,
) -> Self {
assert!(
config.max_num_group > 0,
Expand Down Expand Up @@ -196,60 +196,68 @@ where
return false;
}

let log_exceed_max_num_group = || {
debug!(
"RaptorCastSecondary rejected invite for rounds \
[{:?}, {:?}) from validator {:?} due to exceeding number of active groups",
invite_msg.start_round, invite_msg.end_round, invite_msg.validator_id
);
};
let mut num_current_groups = 0;

// Check confirmed groups
for group in self
.confirmed_groups
.values(invite_msg.start_round..invite_msg.end_round)
// If validator is not in prioritized upstream, check that we won't
// exceed max_num_group limit in the entire service span
if !self
.config
.prioritized_upstream
.contains(&invite_msg.validator_id)
{
// Check if we already have an overlapping invite from same
// validator, e.g. [30, 40)->validator3 but we already
// have [25, 35)->validator3
// Note that we accept overlaps across different validators,
// e.g. [30, 40)->validator3 + [25, 35)->validator4
if group.get_validator_id() == &invite_msg.validator_id {
warn!(
"RaptorCastSecondary received self-overlapping \
invite for rounds [{:?}, {:?}) from validator {:?}",
let log_exceed_max_num_group = || {
debug!(
"RaptorCastSecondary rejected invite for rounds \
[{:?}, {:?}) from validator {:?} due to exceeding number of active groups",
invite_msg.start_round, invite_msg.end_round, invite_msg.validator_id
);
return false;
}

// Check that it doesn't exceed max number of groups during round span
num_current_groups += 1;
if num_current_groups + 1 > self.config.max_num_group {
log_exceed_max_num_group();
return false;
}
}

// Check groups we were invited to but are still unconfirmed
for (&key, other_invites) in self.pending_confirms.iter() {
if key >= invite_msg.end_round {
// Remaining keys are outside the invite range
break;
}

for other in other_invites.values() {
if !Self::overlaps(other.start_round, other.end_round, invite_msg) {
continue;
};
let mut num_current_groups = 0;

// Check confirmed groups
for group in self
.confirmed_groups
.values(invite_msg.start_round..invite_msg.end_round)
{
// Check if we already have an overlapping invite from same
// validator, e.g. [30, 40)->validator3 but we already
// have [25, 35)->validator3
// Note that we accept overlaps across different validators,
// e.g. [30, 40)->validator3 + [25, 35)->validator4
if group.get_validator_id() == &invite_msg.validator_id {
warn!(
"RaptorCastSecondary received self-overlapping \
invite for rounds [{:?}, {:?}) from validator {:?}",
invite_msg.start_round, invite_msg.end_round, invite_msg.validator_id
);
return false;
}

// Check that it doesn't exceed max number of groups during round span
num_current_groups += 1;
if num_current_groups + 1 > self.config.max_num_group {
log_exceed_max_num_group();
return false;
}
}

// Check groups we were invited to but are still unconfirmed
for (&key, other_invites) in self.pending_confirms.iter() {
if key >= invite_msg.end_round {
// Remaining keys are outside the invite range
break;
}

for other in other_invites.values() {
if !Self::overlaps(other.start_round, other.end_round, invite_msg) {
continue;
}

num_current_groups += 1;
if num_current_groups + 1 > self.config.max_num_group {
log_exceed_max_num_group();
return false;
}
}
}
}

true
Expand Down Expand Up @@ -433,6 +441,18 @@ where
}
}

/// Replaces the set of prioritized upstream validators in the client config,
/// logging both the outgoing and incoming lists for operator visibility.
pub fn update_prioritized_upstream(
    &mut self,
    prioritized_upstream: Vec<NodeId<CertificateSignaturePubKey<ST>>>,
) {
    // Swap the new list in first and keep the old one so both can be logged.
    let previous =
        std::mem::replace(&mut self.config.prioritized_upstream, prioritized_upstream);
    info!(
        old_prioritized_upstream = ?previous,
        new_prioritized_upstream = ?self.config.prioritized_upstream,
        "RaptorCastSecondary Client updating prioritized_upstreams",
    );
}

fn overlaps(begin: Round, end: Round, group: &PrepareGroup<ST>) -> bool {
assert!(begin <= end);
assert!(group.start_round <= group.end_round);
Expand Down Expand Up @@ -492,6 +512,7 @@ mod tests {
invite_future_dist_min: Round(1),
invite_future_dist_max: Round(100),
invite_accept_heartbeat: Duration::from_secs(10),
prioritized_upstream: vec![],
},
);

Expand Down Expand Up @@ -542,6 +563,7 @@ mod tests {
invite_future_dist_min: Round(1),
invite_future_dist_max: Round(100),
invite_accept_heartbeat: Duration::from_secs(10),
prioritized_upstream: vec![],
},
);

Expand Down Expand Up @@ -578,6 +600,60 @@ mod tests {
assert!(!resp.accept);
}

#[test]
fn test_prioritized_upstream_overrides_max_num_group() {
    let (clt_tx, _clt_rx): RcToRcChannelGrp<ST> = unbounded_channel();
    // max_num_group = 1, and nid(4) is the sole prioritized upstream validator.
    let mut client = Client::<ST>::new(
        nid(1),
        clt_tx,
        RaptorCastConfigSecondaryClient {
            max_num_group: 1,
            max_group_size: 50,
            invite_future_dist_min: Round(1),
            invite_future_dist_max: Round(100),
            invite_accept_heartbeat: Duration::from_secs(10),
            prioritized_upstream: vec![nid(4)],
        },
    );

    // Shorthand: send a PrepareGroup invite and report whether it was accepted.
    let mut invite = |validator, from, to| {
        client
            .handle_prepare_group_message(PrepareGroup {
                validator_id: nid(validator),
                max_group_size: 50,
                start_round: Round(from),
                end_round: Round(to),
            })
            .accept
    };

    // With a free slot, a regular invite is accepted.
    assert!(invite(2, 1, 5));

    // An overlapping second invite exceeds max_num_group and is rejected.
    assert!(!invite(3, 4, 8));

    // The same invite coming from the prioritized upstream bypasses the cap.
    assert!(invite(4, 4, 8));

    // The prioritized group still occupies a slot, so later regular
    // overlapping invites remain rejected.
    assert!(!invite(3, 7, 10));
}

#[test]
fn test_get_current_group_count() {
let (clt_tx, _clt_rx): RcToRcChannelGrp<ST> = unbounded_channel();
Expand All @@ -591,6 +667,7 @@ mod tests {
invite_future_dist_min: Round(1),
invite_future_dist_max: Round(100),
invite_accept_heartbeat: Duration::from_secs(10),
prioritized_upstream: vec![],
},
);

Expand Down
16 changes: 16 additions & 0 deletions monad-raptorcast/src/raptorcast_secondary/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -293,6 +293,22 @@ where
publisher.update_always_ask_full_nodes(prioritized_full_nodes);
}
},
Self::Command::UpdateUpstreamValidators {
prioritized_upstream,
} => match &mut self.role {
Role::Client(client) => {
debug!(
?prioritized_upstream,
"RaptorCastSecondary Client updating prioritized_upstream validators"
);
client.update_prioritized_upstream(prioritized_upstream);
}
Role::Publisher(_) => {
debug!(
"RaptorCastSecondary Publisher ignoring UpdateUpstreamValidators command"
);
}
},

Self::Command::UpdateCurrentRound(epoch, round) => match &mut self.role {
Role::Client(client) => {
Expand Down
Loading
Loading