diff --git a/data/simulation/config.d.ts b/data/simulation/config.d.ts index a96a192ee..6a83d929a 100644 --- a/data/simulation/config.d.ts +++ b/data/simulation/config.d.ts @@ -63,6 +63,10 @@ export interface Config { * If true, transactions will be removed from the Leios mempool if they conflict with in-flight IBs. */ "leios-mempool-aggressive-pruning": boolean; + /** + * The maximum size of a mempool, in bytes + */ + "leios-mempool-size-bytes"?: bigint | null; /** * Praos blockchain quality parameter. * This is η from the Leios paper. diff --git a/data/simulation/config.default.yaml b/data/simulation/config.default.yaml index d42070eb9..19635ac04 100644 --- a/data/simulation/config.default.yaml +++ b/data/simulation/config.default.yaml @@ -32,6 +32,7 @@ leios-header-diffusion-time-ms: 1000.0 leios-ib-generation-time-ms: 130.0 leios-mempool-sampling-strategy: ordered-by-id leios-mempool-aggressive-pruning: false +leios-mempool-size-bytes: null # TODO: revise default praos-chain-quality: 40 praos-fallback-enabled: true diff --git a/data/simulation/config.schema.json b/data/simulation/config.schema.json index a662b5018..7ec47731b 100644 --- a/data/simulation/config.schema.json +++ b/data/simulation/config.schema.json @@ -383,6 +383,12 @@ "$ref": "#/definitions/MempoolSamplingStrategy", "description": "The strategy to use when selecting TXs from the Leios mempool." }, + "leios-mempool-size-bytes": { + "additionalProperties": false, + "description": "The maximum size of a mempool, in bytes", + "properties": {}, + "type": "number" + }, "leios-stage-active-voting-slots": { "additionalProperties": false, "properties": {}, diff --git a/sim-rs/CHANGELOG.md b/sim-rs/CHANGELOG.md index 765db9a56..ac902fb4e 100644 --- a/sim-rs/CHANGELOG.md +++ b/sim-rs/CHANGELOG.md @@ -1,5 +1,15 @@ # Changelog +## v1.4.0 + +### Linear Leios + +- Add bounded mempools. Configure them with by setting `leios-mempool-size-bytes`. Incoming transactions which don't fit in the mempool will be queued for inclusion in the mempool when there is space. Transactions referenced by an EB which are not yet in the mempool will still be forwarded to peers. + +### Other + +- Fix warnings from new rust version + ## v1.3.1 ### Linear Leios diff --git a/sim-rs/Cargo.lock b/sim-rs/Cargo.lock index 39292683f..d9f9a6609 100644 --- a/sim-rs/Cargo.lock +++ b/sim-rs/Cargo.lock @@ -1229,7 +1229,7 @@ dependencies = [ [[package]] name = "sim-cli" -version = "1.3.1" +version = "1.4.0" dependencies = [ "anyhow", "async-compression", @@ -1257,7 +1257,7 @@ dependencies = [ [[package]] name = "sim-core" -version = "1.3.1" +version = "1.4.0" dependencies = [ "anyhow", "async-stream", diff --git a/sim-rs/implementations/LINEAR_LEIOS.md b/sim-rs/implementations/LINEAR_LEIOS.md index 79448ad3c..b300b4202 100644 --- a/sim-rs/implementations/LINEAR_LEIOS.md +++ b/sim-rs/implementations/LINEAR_LEIOS.md @@ -50,6 +50,12 @@ When a node creates an RB, it will follow these steps in order: When a node receives an RB body, it immediately removes all referenced/conflicting transactions from its mempool. If the RB has an EB certificate, it also removes that EB’s transactions from its mempool. If the certified EB arrives after the RB body, we remove its TXs from the mempool once it arrives. +### Bounded mempools + +The mempool can be configured with an optional size limit, through the `leios-mempool-size-bytes` parameter. When a node tries adding a transaction to a full mempool, the transaction will go into an (unbounded) queue instead. Nodes will only announce transactions to their peers once those transactions have actually reached the mempool. + +If a node has received an EB which references transactions, and those transactions are in the queue but not yet in the mempool, the node will announce those transactions to its peer as well. This is to simulate the behavior of the real protocol, where nodes may request transactions from an EB separately from the TxSubmission mini-protocol. + ## New parameters |Name|Description|Default value| diff --git a/sim-rs/sim-cli/Cargo.toml b/sim-rs/sim-cli/Cargo.toml index 93f3545f9..464848701 100644 --- a/sim-rs/sim-cli/Cargo.toml +++ b/sim-rs/sim-cli/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "sim-cli" -version = "1.3.1" +version = "1.4.0" edition = "2024" default-run = "sim-cli" rust-version = "1.88" diff --git a/sim-rs/sim-cli/src/events/aggregate.rs b/sim-rs/sim-cli/src/events/aggregate.rs index c591d3158..0c2c30e7e 100644 --- a/sim-rs/sim-cli/src/events/aggregate.rs +++ b/sim-rs/sim-cli/src/events/aggregate.rs @@ -232,7 +232,7 @@ impl TraceAggregator { let new_chunk = (event.time_s - Timestamp::zero()).as_millis() / 250; self.current_time = event.time_s; if current_chunk != new_chunk { - if new_chunk % 4 == 0 { + if new_chunk.is_multiple_of(4) { let timestamp = Timestamp::from_secs((new_chunk / 4) as u64); self.tx_counts.push(self.produce_tx_counts(timestamp)); } diff --git a/sim-rs/sim-core/Cargo.toml b/sim-rs/sim-core/Cargo.toml index d83e366bd..35f361b01 100644 --- a/sim-rs/sim-core/Cargo.toml +++ b/sim-rs/sim-core/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "sim-core" -version = "1.3.1" +version = "1.4.0" edition = "2024" rust-version = "1.88" diff --git a/sim-rs/sim-core/src/config.rs b/sim-rs/sim-core/src/config.rs index 455cf15e6..b8455c30e 100644 --- a/sim-rs/sim-core/src/config.rs +++ b/sim-rs/sim-core/src/config.rs @@ -74,6 +74,7 @@ pub struct RawParameters { pub leios_ib_generation_time_ms: f64, pub leios_mempool_sampling_strategy: MempoolSamplingStrategy, pub leios_mempool_aggressive_pruning: bool, + pub leios_mempool_size_bytes: Option, pub praos_chain_quality: u64, pub praos_fallback_enabled: bool, pub linear_vote_stage_length_slots: u64, @@ -716,6 +717,7 @@ pub struct SimConfiguration { pub(crate) relay_strategy: RelayStrategy, pub(crate) mempool_strategy: MempoolSamplingStrategy, pub(crate) mempool_aggressive_pruning: bool, + pub(crate) mempool_size_bytes: u64, pub(crate) praos_chain_quality: u64, pub(crate) block_generation_probability: f64, pub(crate) ib_generation_probability: f64, @@ -742,7 +744,7 @@ pub struct SimConfiguration { impl SimConfiguration { pub fn build(params: RawParameters, mut topology: Topology) -> Result { - if params.ib_shards % params.ib_shard_group_count != 0 { + if !params.ib_shards.is_multiple_of(params.ib_shard_group_count) { bail!( "ib-shards ({}) is not divisible by ib-shard-group-count ({})", params.ib_shards, @@ -751,7 +753,9 @@ impl SimConfiguration { } if matches!(params.leios_variant, LeiosVariant::FullWithoutIbs) && params.ib_shard_group_count != 1 - && params.ib_shard_period_length_slots % params.leios_stage_length_slots != 0 + && !params + .ib_shard_period_length_slots + .is_multiple_of(params.leios_stage_length_slots) { bail!( "Invalid sharding configuration. EBs are generated every {} slot(s). This sim is configured to choose EB shards from 1 of {} groups, using a different group every {} slot(s). Some groups would never be chosen.", @@ -782,6 +786,7 @@ impl SimConfiguration { relay_strategy: params.relay_strategy, mempool_strategy: params.leios_mempool_sampling_strategy, mempool_aggressive_pruning: params.leios_mempool_aggressive_pruning, + mempool_size_bytes: params.leios_mempool_size_bytes.unwrap_or(u64::MAX), praos_chain_quality: params.praos_chain_quality, block_generation_probability: params.rb_generation_probability, ib_generation_probability: params.ib_generation_probability, diff --git a/sim-rs/sim-core/src/sim/leios.rs b/sim-rs/sim-core/src/sim/leios.rs index 4c111c6d5..ff985d2bd 100644 --- a/sim-rs/sim-core/src/sim/leios.rs +++ b/sim-rs/sim-core/src/sim/leios.rs @@ -414,7 +414,7 @@ impl NodeImpl for LeiosNode { } fn handle_new_slot(&mut self, slot: u64) -> EventResult { - if slot % self.sim_config.stage_length == 0 { + if slot.is_multiple_of(self.sim_config.stage_length) { // A new stage has begun. // Decide how many votes to generate in each slot diff --git a/sim-rs/sim-core/src/sim/linear_leios.rs b/sim-rs/sim-core/src/sim/linear_leios.rs index 3fc4cafb7..3d238654f 100644 --- a/sim-rs/sim-core/src/sim/linear_leios.rs +++ b/sim-rs/sim-core/src/sim/linear_leios.rs @@ -259,7 +259,6 @@ impl RankingBlockView { #[derive(Default)] struct NodePraosState { - mempool: BTreeMap>, peer_heads: BTreeMap, blocks: BTreeMap, block_ids_by_slot: BTreeMap, @@ -310,6 +309,7 @@ pub struct LinearLeiosNode { lottery: LotteryConfig, consumers: Vec, txs: HashMap, + mempool: Mempool, ledger_states: BTreeMap>, praos: NodePraosState, leios: NodeLeiosState, @@ -338,6 +338,7 @@ impl NodeImpl for LinearLeiosNode { stake: config.stake, total_stake: sim_config.total_stake, }; + let mempool_max_size_bytes = sim_config.mempool_size_bytes; Self { id: config.id, @@ -349,6 +350,7 @@ impl NodeImpl for LinearLeiosNode { lottery, consumers: config.consumers.clone(), txs: HashMap::new(), + mempool: Mempool::new(mempool_max_size_bytes), ledger_states: BTreeMap::new(), praos: NodePraosState::default(), leios: NodeLeiosState::default(), @@ -1025,10 +1027,12 @@ impl LinearLeiosNode { LeiosVariant::LinearWithTxReferences ) { // If the EB references any TXs which we already have, but are not in our mempool, - // we must have failed to add them to the mempool due to conflicts. - // Announce those TXs to our peers, since we didn't before. + // either we must have failed to add them to the mempool due to conflicts, + // or they haven't reached the mempool _yet_. + // Announce those TXs to our peers, since either way we didn't before. + let mempool_ids = self.mempool.ids().collect::>(); for tx in &eb.txs { - if !self.has_tx(tx.id) || self.praos.mempool.contains_key(&tx.id) { + if !self.has_tx(tx.id) || mempool_ids.contains(&tx.id) { continue; } for peer in &self.consumers { @@ -1396,17 +1400,7 @@ impl LinearLeiosNode { return false; } - if self - .praos - .mempool - .values() - .any(|t| t.input_id == tx.input_id) - { - // This TX conflicts with something already in the mempool - return false; - } - self.praos.mempool.insert(tx.id, tx.clone()); - true + self.mempool.try_insert(tx.clone()) } fn sample_from_mempool( @@ -1416,7 +1410,7 @@ impl LinearLeiosNode { remove: bool, ) { let mut size = txs.iter().map(|tx| tx.bytes).sum::(); - let mut candidates: Vec<_> = self.praos.mempool.keys().copied().collect(); + let mut candidates: Vec<_> = self.mempool.ids().collect(); if matches!( self.sim_config.mempool_strategy, MempoolSamplingStrategy::Random @@ -1427,16 +1421,24 @@ impl LinearLeiosNode { } // Fill with as many pending transactions as can fit + let mut removed_ids = vec![]; while let Some(id) = candidates.pop() { - let tx = self.praos.mempool.get(&id).unwrap(); + let Some(TransactionView::Received(tx)) = self.txs.get(&id) else { + panic!("missing a TX in our mempool"); + }; if size + tx.bytes > max_size { break; } size += tx.bytes; + txs.push(tx.clone()); if remove { - txs.push(self.praos.mempool.remove(&id).unwrap()); - } else { - txs.push(tx.clone()); + removed_ids.push(tx.id); + } + } + for newly_queued_tx in self.mempool.remove_txs(removed_ids) { + for peer in &self.consumers { + self.queued + .send_to(*peer, Message::AnnounceTx(newly_queued_tx)); } } } @@ -1458,9 +1460,12 @@ impl LinearLeiosNode { fn remove_txs_from_mempool(&mut self, txs: &[Arc]) { let inputs = txs.iter().map(|tx| tx.input_id).collect::>(); - self.praos - .mempool - .retain(|_, tx| !inputs.contains(&tx.input_id)); + for newly_queued_tx in self.mempool.remove_conflicting_txs(&inputs) { + for peer in &self.consumers { + self.queued + .send_to(*peer, Message::AnnounceTx(newly_queued_tx)); + } + } } fn resolve_ledger_state(&mut self, rb_ref: Option) -> Arc { @@ -1532,3 +1537,196 @@ impl LinearLeiosNode { self.lottery.run(kind, success_rate, &mut self.rng) } } + +struct Mempool { + next_id: u64, + mempool_count: usize, + mempool_size_bytes: u64, + max_size_bytes: u64, + queue: BTreeMap>, + input_ids: HashSet, +} +impl Mempool { + fn new(max_size_bytes: u64) -> Self { + Self { + next_id: 0, + mempool_count: 0, + mempool_size_bytes: 0, + max_size_bytes, + queue: BTreeMap::new(), + input_ids: HashSet::new(), + } + } + fn try_insert(&mut self, tx: Arc) -> bool { + let new_bytes = self.mempool_size_bytes + tx.bytes; + if self.mempool_count < self.queue.len() || new_bytes > self.max_size_bytes { + // mempool is or would be full, just put this at the end and Be Done + let id = self.new_id(); + self.queue.insert(id, tx); + return false; + } + if self.input_ids.contains(&tx.input_id) { + // conflicts with something already in the mempool + return false; + } + + self.mempool_count += 1; + self.mempool_size_bytes = new_bytes; + self.input_ids.insert(tx.input_id); + let id = self.new_id(); + self.queue.insert(id, tx); + true + } + + fn ids(&self) -> impl Iterator { + self.queue.values().take(self.mempool_count).map(|tx| tx.id) + } + + // Removes a set of TXs from the mempool. + // Returns any previously-queued TXs now added to the mempool. + fn remove_txs(&mut self, ids: impl IntoIterator) -> Vec { + let id_set: HashSet = ids.into_iter().collect(); + if id_set.is_empty() { + return vec![]; + } + let mut new_mempool_count = self.mempool_count; + let mut full = false; + let mut newly_added = vec![]; + let mut seen_so_far = 0; + self.queue.retain(|_, tx| { + let seen = seen_so_far; + seen_so_far += 1; + if seen < self.mempool_count { + // we're iterating through the mempool + if !id_set.contains(&tx.id) { + return true; + } + // this is a transaction in the mempool which we want to remove + new_mempool_count -= 1; + self.mempool_size_bytes -= tx.bytes; + self.input_ids.remove(&tx.input_id); + false + } else { + // we're iterating through the queued TXs which aren't yet in the mempool + if self.input_ids.contains(&tx.input_id) { + // conflicts with the mempool, remove it at once + return false; + } + // add TXs until we're full + if !full { + let new_size = self.mempool_size_bytes + tx.bytes; + if new_size > self.max_size_bytes { + full = true; + } else { + new_mempool_count += 1; + self.mempool_size_bytes = new_size; + self.input_ids.insert(tx.input_id); + newly_added.push(tx.id); + } + } + true + } + }); + self.mempool_count = new_mempool_count; + newly_added + } + + fn remove_conflicting_txs(&mut self, input_ids: &HashSet) -> Vec { + let mut new_mempool_count = self.mempool_count; + let mut full = false; + let mut newly_added = vec![]; + let mut seen_so_far = 0; + self.queue.retain(|_, tx| { + let seen = seen_so_far; + seen_so_far += 1; + if seen < self.mempool_count { + // we're iterating through the mempool + if !input_ids.contains(&tx.input_id) { + return true; + } + // this is a transaction in the mempool which we want to remove + new_mempool_count -= 1; + self.mempool_size_bytes -= tx.bytes; + self.input_ids.remove(&tx.input_id); + false + } else { + // we're iterating through the queued TXs which aren't yet in the mempool + if self.input_ids.contains(&tx.input_id) || input_ids.contains(&tx.input_id) { + // conflicts with the ledger or the new mempool, remove it at once + return false; + } + // add TXs until we're full + if !full { + let new_size = self.mempool_size_bytes + tx.bytes; + if new_size > self.max_size_bytes { + full = true; + } else { + new_mempool_count += 1; + self.mempool_size_bytes = new_size; + self.input_ids.insert(tx.input_id); + newly_added.push(tx.id); + } + } + true + } + }); + self.mempool_count = new_mempool_count; + newly_added + } + + fn new_id(&mut self) -> u64 { + let id = self.next_id; + self.next_id += 1; + id + } +} + +#[cfg(test)] +mod mempool_tests { + use std::sync::Arc; + + use crate::model::{Transaction, TransactionId}; + + use super::Mempool; + + struct TxFactory { + next_id: u64, + } + impl TxFactory { + fn new() -> Self { + Self { next_id: 0 } + } + fn tx(&mut self, bytes: u64) -> Arc { + let id = self.next_id; + self.next_id += 1; + Arc::new(Transaction { + id: TransactionId::new(id), + shard: 0, + bytes, + input_id: id, + overcollateralization_factor: 0, + }) + } + fn txs(&mut self, bytes: [u64; N]) -> [Arc; N] { + bytes.map(|b| self.tx(b)) + } + } + + #[test] + fn should_fill_as_space_is_available() { + let mut txs = TxFactory::new(); + let [tx1, tx2, tx3] = txs.txs([5, 5, 5]); + let mut mempool = Mempool::new(10); + assert!(mempool.try_insert(tx1.clone())); + assert!(mempool.try_insert(tx2.clone())); + + // new TX doesn't fit + assert!(!mempool.try_insert(tx3.clone())); + assert_eq!(mempool.ids().collect::>(), vec![tx1.id, tx2.id]); + + // until we remove a TX, and suddenly it does + let added = mempool.remove_txs([tx2.id]); + assert_eq!(added, vec![tx3.id]); + assert_eq!(mempool.ids().collect::>(), vec![tx1.id, tx3.id]); + } +} diff --git a/sim-rs/sim-core/src/sim/stracciatella.rs b/sim-rs/sim-core/src/sim/stracciatella.rs index e0c896e9f..e03812b38 100644 --- a/sim-rs/sim-core/src/sim/stracciatella.rs +++ b/sim-rs/sim-core/src/sim/stracciatella.rs @@ -280,7 +280,7 @@ impl NodeImpl for StracciatellaLeiosNode { } fn handle_new_slot(&mut self, slot: u64) -> EventResult { - if slot % self.sim_config.stage_length == 0 { + if slot.is_multiple_of(self.sim_config.stage_length) { // A new stage has begun. // Decide how many votes to generate in each slot