Skip to content

Commit 2018d06

Browse files
committed
sim-rs: add a simple tx propagation test
1 parent 7fc0e7b commit 2018d06

File tree

8 files changed

+272
-8
lines changed

8 files changed

+272
-8
lines changed

sim-rs/Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

sim-rs/sim-core/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,4 +21,5 @@ tokio-util = "0.7"
2121
tracing = "0.1"
2222

2323
[dev-dependencies]
24+
serde_yaml = "0.9"
2425
tokio = { version = "1", features = ["macros", "rt"] }

sim-rs/sim-core/src/clock/mock.rs

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,6 @@ impl MockClockCoordinator {
7878
);
7979

8080
self.time.store(until, std::sync::atomic::Ordering::Release);
81-
let mut something_happened = false;
8281
self.waiters = std::mem::take(&mut self.waiters)
8382
.into_iter()
8483
.filter_map(|(actor, waiter)| {
@@ -88,14 +87,12 @@ impl MockClockCoordinator {
8887
}
8988
if *t == until {
9089
let _ = waiter.done.send(());
91-
something_happened = true;
9290
return None;
9391
}
9492
}
9593
Some((actor, waiter))
9694
})
9795
.collect();
98-
assert!(something_happened, "no actors were waiting for {until:?}");
9996
}
10097
}
10198

sim-rs/sim-core/src/model.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -84,7 +84,7 @@ impl Block {
8484
}
8585
}
8686

