Skip to content

Commit dc35a40

Browse files
committed
consensus, raptorcast: prioritize consensus messages to validators
publish all messages from consensus with high priority. rebroacasting to validators is done with high priority by default.
1 parent 1c6f01a commit dc35a40

File tree

3 files changed

+128
-3
lines changed

3 files changed

+128
-3
lines changed

monad-executor-glue/src/lib.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ use monad_crypto::certificate_signature::{
4747
use monad_state_backend::StateBackend;
4848
use monad_types::{
4949
deserialize_pubkey, serialize_pubkey, Epoch, ExecutionProtocol, NodeId, Round, RouterTarget,
50-
SeqNum, Stake,
50+
SeqNum, Stake, UdpPriority,
5151
};
5252
use monad_validator::signature_collection::SignatureCollection;
5353
use serde::{Deserialize, Serialize};
@@ -64,7 +64,7 @@ pub enum RouterCommand<ST: CertificateSignatureRecoverable, OM> {
6464
// NOTE(dshulyak) priority for tcp messages is ignored
6565
target: RouterTarget<CertificateSignaturePubKey<ST>>,
6666
message: OM,
67-
priority: monad_types::UdpPriority,
67+
priority: UdpPriority,
6868
},
6969
PublishToFullNodes {
7070
epoch: Epoch, // Epoch gets embedded into the raptorcast message

monad-raptorcast/tests/raptorcast_instance.rs

Lines changed: 124 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -783,3 +783,127 @@ async fn test_priority_messages() {
783783
);
784784
}
785785
}
786+
787+
#[tokio::test]
788+
async fn test_raptorcast_forwarding_priority() {
789+
let validator1_addr: SocketAddrV4 = "127.0.0.1:12000".parse().unwrap();
790+
let validator2_addr: SocketAddrV4 = "127.0.0.1:12001".parse().unwrap();
791+
let validator_fullnode_addr: SocketAddrV4 = "127.0.0.1:12002".parse().unwrap();
792+
793+
let mut validator1_secret = [1u8; 32];
794+
let mut validator2_secret = [2u8; 32];
795+
let mut validator_fullnode_secret = [3u8; 32];
796+
797+
let validator1_key = Arc::new(KeyPair::from_bytes(&mut validator1_secret).unwrap());
798+
let validator2_key = Arc::new(KeyPair::from_bytes(&mut validator2_secret).unwrap());
799+
let validator_fullnode_key =
800+
Arc::new(KeyPair::from_bytes(&mut validator_fullnode_secret).unwrap());
801+
802+
let validator1_nodeid = NodeId::new(validator1_key.pubkey());
803+
let validator2_nodeid = NodeId::new(validator2_key.pubkey());
804+
let validator_fullnode_nodeid = NodeId::new(validator_fullnode_key.pubkey());
805+
806+
let known_addresses = HashMap::from([
807+
(validator1_nodeid, validator1_addr),
808+
(validator2_nodeid, validator2_addr),
809+
(validator_fullnode_nodeid, validator_fullnode_addr),
810+
]);
811+
812+
let mut validator1_rc = new_defaulted_raptorcast_for_tests::<
813+
SignatureType,
814+
MockMessage,
815+
MockMessage,
816+
MockEvent<PubKeyType>,
817+
>(
818+
SocketAddr::V4(validator1_addr),
819+
known_addresses.clone(),
820+
validator1_key.clone(),
821+
);
822+
823+
let mut validator2_rc = new_defaulted_raptorcast_for_tests::<
824+
SignatureType,
825+
MockMessage,
826+
MockMessage,
827+
MockEvent<PubKeyType>,
828+
>(
829+
SocketAddr::V4(validator2_addr),
830+
known_addresses.clone(),
831+
validator2_key.clone(),
832+
);
833+
834+
let mut validator_fullnode_rc = new_defaulted_raptorcast_for_tests::<
835+
SignatureType,
836+
MockMessage,
837+
MockMessage,
838+
MockEvent<PubKeyType>,
839+
>(
840+
SocketAddr::V4(validator_fullnode_addr),
841+
known_addresses.clone(),
842+
validator_fullnode_key.clone(),
843+
);
844+
845+
let epoch = Epoch(0);
846+
let validator_set = vec![
847+
(validator1_nodeid, Stake::ONE),
848+
(validator2_nodeid, Stake::ONE),
849+
(validator_fullnode_nodeid, Stake::ONE),
850+
];
851+
852+
validator1_rc.exec(vec![RouterCommand::AddEpochValidatorSet {
853+
epoch,
854+
validator_set: validator_set.clone(),
855+
}]);
856+
857+
validator2_rc.exec(vec![RouterCommand::AddEpochValidatorSet {
858+
epoch,
859+
validator_set: validator_set.clone(),
860+
}]);
861+
862+
validator_fullnode_rc.exec(vec![RouterCommand::AddEpochValidatorSet {
863+
epoch,
864+
validator_set: validator_set.clone(),
865+
}]);
866+
867+
validator2_rc.exec(vec![RouterCommand::UpdateFullNodes {
868+
dedicated_full_nodes: vec![validator_fullnode_nodeid],
869+
prioritized_full_nodes: vec![],
870+
}]);
871+
872+
const MESSAGE_SIZE: usize = 128 << 20;
873+
874+
let high_priority_msg = MockMessage::new(0xAA, MESSAGE_SIZE);
875+
validator1_rc.exec(vec![RouterCommand::PublishWithPriority {
876+
target: monad_types::RouterTarget::Broadcast(epoch),
877+
message: high_priority_msg,
878+
priority: UdpPriority::High,
879+
}]);
880+
881+
let regular_priority_msg = MockMessage::new(0xBB, MESSAGE_SIZE);
882+
validator1_rc.exec(vec![RouterCommand::PublishWithPriority {
883+
target: monad_types::RouterTarget::Broadcast(epoch),
884+
message: regular_priority_msg,
885+
priority: UdpPriority::Regular,
886+
}]);
887+
888+
let mut received_messages = Vec::new();
889+
let timeout = Duration::from_secs(2);
890+
let start = std::time::Instant::now();
891+
892+
while received_messages.len() < 2 && start.elapsed() < timeout {
893+
if let Some(event) = validator_fullnode_rc.next().await {
894+
let MockEvent((from, msg_id)) = event;
895+
received_messages.push((from, msg_id));
896+
}
897+
}
898+
899+
assert_eq!(received_messages.len(), 2);
900+
901+
assert_eq!(
902+
received_messages[0].1, 0xAA,
903+
"high priority message (0xAA) should be received first"
904+
);
905+
assert_eq!(
906+
received_messages[1].1, 0xBB,
907+
"regular priority message (0xBB) should be received second"
908+
);
909+
}

monad-state/src/consensus.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -656,9 +656,10 @@ where
656656
}))
657657
}
658658
ConsensusCommand::Publish { target, message } => {
659-
parent_cmds.push(Command::RouterCommand(RouterCommand::Publish {
659+
parent_cmds.push(Command::RouterCommand(RouterCommand::PublishWithPriority {
660660
target,
661661
message: VerifiedMonadMessage::Consensus(message),
662+
priority: monad_types::UdpPriority::High,
662663
}))
663664
}
664665
ConsensusCommand::PublishToFullNodes { epoch, message } => {

0 commit comments

Comments
 (0)