Skip to content

Commit 3c87686

Browse files
apollo_network_benchmark: added conditional broadcasting based on mode
1 parent 1165b86 commit 3c87686

File tree

1 file changed

+38
-7
lines changed

1 file changed

+38
-7
lines changed

crates/apollo_network_benchmark/src/bin/broadcast_network_stress_test_node/stress_test_node.rs

Lines changed: 38 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ use std::time::Duration;
33

44
use apollo_network::network_manager::NetworkManager;
55
use apollo_network::NetworkConfig;
6-
use apollo_network_benchmark::node_args::NodeArgs;
6+
use apollo_network_benchmark::node_args::{Mode, NodeArgs};
77
use futures::future::{select_all, BoxFuture};
88
use futures::FutureExt;
99
use libp2p::swarm::dial_opts::DialOpts;
@@ -83,6 +83,22 @@ impl BroadcastNetworkStressTestNode {
8383
.boxed()
8484
}
8585

86+
/// Gets the broadcaster ID with validation for modes that require it
87+
fn get_broadcaster_id(args: &NodeArgs) -> u64 {
88+
args.user.broadcaster.expect("broadcaster required for OneBroadcast mode")
89+
}
90+
91+
/// Determines if this node should broadcast messages based on the mode
92+
pub fn should_broadcast(&self) -> bool {
93+
match self.args.user.mode {
94+
Mode::AllBroadcast => true,
95+
Mode::OneBroadcast => {
96+
let broadcaster_id = Self::get_broadcaster_id(&self.args);
97+
self.args.runner.id == broadcaster_id
98+
}
99+
}
100+
}
101+
86102
fn get_peers(&self) -> Vec<PeerId> {
87103
self.network_config
88104
.bootstrap_peer_multiaddr
@@ -94,17 +110,29 @@ impl BroadcastNetworkStressTestNode {
94110
}
95111

96112
/// Starts the message sending task if this node should broadcast
97-
pub async fn start_message_sender(&mut self) -> BoxFuture<'static, ()> {
113+
pub async fn start_message_sender(&mut self) -> Option<BoxFuture<'static, ()>> {
114+
if !self.should_broadcast() {
115+
info!(
116+
"Node {} will NOT broadcast in mode `{}`",
117+
self.args.runner.id, self.args.user.mode
118+
);
119+
return None;
120+
}
121+
122+
info!("Node {} will broadcast in mode `{}`", self.args.runner.id, self.args.user.mode);
123+
98124
let message_sender =
99125
self.message_sender.take().expect("message_sender should be available");
100126

101127
let args_clone = self.args.clone();
102128
let peers = self.get_peers();
103129

104-
async move {
105-
send_stress_test_messages(message_sender, &args_clone, peers).await;
106-
}
107-
.boxed()
130+
Some(
131+
async move {
132+
send_stress_test_messages(message_sender, &args_clone, peers).await;
133+
}
134+
.boxed(),
135+
)
108136
}
109137

110138
/// Starts the message receiving task
@@ -125,7 +153,10 @@ impl BroadcastNetworkStressTestNode {
125153
let mut tasks = Vec::new();
126154
tasks.push(self.start_network_manager().await);
127155
tasks.push(self.make_message_receiver_task().await);
128-
tasks.push(self.start_message_sender().await);
156+
157+
if let Some(sender_task) = self.start_message_sender().await {
158+
tasks.push(sender_task);
159+
}
129160

130161
tasks
131162
}

0 commit comments

Comments
 (0)