Skip to content

Commit afd4b58

Browse files
committed
sim-rs: emit TXLost events when transactions are lost
1 parent 77bdcc0 commit afd4b58

File tree

6 files changed

+249
-5
lines changed

6 files changed

+249
-5
lines changed

sim-rs/sim-cli/src/events.rs

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ use anyhow::Result;
55
use async_compression::tokio::write::GzipEncoder;
66
use average::Variance;
77
use itertools::Itertools as _;
8+
use liveness::LivenessMonitor;
89
use pretty_bytes_rust::{pretty_bytes, PrettyBytesOptions};
910
use serde::Serialize;
1011
use sim_core::{
@@ -21,6 +22,7 @@ use tokio::{
2122
use tracing::{info, info_span};
2223

2324
mod aggregate;
25+
mod liveness;
2426

2527
type InputBlockId = sim_core::model::InputBlockId<Node>;
2628
type EndorserBlockId = sim_core::model::EndorserBlockId<Node>;
@@ -46,7 +48,7 @@ pub struct EventMonitor {
4648
pool_ids: Vec<NodeId>,
4749
maximum_ib_age: u64,
4850
maximum_eb_age: u64,
49-
events_source: mpsc::UnboundedReceiver<(Event, Timestamp)>,
51+
events_source: LivenessMonitor,
5052
output_path: Option<PathBuf>,
5153
aggregate: bool,
5254
}
@@ -71,7 +73,7 @@ impl EventMonitor {
7173
pool_ids,
7274
maximum_ib_age,
7375
maximum_eb_age: config.max_eb_age,
74-
events_source,
76+
events_source: LivenessMonitor::new(config, events_source),
7577
output_path,
7678
aggregate: config.aggregate_events,
7779
}
@@ -185,6 +187,7 @@ impl EventMonitor {
185187
Event::TXReceived { .. } => {
186188
tx_messages.received += 1;
187189
}
190+
Event::TXLost { .. } => {}
188191
Event::RBLotteryWon { .. } => {}
189192
Event::RBGenerated {
190193
id: BlockId { slot, producer },
Lines changed: 231 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,231 @@
1+
use std::collections::{BTreeMap, BTreeSet, VecDeque};
2+
3+
use sim_core::{
4+
clock::Timestamp,
5+
config::SimConfiguration,
6+
events::{BlockRef, Event},
7+
model::{TransactionId, TransactionLostReason},
8+
};
9+
use tokio::sync::mpsc;
10+
11+
use super::{EndorserBlockId, InputBlockId};
12+
13+
/// Emits additional events when it is no longer possible for a transaction to reach the chain.
14+
pub struct LivenessMonitor {
15+
events_source: mpsc::UnboundedReceiver<(Event, Timestamp)>,
16+
queue: VecDeque<(Event, Timestamp)>,
17+
txs: BTreeMap<TransactionId, MonitoredTX>,
18+
ibs: BTreeMap<InputBlockId, MonitoredIB>,
19+
ebs: BTreeMap<EndorserBlockId, MonitoredEB>,
20+
stage_length: u64,
21+
vote_threshold: u64,
22+
}
23+
24+
impl LivenessMonitor {
25+
pub fn new(
26+
config: &SimConfiguration,
27+
events_source: mpsc::UnboundedReceiver<(Event, Timestamp)>,
28+
) -> Self {
29+
Self {
30+
events_source,
31+
queue: VecDeque::new(),
32+
txs: BTreeMap::new(),
33+
ibs: BTreeMap::new(),
34+
ebs: BTreeMap::new(),
35+
stage_length: config.stage_length,
36+
vote_threshold: config.vote_threshold,
37+
}
38+
}
39+
40+
pub async fn recv(&mut self) -> Option<(Event, Timestamp)> {
41+
if let Some(next) = self.queue.pop_front() {
42+
return Some(next);
43+
}
44+
let (event, time) = self.events_source.recv().await?;
45+
match &event {
46+
Event::TXGenerated { id, .. } => {
47+
self.txs.insert(*id, MonitoredTX::new());
48+
}
49+
Event::IBGenerated {
50+
id, transactions, ..
51+
} => {
52+
for tx_id in transactions {
53+
if let Some(tx) = self.txs.get_mut(tx_id) {
54+
tx.ibs.insert(id.clone());
55+
tx.last_ib_pipeline = std::cmp::max(tx.last_ib_pipeline, Some(id.pipeline));
56+
}
57+
}
58+
self.ibs
59+
.insert(id.clone(), MonitoredIB::new(transactions.clone()));
60+
}
61+
Event::EBGenerated {
62+
id,
63+
transactions,
64+
input_blocks,
65+
endorser_blocks,
66+
..
67+
} => {
68+
for BlockRef { id: tx_id } in transactions {
69+
if let Some(tx) = self.txs.get_mut(tx_id) {
70+
tx.ebs.insert(id.clone());
71+
tx.last_eb_pipeline = std::cmp::max(tx.last_eb_pipeline, Some(id.pipeline));
72+
}
73+
}
74+
for BlockRef { id: ib_id } in input_blocks {
75+
if let Some(ib) = self.ibs.get(ib_id) {
76+
for tx_id in &ib.txs {
77+
if let Some(tx) = self.txs.get_mut(tx_id) {
78+
tx.ebs.insert(id.clone());
79+
tx.last_eb_pipeline =
80+
std::cmp::max(tx.last_eb_pipeline, Some(id.pipeline));
81+
}
82+
}
83+
}
84+
}
85+
self.ebs.insert(
86+
id.clone(),
87+
MonitoredEB::new(
88+
transactions.iter().map(|tx| tx.id).collect(),
89+
input_blocks.iter().map(|ib| ib.id.clone()).collect(),
90+
endorser_blocks.iter().map(|eb| eb.id.clone()).collect(),
91+
),
92+
);
93+
}
94+
Event::VTBundleGenerated { votes, .. } => {
95+
for (eb_id, count) in &votes.0 {
96+
let eb = self.ebs.get_mut(eb_id).unwrap();
97+
eb.votes += *count as u64;
98+
if eb.votes >= self.vote_threshold {
99+
for ib_id in &eb.ibs {
100+
if let Some(ib) = self.ibs.get_mut(ib_id) {
101+
for tx_id in &ib.txs {
102+
if let Some(tx) = self.txs.get_mut(tx_id) {
103+
tx.certified_ebs.insert(eb_id.clone());
104+
}
105+
}
106+
}
107+
}
108+
for tx_id in &eb.txs {
109+
if let Some(tx) = self.txs.get_mut(tx_id) {
110+
tx.certified_ebs.insert(eb_id.clone());
111+
}
112+
}
113+
}
114+
}
115+
}
116+
Event::RBGenerated {
117+
endorsement,
118+
transactions,
119+
..
120+
} => {
121+
for tx in transactions {
122+
self.txs.remove(tx);
123+
}
124+
if let Some(endorsement) = endorsement {
125+
let mut eb_queue = vec![endorsement.eb.id.clone()];
126+
while let Some(eb_id) = eb_queue.pop() {
127+
let Some(eb) = self.ebs.remove(&eb_id) else {
128+
continue;
129+
};
130+
self.txs.retain(|_, tx| !tx.ebs.contains(&eb_id));
131+
for ib_id in &eb.ibs {
132+
self.ibs.remove(ib_id);
133+
}
134+
eb_queue.extend(eb.ebs);
135+
}
136+
}
137+
}
138+
Event::GlobalSlot { slot } => {
139+
if slot % self.stage_length == 0 {
140+
let pipeline = slot / self.stage_length;
141+
self.handle_new_pipeline(pipeline, time);
142+
}
143+
}
144+
_ => {}
145+
}
146+
Some((event, time))
147+
}
148+
149+
fn handle_new_pipeline(&mut self, pipeline: u64, time: Timestamp) {
150+
self.txs.retain(|id, tx| {
151+
if tx.ebs.is_empty()
152+
&& tx
153+
.last_ib_pipeline
154+
.is_some_and(|ib_pipeline| ib_pipeline + 4 < pipeline)
155+
{
156+
// this transaction was only in IBs which never reached any EBs.
157+
self.queue.push_back((
158+
Event::TXLost {
159+
id: *id,
160+
reason: TransactionLostReason::IBExpired,
161+
},
162+
time,
163+
));
164+
return false;
165+
}
166+
167+
if tx.certified_ebs.is_empty()
168+
&& tx
169+
.last_eb_pipeline
170+
.is_some_and(|eb_pipeline| eb_pipeline + 1 < pipeline)
171+
{
172+
// this transaction was only in EBs which were never certified
173+
self.queue.push_back((
174+
Event::TXLost {
175+
id: *id,
176+
reason: TransactionLostReason::EBExpired,
177+
},
178+
time,
179+
));
180+
return false;
181+
}
182+
183+
true
184+
});
185+
}
186+
}
187+
188+
struct MonitoredTX {
189+
last_ib_pipeline: Option<u64>,
190+
ibs: BTreeSet<InputBlockId>,
191+
last_eb_pipeline: Option<u64>,
192+
ebs: BTreeSet<EndorserBlockId>,
193+
certified_ebs: BTreeSet<EndorserBlockId>,
194+
}
195+
impl MonitoredTX {
196+
fn new() -> Self {
197+
Self {
198+
last_ib_pipeline: None,
199+
ibs: BTreeSet::new(),
200+
last_eb_pipeline: None,
201+
ebs: BTreeSet::new(),
202+
certified_ebs: BTreeSet::new(),
203+
}
204+
}
205+
}
206+
207+
struct MonitoredIB {
208+
txs: Vec<TransactionId>,
209+
}
210+
impl MonitoredIB {
211+
fn new(txs: Vec<TransactionId>) -> Self {
212+
Self { txs }
213+
}
214+
}
215+
216+
struct MonitoredEB {
217+
txs: Vec<TransactionId>,
218+
ibs: Vec<InputBlockId>,
219+
ebs: Vec<EndorserBlockId>,
220+
votes: u64,
221+
}
222+
impl MonitoredEB {
223+
fn new(txs: Vec<TransactionId>, ibs: Vec<InputBlockId>, ebs: Vec<EndorserBlockId>) -> Self {
224+
Self {
225+
txs,
226+
ibs,
227+
ebs,
228+
votes: 0,
229+
}
230+
}
231+
}

sim-rs/sim-core/src/config.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -438,6 +438,7 @@ pub struct SimConfiguration {
438438
pub max_eb_age: u64,
439439
pub late_ib_inclusion: bool,
440440
pub variant: LeiosVariant,
441+
pub vote_threshold: u64,
441442
pub(crate) praos_fallback: bool,
442443
pub(crate) header_diffusion_time: Duration,
443444
pub(crate) relay_strategy: RelayStrategy,
@@ -446,7 +447,6 @@ pub struct SimConfiguration {
446447
pub(crate) ib_generation_probability: f64,
447448
pub(crate) eb_generation_probability: f64,
448449
pub(crate) vote_probability: f64,
449-
pub(crate) vote_threshold: u64,
450450
pub(crate) vote_slot_length: u64,
451451
pub(crate) max_block_size: u64,
452452
pub(crate) max_ib_size: u64,

sim-rs/sim-core/src/events.rs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ use crate::{
99
config::{NodeConfiguration, NodeId},
1010
model::{
1111
Block, BlockId, CpuTaskId, EndorserBlockId, InputBlockId, NoVoteReason, Transaction,
12-
TransactionId, VoteBundle, VoteBundleId,
12+
TransactionId, TransactionLostReason, VoteBundle, VoteBundleId,
1313
},
1414
};
1515

@@ -115,6 +115,10 @@ pub enum Event {
115115
sender: Node,
116116
recipient: Node,
117117
},
118+
TXLost {
119+
id: TransactionId,
120+
reason: TransactionLostReason,
121+
},
118122
RBLotteryWon {
119123
id: BlockId<Node>,
120124
slot: u64,

sim-rs/sim-core/src/model.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -218,6 +218,12 @@ pub enum NoVoteReason {
218218
UncertifiedEBReference,
219219
}
220220

221+
#[derive(Debug, Clone, Serialize)]
222+
pub enum TransactionLostReason {
223+
IBExpired,
224+
EBExpired,
225+
}
226+
221227
#[derive(Clone, Debug, Serialize)]
222228
pub struct Endorsement<Node: Display = NodeId> {
223229
pub eb: EndorserBlockId<Node>,

sim-rs/sim-core/src/sim/node.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1336,7 +1336,7 @@ impl Node {
13361336
.entry(votes.id.producer)
13371337
.or_default();
13381338
*eb_votes += count;
1339-
if *eb_votes as u64 > self.sim_config.vote_threshold {
1339+
if *eb_votes as u64 >= self.sim_config.vote_threshold {
13401340
self.leios
13411341
.earliest_eb_cert_times_by_pipeline
13421342
.entry(eb.pipeline)

0 commit comments

Comments
 (0)