diff --git a/sim-rs/CHANGELOG.md b/sim-rs/CHANGELOG.md index bdae751e5..7879aaaab 100644 --- a/sim-rs/CHANGELOG.md +++ b/sim-rs/CHANGELOG.md @@ -1,5 +1,11 @@ # Changelog +## Unreleased + +### Linear Leios +- Add some protocol-level tests +- Fix bug; transactions with conflicts referenced by EBs did not propagate far enough + ## v1.3.0 ### Linear Leios diff --git a/sim-rs/Cargo.lock b/sim-rs/Cargo.lock index fd0f17fb4..f249e739a 100644 --- a/sim-rs/Cargo.lock +++ b/sim-rs/Cargo.lock @@ -258,6 +258,12 @@ dependencies = [ "cfg-if", ] +[[package]] +name = "crossbeam-utils" +version = "0.8.21" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d0a5c400df2834b80a4c3327b3aad3a4c4cd4de0629063962b03235697506a28" + [[package]] name = "ctrlc" version = "3.4.7" @@ -303,6 +309,20 @@ dependencies = [ "syn", ] +[[package]] +name = "dashmap" +version = "6.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5041cc499144891f3790297212f32a74fb938e5136a14943f338ef9e0ae276cf" +dependencies = [ + "cfg-if", + "crossbeam-utils", + "hashbrown 0.14.5", + "lock_api", + "once_cell", + "parking_lot_core", +] + [[package]] name = "deranged" version = "0.4.0" @@ -517,6 +537,12 @@ version = "0.31.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "07e28edb80900c19c28f1072f2e8aeca7fa06b23cd4169cefe1af5aa3260783f" +[[package]] +name = "hashbrown" +version = "0.14.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e5274423e17b7c9fc20b6e7e208532f9b19825d82dfd615708b70edd83df41f1" + [[package]] name = "hashbrown" version = "0.15.5" @@ -542,7 +568,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "fe4cd85333e22411419a0bcae1297d25e58c9443848b11dc6a86fefe8c78a661" dependencies = [ "equivalent", - "hashbrown", + "hashbrown 0.15.5", ] [[package]] @@ -1245,6 +1271,7 @@ version = "1.3.0" dependencies = [ "anyhow", "async-stream", + "dashmap", "futures", "netsim-async", "num-traits", @@ -1253,6 +1280,7 @@ dependencies = [ "rand_chacha 0.9.0", "rand_distr 0.5.1", "serde", + "serde_yaml", "tokio", "tokio-util", "tracing", diff --git a/sim-rs/sim-core/Cargo.toml b/sim-rs/sim-core/Cargo.toml index 6ca59b835..c20bb7836 100644 --- a/sim-rs/sim-core/Cargo.toml +++ b/sim-rs/sim-core/Cargo.toml @@ -7,6 +7,7 @@ rust-version = "1.88" [dependencies] anyhow = "1" async-stream = "0.3" +dashmap = "6" futures = "0.3" netsim-async = { git = "https://github.com/input-output-hk/ce-netsim.git", rev = "9d1e26c" } num-traits = "0.2" @@ -20,4 +21,5 @@ tokio-util = "0.7" tracing = "0.1" [dev-dependencies] +serde_yaml = "0.9" tokio = { version = "1", features = ["macros", "rt"] } diff --git a/sim-rs/sim-core/src/clock.rs b/sim-rs/sim-core/src/clock.rs index 0c4485fe0..895739ae4 100644 --- a/sim-rs/sim-core/src/clock.rs +++ b/sim-rs/sim-core/src/clock.rs @@ -10,11 +10,13 @@ use std::{ pub use coordinator::ClockCoordinator; use coordinator::ClockEvent; use futures::FutureExt; +pub use mock::MockClockCoordinator; use timestamp::AtomicTimestamp; pub use timestamp::Timestamp; use tokio::sync::{mpsc, oneshot}; mod coordinator; +mod mock; mod timestamp; // wrapper struct which holds a SimulationEvent, diff --git a/sim-rs/sim-core/src/clock/mock.rs b/sim-rs/sim-core/src/clock/mock.rs new file mode 100644 index 000000000..c29a68f1c --- /dev/null +++ b/sim-rs/sim-core/src/clock/mock.rs @@ -0,0 +1,106 @@ +use std::{ + collections::HashMap, + sync::{Arc, atomic::AtomicUsize}, + time::Duration, +}; + +use tokio::sync::{mpsc, oneshot}; + +use crate::clock::{ + Clock, TaskInitiator, Timestamp, coordinator::ClockEvent, timestamp::AtomicTimestamp, +}; + +pub struct MockClockCoordinator { + time: Arc, + tx: mpsc::UnboundedSender, + rx: mpsc::UnboundedReceiver, + waiter_count: Arc, + tasks: Arc, + waiters: HashMap, +} + +impl Default for MockClockCoordinator { + fn default() -> Self { + Self::new() + } +} + +impl MockClockCoordinator { + pub fn new() -> Self { + let time = Arc::new(AtomicTimestamp::new(Timestamp::zero())); + let (tx, rx) = mpsc::unbounded_channel(); + let waiter_count = Arc::new(AtomicUsize::new(0)); + let tasks = Arc::new(AtomicUsize::new(0)); + Self { + time, + tx, + rx, + waiter_count, + tasks, + waiters: HashMap::new(), + } + } + + pub fn clock(&self) -> Clock { + Clock::new( + Duration::from_nanos(1), + self.time.clone(), + self.waiter_count.clone(), + TaskInitiator::new(self.tasks.clone()), + self.tx.clone(), + ) + } + + pub fn now(&self) -> Timestamp { + self.time.load(std::sync::atomic::Ordering::Acquire) + } + + pub fn advance_time(&mut self, until: Timestamp) { + while let Ok(event) = self.rx.try_recv() { + match event { + ClockEvent::Wait { actor, until, done } => { + if self.waiters.insert(actor, Waiter { until, done }).is_some() { + panic!("waiter {actor} waited twice"); + } + } + ClockEvent::CancelWait { actor } => { + if self.waiters.remove(&actor).is_none() { + panic!("waiter {actor} cancelled a wait twice"); + } + } + ClockEvent::FinishTask => { + if self.tasks.fetch_sub(1, std::sync::atomic::Ordering::AcqRel) == 0 { + panic!("cancelled too many tasks"); + } + } + } + } + assert_eq!( + self.waiters.len(), + self.waiter_count.load(std::sync::atomic::Ordering::Acquire), + "not every worker is waiting for time to pass" + ); + + self.time.store(until, std::sync::atomic::Ordering::Release); + self.waiters = std::mem::take(&mut self.waiters) + .into_iter() + .filter_map(|(actor, waiter)| { + if let Some(t) = &waiter.until { + if *t < until { + panic!("advanced time too far (waited for {until:?}, next event at {t:?})"); + } + if *t == until { + let _ = waiter.done.send(()); + return None; + } + } + Some((actor, waiter)) + }) + .collect(); + } +} + +struct Waiter { + until: Option, + done: oneshot::Sender<()>, +} diff --git a/sim-rs/sim-core/src/model.rs b/sim-rs/sim-core/src/model.rs index d4ddc2040..7f9ce8330 100644 --- a/sim-rs/sim-core/src/model.rs +++ b/sim-rs/sim-core/src/model.rs @@ -84,7 +84,7 @@ impl Block { } } -#[derive(Clone, Debug)] +#[derive(Clone, Debug, PartialEq, Eq)] pub struct LinearRankingBlockHeader { pub id: BlockId, pub vrf: u64, @@ -93,7 +93,7 @@ pub struct LinearRankingBlockHeader { pub eb_announcement: Option, } -#[derive(Clone, Debug)] +#[derive(Clone, Debug, PartialEq, Eq)] pub struct LinearRankingBlock { pub header: LinearRankingBlockHeader, pub transactions: Vec>, @@ -227,7 +227,7 @@ impl StracciatellaEndorserBlock { } } -#[derive(Debug)] +#[derive(Debug, Clone, PartialEq, Eq)] pub struct LinearEndorserBlock { pub slot: u64, pub producer: NodeId, @@ -293,7 +293,7 @@ pub enum TransactionLostReason { EBExpired, } -#[derive(Clone, Debug, Serialize)] +#[derive(Clone, Debug, Serialize, PartialEq, Eq)] pub struct Endorsement { pub eb: EndorserBlockId, pub size_bytes: u64, diff --git a/sim-rs/sim-core/src/sim.rs b/sim-rs/sim-core/src/sim.rs index ac9e0e6b8..937c40caa 100644 --- a/sim-rs/sim-core/src/sim.rs +++ b/sim-rs/sim-core/src/sim.rs @@ -28,6 +28,8 @@ mod linear_leios; mod lottery; mod slot; mod stracciatella; +#[cfg(test)] +mod tests; mod tx; enum NetworkWrapper { @@ -304,6 +306,13 @@ impl Default for EventResult { } impl EventResult { + #[cfg(test)] + pub fn merge(&mut self, mut other: EventResult) { + self.messages.append(&mut other.messages); + self.tasks.append(&mut other.tasks); + self.timed_events.append(&mut other.timed_events); + } + pub fn send_to(&mut self, to: NodeId, msg: N::Message) { self.messages.push((to, msg)); } diff --git a/sim-rs/sim-core/src/sim/linear_leios.rs b/sim-rs/sim-core/src/sim/linear_leios.rs index 22b591325..d25856d19 100644 --- a/sim-rs/sim-core/src/sim/linear_leios.rs +++ b/sim-rs/sim-core/src/sim/linear_leios.rs @@ -28,11 +28,11 @@ use crate::{ sim::{ MiniProtocol, NodeImpl, SimCpuTask, SimMessage, linear_leios::attackers::{EBWithholdingEvent, EBWithholdingSender}, - lottery, + lottery::{LotteryConfig, LotteryKind, MockLotteryResults, vrf_probabilities}, }, }; -#[derive(Clone, Debug)] +#[derive(Clone, Debug, PartialEq, Eq)] pub enum Message { // TX propagation AnnounceTx(TransactionId), @@ -114,6 +114,7 @@ impl SimMessage for Message { } } +#[derive(Debug, Clone, PartialEq, Eq)] pub enum CpuTask { /// A transaction has been received and validated, and is ready to propagate TransactionValidated(NodeId, Arc), @@ -306,7 +307,7 @@ pub struct LinearLeiosNode { tracker: EventTracker, rng: ChaChaRng, clock: Clock, - stake: u64, + lottery: LotteryConfig, consumers: Vec, txs: HashMap, ledger_states: BTreeMap>, @@ -333,6 +334,11 @@ impl NodeImpl for LinearLeiosNode { rng: ChaChaRng, clock: Clock, ) -> Self { + let lottery = LotteryConfig::Random { + stake: config.stake, + total_stake: sim_config.total_stake, + }; + Self { id: config.id, sim_config, @@ -340,7 +346,7 @@ impl NodeImpl for LinearLeiosNode { tracker, rng, clock, - stake: config.stake, + lottery, consumers: config.consumers.clone(), txs: HashMap::new(), ledger_states: BTreeMap::new(), @@ -512,7 +518,10 @@ impl LinearLeiosNode { // Ranking block propagation impl LinearLeiosNode { fn try_generate_rb(&mut self, slot: u64) { - let Some(vrf) = self.run_vrf(self.sim_config.block_generation_probability) else { + let Some(vrf) = self.run_vrf( + LotteryKind::GenerateRB, + self.sim_config.block_generation_probability, + ) else { return; }; @@ -995,7 +1004,7 @@ impl LinearLeiosNode { if missing_txs.is_empty() { self.queued - .schedule_cpu_task(CpuTask::EBBlockValidated(eb, seen)); + .schedule_cpu_task(CpuTask::EBBlockValidated(eb.clone(), seen)); } else { for tx_id in missing_txs { self.leios @@ -1005,6 +1014,23 @@ impl LinearLeiosNode { .push(eb.id()); } } + + if matches!( + self.sim_config.variant, + LeiosVariant::LinearWithTxReferences + ) { + // If the EB references any TXs which we already have, but are not in our mempool, + // we must have failed to add them to the mempool due to conflicts. + // Announce those TXs to our peers, since we didn't before. + for tx in &eb.txs { + if !self.has_tx(tx.id) || self.praos.mempool.contains_key(&tx.id) { + continue; + } + for peer in &self.consumers { + self.queued.send_to(*peer, Message::AnnounceTx(tx.id)); + } + } + } } fn try_validating_eb(&mut self, eb_id: EndorserBlockId) { @@ -1204,8 +1230,8 @@ impl LinearLeiosNode { } fn try_vote_for_endorser_block(&mut self, eb: &Arc, seen: Timestamp) -> bool { - let vrf_wins = lottery::vrf_probabilities(self.sim_config.vote_probability) - .filter_map(|f| self.run_vrf(f)) + let vrf_wins = vrf_probabilities(self.sim_config.vote_probability) + .filter_map(|f| self.run_vrf(LotteryKind::GenerateVote, f)) .count(); if vrf_wins == 0 { return false; @@ -1487,18 +1513,12 @@ impl LinearLeiosNode { // Common utilities impl LinearLeiosNode { + #[allow(unused)] + pub fn mock_lottery(&mut self, results: Arc) { + self.lottery = LotteryConfig::Mock { results }; + } // Simulates the output of a VRF using this node's stake (if any). - fn run_vrf(&mut self, success_rate: f64) -> Option { - let target_vrf_stake = lottery::compute_target_vrf_stake( - self.stake, - self.sim_config.total_stake, - success_rate, - ); - let result = self.rng.random_range(0..self.sim_config.total_stake); - if result < target_vrf_stake { - Some(result) - } else { - None - } + fn run_vrf(&mut self, kind: LotteryKind, success_rate: f64) -> Option { + self.lottery.run(kind, success_rate, &mut self.rng) } } diff --git a/sim-rs/sim-core/src/sim/lottery.rs b/sim-rs/sim-core/src/sim/lottery.rs index bc86167ab..0df95e211 100644 --- a/sim-rs/sim-core/src/sim/lottery.rs +++ b/sim-rs/sim-core/src/sim/lottery.rs @@ -1,3 +1,9 @@ +use std::{collections::VecDeque, sync::Arc}; + +use dashmap::DashMap; +use rand::Rng; +use rand_chacha::ChaChaRng; + pub fn compute_target_vrf_stake(stake: u64, total_stake: u64, success_rate: f64) -> u64 { let ratio = stake as f64 / total_stake as f64; (total_stake as f64 * ratio * success_rate) as u64 @@ -7,3 +13,51 @@ pub fn vrf_probabilities(probability: f64) -> impl Iterator { let final_success_rate = Some(probability.fract()).filter(|f| *f > 0.0); std::iter::repeat_n(1.0, probability.trunc() as usize).chain(final_success_rate) } + +#[derive(Clone, Copy, PartialEq, Eq, Hash)] +pub enum LotteryKind { + GenerateRB, + GenerateVote, +} + +#[derive(Default)] +pub struct MockLotteryResults { + results: DashMap>, +} +impl MockLotteryResults { + pub fn run(&self, kind: LotteryKind) -> Option { + self.results.entry(kind).or_default().pop_front() + } + #[allow(unused)] + pub fn configure_win(&self, kind: LotteryKind, result: u64) { + self.results.entry(kind).or_default().push_back(result); + } +} + +pub enum LotteryConfig { + Random { + stake: u64, + total_stake: u64, + }, + #[allow(unused)] + Mock { + results: Arc, + }, +} + +impl LotteryConfig { + pub fn run(&self, kind: LotteryKind, success_rate: f64, rng: &mut ChaChaRng) -> Option { + match self { + Self::Random { stake, total_stake } => { + let target_vrf_stake = compute_target_vrf_stake(*stake, *total_stake, success_rate); + let result = rng.random_range(0..*total_stake); + if result < target_vrf_stake { + Some(result) + } else { + None + } + } + Self::Mock { results } => results.run(kind), + } + } +} diff --git a/sim-rs/sim-core/src/sim/tests/linear_leios.rs b/sim-rs/sim-core/src/sim/tests/linear_leios.rs new file mode 100644 index 000000000..df3f7eb5d --- /dev/null +++ b/sim-rs/sim-core/src/sim/tests/linear_leios.rs @@ -0,0 +1,523 @@ +use std::{ + collections::{BTreeMap, HashMap}, + sync::Arc, +}; + +use rand::{RngCore, SeedableRng}; +use rand_chacha::ChaChaRng; +use tokio::sync::mpsc; + +use crate::{ + clock::{Clock, MockClockCoordinator, Timestamp}, + config::{NodeId, RawLinkInfo, RawNode, RawTopology, SimConfiguration, TransactionConfig}, + events::{Event, EventTracker}, + model::{LinearEndorserBlock, LinearRankingBlock, Transaction, VoteBundle}, + sim::{ + EventResult, NodeImpl, + linear_leios::{CpuTask, LinearLeiosNode, Message, TimedEvent}, + lottery::{LotteryKind, MockLotteryResults}, + }, +}; + +fn new_sim_config(topology: RawTopology) -> Arc { + let mut params: crate::config::RawParameters = + serde_yaml::from_slice(include_bytes!("../../../../parameters/config.default.yaml")) + .unwrap(); + params.leios_variant = crate::config::LeiosVariant::LinearWithTxReferences; + // every transaction fills up exactly half of an RB + let tx_size = params.rb_body_max_size_bytes / 2; + params.tx_size_bytes_distribution = crate::config::DistributionConfig::Constant { + value: tx_size as f64, + }; + params.tx_max_size_bytes = tx_size; + let topology = topology.into(); + Arc::new(SimConfiguration::build(params, topology).unwrap()) +} + +fn new_sim( + sim_config: Arc, + event_tx: mpsc::UnboundedSender<(Event, Timestamp)>, + clock: Clock, +) -> ( + HashMap, + HashMap>, +) { + let tracker = EventTracker::new(event_tx, clock.clone(), &sim_config.nodes); + let mut rng = ChaChaRng::seed_from_u64(sim_config.seed); + let mut lottery = HashMap::new(); + let nodes = sim_config + .nodes + .iter() + .map(|config| { + let rng = ChaChaRng::seed_from_u64(rng.next_u64()); + let mut node = LinearLeiosNode::new( + config, + sim_config.clone(), + tracker.clone(), + rng, + clock.clone(), + ); + let lottery_results = Arc::new(MockLotteryResults::default()); + node.mock_lottery(lottery_results.clone()); + lottery.insert(config.id, lottery_results); + (config.id, node) + }) + .collect(); + (nodes, lottery) +} + +fn new_topology(nodes: Vec<(&'static str, RawNode)>) -> RawTopology { + RawTopology { + nodes: nodes + .into_iter() + .map(|(name, node)| (name.to_string(), node)) + .collect(), + } +} +fn new_node(stake: Option, producers: Vec<&'static str>) -> RawNode { + RawNode { + stake, + location: crate::config::RawNodeLocation::Cluster { + cluster: "all".into(), + }, + cpu_core_count: Some(4), + tx_conflict_fraction: None, + tx_generation_weight: None, + producers: producers + .iter() + .map(|n| { + ( + n.to_string(), + RawLinkInfo { + latency_ms: 0.0, + bandwidth_bytes_per_second: None, + }, + ) + }) + .collect(), + adversarial: None, + behaviours: vec![], + } +} + +struct TestDriver { + pub config: Arc, + rng: ChaChaRng, + slot: u64, + time: MockClockCoordinator, + nodes: HashMap, + lottery: HashMap>, + queued: HashMap>, + events: BTreeMap>, +} + +impl TestDriver { + fn new(topology: RawTopology) -> Self { + let config = new_sim_config(topology); + let rng = ChaChaRng::seed_from_u64(config.seed); + let slot = 0; + let time = MockClockCoordinator::new(); + let (event_tx, _event_rx) = mpsc::unbounded_channel(); + let (nodes, lottery) = new_sim(config.clone(), event_tx, time.clock()); + Self { + config, + rng, + slot, + time, + nodes, + lottery, + queued: HashMap::new(), + events: BTreeMap::new(), + } + } + + pub fn id_for(&self, name: &str) -> NodeId { + self.config + .nodes + .iter() + .find_map(|n| if n.name == name { Some(n.id) } else { None }) + .unwrap() + } + + pub fn now(&self) -> Timestamp { + self.time.now() + } + + pub fn produce_tx(&mut self, node_id: NodeId, conflict: bool) -> Arc { + let TransactionConfig::Real(tx_config) = &self.config.transactions else { + panic!("unexpected TX config") + }; + let tx = Arc::new(tx_config.new_tx(&mut self.rng, Some(if conflict { 1.0 } else { 0.0 }))); + let node = self.nodes.get_mut(&node_id).unwrap(); + let events = node.handle_new_tx(tx.clone()); + self.process_events(node_id, events); + tx + } + + pub fn win_next_rb_lottery(&mut self, node_id: NodeId, result: u64) { + self.lottery + .get(&node_id) + .unwrap() + .configure_win(LotteryKind::GenerateRB, result); + } + + pub fn win_next_vote_lottery(&mut self, node_id: NodeId, result: u64) { + self.lottery + .get(&node_id) + .unwrap() + .configure_win(LotteryKind::GenerateVote, result); + } + + pub fn next_slot(&mut self) { + self.advance_time_to(Timestamp::from_secs(self.slot + 1)); + } + + pub fn advance_time_to(&mut self, timestamp: Timestamp) { + let mut now = self.time.now(); + while now < timestamp { + let next_slot = self.slot + 1; + let next_slot_time = Timestamp::from_secs(next_slot); + let mut next_event = timestamp.min(next_slot_time); + if let Some((event_time, _)) = self.events.first_key_value() { + next_event = next_event.min(*event_time); + } + self.time.advance_time(next_event); + now = next_event; + + let mut updates: HashMap> = HashMap::new(); + if now == next_slot_time { + for (node_id, node) in &mut self.nodes { + let events = node.handle_new_slot(next_slot); + updates.entry(*node_id).or_default().merge(events); + } + self.slot = next_slot; + } + if let Some(events) = self.events.remove(&next_event) { + for (node_id, event) in events { + let node = self.nodes.get_mut(&node_id).unwrap(); + let events = node.handle_timed_event(event); + updates.entry(node_id).or_default().merge(events); + } + } + + for (node, events) in updates { + self.process_events(node, events); + } + } + } + + pub fn expect_tx_sent(&mut self, from: NodeId, to: NodeId, tx: Arc) { + self.expect_message(from, to, Message::AnnounceTx(tx.id)); + self.expect_message(to, from, Message::RequestTx(tx.id)); + self.expect_message(from, to, Message::Tx(tx.clone())); + self.expect_cpu_task(to, CpuTask::TransactionValidated(from, tx)); + } + + pub fn expect_tx_not_sent(&mut self, from: NodeId, to: NodeId, tx: Arc) { + self.expect_no_message(from, to, Message::AnnounceTx(tx.id)); + } + + pub fn expect_rb_and_eb_sent( + &mut self, + from: NodeId, + to: NodeId, + rb: Arc, + eb: Option>, + ) { + self.expect_message(from, to, Message::AnnounceRBHeader(rb.header.id)); + self.expect_message(to, from, Message::RequestRBHeader(rb.header.id)); + self.expect_message( + from, + to, + Message::RBHeader(rb.header.clone(), true, eb.is_some()), + ); + self.expect_cpu_task( + to, + CpuTask::RBHeaderValidated(from, rb.header.clone(), true, eb.is_some()), + ); + self.expect_message(to, from, Message::RequestRB(rb.header.id)); + self.expect_message(from, to, Message::RB(rb.clone())); + self.expect_cpu_task(to, CpuTask::RBBlockValidated(rb)); + if let Some(eb) = eb { + self.expect_message(to, from, Message::RequestEB(eb.id())); + self.expect_message(from, to, Message::EB(eb.clone())); + self.expect_cpu_task(to, CpuTask::EBHeaderValidated(from, eb)); + } + } + + pub fn expect_eb_validated(&mut self, node: NodeId, eb: Arc) { + self.expect_cpu_task(node, CpuTask::EBBlockValidated(eb, self.time.now())); + } + + pub fn expect_message( + &mut self, + from: NodeId, + to: NodeId, + message: ::Message, + ) { + let queued = self.queued.entry(from).or_default(); + let mut found = false; + queued.messages.retain(|(t, msg)| { + if t == &to && msg == &message { + found = true; + false + } else { + true + } + }); + assert!( + found, + "message {message:?} was not sent from {from} to {to}\npending messages: {:?}", + queued + .messages + .iter() + .filter(|(t, _)| t == &to) + .collect::>(), + ); + let events = self + .nodes + .get_mut(&to) + .unwrap() + .handle_message(from, message); + self.process_events(to, events); + } + + pub fn expect_no_message( + &mut self, + from: NodeId, + to: NodeId, + message: ::Message, + ) { + let Some(queued) = self.queued.get(&from) else { + return; + }; + for (t, m) in &queued.messages { + assert_ne!((t, m), (&to, &message)); + } + } + + pub fn expect_cpu_task(&mut self, node: NodeId, task: ::Task) { + self.expect_cpu_task_matching(node, |t| if *t == task { Some(t.clone()) } else { None }); + } + + pub fn expect_cpu_task_matching(&mut self, node: NodeId, matcher: M) -> T + where + M: Fn(&::Task) -> Option, + { + let queued = self.queued.entry(node).or_default(); + let mut result = None; + let mut events = EventResult::default(); + queued.tasks.retain(|t| { + if result.is_some() { + return true; + } + result = matcher(t); + if result.is_some() { + events = self + .nodes + .get_mut(&node) + .unwrap() + .handle_cpu_task(t.clone()); + } + result.is_none() + }); + self.process_events(node, events); + result.expect("no CPU tasks matching filter") + } + + fn process_events(&mut self, node: NodeId, mut events: EventResult) { + for (timestamp, event) in events.timed_events.drain(..) { + self.events + .entry(timestamp) + .or_default() + .push((node, event)); + } + self.queued.entry(node).or_default().merge(events); + } +} + +fn is_new_rb_task( + task: &CpuTask, +) -> Option<(Arc, Option>)> { + match task { + CpuTask::RBBlockGenerated(rb, eb) => Some(( + Arc::new(rb.clone()), + eb.as_ref().map(|(eb, _)| Arc::new(eb.clone())), + )), + _ => None, + } +} + +fn is_new_vote_task(task: &CpuTask) -> Option> { + match task { + CpuTask::VTBundleGenerated(vote, _) => Some(Arc::new(vote.clone())), + _ => None, + } +} + +#[test] +fn should_produce_rbs_without_ebs() { + let topology = new_topology(vec![ + ("node-1", new_node(Some(1000), vec!["node-2"])), + ("node-2", new_node(Some(1000), vec!["node-1"])), + ]); + let mut sim = TestDriver::new(topology); + let node1 = sim.id_for("node-1"); + let node2 = sim.id_for("node-2"); + + // Node 1 produces a transaction, node 2 should request it + let tx1 = sim.produce_tx(node1, false); + sim.expect_tx_sent(node1, node2, tx1.clone()); + + // Node 2 produces a transaction, node 1 should request it + let tx2 = sim.produce_tx(node2, false); + sim.expect_tx_sent(node2, node1, tx2.clone()); + + // When node 1 produces an RB, it should include both TXs + sim.win_next_rb_lottery(node1, 0); + sim.next_slot(); + let (new_rb, new_eb) = sim.expect_cpu_task_matching(node1, is_new_rb_task); + assert_eq!(new_rb.transactions, vec![tx1, tx2]); + assert_eq!(new_eb, None); + + sim.expect_rb_and_eb_sent(node1, node2, new_rb, None); +} + +#[test] +fn should_produce_rbs_and_ebs() { + let topology = new_topology(vec![ + ("node-1", new_node(Some(1000), vec!["node-2"])), + ("node-2", new_node(Some(1000), vec!["node-1"])), + ]); + let mut sim = TestDriver::new(topology); + let node1 = sim.id_for("node-1"); + let node2 = sim.id_for("node-2"); + + // Node 1 produces three transactions, Node 2 should request them all + let tx1_1 = sim.produce_tx(node1, false); + sim.expect_tx_sent(node1, node2, tx1_1.clone()); + let tx1_2 = sim.produce_tx(node1, false); + sim.expect_tx_sent(node1, node2, tx1_2.clone()); + let tx1_3 = sim.produce_tx(node1, false); + sim.expect_tx_sent(node1, node2, tx1_3.clone()); + + sim.win_next_rb_lottery(node1, 0); + sim.next_slot(); + let (new_rb, new_eb) = sim.expect_cpu_task_matching(node1, is_new_rb_task); + assert_eq!(new_rb.transactions, vec![tx1_1, tx1_2]); + let new_eb = new_eb.expect("no EB produced"); + assert_eq!(new_eb.txs, vec![tx1_3]); + + sim.expect_rb_and_eb_sent(node1, node2, new_rb, Some(new_eb.clone())); + sim.expect_eb_validated(node2, new_eb); +} + +#[test] +fn should_not_propagate_conflicting_transactions() { + let topology = new_topology(vec![ + ("node-1", new_node(Some(1000), vec!["node-2"])), + ("node-2", new_node(Some(1000), vec!["node-1"])), + ]); + let mut sim = TestDriver::new(topology); + let node1 = sim.id_for("node-1"); + let node2 = sim.id_for("node-2"); + + // Node 1 and 2 produce conflicting transactions + let tx1 = sim.produce_tx(node1, false); + let tx2 = sim.produce_tx(node2, true); + + // Each node should send its TX to the other node, + sim.expect_tx_sent(node1, node2, tx1.clone()); + sim.expect_tx_sent(node2, node1, tx2.clone()); + + // When node 1 produces an RB, it should include only its own TX + sim.win_next_rb_lottery(node1, 0); + sim.next_slot(); + let (new_rb, new_eb) = sim.expect_cpu_task_matching(node1, is_new_rb_task); + assert_eq!(new_rb.transactions, vec![tx1]); + assert_eq!(new_eb, None); +} + +#[test] +fn should_repropagate_conflicting_transactions_from_eb() { + let topology = new_topology(vec![ + ("node-1", new_node(Some(1000), vec!["node-2"])), + ("node-2", new_node(Some(1000), vec!["node-1", "node-3"])), + ("node-3", new_node(Some(1000), vec!["node-2"])), + ]); + let mut sim = TestDriver::new(topology); + let node1 = sim.id_for("node-1"); + let node2 = sim.id_for("node-2"); + let node3 = sim.id_for("node-3"); + + // Node 1 produces 3 transactions + let tx1_1 = sim.produce_tx(node1, false); + let tx1_2 = sim.produce_tx(node1, false); + let tx1_3 = sim.produce_tx(node1, false); + + // Node 2 produces a transaction which conflicts with Node 1's final transaction + let tx2 = sim.produce_tx(node2, true); + // Node 2 sends its transactions to nodes 1 and 3 + sim.expect_tx_sent(node2, node1, tx2.clone()); + sim.expect_tx_sent(node2, node3, tx2.clone()); + + // Node 1 sends all of its transactions to node 2 + sim.expect_tx_sent(node1, node2, tx1_1.clone()); + sim.expect_tx_sent(node1, node2, tx1_2.clone()); + sim.expect_tx_sent(node1, node2, tx1_3.clone()); + + // Node 2 sends the first two transactions to node 3, but not the conflicting third + sim.expect_tx_sent(node2, node3, tx1_1.clone()); + sim.expect_tx_sent(node2, node3, tx1_2.clone()); + sim.expect_tx_not_sent(node2, node3, tx1_3.clone()); + + // Now, Node 1 produces an RB (with an EB, because there are enough transactions) + sim.win_next_rb_lottery(node1, 0); + sim.next_slot(); + let (rb, eb) = sim.expect_cpu_task_matching(node1, is_new_rb_task); + let eb = eb.expect("node did not produce EB"); + assert_eq!(rb.transactions, vec![tx1_1, tx1_2]); + assert_eq!(eb.txs, vec![tx1_3.clone()]); + + // That RB and EB propagate from node 1 to node 2 + sim.expect_rb_and_eb_sent(node1, node2, rb.clone(), Some(eb.clone())); + // Node 2 fully validates the EB, because node 1 has all TXs + sim.expect_eb_validated(node2, eb.clone()); + // And Node 2 propagates it to Node 3 + sim.expect_rb_and_eb_sent(node2, node3, rb.clone(), Some(eb.clone())); + + // and NOW Node 2 will tell Node 3 about the EB's conflicting TX + sim.expect_tx_sent(node2, node3, tx1_3); + sim.expect_eb_validated(node3, eb); +} + +#[test] +fn should_vote_for_eb() { + let topology = new_topology(vec![ + ("node-1", new_node(Some(1000), vec!["node-2"])), + ("node-2", new_node(Some(1000), vec!["node-1"])), + ]); + let mut sim = TestDriver::new(topology); + let node1 = sim.id_for("node-1"); + let node2 = sim.id_for("node-2"); + + let txs = (0..3) + .map(|_| sim.produce_tx(node1, false)) + .collect::>(); + for tx in &txs { + sim.expect_tx_sent(node1, node2, tx.clone()); + } + + sim.win_next_rb_lottery(node1, 0); + sim.next_slot(); + let (rb, eb) = sim.expect_cpu_task_matching(node1, is_new_rb_task); + let eb = eb.expect("node did not produce EB"); + + sim.expect_rb_and_eb_sent(node1, node2, rb.clone(), Some(eb.clone())); + sim.expect_eb_validated(node2, eb.clone()); + + sim.win_next_vote_lottery(node2, 0); + sim.advance_time_to(sim.now() + (sim.config.header_diffusion_time * 3)); + let vote = sim.expect_cpu_task_matching(node2, is_new_vote_task); + assert_eq!(*vote.ebs.first_key_value().unwrap().0, eb.id()); +} diff --git a/sim-rs/sim-core/src/sim/tests/mod.rs b/sim-rs/sim-core/src/sim/tests/mod.rs new file mode 100644 index 000000000..4fade2dfb --- /dev/null +++ b/sim-rs/sim-core/src/sim/tests/mod.rs @@ -0,0 +1 @@ +mod linear_leios;