diff --git a/Cargo.lock b/Cargo.lock index f9d328e9a35..d883ee37b86 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1626,6 +1626,18 @@ dependencies = [ "wasm-bindgen", ] +[[package]] +name = "gossipsub-tests" +version = "0.1.0" +dependencies = [ + "if-addrs 0.14.0", + "libp2p", + "redis 0.32.3", + "tokio", + "tracing", + "tracing-subscriber", +] + [[package]] name = "group" version = "0.13.0" @@ -1817,7 +1829,7 @@ dependencies = [ "env_logger 0.11.8", "futures", "libp2p", - "redis", + "redis 0.24.0", "serde", "serde_json", "tokio", @@ -2145,6 +2157,16 @@ dependencies = [ "windows-sys 0.48.0", ] +[[package]] +name = "if-addrs" +version = "0.14.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bf39cc0423ee66021dc5eccface85580e4a001e0c5288bae8bea7ecb69225e90" +dependencies = [ + "libc", + "windows-sys 0.59.0", +] + [[package]] name = "if-watch" version = "3.2.1" @@ -2155,7 +2177,7 @@ dependencies = [ "core-foundation", "fnv", "futures", - "if-addrs", + "if-addrs 0.10.2", "ipnet", "log", "netlink-packet-core", @@ -2262,7 +2284,7 @@ dependencies = [ "libp2p-webrtc-websys", "mime_guess", "rand 0.8.5", - "redis", + "redis 0.24.0", "reqwest", "rust-embed", "serde", @@ -2278,6 +2300,17 @@ dependencies = [ "web-time 1.1.0", ] +[[package]] +name = "io-uring" +version = "0.7.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b86e202f00093dcba4275d4636b93ef9dd75d025ae560d2521b45ea28ab49013" +dependencies = [ + "bitflags 2.9.0", + "cfg-if", + "libc", +] + [[package]] name = "ipconfig" version = "0.3.2" @@ -4614,6 +4647,28 @@ dependencies = [ "url", ] +[[package]] +name = "redis" +version = "0.32.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7f0f6a8c53351d89a3869a703459995a0bcadcfa846002707fbc7e5cca235c4a" +dependencies = [ + "bytes", + "cfg-if", + "combine", + "futures-util", + "itoa", + "num-bigint", + "percent-encoding", + "pin-project-lite", + "ryu", + "sha1_smol", + "socket2", + "tokio", + "tokio-util", + "url", +] + [[package]] name = "redox_syscall" version = "0.5.11" @@ -5209,6 +5264,12 @@ dependencies = [ "digest", ] +[[package]] +name = "sha1_smol" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bbfa15b3dddfee50a0fff136974b3e1bde555604ba463834a7eb7deb6417705d" + [[package]] name = "sha2" version = "0.10.8" @@ -5723,17 +5784,19 @@ checksum = "1f3ccbac311fea05f86f61904b462b55fb3df8837a366dfc601a0161d0532f20" [[package]] name = "tokio" -version = "1.44.2" +version = "1.46.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e6b88822cbe49de4185e3a4cbf8321dd487cf5fe0c5c65695fef6346371e9c48" +checksum = "0cc3a2344dafbe23a245241fe8b09735b521110d30fcefbbd5feb1797ca35d17" dependencies = [ "backtrace", "bytes", + "io-uring", "libc", "mio", "parking_lot", "pin-project-lite", "signal-hook-registry", + "slab", "socket2", "tokio-macros", "windows-sys 0.52.0", diff --git a/Cargo.toml b/Cargo.toml index 22bc98cd32c..56bd0987cec 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -65,7 +65,7 @@ members = [ "transports/websocket-websys", "transports/websocket", "transports/webtransport-websys", - "wasm-tests/webtransport-tests", + "wasm-tests/webtransport-tests", "gossipsub-tests", ] resolver = "2" diff --git a/Dockerfile.gossipsub-tests b/Dockerfile.gossipsub-tests new file mode 100644 index 00000000000..dd84f487884 --- /dev/null +++ b/Dockerfile.gossipsub-tests @@ -0,0 +1,13 @@ +FROM rust:1.88-bullseye AS builder +WORKDIR /usr/src/libp2p + +RUN apt-get update + +COPY . . + +RUN cd ./gossipsub-tests && cargo build --release + +FROM debian:bullseye-slim +COPY --from=builder /usr/src/libp2p/target/release/gossipsub-tests /usr/local/bin/gossipsub-tests + +ENTRYPOINT ["gossipsub-tests"] diff --git a/gossipsub-tests/Cargo.toml b/gossipsub-tests/Cargo.toml new file mode 100644 index 00000000000..0f3a74dd7ab --- /dev/null +++ b/gossipsub-tests/Cargo.toml @@ -0,0 +1,12 @@ +[package] +name = "gossipsub-tests" +version = "0.1.0" +edition = "2021" + +[dependencies] +if-addrs = "0.14" +libp2p = { path = "../libp2p", default-features = false, features = ["dns", "tcp", "tokio", "gossipsub", "ed25519", "noise", "yamux"] } +redis = { version = "0.32", features = ["tokio-comp"] } +tokio = { version = "1.46", features = [ "macros"] } +tracing = "0.1" +tracing-subscriber = { version = "0.3", features = ["env-filter"] } \ No newline at end of file diff --git a/gossipsub-tests/README.md b/gossipsub-tests/README.md new file mode 100644 index 00000000000..91c768ae95f --- /dev/null +++ b/gossipsub-tests/README.md @@ -0,0 +1,24 @@ +# gossipsub-tests + +A test suite for verifying the behavior of gossipsub protocol. + +## Build and Run + +**Note: All commands below should be run from the rust-libp2p repository root directory.** + +Build the Docker image: +```bash +docker build -f Dockerfile.gossipsub-tests -t gossipsub-tests . +``` + +Run `subscribe` test case using Kurtosis: +```bash +# Run `subscribe` test +kurtosis run -v brief --enclave gossipsub github.com/ackintosh/gossipsub-package --args-file gossipsub-tests/subscribe.yaml + +# Inspect the running enclave and node status +kurtosis enclave inspect gossipsub + +# Export logs for analysis +kurtosis dump gossipsub-test-logs +``` diff --git a/gossipsub-tests/src/addr.rs b/gossipsub-tests/src/addr.rs new file mode 100644 index 00000000000..4353e513a08 --- /dev/null +++ b/gossipsub-tests/src/addr.rs @@ -0,0 +1,28 @@ +use libp2p::multiaddr::Protocol; +use libp2p::Multiaddr; + +/// Creates a listener multiaddress for the local machine with the specified TCP port. +pub(crate) fn get_listener_addr(port: u16) -> Result { + let mut addr = get_local_multiaddr()?; + addr.push(Protocol::Tcp(port)); + Ok(addr) +} + +/// Gets the local machine's non-loopback IPv4 address as a multiaddress. +fn get_local_multiaddr() -> Result { + let Ok(ifs) = if_addrs::get_if_addrs() else { + return Err("Failed to get interfaces."); + }; + + let addrs = ifs + .iter() + .filter(|i| !i.addr.is_loopback() && i.addr.ip().is_ipv4()) + .map(|i| Multiaddr::try_from(i.addr.ip()).unwrap()) + .collect::>(); + + if addrs.len() > 1 { + return Err("Failed to get interfaces."); + } + + Ok(addrs[0].clone()) +} diff --git a/gossipsub-tests/src/main.rs b/gossipsub-tests/src/main.rs new file mode 100644 index 00000000000..a1972f5f70e --- /dev/null +++ b/gossipsub-tests/src/main.rs @@ -0,0 +1,84 @@ +use crate::addr::get_listener_addr; +use crate::node::collect_addr; +use crate::redis::RedisClient; +use crate::subscribe::subscribe; +use libp2p::Multiaddr; +use tracing::info; + +mod addr; +mod node; +mod redis; +mod subscribe; + +#[tokio::main] +async fn main() { + if let Ok(env_filter) = tracing_subscriber::EnvFilter::try_from_default_env() { + tracing_subscriber::fmt().with_env_filter(env_filter).init(); + } + + let context = Context::new().await; + info!("Context:"); + info!(" test_name: {}", context.test_name); + info!(" node_count: {}", context.node_count); + info!(" node_index: {}", context.node_index); + info!(" local_addr: {}", context.local_addr); + info!(" participants:"); + for (i, participant) in context.participants.iter().enumerate() { + info!(" [{i}]: {participant}"); + } + + // Execute the specified test scenario + match context.test_name.as_str() { + "subscribe" => subscribe(context).await, + test_name => unreachable!("Unknown test name: {test_name}"), + }; +} + +/// Test execution context containing configuration and coordination resources. +/// +/// Holds all necessary information for a test node including Redis connection +/// for inter-node coordination, network addresses, and test parameters. +struct Context { + test_name: String, + redis: RedisClient, + local_addr: Multiaddr, + participants: Vec, + node_count: u64, + node_index: u64, +} + +impl Context { + async fn new() -> Self { + let redis_host = std::env::var("REDIS_HOST").expect("REDIS_HOST must be set"); + let redis_port = std::env::var("REDIS_PORT") + .expect("REDIS_PORT must be set") + .parse() + .expect("REDIS_PORT must be a valid number"); + let mut redis = RedisClient::new(redis_host, redis_port) + .await + .expect("Connect to redis"); + + let node_count: u64 = std::env::var("NODE_COUNT") + .expect("NODE_COUNT must be set") + .parse() + .expect("NODE_COUNT must be a valid number"); + + let local_addr = get_listener_addr(9000).unwrap(); + let participants = collect_addr(&mut redis, local_addr.clone(), node_count as usize).await; + + Context { + test_name: std::env::var("TEST_NAME").expect("TEST_NAME must be set"), + redis, + local_addr, + participants, + node_count: std::env::var("NODE_COUNT") + .expect("NODE_COUNT must be set") + .parse() + .expect("NODE_COUNT must be a valid number"), + node_index: std::env::var("NODE_INDEX") + .expect("NODE_INDEX must be set") + .parse() + .expect("NODE_INDEX must be a valid number"), + } + } +} diff --git a/gossipsub-tests/src/node.rs b/gossipsub-tests/src/node.rs new file mode 100644 index 00000000000..b30d3f384d1 --- /dev/null +++ b/gossipsub-tests/src/node.rs @@ -0,0 +1,140 @@ +use crate::redis::RedisClient; +use libp2p::core::muxing::StreamMuxerBox; +use libp2p::futures::{FutureExt, StreamExt}; +use libp2p::gossipsub::{ + AllowAllSubscriptionFilter, Behaviour, Config, Event, IdentityTransform, MessageAuthenticity, +}; +use libp2p::identity::{Keypair, PeerId}; +use libp2p::swarm::SwarmEvent; +use libp2p::{Multiaddr, Swarm, Transport}; +use std::time::Duration; +use tracing::{debug, info}; + +/// Creates a gossipsub behavior for testing. +pub(crate) fn gossipsub(keypair: Keypair) -> Result { + Behaviour::new_with_subscription_filter_and_transform( + MessageAuthenticity::Signed(keypair), + Config::default(), + AllowAllSubscriptionFilter {}, + IdentityTransform {}, + ) +} + +/// Creates a configured libp2p transport stack for gossipsub testing. +pub(crate) fn transport( + keypair: &Keypair, +) -> libp2p::core::transport::Boxed<(PeerId, StreamMuxerBox)> { + let tcp = libp2p::tcp::tokio::Transport::new(libp2p::tcp::Config::default().nodelay(true)); + let transport = libp2p::dns::tokio::Transport::system(tcp) + .expect("DNS") + .boxed(); + + transport + .upgrade(libp2p::core::upgrade::Version::V1) + .authenticate( + libp2p::noise::Config::new(keypair) + .expect("signing can fail only once during starting a node"), + ) + .multiplex(libp2p::yamux::Config::default()) + .timeout(Duration::from_secs(20)) + .boxed() +} + +/// Synchronizes test nodes and drives the swarm until all nodes reach a barrier. +/// +/// This function processes swarm events while waiting for all test nodes to signal +/// completion of a specific test phase using Redis coordination. +/// +/// # Arguments +/// * `key` - Redis key for this synchronization barrier +/// * `swarm` - The libp2p swarm to drive +/// * `redis` - Redis client for inter-node coordination +/// * `target` - Number of nodes that must signal before proceeding +/// +/// # Returns +/// All swarm events that occurred during the synchronization period +pub(crate) async fn barrier_and_drive_swarm( + key: &str, + swarm: &mut Swarm, + redis: &mut RedisClient, + target: u64, +) -> Vec> { + let swarm_events = swarm + .take_until(redis.signal_and_wait(key, target).boxed_local()) + .collect::>() + .await; + + debug!("[Swarm events] {swarm_events:?}"); + swarm_events +} + +/// Collects addresses from all test nodes for network setup. +/// +/// Each node adds its address to a shared Redis list and waits until +/// all participating nodes have registered their addresses. +pub(crate) async fn collect_addr( + redis: &mut RedisClient, + local_addr: Multiaddr, + target: usize, +) -> Vec { + let key = "collect_addr"; + redis.push(key, local_addr.to_string()).await; + + let mut list = redis.list(key).await; + loop { + if list.len() >= target { + break; + } + tokio::time::sleep(Duration::from_millis(100)).await; + list = redis.list(key).await; + } + + list.iter() + .map(|s| Multiaddr::try_from(s.as_str()).unwrap()) + .collect() +} + +/// Ensures all connected peers have subscribed to the topic. +/// This function counts subscription events from previously received events +/// and waits for additional subscription events until the target number is reached. +/// +/// Note: Currently only supports checking for a single topic. +pub(crate) async fn ensure_all_peers_subscribed( + swarm: &mut Swarm, + received_events: &Vec>, + target: usize, +) { + let mut num_subscribed = 0; + received_events.iter().for_each(|event| { + if let SwarmEvent::Behaviour(Event::Subscribed { peer_id, topic }) = event { + info!( + "Peer {} successfully subscribed to topic '{}'", + peer_id, topic + ); + num_subscribed += 1; + } + }); + + if num_subscribed == target { + return; + } + + loop { + match swarm.select_next_some().await { + SwarmEvent::Behaviour(gossipsub_event) => match gossipsub_event { + Event::Subscribed { peer_id, topic } => { + info!( + "Peer {} successfully subscribed to topic '{}'", + peer_id, topic + ); + num_subscribed += 1; + if num_subscribed == target { + return; + } + } + _ => unreachable!(), + }, + event => debug!("[Swarm event] {event:?}"), + } + } +} diff --git a/gossipsub-tests/src/redis.rs b/gossipsub-tests/src/redis.rs new file mode 100644 index 00000000000..17f9961a622 --- /dev/null +++ b/gossipsub-tests/src/redis.rs @@ -0,0 +1,43 @@ +use redis::aio::MultiplexedConnection; +use redis::{AsyncCommands, Client, RedisError}; +use std::time::Duration; + +/// Redis client wrapper for test coordination between multiple nodes. +pub(crate) struct RedisClient { + inner: MultiplexedConnection, +} + +impl RedisClient { + /// Creates a new Redis client connection. + pub(crate) async fn new(host: String, port: u16) -> Result { + let client = Client::open(format!("redis://{host}:{port}/"))?; + let connection = client.get_multiplexed_async_connection().await?; + + Ok(RedisClient { inner: connection }) + } + + /// Increments a counter and waits until it reaches the target value. + /// + /// Used for synchronizing multiple test nodes at specific checkpoints. + pub(crate) async fn signal_and_wait(&mut self, key: &str, target: u64) { + let mut count: u64 = self.inner.incr(key, 1_u64).await.unwrap(); + + loop { + if count >= target { + break; + } + tokio::time::sleep(Duration::from_millis(100)).await; + count = self.inner.get(key).await.unwrap(); + } + } + + /// Pushes a string to the end of a Redis list. + pub(crate) async fn push(&mut self, key: &str, value: String) { + let _: () = self.inner.rpush(key, value).await.unwrap(); + } + + /// Retrieves all elements from a Redis list. + pub(crate) async fn list(&mut self, key: &str) -> Vec { + self.inner.lrange(key, 0, -1).await.unwrap() + } +} diff --git a/gossipsub-tests/src/subscribe.rs b/gossipsub-tests/src/subscribe.rs new file mode 100644 index 00000000000..81a9cefbb6f --- /dev/null +++ b/gossipsub-tests/src/subscribe.rs @@ -0,0 +1,143 @@ +use crate::node::{barrier_and_drive_swarm, ensure_all_peers_subscribed, gossipsub, transport}; +use crate::Context; +use libp2p::core::ConnectedPoint; +use libp2p::futures::StreamExt; +use libp2p::gossipsub::IdentTopic; +use libp2p::swarm::SwarmEvent; +use libp2p::SwarmBuilder; +use tracing::{debug, info, warn}; + +/// Tests the gossipsub subscription mechanism across multiple nodes. +/// +/// Verifies that nodes can successfully connect to each other, subscribe to a topic, +/// and form a proper mesh network for gossipsub message distribution. +pub(crate) async fn subscribe(mut context: Context) { + let mut swarm_events = vec![]; + let is_primary_node = context.node_index == 0; + info!("is_primary_node: {is_primary_node}"); + let keypair = libp2p::identity::Keypair::generate_ed25519(); + + // //////////////////////////////////////////////////////////////////////// + // Start libp2p + // //////////////////////////////////////////////////////////////////////// + info!("Starting libp2p swarm initialization"); + let mut swarm = SwarmBuilder::with_existing_identity(keypair) + .with_tokio() + .with_other_transport(|key| transport(key)) + .expect("infallible") + .with_behaviour(|key| gossipsub(key.clone()).expect("Valid configuration")) + .expect("infallible") + .build(); + + swarm + .listen_on(context.local_addr.clone()) + .expect("Swarm starts listening"); + + swarm_events.extend( + barrier_and_drive_swarm( + "Start libp2p", + &mut swarm, + &mut context.redis, + context.node_count, + ) + .await, + ); + info!("libp2p swarm initialization completed"); + + // //////////////////////////////////////////////////////////////////////// + // Connect to nodes + // //////////////////////////////////////////////////////////////////////// + if is_primary_node { + let nodes = context + .participants + .iter() + .filter(|&p| p != &context.local_addr) + .collect::>(); + nodes.iter().for_each(|&p| { + swarm.dial(p.clone()).unwrap(); + }); + + // Ensure that a connection has been established. + let mut connected_nodes = 0; + loop { + match swarm.select_next_some().await { + SwarmEvent::ConnectionEstablished { + peer_id: _, + endpoint, + .. + } => match endpoint { + ConnectedPoint::Dialer { address, .. } => { + info!("Successfully connected to peer at {}", address); + connected_nodes += 1; + } + event => debug!("[Swarm event] {event:?}"), + }, + event => debug!("[Swarm event] {event:?}"), + } + if connected_nodes == nodes.len() { + info!( + "Primary node successfully connected to all {} peer nodes", + nodes.len() + ); + break; + } + } + } + + swarm_events.extend( + barrier_and_drive_swarm( + "Connect to nodes", + &mut swarm, + &mut context.redis, + context.node_count, + ) + .await, + ); + + let all_peers: Vec<_> = swarm.behaviour().all_peers().collect(); + let all_peers_count = all_peers.len(); + info!("All connected peers (total: {}):", all_peers_count); + for (index, peer) in all_peers.iter().enumerate() { + info!(" [{index}]: {peer:?}"); + } + + // //////////////////////////////////////////////////////////////////////// + // Subscribe to a topic + // //////////////////////////////////////////////////////////////////////// + info!("Starting topic subscription process"); + let topic = IdentTopic::new("test_subscribe"); + if !swarm.behaviour_mut().subscribe(&topic).unwrap() { + warn!( + "Already subscribed to topic '{}', subscription request ignored", + topic + ); + } + + // Wait for all connected peers to be subscribed. + ensure_all_peers_subscribed(&mut swarm, &swarm_events, all_peers_count).await; + + barrier_and_drive_swarm( + "Subscribe to a topic", + &mut swarm, + &mut context.redis, + context.node_count, + ) + .await; + info!("Topic subscription process completed"); + + // //////////////////////////////////////////////////////////////////////// + // Assertions + // //////////////////////////////////////////////////////////////////////// + let mesh_peers = swarm.behaviour().mesh_peers(&topic.hash()); + assert!(mesh_peers.count() > 0, "should have mesh peers."); + + barrier_and_drive_swarm( + "End of test", + &mut swarm, + &mut context.redis, + context.node_count, + ) + .await; + + info!("Subscribe test completed successfully"); +} diff --git a/gossipsub-tests/subscribe.yaml b/gossipsub-tests/subscribe.yaml new file mode 100644 index 00000000000..a42b1cb6141 --- /dev/null +++ b/gossipsub-tests/subscribe.yaml @@ -0,0 +1,5 @@ +test_name: subscribe +log_level: debug +participants: + - image: gossipsub-tests:latest + count: 21