diff --git a/monad-consensus-state/src/command.rs b/monad-consensus-state/src/command.rs index 4c1232a1c9..a00b80db2d 100644 --- a/monad-consensus-state/src/command.rs +++ b/monad-consensus-state/src/command.rs @@ -61,6 +61,7 @@ where }, PublishToFullNodes { epoch: Epoch, + round: Round, message: Verified>>, }, /// Schedule a timeout event for `round` to be emitted in `duration` @@ -137,6 +138,7 @@ where cmds.push(ConsensusCommand::EnterRound(epoch, round)); cmds.push(ConsensusCommand::PublishToFullNodes { epoch, + round: high_certificate.round(), message: ConsensusMessage { version, message: ProtocolMessage::AdvanceRound(AdvanceRoundMessage { diff --git a/monad-executor-glue/src/lib.rs b/monad-executor-glue/src/lib.rs index 3202c0c608..524f7683d1 100644 --- a/monad-executor-glue/src/lib.rs +++ b/monad-executor-glue/src/lib.rs @@ -66,8 +66,12 @@ pub enum RouterCommand { message: OM, priority: UdpPriority, }, + // Primary publishing embeds epoch as group_id in chunk header. Secondary + // publishing embeds round as group_id in chunk header, as rebroadcasting + // periods are defined in rounds PublishToFullNodes { - epoch: Epoch, // Epoch gets embedded into the raptorcast message + epoch: Epoch, + round: Round, message: OM, }, AddEpochValidatorSet { @@ -103,9 +107,14 @@ impl Debug for RouterCommand { .field("target", target) .field("priority", priority) .finish(), - Self::PublishToFullNodes { epoch, message: _ } => f + Self::PublishToFullNodes { + epoch, + round, + message: _, + } => f .debug_struct("PublishToFullNodes") .field("epoch", epoch) + .field("round", round) .finish(), Self::AddEpochValidatorSet { epoch, diff --git a/monad-raptorcast/benches/encode_bench.rs b/monad-raptorcast/benches/encode_bench.rs index cdcb0c1079..ce595c6c05 100644 --- a/monad-raptorcast/benches/encode_bench.rs +++ b/monad-raptorcast/benches/encode_bench.rs @@ -22,11 +22,12 @@ use monad_crypto::certificate_signature::{CertificateSignature, CertificateSigna use monad_dataplane::udp::DEFAULT_SEGMENT_SIZE; use monad_raptorcast::{ packet, + udp::GroupId, util::{BuildTarget, EpochValidators, Redundancy}, }; use monad_secp::SecpSignature; use monad_testutil::signing::get_key; -use monad_types::{NodeId, Stake}; +use monad_types::{Epoch, NodeId, Stake}; const NUM_NODES: usize = 100; @@ -63,8 +64,8 @@ pub fn bench_build_messages(c: &mut Criterion, name: &str, message_size: usize, DEFAULT_SEGMENT_SIZE, // segment_size message.clone(), Redundancy::from_u8(2), - 0, // epoch_no - 0, // unix_ts_ms + GroupId::Primary(Epoch(0)), // epoch_no + 0, // unix_ts_ms build_target.clone(), &known_addrs, ); diff --git a/monad-raptorcast/benches/raptor_bench.rs b/monad-raptorcast/benches/raptor_bench.rs index c884d7745a..95220d3a1d 100644 --- a/monad-raptorcast/benches/raptor_bench.rs +++ b/monad-raptorcast/benches/raptor_bench.rs @@ -24,11 +24,11 @@ use monad_dataplane::udp::DEFAULT_SEGMENT_SIZE; use monad_raptor::ManagedDecoder; use monad_raptorcast::{ packet::build_messages, - udp::{parse_message, MAX_REDUNDANCY, SIGNATURE_CACHE_SIZE}, + udp::{parse_message, GroupId, MAX_REDUNDANCY, SIGNATURE_CACHE_SIZE}, util::{BuildTarget, EpochValidators, Redundancy}, }; use monad_secp::{KeyPair, SecpSignature}; -use monad_types::{NodeId, Stake}; +use monad_types::{Epoch, NodeId, Stake}; #[allow(clippy::useless_vec)] pub fn criterion_benchmark(c: &mut Criterion) { @@ -71,8 +71,8 @@ pub fn criterion_benchmark(c: &mut Criterion) { DEFAULT_SEGMENT_SIZE, // segment_size message.clone(), Redundancy::from_u8(2), - 0, // epoch_no - 0, // unix_ts_ms + 
GroupId::Primary(Epoch(0)), // epoch_no + 0, // unix_ts_ms BuildTarget::Raptorcast(epoch_validators), &known_addresses, ); @@ -112,8 +112,8 @@ pub fn criterion_benchmark(c: &mut Criterion) { DEFAULT_SEGMENT_SIZE, // segment_size message.clone(), Redundancy::from_u8(2), - 0, // epoch_no - 0, // unix_ts_ms + GroupId::Primary(Epoch(0)), // epoch_no + 0, // unix_ts_ms BuildTarget::Raptorcast(epoch_validators), &known_addresses, ) diff --git a/monad-raptorcast/src/decoding.rs b/monad-raptorcast/src/decoding.rs index 70b1a82260..f3e8d3e52b 100644 --- a/monad-raptorcast/src/decoding.rs +++ b/monad-raptorcast/src/decoding.rs @@ -32,7 +32,7 @@ use monad_crypto::{ hasher::{Hasher as _, HasherType}, }; use monad_raptor::{ManagedDecoder, SOURCE_SYMBOLS_MIN}; -use monad_types::{Epoch, NodeId, Stake}; +use monad_types::{NodeId, Stake}; use rand::Rng as _; use crate::{ @@ -336,7 +336,6 @@ where let prune_config = PruneConfig { // TODO: sync with config.udp_message_max_age_ms max_unix_ts_ms_delta: Some(10 * 1000), // 10 seconds - max_epoch_delta: Some(2), // 2 epochs pruning_min_ratio: 0.1, // prune at least 10% of cache or enter cooldown pruning_cooldown: Duration::from_secs(10), // 10 seconds cooldown }; @@ -421,7 +420,6 @@ trait QuotaPolicy: Send + Sync { #[derive(Clone, Copy)] pub struct PruneConfig { max_unix_ts_ms_delta: Option, - max_epoch_delta: Option, // if a full pruning sweep only reclaims less than this // fraction of the cache, we throttle further pruning for a @@ -436,19 +434,13 @@ pub struct PruneConfig { pub struct DecodingContext<'a, PT: PubKey> { validator_set: Option<&'a ValidatorSet>, unix_ts_now: UnixTimestamp, - current_epoch: Epoch, } impl<'a, PT: PubKey> DecodingContext<'a, PT> { - pub fn new( - validator_set: Option<&'a ValidatorSet>, - unix_ts_now: UnixTimestamp, - current_epoch: Epoch, - ) -> Self { + pub fn new(validator_set: Option<&'a ValidatorSet>, unix_ts_now: UnixTimestamp) -> Self { Self { validator_set, unix_ts_now, - current_epoch, } } } @@ -762,12 +754,7 @@ impl AuthorIndex { let message_size = message.app_message_len as MessageSize; - index.insert( - cache_key.clone(), - message.unix_ts_ms, - Epoch(message.epoch), - message_size, - ); + index.insert(cache_key.clone(), message.unix_ts_ms, message_size); self.used_size += message_size; if index.is_overquota() { @@ -811,15 +798,11 @@ impl AuthorIndex { .prune_config .max_unix_ts_ms_delta .and_then(|delta| context.unix_ts_now.checked_sub(delta)); - let epoch_threshold: Option = self - .prune_config - .max_epoch_delta - .and_then(|delta| context.current_epoch.checked_sub(delta)); let mut evicted_keys = PrunedKeys::empty(); // we first try only pruning expired keys - let expired_keys = author_index.prune_expired(unix_ts_threshold, epoch_threshold); + let expired_keys = author_index.prune_expired(unix_ts_threshold); evicted_keys.extend(expired_keys); // if still over quota, compact the cache to fit under quota @@ -852,10 +835,6 @@ impl AuthorIndex { .prune_config .max_unix_ts_ms_delta .and_then(|delta| context.unix_ts_now.checked_sub(delta)); - let epoch_threshold: Option = self - .prune_config - .max_epoch_delta - .and_then(|delta| context.current_epoch.checked_sub(delta)); let mut authors_to_drop = vec![]; let mut total_slots = 0; @@ -863,7 +842,7 @@ impl AuthorIndex { let mut pruned_keys = PrunedKeys::empty(); for (author, author_index) in &mut self.per_author_index { total_slots += author_index.len(); - pruned_keys.extend(author_index.prune_expired(unix_ts_threshold, epoch_threshold)); + 
pruned_keys.extend(author_index.prune_expired(unix_ts_threshold)); if author_index.is_empty() { authors_to_drop.push(*author); @@ -1000,8 +979,7 @@ struct PerAuthorIndex { quota: Quota, used_size: MessageSize, time_index: BTreeSet<(UnixTimestamp, CacheKey)>, - epoch_index: BTreeSet<(Epoch, CacheKey)>, - reverse_index: HashMap, + reverse_index: HashMap, } impl PerAuthorIndex { @@ -1010,7 +988,6 @@ impl PerAuthorIndex { quota, used_size: 0, time_index: BTreeSet::new(), - epoch_index: BTreeSet::new(), reverse_index: HashMap::new(), } } @@ -1028,12 +1005,11 @@ impl PerAuthorIndex { } pub fn remove(&mut self, key: &CacheKey) -> PrunedKeys { - let Some((unix_ts_ms, epoch, size)) = self.reverse_index.remove(key) else { + let Some((unix_ts_ms, size)) = self.reverse_index.remove(key) else { return PrunedKeys::empty(); }; self.time_index.remove(&(unix_ts_ms, key.clone())); - self.epoch_index.remove(&(epoch, key.clone())); self.used_size -= size; PrunedKeys::singleton(key.clone(), size) } @@ -1054,34 +1030,19 @@ impl PerAuthorIndex { .collect() } - pub fn insert( - &mut self, - cache_key: CacheKey, - unix_ts_ms: UnixTimestamp, - epoch: Epoch, - size: MessageSize, - ) { + pub fn insert(&mut self, cache_key: CacheKey, unix_ts_ms: UnixTimestamp, size: MessageSize) { self.time_index.insert((unix_ts_ms, cache_key.clone())); - self.epoch_index.insert((epoch, cache_key.clone())); - self.reverse_index - .insert(cache_key, (unix_ts_ms, epoch, size)); + self.reverse_index.insert(cache_key, (unix_ts_ms, size)); self.used_size += size; } // Remove expired entries. - pub fn prune_expired( - &mut self, - unix_ts_threshold: Option, - epoch_threshold: Option, - ) -> PrunedKeys { + pub fn prune_expired(&mut self, unix_ts_threshold: Option) -> PrunedKeys { let mut evicted_keys = PrunedKeys::empty(); // first, we prune all expired keys if let Some(threshold) = unix_ts_threshold { evicted_keys.extend(self.prune_by_time(threshold)); } - if let Some(threshold) = epoch_threshold { - evicted_keys.extend(self.prune_by_epoch(threshold)); - } evicted_keys } @@ -1115,17 +1076,6 @@ impl PerAuthorIndex { self.remove_many(&to_prune_keys) } - fn prune_by_epoch(&mut self, epoch_threshold: Epoch) -> PrunedKeys { - let mut to_prune_keys = vec![]; - for (epoch, key) in &self.epoch_index { - if *epoch >= epoch_threshold { - break; - } - to_prune_keys.push(key.clone()); - } - self.remove_many(&to_prune_keys) - } - fn prune_by_slots(&mut self, target_len: usize) -> PrunedKeys { let slots_to_free_up = self.len().saturating_sub(target_len); if slots_to_free_up == 0 { @@ -1150,21 +1100,15 @@ impl PerAuthorIndex { #[cfg(test)] fn consistency_breaches(&self, prefix: &str) -> Vec { let mut breaches = vec![]; - if self.epoch_index.len() != self.reverse_index.len() { - breaches.push(format!("{prefix}.epoch-index-size-mismatch")); - } if self.time_index.len() != self.reverse_index.len() { breaches.push(format!("{prefix}.time-index-size-mismatch")); } let mut used_size = self.used_size; - for (key, (unix_ts, epoch, _size)) in &self.reverse_index { + for (key, (unix_ts, _size)) in &self.reverse_index { if !self.time_index.contains(&(*unix_ts, key.clone())) { breaches.push(format!("{prefix}.time-index-missing-key")); } - if !self.epoch_index.contains(&(*epoch, key.clone())) { - breaches.push(format!("{prefix}.epoch-index-missing-key")); - } used_size -= *_size; } @@ -1587,7 +1531,7 @@ mod test { use rand::seq::SliceRandom as _; use super::*; - use crate::util::BroadcastMode; + use crate::{udp::GroupId, util::BroadcastMode}; type PT = 
monad_crypto::NopPubKey; const EPOCH: Epoch = Epoch(1); @@ -1656,7 +1600,7 @@ mod test { // these fields are never touched in this module recipient_hash: HexBytes([0; 20]), message: Bytes::new(), - epoch: EPOCH.0, + group_id: GroupId::Primary(EPOCH), unix_ts_ms, }; messages.push(message); @@ -1669,7 +1613,7 @@ mod test { let app_message = Bytes::from(vec![1u8; APP_MESSAGE_LEN]); let author = node_id(0); let symbols = make_symbols(&app_message, author, UNIX_TS_MS); - let context = DecodingContext::new(None, UNIX_TS_MS, EPOCH); + let context = DecodingContext::new(None, UNIX_TS_MS); for n in 0..MIN_DECODABLE_SYMBOLS { let mut cache = make_cache(10, 10, 10); @@ -1746,7 +1690,7 @@ mod test { // single slot per tier is enough let mut cache = make_cache(1, 1, 1); - let context = DecodingContext::new(Some(&validator_set), UNIX_TS_MS, EPOCH); + let context = DecodingContext::new(Some(&validator_set), UNIX_TS_MS); let res = try_decode_all(&mut cache, &context, all_symbols.iter()) .expect("Decoding should succeed"); @@ -1759,7 +1703,7 @@ mod test { let app_message = Bytes::from(vec![1u8; APP_MESSAGE_LEN]); let author = node_id(0); let symbols = make_symbols(&app_message, author, UNIX_TS_MS); - let context = DecodingContext::new(None, UNIX_TS_MS, EPOCH); + let context = DecodingContext::new(None, UNIX_TS_MS); let mut cache = make_cache(10, 10, 10); // Decode a message completely. @@ -1782,7 +1726,7 @@ mod test { let app_message = Bytes::from(vec![1u8; APP_MESSAGE_LEN]); let author = node_id(0); let symbols = make_symbols(&app_message, author, old_ts); - let context = DecodingContext::new(None, old_ts, EPOCH); + let context = DecodingContext::new(None, old_ts); // Insert an old message. let _ = cache.try_decode(&symbols[0], &context); @@ -1793,7 +1737,7 @@ mod test { let new_app_message = Bytes::from(vec![2u8; APP_MESSAGE_LEN]); let new_author = node_id(i); let new_symbols = make_symbols(&new_app_message, new_author, UNIX_TS_MS); - let new_context = DecodingContext::new(None, UNIX_TS_MS, EPOCH); + let new_context = DecodingContext::new(None, UNIX_TS_MS); let _ = cache.try_decode(&new_symbols[0], &new_context); assert!(cache.consistency_breaches().is_empty()); } @@ -1837,7 +1781,7 @@ mod test { config.validator_tier.min_slots_per_validator = Some(2); let mut cache = DecoderCache::new(config); - let context = DecodingContext::new(Some(&validator_set), UNIX_TS_MS, EPOCH); + let context = DecodingContext::new(Some(&validator_set), UNIX_TS_MS); let res = try_decode_all(&mut cache, &context, all_symbols_part_1.iter()) .expect("Decoding should succeed"); assert!(cache.consistency_breaches().is_empty()); @@ -1919,7 +1863,7 @@ mod test { } let mut cache = DecoderCache::new(config); - let context = DecodingContext::new(Some(&validator_set), UNIX_TS_MS, EPOCH); + let context = DecodingContext::new(Some(&validator_set), UNIX_TS_MS); let res = try_decode_all(&mut cache, &context, all_symbols_part_1.iter()) .expect("Decoding should succeed"); assert!(cache.consistency_breaches().is_empty()); @@ -1966,7 +1910,7 @@ mod test { config.p2p_tier.min_slots_per_author = 2; // each author gets at least 2 slots let mut cache = DecoderCache::new(config); - let context = DecodingContext::new(None, UNIX_TS_MS, EPOCH); + let context = DecodingContext::new(None, UNIX_TS_MS); let res = try_decode_all(&mut cache, &context, all_symbols_part_1.iter()) .expect("Decoding should succeed"); assert!(cache.consistency_breaches().is_empty()); @@ -2032,7 +1976,7 @@ mod test { } let mut cache = DecoderCache::new(config); - let context = 
DecodingContext::new(None, UNIX_TS_MS, EPOCH); + let context = DecodingContext::new(None, UNIX_TS_MS); let res = try_decode_all(&mut cache, &context, all_symbols_part_1.iter()) .expect("Decoding should succeed"); assert!(cache.consistency_breaches().is_empty()); @@ -2058,7 +2002,7 @@ mod test { let app_message = Bytes::from(vec![1u8; APP_MESSAGE_LEN]); let author = node_id(0); let symbols = make_symbols(&app_message, author, UNIX_TS_MS); - let context = DecodingContext::new(None, UNIX_TS_MS, EPOCH); + let context = DecodingContext::new(None, UNIX_TS_MS); // Insert a valid symbol first. let _ = cache.try_decode(&symbols[0], &context); @@ -2092,7 +2036,7 @@ mod test { config.p2p_tier.min_slots_per_author = 2; let mut cache = DecoderCache::new(config); - let context = DecodingContext::new(None, UNIX_TS_MS, EPOCH); + let context = DecodingContext::new(None, UNIX_TS_MS); // Fill the cache. let app_message0 = Bytes::from(vec![0u8; APP_MESSAGE_LEN]); @@ -2131,7 +2075,7 @@ mod test { config.p2p_tier.min_slots_per_author = 5; let mut cache = DecoderCache::new(config); - let context = DecodingContext::new(None, UNIX_TS_MS, EPOCH); + let context = DecodingContext::new(None, UNIX_TS_MS); // take a single symbol for a given message let partial_symbol = |msg: u8, ts: UnixTimestamp| { diff --git a/monad-raptorcast/src/lib.rs b/monad-raptorcast/src/lib.rs index a99037a9f0..901c7ab4d4 100644 --- a/monad-raptorcast/src/lib.rs +++ b/monad-raptorcast/src/lib.rs @@ -57,6 +57,7 @@ use monad_types::{DropTimer, Epoch, ExecutionProtocol, NodeId, Round, RouterTarg use monad_validator::signature_collection::SignatureCollection; use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender}; use tracing::{debug, debug_span, error, trace, warn}; +use udp::GroupId; use util::{BuildTarget, EpochValidators, FullNodes, Group, ReBroadcastGroupMap, Redundancy}; use crate::{ @@ -160,7 +161,7 @@ where let message_builder = OwnedMessageBuilder::new(config.shared_key.clone(), peer_discovery_driver.clone()) .segment_size(segment_size_for_mtu(config.mtu)) - .epoch_no(current_epoch) + .group_id(GroupId::Primary(current_epoch)) .redundancy(redundancy); Self { @@ -343,7 +344,7 @@ where self.message_builder .prepare() - .epoch_no(epoch) + .group_id(GroupId::Primary(epoch)) .build_into(&outbound_message, &build_target, &mut sink) .unwrap_log_on_error(&outbound_message, &build_target); } @@ -499,7 +500,7 @@ where } self.current_epoch = epoch; - self.message_builder.set_epoch_no(epoch); + self.message_builder.set_group_id(GroupId::Primary(epoch)); while let Some(entry) = self.epoch_validators.first_entry() { if *entry.key() + Epoch(1) < self.current_epoch { @@ -558,7 +559,11 @@ where } => { self.handle_publish(target, message, priority, self_id); } - RouterCommand::PublishToFullNodes { epoch, message } => { + RouterCommand::PublishToFullNodes { + epoch, + round: _, + message, + } => { let full_nodes_view = self.dedicated_full_nodes.view(); if self.is_dynamic_fullnode { debug!("self is dynamic full node, skipping publishing to full nodes"); @@ -610,7 +615,7 @@ where let build_target = BuildTarget::PointToPoint(node); self.message_builder .prepare_with_peer_lookup(&node_addrs) - .epoch_no(epoch) + .group_id(GroupId::Primary(epoch)) .build_into(&outbound_message, &build_target, &mut sink) .unwrap_log_on_error(&outbound_message, &build_target); } @@ -740,7 +745,6 @@ where let decoded_app_messages = { // FIXME: pass dataplane as arg to handle_message this.udp_state.handle_message( - this.current_epoch, &this.rebroadcast_map, // contains the 
NodeIds for all the RC participants for each epoch &this.epoch_validators, |targets, payload, bcast_stride| { diff --git a/monad-raptorcast/src/packet/assembler.rs b/monad-raptorcast/src/packet/assembler.rs index 45a9d619d7..d7f7663173 100644 --- a/monad-raptorcast/src/packet/assembler.rs +++ b/monad-raptorcast/src/packet/assembler.rs @@ -31,6 +31,7 @@ use super::{ BuildError, Collector, PeerAddrLookup, Result, UdpMessage, }; use crate::{ + udp::GroupId, util::{compute_hash, Redundancy}, SIGNATURE_SIZE, }; @@ -580,7 +581,7 @@ pub(crate) fn build_header( version: u16, broadcast_type: BroadcastType, merkle_tree_depth: u8, - epoch_no: u64, + group_id: GroupId, unix_ts_ms: u64, app_message_hash: &[u8; 20], app_message_len: usize, @@ -590,7 +591,7 @@ pub(crate) fn build_header( // Secondary broadcast bit, // 2 unused bits, // 4 bits for Merkle Tree Depth - // 8 // Epoch # + // 8 // Group id // 8 // Unix timestamp // 20 // AppMessage hash // 4 // AppMessage length @@ -614,8 +615,9 @@ pub(crate) fn build_header( broadcast_byte |= merkle_tree_depth & 0b0000_1111; cursor_broadcast_merkle_depth[0] = broadcast_byte; - let (cursor_epoch_no, cursor) = cursor.split_at_mut_checked(8).expect("header too short"); - cursor_epoch_no.copy_from_slice(&epoch_no.to_le_bytes()); + let group_id: u64 = group_id.into(); + let (cursor_group_id, cursor) = cursor.split_at_mut_checked(8).expect("header too short"); + cursor_group_id.copy_from_slice(&group_id.to_le_bytes()); let (cursor_unix_ts_ms, cursor) = cursor.split_at_mut_checked(8).expect("header too short"); cursor_unix_ts_ms.copy_from_slice(&unix_ts_ms.to_le_bytes()); diff --git a/monad-raptorcast/src/packet/builder.rs b/monad-raptorcast/src/packet/builder.rs index b71e3059da..f30e4665b8 100644 --- a/monad-raptorcast/src/packet/builder.rs +++ b/monad-raptorcast/src/packet/builder.rs @@ -31,7 +31,7 @@ use super::{ use crate::{ message::MAX_MESSAGE_SIZE, udp::{ - MAX_MERKLE_TREE_DEPTH, MAX_NUM_PACKETS, MAX_REDUNDANCY, MAX_SEGMENT_LENGTH, + GroupId, MAX_MERKLE_TREE_DEPTH, MAX_NUM_PACKETS, MAX_REDUNDANCY, MAX_SEGMENT_LENGTH, MIN_CHUNK_LENGTH, MIN_MERKLE_TREE_DEPTH, }, util::{compute_app_message_hash, unix_ts_ms_now, BuildTarget, Redundancy}, @@ -92,7 +92,7 @@ where peer_lookup: PL, // required fields - epoch_no: Option, + group_id: Option, redundancy: Option, // optional fields @@ -111,7 +111,7 @@ where Self { key: self.key.clone(), peer_lookup: self.peer_lookup.clone(), - epoch_no: self.epoch_no, + group_id: self.group_id, redundancy: self.redundancy, unix_ts_ms: self.unix_ts_ms, segment_size: self.segment_size, @@ -141,7 +141,7 @@ where // default fields redundancy: None, - epoch_no: None, + group_id: None, unix_ts_ms: TimestampMode::RealTime, // optional fields @@ -162,8 +162,8 @@ where self } - pub fn epoch_no(mut self, epoch_no: impl Into) -> Self { - self.epoch_no = Some(epoch_no.into()); + pub fn group_id(mut self, group_id: GroupId) -> Self { + self.group_id = Some(group_id); self } @@ -187,8 +187,8 @@ where } // ----- Convenience methods for modifying the builder ----- - pub fn set_epoch_no(&mut self, epoch_no: impl Into) { - self.epoch_no = Some(epoch_no.into()); + pub fn set_group_id(&mut self, group_id: GroupId) { + self.group_id = Some(group_id); } // ----- Prepare override builder ----- @@ -196,7 +196,7 @@ where PreparedMessageBuilder { base: self, peer_lookup: None, - epoch_no: None, + group_id: None, } } @@ -210,7 +210,7 @@ where PreparedMessageBuilder { base: self, peer_lookup: Some(peer_lookup), - epoch_no: None, + group_id: None, } } @@ -247,7 
+247,7 @@ where // Add extra override fields as needed peer_lookup: Option, - epoch_no: Option, + group_id: Option, } impl<'base, 'key, ST, PL, PL2> PreparedMessageBuilder<'base, 'key, ST, PL, PL2> @@ -257,21 +257,21 @@ where PL2: PeerAddrLookup>, { // ----- Setters for overrides ----- - pub fn epoch_no(mut self, epoch_no: impl Into) -> Self { - self.epoch_no = Some(epoch_no.into()); + pub fn group_id(mut self, group_id: GroupId) -> Self { + self.group_id = Some(group_id); self } // ----- Parameter validation methods ----- - fn unwrap_epoch_no(&self) -> Result { - if let Some(epoch_no) = self.epoch_no { - return Ok(epoch_no); + fn unwrap_group_id(&self) -> Result { + if let Some(group_id) = self.group_id { + return Ok(group_id); } - let epoch_no = self + let group_id = self .base - .epoch_no - .expect("epoch_no must be set before building"); - Ok(epoch_no) + .group_id + .expect("group_id must be set before building"); + Ok(group_id) } fn unwrap_unix_ts_ms(&self) -> Result { let unix_ts_ms = match self.base.unix_ts_ms { @@ -358,14 +358,14 @@ where app_message_hash: &[u8; 20], app_message_len: usize, ) -> Result { - let epoch_no = self.unwrap_epoch_no()?; + let group_id = self.unwrap_group_id()?; let unix_ts_ms = self.unwrap_unix_ts_ms()?; let header_buf = build_header( 0, // version broadcast_type, merkle_tree_depth, - epoch_no, + group_id, unix_ts_ms, app_message_hash, app_message_len, diff --git a/monad-raptorcast/src/packet/mod.rs b/monad-raptorcast/src/packet/mod.rs index e8c1d8ec30..9ad66b772a 100644 --- a/monad-raptorcast/src/packet/mod.rs +++ b/monad-raptorcast/src/packet/mod.rs @@ -30,7 +30,10 @@ pub(crate) use self::{ assigner::ChunkAssigner, builder::MessageBuilder, }; -use crate::util::{BuildTarget, Redundancy}; +use crate::{ + udp::GroupId, + util::{BuildTarget, Redundancy}, +}; #[derive(Debug, Clone)] pub struct UdpMessage { @@ -80,7 +83,7 @@ pub fn build_messages( segment_size: u16, app_message: Bytes, redundancy: Redundancy, - epoch_no: u64, + group_id: GroupId, unix_ts_ms: u64, build_target: BuildTarget, known_addresses: &HashMap>, SocketAddr>, @@ -90,7 +93,7 @@ where { let builder = MessageBuilder::new(key, known_addresses) .segment_size(segment_size) - .epoch_no(epoch_no) + .group_id(group_id) .unix_ts_ms(unix_ts_ms) .redundancy(redundancy); diff --git a/monad-raptorcast/src/raptorcast_secondary/mod.rs b/monad-raptorcast/src/raptorcast_secondary/mod.rs index 749528aa20..8d24016d1d 100644 --- a/monad-raptorcast/src/raptorcast_secondary/mod.rs +++ b/monad-raptorcast/src/raptorcast_secondary/mod.rs @@ -48,6 +48,7 @@ use crate::{ config::{RaptorCastConfig, SecondaryRaptorCastMode}, message::OutboundRouterMessage, packet::{RetrofitResult as _, UdpMessageBatcher}, + udp::GroupId, util::{BuildTarget, FullNodes, Group, Redundancy}, OwnedMessageBuilder, RaptorCastEvent, UNICAST_MSG_BATCH_SIZE, }; @@ -149,7 +150,7 @@ where let message_builder = OwnedMessageBuilder::new(config.shared_key, peer_discovery_driver.clone()) .segment_size(segment_size_for_mtu(config.mtu)) - .epoch_no(current_epoch) + .group_id(GroupId::Primary(current_epoch)) .redundancy(redundancy); Self { @@ -317,7 +318,7 @@ where "RaptorCastSecondary UpdateCurrentRound (Publisher)" ); self.curr_epoch = epoch; - self.message_builder.set_epoch_no(epoch); + self.message_builder.set_group_id(GroupId::Primary(epoch)); // The publisher needs to be periodically informed about new nodes out there, // so that it can randomize when creating new groups. 
let full_nodes = self @@ -353,7 +354,11 @@ where } }, - Self::Command::PublishToFullNodes { epoch, message } => { + Self::Command::PublishToFullNodes { + epoch: _, + round, + message, + } => { let _timer = DropTimer::start(Duration::from_millis(20), |elapsed| { warn!(?elapsed, "long time to publish message") }); @@ -412,7 +417,7 @@ where // send to full nodes. self.message_builder .prepare() - .epoch_no(epoch) + .group_id(GroupId::Secondary(round)) .build_into(&outbound_message, &build_target, &mut sink) .unwrap_log_on_error(&outbound_message, &build_target); } diff --git a/monad-raptorcast/src/udp.rs b/monad-raptorcast/src/udp.rs index 787ae8689a..2a550aa776 100644 --- a/monad-raptorcast/src/udp.rs +++ b/monad-raptorcast/src/udp.rs @@ -29,7 +29,7 @@ use monad_dataplane::{ RecvUdpMsg, }; use monad_merkle::{MerkleHash, MerkleProof}; -use monad_types::{Epoch, NodeId}; +use monad_types::{Epoch, NodeId, Round}; use tracing::warn; pub use crate::packet::build_messages; @@ -123,7 +123,6 @@ impl UdpState { #[tracing::instrument(level = "debug", name = "udp_handle_message", skip_all)] pub fn handle_message( &mut self, - current_epoch: Epoch, group_map: &ReBroadcastGroupMap, epoch_validators: &BTreeMap>, rebroadcast: impl FnMut(Vec>>, Bytes, u16), @@ -186,11 +185,11 @@ impl UdpState { // Note: The check that parsed_message.author is valid is already // done in iterate_rebroadcast_peers(), but we want to drop invalid // chunks ASAP, before changing `recently_decoded_state`. - if let Some(broadcast_mode) = parsed_message.maybe_broadcast_mode { + if parsed_message.maybe_broadcast_mode.is_some() { if !group_map.check_source( - Epoch(parsed_message.epoch), + parsed_message.group_id, &parsed_message.author, - broadcast_mode, + &message.src_addr, ) { continue; } @@ -216,32 +215,30 @@ impl UdpState { ); let mut try_rebroadcast_symbol = || { - // rebroadcast raptorcast chunks if necessary - if let Some(broadcast_mode) = parsed_message.maybe_broadcast_mode { - if self_hash == parsed_message.recipient_hash { - let maybe_targets = group_map.iterate_rebroadcast_peers( - Epoch(parsed_message.epoch), + // rebroadcast raptorcast chunks if broadcast mode is set and + // we're the assigned rebroadcaster + if parsed_message.maybe_broadcast_mode.is_some() + && self_hash == parsed_message.recipient_hash + { + let maybe_targets = group_map + .iterate_rebroadcast_peers(parsed_message.group_id, &parsed_message.author); + if let Some(targets) = maybe_targets { + batch_guard.queue_broadcast( + payload_start_idx, + payload_end_idx, &parsed_message.author, - broadcast_mode, - ); - if let Some(targets) = maybe_targets { - batch_guard.queue_broadcast( - payload_start_idx, - payload_end_idx, - &parsed_message.author, - || targets.cloned().collect(), - ) - } + || targets.cloned().collect(), + ) } } }; - let validator_set = epoch_validators - .get(&Epoch(parsed_message.epoch)) - .map(|ev| &ev.validators); + let validator_set = match parsed_message.group_id { + GroupId::Primary(epoch) => epoch_validators.get(&epoch).map(|ev| &ev.validators), + GroupId::Secondary(_round) => None, + }; - let decoding_context = - DecodingContext::new(validator_set, unix_ts_ms_now(), current_epoch); + let decoding_context = DecodingContext::new(validator_set, unix_ts_ms_now()); match self .decoder_cache @@ -294,6 +291,21 @@ impl UdpState { } } +#[derive(Clone, Copy, Debug)] +pub enum GroupId { + Primary(Epoch), + Secondary(Round), +} + +impl From for u64 { + fn from(group_id: GroupId) -> Self { + match group_id { + GroupId::Primary(epoch) => epoch.0, + 
GroupId::Secondary(round) => round.0, + } + } +} + #[derive(Clone, Debug)] pub struct ValidatedMessage where @@ -306,7 +318,10 @@ where // This applies to both validator-to-validator and validator-to-full-node // raptorcasting. pub author: NodeId, - pub epoch: u64, + // group_id is set to + // - epoch number for validator-to-validator raptorcast + // - round number for validator-to-fullnode raptorcast + pub group_id: GroupId, pub unix_ts_ms: u64, pub app_message_hash: AppMessageHash, pub app_message_len: u32, @@ -398,8 +413,12 @@ where return Err(MessageValidationError::InvalidTreeDepth); } - let cursor_epoch = split_off(8)?; - let epoch = u64::from_le_bytes(cursor_epoch.as_ref().try_into().expect("u64 is 8 bytes")); + let cursor_group_id = split_off(8)?; + let group_id = u64::from_le_bytes(cursor_group_id.as_ref().try_into().expect("u64 is 8 bytes")); + let group_id = match maybe_broadcast_mode { + Some(BroadcastMode::Primary) | None => GroupId::Primary(Epoch(group_id)), + Some(BroadcastMode::Secondary) => GroupId::Secondary(Round(group_id)), + }; let cursor_unix_ts_ms = split_off(8)?; let unix_ts_ms = u64::from_le_bytes( @@ -494,7 +513,7 @@ where Ok(ValidatedMessage { message, author, - epoch, + group_id, unix_ts_ms, app_message_hash, app_message_len, @@ -670,7 +689,7 @@ mod tests { use monad_types::{Epoch, NodeId, Round, RoundSpan, Stake}; use rstest::*; - use super::{MessageValidationError, UdpState}; + use super::{GroupId, MessageValidationError, UdpState}; use crate::{ udp::{build_messages, parse_message, SIGNATURE_CACHE_SIZE}, util::{ @@ -718,7 +737,7 @@ mod tests { (keys.pop().unwrap(), validators, known_addresses) } - const EPOCH: u64 = 5; + const EPOCH: Epoch = Epoch(5); const UNIX_TS_MS: u64 = 5; #[test] @@ -738,7 +757,7 @@ mod tests { DEFAULT_SEGMENT_SIZE, // segment_size app_message.clone(), Redundancy::from_u8(2), - EPOCH, // epoch_no + GroupId::Primary(EPOCH), // epoch_no UNIX_TS_MS, BuildTarget::Raptorcast(epoch_validators), &known_addresses, @@ -777,7 +796,7 @@ mod tests { DEFAULT_SEGMENT_SIZE, // segment_size app_message, Redundancy::from_u8(2), - EPOCH, // epoch_no + GroupId::Primary(EPOCH), // epoch_no UNIX_TS_MS, BuildTarget::Raptorcast(epoch_validators), &known_addresses, @@ -827,7 +846,7 @@ mod tests { DEFAULT_SEGMENT_SIZE, // segment_size app_message, Redundancy::from_u8(2), - EPOCH, // epoch_no + GroupId::Primary(EPOCH), // epoch_no UNIX_TS_MS, BuildTarget::Raptorcast(epoch_validators), &known_addresses, @@ -873,7 +892,7 @@ mod tests { DEFAULT_SEGMENT_SIZE, // segment_size app_message.clone(), Redundancy::from_u8(2), - EPOCH, // epoch_no + GroupId::Primary(EPOCH), // epoch_no UNIX_TS_MS, build_target.clone(), &known_addresses, @@ -923,7 +942,7 @@ mod tests { DEFAULT_SEGMENT_SIZE, // segment_size app_message, Redundancy::from_u8(2), - EPOCH, // epoch_no + GroupId::Primary(EPOCH), // epoch_no UNIX_TS_MS, BuildTarget::Broadcast(epoch_validators.into()), &known_addresses, @@ -976,7 +995,6 @@ mod tests { }; udp_state.handle_message( - Epoch(1), &group_map, &validator_set, |_targets, _payload, _stride| {}, @@ -1013,7 +1031,7 @@ mod tests { DEFAULT_SEGMENT_SIZE, app_message, Redundancy::from_u8(1), - 0, + GroupId::Primary(EPOCH), test_timestamp, BuildTarget::Broadcast(epoch_validators.into()), &known_addresses, diff --git a/monad-raptorcast/src/util.rs b/monad-raptorcast/src/util.rs index 81d83864ff..34fa90259e 100644 --- a/monad-raptorcast/src/util.rs +++ b/monad-raptorcast/src/util.rs @@ -15,8 +15,9 @@ use std::{ cmp::Ordering, - collections::{BTreeMap, BinaryHeap, 
HashSet}, + collections::{BTreeMap, HashSet}, fmt, + net::SocketAddr, }; use fixed::{types::extra::U11, FixedU16}; @@ -28,6 +29,8 @@ use monad_crypto::{ use monad_types::{Epoch, NodeId, Round, RoundSpan, Stake}; use tracing::{debug, warn}; +use crate::udp::GroupId; + #[derive(Clone, Debug, Default)] pub struct EpochValidators where @@ -289,7 +292,8 @@ where sorted_other_peers: Vec>>, // Excludes self } -type GroupQueue = BinaryHeap>; +// Keyed by starting round +type GroupQueue = BTreeMap>; // Groups in a GroupQueue should be sorted by start round, earliest round first impl Ord for Group @@ -549,7 +553,8 @@ where // For Validator->validator re-raptorcasting validator_map: BTreeMap>, - // For Validator->fullnode re-raptorcasting + // For Validator->fullnode re-raptorcasting: each entry is keyed by + // validator NodeId. GroupQueue is a BTreeMap keyed by starting round fullnode_map: BTreeMap>, GroupQueue>, } @@ -569,17 +574,17 @@ where // before calling iterate_rebroadcast_peers(). pub fn check_source( &self, - msg_epoch: Epoch, + msg_group_id: GroupId, author_node_id: &NodeId>, - broadcast_mode: BroadcastMode, + src_addr: &SocketAddr, ) -> bool { - match broadcast_mode { - BroadcastMode::Primary => { + match msg_group_id { + GroupId::Primary(msg_epoch) => { // validator to validator raptorcast if let Some(group) = self.validator_map.get(&msg_epoch) { let author_found = group.check_author_node_id(author_node_id); if !author_found { - debug!(?author_node_id, ?self.validator_map, + debug!(?author_node_id, ?self.validator_map, ?src_addr, "Validator author for v2v group not found in validator_map"); } author_found @@ -588,15 +593,30 @@ where false } } - BroadcastMode::Secondary => { + GroupId::Secondary(msg_round) => { // Source node id (validator) is already the key to the map, so // we don't need to look into the group itself. 
- let author_found = self.fullnode_map.contains_key(author_node_id); - if !author_found { - debug!(?author_node_id, ?self.fullnode_map, + if let Some(groups) = self.fullnode_map.get(author_node_id) { + if let Some((_start_round, group)) = groups.range(..=msg_round).next_back() { + if msg_round >= group.round_span.start && msg_round < group.round_span.end { + return true; + } + } else { + debug!( + ?msg_round, + ?author_node_id, + ?self.fullnode_map, + ?src_addr, + "msg_round not found for validator's round span"); + // FIXME: returning true for backward compatibility, + // to be removed after upgrade + return true; + } + } else { + debug!(?author_node_id, ?self.fullnode_map, ?src_addr, "Validator author for v2fn group not found in fullnode_map"); } - author_found + false } } } @@ -608,30 +628,32 @@ where // and author field (for validator-to-fullnodes raptorcasting) pub fn iterate_rebroadcast_peers( &self, - msg_epoch: Epoch, // for validator-to-validator re-raptorcasting only + msg_group_id: GroupId, msg_author: &NodeId>, // skipped when iterating RaptorCast group - broadcast_mode: BroadcastMode, ) -> Option> { - let maybe_group = match broadcast_mode { - BroadcastMode::Primary => self.validator_map.get(&msg_epoch), - BroadcastMode::Secondary => { - let maybe_group_queue = self.fullnode_map.get(msg_author); - if let Some(group_queue) = maybe_group_queue { - group_queue.peek() // Take earliest among all future groups for msg_author + let rebroadcast_group = match msg_group_id { + GroupId::Primary(msg_epoch) => self.validator_map.get(&msg_epoch)?, + GroupId::Secondary(msg_round) => { + let group_queue = self.fullnode_map.get(msg_author)?; + let (_start_round, group) = group_queue.range(..=msg_round).next_back()?; + if msg_round >= group.round_span.start && msg_round < group.round_span.end { + group } else { - None + // FIXME: this is the backward compatible branch, which can + // be hit if full node is upgraded before the validator. + // It'll be removed after the upgrade + group_queue.first_key_value()?.1 } } }; - if let Some(group) = maybe_group { + if rebroadcast_group.size_excl_self() == 0 { // If there's no other peers in the group, then there's no one to broadcast to - if group.size_excl_self() == 0 { - return None; - } - return Some(group.iter_skip_self_and_author(msg_author, 0)); // this validates author + return None; } - None + + // this validates author + Some(rebroadcast_group.iter_skip_self_and_author(msg_author, 0)) } // As Validator: When we get an AddEpochValidatorSet. @@ -653,12 +675,12 @@ where // As Full-node: When secondary RaptorCast instance (Client) sends us a Group<> pub fn push_group_fullnodes(&mut self, group: Group) { - let vid = group.get_validator_id(); - let prev_group_queue_from_vid = format!("{:?}", self.fullnode_map.get(vid)); + let vid = *group.get_validator_id(); + let prev_group_queue_from_vid = format!("{:?}", self.fullnode_map.get(&vid)); self.fullnode_map - .entry(*vid) + .entry(vid) .or_default() - .push(group.clone()); + .insert(group.round_span.start, group); debug!(?vid, ?prev_group_queue_from_vid, "RaptorCast Group insert",); } @@ -670,7 +692,7 @@ where // aren't received proposals yet and hence do not know what the current // round is. 
for (_vid, group_queue) in self.fullnode_map.iter_mut() { - group_queue.retain(|group| curr_round < group.round_span.end); + group_queue.retain(|_start_round, group| curr_round < group.round_span.end); } self.fullnode_map .retain(|_vid, group_queue| !group_queue.is_empty()); @@ -691,7 +713,7 @@ where pub fn get_fullnode_map(&self) -> BTreeMap>, Group> { let mut res: BTreeMap<_, _> = BTreeMap::new(); for (vid, group_queue) in &self.fullnode_map { - if let Some(group) = group_queue.peek() { + if let Some((_start_round, group)) = group_queue.first_key_value() { res.insert(*vid, group.clone()); } } diff --git a/monad-raptorcast/tests/encoder_error.rs b/monad-raptorcast/tests/encoder_error.rs index 7a254da20c..418747224c 100644 --- a/monad-raptorcast/tests/encoder_error.rs +++ b/monad-raptorcast/tests/encoder_error.rs @@ -21,11 +21,11 @@ use monad_crypto::hasher::{Hasher, HasherType}; use monad_dataplane::udp::DEFAULT_SEGMENT_SIZE; use monad_raptor::SOURCE_SYMBOLS_MAX; use monad_raptorcast::{ - udp::build_messages, + udp::{build_messages, GroupId}, util::{BuildTarget, EpochValidators, Redundancy}, }; use monad_secp::{KeyPair, SecpSignature}; -use monad_types::{NodeId, Stake}; +use monad_types::{Epoch, NodeId, Stake}; use tracing_subscriber::fmt::format::FmtSpan; // Try to encode a message that is too large to be encoded, to verify that the encoder @@ -74,8 +74,8 @@ pub fn encoder_error() { DEFAULT_SEGMENT_SIZE, message, Redundancy::from_u8(1), - 0, // epoch_no - 0, // unix_ts_ms + GroupId::Primary(Epoch(0)), // epoch_no + 0, // unix_ts_ms BuildTarget::Raptorcast(epoch_validators), &known_addresses, ); diff --git a/monad-raptorcast/tests/raptorcast_instance.rs b/monad-raptorcast/tests/raptorcast_instance.rs index 6aa61107b4..637c7ff1b9 100644 --- a/monad-raptorcast/tests/raptorcast_instance.rs +++ b/monad-raptorcast/tests/raptorcast_instance.rs @@ -37,7 +37,7 @@ use monad_raptorcast::{ new_defaulted_raptorcast_for_tests, packet::build_messages, raptorcast_secondary::group_message::FullNodesGroupMessage, - udp::MAX_REDUNDANCY, + udp::{GroupId, MAX_REDUNDANCY}, util::{BuildTarget, EpochValidators, Group, Redundancy}, RaptorCast, RaptorCastEvent, }; @@ -95,8 +95,8 @@ pub fn different_symbol_sizes() { segment_size, message.clone(), Redundancy::from_u8(2), - 0, // epoch_no - 0, // unix_ts_ms + GroupId::Primary(Epoch(0)), // epoch_no + 0, // unix_ts_ms BuildTarget::Raptorcast(epoch_validators), &known_addresses, ); @@ -155,8 +155,8 @@ pub fn buffer_count_overflow() { DEFAULT_SEGMENT_SIZE, message, Redundancy::from_u8(2), - 0, // epoch_no - 0, // unix_ts_ms + GroupId::Primary(Epoch(0)), // epoch_no + 0, // unix_ts_ms BuildTarget::Raptorcast(epoch_validators), &known_addresses, ); @@ -234,9 +234,9 @@ pub fn valid_rebroadcast() { &tx_keypair, DEFAULT_SEGMENT_SIZE, message, - MAX_REDUNDANCY, // redundancy, - 0, // epoch_no - 0, // unix_ts_ms + MAX_REDUNDANCY, // redundancy, + GroupId::Primary(Epoch(0)), // epoch_no + 0, // unix_ts_ms BuildTarget::Raptorcast(epoch_validators), &known_addresses, ); @@ -550,6 +550,7 @@ async fn publish_to_full_nodes() { let message = MockMessage::new(42, 10000); let command = RouterCommand::PublishToFullNodes { epoch: Epoch(0), + round: Round(0), message, }; validator_rc.exec(vec![command]); diff --git a/monad-router-multi/src/lib.rs b/monad-router-multi/src/lib.rs index e607721270..8f4ba3eab9 100644 --- a/monad-router-multi/src/lib.rs +++ b/monad-router-multi/src/lib.rs @@ -315,9 +315,14 @@ where RouterCommand::GetPeers => validator_cmds.push(cmd), 
             RouterCommand::UpdatePeers { .. } => validator_cmds.push(cmd),
-            RouterCommand::PublishToFullNodes { epoch, ref message } => {
+            RouterCommand::PublishToFullNodes {
+                epoch,
+                round,
+                ref message,
+            } => {
                 let cmd_cpy = RouterCommand::PublishToFullNodes {
                     epoch,
+                    round,
                     message: message.clone(),
                 };
                 validator_cmds.push(cmd_cpy);
diff --git a/monad-state/src/consensus.rs b/monad-state/src/consensus.rs
index bb5b331676..683cc116c6 100644
--- a/monad-state/src/consensus.rs
+++ b/monad-state/src/consensus.rs
@@ -233,6 +233,7 @@ where
         match protocol_message {
             ProtocolMessage::Proposal(msg) => {
                 let proposal_epoch = msg.proposal_epoch;
+                let proposal_round = msg.proposal_round;
                 let mut proposal_cmds = consensus.handle_proposal_message(author, msg);
                 // TODO:Maybe we could skip the below command if we could somehow determine that
                 // send verified_message carrying author signature
                 proposal_cmds.push(ConsensusCommand::PublishToFullNodes {
                     epoch: proposal_epoch,
+                    round: proposal_round,
                     message: verified_message,
                 });
                 proposal_cmds
@@ -662,12 +664,15 @@ where
                     priority: monad_types::UdpPriority::High,
                 }))
             }
-            ConsensusCommand::PublishToFullNodes { epoch, message } => {
-                parent_cmds.push(Command::RouterCommand(RouterCommand::PublishToFullNodes {
-                    epoch,
-                    message: VerifiedMonadMessage::Consensus(message),
-                }))
-            }
+            ConsensusCommand::PublishToFullNodes {
+                epoch,
+                round,
+                message,
+            } => parent_cmds.push(Command::RouterCommand(RouterCommand::PublishToFullNodes {
+                epoch,
+                round,
+                message: VerifiedMonadMessage::Consensus(message),
+            })),
             ConsensusCommand::Schedule { round, duration } => {
                 parent_cmds.push(Command::TimerCommand(TimerCommand::Schedule {
                     duration,
diff --git a/monad-types/src/lib.rs b/monad-types/src/lib.rs
index b64aa27f0d..723ed26506 100644
--- a/monad-types/src/lib.rs
+++ b/monad-types/src/lib.rs
@@ -222,13 +222,6 @@ impl Debug for Epoch {
         self.0.fmt(f)
     }
 }
-
-impl From<Epoch> for u64 {
-    fn from(epoch: Epoch) -> Self {
-        epoch.0
-    }
-}
-
 /// Block sequence number
 ///
 /// Consecutive blocks in the same branch have consecutive sequence numbers,
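
For reference, a minimal sketch (not part of the patch) of the group-id scheme the diff introduces: the 8-byte header field that previously carried epoch_no now carries either an epoch or a round, and the secondary-broadcast bit tells the receiver which one. Epoch, Round, and BroadcastMode below are simplified stand-ins for the monad_types/udp types, not the real definitions.

```rust
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
struct Epoch(u64); // simplified stand-in for monad_types::Epoch
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
struct Round(u64); // simplified stand-in for monad_types::Round

#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum GroupId {
    Primary(Epoch),   // validator-to-validator: epoch number
    Secondary(Round), // validator-to-full-node: round number
}

#[derive(Clone, Copy)]
enum BroadcastMode {
    Primary,
    Secondary,
}

// On the wire the chunk header still carries one little-endian u64 in the old
// epoch_no slot; the broadcast mode decides which variant it denotes.
fn encode(group_id: GroupId) -> u64 {
    match group_id {
        GroupId::Primary(epoch) => epoch.0,
        GroupId::Secondary(round) => round.0,
    }
}

fn decode(raw: u64, mode: Option<BroadcastMode>) -> GroupId {
    match mode {
        Some(BroadcastMode::Primary) | None => GroupId::Primary(Epoch(raw)),
        Some(BroadcastMode::Secondary) => GroupId::Secondary(Round(raw)),
    }
}

fn main() {
    let id = GroupId::Secondary(Round(42));
    let raw = encode(id);
    assert_eq!(raw.to_le_bytes().len(), 8); // same 8-byte header field as before
    assert_eq!(decode(raw, Some(BroadcastMode::Secondary)), id);
}
```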
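Likewise, a small sketch of the GroupQueue lookup that replaces the BinaryHeap in util.rs: keying groups by their starting round in a BTreeMap lets the receiver find the group whose round span covers a message's round via range(..=round).next_back(). Group and the [start, end) span below are simplified stand-ins for the crate's Group and RoundSpan.

```rust
use std::collections::BTreeMap;

type Round = u64;

#[derive(Debug, Clone)]
struct Group {
    start: Round, // inclusive start of the round span
    end: Round,   // exclusive end of the round span
}

// Keyed by starting round, as in the new fullnode_map entries.
type GroupQueue = BTreeMap<Round, Group>;

// Return the group whose span contains msg_round, if any.
fn covering_group(queue: &GroupQueue, msg_round: Round) -> Option<&Group> {
    let (_start, group) = queue.range(..=msg_round).next_back()?;
    (msg_round >= group.start && msg_round < group.end).then_some(group)
}

fn main() {
    let mut queue = GroupQueue::new();
    queue.insert(10, Group { start: 10, end: 20 });
    queue.insert(20, Group { start: 20, end: 30 });

    assert!(covering_group(&queue, 15).is_some()); // inside the first span
    assert!(covering_group(&queue, 25).is_some()); // inside the second span
    assert!(covering_group(&queue, 5).is_none());  // before any span
    assert!(covering_group(&queue, 35).is_none()); // past the last span
}
```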
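Finally, a sketch of the header field itself: build_header reuses the former 8-byte little-endian epoch_no slot for the GroupId's u64 value. The byte offsets and buffer size below are illustrative assumptions based only on the layout comment in the diff, not the crate's actual header layout.

```rust
// Write the raw group id (u64 from GroupId) into the 8-byte header slot.
fn write_group_id(header: &mut [u8], offset: usize, group_id_raw: u64) {
    header[offset..offset + 8].copy_from_slice(&group_id_raw.to_le_bytes());
}

// Read it back, mirroring parse_message's little-endian decode.
fn read_group_id(header: &[u8], offset: usize) -> u64 {
    u64::from_le_bytes(header[offset..offset + 8].try_into().expect("u64 is 8 bytes"))
}

fn main() {
    // Assumed layout from the diff comment: 1 flag/depth byte, 8-byte group id,
    // 8-byte unix timestamp, 20-byte app message hash, 4-byte app message length.
    let mut header = [0u8; 1 + 8 + 8 + 20 + 4];
    write_group_id(&mut header, 1, 7); // e.g. Round(7) for a secondary chunk
    assert_eq!(read_group_id(&header, 1), 7);
}
```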