Skip to content

Commit e25a2b7

Browse files
apollo_network_benchmark: added message broadcasting task
1 parent b2e66cc commit e25a2b7

File tree

4 files changed

+80
-7
lines changed

4 files changed

+80
-7
lines changed

crates/apollo_network_benchmark/src/bin/broadcast_network_stress_test_node/handlers.rs

Lines changed: 34 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,47 @@
1-
use std::time::SystemTime;
1+
use std::time::{Duration, SystemTime};
22

33
use apollo_metrics::metrics::LossyIntoF64;
4+
use apollo_network_benchmark::node_args::NodeArgs;
45
use libp2p::PeerId;
56

6-
use crate::message::StressTestMessage;
7+
use crate::message::{StressTestMessage, METADATA_SIZE};
78
use crate::metrics::{
89
RECEIVE_MESSAGE_BYTES,
910
RECEIVE_MESSAGE_BYTES_SUM,
1011
RECEIVE_MESSAGE_COUNT,
1112
RECEIVE_MESSAGE_DELAY_SECONDS,
1213
};
14+
use crate::protocol::MessageSender;
15+
16+
fn get_message(id: u64, size_bytes: usize) -> StressTestMessage {
17+
let message = StressTestMessage::new(id, 0, vec![0; size_bytes - *METADATA_SIZE]);
18+
assert_eq!(Vec::<u8>::from(message.clone()).len(), size_bytes);
19+
message
20+
}
21+
22+
/// Unified implementation for sending stress test messages via any protocol
23+
pub async fn send_stress_test_messages(
24+
mut message_sender: MessageSender,
25+
args: &NodeArgs,
26+
peers: Vec<PeerId>,
27+
) {
28+
let size_bytes = args.user.message_size_bytes;
29+
let heartbeat = Duration::from_millis(args.user.heartbeat_millis);
30+
31+
let mut message_index = 0;
32+
let mut message = get_message(args.runner.id, size_bytes).clone();
33+
34+
let mut interval = tokio::time::interval(heartbeat);
35+
loop {
36+
interval.tick().await;
37+
38+
message.metadata.time = SystemTime::now();
39+
message.metadata.message_index = message_index;
40+
let message_clone = message.clone().into();
41+
message_sender.send_message(&peers, message_clone).await;
42+
message_index += 1;
43+
}
44+
}
1345

1446
pub fn receive_stress_test_message(received_message: Vec<u8>, _sender_peer_id: Option<PeerId>) {
1547
let end_time = SystemTime::now();

crates/apollo_network_benchmark/src/bin/broadcast_network_stress_test_node/main.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ use std::net::{Ipv4Addr, SocketAddr, SocketAddrV4};
33
use std::time::Duration;
44

55
use clap::Parser;
6+
use message::METADATA_SIZE;
67
use metrics_exporter_prometheus::PrometheusBuilder;
78
use tokio_metrics::RuntimeMetricsReporterBuilder;
89
use tracing::Level;
@@ -39,6 +40,12 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
3940

4041
println!("Starting network stress test with args:\n{args:?}");
4142

43+
assert!(
44+
args.user.message_size_bytes >= *METADATA_SIZE,
45+
"Message size must be at least {} bytes",
46+
*METADATA_SIZE
47+
);
48+
4249
// Set up metrics
4350
let builder = PrometheusBuilder::new().with_http_listener(SocketAddr::V4(SocketAddrV4::new(
4451
Ipv4Addr::UNSPECIFIED,

crates/apollo_network_benchmark/src/bin/broadcast_network_stress_test_node/stress_test_node.rs

Lines changed: 31 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -6,19 +6,19 @@ use apollo_network::NetworkConfig;
66
use apollo_network_benchmark::node_args::NodeArgs;
77
use futures::future::{select_all, BoxFuture};
88
use futures::FutureExt;
9-
use libp2p::Multiaddr;
9+
use libp2p::swarm::dial_opts::DialOpts;
10+
use libp2p::{Multiaddr, PeerId};
1011
use tokio::task::JoinHandle;
1112
use tracing::{info, warn};
1213

13-
use crate::handlers::receive_stress_test_message;
14+
use crate::handlers::{receive_stress_test_message, send_stress_test_messages};
1415
use crate::protocol::{register_protocol_channels, MessageReceiver, MessageSender};
1516

1617
/// The main stress test node that manages network communication and monitoring
1718
pub struct BroadcastNetworkStressTestNode {
1819
args: NodeArgs,
20+
network_config: NetworkConfig,
1921
network_manager: Option<NetworkManager>,
20-
// TODO(AndrewL): Remove this once they are used
21-
#[allow(dead_code)]
2222
message_sender: Option<MessageSender>,
2323
message_receiver: Option<MessageReceiver>,
2424
}
@@ -56,7 +56,7 @@ impl BroadcastNetworkStressTestNode {
5656
let network_config = Self::create_network_config(&args);
5757

5858
// Create network manager
59-
let mut network_manager = NetworkManager::new(network_config, None, None);
59+
let mut network_manager = NetworkManager::new(network_config.clone(), None, None);
6060

6161
// Register protocol channels
6262
let (message_sender, message_receiver) = register_protocol_channels(
@@ -66,6 +66,7 @@ impl BroadcastNetworkStressTestNode {
6666
);
6767
Self {
6868
args,
69+
network_config,
6970
network_manager: Some(network_manager),
7071
message_sender: Some(message_sender),
7172
message_receiver: Some(message_receiver),
@@ -82,6 +83,30 @@ impl BroadcastNetworkStressTestNode {
8283
.boxed()
8384
}
8485

86+
fn get_peers(&self) -> Vec<PeerId> {
87+
self.network_config
88+
.bootstrap_peer_multiaddr
89+
.as_ref()
90+
.map(|peers| {
91+
peers.iter().map(|m| DialOpts::from(m.clone()).get_peer_id().unwrap()).collect()
92+
})
93+
.unwrap_or_default()
94+
}
95+
96+
/// Starts the message sending task if this node should broadcast
97+
pub async fn start_message_sender(&mut self) -> BoxFuture<'static, ()> {
98+
let message_sender =
99+
self.message_sender.take().expect("message_sender should be available");
100+
101+
let args_clone = self.args.clone();
102+
let peers = self.get_peers();
103+
104+
async move {
105+
send_stress_test_messages(message_sender, &args_clone, peers).await;
106+
}
107+
.boxed()
108+
}
109+
85110
/// Starts the message receiving task
86111
pub async fn make_message_receiver_task(&mut self) -> BoxFuture<'static, ()> {
87112
let message_receiver =
@@ -100,6 +125,7 @@ impl BroadcastNetworkStressTestNode {
100125
let mut tasks = Vec::new();
101126
tasks.push(self.start_network_manager().await);
102127
tasks.push(self.make_message_receiver_task().await);
128+
tasks.push(self.start_message_sender().await);
103129

104130
tasks
105131
}

crates/apollo_network_benchmark/src/node_args.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,14 @@ pub struct UserArgs {
5353
#[arg(long, env, default_value = "gossipsub")]
5454
pub network_protocol: NetworkProtocol,
5555

56+
/// Size of StressTestMessage
57+
#[arg(long, env, default_value = "1024")]
58+
pub message_size_bytes: usize,
59+
60+
/// The time to sleep between broadcasts of StressTestMessage in milliseconds
61+
#[arg(long, env, default_value = "1000")]
62+
pub heartbeat_millis: u64,
63+
5664
/// The timeout in seconds for the node.
5765
/// When the node runs for longer than this, it will be killed.
5866
#[arg(long, env, default_value = "4000")]

0 commit comments

Comments
 (0)