diff --git a/data/simulation/config.d.ts b/data/simulation/config.d.ts index d9b257d56..1135fc4d9 100644 --- a/data/simulation/config.d.ts +++ b/data/simulation/config.d.ts @@ -53,6 +53,10 @@ export interface Config { * The strategy to use when selecting TXs from the Leios mempool. */ "leios-mempool-sampling-strategy": MempoolSamplingStrategy; + /** + * If true, transactions will be removed from the Leios mempool if they conflict with in-flight IBs. + */ + "leios-mempool-aggressive-pruning": boolean; /** * Praos blockchain quality parameter. * This is η from the Leios paper. @@ -75,6 +79,10 @@ export interface Config { "tx-validation-cpu-time-ms": number; /** Only supported by Rust simulation. */ "tx-max-size-bytes": bigint; + /** + * What fraction of TXs (from 0 to 1) should introduce conflicts with transactions which were produced before? + * Only supported by Rust simulation. */ + "tx-conflict-fraction": number | null; /** * When the first transaction should appear. * Only supported by Rust simulation. */ diff --git a/data/simulation/config.default.yaml b/data/simulation/config.default.yaml index 86d4fc659..a81b14ecf 100644 --- a/data/simulation/config.default.yaml +++ b/data/simulation/config.default.yaml @@ -30,6 +30,7 @@ leios-vote-send-recv-stages: false leios-late-ib-inclusion: true leios-header-diffusion-time-ms: 1000.0 leios-mempool-sampling-strategy: ordered-by-id +leios-mempool-aggressive-pruning: false # TODO: revise default praos-chain-quality: 40 praos-fallback-enabled: true @@ -48,6 +49,7 @@ tx-size-bytes-distribution: sigma: 1.127 tx-validation-cpu-time-ms: 1.5 tx-max-size-bytes: 16384 +tx-conflict-fraction: 0 ################################################################################ # Ranking Block Configuration diff --git a/data/simulation/config.schema.json b/data/simulation/config.schema.json index 974a47357..8f3566035 100644 --- a/data/simulation/config.schema.json +++ b/data/simulation/config.schema.json @@ -355,6 +355,10 @@ "description": "When `true`, any delays and message sizes are calculated as if\neach block contained as much data as the expected average, rounded up.\nIn particular, for the sake of the above, we consider that:\n - Each RB includes a certificate.\n - Certificates contain votes from `vote-threshold` nodes.\n - Vote bundles vote for `ceil eb-generation-probability` EBs.\n - EBs reference `ceil (ib-generation-probability * leios-stage-length-slots)` IBs.\nOnly supported by Haskell simulation.", "type": "boolean" }, + "tx-conflict-fraction": { + "description": "What fraction of TXs (from 0 to 1) should introduce conflicts with transactions which were produced before?\nOnly supported by Rust simulation.", + "type": "number" + }, "tx-generation-distribution": { "$ref": "#/definitions/Distribution", "description": "Only supported by Rust simulation." diff --git a/data/simulation/example.rust.jsonl b/data/simulation/example.rust.jsonl index 5a187f8bb..3b67cfa5f 100644 --- a/data/simulation/example.rust.jsonl +++ b/data/simulation/example.rust.jsonl @@ -7,7 +7,7 @@ {"time_s":0.0,"message":{"type":"CpuTaskScheduled","task":{"node":"node-0","index":1},"task_type":"GenIB","subtasks":1}} {"time_s":0.0,"message":{"type":"Cpu","task":{"node":"node-0","index":1},"node":"node-0","cpu_time_s":0.13,"task_label":"GenIB: node-0-1-0","task_type":"GenIB","id":"node-0-1-0"}} {"time_s":0.0,"message":{"type":"VTLotteryWon","id":"0-node-1","slot":0,"pipeline":0,"producer":"node-1"}} -{"time_s":0.0,"message":{"type":"TXGenerated","id":"0","publisher":"node-11","size_bytes":156}} +{"time_s":0.0,"message":{"type":"TXGenerated","id":"0","publisher":"node-11","size_bytes":156,"input_id":0}} {"time_s":0.0,"message":{"type":"VTLotteryWon","id":"0-node-3","slot":0,"pipeline":0,"producer":"node-3"}} {"time_s":0.0,"message":{"type":"VTLotteryWon","id":"0-node-4","slot":0,"pipeline":0,"producer":"node-4"}} {"time_s":0.0,"message":{"type":"EBLotteryWon","id":"0-node-4","slot":0,"pipeline":1,"producer":"node-4"}} diff --git a/data/simulation/topology.d.ts b/data/simulation/topology.d.ts index c777d52fe..6b04d3f39 100644 --- a/data/simulation/topology.d.ts +++ b/data/simulation/topology.d.ts @@ -9,12 +9,12 @@ */ export interface Topology { nodes: - | { - [name: NodeName]: Node; - } - | { - [name: NodeName]: Node; - }; + | { + [name: NodeName]: Node; + } + | { + [name: NodeName]: Node; + }; } /** A node. */ @@ -23,6 +23,10 @@ export interface Node { "cpu-core-count"?: bigint | null; location: Location; producers: { [producer: NodeName]: LinkInfo }; + /** + * What fraction of TXs (from 0 to 1) should introduce conflicts with transactions which were produced before? + * Only supported by Rust simulation. */ + "tx-conflict-fraction"?: number | null; /** If not null, the node will behave according to the given Behaviour. * * Only supported by Haskell simulation. diff --git a/data/simulation/topology.schema.json b/data/simulation/topology.schema.json index 5614e653b..cac4092ed 100644 --- a/data/simulation/topology.schema.json +++ b/data/simulation/topology.schema.json @@ -48,6 +48,10 @@ "additionalProperties": false, "properties": {}, "type": "number" + }, + "tx-conflict-fraction": { + "description": "What fraction of TXs (from 0 to 1) should introduce conflicts with transactions which were produced before?\nOnly supported by Rust simulation.", + "type": "number" } }, "type": "object" @@ -87,6 +91,10 @@ "additionalProperties": false, "properties": {}, "type": "number" + }, + "tx-conflict-fraction": { + "description": "What fraction of TXs (from 0 to 1) should introduce conflicts with transactions which were produced before?\nOnly supported by Rust simulation.", + "type": "number" } }, "type": "object" diff --git a/data/simulation/trace.rust.d.ts b/data/simulation/trace.rust.d.ts index 0aee9dd64..3234ded96 100644 --- a/data/simulation/trace.rust.d.ts +++ b/data/simulation/trace.rust.d.ts @@ -56,6 +56,7 @@ interface GeneratedTransaction { id: string; publisher: string; size_bytes: number; + input_id: number; } interface LostTransaction { diff --git a/data/simulation/trace.rust.schema.json b/data/simulation/trace.rust.schema.json index 38b0a4074..4b93c7080 100644 --- a/data/simulation/trace.rust.schema.json +++ b/data/simulation/trace.rust.schema.json @@ -239,6 +239,9 @@ "id": { "type": "string" }, + "input_id": { + "type": "number" + }, "publisher": { "type": "string" }, @@ -250,7 +253,7 @@ "type": "string" } }, - "required": ["id", "publisher", "size_bytes", "type"], + "required": ["id", "input_id", "publisher", "size_bytes", "type"], "type": "object" }, "GeneratedVote": { diff --git a/sim-rs/sim-cli/src/bin/gen-test-data/strategy/utils.rs b/sim-rs/sim-cli/src/bin/gen-test-data/strategy/utils.rs index 3933a7e63..c8599b0a8 100644 --- a/sim-rs/sim-cli/src/bin/gen-test-data/strategy/utils.rs +++ b/sim-rs/sim-cli/src/bin/gen-test-data/strategy/utils.rs @@ -62,8 +62,8 @@ impl GraphBuilder { let loc1 = to_netsim_location(self.location_of(node)); let loc2 = to_netsim_location(self.location_of(producer)); latency_between_locations(loc1, loc2, 1.) - .unwrap() - .to_duration() + .map(|d| d.to_duration()) + .unwrap_or_default() .max(Duration::from_millis(1)) }); self.links.push(RawLinkConfig { @@ -170,6 +170,7 @@ impl GraphBuilder { stake: n.stake, location: RawNodeLocation::Coords(n.location), cpu_core_count: n.cores, + tx_conflict_fraction: None, producers: BTreeMap::new(), }; (name, node) diff --git a/sim-rs/sim-core/src/config.rs b/sim-rs/sim-core/src/config.rs index 4c7d00c13..5d8cb0010 100644 --- a/sim-rs/sim-core/src/config.rs +++ b/sim-rs/sim-core/src/config.rs @@ -68,6 +68,7 @@ pub struct RawParameters { pub leios_late_ib_inclusion: bool, pub leios_header_diffusion_time_ms: f64, pub leios_mempool_sampling_strategy: MempoolSamplingStrategy, + pub leios_mempool_aggressive_pruning: bool, pub praos_chain_quality: u64, pub praos_fallback_enabled: bool, @@ -76,6 +77,7 @@ pub struct RawParameters { pub tx_size_bytes_distribution: DistributionConfig, pub tx_validation_cpu_time_ms: f64, pub tx_max_size_bytes: u64, + pub tx_conflict_fraction: Option, pub tx_start_time: Option, pub tx_stop_time: Option, @@ -177,6 +179,8 @@ pub struct RawNode { pub location: RawNodeLocation, #[serde(skip_serializing_if = "Option::is_none")] pub cpu_core_count: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub tx_conflict_fraction: Option, pub producers: BTreeMap, } @@ -254,6 +258,7 @@ impl From for Topology { stake: node.stake.unwrap_or_default(), cpu_multiplier: 1.0, cores: node.cpu_core_count, + tx_conflict_fraction: node.tx_conflict_fraction, consumers: vec![], }, ); @@ -403,6 +408,7 @@ impl TransactionConfig { max_size: params.tx_max_size_bytes, frequency_ms: params.tx_generation_distribution.into(), size_bytes: params.tx_size_bytes_distribution.into(), + conflict_fraction: params.tx_conflict_fraction.unwrap_or_default(), start_time: params .tx_start_time .map(|t| Timestamp::zero() + Duration::from_secs_f64(t)), @@ -425,6 +431,7 @@ pub(crate) struct RealTransactionConfig { pub max_size: u64, pub frequency_ms: FloatDistribution, pub size_bytes: FloatDistribution, + pub conflict_fraction: f64, pub start_time: Option, pub stop_time: Option, } @@ -437,11 +444,16 @@ pub(crate) struct MockTransactionConfig { } impl MockTransactionConfig { - pub fn next_id(&self) -> TransactionId { + pub fn mock_tx(&self, bytes: u64) -> Transaction { let id = self .next_id .fetch_add(1, std::sync::atomic::Ordering::Relaxed); - TransactionId::new(id) + Transaction { + id: TransactionId::new(id), + shard: 0, + bytes, + input_id: id, + } } } @@ -464,6 +476,7 @@ pub struct SimConfiguration { pub(crate) header_diffusion_time: Duration, pub(crate) relay_strategy: RelayStrategy, pub(crate) mempool_strategy: MempoolSamplingStrategy, + pub(crate) mempool_aggressive_pruning: bool, pub(crate) praos_chain_quality: u64, pub(crate) block_generation_probability: f64, pub(crate) ib_generation_probability: f64, @@ -509,6 +522,7 @@ impl SimConfiguration { header_diffusion_time: duration_ms(params.leios_header_diffusion_time_ms), relay_strategy: params.relay_strategy, mempool_strategy: params.leios_mempool_sampling_strategy, + mempool_aggressive_pruning: params.leios_mempool_aggressive_pruning, praos_chain_quality: params.praos_chain_quality, block_generation_probability: params.rb_generation_probability, ib_generation_probability: params.ib_generation_probability, @@ -542,6 +556,7 @@ pub struct NodeConfiguration { pub stake: u64, pub cpu_multiplier: f64, pub cores: Option, + pub tx_conflict_fraction: Option, pub consumers: Vec, } diff --git a/sim-rs/sim-core/src/events.rs b/sim-rs/sim-core/src/events.rs index e87b528fc..37e8253a9 100644 --- a/sim-rs/sim-core/src/events.rs +++ b/sim-rs/sim-core/src/events.rs @@ -103,6 +103,7 @@ pub enum Event { publisher: Node, size_bytes: u64, shard: u64, + input_id: u64, }, TXSent { id: TransactionId, @@ -168,6 +169,7 @@ pub enum Event { tx_payload_bytes: u64, size_bytes: u64, transactions: Vec, + rb_ref: Option>, }, NoIBGenerated { node: Node, @@ -427,6 +429,7 @@ impl EventTracker { publisher: self.to_node(publisher), size_bytes: transaction.bytes, shard: transaction.shard, + input_id: transaction.input_id, }); } @@ -476,6 +479,7 @@ impl EventTracker { tx_payload_bytes, size_bytes: header_bytes + tx_payload_bytes, transactions: block.transactions.iter().map(|tx| tx.id).collect(), + rb_ref: block.rb_ref.map(|b| self.to_block(b)), }); } diff --git a/sim-rs/sim-core/src/model.rs b/sim-rs/sim-core/src/model.rs index 52d95882a..bb3f8ec03 100644 --- a/sim-rs/sim-core/src/model.rs +++ b/sim-rs/sim-core/src/model.rs @@ -91,6 +91,7 @@ pub struct Transaction { pub id: TransactionId, pub shard: u64, pub bytes: u64, + pub input_id: u64, } #[derive(Debug, Copy, Clone, PartialEq, Eq, Hash, PartialOrd, Ord)] @@ -135,6 +136,7 @@ pub struct InputBlock { pub header: InputBlockHeader, pub tx_payload_bytes: u64, pub transactions: Vec>, + pub rb_ref: Option, } impl InputBlock { pub fn bytes(&self) -> u64 { diff --git a/sim-rs/sim-core/src/sim.rs b/sim-rs/sim-core/src/sim.rs index 4919cbf80..d55913f53 100644 --- a/sim-rs/sim-core/src/sim.rs +++ b/sim-rs/sim-core/src/sim.rs @@ -15,7 +15,7 @@ use crate::{ config::SimConfiguration, events::EventTracker, model::{ - Block, EndorserBlock, EndorserBlockId, InputBlock, InputBlockHeader, InputBlockId, + Block, BlockId, EndorserBlock, EndorserBlockId, InputBlock, InputBlockHeader, InputBlockId, Transaction, TransactionId, VoteBundle, VoteBundleId, }, network::Network, @@ -132,8 +132,8 @@ enum SimulationMessage { RequestTx(TransactionId), Tx(Arc), // praos block propagation - RollForward(u64), - RequestBlock(u64), + RollForward(BlockId), + RequestBlock(BlockId), Block(Arc), // IB header propagation AnnounceIBHeader(InputBlockId), diff --git a/sim-rs/sim-core/src/sim/node.rs b/sim-rs/sim-core/src/sim/node.rs index b0a887982..ea57be5d2 100644 --- a/sim-rs/sim-core/src/sim/node.rs +++ b/sim-rs/sim-core/src/sim/node.rs @@ -109,6 +109,13 @@ enum NodeEvent { CpuSubtaskCompleted(Subtask), } +#[derive(Clone, Default)] +struct LedgerState { + spent_inputs: HashSet, + seen_blocks: HashSet, + seen_ebs: HashSet, +} + pub struct Node { id: NodeId, name: String, @@ -126,6 +133,7 @@ pub struct Node { cpu: CpuTaskQueue, consumers: Vec, txs: HashMap, + ledger_states: BTreeMap>, praos: NodePraosState, leios: NodeLeiosState, } @@ -134,8 +142,9 @@ pub struct Node { struct NodePraosState { mempool: BTreeMap>, peer_heads: BTreeMap, - blocks_seen: BTreeSet, - blocks: BTreeMap>, + blocks_seen: BTreeSet, + blocks: BTreeMap>, + block_ids_by_slot: BTreeMap, } struct SeenTransaction { @@ -146,6 +155,7 @@ struct SeenTransaction { #[derive(Default)] struct NodeLeiosState { mempool: BTreeMap, + input_ids_from_ibs: HashSet, ibs_to_generate: BTreeMap>, ibs: BTreeMap, ib_requests: BTreeMap, @@ -279,6 +289,7 @@ impl Node { cpu, consumers, txs: HashMap::new(), + ledger_states: BTreeMap::new(), praos: NodePraosState::default(), leios: NodeLeiosState::default(), } @@ -457,11 +468,11 @@ impl Node { } // Block propagation - SimulationMessage::RollForward(slot) => { - self.receive_roll_forward(from, slot)?; + SimulationMessage::RollForward(id) => { + self.receive_roll_forward(from, id)?; } - SimulationMessage::RequestBlock(slot) => { - self.receive_request_block(from, slot)?; + SimulationMessage::RequestBlock(id) => { + self.receive_request_block(from, id)?; } SimulationMessage::Block(block) => { self.receive_block(from, block); @@ -723,11 +734,13 @@ impl Node { }; for header in headers { self.tracker.track_ib_lottery_won(header.id); - let transactions = self.select_txs_for_ib(header.shard); + let rb_ref = self.latest_rb_ref(); + let transactions = self.select_txs_for_ib(header.shard, rb_ref); let ib = InputBlock { header, tx_payload_bytes: self.sim_config.sizes.ib_payload(&transactions), transactions, + rb_ref, }; self.schedule_cpu_task(CpuTaskType::IBBlockGenerated(ib)); } @@ -750,11 +763,7 @@ impl Node { if self.sim_config.praos_fallback { if let TransactionConfig::Mock(config) = &self.sim_config.transactions { // Add one transaction, the right size for the extra RB payload - let tx = Transaction { - id: config.next_id(), - shard: 0, - bytes: config.rb_size, - }; + let tx = config.mock_tx(config.rb_size); self.tracker.track_transaction_generated(&tx, self.id); transactions.push(Arc::new(tx)); } else { @@ -772,11 +781,7 @@ impl Node { } } - let parent = self - .praos - .blocks - .last_key_value() - .map(|(_, block)| block.id); + let parent = self.latest_rb_ref(); let block = Block { id: BlockId { @@ -966,11 +971,12 @@ impl Node { .get(peer) .is_none_or(|&s| s < block.id.slot) { - self.send_to(*peer, SimulationMessage::RollForward(block.id.slot))?; + self.send_to(*peer, SimulationMessage::RollForward(block.id))?; self.praos.peer_heads.insert(*peer, block.id.slot); } } - self.praos.blocks.insert(block.id.slot, block); + self.praos.block_ids_by_slot.insert(block.id.slot, block.id); + self.praos.blocks.insert(block.id, block); Ok(()) } @@ -1016,6 +1022,21 @@ impl Node { if self.trace { info!("node {} saw tx {id}", self.name); } + let rb_ref = self.latest_rb_ref(); + let ledger_state = self.resolve_ledger_state(rb_ref); + if ledger_state.spent_inputs.contains(&tx.input_id) { + // Ignoring a TX which conflicts with something already onchain + return Ok(()); + } + if self + .praos + .mempool + .values() + .any(|mempool_tx| mempool_tx.input_id == tx.input_id) + { + // Ignoring a TX which conflicts with the current mempool contents. + return Ok(()); + } self.praos.mempool.insert(tx.id, tx.clone()); for peer in &self.consumers { if *peer == from { @@ -1023,6 +1044,13 @@ impl Node { } self.send_to(*peer, SimulationMessage::AnnounceTx(id))?; } + if self.sim_config.mempool_aggressive_pruning + && self.leios.input_ids_from_ibs.contains(&tx.input_id) + { + // Ignoring a TX which conflicts with TXs we've seen in input blocks. + // This only affects the Leios mempool; these TXs should still be able to reach the chain through Praos. + return Ok(()); + } self.leios.mempool.insert( tx.id, SeenTransaction { @@ -1033,15 +1061,15 @@ impl Node { Ok(()) } - fn receive_roll_forward(&mut self, from: NodeId, slot: u64) -> Result<()> { - if self.praos.blocks_seen.insert(slot) { - self.send_to(from, SimulationMessage::RequestBlock(slot))?; + fn receive_roll_forward(&mut self, from: NodeId, id: BlockId) -> Result<()> { + if self.praos.blocks_seen.insert(id) { + self.send_to(from, SimulationMessage::RequestBlock(id))?; } Ok(()) } - fn receive_request_block(&mut self, from: NodeId, slot: u64) -> Result<()> { - if let Some(block) = self.praos.blocks.get(&slot) { + fn receive_request_block(&mut self, from: NodeId, id: BlockId) -> Result<()> { + if let Some(block) = self.praos.blocks.get(&id) { self.tracker.track_praos_block_sent(block, self.id, from); self.send_to(from, SimulationMessage::Block(block.clone()))?; } @@ -1055,14 +1083,18 @@ impl Node { } fn finish_validating_block(&mut self, from: NodeId, block: Arc) -> Result<()> { - if let Some(old_block) = self.praos.blocks.get(&block.id.slot) { + if let Some(old_block_id) = self.praos.block_ids_by_slot.get(&block.id.slot) { // SLOT BATTLE!!! lower VRF wins - if old_block.vrf <= block.vrf { - // We like our block better than this new one. - return Ok(()); + if let Some(old_block) = self.praos.blocks.get(old_block_id) { + if old_block.vrf <= block.vrf { + // We like our block better than this new one. + return Ok(()); + } + self.praos.blocks.remove(old_block_id); } } - self.praos.blocks.insert(block.id.slot, block.clone()); + self.praos.block_ids_by_slot.insert(block.id.slot, block.id); + self.praos.blocks.insert(block.id, block.clone()); let head = self.praos.peer_heads.entry(from).or_default(); if *head < block.id.slot { @@ -1181,6 +1213,15 @@ impl Node { // Do not include transactions from this IB in any IBs we produce ourselves. self.leios.mempool.remove(&transaction.id); } + if self.sim_config.mempool_aggressive_pruning { + // If we're using aggressive pruning, remove transactions from the mempool if they conflict with transactions in this IB + self.leios + .input_ids_from_ibs + .extend(ib.transactions.iter().map(|tx| tx.input_id)); + self.leios + .mempool + .retain(|_, seen| !self.leios.input_ids_from_ibs.contains(&seen.tx.input_id)); + } if self .leios .ibs @@ -1359,17 +1400,19 @@ impl Node { Ok(()) } - fn select_txs_for_ib(&mut self, shard: u64) -> Vec> { + fn select_txs_for_ib(&mut self, shard: u64, rb_ref: Option) -> Vec> { if let TransactionConfig::Mock(config) = &self.sim_config.transactions { - let tx = Transaction { - id: config.next_id(), - shard, - bytes: config.ib_size, - }; + let tx = config.mock_tx(config.ib_size); self.tracker.track_transaction_generated(&tx, self.id); vec![Arc::new(tx)] } else { - self.select_txs(|seen| seen.tx.shard == shard, self.sim_config.max_ib_size) + let ledger_state = self.resolve_ledger_state(rb_ref); + self.select_txs( + |seen| { + seen.tx.shard == shard && !ledger_state.spent_inputs.contains(&seen.tx.input_id) + }, + self.sim_config.max_ib_size, + ) } } @@ -1404,7 +1447,7 @@ impl Node { .values() .filter_map(|seen| { if condition(seen) { - Some((seen.tx.id, seen.tx.bytes)) + Some((seen.tx.id, seen.tx.bytes, seen.tx.input_id)) } else { None } @@ -1418,11 +1461,15 @@ impl Node { } let mut txs = vec![]; let mut size = 0; - for (id, bytes) in candidate_txs { + let mut spent_inputs = HashSet::new(); + for (id, bytes, input_id) in candidate_txs { let remaining_capacity = max_size - size; if remaining_capacity < bytes { continue; } + if !spent_inputs.insert(input_id) { + continue; + } let tx = self.leios.mempool.remove(&id).unwrap().tx; size += tx.bytes; txs.push(tx); @@ -1430,6 +1477,76 @@ impl Node { txs } + fn latest_rb_ref(&self) -> Option { + self.praos.blocks.last_key_value().map(|(k, _)| *k) + } + + fn resolve_ledger_state(&mut self, rb_ref: Option) -> Arc { + let Some(block_id) = rb_ref else { + return Arc::new(LedgerState::default()); + }; + if let Some(state) = self.ledger_states.get(&block_id) { + return state.clone(); + }; + + let mut state = self + .ledger_states + .last_key_value() + .map(|(_, v)| v.as_ref().clone()) + .unwrap_or_default(); + + let mut block_queue = vec![block_id]; + while let Some(block_id) = block_queue.pop() { + if !state.seen_blocks.insert(block_id) { + continue; + } + let Some(block) = self.praos.blocks.get(&block_id) else { + continue; + }; + if let Some(parent) = block.parent { + block_queue.push(parent); + } + for tx in &block.transactions { + state.spent_inputs.insert(tx.input_id); + } + + let mut eb_queue = vec![]; + if let Some(endorsement) = &block.endorsement { + eb_queue.push(endorsement.eb); + } + while let Some(eb_id) = eb_queue.pop() { + if !state.seen_ebs.insert(eb_id) { + continue; + } + let Some(EndorserBlockState::Received { eb, .. }) = self.leios.ebs.get(&eb_id) + else { + continue; + }; + for tx_id in &eb.txs { + let Some(TransactionView::Received(tx)) = self.txs.get(tx_id) else { + continue; + }; + state.spent_inputs.insert(tx.input_id); + } + for ib_id in &eb.ibs { + let Some(InputBlockState::Received(ib)) = self.leios.ibs.get(ib_id) else { + continue; + }; + for tx in &ib.transactions { + state.spent_inputs.insert(tx.input_id); + } + } + for eb_id in &eb.ebs { + eb_queue.push(*eb_id); + } + } + } + + let state = Arc::new(state); + self.ledger_states.insert(block_id, state.clone()); + state + } + fn finish_generating_ib(&mut self, mut ib: InputBlock) -> Result<()> { ib.header.timestamp = self.clock.now(); let ib = Arc::new(ib); diff --git a/sim-rs/sim-core/src/sim/tx.rs b/sim-rs/sim-core/src/sim/tx.rs index 71673a1a3..1e7fd4d64 100644 --- a/sim-rs/sim-core/src/sim/tx.rs +++ b/sim-rs/sim-core/src/sim/tx.rs @@ -11,10 +11,15 @@ use crate::{ model::{Transaction, TransactionId}, }; +struct NodeState { + sink: mpsc::UnboundedSender>, + tx_conflict_fraction: Option, +} + pub struct TransactionProducer { rng: ChaChaRng, clock: ClockBarrier, - node_tx_sinks: HashMap>>, + nodes: HashMap, ib_shards: u64, config: Option, } @@ -23,13 +28,25 @@ impl TransactionProducer { pub fn new( rng: ChaChaRng, clock: ClockBarrier, - node_tx_sinks: HashMap>>, + mut node_tx_sinks: HashMap>>, config: &SimConfiguration, ) -> Self { + let nodes = config + .nodes + .iter() + .map(|node| { + let sink = node_tx_sinks.remove(&node.id).unwrap(); + let state = NodeState { + sink, + tx_conflict_fraction: node.tx_conflict_fraction, + }; + (node.id, state) + }) + .collect(); Self { rng, clock, - node_tx_sinks, + nodes, ib_shards: config.ib_shards, config: match &config.transactions { TransactionConfig::Real(config) => Some(config.clone()), @@ -43,9 +60,10 @@ impl TransactionProducer { self.clock.wait_forever().await; return Ok(()); }; - let node_count = self.node_tx_sinks.len(); + let node_count = self.nodes.len(); let mut next_tx_id = 0; let mut next_tx_at = Timestamp::zero(); + let mut next_input_id = 0; let mut rng = &mut self.rng; if let Some(start) = config.start_time { @@ -54,18 +72,33 @@ impl TransactionProducer { }; loop { + let node_index = rng.random_range(0..node_count); + let node_id = NodeId::new(node_index); + let node = self.nodes.get(&node_id).unwrap(); + + let conflict_fraction = node + .tx_conflict_fraction + .unwrap_or(config.conflict_fraction); + let id = TransactionId::new(next_tx_id); let shard = rng.random_range(0..self.ib_shards); let bytes = (config.size_bytes.sample(&mut rng) as u64).min(config.max_size); - let tx = Transaction { id, shard, bytes }; + let input_id = if next_input_id > 0 && rng.random_bool(conflict_fraction) { + next_input_id - 1 + } else { + let id = next_input_id; + next_input_id += 1; + id + }; - let node_index = rng.random_range(0..node_count); - let node_id = NodeId::new(node_index); + let tx = Transaction { + id, + shard, + bytes, + input_id, + }; - self.node_tx_sinks - .get(&node_id) - .unwrap() - .send(Arc::new(tx))?; + node.sink.send(Arc::new(tx))?; next_tx_id += 1; let millis_until_tx = config.frequency_ms.sample(&mut rng) as u64;