@@ -3,7 +3,7 @@ use std::time::Duration;
33
44use apollo_network:: network_manager:: NetworkManager ;
55use apollo_network:: NetworkConfig ;
6- use apollo_network_benchmark:: node_args:: NodeArgs ;
6+ use apollo_network_benchmark:: node_args:: { Mode , NodeArgs } ;
77use futures:: future:: { select_all, BoxFuture } ;
88use futures:: FutureExt ;
99use libp2p:: swarm:: dial_opts:: DialOpts ;
@@ -83,6 +83,22 @@ impl BroadcastNetworkStressTestNode {
8383 . boxed ( )
8484 }
8585
86+ /// Gets the broadcaster ID with validation for modes that require it
87+ fn get_broadcaster_id ( args : & NodeArgs ) -> u64 {
88+ args. user . broadcaster . expect ( "broadcaster required for OneBroadcast mode" )
89+ }
90+
91+ /// Determines if this node should broadcast messages based on the mode
92+ pub fn should_broadcast ( & self ) -> bool {
93+ match self . args . user . mode {
94+ Mode :: AllBroadcast => true ,
95+ Mode :: OneBroadcast => {
96+ let broadcaster_id = Self :: get_broadcaster_id ( & self . args ) ;
97+ self . args . runner . id == broadcaster_id
98+ }
99+ }
100+ }
101+
86102 fn get_peers ( & self ) -> Vec < PeerId > {
87103 self . network_config
88104 . bootstrap_peer_multiaddr
@@ -94,17 +110,29 @@ impl BroadcastNetworkStressTestNode {
94110 }
95111
96112 /// Starts the message sending task if this node should broadcast
97- pub async fn start_message_sender ( & mut self ) -> BoxFuture < ' static , ( ) > {
113+ pub async fn start_message_sender ( & mut self ) -> Option < BoxFuture < ' static , ( ) > > {
114+ if !self . should_broadcast ( ) {
115+ info ! (
116+ "Node {} will NOT broadcast in mode `{}`" ,
117+ self . args. runner. id, self . args. user. mode
118+ ) ;
119+ return None ;
120+ }
121+
122+ info ! ( "Node {} will broadcast in mode `{}`" , self . args. runner. id, self . args. user. mode) ;
123+
98124 let message_sender =
99125 self . message_sender . take ( ) . expect ( "message_sender should be available" ) ;
100126
101127 let args_clone = self . args . clone ( ) ;
102128 let peers = self . get_peers ( ) ;
103129
104- async move {
105- send_stress_test_messages ( message_sender, & args_clone, peers) . await ;
106- }
107- . boxed ( )
130+ Some (
131+ async move {
132+ send_stress_test_messages ( message_sender, & args_clone, peers) . await ;
133+ }
134+ . boxed ( ) ,
135+ )
108136 }
109137
110138 /// Starts the message receiving task
@@ -125,7 +153,10 @@ impl BroadcastNetworkStressTestNode {
125153 let mut tasks = Vec :: new ( ) ;
126154 tasks. push ( self . start_network_manager ( ) . await ) ;
127155 tasks. push ( self . make_message_receiver_task ( ) . await ) ;
128- tasks. push ( self . start_message_sender ( ) . await ) ;
156+
157+ if let Some ( sender_task) = self . start_message_sender ( ) . await {
158+ tasks. push ( sender_task) ;
159+ }
129160
130161 tasks
131162 }
0 commit comments