87-
#[derive(Clone, Debug)]
87+
#[derive(Clone, Debug, PartialEq, Eq)]
8888
pub struct LinearRankingBlockHeader {
8989
pub id: BlockId,
9090
pub vrf: u64,
@@ -93,7 +93,7 @@ pub struct LinearRankingBlockHeader {
9393
pub eb_announcement: Option<EndorserBlockId>,
9494
}
9595

96-
#[derive(Clone, Debug)]
96+
#[derive(Clone, Debug, PartialEq, Eq)]
9797
pub struct LinearRankingBlock {
9898
pub header: LinearRankingBlockHeader,
9999
pub transactions: Vec<Arc<Transaction>>,
@@ -227,7 +227,7 @@ impl StracciatellaEndorserBlock {
227227
}
228228
}
229229

230-
#[derive(Debug)]
230+
#[derive(Debug, Clone, PartialEq, Eq)]
231231
pub struct LinearEndorserBlock {
232232
pub slot: u64,
233233
pub producer: NodeId,
@@ -293,7 +293,7 @@ pub enum TransactionLostReason {
293293
EBExpired,
294294
}
295295

296-
#[derive(Clone, Debug, Serialize)]
296+
#[derive(Clone, Debug, Serialize, PartialEq, Eq)]
297297
pub struct Endorsement<Node: Display = NodeId> {
298298
pub eb: EndorserBlockId<Node>,
299299
pub size_bytes: u64,

sim-rs/sim-core/src/sim.rs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,8 @@ mod linear_leios;
2828
mod lottery;
2929
mod slot;
3030
mod stracciatella;
31+
#[cfg(test)]
32+
mod tests;
3133
mod tx;
3234

3335
enum NetworkWrapper {
@@ -304,6 +306,13 @@ impl<N: NodeImpl> Default for EventResult<N> {
304306
}
305307

306308
impl<N: NodeImpl> EventResult<N> {
309+
#[cfg(test)]
310+
pub fn merge(&mut self, mut other: EventResult<N>) {
311+
self.messages.append(&mut other.messages);
312+
self.tasks.append(&mut other.tasks);
313+
self.timed_events.append(&mut other.timed_events);
314+
}
315+
307316
pub fn send_to(&mut self, to: NodeId, msg: N::Message) {
308317
self.messages.push((to, msg));
309318
}

sim-rs/sim-core/src/sim/linear_leios.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ use crate::{
3232
},
3333
};
3434

35-
#[derive(Clone, Debug)]
35+
#[derive(Clone, Debug, PartialEq, Eq)]
3636
pub enum Message {
3737
// TX propagation
3838
AnnounceTx(TransactionId),
@@ -114,6 +114,7 @@ impl SimMessage for Message {
114114
}
115115
}
116116

117+
#[derive(Debug, Clone, PartialEq, Eq)]
117118
pub enum CpuTask {
118119
/// A transaction has been received and validated, and is ready to propagate
119120
TransactionValidated(NodeId, Arc<Transaction>),
Lines changed: 254 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,254 @@
1+
use std::{collections::HashMap, sync::Arc};
2+
3+
use rand::{RngCore, SeedableRng};
4+
use rand_chacha::ChaChaRng;
5+
use tokio::sync::mpsc;
6+
7+
use crate::{
8+
clock::{Clock, MockClockCoordinator, Timestamp},
9+
config::{NodeId, RawLinkInfo, RawNode, RawTopology, SimConfiguration, TransactionConfig},
10+
events::{Event, EventTracker},
11+
model::{LinearEndorserBlock, LinearRankingBlock, Transaction},
12+
sim::{
13+
EventResult, NodeImpl,
14+
linear_leios::{CpuTask, LinearLeiosNode, Message},
15+
lottery::{LotteryKind, MockLotteryResults},
16+
},
17+
};
18+
19+
fn new_sim_config(topology: RawTopology) -> Arc<SimConfiguration> {
20+
let params =
21+
serde_yaml::from_slice(include_bytes!("../../../../parameters/config.default.yaml"))
22+
.unwrap();
23+
let topology = topology.into();
24+
Arc::new(SimConfiguration::build(params, topology).unwrap())
25+
}
26+
27+
fn new_sim(
28+
sim_config: Arc<SimConfiguration>,
29+
event_tx: mpsc::UnboundedSender<(Event, Timestamp)>,
30+
clock: Clock,
31+
) -> (
32+
HashMap<NodeId, LinearLeiosNode>,
33+
HashMap<NodeId, Arc<MockLotteryResults>>,
34+
) {
35+
let tracker = EventTracker::new(event_tx, clock.clone(), &sim_config.nodes);
36+
let mut rng = ChaChaRng::seed_from_u64(sim_config.seed);
37+
let mut lottery = HashMap::new();
38+
let nodes = sim_config
39+
.nodes
40+
.iter()
41+
.map(|config| {
42+
let rng = ChaChaRng::seed_from_u64(rng.next_u64());
43+
let mut node = LinearLeiosNode::new(
44+
config,
45+
sim_config.clone(),
46+
tracker.clone(),
47+
rng,
48+
clock.clone(),
49+
);
50+
let lottery_results = Arc::new(MockLotteryResults::default());
51+
node.mock_lottery(lottery_results.clone());
52+
lottery.insert(config.id, lottery_results);
53+
(config.id, node)
54+
})
55+
.collect();
56+
(nodes, lottery)
57+
}
58+
59+
fn new_topology(nodes: Vec<(&'static str, RawNode)>) -> RawTopology {
60+
RawTopology {
61+
nodes: nodes
62+
.into_iter()
63+
.map(|(name, node)| (name.to_string(), node))
64+
.collect(),
65+
}
66+
}
67+
fn new_node(stake: Option<u64>, producers: Vec<&'static str>) -> RawNode {
68+
RawNode {
69+
stake,
70+
location: crate::config::RawNodeLocation::Cluster {
71+
cluster: "all".into(),
72+
},
73+
cpu_core_count: Some(4),
74+
tx_conflict_fraction: None,
75+
tx_generation_weight: None,
76+
producers: producers
77+
.iter()
78+
.map(|n| {
79+
(
80+
n.to_string(),
81+
RawLinkInfo {
82+
latency_ms: 0.0,
83+
bandwidth_bytes_per_second: None,
84+
},
85+
)
86+
})
87+
.collect(),
88+
adversarial: None,
89+
behaviours: vec![],
90+
}
91+
}
92+
93+
struct TestDriver {
94+
sim_config: Arc<SimConfiguration>,
95+
rng: ChaChaRng,
96+
slot: u64,
97+
time: MockClockCoordinator,
98+
nodes: HashMap<NodeId, LinearLeiosNode>,
99+
lottery: HashMap<NodeId, Arc<MockLotteryResults>>,
100+
queued: HashMap<NodeId, EventResult<LinearLeiosNode>>,
101+
}
102+
103+
impl TestDriver {
104+
fn new(topology: RawTopology) -> Self {
105+
let sim_config = new_sim_config(topology);
106+
let rng = ChaChaRng::seed_from_u64(sim_config.seed);
107+
let slot = 0;
108+
let time = MockClockCoordinator::new();
109+
let (event_tx, _event_rx) = mpsc::unbounded_channel();
110+
let (nodes, lottery) = new_sim(sim_config.clone(), event_tx, time.clock());
111+
Self {
112+
sim_config,
113+
rng,
114+
slot,
115+
time,
116+
nodes,
117+
lottery,
118+
queued: HashMap::new(),
119+
}
120+
}
121+
122+
pub fn id_for(&self, name: &str) -> NodeId {
123+
self.sim_config
124+
.nodes
125+
.iter()
126+
.find_map(|n| if n.name == name { Some(n.id) } else { None })
127+
.unwrap()
128+
}
129+
130+
pub fn produce_tx(&mut self, node_id: NodeId, conflict: bool) -> Arc<Transaction> {
131+
let TransactionConfig::Real(tx_config) = &self.sim_config.transactions else {
132+
panic!("unexpected TX config")
133+
};
134+
let tx = Arc::new(tx_config.new_tx(&mut self.rng, Some(if conflict { 1.0 } else { 0.0 })));
135+
let node = self.nodes.get_mut(&node_id).unwrap();
136+
let events = node.handle_new_tx(tx.clone());
137+
self.queued.entry(node_id).or_default().merge(events);
138+
tx
139+
}
140+
141+
pub fn win_next_rb_lottery(&mut self, node_id: NodeId, result: u64) {
142+
self.lottery
143+
.get(&node_id)
144+
.unwrap()
145+
.configure_win(LotteryKind::GenerateRB, result);
146+
}
147+
148+
pub fn next_slot(&mut self) {
149+
self.slot += 1;
150+
self.time.advance_time(Timestamp::from_secs(self.slot));
151+
for (node_id, node) in self.nodes.iter_mut() {
152+
let events = node.handle_new_slot(self.slot);
153+
self.queued.entry(*node_id).or_default().merge(events);
154+
}
155+
}
156+
157+
pub fn expect_message(
158+
&mut self,
159+
from: NodeId,
160+
to: NodeId,
161+
message: <LinearLeiosNode as NodeImpl>::Message,
162+
) {
163+
let queued = self.queued.entry(from).or_default();
164+
let mut found = false;
165+
queued.messages.retain(|(t, msg)| {
166+
if t == &to && msg == &message {
167+
found = true;
168+
false
169+
} else {
170+
true
171+
}
172+
});
173+
assert!(
174+
found,
175+
"message {message:?} was not sent from {from} to {to}"
176+
);
177+
let events = self
178+
.nodes
179+
.get_mut(&to)
180+
.unwrap()
181+
.handle_message(from, message);
182+
self.queued.entry(to).or_default().merge(events);
183+
}
184+
185+
pub fn expect_cpu_task(&mut self, node: NodeId, task: <LinearLeiosNode as NodeImpl>::Task) {
186+
self.expect_cpu_task_matching(node, |t| if *t == task { Some(t.clone()) } else { None });
187+
}
188+
189+
pub fn expect_cpu_task_matching<T, M>(&mut self, node: NodeId, matcher: M) -> T
190+
where
191+
M: Fn(&<LinearLeiosNode as NodeImpl>::Task) -> Option<T>,
192+
{
193+
let queued = self.queued.entry(node).or_default();
194+
let mut result = None;
195+
let mut new_queued = EventResult::default();
196+
queued.tasks.retain(|t| {
197+
if result.is_some() {
198+
return true;
199+
}
200+
result = matcher(t);
201+
if result.is_some() {
202+
new_queued = self
203+
.nodes
204+
.get_mut(&node)
205+
.unwrap()
206+
.handle_cpu_task(t.clone());
207+
}
208+
result.is_none()
209+
});
210+
queued.merge(new_queued);
211+
result.expect("no CPU tasks matching filter")
212+
}
213+
}
214+
215+
fn is_new_rb_task(task: &CpuTask) -> Option<(LinearRankingBlock, Option<LinearEndorserBlock>)> {
216+
match task {
217+
CpuTask::RBBlockGenerated(rb, eb) => {
218+
Some((rb.clone(), eb.as_ref().map(|(eb, _)| eb.clone())))
219+
}
220+
_ => None,
221+
}
222+
}
223+
224+
#[test]
225+
fn should_propagate_transactions() {
226+
let topology = new_topology(vec![
227+
("node-1", new_node(Some(1000), vec!["node-2"])),
228+
("node-2", new_node(Some(1000), vec!["node-1"])),
229+
]);
230+
let mut sim = TestDriver::new(topology);
231+
let node1 = sim.id_for("node-1");
232+
let node2 = sim.id_for("node-2");
233+
234+
// Node 1 produces a transaction, node 2 should request it
235+
let tx1 = sim.produce_tx(node1, false);
236+
sim.expect_message(node1, node2, Message::AnnounceTx(tx1.id));
237+
sim.expect_message(node2, node1, Message::RequestTx(tx1.id));
238+
sim.expect_message(node1, node2, Message::Tx(tx1.clone()));
239+
sim.expect_cpu_task(node2, CpuTask::TransactionValidated(node1, tx1.clone()));
240+
241+
// Node 2 produces a transaction, node 1 should request it
242+
let tx2 = sim.produce_tx(node2, false);
243+
sim.expect_message(node2, node1, Message::AnnounceTx(tx2.id));
244+
sim.expect_message(node1, node2, Message::RequestTx(tx2.id));
245+
sim.expect_message(node2, node1, Message::Tx(tx2.clone()));
246+
sim.expect_cpu_task(node1, CpuTask::TransactionValidated(node2, tx2.clone()));
247+
248+
// When node 1 produces an RB, it should include both TXs
249+
sim.win_next_rb_lottery(node1, 0);
250+
sim.next_slot();
251+
let (new_rb, new_eb) = sim.expect_cpu_task_matching(node1, is_new_rb_task);
252+
assert_eq!(new_rb.transactions, vec![tx1, tx2]);
253+
assert_eq!(new_eb, None);
254+
}
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
mod linear_leios;

0 commit comments

Comments
 (0)