11// Copyright (c) Zefchain Labs, Inc.
22// SPDX-License-Identifier: Apache-2.0
33
4- use std:: { collections:: HashMap , iter} ;
4+ use std:: { collections:: HashMap , iter, sync :: Arc } ;
55
66use linera_base:: {
77 crypto:: { AccountPublicKey , AccountSecretKey } ,
@@ -25,7 +25,11 @@ use linera_sdk::abis::fungible;
2525use linera_storage:: Storage ;
2626use num_format:: { Locale , ToFormattedString } ;
2727use prometheus_parse:: { HistogramCount , Scrape , Value } ;
28- use tokio:: { runtime:: Handle , sync:: mpsc, task, time} ;
28+ use tokio:: {
29+ runtime:: Handle ,
30+ sync:: { mpsc, Barrier } ,
31+ task, time,
32+ } ;
2933use tokio_util:: sync:: CancellationToken ;
3034use tracing:: { debug, error, info, warn, Instrument as _} ;
3135
@@ -173,9 +177,9 @@ where
173177 let mut tasks_running = 0 ;
174178 while let Some ( ( ) ) = bps_tasks_logger_receiver. recv ( ) . await {
175179 tasks_running += 1 ;
176- info ! ( "{}/{} tasks running " , tasks_running, num_chains) ;
180+ info ! ( "{}/{} tasks ready to start " , tasks_running, num_chains) ;
177181 if tasks_running == num_chains {
178- info ! ( "All tasks are running " ) ;
182+ info ! ( "All tasks are ready to start " ) ;
179183 break ;
180184 }
181185 }
@@ -184,6 +188,7 @@ where
184188 let mut bps_remainder = bps. unwrap_or_default ( ) % num_chains;
185189 let bps_share = bps. map ( |bps| bps / num_chains) ;
186190
191+ let barrier = Arc :: new ( Barrier :: new ( num_chains) ) ;
187192 let mut join_set = task:: JoinSet :: < Result < ( ) , BenchmarkError > > :: new ( ) ;
188193 for ( chain_id, operations, key_pair) in blocks_infos {
189194 let bps_share = if bps_remainder > 0 {
@@ -200,6 +205,7 @@ where
200205 let local_node = local_node. clone ( ) ;
201206 let chain_client = chain_clients[ & chain_id] . clone ( ) ;
202207 let bps_tasks_logger_sender = bps_tasks_logger_sender. clone ( ) ;
208+ let inner_barrier = barrier. clone ( ) ;
203209 chain_client. process_inbox ( ) . await ?;
204210 join_set. spawn_blocking ( move || {
205211 handle. block_on (
@@ -215,6 +221,7 @@ where
215221 committee,
216222 local_node,
217223 bps_tasks_logger_sender,
224+ inner_barrier,
218225 ) )
219226 . await ?;
220227
@@ -503,13 +510,15 @@ where
503510 committee : Committee ,
504511 local_node : LocalNodeClient < S > ,
505512 bps_tasks_logger_sender : mpsc:: Sender < ( ) > ,
513+ barrier : Arc < Barrier > ,
506514 ) -> Result < ( ) , BenchmarkError > {
507515 let chain_id = chain_client. chain_id ( ) ;
516+ bps_tasks_logger_sender. send ( ( ) ) . await ?;
517+ barrier. wait ( ) . await ;
508518 info ! (
509519 "Starting benchmark at target BPS of {:?}, for chain {:?}" ,
510520 bps, chain_id
511521 ) ;
512- bps_tasks_logger_sender. send ( ( ) ) . await ?;
513522 let cross_chain_message_delivery = chain_client. options ( ) . cross_chain_message_delivery ;
514523 let mut num_sent_proposals = 0 ;
515524 let authenticated_signer = Some ( AccountOwner :: from ( key_pair. public ( ) ) ) ;
0 commit comments