diff --git a/Cargo.lock b/Cargo.lock index 79bb2c68aef..8c7c691a8ac 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2067,6 +2067,7 @@ dependencies = [ "libp2p", "metrics-exporter-prometheus", "rstest", + "serde", "tokio", "tokio-metrics", "tracing", diff --git a/crates/apollo_network_benchmark/Cargo.toml b/crates/apollo_network_benchmark/Cargo.toml index 5795f33309b..5b3a0c23e25 100644 --- a/crates/apollo_network_benchmark/Cargo.toml +++ b/crates/apollo_network_benchmark/Cargo.toml @@ -15,6 +15,7 @@ futures.workspace = true lazy_static.workspace = true libp2p = { workspace = true, features = ["identify"] } metrics-exporter-prometheus.workspace = true +serde.workspace = true tokio = { workspace = true, features = ["full", "sync"] } tokio-metrics = { workspace = true, features = ["metrics-rs-integration", "rt"] } tracing.workspace = true diff --git a/crates/apollo_network_benchmark/src/bin/broadcast_network_stress_test_node/main.rs b/crates/apollo_network_benchmark/src/bin/broadcast_network_stress_test_node/main.rs index 2fb7f86e139..91846d89c9a 100644 --- a/crates/apollo_network_benchmark/src/bin/broadcast_network_stress_test_node/main.rs +++ b/crates/apollo_network_benchmark/src/bin/broadcast_network_stress_test_node/main.rs @@ -11,6 +11,7 @@ use tracing::Level; mod message_test; mod message; +mod protocol; mod stress_test_node; use apollo_network_benchmark::node_args::NodeArgs; diff --git a/crates/apollo_network_benchmark/src/bin/broadcast_network_stress_test_node/protocol.rs b/crates/apollo_network_benchmark/src/bin/broadcast_network_stress_test_node/protocol.rs new file mode 100644 index 00000000000..0bca5a5455d --- /dev/null +++ b/crates/apollo_network_benchmark/src/bin/broadcast_network_stress_test_node/protocol.rs @@ -0,0 +1,95 @@ +// TODO(AndrewL): Remove this once the sender and receiver are used +#![allow(dead_code)] + +use apollo_network::network_manager::{ + BroadcastTopicChannels, + BroadcastTopicClient, + BroadcastTopicClientTrait, + BroadcastTopicServer, + NetworkManager, +}; +use apollo_network_benchmark::node_args::NetworkProtocol; +use futures::StreamExt; +use libp2p::gossipsub::{Sha256Topic, Topic}; +use libp2p::PeerId; + +// ================================ +// Types and Constants +// ================================ + +lazy_static::lazy_static! { + pub static ref TOPIC: Sha256Topic = Topic::new("stress_test_topic".to_string()); +} + +pub type TopicType = Vec; + +/// Registers protocol channels on an existing network manager. +/// Returns a sender and receiver for the configured protocol. +pub fn register_protocol_channels( + network_manager: &mut NetworkManager, + buffer_size: usize, + protocol: &NetworkProtocol, +) -> (MessageSender, MessageReceiver) { + match protocol { + NetworkProtocol::Gossipsub => { + let channels = network_manager + .register_broadcast_topic::(TOPIC.clone(), buffer_size) + .expect("Failed to register broadcast topic"); + let BroadcastTopicChannels { broadcasted_messages_receiver, broadcast_topic_client } = + channels; + + ( + MessageSender::Gossipsub(broadcast_topic_client), + MessageReceiver::Gossipsub(broadcasted_messages_receiver), + ) + } + } +} + +// ================================ +// MessageSender +// ================================ + +/// Message sender abstraction for different protocols +pub enum MessageSender { + Gossipsub(BroadcastTopicClient), +} + +impl MessageSender { + pub async fn send_message(&mut self, _peers: &[PeerId], message: TopicType) { + match self { + MessageSender::Gossipsub(client) => { + client.broadcast_message(message).await.unwrap(); + } + } + } +} + +// ================================ +// MessageReceiver +// ================================ + +pub enum MessageReceiver { + Gossipsub(BroadcastTopicServer), +} + +impl MessageReceiver { + pub async fn for_each(self, mut f: F) + where + F: FnMut(TopicType, Option) + Copy, + { + match self { + MessageReceiver::Gossipsub(receiver) => { + receiver + .for_each(|message| async move { + let (payload_opt, meta) = message; + let peer_id = meta.originator_id.private_get_peer_id(); + let payload = + payload_opt.expect("Broadcasted message should contain payload"); + f(payload, Some(peer_id)); + }) + .await + } + } + } +} diff --git a/crates/apollo_network_benchmark/src/bin/broadcast_network_stress_test_node/stress_test_node.rs b/crates/apollo_network_benchmark/src/bin/broadcast_network_stress_test_node/stress_test_node.rs index 20b0b46bc31..171dee600ee 100644 --- a/crates/apollo_network_benchmark/src/bin/broadcast_network_stress_test_node/stress_test_node.rs +++ b/crates/apollo_network_benchmark/src/bin/broadcast_network_stress_test_node/stress_test_node.rs @@ -10,10 +10,18 @@ use libp2p::Multiaddr; use tokio::task::JoinHandle; use tracing::{info, warn}; +use crate::protocol::{register_protocol_channels, MessageReceiver, MessageSender}; + /// The main stress test node that manages network communication and monitoring pub struct BroadcastNetworkStressTestNode { args: NodeArgs, network_manager: Option, + // TODO(AndrewL): Remove this once they are used + #[allow(dead_code)] + message_sender: Option, + // TODO(AndrewL): Remove this once they are used + #[allow(dead_code)] + message_receiver: Option, } impl BroadcastNetworkStressTestNode { @@ -47,9 +55,22 @@ impl BroadcastNetworkStressTestNode { pub async fn new(args: NodeArgs) -> Self { // Create network configuration let network_config = Self::create_network_config(&args); + // Create network manager - let network_manager = NetworkManager::new(network_config, None, None); - Self { args, network_manager: Some(network_manager) } + let mut network_manager = NetworkManager::new(network_config, None, None); + + // Register protocol channels + let (message_sender, message_receiver) = register_protocol_channels( + &mut network_manager, + args.user.buffer_size, + &args.user.network_protocol, + ); + Self { + args, + network_manager: Some(network_manager), + message_sender: Some(message_sender), + message_receiver: Some(message_receiver), + } } /// Starts the network manager in the background diff --git a/crates/apollo_network_benchmark/src/node_args.rs b/crates/apollo_network_benchmark/src/node_args.rs index c0a6d374115..7bdd930eba8 100644 --- a/crates/apollo_network_benchmark/src/node_args.rs +++ b/crates/apollo_network_benchmark/src/node_args.rs @@ -1,4 +1,20 @@ -use clap::Parser; +use std::fmt::Display; + +use clap::{Parser, ValueEnum}; +use serde::{Deserialize, Serialize}; + +#[derive(Debug, Clone, ValueEnum, PartialEq, Eq, Serialize, Deserialize)] +pub enum NetworkProtocol { + /// Use gossipsub for broadcasting (default) + #[value(name = "gossipsub")] + Gossipsub, +} + +impl Display for NetworkProtocol { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "{}", self.to_possible_value().unwrap().get_name()) + } +} #[derive(Parser, Debug, Clone)] #[command(version, about, long_about = None)] @@ -29,6 +45,14 @@ pub struct UserArgs { #[arg(short, long, env, default_value = "2")] pub verbosity: u8, + /// Buffer size for the broadcast topic + #[arg(long, env, default_value = "100000")] + pub buffer_size: usize, + + /// The network protocol to use for communication (default: gossipsub) + #[arg(long, env, default_value = "gossipsub")] + pub network_protocol: NetworkProtocol, + /// The timeout in seconds for the node. /// When the node runs for longer than this, it will be killed. #[arg(long, env, default_value = "4000")]