@@ -81,22 +81,22 @@ pub struct Benchmark<Env: Environment> {
8181impl < Env : Environment > Benchmark < Env > {
8282 #[ expect( clippy:: too_many_arguments) ]
8383 pub async fn run_benchmark < C : ClientContext < Environment = Env > + ' static > (
84- num_chain_groups : usize ,
84+ num_chains : usize ,
8585 transactions_per_block : usize ,
8686 bps : usize ,
87- chain_clients : Vec < Vec < ChainClient < Env > > > ,
88- blocks_infos : Vec < Vec < Vec < Operation > > > ,
87+ chain_clients : Vec < ChainClient < Env > > ,
88+ blocks_infos : Vec < Vec < Operation > > ,
8989 health_check_endpoints : Option < String > ,
9090 runtime_in_seconds : Option < u64 > ,
91- delay_between_chain_groups_ms : Option < u64 > ,
91+ delay_between_chains_ms : Option < u64 > ,
9292 chain_listener : ChainListener < C > ,
9393 shutdown_notifier : & CancellationToken ,
9494 ) -> Result < ( ) , BenchmarkError > {
95- let bps_counts = ( 0 ..num_chain_groups )
95+ let bps_counts = ( 0 ..num_chains )
9696 . map ( |_| Arc :: new ( AtomicUsize :: new ( 0 ) ) )
9797 . collect :: < Vec < _ > > ( ) ;
9898 let notifier = Arc :: new ( Notify :: new ( ) ) ;
99- let barrier = Arc :: new ( Barrier :: new ( num_chain_groups + 1 ) ) ;
99+ let barrier = Arc :: new ( Barrier :: new ( num_chains + 1 ) ) ;
100100
101101 let chain_listener_handle = tokio:: spawn (
102102 async move {
@@ -117,19 +117,19 @@ impl<Env: Environment> Benchmark<Env> {
117117 ) ;
118118
119119 let ( runtime_control_task, runtime_control_sender) =
120- Self :: runtime_control_task ( shutdown_notifier, runtime_in_seconds, num_chain_groups ) ;
120+ Self :: runtime_control_task ( shutdown_notifier, runtime_in_seconds, num_chains ) ;
121121
122- let bps_initial_share = bps / num_chain_groups ;
123- let mut bps_remainder = bps % num_chain_groups ;
122+ let bps_initial_share = bps / num_chains ;
123+ let mut bps_remainder = bps % num_chains ;
124124 let mut join_set = task:: JoinSet :: < Result < ( ) , BenchmarkError > > :: new ( ) ;
125- for ( chain_group_index , ( chain_group , chain_clients ) ) in blocks_infos
125+ for ( chain_idx , ( block_info , chain_client ) ) in blocks_infos
126126 . into_iter ( )
127127 . zip ( chain_clients. into_iter ( ) )
128128 . enumerate ( )
129129 {
130130 let shutdown_notifier_clone = shutdown_notifier. clone ( ) ;
131131 let barrier_clone = barrier. clone ( ) ;
132- let bps_count_clone = bps_counts[ chain_group_index ] . clone ( ) ;
132+ let bps_count_clone = bps_counts[ chain_idx ] . clone ( ) ;
133133 let notifier_clone = notifier. clone ( ) ;
134134 let runtime_control_sender_clone = runtime_control_sender. clone ( ) ;
135135 let bps_share = if bps_remainder > 0 {
@@ -138,38 +138,44 @@ impl<Env: Environment> Benchmark<Env> {
138138 } else {
139139 bps_initial_share
140140 } ;
141+ let chain_id = chain_client. chain_id ( ) ;
141142 join_set. spawn (
142143 async move {
143144 Box :: pin ( Self :: run_benchmark_internal (
144- chain_group_index,
145+ chain_idx,
146+ chain_id,
145147 bps_share,
146- chain_group ,
147- chain_clients ,
148+ block_info ,
149+ chain_client ,
148150 shutdown_notifier_clone,
149151 bps_count_clone,
150152 barrier_clone,
151153 notifier_clone,
152154 runtime_control_sender_clone,
153- delay_between_chain_groups_ms ,
155+ delay_between_chains_ms ,
154156 ) )
155157 . await ?;
156158
157159 Ok ( ( ) )
158160 }
159- . instrument (
160- tracing:: info_span!( "chain_group" , chain_group_index = ?chain_group_index) ,
161- ) ,
161+ . instrument ( tracing:: info_span!( "chain_id" , chain_id = ?chain_id) ) ,
162162 ) ;
163163 }
164164
165+ // Wait for tasks and fail immediately if any task returns an error or panics
166+ while let Some ( result) = join_set. join_next ( ) . await {
167+ let inner_result = result?;
168+ if let Err ( e) = inner_result {
169+ error ! ( "Benchmark task failed: {}" , e) ;
170+ shutdown_notifier. cancel ( ) ;
171+ join_set. abort_all ( ) ;
172+ return Err ( e) ;
173+ }
174+ }
175+
165176 let metrics_watcher =
166177 Self :: metrics_watcher ( health_check_endpoints, shutdown_notifier) . await ?;
167178
168- join_set
169- . join_all ( )
170- . await
171- . into_iter ( )
172- . collect :: < Result < Vec < _ > , _ > > ( ) ?;
173179 info ! ( "All benchmark tasks completed" ) ;
174180 bps_control_task. await ?;
175181 if let Some ( metrics_watcher) = metrics_watcher {
@@ -517,42 +523,39 @@ impl<Env: Environment> Benchmark<Env> {
517523
518524 #[ expect( clippy:: too_many_arguments) ]
519525 async fn run_benchmark_internal (
520- chain_group_index : usize ,
526+ chain_idx : usize ,
527+ chain_id : ChainId ,
521528 bps : usize ,
522- chain_group : Vec < Vec < Operation > > ,
523- chain_clients : Vec < ChainClient < Env > > ,
529+ operations : Vec < Operation > ,
530+ chain_client : ChainClient < Env > ,
524531 shutdown_notifier : CancellationToken ,
525532 bps_count : Arc < AtomicUsize > ,
526533 barrier : Arc < Barrier > ,
527534 notifier : Arc < Notify > ,
528535 runtime_control_sender : Option < mpsc:: Sender < ( ) > > ,
529- delay_between_chain_groups_ms : Option < u64 > ,
536+ delay_between_chains_ms : Option < u64 > ,
530537 ) -> Result < ( ) , BenchmarkError > {
531538 barrier. wait ( ) . await ;
532- if let Some ( delay_between_chain_groups_ms ) = delay_between_chain_groups_ms {
539+ if let Some ( delay_between_chains_ms ) = delay_between_chains_ms {
533540 time:: sleep ( time:: Duration :: from_millis (
534- ( chain_group_index as u64 ) * delay_between_chain_groups_ms ,
541+ ( chain_idx as u64 ) * delay_between_chains_ms ,
535542 ) )
536543 . await ;
537544 }
538- info ! ( "Starting benchmark for chain group {:?}" , chain_group_index ) ;
545+ info ! ( "Starting benchmark for chain {:?}" , chain_id ) ;
539546
540547 if let Some ( runtime_control_sender) = runtime_control_sender {
541548 runtime_control_sender. send ( ( ) ) . await ?;
542549 }
543550
544- for ( operations, chain_client) in chain_group
545- . into_iter ( )
546- . zip ( chain_clients. into_iter ( ) )
547- . cycle ( )
548- {
551+ loop {
549552 if shutdown_notifier. is_cancelled ( ) {
550553 info ! ( "Shutdown signal received, stopping benchmark" ) ;
551554 break ;
552555 }
553556
554557 chain_client
555- . execute_operations ( operations, vec ! [ ] )
558+ . execute_operations ( operations. clone ( ) , vec ! [ ] )
556559 . await
557560 . map_err ( BenchmarkError :: ChainClient ) ?
558561 . expect ( "should execute block with operations" ) ;
@@ -588,60 +591,56 @@ impl<Env: Environment> Benchmark<Env> {
588591
589592 /// Generates information related to one block per chain.
590593 pub fn make_benchmark_block_info (
591- benchmark_chains : Vec < Vec < ( ChainId , AccountOwner ) > > ,
594+ benchmark_chains : Vec < ( ChainId , AccountOwner ) > ,
592595 transactions_per_block : usize ,
593596 fungible_application_id : Option < ApplicationId > ,
594- ) -> Vec < Vec < Vec < Operation > > > {
597+ ) -> Vec < Vec < Operation > > {
595598 let mut blocks_infos = Vec :: new ( ) ;
596- for chains in benchmark_chains {
597- let mut infos = Vec :: new ( ) ;
598- let chains_len = chains. len ( ) ;
599+ for ( i, ( _, owner) ) in benchmark_chains. iter ( ) . enumerate ( ) {
599600 let amount = Amount :: from ( 1 ) ;
600- for i in 0 ..chains_len {
601- let owner = chains[ i] . 1 ;
602- let mut operations = Vec :: new ( ) ;
601+ let mut operations = Vec :: new ( ) ;
602+
603+ let mut other_chains: Vec < _ > = if benchmark_chains. len ( ) == 1 {
604+ // If there's only one chain, just have it send to itself.
605+ benchmark_chains
606+ . iter ( )
607+ . map ( |( chain_id, _) | * chain_id)
608+ . collect ( )
609+ } else {
610+ // If there's more than one chain, have it send to all other chains, and don't
611+ // send to self.
612+ benchmark_chains
613+ . iter ( )
614+ . enumerate ( )
615+ . filter ( |( j, _) | i != * j)
616+ . map ( |( _, ( chain_id, _) ) | * chain_id)
617+ . collect ( )
618+ } ;
603619
604- let mut other_chains: Vec < _ > = if chains_len == 1 {
605- // If there's only one chain, just have it send to itself.
606- chains. iter ( ) . map ( |( chain_id, _) | * chain_id) . collect ( )
607- } else {
608- // If there's more than one chain, have it send to all other chains, and don't
609- // send to self.
610- chains
611- . iter ( )
612- . enumerate ( )
613- . filter ( |( j, _) | i != * j)
614- . map ( |( _, ( chain_id, _) ) | * chain_id)
615- . collect ( )
620+ other_chains. shuffle ( & mut thread_rng ( ) ) ;
621+ for recipient_chain_id in other_chains {
622+ let operation = match fungible_application_id {
623+ Some ( application_id) => Self :: fungible_transfer (
624+ application_id,
625+ recipient_chain_id,
626+ * owner,
627+ * owner,
628+ amount,
629+ ) ,
630+ None => Operation :: system ( SystemOperation :: Transfer {
631+ owner : AccountOwner :: CHAIN ,
632+ recipient : Recipient :: chain ( recipient_chain_id) ,
633+ amount,
634+ } ) ,
616635 } ;
617-
618- other_chains. shuffle ( & mut thread_rng ( ) ) ;
619-
620- for recipient_chain_id in other_chains {
621- let operation = match fungible_application_id {
622- Some ( application_id) => Self :: fungible_transfer (
623- application_id,
624- recipient_chain_id,
625- owner,
626- owner,
627- amount,
628- ) ,
629- None => Operation :: system ( SystemOperation :: Transfer {
630- owner : AccountOwner :: CHAIN ,
631- recipient : Recipient :: chain ( recipient_chain_id) ,
632- amount,
633- } ) ,
634- } ;
635- operations. push ( operation) ;
636- }
637- let operations = operations
638- . into_iter ( )
639- . cycle ( )
640- . take ( transactions_per_block)
641- . collect ( ) ;
642- infos. push ( operations) ;
636+ operations. push ( operation) ;
643637 }
644- blocks_infos. push ( infos) ;
638+ let operations = operations
639+ . into_iter ( )
640+ . cycle ( )
641+ . take ( transactions_per_block)
642+ . collect ( ) ;
643+ blocks_infos. push ( operations) ;
645644 }
646645 blocks_infos
647646 }
0 commit comments