|
| 1 | +use std::time::Duration; |
| 2 | + |
| 3 | +use libp2p::Multiaddr; |
| 4 | +use node::{ |
| 5 | + event_source::Event, |
| 6 | + p2p::{ |
| 7 | + connection::outgoing::P2pConnectionOutgoingInitOpts, P2pConnectionEvent, P2pEvent, |
| 8 | + P2pListenEvent, P2pListenerId, PeerId, |
| 9 | + }, |
| 10 | + State, |
| 11 | +}; |
| 12 | + |
| 13 | +use crate::{cluster::ClusterNodeId, node::RustNodeTestingConfig, scenario::ScenarioStep}; |
| 14 | + |
| 15 | +use super::ClusterRunner; |
| 16 | + |
| 17 | +fn match_addr_with_port_and_peer_id( |
| 18 | + port: u16, |
| 19 | + peer_id: PeerId, |
| 20 | +) -> impl Fn(&P2pConnectionOutgoingInitOpts) -> bool { |
| 21 | + move |conn_opt| match conn_opt { |
| 22 | + P2pConnectionOutgoingInitOpts::LibP2P(libp2p_opts) => { |
| 23 | + &libp2p_opts.peer_id == &peer_id && libp2p_opts.port == port |
| 24 | + } |
| 25 | + _ => false, |
| 26 | + } |
| 27 | +} |
| 28 | + |
| 29 | +fn get_peers_iter( |
| 30 | + data: &serde_json::Value, |
| 31 | +) -> Option<impl Iterator<Item = Option<(&str, i64, &str)>>> { |
| 32 | + let iter = data |
| 33 | + .as_object()? |
| 34 | + .get("data")? |
| 35 | + .get("getPeers")? |
| 36 | + .as_array()? |
| 37 | + .iter() |
| 38 | + .map(|elt| { |
| 39 | + let elt = elt.as_object()?; |
| 40 | + let host = elt.get("host")?.as_str()?; |
| 41 | + let port = elt.get("libp2pPort")?.as_i64()?; |
| 42 | + let peer_id = elt.get("peerId")?.as_str()?; |
| 43 | + Some((host, port, peer_id)) |
| 44 | + }); |
| 45 | + Some(iter) |
| 46 | +} |
| 47 | + |
| 48 | +const PEERS_QUERY: &str = r#"query { |
| 49 | + getPeers { |
| 50 | + host |
| 51 | + libp2pPort |
| 52 | + peerId |
| 53 | + } |
| 54 | +}"#; |
| 55 | + |
| 56 | +pub fn connection_finalized_event( |
| 57 | + pred: impl Fn(ClusterNodeId, &PeerId) -> bool, |
| 58 | +) -> impl Fn(ClusterNodeId, &Event, &State) -> bool { |
| 59 | + move |node_id, event, _| { |
| 60 | + matches!( |
| 61 | + event, |
| 62 | + Event::P2p(P2pEvent::Connection(P2pConnectionEvent::Finalized(peer, res))) if pred(node_id, peer) && res.is_ok() |
| 63 | + ) |
| 64 | + } |
| 65 | +} |
| 66 | + |
| 67 | +pub fn as_listen_new_addr_event(event: &Event) -> Option<(&Multiaddr, &P2pListenerId)> { |
| 68 | + if let Event::P2p(P2pEvent::Listen(P2pListenEvent::NewListenAddr { listener_id, addr })) = event |
| 69 | + { |
| 70 | + Some((addr, listener_id)) |
| 71 | + } else { |
| 72 | + None |
| 73 | + } |
| 74 | +} |
| 75 | + |
| 76 | +pub fn as_connection_finalized_event(event: &Event) -> Option<(&PeerId, &Result<(), String>)> { |
| 77 | + if let Event::P2p(P2pEvent::Connection(P2pConnectionEvent::Finalized(peer, res))) = event { |
| 78 | + Some((peer, res)) |
| 79 | + } else { |
| 80 | + None |
| 81 | + } |
| 82 | +} |
| 83 | + |
| 84 | +fn identify_event(peer_id: PeerId) -> impl Fn(ClusterNodeId, &Event, &State) -> bool { |
| 85 | + move |_, event, _| { |
| 86 | + matches!( |
| 87 | + event, |
| 88 | + Event::P2p(P2pEvent::Libp2pIdentify(peer, _)) if peer == &peer_id |
| 89 | + ) |
| 90 | + } |
| 91 | +} |
| 92 | + |
| 93 | +pub struct Driver<'cluster> { |
| 94 | + runner: ClusterRunner<'cluster>, |
| 95 | + emulated_time: bool, |
| 96 | +} |
| 97 | + |
| 98 | +impl<'cluster> Driver<'cluster> { |
| 99 | + pub fn new(runner: ClusterRunner<'cluster>) -> Self { |
| 100 | + Driver { |
| 101 | + runner, |
| 102 | + emulated_time: false, |
| 103 | + } |
| 104 | + } |
| 105 | + |
| 106 | + pub fn with_emulated_time(runner: ClusterRunner<'cluster>) -> Self { |
| 107 | + Driver { |
| 108 | + runner, |
| 109 | + emulated_time: true, |
| 110 | + } |
| 111 | + } |
| 112 | + |
| 113 | + async fn sleep(&self, duration: Duration) { |
| 114 | + if !self.emulated_time { |
| 115 | + tokio::time::sleep(duration).await; |
| 116 | + } |
| 117 | + } |
| 118 | + |
| 119 | + pub async fn wait_for( |
| 120 | + &mut self, |
| 121 | + duration: Duration, |
| 122 | + f: impl Fn(ClusterNodeId, &Event, &State) -> bool, |
| 123 | + ) -> anyhow::Result<Option<(ClusterNodeId, Event)>> { |
| 124 | + let timeout = std::time::Instant::now() + duration; |
| 125 | + while std::time::Instant::now() < timeout { |
| 126 | + let mut steps = Vec::new(); |
| 127 | + let mut found = None; |
| 128 | + for (node_id, state, events) in self.runner.pending_events() { |
| 129 | + for (_, event) in events { |
| 130 | + if f(node_id, event, state) { |
| 131 | + eprintln!("!!! {event:?}"); |
| 132 | + found = Some((node_id, event.clone())); |
| 133 | + break; |
| 134 | + } else { |
| 135 | + eprintln!(">>> {event:?}"); |
| 136 | + let event = event.to_string(); |
| 137 | + steps.push(ScenarioStep::Event { node_id, event }); |
| 138 | + } |
| 139 | + } |
| 140 | + } |
| 141 | + for step in steps { |
| 142 | + self.runner.exec_step(step).await?; |
| 143 | + } |
| 144 | + if found.is_some() { |
| 145 | + return Ok(found); |
| 146 | + } |
| 147 | + self.idle(Duration::from_millis(100)).await?; |
| 148 | + } |
| 149 | + Ok(None) |
| 150 | + } |
| 151 | + |
| 152 | + pub async fn run_until( |
| 153 | + &mut self, |
| 154 | + duration: Duration, |
| 155 | + mut f: impl FnMut(ClusterNodeId, &Event, &State) -> bool, |
| 156 | + ) -> anyhow::Result<bool> { |
| 157 | + let timeout = std::time::Instant::now() + duration; |
| 158 | + while std::time::Instant::now() < timeout { |
| 159 | + let mut steps = Vec::new(); |
| 160 | + let mut found = false; |
| 161 | + for (node_id, state, events) in self.runner.pending_events() { |
| 162 | + for (_, event) in events { |
| 163 | + if f(node_id, event, state) { |
| 164 | + eprintln!("!!! {event:?}"); |
| 165 | + found = true; |
| 166 | + break; |
| 167 | + } else { |
| 168 | + eprintln!(">>> {event:?}"); |
| 169 | + let event = event.to_string(); |
| 170 | + steps.push(ScenarioStep::Event { node_id, event }); |
| 171 | + } |
| 172 | + } |
| 173 | + } |
| 174 | + for step in steps { |
| 175 | + self.runner.exec_step(step).await?; |
| 176 | + } |
| 177 | + if found { |
| 178 | + return Ok(true); |
| 179 | + } |
| 180 | + self.idle(Duration::from_millis(100)).await?; |
| 181 | + } |
| 182 | + Ok(false) |
| 183 | + } |
| 184 | + |
| 185 | + pub async fn run(&mut self, duration: Duration) -> anyhow::Result<()> { |
| 186 | + let finish = std::time::Instant::now() + duration; |
| 187 | + while std::time::Instant::now() < finish { |
| 188 | + self.idle(Duration::from_millis(100)).await?; |
| 189 | + } |
| 190 | + Ok(()) |
| 191 | + } |
| 192 | + |
| 193 | + pub async fn idle(&mut self, duration: Duration) -> anyhow::Result<()> { |
| 194 | + self.sleep(duration).await; |
| 195 | + self.runner |
| 196 | + .exec_step(ScenarioStep::AdvanceTime { |
| 197 | + by_nanos: 10 * 1_000_000, |
| 198 | + }) |
| 199 | + .await?; |
| 200 | + let nodes = self |
| 201 | + .runner |
| 202 | + .cluster() |
| 203 | + .nodes_iter() |
| 204 | + .map(|(node_id, _)| node_id) |
| 205 | + .collect::<Vec<_>>(); |
| 206 | + for node_id in nodes { |
| 207 | + self.runner |
| 208 | + .exec_step(ScenarioStep::CheckTimeouts { node_id }) |
| 209 | + .await?; |
| 210 | + } |
| 211 | + Ok(()) |
| 212 | + } |
| 213 | + |
| 214 | + pub async fn exec_step(&mut self, step: ScenarioStep) -> anyhow::Result<bool> { |
| 215 | + self.runner.exec_step(step).await |
| 216 | + } |
| 217 | + |
| 218 | + pub async fn exec_even_step( |
| 219 | + &mut self, |
| 220 | + (node_id, event): (ClusterNodeId, Event), |
| 221 | + ) -> anyhow::Result<Option<&State>> { |
| 222 | + let event = event.to_string(); |
| 223 | + let result = if self |
| 224 | + .runner |
| 225 | + .exec_step(ScenarioStep::Event { node_id, event }) |
| 226 | + .await? |
| 227 | + { |
| 228 | + Some( |
| 229 | + self.runner |
| 230 | + .cluster() |
| 231 | + .node(node_id) |
| 232 | + .ok_or(anyhow::format_err!("no node {}", node_id.index()))? |
| 233 | + .state(), |
| 234 | + ) |
| 235 | + } else { |
| 236 | + None |
| 237 | + }; |
| 238 | + Ok(result) |
| 239 | + } |
| 240 | + |
| 241 | + pub fn add_rust_node( |
| 242 | + &mut self, |
| 243 | + testing_config: RustNodeTestingConfig, |
| 244 | + ) -> (ClusterNodeId, PeerId) { |
| 245 | + let node_id = self.runner.add_rust_node(testing_config); |
| 246 | + let peer_id = self.runner.node(node_id).unwrap().peer_id(); |
| 247 | + (node_id, peer_id) |
| 248 | + } |
| 249 | + |
| 250 | + pub fn inner(&self) -> &ClusterRunner { |
| 251 | + &self.runner |
| 252 | + } |
| 253 | + |
| 254 | + #[allow(dead_code)] |
| 255 | + pub fn into_inner(self) -> ClusterRunner<'cluster> { |
| 256 | + self.runner |
| 257 | + } |
| 258 | +} |
| 259 | + |
| 260 | +/// Runs the cluster until each of the `nodes` is listening on the localhost interface. |
| 261 | +pub async fn wait_for_nodes_listening_on_localhost<'cluster>( |
| 262 | + driver: &mut Driver<'cluster>, |
| 263 | + duration: Duration, |
| 264 | + nodes: impl IntoIterator<Item = ClusterNodeId>, |
| 265 | +) -> anyhow::Result<bool> { |
| 266 | + let mut nodes = std::collections::BTreeSet::from_iter(nodes); // TODO: filter out nodes that already listening |
| 267 | + |
| 268 | + // predicate matching event "listening on localhost interface" |
| 269 | + let ip4_localhost = libp2p::multiaddr::Protocol::Ip4("127.0.0.1".parse().unwrap()); |
| 270 | + let pred = |node_id, event: &_, _state: &_| { |
| 271 | + if let Some((addr, _)) = as_listen_new_addr_event(event) { |
| 272 | + if Some(&ip4_localhost) == addr.iter().next().as_ref() { |
| 273 | + nodes.remove(&node_id); |
| 274 | + } |
| 275 | + nodes.is_empty() |
| 276 | + } else { |
| 277 | + false |
| 278 | + } |
| 279 | + }; |
| 280 | + |
| 281 | + // wait for all peers to listen |
| 282 | + driver.run_until(duration, pred).await |
| 283 | +} |
| 284 | + |
| 285 | +/// Creates `num` Rust nodes in the cluster |
| 286 | +pub fn add_rust_nodes<'cluster, N, NodeIds, PeerIds>( |
| 287 | + driver: &mut Driver, |
| 288 | + num: N, |
| 289 | + config: RustNodeTestingConfig, |
| 290 | +) -> (NodeIds, PeerIds) |
| 291 | +where |
| 292 | + N: Into<u16>, |
| 293 | + NodeIds: Default + Extend<ClusterNodeId>, |
| 294 | + PeerIds: Default + Extend<PeerId>, |
| 295 | +{ |
| 296 | + (0..num.into()) |
| 297 | + .into_iter() |
| 298 | + .map(|_| driver.add_rust_node(config.clone())) |
| 299 | + .unzip() |
| 300 | +} |
0 commit comments