Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions crates/apollo_network_benchmark/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ futures.workspace = true
lazy_static.workspace = true
libp2p = { workspace = true, features = ["identify"] }
metrics-exporter-prometheus.workspace = true
serde.workspace = true
tokio = { workspace = true, features = ["full", "sync"] }
tokio-metrics = { workspace = true, features = ["metrics-rs-integration", "rt"] }
tracing.workspace = true
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ use tracing::Level;
mod message_test;

mod message;
mod protocol;
mod stress_test_node;

use apollo_network_benchmark::node_args::NodeArgs;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
// TODO(AndrewL): Remove this once the sender and receiver are used
#![allow(dead_code)]

use apollo_network::network_manager::{
BroadcastTopicChannels,
BroadcastTopicClient,
BroadcastTopicClientTrait,
BroadcastTopicServer,
NetworkManager,
};
use apollo_network_benchmark::node_args::NetworkProtocol;
use futures::StreamExt;
use libp2p::gossipsub::{Sha256Topic, Topic};
use libp2p::PeerId;

// ================================
// Types and Constants
// ================================

lazy_static::lazy_static! {
pub static ref TOPIC: Sha256Topic = Topic::new("stress_test_topic".to_string());
}

pub type TopicType = Vec<u8>;

/// Registers protocol channels on an existing network manager.
/// Returns a sender and receiver for the configured protocol.
pub fn register_protocol_channels(
network_manager: &mut NetworkManager,
buffer_size: usize,
protocol: &NetworkProtocol,
) -> (MessageSender, MessageReceiver) {
match protocol {
NetworkProtocol::Gossipsub => {
let channels = network_manager
.register_broadcast_topic::<TopicType>(TOPIC.clone(), buffer_size)
.expect("Failed to register broadcast topic");
let BroadcastTopicChannels { broadcasted_messages_receiver, broadcast_topic_client } =
channels;

(
MessageSender::Gossipsub(broadcast_topic_client),
MessageReceiver::Gossipsub(broadcasted_messages_receiver),
)
}
}
}

// ================================
// MessageSender
// ================================

/// Message sender abstraction for different protocols
pub enum MessageSender {
Gossipsub(BroadcastTopicClient<TopicType>),
}

impl MessageSender {
pub async fn send_message(&mut self, _peers: &[PeerId], message: TopicType) {
match self {
MessageSender::Gossipsub(client) => {
client.broadcast_message(message).await.unwrap();
}
}
}
}

// ================================
// MessageReceiver
// ================================

pub enum MessageReceiver {
Gossipsub(BroadcastTopicServer<TopicType>),
}

impl MessageReceiver {
pub async fn for_each<F>(self, mut f: F)
where
F: FnMut(TopicType, Option<PeerId>) + Copy,
{
match self {
MessageReceiver::Gossipsub(receiver) => {
receiver
.for_each(|message| async move {
let (payload_opt, meta) = message;
let peer_id = meta.originator_id.private_get_peer_id();
let payload =
payload_opt.expect("Broadcasted message should contain payload");
f(payload, Some(peer_id));
})
.await
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,18 @@ use libp2p::Multiaddr;
use tokio::task::JoinHandle;
use tracing::{info, warn};

use crate::protocol::{register_protocol_channels, MessageReceiver, MessageSender};

/// The main stress test node that manages network communication and monitoring
pub struct BroadcastNetworkStressTestNode {
args: NodeArgs,
network_manager: Option<NetworkManager>,
// TODO(AndrewL): Remove this once they are used
#[allow(dead_code)]
message_sender: Option<MessageSender>,
// TODO(AndrewL): Remove this once they are used
#[allow(dead_code)]
message_receiver: Option<MessageReceiver>,
}

impl BroadcastNetworkStressTestNode {
Expand Down Expand Up @@ -47,9 +55,22 @@ impl BroadcastNetworkStressTestNode {
pub async fn new(args: NodeArgs) -> Self {
// Create network configuration
let network_config = Self::create_network_config(&args);

// Create network manager
let network_manager = NetworkManager::new(network_config, None, None);
Self { args, network_manager: Some(network_manager) }
let mut network_manager = NetworkManager::new(network_config, None, None);

// Register protocol channels
let (message_sender, message_receiver) = register_protocol_channels(
&mut network_manager,
args.user.buffer_size,
&args.user.network_protocol,
);
Self {
args,
network_manager: Some(network_manager),
message_sender: Some(message_sender),
message_receiver: Some(message_receiver),
}
}

/// Starts the network manager in the background
Expand Down
26 changes: 25 additions & 1 deletion crates/apollo_network_benchmark/src/node_args.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,20 @@
use clap::Parser;
use std::fmt::Display;

use clap::{Parser, ValueEnum};
use serde::{Deserialize, Serialize};

#[derive(Debug, Clone, ValueEnum, PartialEq, Eq, Serialize, Deserialize)]
pub enum NetworkProtocol {
/// Use gossipsub for broadcasting (default)
#[value(name = "gossipsub")]
Gossipsub,
}

impl Display for NetworkProtocol {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}", self.to_possible_value().unwrap().get_name())
}
}

#[derive(Parser, Debug, Clone)]
#[command(version, about, long_about = None)]
Expand Down Expand Up @@ -29,6 +45,14 @@ pub struct UserArgs {
#[arg(short, long, env, default_value = "2")]
pub verbosity: u8,

/// Buffer size for the broadcast topic
#[arg(long, env, default_value = "100000")]
pub buffer_size: usize,

/// The network protocol to use for communication (default: gossipsub)
#[arg(long, env, default_value = "gossipsub")]
pub network_protocol: NetworkProtocol,

/// The timeout in seconds for the node.
/// When the node runs for longer than this, it will be killed.
#[arg(long, env, default_value = "4000")]
Expand Down
Loading