Skip to content

Commit 855c0c9

Browse files
committed
sim-rs: support relay-strategy config setting
1 parent f88acce commit 855c0c9

File tree

4 files changed

+119
-42
lines changed

4 files changed

+119
-42
lines changed

data/simulation/config.d.ts

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@
33
/** A configuration for a Leios simulation. */
44
export interface Config {
55
// Simulation Configuration
6-
/** Only supported by Haskell simulation. */
76
"relay-strategy": RelayStrategy;
87
/** Only supported by Haskell simulation. */
98
"tcp-congestion-control": boolean;

data/simulation/config.schema.json

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -258,8 +258,7 @@
258258
"type": "number"
259259
},
260260
"relay-strategy": {
261-
"$ref": "#/definitions/RelayStrategy",
262-
"description": "Only supported by Haskell simulation."
261+
"$ref": "#/definitions/RelayStrategy"
263262
},
264263
"tcp-congestion-control": {
265264
"description": "Only supported by Haskell simulation.",

sim-rs/sim-core/src/config.rs

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,9 @@ impl From<DistributionConfig> for FloatDistribution {
5252
#[derive(Deserialize)]
5353
#[serde(rename_all = "kebab-case")]
5454
pub struct RawParameters {
55+
// Simulation Configuration
56+
pub relay_strategy: RelayStrategy,
57+
5558
// Leios protocol configuration
5659
pub leios_stage_length_slots: u64,
5760
pub leios_stage_active_voting_slots: u64,
@@ -108,6 +111,13 @@ pub struct RawParameters {
108111
// pub cert_size_bytes_per_node: u64,
109112
}
110113

114+
#[derive(Debug, Copy, Clone, Deserialize, PartialEq, Eq)]
115+
#[serde(rename_all = "kebab-case")]
116+
pub enum RelayStrategy {
117+
RequestFromAll,
118+
RequestFromFirst,
119+
}
120+
111121
#[derive(Debug, Serialize, Deserialize)]
112122
#[serde(rename_all = "kebab-case")]
113123
pub struct RawTopology {
@@ -294,6 +304,7 @@ pub struct SimConfiguration {
294304
pub nodes: Vec<NodeConfiguration>,
295305
pub links: Vec<LinkConfiguration>,
296306
pub stage_length: u64,
307+
pub(crate) relay_strategy: RelayStrategy,
297308
pub(crate) block_generation_probability: f64,
298309
pub(crate) ib_generation_probability: f64,
299310
pub(crate) eb_generation_probability: f64,
@@ -318,6 +329,7 @@ impl SimConfiguration {
318329
nodes: topology.nodes,
319330
trace_nodes: HashSet::new(),
320331
links: topology.links,
332+
relay_strategy: params.relay_strategy,
321333
block_generation_probability: params.rb_generation_probability,
322334
ib_generation_probability: params.ib_generation_probability,
323335
eb_generation_probability: params.eb_generation_probability,

sim-rs/sim-core/src/sim/node.rs

Lines changed: 106 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
use std::{
2-
collections::{btree_map, hash_map, BTreeMap, BTreeSet, BinaryHeap, HashMap, HashSet},
2+
collections::{BTreeMap, BTreeSet, BinaryHeap, HashMap, HashSet},
33
sync::Arc,
44
time::Duration,
55
};
@@ -14,7 +14,7 @@ use tracing::{info, trace};
1414

1515
use crate::{
1616
clock::{ClockBarrier, FutureEvent, Timestamp},
17-
config::{NodeConfiguration, NodeId, SimConfiguration},
17+
config::{NodeConfiguration, NodeId, RelayStrategy, SimConfiguration},
1818
events::EventTracker,
1919
model::{
2020
Block, CpuTaskId, Endorsement, EndorserBlock, EndorserBlockId, InputBlock,
@@ -132,28 +132,35 @@ struct NodeLeiosState {
132132
ibs: BTreeMap<InputBlockId, InputBlockState>,
133133
ib_requests: BTreeMap<NodeId, PeerInputBlockRequests>,
134134
ibs_by_slot: BTreeMap<u64, Vec<InputBlockId>>,
135-
ebs: BTreeMap<EndorserBlockId, Arc<EndorserBlock>>,
135+
ebs: BTreeMap<EndorserBlockId, EndorserBlockState>,
136136
ebs_by_slot: BTreeMap<u64, Vec<EndorserBlockId>>,
137137
votes_to_generate: BTreeMap<u64, usize>,
138138
votes_by_eb: BTreeMap<EndorserBlockId, Vec<NodeId>>,
139139
votes: BTreeMap<VoteBundleId, VoteBundleState>,
140140
}
141141

142142
enum InputBlockState {
143+
HeaderPending,
143144
Pending(InputBlockHeader),
144145
Requested(InputBlockHeader),
145146
Received(Arc<InputBlock>),
146147
}
147148
impl InputBlockState {
148-
fn header(&self) -> &InputBlockHeader {
149+
fn header(&self) -> Option<&InputBlockHeader> {
149150
match self {
150-
Self::Pending(header) => header,
151-
Self::Requested(header) => header,
152-
Self::Received(ib) => &ib.header,
151+
Self::HeaderPending => None,
152+
Self::Pending(header) => Some(header),
153+
Self::Requested(header) => Some(header),
154+
Self::Received(ib) => Some(&ib.header),
153155
}
154156
}
155157
}
156158

159+
enum EndorserBlockState {
160+
Pending,
161+
Received(Arc<EndorserBlock>),
162+
}
163+
157164
enum VoteBundleState {
158165
Requested,
159166
Received(Arc<VoteBundle>),
@@ -292,11 +299,9 @@ impl Node {
292299
.ebs
293300
.keys()
294301
.map(|eb_id| {
295-
let eb = self
296-
.leios
297-
.ebs
298-
.get(eb_id)
299-
.expect("node tried voting for an unknown EB");
302+
let Some(EndorserBlockState::Received(eb)) = self.leios.ebs.get(eb_id) else {
303+
panic!("node tried voting for an unknown EB");
304+
};
300305
cpu_times.vote_generation_constant
301306
+ (cpu_times.vote_generation_per_ib * eb.ibs.len() as u32)
302307
})
@@ -549,7 +554,9 @@ impl Node {
549554
};
550555
let mut ebs = ebs.clone();
551556
ebs.retain(|eb_id| {
552-
let eb = self.leios.ebs.get(eb_id).unwrap();
557+
let Some(EndorserBlockState::Received(eb)) = self.leios.ebs.get(eb_id) else {
558+
panic!("Tried voting for EB which we haven't received");
559+
};
553560
match self.should_vote_for(slot, eb) {
554561
Ok(()) => true,
555562
Err(reason) => {
@@ -604,6 +611,9 @@ impl Node {
604611
let Some(eb) = self.leios.ebs.get(&endorsement.eb) else {
605612
bail!("Missing endorsement block {}", endorsement.eb);
606613
};
614+
let EndorserBlockState::Received(eb) = eb else {
615+
bail!("Haven't yet received endorsement block {}", endorsement.eb);
616+
};
607617
for ib_id in &eb.ibs {
608618
let Some(InputBlockState::Received(ib)) = self.leios.ibs.get(ib_id) else {
609619
bail!("Missing input block {}", ib_id);
@@ -667,7 +677,9 @@ impl Node {
667677
}
668678

669679
fn count_txs_in_eb(&self, eb_id: &EndorserBlockId) -> Option<usize> {
670-
let eb = self.leios.ebs.get(eb_id)?;
680+
let Some(EndorserBlockState::Received(eb)) = self.leios.ebs.get(eb_id) else {
681+
return None;
682+
};
671683
let mut tx_set = HashSet::new();
672684
for ib_id in &eb.ibs {
673685
let InputBlockState::Received(ib) = self.leios.ibs.get(ib_id)? else {
@@ -707,8 +719,11 @@ impl Node {
707719
}
708720

709721
fn receive_announce_tx(&mut self, from: NodeId, id: TransactionId) -> Result<()> {
710-
if let hash_map::Entry::Vacant(e) = self.txs.entry(id) {
711-
e.insert(TransactionView::Pending);
722+
if self.txs.get(&id).is_none_or(|t| {
723+
self.sim_config.relay_strategy == RelayStrategy::RequestFromAll
724+
&& matches!(t, TransactionView::Pending)
725+
}) {
726+
self.txs.insert(id, TransactionView::Pending);
712727
self.send_to(from, SimulationMessage::RequestTx(id))?;
713728
}
714729
Ok(())
@@ -735,10 +750,16 @@ impl Node {
735750

736751
fn propagate_tx(&mut self, from: NodeId, tx: Arc<Transaction>) -> Result<()> {
737752
let id = tx.id;
753+
if self
754+
.txs
755+
.insert(id, TransactionView::Received(tx.clone()))
756+
.is_some_and(|tx| matches!(tx, TransactionView::Received(_)))
757+
{
758+
return Ok(());
759+
}
738760
if self.trace {
739761
info!("node {} saw tx {id}", self.name);
740762
}
741-
self.txs.insert(id, TransactionView::Received(tx.clone()));
742763
self.praos.mempool.insert(tx.id, tx.clone());
743764
for peer in &self.consumers {
744765
if *peer == from {
@@ -790,15 +811,22 @@ impl Node {
790811
}
791812

792813
fn receive_announce_ib_header(&mut self, from: NodeId, id: InputBlockId) -> Result<()> {
793-
self.send_to(from, SimulationMessage::RequestIBHeader(id))?;
814+
if self.leios.ibs.get(&id).is_none_or(|ib| {
815+
self.sim_config.relay_strategy == RelayStrategy::RequestFromAll
816+
&& matches!(ib, InputBlockState::HeaderPending)
817+
}) {
818+
self.leios.ibs.insert(id, InputBlockState::HeaderPending);
819+
self.send_to(from, SimulationMessage::RequestIBHeader(id))?;
820+
}
794821
Ok(())
795822
}
796823

797824
fn receive_request_ib_header(&mut self, from: NodeId, id: InputBlockId) -> Result<()> {
798825
if let Some(ib) = self.leios.ibs.get(&id) {
799-
let header = ib.header().clone();
800-
let have_body = matches!(ib, InputBlockState::Received(_));
801-
self.send_to(from, SimulationMessage::IBHeader(header, have_body))?;
826+
if let Some(header) = ib.header() {
827+
let have_body = matches!(ib, InputBlockState::Received(_));
828+
self.send_to(from, SimulationMessage::IBHeader(header.clone(), have_body))?;
829+
}
802830
}
803831
Ok(())
804832
}
@@ -810,7 +838,12 @@ impl Node {
810838
has_body: bool,
811839
) -> Result<()> {
812840
let id = header.id;
813-
if self.leios.ibs.contains_key(&id) {
841+
if self
842+
.leios
843+
.ibs
844+
.get(&id)
845+
.is_some_and(|ib| ib.header().is_some())
846+
{
814847
return Ok(());
815848
}
816849
self.leios.ibs.insert(id, InputBlockState::Pending(header));
@@ -830,8 +863,14 @@ impl Node {
830863
}
831864

832865
fn receive_announce_ib(&mut self, from: NodeId, id: InputBlockId) -> Result<()> {
833-
let Some(InputBlockState::Pending(header)) = self.leios.ibs.get(&id) else {
834-
return Ok(());
866+
let header = match self.leios.ibs.get(&id) {
867+
Some(InputBlockState::Pending(header)) => header,
868+
Some(InputBlockState::Requested(header))
869+
if self.sim_config.relay_strategy == RelayStrategy::RequestFromAll =>
870+
{
871+
header
872+
}
873+
_ => return Ok(()),
835874
};
836875
// Do we have capacity to request this block?
837876
let reqs = self.leios.ib_requests.entry(from).or_default();
@@ -864,16 +903,20 @@ impl Node {
864903

865904
fn finish_validating_ib(&mut self, from: NodeId, ib: Arc<InputBlock>) -> Result<()> {
866905
let id = ib.header.id;
906+
let slot = ib.header.id.slot;
867907
for transaction in &ib.transactions {
868908
// Do not include transactions from this IB in any IBs we produce ourselves.
869909
self.leios.mempool.remove(&transaction.id);
870910
}
871-
self.leios
872-
.ibs_by_slot
873-
.entry(ib.header.id.slot)
874-
.or_default()
875-
.push(id);
876-
self.leios.ibs.insert(id, InputBlockState::Received(ib));
911+
if self
912+
.leios
913+
.ibs
914+
.insert(id, InputBlockState::Received(ib))
915+
.is_some_and(|ib| matches!(ib, InputBlockState::Received(_)))
916+
{
917+
return Ok(());
918+
}
919+
self.leios.ibs_by_slot.entry(slot).or_default().push(id);
877920

878921
for peer in &self.consumers {
879922
if *peer == from {
@@ -888,9 +931,17 @@ impl Node {
888931

889932
// We now have capacity to request one more IB from this peer
890933
while let Some((id, _)) = reqs.pending.pop() {
891-
let Some(InputBlockState::Pending(header)) = self.leios.ibs.get(&id) else {
892-
// We fetched this IB from some other node already
893-
continue;
934+
let header = match self.leios.ibs.get(&id) {
935+
Some(InputBlockState::Pending(header)) => header,
936+
Some(InputBlockState::Requested(header))
937+
if self.sim_config.relay_strategy == RelayStrategy::RequestFromAll =>
938+
{
939+
header
940+
}
941+
_ => {
942+
// We fetched this IB from some other node already
943+
continue;
944+
}
894945
};
895946

896947
// Make the request
@@ -906,12 +957,18 @@ impl Node {
906957
}
907958

908959
fn receive_announce_eb(&mut self, from: NodeId, id: EndorserBlockId) -> Result<()> {
909-
self.send_to(from, SimulationMessage::RequestEB(id))?;
960+
if self.leios.ebs.get(&id).is_none_or(|eb| {
961+
self.sim_config.relay_strategy == RelayStrategy::RequestFromAll
962+
&& matches!(eb, EndorserBlockState::Pending)
963+
}) {
964+
self.leios.ebs.insert(id, EndorserBlockState::Pending);
965+
self.send_to(from, SimulationMessage::RequestEB(id))?;
966+
}
910967
Ok(())
911968
}
912969

913970
fn receive_request_eb(&mut self, from: NodeId, id: EndorserBlockId) -> Result<()> {
914-
if let Some(eb) = self.leios.ebs.get(&id) {
971+
if let Some(EndorserBlockState::Received(eb)) = self.leios.ebs.get(&id) {
915972
self.tracker.track_eb_sent(id, self.id, from);
916973
self.send_to(from, SimulationMessage::EB(eb.clone()))?;
917974
}
@@ -925,7 +982,12 @@ impl Node {
925982

926983
fn finish_validating_eb(&mut self, from: NodeId, eb: Arc<EndorserBlock>) -> Result<()> {
927984
let id = eb.id();
928-
if self.leios.ebs.insert(id, eb).is_some() {
985+
if self
986+
.leios
987+
.ebs
988+
.insert(id, EndorserBlockState::Received(eb))
989+
.is_some_and(|eb| matches!(eb, EndorserBlockState::Received(_)))
990+
{
929991
return Ok(());
930992
}
931993
self.leios.ebs_by_slot.entry(id.slot).or_default().push(id);
@@ -940,8 +1002,11 @@ impl Node {
9401002
}
9411003

9421004
fn receive_announce_votes(&mut self, from: NodeId, id: VoteBundleId) -> Result<()> {
943-
if let btree_map::Entry::Vacant(e) = self.leios.votes.entry(id) {
944-
e.insert(VoteBundleState::Requested);
1005+
if self.leios.votes.get(&id).is_none_or(|v| {
1006+
self.sim_config.relay_strategy == RelayStrategy::RequestFromAll
1007+
&& matches!(v, VoteBundleState::Requested)
1008+
}) {
1009+
self.leios.votes.insert(id, VoteBundleState::Requested);
9451010
self.send_to(from, SimulationMessage::RequestVotes(id))?;
9461011
}
9471012
Ok(())
@@ -1052,7 +1117,9 @@ impl Node {
10521117
self.tracker.track_eb_generated(&eb);
10531118

10541119
let id = eb.id();
1055-
self.leios.ebs.insert(id, eb.clone());
1120+
self.leios
1121+
.ebs
1122+
.insert(id, EndorserBlockState::Received(eb.clone()));
10561123
self.leios.ebs_by_slot.entry(id.slot).or_default().push(id);
10571124
for peer in &self.consumers {
10581125
self.send_to(*peer, SimulationMessage::AnnounceEB(id))?;

0 commit comments

Comments
 (0)