@@ -26,7 +26,7 @@ use linera_sdk::abis::fungible;
2626use linera_storage:: Storage ;
2727use num_format:: { Locale , ToFormattedString } ;
2828use prometheus_parse:: { HistogramCount , Scrape , Value } ;
29- use tokio:: { runtime:: Handle , task, time} ;
29+ use tokio:: { runtime:: Handle , sync :: mpsc , task, time} ;
3030use tokio_util:: sync:: CancellationToken ;
3131use tracing:: { debug, error, info, warn, Instrument as _} ;
3232
@@ -38,9 +38,9 @@ pub enum BenchmarkError {
3838 #[ error( "Proxy of validator {0} unhealthy! Latency p99 is too high: {1} ms" ) ]
3939 ProxyUnhealthy ( String , f64 ) ,
4040 #[ error( "Failed to send message: {0}" ) ]
41- SendError ( #[ from] crossbeam_channel:: SendError < ( ) > ) ,
41+ CrossbeamSendError ( #[ from] crossbeam_channel:: SendError < ( ) > ) ,
4242 #[ error( "Failed to join task: {0}" ) ]
43- JoinError ( #[ from] tokio :: task:: JoinError ) ,
43+ JoinError ( #[ from] task:: JoinError ) ,
4444 #[ error( "Failed to parse validator metrics port: {0}" ) ]
4545 ParseValidatorMetricsPort ( #[ from] std:: num:: ParseIntError ) ,
4646 #[ error( "Failed to parse validator metrics address: {0}" ) ]
@@ -73,6 +73,8 @@ pub enum BenchmarkError {
7373 NoDataYetForP99Calculation ,
7474 #[ error( "Unexpected empty bucket" ) ]
7575 UnexpectedEmptyBucket ,
76+ #[ error( "Failed to send message: {0}" ) ]
77+ TokioSendError ( #[ from] mpsc:: error:: SendError < ( ) > ) ,
7678}
7779
7880#[ derive( Debug ) ]
@@ -120,7 +122,7 @@ where
120122 // the desired BPS, the tasks would continue sending block proposals until the channel's
121123 // buffer is filled, which would cause us to not properly control the BPS rate.
122124 let ( sender, receiver) = crossbeam_channel:: bounded ( 0 ) ;
123- let bps_control_task = tokio :: task:: spawn_blocking ( move || {
125+ let bps_control_task = task:: spawn_blocking ( move || {
124126 handle. block_on ( async move {
125127 let mut recv_count = 0 ;
126128 let mut start = time:: Instant :: now ( ) ;
@@ -167,6 +169,19 @@ where
167169 } )
168170 } ) ;
169171
172+ let ( bps_tasks_logger_sender, mut bps_tasks_logger_receiver) = mpsc:: channel ( num_chains) ;
173+ let bps_tasks_logger_task = task:: spawn ( async move {
174+ let mut tasks_running = 0 ;
175+ while let Some ( ( ) ) = bps_tasks_logger_receiver. recv ( ) . await {
176+ tasks_running += 1 ;
177+ info ! ( "{}/{} tasks running" , tasks_running, num_chains) ;
178+ if tasks_running == num_chains {
179+ info ! ( "All tasks are running" ) ;
180+ break ;
181+ }
182+ }
183+ } ) ;
184+
170185 let mut bps_remainder = bps. unwrap_or_default ( ) % num_chains;
171186 let bps_share = bps. map ( |bps| bps / num_chains) ;
172187
@@ -185,6 +200,7 @@ where
185200 let committee = committee. clone ( ) ;
186201 let local_node = local_node. clone ( ) ;
187202 let chain_client = chain_clients[ & chain_id] . clone ( ) ;
203+ let bps_tasks_logger_sender = bps_tasks_logger_sender. clone ( ) ;
188204 chain_client. process_inbox ( ) . await ?;
189205 join_set. spawn_blocking ( move || {
190206 handle. block_on (
@@ -199,6 +215,7 @@ where
199215 sender,
200216 committee,
201217 local_node,
218+ bps_tasks_logger_sender,
202219 ) )
203220 . await ?;
204221
@@ -225,6 +242,7 @@ where
225242 if let Some ( metrics_watcher) = metrics_watcher {
226243 metrics_watcher. await ??;
227244 }
245+ bps_tasks_logger_task. await ?;
228246
229247 Ok ( ( ) )
230248 }
@@ -485,12 +503,14 @@ where
485503 sender : crossbeam_channel:: Sender < ( ) > ,
486504 committee : Committee ,
487505 local_node : LocalNodeClient < S > ,
506+ bps_tasks_logger_sender : mpsc:: Sender < ( ) > ,
488507 ) -> Result < ( ) , BenchmarkError > {
489508 let chain_id = chain_client. chain_id ( ) ;
490509 info ! (
491510 "Starting benchmark at target BPS of {:?}, for chain {:?}" ,
492511 bps, chain_id
493512 ) ;
513+ bps_tasks_logger_sender. send ( ( ) ) . await ?;
494514 let cross_chain_message_delivery = chain_client. options ( ) . cross_chain_message_delivery ;
495515 let mut num_sent_proposals = 0 ;
496516 let authenticated_signer = Some ( AccountOwner :: from ( key_pair. public ( ) ) ) ;
0 commit comments