Skip to content

Commit 8edbcbc

Browse files
committed
sim-rs: use explicit pipelines rather than slots everywhere
1 parent f4e68d9 commit 8edbcbc

File tree

1 file changed

+65
-84
lines changed

1 file changed

+65
-84
lines changed

sim-rs/sim-core/src/sim/node.rs

Lines changed: 65 additions & 84 deletions
Original file line numberDiff line numberDiff line change
@@ -144,10 +144,10 @@ struct NodeLeiosState {
144144
ibs_to_generate: BTreeMap<u64, Vec<InputBlockHeader>>,
145145
ibs: BTreeMap<InputBlockId, InputBlockState>,
146146
ib_requests: BTreeMap<NodeId, PeerInputBlockRequests>,
147-
ibs_by_slot: BTreeMap<u64, Vec<InputBlockId>>,
147+
ibs_by_pipeline: BTreeMap<u64, Vec<InputBlockId>>,
148148
ebs: BTreeMap<EndorserBlockId, EndorserBlockState>,
149-
ebs_by_slot: BTreeMap<u64, Vec<EndorserBlockId>>,
150-
earliest_eb_cert_times_by_slot: BTreeMap<u64, Timestamp>,
149+
ebs_by_pipeline: BTreeMap<u64, Vec<EndorserBlockId>>,
150+
earliest_eb_cert_times_by_pipeline: BTreeMap<u64, Timestamp>,
151151
votes_to_generate: BTreeMap<u64, usize>,
152152
votes_by_eb: BTreeMap<EndorserBlockId, BTreeMap<NodeId, usize>>,
153153
votes: BTreeMap<VoteBundleId, VoteBundleState>,
@@ -572,17 +572,18 @@ impl Node {
572572
fn generate_endorser_blocks(&mut self, slot: u64) {
573573
for next_p in vrf_probabilities(self.sim_config.eb_generation_probability) {
574574
if self.run_vrf(next_p).is_some() {
575+
let pipeline = self.slot_to_pipeline(slot) + 1;
575576
self.tracker.track_eb_lottery_won(EndorserBlockId {
576577
slot,
577-
pipeline: self.slot_to_pipeline(slot) + 1,
578+
pipeline,
578579
producer: self.id,
579580
});
580-
let ibs = self.select_ibs_for_eb(slot);
581-
let ebs = self.select_ebs_for_eb(slot);
581+
let ibs = self.select_ibs_for_eb(pipeline);
582+
let ebs = self.select_ebs_for_eb(pipeline);
582583
let bytes = self.sim_config.sizes.eb(ibs.len(), ebs.len());
583584
let eb = EndorserBlock {
584585
slot,
585-
pipeline: self.slot_to_pipeline(slot) + 1,
586+
pipeline,
586587
producer: self.id,
587588
bytes,
588589
ibs,
@@ -628,13 +629,10 @@ impl Node {
628629
return false;
629630
};
630631
// When we vote, we vote for EBs which were sent at the start of the prior stage.
631-
let eb_slot = match slot.checked_sub(self.sim_config.stage_length) {
632-
Some(s) => s - (s % self.sim_config.stage_length),
633-
None => {
634-
return false;
635-
}
632+
let Some(eb_pipeline) = (slot / self.sim_config.stage_length).checked_sub(1) else {
633+
return false;
636634
};
637-
let Some(ebs) = self.leios.ebs_by_slot.get(&eb_slot) else {
635+
let Some(ebs) = self.leios.ebs_by_pipeline.get(&eb_pipeline) else {
638636
return false;
639637
};
640638
let mut ebs = ebs.clone();
@@ -815,7 +813,7 @@ impl Node {
815813
})
816814
}
817815

818-
fn choose_endorsed_block_from_slot(&self, slot: u64) -> Option<EndorserBlockId> {
816+
fn choose_endorsed_block_from_pipeline(&self, pipeline: u64) -> Option<EndorserBlockId> {
819817
// an EB is eligible for endorsement if it has this many votes
820818
let vote_threshold = self.sim_config.vote_threshold;
821819

@@ -824,8 +822,8 @@ impl Node {
824822
// - the number of votes (more votes is better)
825823
let (&block, _) = self
826824
.leios
827-
.ebs_by_slot
828-
.get(&slot)
825+
.ebs_by_pipeline
826+
.get(&pipeline)
829827
.iter()
830828
.flat_map(|ids| ids.iter())
831829
.filter_map(|eb| {
@@ -1093,7 +1091,7 @@ impl Node {
10931091

10941092
fn finish_validating_ib(&mut self, from: NodeId, ib: Arc<InputBlock>) -> Result<()> {
10951093
let id = ib.header.id;
1096-
let slot = ib.header.id.slot;
1094+
let pipeline = ib.header.id.pipeline;
10971095
for transaction in &ib.transactions {
10981096
// Do not include transactions from this IB in any IBs we produce ourselves.
10991097
self.leios.mempool.remove(&transaction.id);
@@ -1106,7 +1104,11 @@ impl Node {
11061104
{
11071105
return Ok(());
11081106
}
1109-
self.leios.ibs_by_slot.entry(slot).or_default().push(id);
1107+
self.leios
1108+
.ibs_by_pipeline
1109+
.entry(pipeline)
1110+
.or_default()
1111+
.push(id);
11101112

11111113
for peer in &self.consumers {
11121114
if *peer == from {
@@ -1184,7 +1186,11 @@ impl Node {
11841186
{
11851187
return Ok(());
11861188
}
1187-
self.leios.ebs_by_slot.entry(id.slot).or_default().push(id);
1189+
self.leios
1190+
.ebs_by_pipeline
1191+
.entry(id.pipeline)
1192+
.or_default()
1193+
.push(id);
11881194
// We haven't seen this EB before, so propagate it to our neighbors
11891195
for peer in &self.consumers {
11901196
if *peer == from {
@@ -1244,8 +1250,8 @@ impl Node {
12441250
*eb_votes += count;
12451251
if *eb_votes as u64 > self.sim_config.vote_threshold {
12461252
self.leios
1247-
.earliest_eb_cert_times_by_slot
1248-
.entry(eb.slot)
1253+
.earliest_eb_cert_times_by_pipeline
1254+
.entry(eb.pipeline)
12491255
.or_insert(self.clock.now());
12501256
}
12511257
}
@@ -1302,8 +1308,8 @@ impl Node {
13021308

13031309
let id = ib.header.id;
13041310
self.leios
1305-
.ibs_by_slot
1306-
.entry(ib.header.id.slot)
1311+
.ibs_by_pipeline
1312+
.entry(ib.header.id.pipeline)
13071313
.or_default()
13081314
.push(id);
13091315
self.leios.ibs.insert(id, InputBlockState::Received(ib));
@@ -1313,45 +1319,40 @@ impl Node {
13131319
Ok(())
13141320
}
13151321

1316-
fn select_ibs_for_eb(&mut self, slot: u64) -> Vec<InputBlockId> {
1317-
let config = &self.sim_config;
1318-
let Some(earliest_slot) = slot.checked_sub(config.stage_length * 3) else {
1319-
return vec![];
1320-
};
1321-
let mut ibs = vec![];
1322-
for slot in earliest_slot..(earliest_slot + config.stage_length) {
1323-
let Some(slot_ibs) = self.leios.ibs_by_slot.remove(&slot) else {
1324-
continue;
1325-
};
1326-
ibs.extend(slot_ibs);
1327-
}
1328-
ibs
1322+
fn select_ibs_for_eb(&mut self, pipeline: u64) -> Vec<InputBlockId> {
1323+
self.leios
1324+
.ibs_by_pipeline
1325+
.get(&pipeline)
1326+
.cloned()
1327+
.unwrap_or_default()
13291328
}
13301329

1331-
fn select_ebs_for_eb(&self, slot: u64) -> Vec<EndorserBlockId> {
1332-
let Some(referenced_slots) = self.slots_referenced_by_ebs(slot) else {
1330+
fn select_ebs_for_eb(&self, pipeline: u64) -> Vec<EndorserBlockId> {
1331+
let Some(referenced_pipelines) = self.pipelines_referenced_by_ebs(pipeline) else {
13331332
return vec![];
13341333
};
13351334

13361335
// include one certified EB from each of these pipelines
13371336
let mut ebs = vec![];
1338-
for pipeline_slot in referenced_slots {
1339-
if let Some(eb) = self.choose_endorsed_block_from_slot(pipeline_slot) {
1337+
for referenced_pipeline in referenced_pipelines {
1338+
if let Some(eb) = self.choose_endorsed_block_from_pipeline(referenced_pipeline) {
13401339
ebs.push(eb);
13411340
}
13421341
}
13431342
ebs
13441343
}
13451344

1346-
fn slots_referenced_by_ebs(&self, slot: u64) -> Option<impl Iterator<Item = u64> + use<'_>> {
1345+
fn pipelines_referenced_by_ebs(
1346+
&self,
1347+
pipeline: u64,
1348+
) -> Option<impl Iterator<Item = u64> + use<'_>> {
13471349
if self.sim_config.variant != LeiosVariant::Full {
13481350
// EBs don't reference other EBs unless we're running Full Leios
13491351
return None;
13501352
}
13511353

1352-
let current_pipeline = slot / self.sim_config.stage_length;
13531354
// The newest pipeline to include EBs from is i-3, where i is the current pipeline.
1354-
let Some(newest_included_pipeline) = current_pipeline.checked_sub(3) else {
1355+
let Some(newest_included_pipeline) = pipeline.checked_sub(3) else {
13551356
// If there haven't been 3 pipelines yet, just don't recurse.
13561357
return None;
13571358
};
@@ -1360,14 +1361,11 @@ impl Node {
13601361
// η is the "quality parameter" (expected block rate), and L is stage length.
13611362
let old_pipelines =
13621363
(3 * self.sim_config.praos_chain_quality).div_ceil(self.sim_config.stage_length);
1363-
let oldest_included_pipeline = current_pipeline
1364+
let oldest_included_pipeline = pipeline
13641365
.checked_sub(old_pipelines)
13651366
.unwrap_or(newest_included_pipeline);
13661367

1367-
Some(
1368-
(oldest_included_pipeline..=newest_included_pipeline)
1369-
.map(|i| i * self.sim_config.stage_length),
1370-
)
1368+
Some(oldest_included_pipeline..=newest_included_pipeline)
13711369
}
13721370

13731371
fn finish_generating_eb(&mut self, eb: EndorserBlock) -> Result<()> {
@@ -1378,69 +1376,52 @@ impl Node {
13781376
self.leios
13791377
.ebs
13801378
.insert(id, EndorserBlockState::Received(eb.clone()));
1381-
self.leios.ebs_by_slot.entry(id.slot).or_default().push(id);
1379+
self.leios
1380+
.ebs_by_pipeline
1381+
.entry(id.pipeline)
1382+
.or_default()
1383+
.push(id);
13821384
for peer in &self.consumers {
13831385
self.send_to(*peer, SimulationMessage::AnnounceEB(id))?;
13841386
}
13851387
Ok(())
13861388
}
13871389

13881390
fn should_vote_for(&self, eb: &EndorserBlock) -> Result<(), NoVoteReason> {
1389-
let stage_length = self.sim_config.stage_length;
1390-
1391-
let ib_slot_start = match eb.pipeline.checked_sub(4) {
1392-
Some(x) => x * stage_length,
1393-
None => {
1394-
// The IBs for this EB were "generated" before the sim began.
1395-
// It's valid iff there are no IBs.
1396-
return if eb.ibs.is_empty() {
1397-
Ok(())
1398-
} else {
1399-
Err(NoVoteReason::ExtraIB)
1400-
};
1401-
}
1402-
};
1403-
let ib_slot_end = ib_slot_start + stage_length;
1404-
let ib_slot_range = ib_slot_start..ib_slot_end;
1405-
14061391
let mut ib_set = HashSet::new();
14071392
for ib in &eb.ibs {
14081393
if !matches!(self.leios.ibs.get(ib), Some(InputBlockState::Received(_))) {
14091394
return Err(NoVoteReason::MissingIB);
14101395
}
1411-
if !ib_slot_range.contains(&ib.slot) {
1396+
if ib.pipeline != eb.pipeline {
14121397
return Err(NoVoteReason::InvalidSlot);
14131398
}
14141399
ib_set.insert(*ib);
14151400
}
1416-
for ib_slot in ib_slot_range {
1417-
for ib in self
1418-
.leios
1419-
.ibs_by_slot
1420-
.get(&ib_slot)
1421-
.iter()
1422-
.flat_map(|f| f.iter())
1423-
{
1401+
1402+
if let Some(ibs) = self.leios.ibs_by_pipeline.get(&eb.pipeline) {
1403+
for ib in ibs {
14241404
if !ib_set.contains(ib) {
14251405
return Err(NoVoteReason::ExtraIB);
14261406
}
14271407
}
14281408
}
14291409

14301410
// If this EB is meant to reference other EBs, validate that it references whatever it needs
1431-
if let Some(expected_referenced_slots) = self.slots_referenced_by_ebs(eb.slot) {
1432-
let actual_referenced_slots: HashSet<u64> = eb.ebs.iter().map(|id| id.slot).collect();
1433-
for expected_slot in expected_referenced_slots {
1411+
if let Some(expected_referenced_pipelines) = self.pipelines_referenced_by_ebs(eb.pipeline) {
1412+
let actual_referenced_pipelines: HashSet<u64> =
1413+
eb.ebs.iter().map(|id| id.pipeline).collect();
1414+
for expected_pipeline in expected_referenced_pipelines {
14341415
let Some(certified_at) = self
14351416
.leios
1436-
.earliest_eb_cert_times_by_slot
1437-
.get(&expected_slot)
1417+
.earliest_eb_cert_times_by_pipeline
1418+
.get(&expected_pipeline)
14381419
else {
14391420
// We don't require an EB referenced from this pipeline if none of that pipeline's EBs have been certified yet,
14401421
continue;
14411422
};
14421423

1443-
let last_pipeline_slot = expected_slot + self.sim_config.stage_length - 1;
1424+
let last_pipeline_slot = (expected_pipeline + 1) * self.sim_config.stage_length - 1;
14441425
let cutoff = Timestamp::from_secs(last_pipeline_slot)
14451426
.checked_sub_duration(self.sim_config.header_diffusion_time)
14461427
.unwrap_or_default();
@@ -1449,7 +1430,7 @@ impl Node {
14491430
continue;
14501431
}
14511432

1452-
if !actual_referenced_slots.contains(&expected_slot) {
1433+
if !actual_referenced_pipelines.contains(&expected_pipeline) {
14531434
return Err(NoVoteReason::MissingEB);
14541435
}
14551436
}
@@ -1481,8 +1462,8 @@ impl Node {
14811462
*eb_votes += count;
14821463
if *eb_votes as u64 > self.sim_config.vote_threshold {
14831464
self.leios
1484-
.earliest_eb_cert_times_by_slot
1485-
.entry(eb.slot)
1465+
.earliest_eb_cert_times_by_pipeline
1466+
.entry(eb.pipeline)
14861467
.or_insert(self.clock.now());
14871468
}
14881469
}

0 commit comments

Comments
 (0)