|
1 | | -use std::{collections::BTreeSet, time::Duration}; |
| 1 | +use std::{ |
| 2 | + collections::{BTreeSet, HashSet}, |
| 3 | + time::Duration, |
| 4 | +}; |
2 | 5 |
|
3 | | -use multiaddr::Multiaddr; |
4 | | -use p2p::PeerId; |
| 6 | +use multiaddr::{multiaddr, Multiaddr}; |
| 7 | +use p2p::{ |
| 8 | + identity::SecretKey, |
| 9 | + network::identify::{ |
| 10 | + stream::P2pNetworkIdentifyStreamState, P2pNetworkIdentify, P2pNetworkIdentifyAction, |
| 11 | + P2pNetworkIdentifyStreamAction, |
| 12 | + }, |
| 13 | + p2p_effects, p2p_timeout_effects, |
| 14 | + token::{self, DiscoveryAlgorithm}, |
| 15 | + Data, P2pAction, P2pNetworkAction, P2pNetworkYamuxAction, PeerId, |
| 16 | +}; |
5 | 17 | use p2p_testing::{ |
6 | | - cluster::{Cluster, ClusterBuilder, ClusterEvent}, |
7 | | - event::RustNodeEvent, |
| 18 | + cluster::{Cluster, ClusterBuilder, ClusterEvent, Listener}, |
| 19 | + event::{event_mapper_effect, RustNodeEvent}, |
8 | 20 | futures::TryStreamExt, |
9 | 21 | predicates::{async_fn, listener_is_ready, peer_is_connected}, |
| 22 | + redux::{Action, State}, |
10 | 23 | rust_node::{RustNodeConfig, RustNodeId}, |
| 24 | + service::ClusterService, |
11 | 25 | stream::ClusterStreamExt, |
| 26 | + test_node::TestNode, |
12 | 27 | }; |
| 28 | +use redux::{ActionWithMeta, Store}; |
13 | 29 |
|
14 | 30 | #[tokio::test] |
15 | 31 | async fn rust_node_to_rust_node() -> anyhow::Result<()> { |
@@ -85,6 +101,171 @@ async fn rust_node_to_rust_node() -> anyhow::Result<()> { |
85 | 101 | Ok(()) |
86 | 102 | } |
87 | 103 |
|
| 104 | +#[tokio::test] |
| 105 | +/// Test that even if bad node spams many different listen_addrs we don't end up with duplicates |
| 106 | +async fn test_bad_node() -> anyhow::Result<()> { |
| 107 | + let mut cluster = ClusterBuilder::new() |
| 108 | + .ports_with_len(100) |
| 109 | + .idle_duration(Duration::from_millis(100)) |
| 110 | + .start() |
| 111 | + .await?; |
| 112 | + |
| 113 | + let bad_node = cluster.add_rust_node( |
| 114 | + RustNodeConfig::default() |
| 115 | + .with_discovery(true) |
| 116 | + .with_override(bad_node_effects), |
| 117 | + )?; |
| 118 | + let bad_node_peer_id = cluster.rust_node(bad_node).peer_id(); |
| 119 | + let bad_node_port = cluster.rust_node(bad_node).libp2p_port(); |
| 120 | + |
| 121 | + let node = cluster.add_rust_node( |
| 122 | + RustNodeConfig::default() |
| 123 | + .with_discovery(true) |
| 124 | + .with_initial_peers([Listener::Rust(bad_node)]), |
| 125 | + )?; |
| 126 | + |
| 127 | + let mut not_identified = BTreeSet::from_iter([(node, bad_node_peer_id)]); |
| 128 | + wait_for_identify(&mut cluster, &mut not_identified, Duration::from_secs(10)).await?; |
| 129 | + |
| 130 | + let routing_table = &cluster |
| 131 | + .rust_node(node) |
| 132 | + .state() |
| 133 | + .network |
| 134 | + .scheduler |
| 135 | + .discovery_state() |
| 136 | + .expect("State must be initialized") |
| 137 | + .routing_table; |
| 138 | + |
| 139 | + let bad_peer_entry = routing_table |
| 140 | + .look_up(&bad_node_peer_id.into()) |
| 141 | + .expect("Node not found"); |
| 142 | + |
| 143 | + let bad_peer_addresses = bad_peer_entry |
| 144 | + .addrs |
| 145 | + .iter() |
| 146 | + .map(Clone::clone) |
| 147 | + .collect::<HashSet<_>>(); |
| 148 | + |
| 149 | + let expected_addrs = [ |
| 150 | + multiaddr!(Ip4([127, 0, 0, 1]), Tcp(bad_node_port)), |
| 151 | + multiaddr!(Ip4([127, 0, 0, 1]), Tcp(10500u16)), |
| 152 | + multiaddr!(Ip6([0; 16]), Tcp(10500u16)), |
| 153 | + multiaddr!(Ip6([1; 16]), Tcp(10500u16)), |
| 154 | + ] |
| 155 | + .into_iter() |
| 156 | + .collect::<HashSet<_>>(); |
| 157 | + |
| 158 | + assert_eq!(bad_peer_addresses, expected_addrs); |
| 159 | + |
| 160 | + Ok(()) |
| 161 | +} |
| 162 | + |
| 163 | +fn bad_node_effects( |
| 164 | + store: &mut Store<State, ClusterService, Action>, |
| 165 | + action: ActionWithMeta<Action>, |
| 166 | +) { |
| 167 | + { |
| 168 | + let (action, meta) = action.split(); |
| 169 | + match action { |
| 170 | + Action::P2p(a) => { |
| 171 | + match a.clone() { |
| 172 | + P2pAction::Network(P2pNetworkAction::Identify( |
| 173 | + P2pNetworkIdentifyAction::Stream(P2pNetworkIdentifyStreamAction::New { |
| 174 | + addr, |
| 175 | + peer_id, |
| 176 | + stream_id, |
| 177 | + .. |
| 178 | + }), |
| 179 | + )) => { |
| 180 | + let state = store |
| 181 | + .state() |
| 182 | + .state() |
| 183 | + .network |
| 184 | + .scheduler |
| 185 | + .identify_state |
| 186 | + .find_identify_stream_state(&peer_id, &stream_id) |
| 187 | + .expect("Unable to find identify stream"); |
| 188 | + |
| 189 | + if let P2pNetworkIdentifyStreamState::SendIdentify = state { |
| 190 | + let listen_addrs = vec![ |
| 191 | + multiaddr!(Ip4([127, 0, 0, 1]), Tcp(10500u16)), |
| 192 | + multiaddr!(Ip4([127, 0, 0, 1]), Tcp(10500u16)), |
| 193 | + multiaddr!(Ip6([0; 16]), Tcp(10500u16)), |
| 194 | + multiaddr!(Ip6([0; 16]), Tcp(10500u16)), |
| 195 | + multiaddr!(Ip6([1; 16]), Tcp(10500u16)), |
| 196 | + multiaddr!(Ip6([1; 16]), Tcp(10500u16)), |
| 197 | + multiaddr!(Dns("domain.com"), Tcp(10530u16)), |
| 198 | + multiaddr!(Dns("domain.com"), Tcp(10530u16)), |
| 199 | + multiaddr!(Dns("domain.com"), Tcp(10530u16)), |
| 200 | + multiaddr!(Dns("domain.com"), Tcp(10530u16)), |
| 201 | + multiaddr!(Unix("domain.com")), |
| 202 | + multiaddr!(Https), |
| 203 | + ]; |
| 204 | + |
| 205 | + let public_key = Some(SecretKey::rand().public_key()); |
| 206 | + |
| 207 | + let protocols = vec![ |
| 208 | + token::StreamKind::Identify( |
| 209 | + token::IdentifyAlgorithm::Identify1_0_0, |
| 210 | + ), |
| 211 | + token::StreamKind::Broadcast( |
| 212 | + p2p::token::BroadcastAlgorithm::Meshsub1_1_0, |
| 213 | + ), |
| 214 | + p2p::token::StreamKind::Rpc(token::RpcAlgorithm::Rpc0_0_1), |
| 215 | + p2p::token::StreamKind::Discovery( |
| 216 | + DiscoveryAlgorithm::Kademlia1_0_0, |
| 217 | + ), |
| 218 | + ]; |
| 219 | + |
| 220 | + let identify_msg = P2pNetworkIdentify { |
| 221 | + protocol_version: Some("ipfs/0.1.0".to_string()), |
| 222 | + agent_version: Some("openmina".to_owned()), |
| 223 | + public_key, |
| 224 | + listen_addrs, |
| 225 | + observed_addr: None, |
| 226 | + protocols, |
| 227 | + }; |
| 228 | + |
| 229 | + let mut out = Vec::new(); |
| 230 | + let identify_msg_proto = identify_msg.to_proto_message(); |
| 231 | + |
| 232 | + prost::Message::encode_length_delimited(&identify_msg_proto, &mut out) |
| 233 | + .expect("Error converting message"); |
| 234 | + |
| 235 | + store.dispatch(Action::P2p( |
| 236 | + P2pNetworkYamuxAction::OutgoingData { |
| 237 | + addr, |
| 238 | + stream_id, |
| 239 | + data: Data(out.into_boxed_slice()), |
| 240 | + flags: Default::default(), |
| 241 | + } |
| 242 | + .into(), |
| 243 | + )); |
| 244 | + |
| 245 | + store.dispatch(Action::P2p( |
| 246 | + P2pNetworkIdentifyStreamAction::Close { |
| 247 | + addr, |
| 248 | + peer_id, |
| 249 | + stream_id, |
| 250 | + } |
| 251 | + .into(), |
| 252 | + )); |
| 253 | + } |
| 254 | + } |
| 255 | + |
| 256 | + a => { |
| 257 | + p2p_effects(store, meta.with_action(a.clone())); |
| 258 | + } |
| 259 | + } |
| 260 | + event_mapper_effect(store, a); |
| 261 | + } |
| 262 | + Action::Idle(_) => { |
| 263 | + p2p_timeout_effects(store, &meta); |
| 264 | + } |
| 265 | + }; |
| 266 | + } |
| 267 | +} |
| 268 | + |
88 | 269 | async fn wait_for_identify( |
89 | 270 | cluster: &mut Cluster, |
90 | 271 | nodes_peers: &mut BTreeSet<(RustNodeId, PeerId)>, |
|
0 commit comments