Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,15 +1,47 @@
use std::time::SystemTime;
use std::time::{Duration, SystemTime};

use apollo_metrics::metrics::LossyIntoF64;
use apollo_network_benchmark::node_args::NodeArgs;
use libp2p::PeerId;

use crate::message::StressTestMessage;
use crate::message::{StressTestMessage, METADATA_SIZE};
use crate::metrics::{
RECEIVE_MESSAGE_BYTES,
RECEIVE_MESSAGE_BYTES_SUM,
RECEIVE_MESSAGE_COUNT,
RECEIVE_MESSAGE_DELAY_SECONDS,
};
use crate::protocol::MessageSender;

fn get_message(id: u64, size_bytes: usize) -> StressTestMessage {
let message = StressTestMessage::new(id, 0, vec![0; size_bytes - *METADATA_SIZE]);
assert_eq!(Vec::<u8>::from(message.clone()).len(), size_bytes);
message
}

/// Unified implementation for sending stress test messages via any protocol
pub async fn send_stress_test_messages(
mut message_sender: MessageSender,
args: &NodeArgs,
peers: Vec<PeerId>,
) {
let size_bytes = args.user.message_size_bytes;
let heartbeat = Duration::from_millis(args.user.heartbeat_millis);

let mut message_index = 0;
let mut message = get_message(args.runner.id, size_bytes).clone();

let mut interval = tokio::time::interval(heartbeat);
loop {
interval.tick().await;

message.metadata.time = SystemTime::now();
message.metadata.message_index = message_index;
let message_clone = message.clone().into();
message_sender.send_message(&peers, message_clone).await;
message_index += 1;
}
}

pub fn receive_stress_test_message(received_message: Vec<u8>, _sender_peer_id: Option<PeerId>) {
let end_time = SystemTime::now();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use std::net::{Ipv4Addr, SocketAddr, SocketAddrV4};
use std::time::Duration;

use clap::Parser;
use message::METADATA_SIZE;
use metrics_exporter_prometheus::PrometheusBuilder;
use tokio_metrics::RuntimeMetricsReporterBuilder;
use tracing::Level;
Expand Down Expand Up @@ -40,6 +41,12 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {

println!("Starting network stress test with args:\n{args:?}");

assert!(
args.user.message_size_bytes >= *METADATA_SIZE,
"Message size must be at least {} bytes",
*METADATA_SIZE
);

// Set up metrics
let builder = PrometheusBuilder::new().with_http_listener(SocketAddr::V4(SocketAddrV4::new(
Ipv4Addr::UNSPECIFIED,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,19 +6,19 @@ use apollo_network::NetworkConfig;
use apollo_network_benchmark::node_args::NodeArgs;
use futures::future::{select_all, BoxFuture};
use futures::FutureExt;
use libp2p::Multiaddr;
use libp2p::swarm::dial_opts::DialOpts;
use libp2p::{Multiaddr, PeerId};
use tokio::task::JoinHandle;
use tracing::{info, warn};

use crate::handlers::receive_stress_test_message;
use crate::handlers::{receive_stress_test_message, send_stress_test_messages};
use crate::protocol::{register_protocol_channels, MessageReceiver, MessageSender};

/// The main stress test node that manages network communication and monitoring
pub struct BroadcastNetworkStressTestNode {
args: NodeArgs,
network_config: NetworkConfig,
network_manager: Option<NetworkManager>,
// TODO(AndrewL): Remove this once they are used
#[allow(dead_code)]
message_sender: Option<MessageSender>,
message_receiver: Option<MessageReceiver>,
}
Expand Down Expand Up @@ -56,7 +56,7 @@ impl BroadcastNetworkStressTestNode {
let network_config = Self::create_network_config(&args);

// Create network manager
let mut network_manager = NetworkManager::new(network_config, None, None);
let mut network_manager = NetworkManager::new(network_config.clone(), None, None);

// Register protocol channels
let (message_sender, message_receiver) = register_protocol_channels(
Expand All @@ -66,6 +66,7 @@ impl BroadcastNetworkStressTestNode {
);
Self {
args,
network_config,
network_manager: Some(network_manager),
message_sender: Some(message_sender),
message_receiver: Some(message_receiver),
Expand All @@ -82,6 +83,30 @@ impl BroadcastNetworkStressTestNode {
.boxed()
}

fn get_peers(&self) -> Vec<PeerId> {
self.network_config
.bootstrap_peer_multiaddr
.as_ref()
.map(|peers| {
peers.iter().map(|m| DialOpts::from(m.clone()).get_peer_id().unwrap()).collect()
})
.unwrap_or_default()
}

/// Starts the message sending task if this node should broadcast
pub async fn start_message_sender(&mut self) -> BoxFuture<'static, ()> {
let message_sender =
self.message_sender.take().expect("message_sender should be available");

let args_clone = self.args.clone();
let peers = self.get_peers();

async move {
send_stress_test_messages(message_sender, &args_clone, peers).await;
}
.boxed()
}

/// Starts the message receiving task
pub async fn make_message_receiver_task(&mut self) -> BoxFuture<'static, ()> {
let message_receiver =
Expand All @@ -100,6 +125,7 @@ impl BroadcastNetworkStressTestNode {
let mut tasks = Vec::new();
tasks.push(self.start_network_manager().await);
tasks.push(self.make_message_receiver_task().await);
tasks.push(self.start_message_sender().await);

tasks
}
Expand Down
8 changes: 8 additions & 0 deletions crates/apollo_network_benchmark/src/node_args.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,14 @@ pub struct UserArgs {
#[arg(long, env, default_value = "gossipsub")]
pub network_protocol: NetworkProtocol,

/// Size of StressTestMessage
#[arg(long, env, default_value = "1024")]
pub message_size_bytes: usize,

/// The time to sleep between broadcasts of StressTestMessage in milliseconds
#[arg(long, env, default_value = "1000")]
pub heartbeat_millis: u64,

/// The timeout in seconds for the node.
/// When the node runs for longer than this, it will be killed.
#[arg(long, env, default_value = "4000")]
Expand Down
Loading