Skip to content

Commit 9dd6943

Browse files
Overload Raptor Chunk Header Epoch
A full node may join new RaptorCast groups from the same validator while it's syncing or stuck. In such cases, it may rebroadcast to an expired group because its local round hasn't advanced, and expired groups are not garbage-collected. As a result, many full nodes end up receiving unsolicited Raptor chunks from groups they no longer belong to. The root cause is that the node selects the rebroadcast group based on its local round. This PR resolves the issue by overloading the epoch field in the Raptor chunk header, setting it to the round number for secondary raptorcast. This allows syncing or stuck full nodes to use the round number to identify and service the correct group.
1 parent 33d321a commit 9dd6943

File tree

14 files changed

+150
-124
lines changed

14 files changed

+150
-124
lines changed

monad-consensus-state/src/command.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,7 @@ where
6161
},
6262
PublishToFullNodes {
6363
epoch: Epoch,
64+
round: Round,
6465
message: Verified<ST, Validated<ConsensusMessage<ST, SCT, EPT>>>,
6566
},
6667
/// Schedule a timeout event for `round` to be emitted in `duration`
@@ -137,6 +138,7 @@ where
137138
cmds.push(ConsensusCommand::EnterRound(epoch, round));
138139
cmds.push(ConsensusCommand::PublishToFullNodes {
139140
epoch,
141+
round: high_certificate.round(),
140142
message: ConsensusMessage {
141143
version,
142144
message: ProtocolMessage::AdvanceRound(AdvanceRoundMessage {

monad-executor-glue/src/lib.rs

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -66,8 +66,12 @@ pub enum RouterCommand<ST: CertificateSignatureRecoverable, OM> {
6666
message: OM,
6767
priority: UdpPriority,
6868
},
69+
// Primary publishing embeds epoch as group_id in chunk header. Secondary
70+
// publishing embeds round as group_id in chunk header, as rebroadcasting
71+
// periods are defined in rounds
6972
PublishToFullNodes {
70-
epoch: Epoch, // Epoch gets embedded into the raptorcast message
73+
epoch: Epoch,
74+
round: Round,
7175
message: OM,
7276
},
7377
AddEpochValidatorSet {
@@ -103,9 +107,14 @@ impl<ST: CertificateSignatureRecoverable, OM> Debug for RouterCommand<ST, OM> {
103107
.field("target", target)
104108
.field("priority", priority)
105109
.finish(),
106-
Self::PublishToFullNodes { epoch, message: _ } => f
110+
Self::PublishToFullNodes {
111+
epoch,
112+
round,
113+
message: _,
114+
} => f
107115
.debug_struct("PublishToFullNodes")
108116
.field("epoch", epoch)
117+
.field("round", round)
109118
.finish(),
110119
Self::AddEpochValidatorSet {
111120
epoch,

monad-raptorcast/src/decoding.rs

Lines changed: 7 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -762,12 +762,7 @@ impl<PT: PubKey> AuthorIndex<PT> {
762762

763763
let message_size = message.app_message_len as MessageSize;
764764

765-
index.insert(
766-
cache_key.clone(),
767-
message.unix_ts_ms,
768-
Epoch(message.epoch),
769-
message_size,
770-
);
765+
index.insert(cache_key.clone(), message.unix_ts_ms, message_size);
771766
self.used_size += message_size;
772767

773768
if index.is_overquota() {
@@ -1000,8 +995,7 @@ struct PerAuthorIndex {
1000995
quota: Quota,
1001996
used_size: MessageSize,
1002997
time_index: BTreeSet<(UnixTimestamp, CacheKey)>,
1003-
epoch_index: BTreeSet<(Epoch, CacheKey)>,
1004-
reverse_index: HashMap<CacheKey, (UnixTimestamp, Epoch, MessageSize)>,
998+
reverse_index: HashMap<CacheKey, (UnixTimestamp, MessageSize)>,
1005999
}
10061000

10071001
impl PerAuthorIndex {
@@ -1010,7 +1004,6 @@ impl PerAuthorIndex {
10101004
quota,
10111005
used_size: 0,
10121006
time_index: BTreeSet::new(),
1013-
epoch_index: BTreeSet::new(),
10141007
reverse_index: HashMap::new(),
10151008
}
10161009
}
@@ -1028,12 +1021,11 @@ impl PerAuthorIndex {
10281021
}
10291022

10301023
pub fn remove(&mut self, key: &CacheKey) -> PrunedKeys {
1031-
let Some((unix_ts_ms, epoch, size)) = self.reverse_index.remove(key) else {
1024+
let Some((unix_ts_ms, size)) = self.reverse_index.remove(key) else {
10321025
return PrunedKeys::empty();
10331026
};
10341027

10351028
self.time_index.remove(&(unix_ts_ms, key.clone()));
1036-
self.epoch_index.remove(&(epoch, key.clone()));
10371029
self.used_size -= size;
10381030
PrunedKeys::singleton(key.clone(), size)
10391031
}
@@ -1054,17 +1046,9 @@ impl PerAuthorIndex {
10541046
.collect()
10551047
}
10561048

1057-
pub fn insert(
1058-
&mut self,
1059-
cache_key: CacheKey,
1060-
unix_ts_ms: UnixTimestamp,
1061-
epoch: Epoch,
1062-
size: MessageSize,
1063-
) {
1049+
pub fn insert(&mut self, cache_key: CacheKey, unix_ts_ms: UnixTimestamp, size: MessageSize) {
10641050
self.time_index.insert((unix_ts_ms, cache_key.clone()));
1065-
self.epoch_index.insert((epoch, cache_key.clone()));
1066-
self.reverse_index
1067-
.insert(cache_key, (unix_ts_ms, epoch, size));
1051+
self.reverse_index.insert(cache_key, (unix_ts_ms, size));
10681052
self.used_size += size;
10691053
}
10701054

@@ -1079,9 +1063,6 @@ impl PerAuthorIndex {
10791063
if let Some(threshold) = unix_ts_threshold {
10801064
evicted_keys.extend(self.prune_by_time(threshold));
10811065
}
1082-
if let Some(threshold) = epoch_threshold {
1083-
evicted_keys.extend(self.prune_by_epoch(threshold));
1084-
}
10851066
evicted_keys
10861067
}
10871068

@@ -1115,17 +1096,6 @@ impl PerAuthorIndex {
11151096
self.remove_many(&to_prune_keys)
11161097
}
11171098

1118-
fn prune_by_epoch(&mut self, epoch_threshold: Epoch) -> PrunedKeys {
1119-
let mut to_prune_keys = vec![];
1120-
for (epoch, key) in &self.epoch_index {
1121-
if *epoch >= epoch_threshold {
1122-
break;
1123-
}
1124-
to_prune_keys.push(key.clone());
1125-
}
1126-
self.remove_many(&to_prune_keys)
1127-
}
1128-
11291099
fn prune_by_slots(&mut self, target_len: usize) -> PrunedKeys {
11301100
let slots_to_free_up = self.len().saturating_sub(target_len);
11311101
if slots_to_free_up == 0 {
@@ -1150,21 +1120,15 @@ impl PerAuthorIndex {
11501120
#[cfg(test)]
11511121
fn consistency_breaches(&self, prefix: &str) -> Vec<String> {
11521122
let mut breaches = vec![];
1153-
if self.epoch_index.len() != self.reverse_index.len() {
1154-
breaches.push(format!("{prefix}.epoch-index-size-mismatch"));
1155-
}
11561123
if self.time_index.len() != self.reverse_index.len() {
11571124
breaches.push(format!("{prefix}.time-index-size-mismatch"));
11581125
}
11591126

11601127
let mut used_size = self.used_size;
1161-
for (key, (unix_ts, epoch, _size)) in &self.reverse_index {
1128+
for (key, (unix_ts, _size)) in &self.reverse_index {
11621129
if !self.time_index.contains(&(*unix_ts, key.clone())) {
11631130
breaches.push(format!("{prefix}.time-index-missing-key"));
11641131
}
1165-
if !self.epoch_index.contains(&(*epoch, key.clone())) {
1166-
breaches.push(format!("{prefix}.epoch-index-missing-key"));
1167-
}
11681132
used_size -= *_size;
11691133
}
11701134

@@ -1656,7 +1620,7 @@ mod test {
16561620
// these fields are never touched in this module
16571621
recipient_hash: HexBytes([0; 20]),
16581622
message: Bytes::new(),
1659-
epoch: EPOCH.0,
1623+
group_id: EPOCH.0,
16601624
unix_ts_ms,
16611625
};
16621626
messages.push(message);

monad-raptorcast/src/lib.rs

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -152,7 +152,7 @@ where
152152
let message_builder =
153153
OwnedMessageBuilder::new(config.shared_key.clone(), peer_discovery_driver.clone())
154154
.segment_size(segment_size_for_mtu(config.mtu))
155-
.epoch_no(current_epoch)
155+
.group_id(current_epoch)
156156
.redundancy(redundancy);
157157

158158
Self {
@@ -323,7 +323,7 @@ where
323323
if let Some(rc_chunks) = self
324324
.message_builder
325325
.prepare()
326-
.epoch_no(epoch)
326+
.group_id(epoch)
327327
.build_unicast_msg(&outbound_message, &build_target)
328328
{
329329
self.dataplane_writer
@@ -473,7 +473,7 @@ where
473473
}
474474

475475
self.current_epoch = epoch;
476-
self.message_builder.set_epoch_no(epoch);
476+
self.message_builder.set_group_id(epoch);
477477

478478
while let Some(entry) = self.epoch_validators.first_entry() {
479479
if *entry.key() + Epoch(1) < self.current_epoch {
@@ -532,7 +532,11 @@ where
532532
} => {
533533
self.handle_publish(target, message, priority, self_id);
534534
}
535-
RouterCommand::PublishToFullNodes { epoch, message } => {
535+
RouterCommand::PublishToFullNodes {
536+
epoch,
537+
round: _,
538+
message,
539+
} => {
536540
let full_nodes_view = self.dedicated_full_nodes.view();
537541
if self.is_dynamic_fullnode {
538542
debug!("self is dynamic full node, skipping publishing to full nodes");
@@ -573,7 +577,7 @@ where
573577
if let Some(rc_chunks) = self
574578
.message_builder
575579
.prepare_with_peer_lookup(&node_addrs)
576-
.epoch_no(epoch)
580+
.group_id(epoch)
577581
.build_unicast_msg(&outbound_message, &build_target)
578582
{
579583
self.dataplane_writer.udp_write_unicast(rc_chunks);

monad-raptorcast/src/packet/assembler.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -577,7 +577,7 @@ pub(crate) fn build_header(
577577
version: u16,
578578
broadcast_type: BroadcastType,
579579
merkle_tree_depth: u8,
580-
epoch_no: u64,
580+
group_id: u64,
581581
unix_ts_ms: u64,
582582
app_message: &[u8],
583583
) -> Result<Bytes> {
@@ -586,7 +586,7 @@ pub(crate) fn build_header(
586586
// Secondary broadcast bit,
587587
// 2 unused bits,
588588
// 4 bits for Merkle Tree Depth
589-
// 8 // Epoch #
589+
// 8 // Group id
590590
// 8 // Unix timestamp
591591
// 20 // AppMessage hash
592592
// 4 // AppMessage length
@@ -611,7 +611,7 @@ pub(crate) fn build_header(
611611
cursor_broadcast_merkle_depth[0] = broadcast_byte;
612612

613613
let (cursor_epoch_no, cursor) = cursor.split_at_mut_checked(8).expect("header too short");
614-
cursor_epoch_no.copy_from_slice(&epoch_no.to_le_bytes());
614+
cursor_epoch_no.copy_from_slice(&group_id.to_le_bytes());
615615

616616
let (cursor_unix_ts_ms, cursor) = cursor.split_at_mut_checked(8).expect("header too short");
617617
cursor_unix_ts_ms.copy_from_slice(&unix_ts_ms.to_le_bytes());

monad-raptorcast/src/packet/builder.rs

Lines changed: 21 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -92,7 +92,7 @@ where
9292
peer_lookup: PL,
9393

9494
// required fields
95-
epoch_no: Option<u64>,
95+
group_id: Option<u64>,
9696
redundancy: Option<Redundancy>,
9797

9898
// optional fields
@@ -111,7 +111,7 @@ where
111111
Self {
112112
key: self.key.clone(),
113113
peer_lookup: self.peer_lookup.clone(),
114-
epoch_no: self.epoch_no,
114+
group_id: self.group_id,
115115
redundancy: self.redundancy,
116116
unix_ts_ms: self.unix_ts_ms,
117117
segment_size: self.segment_size,
@@ -141,7 +141,7 @@ where
141141

142142
// default fields
143143
redundancy: None,
144-
epoch_no: None,
144+
group_id: None,
145145
unix_ts_ms: TimestampMode::RealTime,
146146

147147
// optional fields
@@ -162,8 +162,8 @@ where
162162
self
163163
}
164164

165-
pub fn epoch_no(mut self, epoch_no: impl Into<u64>) -> Self {
166-
self.epoch_no = Some(epoch_no.into());
165+
pub fn group_id(mut self, group_id: impl Into<u64>) -> Self {
166+
self.group_id = Some(group_id.into());
167167
self
168168
}
169169

@@ -187,16 +187,16 @@ where
187187
}
188188

189189
// ----- Convenience methods for modifying the builder -----
190-
pub fn set_epoch_no(&mut self, epoch_no: impl Into<u64>) {
191-
self.epoch_no = Some(epoch_no.into());
190+
pub fn set_group_id(&mut self, group_id: impl Into<u64>) {
191+
self.group_id = Some(group_id.into());
192192
}
193193

194194
// ----- Prepare override builder -----
195195
pub fn prepare(&self) -> PreparedMessageBuilder<'_, 'key, ST, PL, PL> {
196196
PreparedMessageBuilder {
197197
base: self,
198198
peer_lookup: None,
199-
epoch_no: None,
199+
group_id: None,
200200
}
201201
}
202202

@@ -210,7 +210,7 @@ where
210210
PreparedMessageBuilder {
211211
base: self,
212212
peer_lookup: Some(peer_lookup),
213-
epoch_no: None,
213+
group_id: None,
214214
}
215215
}
216216

@@ -256,7 +256,7 @@ where
256256

257257
// Add extra override fields as needed
258258
peer_lookup: Option<PL2>,
259-
epoch_no: Option<u64>,
259+
group_id: Option<u64>,
260260
}
261261

262262
impl<'base, 'key, ST, PL, PL2> PreparedMessageBuilder<'base, 'key, ST, PL, PL2>
@@ -266,21 +266,21 @@ where
266266
PL2: PeerAddrLookup<CertificateSignaturePubKey<ST>>,
267267
{
268268
// ----- Setters for overrides -----
269-
pub fn epoch_no(mut self, epoch_no: impl Into<u64>) -> Self {
270-
self.epoch_no = Some(epoch_no.into());
269+
pub fn group_id(mut self, group_id: impl Into<u64>) -> Self {
270+
self.group_id = Some(group_id.into());
271271
self
272272
}
273273

274274
// ----- Parameter validation methods -----
275-
fn unwrap_epoch_no(&self) -> Result<u64> {
276-
if let Some(epoch_no) = self.epoch_no {
277-
return Ok(epoch_no);
275+
fn unwrap_group_id(&self) -> Result<u64> {
276+
if let Some(group_id) = self.group_id {
277+
return Ok(group_id);
278278
}
279-
let epoch_no = self
279+
let group_id = self
280280
.base
281-
.epoch_no
282-
.expect("epoch_no must be set before building");
283-
Ok(epoch_no)
281+
.group_id
282+
.expect("group_id must be set before building");
283+
Ok(group_id)
284284
}
285285
fn unwrap_unix_ts_ms(&self) -> Result<u64> {
286286
let unix_ts_ms = match self.base.unix_ts_ms {
@@ -366,14 +366,14 @@ where
366366
broadcast_type: BroadcastType,
367367
app_message: &[u8],
368368
) -> Result<Bytes> {
369-
let epoch_no = self.unwrap_epoch_no()?;
369+
let group_id = self.unwrap_group_id()?;
370370
let unix_ts_ms = self.unwrap_unix_ts_ms()?;
371371

372372
let header_buf = build_header(
373373
0, // version
374374
broadcast_type,
375375
merkle_tree_depth,
376-
epoch_no,
376+
group_id,
377377
unix_ts_ms,
378378
app_message,
379379
)?;

monad-raptorcast/src/packet/mod.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -80,7 +80,7 @@ pub fn build_messages<ST>(
8080
segment_size: u16,
8181
app_message: Bytes,
8282
redundancy: Redundancy,
83-
epoch_no: u64,
83+
group_id: u64,
8484
unix_ts_ms: u64,
8585
build_target: BuildTarget<ST>,
8686
known_addresses: &HashMap<NodeId<CertificateSignaturePubKey<ST>>, SocketAddr>,
@@ -90,7 +90,7 @@ where
9090
{
9191
let builder = MessageBuilder::new(key, known_addresses)
9292
.segment_size(segment_size)
93-
.epoch_no(epoch_no)
93+
.group_id(group_id)
9494
.unix_ts_ms(unix_ts_ms)
9595
.redundancy(redundancy);
9696

0 commit comments

Comments
 (0)