Skip to content

Commit fd711ba

Browse files
committed
sim-rs: support ib-diffusion-strategy config setting
1 parent 855c0c9 commit fd711ba

File tree

4 files changed

+78
-11
lines changed

4 files changed

+78
-11
lines changed

data/simulation/config.d.ts

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,6 @@ export interface Config {
5454
"ib-body-avg-size-bytes": bigint;
5555
/** Only supported by Rust simulation. */
5656
"ib-body-max-size-bytes": bigint;
57-
/** Only supported by Haskell simulation. */
5857
"ib-diffusion-strategy": DiffusionStrategy;
5958
/** Only supported by Haskell simulation. */
6059
"ib-diffusion-max-window-size": bigint;

data/simulation/config.schema.json

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -189,8 +189,7 @@
189189
"type": "number"
190190
},
191191
"ib-diffusion-strategy": {
192-
"$ref": "#/definitions/DiffusionStrategy",
193-
"description": "Only supported by Haskell simulation."
192+
"$ref": "#/definitions/DiffusionStrategy"
194193
},
195194
"ib-generation-cpu-time-ms": {
196195
"type": "number"

sim-rs/sim-core/src/config.rs

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,7 @@ pub struct RawParameters {
8383
pub ib_body_validation_cpu_time_ms_constant: f64,
8484
pub ib_body_validation_cpu_time_ms_per_byte: f64,
8585
pub ib_body_max_size_bytes: u64,
86+
pub ib_diffusion_strategy: DiffusionStrategy,
8687
#[serde(default = "u64::one")]
8788
pub ib_shards: u64,
8889

@@ -111,6 +112,14 @@ pub struct RawParameters {
111112
// pub cert_size_bytes_per_node: u64,
112113
}
113114

115+
#[derive(Debug, Copy, Clone, Deserialize, PartialEq, Eq)]
116+
#[serde(rename_all = "kebab-case")]
117+
pub enum DiffusionStrategy {
118+
PeerOrder,
119+
FreshestFirst,
120+
OldestFirst,
121+
}
122+
114123
#[derive(Debug, Copy, Clone, Deserialize, PartialEq, Eq)]
115124
#[serde(rename_all = "kebab-case")]
116125
pub enum RelayStrategy {
@@ -314,6 +323,7 @@ pub struct SimConfiguration {
314323
pub(crate) max_block_size: u64,
315324
pub(crate) max_tx_size: u64,
316325
pub(crate) max_ib_size: u64,
326+
pub(crate) ib_diffusion_strategy: DiffusionStrategy,
317327
pub(crate) max_ib_requests_per_peer: usize,
318328
pub(crate) ib_shards: u64,
319329
pub(crate) cpu_times: CpuTimeConfig,
@@ -340,6 +350,7 @@ impl SimConfiguration {
340350
max_tx_size: params.tx_max_size_bytes,
341351
stage_length: params.leios_stage_length_slots,
342352
max_ib_size: params.ib_body_max_size_bytes,
353+
ib_diffusion_strategy: params.ib_diffusion_strategy,
343354
max_ib_requests_per_peer: 1,
344355
ib_shards: params.ib_shards,
345356
cpu_times: CpuTimeConfig::new(&params),

sim-rs/sim-core/src/sim/node.rs

Lines changed: 66 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
use std::{
2-
collections::{BTreeMap, BTreeSet, BinaryHeap, HashMap, HashSet},
2+
cmp::Reverse,
3+
collections::{BTreeMap, BTreeSet, BinaryHeap, HashMap, HashSet, VecDeque},
4+
hash::Hash,
35
sync::Arc,
46
time::Duration,
57
};
@@ -14,7 +16,7 @@ use tracing::{info, trace};
1416

1517
use crate::{
1618
clock::{ClockBarrier, FutureEvent, Timestamp},
17-
config::{NodeConfiguration, NodeId, RelayStrategy, SimConfiguration},
19+
config::{DiffusionStrategy, NodeConfiguration, NodeId, RelayStrategy, SimConfiguration},
1820
events::EventTracker,
1921
model::{
2022
Block, CpuTaskId, Endorsement, EndorserBlock, EndorserBlockId, InputBlock,
@@ -166,11 +168,59 @@ enum VoteBundleState {
166168
Received(Arc<VoteBundle>),
167169
}
168170

169-
#[derive(Default)]
170171
struct PeerInputBlockRequests {
171-
pending: PriorityQueue<InputBlockId, Timestamp>,
172+
pending: PendingQueue<InputBlockId>,
172173
active: HashSet<InputBlockId>,
173174
}
175+
enum PendingQueue<T: Hash + Eq> {
176+
PeerOrder(VecDeque<T>),
177+
FreshestFirst(PriorityQueue<T, Timestamp>),
178+
OldestFirst(PriorityQueue<T, Reverse<Timestamp>>),
179+
}
180+
impl<T: Hash + Eq> PendingQueue<T> {
181+
fn new(strategy: DiffusionStrategy) -> Self {
182+
match strategy {
183+
DiffusionStrategy::PeerOrder => Self::PeerOrder(VecDeque::new()),
184+
DiffusionStrategy::FreshestFirst => Self::FreshestFirst(PriorityQueue::new()),
185+
DiffusionStrategy::OldestFirst => Self::OldestFirst(PriorityQueue::new()),
186+
}
187+
}
188+
fn push(&mut self, value: T, timestamp: Timestamp) {
189+
match self {
190+
Self::PeerOrder(queue) => queue.push_back(value),
191+
Self::FreshestFirst(queue) => {
192+
queue.push(value, timestamp);
193+
}
194+
Self::OldestFirst(queue) => {
195+
queue.push(value, Reverse(timestamp));
196+
}
197+
}
198+
}
199+
fn pop(&mut self) -> Option<T> {
200+
match self {
201+
Self::PeerOrder(queue) => queue.pop_back(),
202+
Self::FreshestFirst(queue) => queue.pop().map(|(value, _)| value),
203+
Self::OldestFirst(queue) => queue.pop().map(|(value, _)| value),
204+
}
205+
}
206+
}
207+
208+
impl PeerInputBlockRequests {
209+
fn new(config: &SimConfiguration) -> Self {
210+
Self {
211+
pending: PendingQueue::new(config.ib_diffusion_strategy),
212+
active: HashSet::new(),
213+
}
214+
}
215+
216+
fn queue(&mut self, id: InputBlockId, timestamp: Timestamp) {
217+
self.pending.push(id, timestamp);
218+
}
219+
220+
fn next(&mut self) -> Option<InputBlockId> {
221+
self.pending.pop()
222+
}
223+
}
174224

175225
impl Node {
176226
#[allow(clippy::too_many_arguments)]
@@ -873,7 +923,11 @@ impl Node {
873923
_ => return Ok(()),
874924
};
875925
// Do we have capacity to request this block?
876-
let reqs = self.leios.ib_requests.entry(from).or_default();
926+
let reqs = self
927+
.leios
928+
.ib_requests
929+
.entry(from)
930+
.or_insert(PeerInputBlockRequests::new(&self.sim_config));
877931
if reqs.active.len() < self.sim_config.max_ib_requests_per_peer {
878932
// If so, make the request
879933
self.leios
@@ -883,7 +937,7 @@ impl Node {
883937
self.send_to(from, SimulationMessage::RequestIB(id))?;
884938
} else {
885939
// If not, just track that this peer has this IB when we're ready
886-
reqs.pending.push(id, header.timestamp);
940+
reqs.queue(id, header.timestamp);
887941
}
888942
Ok(())
889943
}
@@ -926,11 +980,15 @@ impl Node {
926980
}
927981

928982
// Mark that this IB is no longer pending
929-
let reqs = self.leios.ib_requests.entry(from).or_default();
983+
let reqs = self
984+
.leios
985+
.ib_requests
986+
.entry(from)
987+
.or_insert(PeerInputBlockRequests::new(&self.sim_config));
930988
reqs.active.remove(&id);
931989

932990
// We now have capacity to request one more IB from this peer
933-
while let Some((id, _)) = reqs.pending.pop() {
991+
while let Some(id) = reqs.next() {
934992
let header = match self.leios.ibs.get(&id) {
935993
Some(InputBlockState::Pending(header)) => header,
936994
Some(InputBlockState::Requested(header))

0 commit comments

Comments
 (0)