diff --git a/Cargo.lock b/Cargo.lock index 598bf56916d..57c1211bd5f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1293,6 +1293,7 @@ dependencies = [ "lru 0.12.5", "mockall", "prost", + "rand 0.8.5", "rstest", "serde", "starknet-types-core", diff --git a/crates/apollo_consensus/Cargo.toml b/crates/apollo_consensus/Cargo.toml index 550f714ed89..ecf0f0d6071 100644 --- a/crates/apollo_consensus/Cargo.toml +++ b/crates/apollo_consensus/Cargo.toml @@ -45,6 +45,7 @@ apollo_test_utils.workspace = true assert_matches.workspace = true enum-as-inner.workspace = true mockall.workspace = true +rand.workspace = true rstest.workspace = true tempfile.workspace = true test-case.workspace = true diff --git a/crates/apollo_consensus/src/simulation_test.rs b/crates/apollo_consensus/src/simulation_test.rs new file mode 100644 index 00000000000..a32bb1c73d8 --- /dev/null +++ b/crates/apollo_consensus/src/simulation_test.rs @@ -0,0 +1,538 @@ +//! Discrete event simulation test for consensus protocol. +//! +//! This test uses a discrete event simulation approach with a timeline-based +//! event queue. +//! +//! Messages are scheduled with random delays to simulate network jitter. + +use std::cmp::Ordering; +use std::collections::{BinaryHeap, HashMap, HashSet, VecDeque}; +use std::ops::Range; + +use apollo_consensus_config::config::TimeoutsConfig; +use apollo_protobuf::consensus::{ProposalInit, Vote, VoteType}; +use lazy_static::lazy_static; +use rand::rngs::StdRng; +use rand::{Rng, SeedableRng}; +use starknet_api::block::BlockNumber; +use starknet_types_core::felt::Felt; + +use crate::single_height_consensus::SingleHeightConsensus; +use crate::state_machine::{SMRequest, StateMachineEvent, Step}; +use crate::types::{Decision, ProposalCommitment, Round, ValidatorId}; +use crate::votes_threshold::QuorumType; + +const HEIGHT_0: BlockNumber = BlockNumber(0); +const PROPOSAL_COMMITMENT: ProposalCommitment = ProposalCommitment(Felt::ONE); +const TOTAL_NODES: usize = 100; +const THRESHOLD: usize = (2 * TOTAL_NODES / 3) + 1; +const NODE_0_LEADER_PROBABILITY: f64 = 0.1; + +// Timing configuration (all values in ticks) +// +// NOTE: These timing ranges are NOT strict protocol requirements. Messages can arrive +// at any time and rounds can overlap freely. The timing model exists only to create +// realistic clustering of messages - most votes for a given round happen near each other. +// +// The ONLY hard constraints that matter for correctness are: +// 1. Each node sends precommit AFTER prevote (for the same round) +// 2. Honest nodes send votes AFTER seeing the proposal (when NODE_0 is the proposer) +// +// Everything else (round boundaries, overlap, etc.) is just for simulation realism. +const ROUND_DURATION: u64 = 100; // Each round spans 100 ticks +const ROUND_OVERLAP_PERCENT: u64 = 20; // Overlap between rounds as percentage of ROUND_DURATION +const ROUND_OVERLAP: u64 = ROUND_DURATION * ROUND_OVERLAP_PERCENT / 100; + +// Network delays for messages (min..max ticks) +const PROPOSAL_ARRIVAL_DELAY_RANGE: Range = 2..20; +const PREVOTE_ARRIVAL_DELAY_RANGE: Range = 20..50; +const PRECOMMIT_AFTER_PREVOTE_DELAY_RANGE: Range = 5..20; + +// Processing delays +const VALIDATION_DELAY_RANGE: Range = 15..30; // Time to validate a proposal +const BUILD_PROPOSAL_DELAY_RANGE: Range = 15..30; // Time to build a proposal + +// Timeout delays +const TIMEOUT_PROPOSE_DELAY_RANGE: Range = 15..30; // Timeout for propose step +const TIMEOUT_PREVOTE_DELAY_RANGE: Range = 5..10; // Timeout for prevote step +const TIMEOUT_PRECOMMIT_DELAY_RANGE: Range = 5..10; // Timeout for precommit step + +lazy_static! { + static ref NODE_0: ValidatorId = ValidatorId::from(0u64); +} + +/// Represents an input event in the simulation. +#[derive(Debug, Clone)] +enum InputEvent { + /// A vote message from peer node. + Vote(Vote), + /// A proposal message. + Proposal(ProposalInit), + /// An internal event. + Internal(StateMachineEvent), +} + +/// A timed event in the discrete event simulation. +/// +/// Events are ordered by ascending tick (earliest first). +#[derive(Debug)] +struct TimedEvent { + /// The simulation tick at which this event should occur. + tick: u64, + /// The event to process. + event: InputEvent, +} + +impl PartialEq for TimedEvent { + fn eq(&self, other: &Self) -> bool { + self.tick == other.tick + } +} + +impl Eq for TimedEvent {} + +impl PartialOrd for TimedEvent { + fn partial_cmp(&self, other: &Self) -> Option { + Some(self.cmp(other)) + } +} + +impl Ord for TimedEvent { + fn cmp(&self, other: &Self) -> Ordering { + other.tick.cmp(&self.tick) + } +} + +/// Discrete event simulation for consensus protocol. +/// +/// Uses a timeline-based approach where events are scheduled at specific +/// ticks and processed in chronological order. +struct DiscreteEventSimulation { + /// Random number generator for scheduling delays. + rng: StdRng, + /// The seed used to initialize the simulation. + seed: u64, + /// The single height consensus instance. + shc: SingleHeightConsensus, + /// All validators in the network. + validators: Vec, + /// Priority queue of pending events that have yet to be processed (min-heap by tick). + pending_events: BinaryHeap, + /// Current simulation tick. + current_tick: u64, + /// History of all processed events. + processed_history: Vec, + /// Tracks what the node actually voted for in each round. + node_votes: HashMap, + /// Number of rounds to pre-generate. + num_rounds: usize, + /// Tracks which rounds NODE_0 is the proposer. + node_0_proposer_rounds: HashSet, +} + +impl DiscreteEventSimulation { + fn new(total_nodes: usize, seed: u64, num_rounds: usize) -> Self { + let rng = StdRng::seed_from_u64(seed); + let validators: Vec = + (0..total_nodes).map(|i| ValidatorId::from(u64::try_from(i).unwrap())).collect(); + + let shc = SingleHeightConsensus::new( + HEIGHT_0, + false, + *NODE_0, + validators.clone(), + QuorumType::Byzantine, + TimeoutsConfig::default(), + ); + + Self { + rng, + seed, + shc, + validators, + pending_events: BinaryHeap::new(), + current_tick: 0, + processed_history: Vec::new(), + node_votes: HashMap::new(), + num_rounds, + node_0_proposer_rounds: HashSet::new(), + } + } + + /// Probabilistically selects a leader for the given round. + /// Node 0 (the one under test) has probability NODE_0_LEADER_PROBABILITY of being selected. + /// Other nodes share the remaining probability (1 - NODE_0_LEADER_PROBABILITY) uniformly. + /// The selection is deterministic per round - the same round will always return the same + /// leader. + fn get_leader(seed: u64, round: Round) -> ValidatorId { + let round_u64 = u64::from(round); + let round_seed = seed.wrapping_mul(31).wrapping_add(round_u64); + let mut round_rng = StdRng::seed_from_u64(round_seed); + + let random_value: f64 = round_rng.gen(); + + if random_value < NODE_0_LEADER_PROBABILITY { + *NODE_0 + } else { + let idx = round_rng.gen_range(1..TOTAL_NODES); + ValidatorId::from(u64::try_from(idx).unwrap()) + } + } + + /// Schedules an event to occur at the specified absolute tick. + fn schedule_at_tick(&mut self, tick: u64, event: InputEvent) { + self.pending_events.push(TimedEvent { tick, event }); + } + + /// Pre-generates all events for all requested rounds. + /// + /// Each round gets its own time range with minimal overlap. + /// For rounds where NODE_0 is the proposer, peer votes are scheduled after + /// the build finish event (which will be determined dynamically during simulation). + fn pre_generate_all_rounds(&mut self) { + for round_idx in 0..self.num_rounds { + let round = Round::from(u32::try_from(round_idx).unwrap()); + let leader_id = Self::get_leader(self.seed, round); + // Track rounds where NODE_0 is the proposer. + // We will schedule peer votes for these rounds after the build finish event. + if leader_id == *NODE_0 { + self.node_0_proposer_rounds.insert(round); + continue; + } + + // Determine time range for this round + let round_start_tick = + u64::try_from(round_idx).unwrap() * (ROUND_DURATION - ROUND_OVERLAP); + + // 1. Proposal from leader + let proposal_tick = round_start_tick + self.rng.gen_range(PROPOSAL_ARRIVAL_DELAY_RANGE); + self.schedule_at_tick( + proposal_tick, + InputEvent::Proposal(ProposalInit { + height: HEIGHT_0, + round, + proposer: leader_id, + valid_round: None, + }), + ); + + // 2. Votes from other honest validators + self.schedule_peer_votes(round, round_start_tick); + } + } + + /// Schedules honest peer votes for a round. Votes are scheduled within the round time range. + /// + /// Note: The timing constraints here are not strict - in reality, votes can arrive at any + /// time and may overlap with other rounds. The timing ranges just create realistic clustering. + /// The only real constraint enforced is: precommit_tick > prevote_tick (same voter). + fn schedule_peer_votes(&mut self, round: Round, round_start_tick: u64) { + let round_end_tick = round_start_tick + ROUND_DURATION; + // Skip index 0 (self) - our votes are handled by the state machine + for i in 1..self.validators.len() { + let voter = self.validators[i]; + let commitment = Some(PROPOSAL_COMMITMENT); + + let prevote_tick = round_start_tick + + self.rng.gen_range(PREVOTE_ARRIVAL_DELAY_RANGE).min(round_end_tick - 1); + let precommit_tick = prevote_tick + + self.rng.gen_range(PRECOMMIT_AFTER_PREVOTE_DELAY_RANGE).min(round_end_tick); + + self.schedule_at_tick( + prevote_tick, + InputEvent::Vote(Vote { + vote_type: VoteType::Prevote, + height: HEIGHT_0, + round, + proposal_commitment: commitment, + voter, + }), + ); + + self.schedule_at_tick( + precommit_tick, + InputEvent::Vote(Vote { + vote_type: VoteType::Precommit, + height: HEIGHT_0, + round, + proposal_commitment: commitment, + voter, + }), + ); + } + } + + /// Runs the simulation until a decision is reached or the deadline is exceeded. + /// + /// Returns `Some(Decision)` if consensus is reached, `None` if the deadline + /// is reached without a decision. + fn run(&mut self, deadline_ticks: u64) -> Option { + // Pre-generate all rounds events + self.pre_generate_all_rounds(); + + let seed = self.seed; + let leader_fn = move |r: Round| Self::get_leader(seed, r); + + // Start the single height consensus + let requests = self.shc.start(&leader_fn); + if let Some(decision) = self.handle_requests(requests) { + return Some(decision); + } + + // Main event loop + while let Some(timed_event) = self.pending_events.pop() { + if timed_event.tick > deadline_ticks { + break; + } + + self.current_tick = timed_event.tick; + self.processed_history.push(timed_event.event.clone()); + + // Process the event + let requests = match timed_event.event { + InputEvent::Vote(v) => self.shc.handle_vote(&leader_fn, v), + InputEvent::Proposal(p) => self.shc.handle_proposal(&leader_fn, p), + InputEvent::Internal(e) => self.shc.handle_event(&leader_fn, e), + }; + + if let Some(decision) = self.handle_requests(requests) { + return Some(decision); + } + } + + None + } + + /// Handles state machine requests by scheduling appropriate events. + /// + /// This simulates the manager's role in handling consensus requests, + /// such as validation results, proposal building, and timeouts. + /// Also tracks BroadcastVote requests to know what the node actually voted for. + fn handle_requests(&mut self, reqs: VecDeque) -> Option { + for req in reqs { + match req { + SMRequest::StartValidateProposal(init) => { + let delay = self.rng.gen_range(VALIDATION_DELAY_RANGE); + let validate_finish_tick = self.current_tick + delay; + let result = StateMachineEvent::FinishedValidation( + Some(PROPOSAL_COMMITMENT), + init.round, + None, + ); + self.schedule_at_tick(validate_finish_tick, InputEvent::Internal(result)); + } + SMRequest::StartBuildProposal(round) => { + let delay = self.rng.gen_range(BUILD_PROPOSAL_DELAY_RANGE); + let build_finish_tick = self.current_tick + delay; + let result = + StateMachineEvent::FinishedBuilding(Some(PROPOSAL_COMMITMENT), round); + self.schedule_at_tick(build_finish_tick, InputEvent::Internal(result)); + + // Schedule peer votes after build finish + assert!(self.node_0_proposer_rounds.contains(&round)); + self.schedule_peer_votes(round, build_finish_tick); + } + SMRequest::ScheduleTimeout(step, round) => { + let (delay, event) = match step { + Step::Propose => ( + self.rng.gen_range(TIMEOUT_PROPOSE_DELAY_RANGE), + StateMachineEvent::TimeoutPropose(round), + ), + Step::Prevote => ( + self.rng.gen_range(TIMEOUT_PREVOTE_DELAY_RANGE), + StateMachineEvent::TimeoutPrevote(round), + ), + Step::Precommit => ( + self.rng.gen_range(TIMEOUT_PRECOMMIT_DELAY_RANGE), + StateMachineEvent::TimeoutPrecommit(round), + ), + }; + let timeout_tick = self.current_tick + delay; + self.schedule_at_tick(timeout_tick, InputEvent::Internal(event)); + } + SMRequest::BroadcastVote(vote) => { + self.node_votes.insert(vote.round, vote); + } + SMRequest::DecisionReached(decision) => { + return Some(decision); + } + _ => { + // Ignore other request types + } + } + } + None + } +} + +fn verify_result(sim: &DiscreteEventSimulation, result: Option<&Decision>) { + #[derive(Default)] + struct RoundStats { + peer_precommits: usize, + finished_proposal: bool, + expected_commitment: ProposalCommitment, + } + + let mut stats: HashMap = HashMap::new(); + + // Aggregate stats from the processed history. + for event in &sim.processed_history { + match event { + // Track peer precommits. + InputEvent::Vote(v) => { + if let (VoteType::Precommit, Some(commitment)) = + (v.vote_type, v.proposal_commitment) + { + let entry = stats.entry(v.round).or_insert_with(|| RoundStats { + expected_commitment: PROPOSAL_COMMITMENT, + ..Default::default() + }); + if commitment == entry.expected_commitment { + entry.peer_precommits += 1; + } + } + } + // Track proposal knowledge. + InputEvent::Internal(StateMachineEvent::FinishedValidation(c, r, _)) + | InputEvent::Internal(StateMachineEvent::FinishedBuilding(c, r)) => { + if let Some(proposal_commitment) = *c { + let entry = stats.entry(*r).or_insert_with(|| RoundStats { + expected_commitment: PROPOSAL_COMMITMENT, + ..Default::default() + }); + if proposal_commitment == entry.expected_commitment { + entry.finished_proposal = true; + } + } + } + _ => {} + } + } + + // 2. Determine Expected Decision + // Use the actual votes the node broadcast (from BroadcastVote requests) + // instead of inferring from timeouts + let mut expected_decision: Option<(Round, ProposalCommitment)> = None; + + for r in 0..sim.num_rounds { + let r = Round::from(u32::try_from(r).unwrap()); + if let Some(s) = stats.get(&r) { + // Check what the node actually voted for in this round + // If the node voted precommit for the valid commitment, count it + let expected_commitment = PROPOSAL_COMMITMENT; + let self_vote = sim + .node_votes + .get(&r) + .filter(|v| { + v.vote_type == VoteType::Precommit + && v.proposal_commitment == Some(expected_commitment) + }) + .iter() + .count(); + let total_precommits = s.peer_precommits + self_vote; + + if s.finished_proposal && total_precommits >= THRESHOLD { + expected_decision = Some((r, expected_commitment)); + break; + } + } + } + + // 3. Compare with Actual Result + match (result, expected_decision) { + (Some(actual), Some((expected_round, expected_commitment))) => { + let decided_round = actual.precommits[0].round; + let decided_block = actual.block; + assert_eq!( + decided_block, expected_commitment, + "Decision block mismatch. History: {:?}", + sim.processed_history + ); + assert_eq!( + decided_round, expected_round, + "Decision round mismatch expected: {:?}, actual: {:?}. History: {:?}", + expected_round, decided_round, sim.processed_history + ); + + // 4. Verify that decision precommits are all in history (or are the node's own vote) + // Collect all precommits from processed_history + let history_precommits: HashSet<_> = sim + .processed_history + .iter() + .filter_map(|e| { + if let InputEvent::Vote(v) = e { + if v.vote_type == VoteType::Precommit + && v.round == decided_round + && v.proposal_commitment == Some(decided_block) + { + Some(v.clone()) + } else { + None + } + } else { + None + } + }) + .collect(); + + let decision_precommits: HashSet<_> = actual.precommits.iter().cloned().collect(); + + // Check that the difference (decision precommits not in history) is only the node's + // vote + let diff: HashSet<_> = decision_precommits.difference(&history_precommits).collect(); + // Get the node's own vote (if it voted for this round/proposal) + let expected_diff: HashSet<_> = sim + .node_votes + .get(&decided_round) + .filter(|v| { + v.vote_type == VoteType::Precommit + && v.proposal_commitment == Some(decided_block) + }) + .iter() + .cloned() + .collect(); + assert_eq!( + diff, expected_diff, + "Decision has precommits not in history that don't match node vote. Diff: {:?}, \ + History: {:?}, Decision: {:?}", + diff, history_precommits, actual.precommits + ); + + // Verify quorum threshold is met + assert!( + actual.precommits.len() >= THRESHOLD, + "Insufficient precommits in decision: {}/{}. Decision: {:?}, History: {:?}", + actual.precommits.len(), + THRESHOLD, + actual, + sim.processed_history + ); + } + (None, None) => { + // SUCCESS: No decision reached. History confirms conditions were never met. + } + _ => { + panic!( + "FAILURE: returned {result:?}, expected {expected_decision:?}. History: {:?}", + sim.processed_history + ); + } + } +} + +#[test] +fn test_honest_nodes_only() { + let seed = rand::thread_rng().gen(); + let num_rounds = 1; // Number of rounds to pre-generate + println!( + "Running consensus simulation with total nodes {TOTAL_NODES}, {num_rounds} rounds, and \ + seed: {seed}" + ); + + let mut sim = DiscreteEventSimulation::new(TOTAL_NODES, seed, num_rounds); + + let deadline_ticks = u64::try_from(num_rounds).unwrap() * ROUND_DURATION; + let result = sim.run(deadline_ticks); + + verify_result(&sim, result.as_ref()); +} diff --git a/crates/apollo_consensus/src/single_height_consensus.rs b/crates/apollo_consensus/src/single_height_consensus.rs index b6aaf2c79d5..8c2fb8b966d 100644 --- a/crates/apollo_consensus/src/single_height_consensus.rs +++ b/crates/apollo_consensus/src/single_height_consensus.rs @@ -8,6 +8,10 @@ #[path = "single_height_consensus_test.rs"] mod single_height_consensus_test; +#[cfg(test)] +#[path = "simulation_test.rs"] +mod simulation_test; + use std::collections::{HashSet, VecDeque}; use crate::state_machine::VoteStatus;