1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default.

1 change: 1 addition & 0 deletions monad-raptorcast/Cargo.toml
@@ -49,6 +49,7 @@ futures-util = { workspace = true }
insta = { workspace = true }
tokio = { workspace = true, features = ["macros", "rt-multi-thread", "sync"] }
tracing-subscriber = { workspace = true, features = ["env-filter"] }
rand_distr = { workspace = true }

[[bench]]
name = "raptor_bench"
17 changes: 7 additions & 10 deletions monad-raptorcast/src/packet/assembler.rs
@@ -30,7 +30,10 @@ use super::{
assigner::{ChunkAssignment, ChunkOrder},
BuildError, Collector, PeerAddrLookup, Result, UdpMessage,
};
use crate::{util::Redundancy, SIGNATURE_SIZE};
use crate::{
util::{compute_app_message_hash, compute_hash, Redundancy},
SIGNATURE_SIZE,
};

#[derive(Default, Clone, Copy, PartialEq, Eq)]
pub enum AssembleMode {
@@ -156,7 +159,7 @@ impl<PT: PubKey> PartialEq for RecipientInner<PT> {

impl<PT: PubKey> Recipient<PT> {
pub fn new(node_id: NodeId<PT>) -> Self {
let node_hash = crate::util::compute_hash(&node_id).0;
let node_hash = compute_hash(&node_id).0;
let addr = OnceCell::new();
let inner = RecipientInner {
node_id,
@@ -618,8 +621,8 @@ pub(crate) fn build_header(

let (cursor_app_message_hash, cursor) =
cursor.split_at_mut_checked(20).expect("header too short");
let app_message_hash = calc_full_hash(app_message);
cursor_app_message_hash.copy_from_slice(&app_message_hash[..20]);
let app_message_hash = compute_app_message_hash(app_message).0;
cursor_app_message_hash.copy_from_slice(&app_message_hash);

let (cursor_app_message_len, cursor) =
cursor.split_at_mut_checked(4).expect("header too short");
@@ -645,12 +648,6 @@
}
}

fn calc_full_hash(bytes: &[u8]) -> Hash {
let mut hasher = HasherType::new();
hasher.update(bytes);
hasher.hash()
}

impl AssembleMode {
pub fn expected_chunk_order(self) -> Option<ChunkOrder> {
match self {
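The assembler change above drops the local `calc_full_hash` helper and writes the 20-byte value returned by `crate::util::compute_app_message_hash` directly into the header. That helper's body is not part of this diff; the sketch below is only a guess at its likely shape (a full digest truncated to 20 bytes), with the `sha2` crate standing in for the crate's own `HasherType`.

```rust
// Hypothetical sketch -- not code from this PR. Assumes a 32-byte
// digest truncated to its first 20 bytes, with sha2 standing in for
// the crate's HasherType.
use sha2::{Digest, Sha256};

/// 20-byte application-message hash, mirroring the `.0` access above.
pub struct AppMessageHash(pub [u8; 20]);

pub fn compute_app_message_hash(bytes: &[u8]) -> AppMessageHash {
    let full = Sha256::digest(bytes); // 32-byte digest
    let mut truncated = [0u8; 20];
    truncated.copy_from_slice(&full[..20]); // keep the first 20 bytes
    AppMessageHash(truncated)
}
```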
158 changes: 149 additions & 9 deletions monad-raptorcast/src/packet/assigner.rs
@@ -16,10 +16,14 @@
use std::{collections::HashMap, ops::Range};

use bytes::BytesMut;
use monad_crypto::certificate_signature::PubKey;
use monad_crypto::certificate_signature::{
CertificateSignaturePubKey, CertificateSignatureRecoverable, PubKey,
};
use monad_types::{NodeId, Stake};
use rand::{rngs::StdRng, seq::SliceRandom as _, SeedableRng as _};

use super::{BuildError, Chunk, PacketLayout, Recipient, Result};
use crate::util::compute_app_message_hash;

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ChunkOrder {
@@ -384,6 +388,10 @@ pub(crate) struct Partitioned<PT: PubKey> {
}

impl<PT: PubKey> Partitioned<PT> {
// This assigner is only used for full-node raptorcast, which is
// based on homogeneous peers. RaptorCast between validators has
// switched to the StakeBasedWithRC assigner.
#[cfg_attr(not(test), expect(unused))]
pub fn from_validator_set(validator_set: Vec<(NodeId<PT>, Stake)>) -> Self {
let mut total_stake = Stake::ZERO;
let weighted_nodes = validator_set
@@ -456,16 +464,47 @@ pub(crate) struct StakeBasedWithRC<PT: PubKey> {
}

impl<PT: PubKey> StakeBasedWithRC<PT> {
#[cfg_attr(not(test), expect(unused))]
pub fn seed_from_app_message(app_message: &[u8]) -> [u8; 32] {
let mut padded_seed = [0u8; 32];
let app_message_hash = compute_app_message_hash(app_message);
padded_seed[..20].copy_from_slice(&app_message_hash.0);
padded_seed
}

// Shuffle the validator stake map for chunk assignment. This uses
// a deterministic seed because, for features not yet implemented,
// the leader and all validators will be required to compute the
// shuffling in the same way. Eventually this should use a
// well-known shuffling algorithm (e.g., MT19937 with a
// Fisher-Yates shuffle) so it can easily be reimplemented in other
// languages (a sketch of that approach follows this file's diff).
pub fn shuffle_validators<ST>(
view: &crate::util::ValidatorsView<ST>,
seed: [u8; 32],
) -> Vec<(NodeId<CertificateSignaturePubKey<ST>>, Stake)>
where
ST: CertificateSignatureRecoverable,
{
let mut validator_set = view
.iter()
.map(|(node_id, stake)| (*node_id, stake))
.collect::<std::collections::BinaryHeap<_>>()
.into_sorted_vec();
let mut rng = StdRng::from_seed(seed);
validator_set.shuffle(&mut rng);
validator_set
}

pub fn from_validator_set(validator_set: Vec<(NodeId<PT>, Stake)>) -> Self {
let mut total_stake = Stake::ZERO;
let validator_set = validator_set
let validator_set: Vec<_> = validator_set
.into_iter()
.map(|(nid, stake)| {
total_stake += stake;
(Recipient::new(nid), stake)
})
.collect();

Self {
validator_set,
total_stake,
@@ -538,15 +577,19 @@ fn split_off_chunks_into<PT: PubKey>(

#[cfg(test)]
mod tests {
use std::{collections::HashMap, ops::Range};
use std::{
collections::{BTreeSet, HashMap},
ops::Range,
};

use alloy_primitives::U256;
use itertools::Itertools as _;
use monad_crypto::certificate_signature::CertificateSignaturePubKey;
use monad_secp::SecpSignature;
use monad_testutil::signing::get_key;
use monad_types::{NodeId, Stake};
use rand::{seq::SliceRandom, Rng as _};
use rand::{seq::SliceRandom, Rng};
use rand_distr::{Distribution, Normal};

use super::{ChunkAssignment, ChunkOrder, Partitioned, StakeBasedWithRC};
use crate::{
@@ -600,12 +643,20 @@ mod tests {
fn rand_validator_set(rng: &mut impl rand::Rng, max_n: usize) -> Vec<(NodeId<PT>, Stake)> {
let n: usize = rng.gen_range(1..max_n);
let mut validator_set = Vec::with_capacity(n);

let mon = U256::from(1_000_000_000_000_000_000u64);
let min_stake = mon * U256::from(100_000); // approximated
let mean = f64::from(min_stake * U256::from(100)); // estimated
let std_dev = f64::from(mon * U256::from(500_000)); // estimated
let stake_distr = Normal::new(mean, std_dev).unwrap();

loop {
let mut total_stake = Stake::ZERO;

for i in 1..=n {
// divide by n to ensure the sum never overflows.
let stake = Stake::from(rng.gen::<U256>() / U256::from(n));
let stake = stake_distr.sample(rng).max(f64::from(min_stake));
let stake = Stake::from(u256_from_f64_lossy(stake));

// NOTE: we don't forbid individual stake to be zero,
// as long as total stake is non-zero. we do this to
// test the robustness of assignment algorithm.
@@ -638,7 +689,7 @@ mod tests {
fn test_replicated_assignment() {
let rng = &mut rand::thread_rng();

for _ in 0..1000 {
for _ in 0..100 {
let node_set = rand_node_set(rng, 2000);
let assigner = Replicated::from_broadcast(node_set);
let num_symbols = rng.gen_range(0..10);
@@ -661,7 +712,7 @@
fn test_partitioned_assignment() {
let rng = &mut rand::thread_rng();

for _ in 0..300 {
for _ in 0..30 {
let validator_set = rand_validator_set(rng, 2000);
let assigner = Partitioned::from_validator_set(validator_set);
let num_symbols = rng.gen_range(0..1000);
@@ -722,4 +773,93 @@ mod tests {
);
}
}

#[test]
fn test_stake_with_rc_properties() {
let rng = &mut rand::thread_rng();
for _ in 0..50 {
let n_validators = rng.gen_range(1..2000);
let validator_set = rand_validator_set(rng, n_validators);
let assigner = Partitioned::from_validator_set(validator_set.clone());
let assigner_rc = StakeBasedWithRC::from_validator_set(validator_set);

// estimated from at most 2MB data, 1400-byte segments, 3x redundancy
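// (roughly 2_000_000 / 1400 ≈ 1430 source symbols; with 3x
// redundancy that is ≈ 4300 chunks, hence the 1..5000 range)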
let num_symbols = rng.gen_range(1..5000);
let assignment = assigner
.assign_chunks(num_symbols, None)
.expect("should assign successfully");
let assignment_rc = assigner_rc
.assign_chunks(num_symbols, None)
.expect("should assign successfully");

// the RC assignment must produce at least as many chunks as the non-RC assignment
assert!(assignment_rc.total_chunks() >= assignment.total_chunks());
// the difference in total chunks must not exceed the number of validators
assert!(assignment_rc.total_chunks() - assignment.total_chunks() <= n_validators);

let chunk_ids: BTreeSet<_> = assignment
.assignments
.iter()
.flat_map(|slice| slice.chunk_id_range.clone())
.collect();
let chunk_ids_rc: BTreeSet<_> = assignment_rc
.assignments
.iter()
.flat_map(|slice| slice.chunk_id_range.clone())
.collect();

// both assignments must cover a contiguous range from 0 to total_chunks - 1
assert_eq!(chunk_ids.len(), assignment.total_chunks());
assert_eq!(chunk_ids.first().cloned(), Some(0));
assert_eq!(
chunk_ids.last().cloned(),
Some(assignment.total_chunks() - 1)
);

assert_eq!(chunk_ids_rc.len(), assignment_rc.total_chunks());
assert_eq!(chunk_ids_rc.first().cloned(), Some(0));
assert_eq!(
chunk_ids_rc.last().cloned(),
Some(assignment_rc.total_chunks() - 1)
);

let validator_to_chunks: HashMap<_, _> = assignment
.assignments
.iter()
.map(|slice| (slice.recipient.node_hash(), slice.chunk_id_range.len()))
.collect();
let validator_to_chunks_rc: HashMap<_, _> = assignment_rc
.assignments
.iter()
.map(|slice| (slice.recipient.node_hash(), slice.chunk_id_range.len()))
.collect();

for (validator, num_chunks) in validator_to_chunks {
let num_chunks_rc = validator_to_chunks_rc.get(validator);
// each validator that exists in the non-RC assignment must
// also exist in the RC assignment
assert!(num_chunks_rc.is_some());
// each validator must get at least as many chunks in the RC assignment
assert!(*num_chunks_rc.unwrap() >= num_chunks);
}
}
}

// Ported from alloy_primitives::U256::from_f64_lossy in a newer crate version
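// e.g. 3.0 has unbiased exponent 1 and mantissa 0x18_0000_0000_0000
// (implicit bit set), so it maps to 0x18_0000_0000_0000 >> 51 == 3;
// values below 1.0 map to zero.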
fn u256_from_f64_lossy(value: f64) -> U256 {
if value >= 1.0 {
let bits = value.to_bits();
let exponent = ((bits >> 52) & 0x7ff) - 1023;
let mantissa = (bits & 0x0f_ffff_ffff_ffff) | 0x10_0000_0000_0000;
if exponent <= 52 {
U256::from(mantissa >> (52 - exponent))
} else if exponent >= 256 {
U256::MAX
} else {
U256::from(mantissa) << U256::from(exponent - 52)
}
} else {
U256::ZERO
}
}
}
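The comment in `shuffle_validators` above notes that the shuffle is seeded deterministically (via `seed_from_app_message`, i.e. from the app-message hash) and may eventually move to a well-known algorithm so other language implementations can reproduce the same ordering. The following is only an illustration of that direction, not the PR's code: it drives an explicit Fisher-Yates pass with SplitMix64, chosen here for brevity instead of the MT19937 mentioned in the comment.

```rust
// Illustrative only: the PR itself shuffles with rand::StdRng. This
// sketch shows the "fully specified" variant the comment alludes to.

fn splitmix64(state: &mut u64) -> u64 {
    // SplitMix64: a tiny, well-documented PRNG that is easy to port.
    *state = state.wrapping_add(0x9E37_79B9_7F4A_7C15);
    let mut z = *state;
    z = (z ^ (z >> 30)).wrapping_mul(0xBF58_476D_1CE4_E5B9);
    z = (z ^ (z >> 27)).wrapping_mul(0x94D0_49BB_1331_11EB);
    z ^ (z >> 31)
}

/// Deterministically shuffle `items` in place from a 32-byte seed
/// (e.g. the padded app-message hash produced by `seed_from_app_message`).
fn deterministic_shuffle<T>(items: &mut [T], seed: [u8; 32]) {
    // Fold the first 8 bytes of the seed into the PRNG state.
    let mut state = u64::from_le_bytes(seed[..8].try_into().unwrap());
    // Classic Fisher-Yates: walk from the back, swapping each element
    // with a uniformly chosen earlier (or same) position.
    for i in (1..items.len()).rev() {
        // NOTE: the modulo introduces a tiny bias; a real spec would
        // use rejection sampling instead.
        let j = (splitmix64(&mut state) % (i as u64 + 1)) as usize;
        items.swap(i, j);
    }
}

fn main() {
    let mut validators = vec!["v0", "v1", "v2", "v3", "v4"];
    deterministic_shuffle(&mut validators, [7u8; 32]);
    println!("{validators:?}"); // same order on every run and platform
}
```

A real cross-language specification would additionally have to pin down how the full 32-byte seed maps into the PRNG state and how indices are sampled without bias.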
39 changes: 23 additions & 16 deletions monad-raptorcast/src/packet/builder.rs
@@ -21,7 +21,7 @@ use monad_crypto::certificate_signature::{
};
use monad_dataplane::udp::DEFAULT_SEGMENT_SIZE;
use monad_types::NodeId;
use rand::{seq::SliceRandom as _, Rng};
use rand::Rng;

use super::{
assembler::{self, build_header, AssembleMode, BroadcastType, PacketLayout},
@@ -34,7 +34,7 @@ use crate::{
MAX_MERKLE_TREE_DEPTH, MAX_NUM_PACKETS, MAX_REDUNDANCY, MAX_SEGMENT_LENGTH,
MIN_CHUNK_LENGTH, MIN_MERKLE_TREE_DEPTH,
},
util::{self, BuildTarget, Redundancy},
util::{unix_ts_ms_now, BuildTarget, Redundancy},
};

pub const DEFAULT_MERKLE_TREE_DEPTH: u8 = 6;
@@ -285,7 +285,7 @@ where
fn unwrap_unix_ts_ms(&self) -> Result<u64> {
let unix_ts_ms = match self.base.unix_ts_ms {
TimestampMode::Fixed(ts) => ts,
TimestampMode::RealTime => util::unix_ts_ms_now(),
TimestampMode::RealTime => unix_ts_ms_now(),
};
Ok(unix_ts_ms)
}
@@ -383,34 +383,41 @@ where
Ok(header_buf)
}

fn choose_assigner(
fn make_assigner(
build_target: &BuildTarget<ST>,
self_node_id: &NodeId<CertificateSignaturePubKey<ST>>,
app_message: &[u8],
rng: &mut impl Rng,
) -> Box<dyn ChunkAssigner<CertificateSignaturePubKey<ST>>>
where
ST: CertificateSignatureRecoverable,
{
use assigner::{Partitioned, Replicated, StakeBasedWithRC};
match build_target {
BuildTarget::PointToPoint(to) => Box::new(assigner::Replicated::from_unicast(**to)),
BuildTarget::Broadcast(nodes) => Box::new(assigner::Replicated::from_broadcast(
nodes.iter().copied().collect(),
)),
BuildTarget::PointToPoint(to) => Box::new(Replicated::from_unicast(**to)),
BuildTarget::Broadcast(nodes) => {
let assigner = Replicated::from_broadcast(nodes.iter().copied().collect());
Box::new(assigner)
}
BuildTarget::Raptorcast(validators) => {
let mut validator_set: Vec<_> = validators
.iter()
.map(|(node_id, stake)| (*node_id, stake))
.collect();
validator_set.shuffle(rng);
Box::new(assigner::Partitioned::from_validator_set(validator_set))
let seed =
StakeBasedWithRC::<CertificateSignaturePubKey<ST>>::seed_from_app_message(
app_message,
);
let sorted_validators =
StakeBasedWithRC::<CertificateSignaturePubKey<ST>>::shuffle_validators::<ST>(
validators, seed,
);
let assigner = StakeBasedWithRC::from_validator_set(sorted_validators);
Box::new(assigner)
}
BuildTarget::FullNodeRaptorCast(group) => {
let seed = rng.gen::<usize>();
let nodes = group
.iter_skip_self_and_author(self_node_id, seed)
.copied()
.collect();
Box::new(assigner::Partitioned::from_homogeneous_peers(nodes))
Box::new(Partitioned::from_homogeneous_peers(nodes))
}
}
}
@@ -433,7 +440,7 @@ where
// select chunk assignment algorithm based on build target
let rng = &mut rand::thread_rng();
let self_node_id = NodeId::new(self.base.key.as_ref().pubkey());
let assigner = Self::choose_assigner(build_target, &self_node_id, rng);
let assigner = Self::make_assigner(build_target, &self_node_id, app_message, rng);

// calculate the number of symbols needed for assignment
let app_message_len = self.checked_message_len(app_message.len())?;