Skip to content

Commit 00a2e2e

Browse files
committed
test(chores): connection handling tests
1 parent 00a567a commit 00a2e2e

File tree

5 files changed

+381
-15
lines changed

5 files changed

+381
-15
lines changed

.github/workflows/ci.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,7 @@ jobs:
7474
image: minaprotocol/mina-daemon:2.0.0rampup4-14047c5-focal-berkeley
7575
strategy:
7676
matrix:
77-
test: [p2p_basic_incoming, p2p_basic_outgoing]
77+
test: [p2p_basic_connections, p2p_basic_incoming, p2p_basic_outgoing]
7878
fail-fast: false
7979

8080
steps:

node/testing/src/scenarios/driver.rs

Lines changed: 13 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -130,7 +130,7 @@ impl<'cluster> Driver<'cluster> {
130130
pub async fn wait_for(
131131
&mut self,
132132
duration: Duration,
133-
f: impl Fn(ClusterNodeId, &Event, &State) -> bool,
133+
mut f: impl FnMut(ClusterNodeId, &Event, &State) -> bool,
134134
) -> anyhow::Result<Option<(ClusterNodeId, Event)>> {
135135
let timeout = std::time::Instant::now() + duration;
136136
while std::time::Instant::now() < timeout {
@@ -139,11 +139,11 @@ impl<'cluster> Driver<'cluster> {
139139
for (node_id, state, events) in self.runner.pending_events() {
140140
for (_, event) in events {
141141
if f(node_id, event, state) {
142-
eprintln!("!!! {event:?}");
142+
eprintln!("!!! {node_id}: {event:?}");
143143
found = Some((node_id, event.clone()));
144144
break;
145145
} else {
146-
eprintln!(">>> {event:?}");
146+
eprintln!(">>> {node_id}: {event:?}");
147147
let event = event.to_string();
148148
steps.push(ScenarioStep::Event { node_id, event });
149149
}
@@ -169,16 +169,18 @@ impl<'cluster> Driver<'cluster> {
169169
while std::time::Instant::now() < timeout {
170170
let mut steps = Vec::new();
171171
let mut found = false;
172-
for (node_id, state, events) in self.runner.pending_events() {
172+
'pending_events: for (node_id, state, events) in self.runner.pending_events() {
173173
for (_, event) in events {
174-
if f(node_id, event, state) {
175-
eprintln!("!!! {event:?}");
176-
found = true;
177-
break;
174+
found = f(node_id, event, state);
175+
steps.push(ScenarioStep::Event {
176+
node_id,
177+
event: event.to_string(),
178+
});
179+
if found {
180+
eprintln!("!!! {node_id}: {event:?}");
181+
break 'pending_events;
178182
} else {
179-
eprintln!(">>> {event:?}");
180-
let event = event.to_string();
181-
steps.push(ScenarioStep::Event { node_id, event });
183+
eprintln!(">>> {node_id}: {event:?}");
182184
}
183185
}
184186
}
Lines changed: 326 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,327 @@
1-
/// Connections that are initiated outside of the state machine (e.g. by Kademlia) should be present in the state machine.
1+
use std::time::Duration;
2+
3+
use node::{
4+
event_source::Event,
5+
p2p::{
6+
connection::outgoing::P2pConnectionOutgoingInitOpts, P2pEvent, P2pPeerState, P2pPeerStatus,
7+
P2pState, PeerId,
8+
},
9+
};
10+
11+
use crate::{
12+
node::RustNodeTestingConfig,
13+
scenarios::{add_rust_nodes, wait_for_nodes_listening_on_localhost, ClusterRunner, Driver},
14+
};
15+
16+
fn has_active_peer(p2p_state: &P2pState, peer_id: &PeerId) -> bool {
17+
p2p_state.ready_peers_iter().any(|(id, _)| id == peer_id)
18+
}
19+
20+
/// Two nodes should properly handle a situation when they are connecting to each other simultaneously.
221
#[derive(documented::Documented, Default, Clone, Copy)]
3-
pub struct AllConnections;
22+
pub struct SimultaneousConnections;
23+
24+
impl SimultaneousConnections {
25+
pub async fn run<'cluster>(self, runner: ClusterRunner<'cluster>) {
26+
let mut driver = Driver::new(runner);
27+
28+
let (node1, peer_id1) = driver.add_rust_node(RustNodeTestingConfig::berkeley_default());
29+
let (node2, peer_id2) = driver.add_rust_node(RustNodeTestingConfig::berkeley_default());
30+
31+
assert!(
32+
wait_for_nodes_listening_on_localhost(
33+
&mut driver,
34+
Duration::from_secs(30),
35+
[node1, node2]
36+
)
37+
.await
38+
.unwrap(),
39+
"nodes should be listening"
40+
);
41+
42+
driver
43+
.exec_step(crate::scenario::ScenarioStep::ConnectNodes {
44+
dialer: node1,
45+
listener: crate::scenario::ListenerNode::Rust(node2),
46+
})
47+
.await
48+
.expect("connect event should be dispatched");
49+
driver
50+
.exec_step(crate::scenario::ScenarioStep::ConnectNodes {
51+
dialer: node2,
52+
listener: crate::scenario::ListenerNode::Rust(node1),
53+
})
54+
.await
55+
.expect("connect event should be dispatched");
56+
57+
// Run the cluster while there are events
58+
while driver
59+
.run_until(Duration::from_secs(30), |_, _, _| true)
60+
.await
61+
.unwrap()
62+
{}
63+
64+
let p2p_state1 = &driver.inner().node(node1).unwrap().state().p2p;
65+
let p2p_state2 = &driver.inner().node(node2).unwrap().state().p2p;
66+
67+
let node1_peer = p2p_state1.peers.get(&peer_id2);
68+
let node2_peer = p2p_state2.peers.get(&peer_id1);
69+
70+
assert!(
71+
matches!(
72+
node1_peer,
73+
Some(P2pPeerState {
74+
status: P2pPeerStatus::Ready(..),
75+
..
76+
})
77+
),
78+
"node2 should be a ready peer of node1, but it is {node1_peer:?}"
79+
);
80+
assert!(
81+
matches!(
82+
node2_peer,
83+
Some(P2pPeerState {
84+
status: P2pPeerStatus::Ready(..),
85+
..
86+
})
87+
),
88+
"node1 should be a ready peer of node2, but it is {node2_peer:?}"
89+
);
90+
}
91+
}
92+
93+
/// Connections between all peers are symmetric, i.e. iff the node1 has the node2 among its active peers, then the node2 should have the node1 as its active peers.
94+
#[derive(documented::Documented, Default, Clone, Copy)]
95+
pub struct AllNodesConnectionsAreSymmetric;
96+
97+
impl AllNodesConnectionsAreSymmetric {
98+
pub async fn run<'cluster>(self, runner: ClusterRunner<'cluster>) {
99+
const MAX: u16 = 32;
100+
101+
let mut driver = Driver::new(runner);
102+
103+
let (_, (_, seed_addr)) =
104+
driver.add_rust_node_with(RustNodeTestingConfig::berkeley_default(), |state| {
105+
let config = &state.p2p.config;
106+
let port = config.libp2p_port.unwrap();
107+
let peer_id = config.identity_pub_key.peer_id();
108+
let addr = format!(
109+
"/ip4/127.0.0.1/tcp/{port}/p2p/{}",
110+
peer_id.clone().to_libp2p_string()
111+
)
112+
.parse::<P2pConnectionOutgoingInitOpts>()
113+
.unwrap();
114+
(peer_id, addr)
115+
});
116+
117+
let peers: Vec<_> = (0..MAX)
118+
.into_iter()
119+
.map(|_| {
120+
driver.add_rust_node(
121+
RustNodeTestingConfig::berkeley_default()
122+
.initial_peers(vec![seed_addr.clone()]),
123+
)
124+
})
125+
.collect();
126+
127+
// Run the cluster while there are events
128+
while driver
129+
.run_until(Duration::from_secs(30), |_, _, _| true)
130+
.await
131+
.unwrap()
132+
{}
133+
134+
// Check that for each peer, if it is in the node's peer list, then the node is in the peer's peer list
135+
for (peer1, peer_id1) in &peers {
136+
let peer1_p2p_state = &driver.inner().node(*peer1).unwrap().state().p2p;
137+
for (peer2, peer_id2) in &peers {
138+
if peer2 == peer1 {
139+
continue;
140+
}
141+
let peer2_p2p_state = &driver.inner().node(*peer2).unwrap().state().p2p;
142+
143+
if has_active_peer(peer2_p2p_state, peer_id1) {
144+
assert!(
145+
has_active_peer(peer1_p2p_state, peer_id2),
146+
"node {peer2} should be an active peer of the node {peer1}, but it is {:?}",
147+
peer1_p2p_state.peers.get(peer_id2)
148+
);
149+
} else {
150+
assert!(
151+
!has_active_peer(peer1_p2p_state, peer_id2),
152+
"node {peer2} should not be an active peer of the node {peer1}, but it is"
153+
);
154+
}
155+
}
156+
}
157+
}
158+
}
159+
160+
/// Connections with other peers are symmetric for seed node, i.e. iff a node is the seed's peer, then it has the node among its peers.
161+
#[derive(documented::Documented, Default, Clone, Copy)]
162+
pub struct SeedConnectionsAreSymmetric;
163+
164+
impl SeedConnectionsAreSymmetric {
165+
pub async fn run<'cluster>(self, runner: ClusterRunner<'cluster>) {
166+
const MAX: u16 = 32;
167+
168+
let mut driver = Driver::new(runner);
169+
170+
let (node_ut, (node_ut_peer_id, seed_addr)) =
171+
driver.add_rust_node_with(RustNodeTestingConfig::berkeley_default(), |state| {
172+
let config = &state.p2p.config;
173+
let port = config.libp2p_port.unwrap();
174+
let peer_id = config.identity_pub_key.peer_id();
175+
let addr = format!(
176+
"/ip4/127.0.0.1/tcp/{port}/p2p/{}",
177+
peer_id.clone().to_libp2p_string()
178+
)
179+
.parse::<P2pConnectionOutgoingInitOpts>()
180+
.unwrap();
181+
(peer_id, addr)
182+
});
183+
184+
let peers: Vec<_> = (0..MAX)
185+
.into_iter()
186+
.map(|_| {
187+
driver.add_rust_node(
188+
RustNodeTestingConfig::berkeley_default()
189+
.initial_peers(vec![seed_addr.clone()]),
190+
)
191+
})
192+
.collect();
193+
194+
// // wait for all peers to listen
195+
// let satisfied = wait_for_nodes_listening_on_localhost(
196+
// &mut driver,
197+
// Duration::from_secs(3 * 60),
198+
// peers.clone(),
199+
// )
200+
// .await
201+
// .unwrap();
202+
// assert!(satisfied, "all peers should be listening");
203+
204+
// Run the cluster for a while
205+
driver
206+
.run_until(Duration::from_secs(2 * 60), |_, _, _| false)
207+
.await
208+
.unwrap();
209+
210+
// Check that for each peer, if it is in the node's peer list, then the node is in the peer's peer list
211+
let node_ut_p2p_state = &driver.inner().node(node_ut).unwrap().state().p2p;
212+
for (peer, peer_id) in peers {
213+
let peer_p2p_state = &driver.inner().node(peer).unwrap().state().p2p;
214+
215+
if peer_p2p_state
216+
.ready_peers_iter()
217+
.any(|(peer_id, _)| peer_id == &node_ut_peer_id)
218+
{
219+
assert!(
220+
node_ut_p2p_state
221+
.ready_peers_iter()
222+
.any(|(pid, _)| pid == &peer_id),
223+
"node {peer} should be in the node's peer list"
224+
);
225+
} else {
226+
assert!(
227+
!node_ut_p2p_state
228+
.ready_peers_iter()
229+
.any(|(pid, _)| pid == &peer_id),
230+
"node {peer} should not be in the node's peer list"
231+
);
232+
}
233+
}
234+
}
235+
}
236+
237+
/// A Rust node's incoming connections should be limited.
238+
#[derive(documented::Documented, Default, Clone, Copy)]
239+
pub struct MaxNumberOfPeers;
240+
241+
impl MaxNumberOfPeers {
242+
pub async fn run<'cluster>(self, runner: ClusterRunner<'cluster>) {
243+
const TOTAL: u16 = 512;
244+
const MAX: u16 = 32;
245+
246+
let mut driver = Driver::new(runner);
247+
248+
let (node_ut, nut_peer_id) =
249+
driver.add_rust_node(RustNodeTestingConfig::berkeley_default().max_peers(MAX.into()));
250+
251+
let (peers, _): (Vec<_>, Vec<_>) = add_rust_nodes(
252+
&mut driver,
253+
TOTAL,
254+
RustNodeTestingConfig::berkeley_default(),
255+
);
256+
257+
// wait for all peers to listen
258+
let satisfied = wait_for_nodes_listening_on_localhost(
259+
&mut driver,
260+
Duration::from_secs(3 * 60),
261+
peers.clone(),
262+
)
263+
.await
264+
.unwrap();
265+
assert!(satisfied, "all peers should be listening");
266+
267+
eprintln!("connecting nodes....");
268+
269+
for peer in &peers {
270+
driver
271+
.exec_step(crate::scenario::ScenarioStep::ConnectNodes {
272+
dialer: *peer,
273+
listener: crate::scenario::ListenerNode::Rust(node_ut),
274+
})
275+
.await
276+
.expect("connect event should be dispatched");
277+
}
278+
279+
let mut connected = 0_i32;
280+
281+
while let Some(exceeded) = driver
282+
.wait_for(Duration::from_secs(5 * 60), |node_id, event, _| {
283+
if node_id != node_ut {
284+
return false;
285+
}
286+
let Event::P2p(P2pEvent::Connection(conn_event)) = event else {
287+
return false;
288+
};
289+
match conn_event {
290+
node::p2p::P2pConnectionEvent::Finalized(_, Ok(())) => {
291+
connected += 1;
292+
}
293+
node::p2p::P2pConnectionEvent::Closed(_) => {
294+
connected -= 1;
295+
}
296+
_ => {}
297+
}
298+
return connected > MAX.into();
299+
})
300+
.await
301+
.unwrap()
302+
{
303+
let state = driver
304+
.exec_even_step(exceeded)
305+
.await
306+
.unwrap()
307+
.expect("connect message should be dispatched");
308+
let count = state.p2p.ready_peers_iter().count();
309+
assert!(count <= MAX.into(), "max number of peers exceeded: {count}");
310+
}
311+
312+
// check that the number of ready peers does not exceed the maximal allowed number
313+
let state = driver.inner().node(node_ut).unwrap().state();
314+
let count = state.p2p.ready_peers_iter().count();
315+
assert!(count <= MAX.into(), "max number of peers exceeded: {count}");
316+
317+
// check that the number of nodes with the node as their peer does not exceed the maximal allowed number
318+
let peers_connected = peers
319+
.into_iter()
320+
.filter_map(|peer| driver.inner().node(peer))
321+
.filter_map(|peer| peer.state().p2p.peers.get(&nut_peer_id))
322+
.filter(|state| matches!(state.status, P2pPeerStatus::Ready(..)))
323+
.count();
324+
assert!(peers_connected <= MAX.into(), "peers connections to the node exceed the max number of connections: {peers_connected}");
325+
326+
}
327+
}

0 commit comments

Comments
 (0)