From 37dd4af9db1706201af18dee1aae37cbadbeb060 Mon Sep 17 00:00:00 2001 From: Simon Gellis Date: Thu, 28 Aug 2025 10:35:53 -0400 Subject: [PATCH 1/5] sim-rs: implement mock clock --- sim-rs/sim-core/src/clock.rs | 2 + sim-rs/sim-core/src/clock/mock.rs | 105 ++++++++++++++++++++++++++++++ 2 files changed, 107 insertions(+) create mode 100644 sim-rs/sim-core/src/clock/mock.rs diff --git a/sim-rs/sim-core/src/clock.rs b/sim-rs/sim-core/src/clock.rs index 0c4485fe0..895739ae4 100644 --- a/sim-rs/sim-core/src/clock.rs +++ b/sim-rs/sim-core/src/clock.rs @@ -10,11 +10,13 @@ use std::{ pub use coordinator::ClockCoordinator; use coordinator::ClockEvent; use futures::FutureExt; +pub use mock::MockClockCoordinator; use timestamp::AtomicTimestamp; pub use timestamp::Timestamp; use tokio::sync::{mpsc, oneshot}; mod coordinator; +mod mock; mod timestamp; // wrapper struct which holds a SimulationEvent, diff --git a/sim-rs/sim-core/src/clock/mock.rs b/sim-rs/sim-core/src/clock/mock.rs new file mode 100644 index 000000000..0e38e80c8 --- /dev/null +++ b/sim-rs/sim-core/src/clock/mock.rs @@ -0,0 +1,105 @@ +use std::{ + collections::HashMap, + sync::{Arc, atomic::AtomicUsize}, + time::Duration, +}; + +use tokio::sync::{mpsc, oneshot}; + +use crate::clock::{ + Clock, TaskInitiator, Timestamp, coordinator::ClockEvent, timestamp::AtomicTimestamp, +}; + +pub struct MockClockCoordinator { + time: Arc, + tx: mpsc::UnboundedSender, + rx: mpsc::UnboundedReceiver, + waiter_count: Arc, + tasks: Arc, + waiters: HashMap, +} + +impl Default for MockClockCoordinator { + fn default() -> Self { + Self::new() + } +} + +impl MockClockCoordinator { + pub fn new() -> Self { + let time = Arc::new(AtomicTimestamp::new(Timestamp::zero())); + let (tx, rx) = mpsc::unbounded_channel(); + let waiter_count = Arc::new(AtomicUsize::new(0)); + let tasks = Arc::new(AtomicUsize::new(0)); + Self { + time, + tx, + rx, + waiter_count, + tasks, + waiters: HashMap::new(), + } + } + + pub fn clock(&self) -> Clock { + Clock::new( + Duration::from_nanos(1), + self.time.clone(), + self.waiter_count.clone(), + TaskInitiator::new(self.tasks.clone()), + self.tx.clone(), + ) + } + + pub fn advance_time(&mut self, until: Timestamp) { + while let Ok(event) = self.rx.try_recv() { + match event { + ClockEvent::Wait { actor, until, done } => { + if self.waiters.insert(actor, Waiter { until, done }).is_some() { + panic!("waiter {actor} waited twice"); + } + } + ClockEvent::CancelWait { actor } => { + if self.waiters.remove(&actor).is_none() { + panic!("waiter {actor} cancelled a wait twice"); + } + } + ClockEvent::FinishTask => { + if self.tasks.fetch_sub(1, std::sync::atomic::Ordering::AcqRel) == 0 { + panic!("cancelled too many tasks"); + } + } + } + } + assert_eq!( + self.waiters.len(), + self.waiter_count.load(std::sync::atomic::Ordering::Acquire), + "not every worker is waiting for time to pass" + ); + + self.time.store(until, std::sync::atomic::Ordering::Release); + let mut something_happened = false; + self.waiters = std::mem::take(&mut self.waiters) + .into_iter() + .filter_map(|(actor, waiter)| { + if let Some(t) = &waiter.until { + if *t < until { + panic!("advanced time too far (waited for {until:?}, next event at {t:?})"); + } + if *t == until { + let _ = waiter.done.send(()); + something_happened = true; + return None; + } + } + Some((actor, waiter)) + }) + .collect(); + assert!(something_happened, "no actors were waiting for {until:?}"); + } +} + +struct Waiter { + until: Option, + done: oneshot::Sender<()>, +} From 7fc0e7b76a49b78a8850d2278a8b87edd439aec5 Mon Sep 17 00:00:00 2001 From: Simon Gellis Date: Thu, 28 Aug 2025 11:13:51 -0400 Subject: [PATCH 2/5] sim-rs: support mocking lottery runs --- sim-rs/Cargo.lock | 29 ++++++++++++- sim-rs/sim-core/Cargo.toml | 1 + sim-rs/sim-core/src/sim/linear_leios.rs | 38 ++++++++--------- sim-rs/sim-core/src/sim/lottery.rs | 54 +++++++++++++++++++++++++ 4 files changed, 103 insertions(+), 19 deletions(-) diff --git a/sim-rs/Cargo.lock b/sim-rs/Cargo.lock index fd0f17fb4..d11b728c9 100644 --- a/sim-rs/Cargo.lock +++ b/sim-rs/Cargo.lock @@ -258,6 +258,12 @@ dependencies = [ "cfg-if", ] +[[package]] +name = "crossbeam-utils" +version = "0.8.21" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d0a5c400df2834b80a4c3327b3aad3a4c4cd4de0629063962b03235697506a28" + [[package]] name = "ctrlc" version = "3.4.7" @@ -303,6 +309,20 @@ dependencies = [ "syn", ] +[[package]] +name = "dashmap" +version = "6.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5041cc499144891f3790297212f32a74fb938e5136a14943f338ef9e0ae276cf" +dependencies = [ + "cfg-if", + "crossbeam-utils", + "hashbrown 0.14.5", + "lock_api", + "once_cell", + "parking_lot_core", +] + [[package]] name = "deranged" version = "0.4.0" @@ -517,6 +537,12 @@ version = "0.31.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "07e28edb80900c19c28f1072f2e8aeca7fa06b23cd4169cefe1af5aa3260783f" +[[package]] +name = "hashbrown" +version = "0.14.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e5274423e17b7c9fc20b6e7e208532f9b19825d82dfd615708b70edd83df41f1" + [[package]] name = "hashbrown" version = "0.15.5" @@ -542,7 +568,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "fe4cd85333e22411419a0bcae1297d25e58c9443848b11dc6a86fefe8c78a661" dependencies = [ "equivalent", - "hashbrown", + "hashbrown 0.15.5", ] [[package]] @@ -1245,6 +1271,7 @@ version = "1.3.0" dependencies = [ "anyhow", "async-stream", + "dashmap", "futures", "netsim-async", "num-traits", diff --git a/sim-rs/sim-core/Cargo.toml b/sim-rs/sim-core/Cargo.toml index 6ca59b835..079228904 100644 --- a/sim-rs/sim-core/Cargo.toml +++ b/sim-rs/sim-core/Cargo.toml @@ -7,6 +7,7 @@ rust-version = "1.88" [dependencies] anyhow = "1" async-stream = "0.3" +dashmap = "6" futures = "0.3" netsim-async = { git = "https://github.com/input-output-hk/ce-netsim.git", rev = "9d1e26c" } num-traits = "0.2" diff --git a/sim-rs/sim-core/src/sim/linear_leios.rs b/sim-rs/sim-core/src/sim/linear_leios.rs index 22b591325..ff7f9c054 100644 --- a/sim-rs/sim-core/src/sim/linear_leios.rs +++ b/sim-rs/sim-core/src/sim/linear_leios.rs @@ -28,7 +28,7 @@ use crate::{ sim::{ MiniProtocol, NodeImpl, SimCpuTask, SimMessage, linear_leios::attackers::{EBWithholdingEvent, EBWithholdingSender}, - lottery, + lottery::{LotteryConfig, LotteryKind, MockLotteryResults, vrf_probabilities}, }, }; @@ -306,7 +306,7 @@ pub struct LinearLeiosNode { tracker: EventTracker, rng: ChaChaRng, clock: Clock, - stake: u64, + lottery: LotteryConfig, consumers: Vec, txs: HashMap, ledger_states: BTreeMap>, @@ -333,6 +333,11 @@ impl NodeImpl for LinearLeiosNode { rng: ChaChaRng, clock: Clock, ) -> Self { + let lottery = LotteryConfig::Random { + stake: config.stake, + total_stake: sim_config.total_stake, + }; + Self { id: config.id, sim_config, @@ -340,7 +345,7 @@ impl NodeImpl for LinearLeiosNode { tracker, rng, clock, - stake: config.stake, + lottery, consumers: config.consumers.clone(), txs: HashMap::new(), ledger_states: BTreeMap::new(), @@ -512,7 +517,10 @@ impl LinearLeiosNode { // Ranking block propagation impl LinearLeiosNode { fn try_generate_rb(&mut self, slot: u64) { - let Some(vrf) = self.run_vrf(self.sim_config.block_generation_probability) else { + let Some(vrf) = self.run_vrf( + LotteryKind::GenerateRB, + self.sim_config.block_generation_probability, + ) else { return; }; @@ -1204,8 +1212,8 @@ impl LinearLeiosNode { } fn try_vote_for_endorser_block(&mut self, eb: &Arc, seen: Timestamp) -> bool { - let vrf_wins = lottery::vrf_probabilities(self.sim_config.vote_probability) - .filter_map(|f| self.run_vrf(f)) + let vrf_wins = vrf_probabilities(self.sim_config.vote_probability) + .filter_map(|f| self.run_vrf(LotteryKind::GenerateVote, f)) .count(); if vrf_wins == 0 { return false; @@ -1487,18 +1495,12 @@ impl LinearLeiosNode { // Common utilities impl LinearLeiosNode { + #[allow(unused)] + pub fn mock_lottery(&mut self, results: Arc) { + self.lottery = LotteryConfig::Mock { results }; + } // Simulates the output of a VRF using this node's stake (if any). - fn run_vrf(&mut self, success_rate: f64) -> Option { - let target_vrf_stake = lottery::compute_target_vrf_stake( - self.stake, - self.sim_config.total_stake, - success_rate, - ); - let result = self.rng.random_range(0..self.sim_config.total_stake); - if result < target_vrf_stake { - Some(result) - } else { - None - } + fn run_vrf(&mut self, kind: LotteryKind, success_rate: f64) -> Option { + self.lottery.run(kind, success_rate, &mut self.rng) } } diff --git a/sim-rs/sim-core/src/sim/lottery.rs b/sim-rs/sim-core/src/sim/lottery.rs index bc86167ab..0df95e211 100644 --- a/sim-rs/sim-core/src/sim/lottery.rs +++ b/sim-rs/sim-core/src/sim/lottery.rs @@ -1,3 +1,9 @@ +use std::{collections::VecDeque, sync::Arc}; + +use dashmap::DashMap; +use rand::Rng; +use rand_chacha::ChaChaRng; + pub fn compute_target_vrf_stake(stake: u64, total_stake: u64, success_rate: f64) -> u64 { let ratio = stake as f64 / total_stake as f64; (total_stake as f64 * ratio * success_rate) as u64 @@ -7,3 +13,51 @@ pub fn vrf_probabilities(probability: f64) -> impl Iterator { let final_success_rate = Some(probability.fract()).filter(|f| *f > 0.0); std::iter::repeat_n(1.0, probability.trunc() as usize).chain(final_success_rate) } + +#[derive(Clone, Copy, PartialEq, Eq, Hash)] +pub enum LotteryKind { + GenerateRB, + GenerateVote, +} + +#[derive(Default)] +pub struct MockLotteryResults { + results: DashMap>, +} +impl MockLotteryResults { + pub fn run(&self, kind: LotteryKind) -> Option { + self.results.entry(kind).or_default().pop_front() + } + #[allow(unused)] + pub fn configure_win(&self, kind: LotteryKind, result: u64) { + self.results.entry(kind).or_default().push_back(result); + } +} + +pub enum LotteryConfig { + Random { + stake: u64, + total_stake: u64, + }, + #[allow(unused)] + Mock { + results: Arc, + }, +} + +impl LotteryConfig { + pub fn run(&self, kind: LotteryKind, success_rate: f64, rng: &mut ChaChaRng) -> Option { + match self { + Self::Random { stake, total_stake } => { + let target_vrf_stake = compute_target_vrf_stake(*stake, *total_stake, success_rate); + let result = rng.random_range(0..*total_stake); + if result < target_vrf_stake { + Some(result) + } else { + None + } + } + Self::Mock { results } => results.run(kind), + } + } +} From 2018d061cfd0c8ab213ca8a67cd89c6913fc7cc8 Mon Sep 17 00:00:00 2001 From: Simon Gellis Date: Thu, 28 Aug 2025 12:03:28 -0400 Subject: [PATCH 3/5] sim-rs: add a simple tx propagation test --- sim-rs/Cargo.lock | 1 + sim-rs/sim-core/Cargo.toml | 1 + sim-rs/sim-core/src/clock/mock.rs | 3 - sim-rs/sim-core/src/model.rs | 8 +- sim-rs/sim-core/src/sim.rs | 9 + sim-rs/sim-core/src/sim/linear_leios.rs | 3 +- sim-rs/sim-core/src/sim/tests/linear_leios.rs | 254 ++++++++++++++++++ sim-rs/sim-core/src/sim/tests/mod.rs | 1 + 8 files changed, 272 insertions(+), 8 deletions(-) create mode 100644 sim-rs/sim-core/src/sim/tests/linear_leios.rs create mode 100644 sim-rs/sim-core/src/sim/tests/mod.rs diff --git a/sim-rs/Cargo.lock b/sim-rs/Cargo.lock index d11b728c9..f249e739a 100644 --- a/sim-rs/Cargo.lock +++ b/sim-rs/Cargo.lock @@ -1280,6 +1280,7 @@ dependencies = [ "rand_chacha 0.9.0", "rand_distr 0.5.1", "serde", + "serde_yaml", "tokio", "tokio-util", "tracing", diff --git a/sim-rs/sim-core/Cargo.toml b/sim-rs/sim-core/Cargo.toml index 079228904..c20bb7836 100644 --- a/sim-rs/sim-core/Cargo.toml +++ b/sim-rs/sim-core/Cargo.toml @@ -21,4 +21,5 @@ tokio-util = "0.7" tracing = "0.1" [dev-dependencies] +serde_yaml = "0.9" tokio = { version = "1", features = ["macros", "rt"] } diff --git a/sim-rs/sim-core/src/clock/mock.rs b/sim-rs/sim-core/src/clock/mock.rs index 0e38e80c8..92bceaa7f 100644 --- a/sim-rs/sim-core/src/clock/mock.rs +++ b/sim-rs/sim-core/src/clock/mock.rs @@ -78,7 +78,6 @@ impl MockClockCoordinator { ); self.time.store(until, std::sync::atomic::Ordering::Release); - let mut something_happened = false; self.waiters = std::mem::take(&mut self.waiters) .into_iter() .filter_map(|(actor, waiter)| { @@ -88,14 +87,12 @@ impl MockClockCoordinator { } if *t == until { let _ = waiter.done.send(()); - something_happened = true; return None; } } Some((actor, waiter)) }) .collect(); - assert!(something_happened, "no actors were waiting for {until:?}"); } } diff --git a/sim-rs/sim-core/src/model.rs b/sim-rs/sim-core/src/model.rs index d4ddc2040..7f9ce8330 100644 --- a/sim-rs/sim-core/src/model.rs +++ b/sim-rs/sim-core/src/model.rs @@ -84,7 +84,7 @@ impl Block { } } -#[derive(Clone, Debug)] +#[derive(Clone, Debug, PartialEq, Eq)] pub struct LinearRankingBlockHeader { pub id: BlockId, pub vrf: u64, @@ -93,7 +93,7 @@ pub struct LinearRankingBlockHeader { pub eb_announcement: Option, } -#[derive(Clone, Debug)] +#[derive(Clone, Debug, PartialEq, Eq)] pub struct LinearRankingBlock { pub header: LinearRankingBlockHeader, pub transactions: Vec>, @@ -227,7 +227,7 @@ impl StracciatellaEndorserBlock { } } -#[derive(Debug)] +#[derive(Debug, Clone, PartialEq, Eq)] pub struct LinearEndorserBlock { pub slot: u64, pub producer: NodeId, @@ -293,7 +293,7 @@ pub enum TransactionLostReason { EBExpired, } -#[derive(Clone, Debug, Serialize)] +#[derive(Clone, Debug, Serialize, PartialEq, Eq)] pub struct Endorsement { pub eb: EndorserBlockId, pub size_bytes: u64, diff --git a/sim-rs/sim-core/src/sim.rs b/sim-rs/sim-core/src/sim.rs index ac9e0e6b8..937c40caa 100644 --- a/sim-rs/sim-core/src/sim.rs +++ b/sim-rs/sim-core/src/sim.rs @@ -28,6 +28,8 @@ mod linear_leios; mod lottery; mod slot; mod stracciatella; +#[cfg(test)] +mod tests; mod tx; enum NetworkWrapper { @@ -304,6 +306,13 @@ impl Default for EventResult { } impl EventResult { + #[cfg(test)] + pub fn merge(&mut self, mut other: EventResult) { + self.messages.append(&mut other.messages); + self.tasks.append(&mut other.tasks); + self.timed_events.append(&mut other.timed_events); + } + pub fn send_to(&mut self, to: NodeId, msg: N::Message) { self.messages.push((to, msg)); } diff --git a/sim-rs/sim-core/src/sim/linear_leios.rs b/sim-rs/sim-core/src/sim/linear_leios.rs index ff7f9c054..f2984a8af 100644 --- a/sim-rs/sim-core/src/sim/linear_leios.rs +++ b/sim-rs/sim-core/src/sim/linear_leios.rs @@ -32,7 +32,7 @@ use crate::{ }, }; -#[derive(Clone, Debug)] +#[derive(Clone, Debug, PartialEq, Eq)] pub enum Message { // TX propagation AnnounceTx(TransactionId), @@ -114,6 +114,7 @@ impl SimMessage for Message { } } +#[derive(Debug, Clone, PartialEq, Eq)] pub enum CpuTask { /// A transaction has been received and validated, and is ready to propagate TransactionValidated(NodeId, Arc), diff --git a/sim-rs/sim-core/src/sim/tests/linear_leios.rs b/sim-rs/sim-core/src/sim/tests/linear_leios.rs new file mode 100644 index 000000000..19530c18b --- /dev/null +++ b/sim-rs/sim-core/src/sim/tests/linear_leios.rs @@ -0,0 +1,254 @@ +use std::{collections::HashMap, sync::Arc}; + +use rand::{RngCore, SeedableRng}; +use rand_chacha::ChaChaRng; +use tokio::sync::mpsc; + +use crate::{ + clock::{Clock, MockClockCoordinator, Timestamp}, + config::{NodeId, RawLinkInfo, RawNode, RawTopology, SimConfiguration, TransactionConfig}, + events::{Event, EventTracker}, + model::{LinearEndorserBlock, LinearRankingBlock, Transaction}, + sim::{ + EventResult, NodeImpl, + linear_leios::{CpuTask, LinearLeiosNode, Message}, + lottery::{LotteryKind, MockLotteryResults}, + }, +}; + +fn new_sim_config(topology: RawTopology) -> Arc { + let params = + serde_yaml::from_slice(include_bytes!("../../../../parameters/config.default.yaml")) + .unwrap(); + let topology = topology.into(); + Arc::new(SimConfiguration::build(params, topology).unwrap()) +} + +fn new_sim( + sim_config: Arc, + event_tx: mpsc::UnboundedSender<(Event, Timestamp)>, + clock: Clock, +) -> ( + HashMap, + HashMap>, +) { + let tracker = EventTracker::new(event_tx, clock.clone(), &sim_config.nodes); + let mut rng = ChaChaRng::seed_from_u64(sim_config.seed); + let mut lottery = HashMap::new(); + let nodes = sim_config + .nodes + .iter() + .map(|config| { + let rng = ChaChaRng::seed_from_u64(rng.next_u64()); + let mut node = LinearLeiosNode::new( + config, + sim_config.clone(), + tracker.clone(), + rng, + clock.clone(), + ); + let lottery_results = Arc::new(MockLotteryResults::default()); + node.mock_lottery(lottery_results.clone()); + lottery.insert(config.id, lottery_results); + (config.id, node) + }) + .collect(); + (nodes, lottery) +} + +fn new_topology(nodes: Vec<(&'static str, RawNode)>) -> RawTopology { + RawTopology { + nodes: nodes + .into_iter() + .map(|(name, node)| (name.to_string(), node)) + .collect(), + } +} +fn new_node(stake: Option, producers: Vec<&'static str>) -> RawNode { + RawNode { + stake, + location: crate::config::RawNodeLocation::Cluster { + cluster: "all".into(), + }, + cpu_core_count: Some(4), + tx_conflict_fraction: None, + tx_generation_weight: None, + producers: producers + .iter() + .map(|n| { + ( + n.to_string(), + RawLinkInfo { + latency_ms: 0.0, + bandwidth_bytes_per_second: None, + }, + ) + }) + .collect(), + adversarial: None, + behaviours: vec![], + } +} + +struct TestDriver { + sim_config: Arc, + rng: ChaChaRng, + slot: u64, + time: MockClockCoordinator, + nodes: HashMap, + lottery: HashMap>, + queued: HashMap>, +} + +impl TestDriver { + fn new(topology: RawTopology) -> Self { + let sim_config = new_sim_config(topology); + let rng = ChaChaRng::seed_from_u64(sim_config.seed); + let slot = 0; + let time = MockClockCoordinator::new(); + let (event_tx, _event_rx) = mpsc::unbounded_channel(); + let (nodes, lottery) = new_sim(sim_config.clone(), event_tx, time.clock()); + Self { + sim_config, + rng, + slot, + time, + nodes, + lottery, + queued: HashMap::new(), + } + } + + pub fn id_for(&self, name: &str) -> NodeId { + self.sim_config + .nodes + .iter() + .find_map(|n| if n.name == name { Some(n.id) } else { None }) + .unwrap() + } + + pub fn produce_tx(&mut self, node_id: NodeId, conflict: bool) -> Arc { + let TransactionConfig::Real(tx_config) = &self.sim_config.transactions else { + panic!("unexpected TX config") + }; + let tx = Arc::new(tx_config.new_tx(&mut self.rng, Some(if conflict { 1.0 } else { 0.0 }))); + let node = self.nodes.get_mut(&node_id).unwrap(); + let events = node.handle_new_tx(tx.clone()); + self.queued.entry(node_id).or_default().merge(events); + tx + } + + pub fn win_next_rb_lottery(&mut self, node_id: NodeId, result: u64) { + self.lottery + .get(&node_id) + .unwrap() + .configure_win(LotteryKind::GenerateRB, result); + } + + pub fn next_slot(&mut self) { + self.slot += 1; + self.time.advance_time(Timestamp::from_secs(self.slot)); + for (node_id, node) in self.nodes.iter_mut() { + let events = node.handle_new_slot(self.slot); + self.queued.entry(*node_id).or_default().merge(events); + } + } + + pub fn expect_message( + &mut self, + from: NodeId, + to: NodeId, + message: ::Message, + ) { + let queued = self.queued.entry(from).or_default(); + let mut found = false; + queued.messages.retain(|(t, msg)| { + if t == &to && msg == &message { + found = true; + false + } else { + true + } + }); + assert!( + found, + "message {message:?} was not sent from {from} to {to}" + ); + let events = self + .nodes + .get_mut(&to) + .unwrap() + .handle_message(from, message); + self.queued.entry(to).or_default().merge(events); + } + + pub fn expect_cpu_task(&mut self, node: NodeId, task: ::Task) { + self.expect_cpu_task_matching(node, |t| if *t == task { Some(t.clone()) } else { None }); + } + + pub fn expect_cpu_task_matching(&mut self, node: NodeId, matcher: M) -> T + where + M: Fn(&::Task) -> Option, + { + let queued = self.queued.entry(node).or_default(); + let mut result = None; + let mut new_queued = EventResult::default(); + queued.tasks.retain(|t| { + if result.is_some() { + return true; + } + result = matcher(t); + if result.is_some() { + new_queued = self + .nodes + .get_mut(&node) + .unwrap() + .handle_cpu_task(t.clone()); + } + result.is_none() + }); + queued.merge(new_queued); + result.expect("no CPU tasks matching filter") + } +} + +fn is_new_rb_task(task: &CpuTask) -> Option<(LinearRankingBlock, Option)> { + match task { + CpuTask::RBBlockGenerated(rb, eb) => { + Some((rb.clone(), eb.as_ref().map(|(eb, _)| eb.clone()))) + } + _ => None, + } +} + +#[test] +fn should_propagate_transactions() { + let topology = new_topology(vec![ + ("node-1", new_node(Some(1000), vec!["node-2"])), + ("node-2", new_node(Some(1000), vec!["node-1"])), + ]); + let mut sim = TestDriver::new(topology); + let node1 = sim.id_for("node-1"); + let node2 = sim.id_for("node-2"); + + // Node 1 produces a transaction, node 2 should request it + let tx1 = sim.produce_tx(node1, false); + sim.expect_message(node1, node2, Message::AnnounceTx(tx1.id)); + sim.expect_message(node2, node1, Message::RequestTx(tx1.id)); + sim.expect_message(node1, node2, Message::Tx(tx1.clone())); + sim.expect_cpu_task(node2, CpuTask::TransactionValidated(node1, tx1.clone())); + + // Node 2 produces a transaction, node 1 should request it + let tx2 = sim.produce_tx(node2, false); + sim.expect_message(node2, node1, Message::AnnounceTx(tx2.id)); + sim.expect_message(node1, node2, Message::RequestTx(tx2.id)); + sim.expect_message(node2, node1, Message::Tx(tx2.clone())); + sim.expect_cpu_task(node1, CpuTask::TransactionValidated(node2, tx2.clone())); + + // When node 1 produces an RB, it should include both TXs + sim.win_next_rb_lottery(node1, 0); + sim.next_slot(); + let (new_rb, new_eb) = sim.expect_cpu_task_matching(node1, is_new_rb_task); + assert_eq!(new_rb.transactions, vec![tx1, tx2]); + assert_eq!(new_eb, None); +} diff --git a/sim-rs/sim-core/src/sim/tests/mod.rs b/sim-rs/sim-core/src/sim/tests/mod.rs new file mode 100644 index 000000000..4fade2dfb --- /dev/null +++ b/sim-rs/sim-core/src/sim/tests/mod.rs @@ -0,0 +1 @@ +mod linear_leios; From 7f980c816105d634b9518b5d7e646cb36b897930 Mon Sep 17 00:00:00 2001 From: Simon Gellis Date: Fri, 29 Aug 2025 11:09:52 -0400 Subject: [PATCH 4/5] sim-rs: fix bug with conflicting TXs --- sim-rs/CHANGELOG.md | 6 + sim-rs/sim-core/src/clock/mock.rs | 4 + sim-rs/sim-core/src/sim/linear_leios.rs | 19 +- sim-rs/sim-core/src/sim/tests/linear_leios.rs | 173 ++++++++++++++++-- 4 files changed, 187 insertions(+), 15 deletions(-) diff --git a/sim-rs/CHANGELOG.md b/sim-rs/CHANGELOG.md index bdae751e5..7879aaaab 100644 --- a/sim-rs/CHANGELOG.md +++ b/sim-rs/CHANGELOG.md @@ -1,5 +1,11 @@ # Changelog +## Unreleased + +### Linear Leios +- Add some protocol-level tests +- Fix bug; transactions with conflicts referenced by EBs did not propagate far enough + ## v1.3.0 ### Linear Leios diff --git a/sim-rs/sim-core/src/clock/mock.rs b/sim-rs/sim-core/src/clock/mock.rs index 92bceaa7f..c29a68f1c 100644 --- a/sim-rs/sim-core/src/clock/mock.rs +++ b/sim-rs/sim-core/src/clock/mock.rs @@ -51,6 +51,10 @@ impl MockClockCoordinator { ) } + pub fn now(&self) -> Timestamp { + self.time.load(std::sync::atomic::Ordering::Acquire) + } + pub fn advance_time(&mut self, until: Timestamp) { while let Ok(event) = self.rx.try_recv() { match event { diff --git a/sim-rs/sim-core/src/sim/linear_leios.rs b/sim-rs/sim-core/src/sim/linear_leios.rs index f2984a8af..d25856d19 100644 --- a/sim-rs/sim-core/src/sim/linear_leios.rs +++ b/sim-rs/sim-core/src/sim/linear_leios.rs @@ -1004,7 +1004,7 @@ impl LinearLeiosNode { if missing_txs.is_empty() { self.queued - .schedule_cpu_task(CpuTask::EBBlockValidated(eb, seen)); + .schedule_cpu_task(CpuTask::EBBlockValidated(eb.clone(), seen)); } else { for tx_id in missing_txs { self.leios @@ -1014,6 +1014,23 @@ impl LinearLeiosNode { .push(eb.id()); } } + + if matches!( + self.sim_config.variant, + LeiosVariant::LinearWithTxReferences + ) { + // If the EB references any TXs which we already have, but are not in our mempool, + // we must have failed to add them to the mempool due to conflicts. + // Announce those TXs to our peers, since we didn't before. + for tx in &eb.txs { + if !self.has_tx(tx.id) || self.praos.mempool.contains_key(&tx.id) { + continue; + } + for peer in &self.consumers { + self.queued.send_to(*peer, Message::AnnounceTx(tx.id)); + } + } + } } fn try_validating_eb(&mut self, eb_id: EndorserBlockId) { diff --git a/sim-rs/sim-core/src/sim/tests/linear_leios.rs b/sim-rs/sim-core/src/sim/tests/linear_leios.rs index 19530c18b..471b6a20c 100644 --- a/sim-rs/sim-core/src/sim/tests/linear_leios.rs +++ b/sim-rs/sim-core/src/sim/tests/linear_leios.rs @@ -17,9 +17,16 @@ use crate::{ }; fn new_sim_config(topology: RawTopology) -> Arc { - let params = + let mut params: crate::config::RawParameters = serde_yaml::from_slice(include_bytes!("../../../../parameters/config.default.yaml")) .unwrap(); + params.leios_variant = crate::config::LeiosVariant::LinearWithTxReferences; + // every transaction fills up exactly half of an RB + let tx_size = params.rb_body_max_size_bytes / 2; + params.tx_size_bytes_distribution = crate::config::DistributionConfig::Constant { + value: tx_size as f64, + }; + params.tx_max_size_bytes = tx_size; let topology = topology.into(); Arc::new(SimConfiguration::build(params, topology).unwrap()) } @@ -154,6 +161,49 @@ impl TestDriver { } } + pub fn expect_tx_sent(&mut self, from: NodeId, to: NodeId, tx: Arc) { + self.expect_message(from, to, Message::AnnounceTx(tx.id)); + self.expect_message(to, from, Message::RequestTx(tx.id)); + self.expect_message(from, to, Message::Tx(tx.clone())); + self.expect_cpu_task(to, CpuTask::TransactionValidated(from, tx)); + } + + pub fn expect_tx_not_sent(&mut self, from: NodeId, to: NodeId, tx: Arc) { + self.expect_no_message(from, to, Message::AnnounceTx(tx.id)); + } + + pub fn expect_rb_and_eb_sent( + &mut self, + from: NodeId, + to: NodeId, + rb: Arc, + eb: Option>, + ) { + self.expect_message(from, to, Message::AnnounceRBHeader(rb.header.id)); + self.expect_message(to, from, Message::RequestRBHeader(rb.header.id)); + self.expect_message( + from, + to, + Message::RBHeader(rb.header.clone(), true, eb.is_some()), + ); + self.expect_cpu_task( + to, + CpuTask::RBHeaderValidated(from, rb.header.clone(), true, eb.is_some()), + ); + self.expect_message(to, from, Message::RequestRB(rb.header.id)); + self.expect_message(from, to, Message::RB(rb.clone())); + self.expect_cpu_task(to, CpuTask::RBBlockValidated(rb)); + if let Some(eb) = eb { + self.expect_message(to, from, Message::RequestEB(eb.id())); + self.expect_message(from, to, Message::EB(eb.clone())); + self.expect_cpu_task(to, CpuTask::EBHeaderValidated(from, eb)); + } + } + + pub fn expect_eb_validated(&mut self, node: NodeId, eb: Arc) { + self.expect_cpu_task(node, CpuTask::EBBlockValidated(eb, self.time.now())); + } + pub fn expect_message( &mut self, from: NodeId, @@ -172,7 +222,12 @@ impl TestDriver { }); assert!( found, - "message {message:?} was not sent from {from} to {to}" + "message {message:?} was not sent from {from} to {to}\npending messages: {:?}", + queued + .messages + .iter() + .filter(|(t, _)| t == &to) + .collect::>(), ); let events = self .nodes @@ -182,6 +237,20 @@ impl TestDriver { self.queued.entry(to).or_default().merge(events); } + pub fn expect_no_message( + &mut self, + from: NodeId, + to: NodeId, + message: ::Message, + ) { + let Some(queued) = self.queued.get(&from) else { + return; + }; + for (t, m) in &queued.messages { + assert_ne!((t, m), (&to, &message)); + } + } + pub fn expect_cpu_task(&mut self, node: NodeId, task: ::Task) { self.expect_cpu_task_matching(node, |t| if *t == task { Some(t.clone()) } else { None }); } @@ -212,11 +281,14 @@ impl TestDriver { } } -fn is_new_rb_task(task: &CpuTask) -> Option<(LinearRankingBlock, Option)> { +fn is_new_rb_task( + task: &CpuTask, +) -> Option<(Arc, Option>)> { match task { - CpuTask::RBBlockGenerated(rb, eb) => { - Some((rb.clone(), eb.as_ref().map(|(eb, _)| eb.clone()))) - } + CpuTask::RBBlockGenerated(rb, eb) => Some(( + Arc::new(rb.clone()), + eb.as_ref().map(|(eb, _)| Arc::new(eb.clone())), + )), _ => None, } } @@ -233,17 +305,11 @@ fn should_propagate_transactions() { // Node 1 produces a transaction, node 2 should request it let tx1 = sim.produce_tx(node1, false); - sim.expect_message(node1, node2, Message::AnnounceTx(tx1.id)); - sim.expect_message(node2, node1, Message::RequestTx(tx1.id)); - sim.expect_message(node1, node2, Message::Tx(tx1.clone())); - sim.expect_cpu_task(node2, CpuTask::TransactionValidated(node1, tx1.clone())); + sim.expect_tx_sent(node1, node2, tx1.clone()); // Node 2 produces a transaction, node 1 should request it let tx2 = sim.produce_tx(node2, false); - sim.expect_message(node2, node1, Message::AnnounceTx(tx2.id)); - sim.expect_message(node1, node2, Message::RequestTx(tx2.id)); - sim.expect_message(node2, node1, Message::Tx(tx2.clone())); - sim.expect_cpu_task(node1, CpuTask::TransactionValidated(node2, tx2.clone())); + sim.expect_tx_sent(node2, node1, tx2.clone()); // When node 1 produces an RB, it should include both TXs sim.win_next_rb_lottery(node1, 0); @@ -252,3 +318,82 @@ fn should_propagate_transactions() { assert_eq!(new_rb.transactions, vec![tx1, tx2]); assert_eq!(new_eb, None); } + +#[test] +fn should_not_propagate_conflicting_transactions() { + let topology = new_topology(vec![ + ("node-1", new_node(Some(1000), vec!["node-2"])), + ("node-2", new_node(Some(1000), vec!["node-1"])), + ]); + let mut sim = TestDriver::new(topology); + let node1 = sim.id_for("node-1"); + let node2 = sim.id_for("node-2"); + + // Node 1 and 2 produce conflicting transactions + let tx1 = sim.produce_tx(node1, false); + let tx2 = sim.produce_tx(node2, true); + + // Each node should send its TX to the other node, + sim.expect_tx_sent(node1, node2, tx1.clone()); + sim.expect_tx_sent(node2, node1, tx2.clone()); + + // When node 1 produces an RB, it should include only its own TX + sim.win_next_rb_lottery(node1, 0); + sim.next_slot(); + let (new_rb, new_eb) = sim.expect_cpu_task_matching(node1, is_new_rb_task); + assert_eq!(new_rb.transactions, vec![tx1]); + assert_eq!(new_eb, None); +} + +#[test] +fn should_repropagate_conflicting_transactions_from_eb() { + let topology = new_topology(vec![ + ("node-1", new_node(Some(1000), vec!["node-2"])), + ("node-2", new_node(Some(1000), vec!["node-1", "node-3"])), + ("node-3", new_node(Some(1000), vec!["node-2"])), + ]); + let mut sim = TestDriver::new(topology); + let node1 = sim.id_for("node-1"); + let node2 = sim.id_for("node-2"); + let node3 = sim.id_for("node-3"); + + // Node 1 produces 3 transactions + let tx1_1 = sim.produce_tx(node1, false); + let tx1_2 = sim.produce_tx(node1, false); + let tx1_3 = sim.produce_tx(node1, false); + + // Node 2 produces a transaction which conflicts with Node 1's final transaction + let tx2 = sim.produce_tx(node2, true); + // Node 2 sends its transactions to nodes 1 and 3 + sim.expect_tx_sent(node2, node1, tx2.clone()); + sim.expect_tx_sent(node2, node3, tx2.clone()); + + // Node 1 sends all of its transactions to node 2 + sim.expect_tx_sent(node1, node2, tx1_1.clone()); + sim.expect_tx_sent(node1, node2, tx1_2.clone()); + sim.expect_tx_sent(node1, node2, tx1_3.clone()); + + // Node 2 sends the first two transactions to node 3, but not the conflicting third + sim.expect_tx_sent(node2, node3, tx1_1.clone()); + sim.expect_tx_sent(node2, node3, tx1_2.clone()); + sim.expect_tx_not_sent(node2, node3, tx1_3.clone()); + + // Now, Node 1 produces an RB (with an EB, because there are enough transactions) + sim.win_next_rb_lottery(node1, 0); + sim.next_slot(); + let (rb, eb) = sim.expect_cpu_task_matching(node1, is_new_rb_task); + let eb = eb.expect("node did not produce EB"); + assert_eq!(rb.transactions, vec![tx1_1, tx1_2]); + assert_eq!(eb.txs, vec![tx1_3.clone()]); + + // That RB and EB propagate from node 1 to node 2 + sim.expect_rb_and_eb_sent(node1, node2, rb.clone(), Some(eb.clone())); + // Node 2 fully validates the EB, because node 1 has all TXs + sim.expect_eb_validated(node2, eb.clone()); + // And Node 2 propagates it to Node 3 + sim.expect_rb_and_eb_sent(node2, node3, rb.clone(), Some(eb.clone())); + + // and NOW Node 2 will tell Node 3 about the EB's conflicting TX + sim.expect_tx_sent(node2, node3, tx1_3); + sim.expect_eb_validated(node3, eb); +} From b69e330feafcf522e9d1f043b93214363e14a580 Mon Sep 17 00:00:00 2001 From: Simon Gellis Date: Fri, 29 Aug 2025 12:33:39 -0400 Subject: [PATCH 5/5] sim-rs: test vote generation --- sim-rs/sim-core/src/sim/tests/linear_leios.rs | 166 +++++++++++++++--- 1 file changed, 145 insertions(+), 21 deletions(-) diff --git a/sim-rs/sim-core/src/sim/tests/linear_leios.rs b/sim-rs/sim-core/src/sim/tests/linear_leios.rs index 471b6a20c..df3f7eb5d 100644 --- a/sim-rs/sim-core/src/sim/tests/linear_leios.rs +++ b/sim-rs/sim-core/src/sim/tests/linear_leios.rs @@ -1,4 +1,7 @@ -use std::{collections::HashMap, sync::Arc}; +use std::{ + collections::{BTreeMap, HashMap}, + sync::Arc, +}; use rand::{RngCore, SeedableRng}; use rand_chacha::ChaChaRng; @@ -8,10 +11,10 @@ use crate::{ clock::{Clock, MockClockCoordinator, Timestamp}, config::{NodeId, RawLinkInfo, RawNode, RawTopology, SimConfiguration, TransactionConfig}, events::{Event, EventTracker}, - model::{LinearEndorserBlock, LinearRankingBlock, Transaction}, + model::{LinearEndorserBlock, LinearRankingBlock, Transaction, VoteBundle}, sim::{ EventResult, NodeImpl, - linear_leios::{CpuTask, LinearLeiosNode, Message}, + linear_leios::{CpuTask, LinearLeiosNode, Message, TimedEvent}, lottery::{LotteryKind, MockLotteryResults}, }, }; @@ -98,50 +101,56 @@ fn new_node(stake: Option, producers: Vec<&'static str>) -> RawNode { } struct TestDriver { - sim_config: Arc, + pub config: Arc, rng: ChaChaRng, slot: u64, time: MockClockCoordinator, nodes: HashMap, lottery: HashMap>, queued: HashMap>, + events: BTreeMap>, } impl TestDriver { fn new(topology: RawTopology) -> Self { - let sim_config = new_sim_config(topology); - let rng = ChaChaRng::seed_from_u64(sim_config.seed); + let config = new_sim_config(topology); + let rng = ChaChaRng::seed_from_u64(config.seed); let slot = 0; let time = MockClockCoordinator::new(); let (event_tx, _event_rx) = mpsc::unbounded_channel(); - let (nodes, lottery) = new_sim(sim_config.clone(), event_tx, time.clock()); + let (nodes, lottery) = new_sim(config.clone(), event_tx, time.clock()); Self { - sim_config, + config, rng, slot, time, nodes, lottery, queued: HashMap::new(), + events: BTreeMap::new(), } } pub fn id_for(&self, name: &str) -> NodeId { - self.sim_config + self.config .nodes .iter() .find_map(|n| if n.name == name { Some(n.id) } else { None }) .unwrap() } + pub fn now(&self) -> Timestamp { + self.time.now() + } + pub fn produce_tx(&mut self, node_id: NodeId, conflict: bool) -> Arc { - let TransactionConfig::Real(tx_config) = &self.sim_config.transactions else { + let TransactionConfig::Real(tx_config) = &self.config.transactions else { panic!("unexpected TX config") }; let tx = Arc::new(tx_config.new_tx(&mut self.rng, Some(if conflict { 1.0 } else { 0.0 }))); let node = self.nodes.get_mut(&node_id).unwrap(); let events = node.handle_new_tx(tx.clone()); - self.queued.entry(node_id).or_default().merge(events); + self.process_events(node_id, events); tx } @@ -152,12 +161,48 @@ impl TestDriver { .configure_win(LotteryKind::GenerateRB, result); } + pub fn win_next_vote_lottery(&mut self, node_id: NodeId, result: u64) { + self.lottery + .get(&node_id) + .unwrap() + .configure_win(LotteryKind::GenerateVote, result); + } + pub fn next_slot(&mut self) { - self.slot += 1; - self.time.advance_time(Timestamp::from_secs(self.slot)); - for (node_id, node) in self.nodes.iter_mut() { - let events = node.handle_new_slot(self.slot); - self.queued.entry(*node_id).or_default().merge(events); + self.advance_time_to(Timestamp::from_secs(self.slot + 1)); + } + + pub fn advance_time_to(&mut self, timestamp: Timestamp) { + let mut now = self.time.now(); + while now < timestamp { + let next_slot = self.slot + 1; + let next_slot_time = Timestamp::from_secs(next_slot); + let mut next_event = timestamp.min(next_slot_time); + if let Some((event_time, _)) = self.events.first_key_value() { + next_event = next_event.min(*event_time); + } + self.time.advance_time(next_event); + now = next_event; + + let mut updates: HashMap> = HashMap::new(); + if now == next_slot_time { + for (node_id, node) in &mut self.nodes { + let events = node.handle_new_slot(next_slot); + updates.entry(*node_id).or_default().merge(events); + } + self.slot = next_slot; + } + if let Some(events) = self.events.remove(&next_event) { + for (node_id, event) in events { + let node = self.nodes.get_mut(&node_id).unwrap(); + let events = node.handle_timed_event(event); + updates.entry(node_id).or_default().merge(events); + } + } + + for (node, events) in updates { + self.process_events(node, events); + } } } @@ -234,7 +279,7 @@ impl TestDriver { .get_mut(&to) .unwrap() .handle_message(from, message); - self.queued.entry(to).or_default().merge(events); + self.process_events(to, events); } pub fn expect_no_message( @@ -261,14 +306,14 @@ impl TestDriver { { let queued = self.queued.entry(node).or_default(); let mut result = None; - let mut new_queued = EventResult::default(); + let mut events = EventResult::default(); queued.tasks.retain(|t| { if result.is_some() { return true; } result = matcher(t); if result.is_some() { - new_queued = self + events = self .nodes .get_mut(&node) .unwrap() @@ -276,9 +321,19 @@ impl TestDriver { } result.is_none() }); - queued.merge(new_queued); + self.process_events(node, events); result.expect("no CPU tasks matching filter") } + + fn process_events(&mut self, node: NodeId, mut events: EventResult) { + for (timestamp, event) in events.timed_events.drain(..) { + self.events + .entry(timestamp) + .or_default() + .push((node, event)); + } + self.queued.entry(node).or_default().merge(events); + } } fn is_new_rb_task( @@ -293,8 +348,15 @@ fn is_new_rb_task( } } +fn is_new_vote_task(task: &CpuTask) -> Option> { + match task { + CpuTask::VTBundleGenerated(vote, _) => Some(Arc::new(vote.clone())), + _ => None, + } +} + #[test] -fn should_propagate_transactions() { +fn should_produce_rbs_without_ebs() { let topology = new_topology(vec![ ("node-1", new_node(Some(1000), vec!["node-2"])), ("node-2", new_node(Some(1000), vec!["node-1"])), @@ -317,6 +379,37 @@ fn should_propagate_transactions() { let (new_rb, new_eb) = sim.expect_cpu_task_matching(node1, is_new_rb_task); assert_eq!(new_rb.transactions, vec![tx1, tx2]); assert_eq!(new_eb, None); + + sim.expect_rb_and_eb_sent(node1, node2, new_rb, None); +} + +#[test] +fn should_produce_rbs_and_ebs() { + let topology = new_topology(vec![ + ("node-1", new_node(Some(1000), vec!["node-2"])), + ("node-2", new_node(Some(1000), vec!["node-1"])), + ]); + let mut sim = TestDriver::new(topology); + let node1 = sim.id_for("node-1"); + let node2 = sim.id_for("node-2"); + + // Node 1 produces three transactions, Node 2 should request them all + let tx1_1 = sim.produce_tx(node1, false); + sim.expect_tx_sent(node1, node2, tx1_1.clone()); + let tx1_2 = sim.produce_tx(node1, false); + sim.expect_tx_sent(node1, node2, tx1_2.clone()); + let tx1_3 = sim.produce_tx(node1, false); + sim.expect_tx_sent(node1, node2, tx1_3.clone()); + + sim.win_next_rb_lottery(node1, 0); + sim.next_slot(); + let (new_rb, new_eb) = sim.expect_cpu_task_matching(node1, is_new_rb_task); + assert_eq!(new_rb.transactions, vec![tx1_1, tx1_2]); + let new_eb = new_eb.expect("no EB produced"); + assert_eq!(new_eb.txs, vec![tx1_3]); + + sim.expect_rb_and_eb_sent(node1, node2, new_rb, Some(new_eb.clone())); + sim.expect_eb_validated(node2, new_eb); } #[test] @@ -397,3 +490,34 @@ fn should_repropagate_conflicting_transactions_from_eb() { sim.expect_tx_sent(node2, node3, tx1_3); sim.expect_eb_validated(node3, eb); } + +#[test] +fn should_vote_for_eb() { + let topology = new_topology(vec![ + ("node-1", new_node(Some(1000), vec!["node-2"])), + ("node-2", new_node(Some(1000), vec!["node-1"])), + ]); + let mut sim = TestDriver::new(topology); + let node1 = sim.id_for("node-1"); + let node2 = sim.id_for("node-2"); + + let txs = (0..3) + .map(|_| sim.produce_tx(node1, false)) + .collect::>(); + for tx in &txs { + sim.expect_tx_sent(node1, node2, tx.clone()); + } + + sim.win_next_rb_lottery(node1, 0); + sim.next_slot(); + let (rb, eb) = sim.expect_cpu_task_matching(node1, is_new_rb_task); + let eb = eb.expect("node did not produce EB"); + + sim.expect_rb_and_eb_sent(node1, node2, rb.clone(), Some(eb.clone())); + sim.expect_eb_validated(node2, eb.clone()); + + sim.win_next_vote_lottery(node2, 0); + sim.advance_time_to(sim.now() + (sim.config.header_diffusion_time * 3)); + let vote = sim.expect_cpu_task_matching(node2, is_new_vote_task); + assert_eq!(*vote.ebs.first_key_value().unwrap().0, eb.id()); +}