
Commit 18f89d4

Overload Raptor Chunk Header Epoch
A full node may join new RaptorCast groups from the same validator while it is syncing or stuck. In that case it may rebroadcast to an expired group, because its local round hasn't advanced and expired groups are not garbage-collected. As a result, many full nodes end up receiving unsolicited Raptor chunks from groups they no longer belong to. The root cause is that the node selects the rebroadcast group based on its local round. This PR resolves the issue by overloading the epoch field in the Raptor chunk header, setting it to the round number for secondary raptorcast. Syncing or stuck full nodes can then use the round number carried in the chunk header to identify and service the correct group.
1 parent 56c6220 commit 18f89d4
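The gist of the fix, as a minimal standalone Rust sketch: the rebroadcast group is looked up by the value carried in the chunk header (the round, for secondary raptorcast) rather than by the receiver's local round. The `SecondaryGroupMap` type and `rebroadcast_targets` method below are hypothetical stand-ins for illustration only; the real group bookkeeping in monad-raptorcast differs in detail.

```rust
use std::collections::BTreeMap;

// Hypothetical aliases for illustration; the real types live in monad-raptorcast.
type Round = u64;
type NodeId = u64;

/// Secondary raptorcast groups this full node has joined, keyed by the round
/// at which each group becomes active.
struct SecondaryGroupMap {
    groups: BTreeMap<Round, Vec<NodeId>>, // start round -> peers in that group
}

impl SecondaryGroupMap {
    /// Before this change, the node picked a group using its own (possibly
    /// stale) local round. After it, the lookup key is the round embedded in
    /// the chunk header's group-id field, so a syncing or stuck node still
    /// rebroadcasts to the group the sender intended.
    fn rebroadcast_targets(&self, header_group_id: u64) -> Option<&Vec<NodeId>> {
        self.groups
            .range(..=header_group_id)
            .next_back()
            .map(|(_, peers)| peers)
    }
}

fn main() {
    let map = SecondaryGroupMap {
        groups: BTreeMap::from([(100, vec![1, 2]), (200, vec![3, 4])]),
    };
    // A chunk whose header carries round 250 resolves to the group starting at
    // round 200, regardless of how far this node's local round has advanced.
    assert_eq!(map.rebroadcast_targets(250), Some(&vec![3, 4]));
}
```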

File tree

10 files changed: +121 -101 lines changed


monad-consensus-state/src/command.rs

Lines changed: 2 additions & 0 deletions

@@ -61,6 +61,7 @@ where
     },
     PublishToFullNodes {
         epoch: Epoch,
+        round: Round,
         message: Verified<ST, Validated<ConsensusMessage<ST, SCT, EPT>>>,
     },
     /// Schedule a timeout event for `round` to be emitted in `duration`
@@ -137,6 +138,7 @@ where
         cmds.push(ConsensusCommand::EnterRound(epoch, round));
         cmds.push(ConsensusCommand::PublishToFullNodes {
             epoch,
+            round: high_certificate.round(),
             message: ConsensusMessage {
                 version,
                 message: ProtocolMessage::AdvanceRound(AdvanceRoundMessage {

monad-executor-glue/src/lib.rs

Lines changed: 11 additions & 2 deletions

@@ -66,8 +66,12 @@ pub enum RouterCommand<ST: CertificateSignatureRecoverable, OM> {
         message: OM,
         priority: UdpPriority,
     },
+    // Primary publishing embeds epoch in chunk header. Secondary publishing
+    // embeds round in chunk header, as rebroadcasting periods are defined in
+    // rounds
     PublishToFullNodes {
-        epoch: Epoch, // Epoch gets embedded into the raptorcast message
+        epoch: Epoch,
+        round: Round,
         message: OM,
     },
     AddEpochValidatorSet {
@@ -102,9 +106,14 @@ impl<ST: CertificateSignatureRecoverable, OM> Debug for RouterCommand<ST, OM> {
                 .field("target", target)
                 .field("priority", priority)
                 .finish(),
-            Self::PublishToFullNodes { epoch, message: _ } => f
+            Self::PublishToFullNodes {
+                epoch,
+                round,
+                message: _,
+            } => f
                 .debug_struct("PublishToFullNodes")
                 .field("epoch", epoch)
+                .field("round", round)
                 .finish(),
             Self::AddEpochValidatorSet {
                 epoch,

monad-raptorcast/src/decoding.rs

Lines changed: 7 additions & 43 deletions

@@ -762,12 +762,7 @@ impl<PT: PubKey> AuthorIndex<PT> {
 
         let message_size = message.app_message_len as MessageSize;
 
-        index.insert(
-            cache_key.clone(),
-            message.unix_ts_ms,
-            Epoch(message.epoch),
-            message_size,
-        );
+        index.insert(cache_key.clone(), message.unix_ts_ms, message_size);
         self.used_size += message_size;
 
         if index.is_overquota() {
@@ -1000,8 +995,7 @@ struct PerAuthorIndex {
     quota: Quota,
     used_size: MessageSize,
     time_index: BTreeSet<(UnixTimestamp, CacheKey)>,
-    epoch_index: BTreeSet<(Epoch, CacheKey)>,
-    reverse_index: HashMap<CacheKey, (UnixTimestamp, Epoch, MessageSize)>,
+    reverse_index: HashMap<CacheKey, (UnixTimestamp, MessageSize)>,
 }
 
 impl PerAuthorIndex {
@@ -1010,7 +1004,6 @@ impl PerAuthorIndex {
             quota,
             used_size: 0,
             time_index: BTreeSet::new(),
-            epoch_index: BTreeSet::new(),
             reverse_index: HashMap::new(),
         }
     }
@@ -1028,12 +1021,11 @@ impl PerAuthorIndex {
     }
 
     pub fn remove(&mut self, key: &CacheKey) -> PrunedKeys {
-        let Some((unix_ts_ms, epoch, size)) = self.reverse_index.remove(key) else {
+        let Some((unix_ts_ms, size)) = self.reverse_index.remove(key) else {
             return PrunedKeys::empty();
         };
 
         self.time_index.remove(&(unix_ts_ms, key.clone()));
-        self.epoch_index.remove(&(epoch, key.clone()));
         self.used_size -= size;
         PrunedKeys::singleton(key.clone(), size)
     }
@@ -1054,17 +1046,9 @@ impl PerAuthorIndex {
             .collect()
     }
 
-    pub fn insert(
-        &mut self,
-        cache_key: CacheKey,
-        unix_ts_ms: UnixTimestamp,
-        epoch: Epoch,
-        size: MessageSize,
-    ) {
+    pub fn insert(&mut self, cache_key: CacheKey, unix_ts_ms: UnixTimestamp, size: MessageSize) {
         self.time_index.insert((unix_ts_ms, cache_key.clone()));
-        self.epoch_index.insert((epoch, cache_key.clone()));
-        self.reverse_index
-            .insert(cache_key, (unix_ts_ms, epoch, size));
+        self.reverse_index.insert(cache_key, (unix_ts_ms, size));
         self.used_size += size;
     }
 
@@ -1079,9 +1063,6 @@ impl PerAuthorIndex {
         if let Some(threshold) = unix_ts_threshold {
             evicted_keys.extend(self.prune_by_time(threshold));
         }
-        if let Some(threshold) = epoch_threshold {
-            evicted_keys.extend(self.prune_by_epoch(threshold));
-        }
         evicted_keys
     }
 
@@ -1115,17 +1096,6 @@ impl PerAuthorIndex {
         self.remove_many(&to_prune_keys)
     }
 
-    fn prune_by_epoch(&mut self, epoch_threshold: Epoch) -> PrunedKeys {
-        let mut to_prune_keys = vec![];
-        for (epoch, key) in &self.epoch_index {
-            if *epoch >= epoch_threshold {
-                break;
-            }
-            to_prune_keys.push(key.clone());
-        }
-        self.remove_many(&to_prune_keys)
-    }
-
     fn prune_by_slots(&mut self, target_len: usize) -> PrunedKeys {
         let slots_to_free_up = self.len().saturating_sub(target_len);
         if slots_to_free_up == 0 {
@@ -1150,21 +1120,15 @@ impl PerAuthorIndex {
     #[cfg(test)]
     fn consistency_breaches(&self, prefix: &str) -> Vec<String> {
         let mut breaches = vec![];
-        if self.epoch_index.len() != self.reverse_index.len() {
-            breaches.push(format!("{prefix}.epoch-index-size-mismatch"));
-        }
         if self.time_index.len() != self.reverse_index.len() {
            breaches.push(format!("{prefix}.time-index-size-mismatch"));
         }
 
         let mut used_size = self.used_size;
-        for (key, (unix_ts, epoch, _size)) in &self.reverse_index {
+        for (key, (unix_ts, _size)) in &self.reverse_index {
             if !self.time_index.contains(&(*unix_ts, key.clone())) {
                 breaches.push(format!("{prefix}.time-index-missing-key"));
             }
-            if !self.epoch_index.contains(&(*epoch, key.clone())) {
-                breaches.push(format!("{prefix}.epoch-index-missing-key"));
-            }
             used_size -= *_size;
         }
 
@@ -1655,7 +1619,7 @@ mod test {
             // these fields are never touched in this module
             recipient_hash: HexBytes([0; 20]),
             message: Bytes::new(),
-            epoch: EPOCH.0,
+            group_id: EPOCH.0,
             unix_ts_ms,
             secondary_broadcast: false,
         };

monad-raptorcast/src/lib.rs

Lines changed: 5 additions & 1 deletion

@@ -567,7 +567,11 @@ where
             } => {
                 self.handle_publish(target, message, priority, self_id);
             }
-            RouterCommand::PublishToFullNodes { epoch, message } => {
+            RouterCommand::PublishToFullNodes {
+                epoch,
+                round: _,
+                message,
+            } => {
                 let full_nodes_view = self.dedicated_full_nodes.view();
                 if self.is_dynamic_fullnode {
                     debug!("self is dynamic full node, skipping publishing to full nodes");

monad-raptorcast/src/raptorcast_secondary/mod.rs

Lines changed: 9 additions & 5 deletions

@@ -163,7 +163,7 @@ where
     }
 
     fn udp_build(
-        epoch: &Epoch,
+        group_id: u64,
         build_target: BuildTarget<ST>,
         outbound_message: Bytes,
         mtu: u16,
@@ -197,7 +197,7 @@ where
             segment_size,
             outbound_message,
             redundancy,
-            epoch.0,
+            group_id,
             unix_ts_ms,
             build_target,
             known_addresses,
@@ -230,7 +230,7 @@ where
             }
         };
         let udp_messages = Self::udp_build(
-            &self.curr_epoch,
+            self.curr_epoch.0,
             BuildTarget::<ST>::PointToPoint(dest_node),
             msg_bytes,
             self.mtu,
@@ -399,7 +399,11 @@ where
                 }
             },
 
-            Self::Command::PublishToFullNodes { epoch, message } => {
+            Self::Command::PublishToFullNodes {
+                epoch: _,
+                round,
+                message,
+            } => {
                 let _timer = DropTimer::start(Duration::from_millis(20), |elapsed| {
                     warn!(?elapsed, "long time to publish message")
                 });
@@ -451,7 +455,7 @@ where
                 // Split outbound_message into raptorcast chunks that we can
                 // send to full nodes.
                 let rc_chunks: UnicastMsg = Self::udp_build(
-                    &epoch,
+                    round.0,
                     build_target,
                     outbound_message,
                     self.mtu,
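Taken together, the hunks above establish a sender-side convention for the first argument of `udp_build`: point-to-point and primary publishing pass the epoch, while publishing to full nodes passes the round. A minimal sketch of that convention, using a hypothetical `BroadcastKind` enum and `header_group_id` helper purely for illustration (the real code threads these values through `RouterCommand` and `Self::udp_build` rather than a single function):

```rust
// Hypothetical helper for illustration only.
enum BroadcastKind {
    Primary { epoch: u64 },              // validator-to-validator raptorcast
    SecondaryToFullNodes { round: u64 }, // validator-to-full-node raptorcast
}

/// Per this change, the 8-byte header field is a generic group id: the epoch
/// for primary raptorcast, the round for secondary (full-node) raptorcast.
fn header_group_id(kind: &BroadcastKind) -> u64 {
    match kind {
        BroadcastKind::Primary { epoch } => *epoch,
        BroadcastKind::SecondaryToFullNodes { round } => *round,
    }
}

fn main() {
    assert_eq!(header_group_id(&BroadcastKind::Primary { epoch: 7 }), 7);
    assert_eq!(
        header_group_id(&BroadcastKind::SecondaryToFullNodes { round: 4242 }),
        4242
    );
}
```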

monad-raptorcast/src/udp.rs

Lines changed: 22 additions & 15 deletions

@@ -189,8 +189,9 @@ impl<ST: CertificateSignatureRecoverable> UdpState<ST> {
             // chunks ASAP, before changing `recently_decoded_state`.
             if let Some(broadcast_mode) = maybe_broadcast_mode {
                 if !group_map.check_source(
-                    Epoch(parsed_message.epoch),
+                    parsed_message.group_id,
                     &parsed_message.author,
+                    &message.src_addr,
                     broadcast_mode,
                 ) {
                     continue;
@@ -221,7 +222,7 @@ impl<ST: CertificateSignatureRecoverable> UdpState<ST> {
             if let Some(broadcast_mode) = maybe_broadcast_mode {
                 if self_hash == parsed_message.recipient_hash {
                     let maybe_targets = group_map.iterate_rebroadcast_peers(
-                        Epoch(parsed_message.epoch),
+                        parsed_message.group_id,
                         &parsed_message.author,
                         broadcast_mode,
                     );
@@ -237,9 +238,12 @@ impl<ST: CertificateSignatureRecoverable> UdpState<ST> {
                 }
             };
 
-            let validator_set = epoch_validators
-                .get(&Epoch(parsed_message.epoch))
-                .map(|ev| &ev.validators);
+            let validator_set = match maybe_broadcast_mode {
+                Some(BroadcastMode::Primary) | None => epoch_validators
+                    .get(&Epoch(parsed_message.group_id))
+                    .map(|ev| &ev.validators),
+                Some(BroadcastMode::Secondary) => None, // full-node raptorcast or point-to-point
+            };
 
             let decoding_context =
                 DecodingContext::new(validator_set, unix_ts_ms_now(), current_epoch);
@@ -360,7 +364,7 @@ pub fn build_messages<ST>(
     segment_size: u16, // Each chunk in the returned Vec (Bytes element of the tuple) will be limited to this size
     app_message: Bytes, // This is the actual message that gets raptor-10 encoded and split into UDP chunks
     redundancy: Redundancy,
-    epoch_no: u64,
+    group_id: u64,
     unix_ts_ms: u64,
     build_target: BuildTarget<ST>,
     known_addresses: &HashMap<NodeId<CertificateSignaturePubKey<ST>>, SocketAddr>,
@@ -376,7 +380,7 @@ where
         app_message,
         app_message_len,
         redundancy,
-        epoch_no,
+        group_id,
         unix_ts_ms,
         build_target,
         known_addresses,
@@ -431,7 +435,7 @@ pub fn build_messages_with_length<ST>(
     app_message: Bytes,
     app_message_len: u32,
     redundancy: Redundancy,
-    epoch_no: u64,
+    group_id: u64,
     unix_ts_ms: u64,
     build_target: BuildTarget<ST>,
     known_addresses: &HashMap<NodeId<CertificateSignaturePubKey<ST>>, SocketAddr>,
@@ -760,7 +764,6 @@ where
     // At this point, everything BELOW chunk_merkle_leaf_idx is populated
     // populate merkle trees/roots/leaf_idx + signatures (cached)
     let version: u16 = 0;
-    let epoch_no: u64 = epoch_no;
     let unix_ts_ms: u64 = unix_ts_ms;
     message
         // .par_chunks_mut(segment_size as usize * chunks_per_merkle_batch)
@@ -795,8 +798,9 @@ where
             cursor_broadcast_merkle_depth[0] = ((is_raptor_broadcast as u8) << 7)
                 | ((is_secondary_raptor_broadcast as u8) << 6)
                 | (tree_depth & 0b0000_1111); // tree_depth max 4 bits
-            let (cursor_epoch_no, cursor) = cursor.split_at_mut(8);
-            cursor_epoch_no.copy_from_slice(&epoch_no.to_le_bytes());
+
+            let (cursor_group_id, cursor) = cursor.split_at_mut(8);
+            cursor_group_id.copy_from_slice(&group_id.to_le_bytes());
             let (cursor_unix_ts_ms, cursor) = cursor.split_at_mut(8);
             cursor_unix_ts_ms.copy_from_slice(&unix_ts_ms.to_le_bytes());
             let (cursor_app_message_hash, cursor) = cursor.split_at_mut(20);
@@ -860,7 +864,10 @@ where
     // This applies to both validator-to-validator and validator-to-full-node
     // raptorcasting.
     pub author: NodeId<PT>,
-    pub epoch: u64,
+    // group_id is set to
+    // - epoch number for validator-to-validator raptorcast
+    // - round number for validator-to-fullnode raptorcast
+    pub group_id: u64,
     pub unix_ts_ms: u64,
     pub app_message_hash: AppMessageHash,
     pub app_message_len: u32,
@@ -951,8 +958,8 @@ where
         return Err(MessageValidationError::InvalidTreeDepth);
     }
 
-    let cursor_epoch = split_off(8)?;
-    let epoch = u64::from_le_bytes(cursor_epoch.as_ref().try_into().expect("u64 is 8 bytes"));
+    let cursor_group_id = split_off(8)?;
+    let group_id = u64::from_le_bytes(cursor_group_id.as_ref().try_into().expect("u64 is 8 bytes"));
 
     let cursor_unix_ts_ms = split_off(8)?;
     let unix_ts_ms = u64::from_le_bytes(
@@ -1047,7 +1054,7 @@ where
     Ok(ValidatedMessage {
         message,
         author,
-        epoch,
+        group_id,
         unix_ts_ms,
         app_message_hash,
         app_message_len,
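The serialization hunk above pins down the relevant slice of the chunk header: a flags byte (bit 7 = raptor broadcast, bit 6 = secondary broadcast, low 4 bits = tree depth), followed by the 8-byte little-endian group id and the 8-byte little-endian unix timestamp. The sketch below parses just that slice; `parse_tail` is a hypothetical name, and the fields that precede the flags byte (version, author, signature, merkle data) are intentionally omitted here, since the full parser in udp.rs handles them.

```rust
// Minimal sketch of the header fields shown in the hunk, starting at the flags byte.
fn parse_tail(buf: &[u8]) -> Option<(bool, bool, u8, u64, u64)> {
    let flags = *buf.first()?;
    let is_raptor_broadcast = flags & 0b1000_0000 != 0; // bit 7
    let is_secondary_broadcast = flags & 0b0100_0000 != 0; // bit 6
    let tree_depth = flags & 0b0000_1111; // low 4 bits
    let group_id = u64::from_le_bytes(buf.get(1..9)?.try_into().ok()?);
    let unix_ts_ms = u64::from_le_bytes(buf.get(9..17)?.try_into().ok()?);
    Some((
        is_raptor_broadcast,
        is_secondary_broadcast,
        tree_depth,
        group_id,
        unix_ts_ms,
    ))
}

fn main() {
    let mut buf = vec![0u8; 17];
    buf[0] = (1 << 7) | (1 << 6) | 3; // secondary raptor broadcast, tree depth 3
    buf[1..9].copy_from_slice(&4242u64.to_le_bytes()); // group_id = round 4242
    buf[9..17].copy_from_slice(&1_700_000_000_000u64.to_le_bytes());

    let (_, secondary, depth, group_id, _) = parse_tail(&buf).unwrap();
    assert!(secondary);
    assert_eq!(depth, 3);
    // For a secondary broadcast, group_id is interpreted as a round number, not an epoch.
    assert_eq!(group_id, 4242);
}
```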
