diff --git a/linera-client/src/benchmark.rs b/linera-client/src/benchmark.rs index 23bfd36b91d..0d02b3580f8 100644 --- a/linera-client/src/benchmark.rs +++ b/linera-client/src/benchmark.rs @@ -1,7 +1,7 @@ // Copyright (c) Zefchain Labs, Inc. // SPDX-License-Identifier: Apache-2.0 -use std::{collections::HashMap, iter}; +use std::{collections::HashMap, iter, sync::Arc}; use linera_base::{ crypto::{AccountPublicKey, AccountSecretKey}, @@ -25,7 +25,11 @@ use linera_sdk::abis::fungible; use linera_storage::Storage; use num_format::{Locale, ToFormattedString}; use prometheus_parse::{HistogramCount, Scrape, Value}; -use tokio::{runtime::Handle, sync::mpsc, task, time}; +use tokio::{ + runtime::Handle, + sync::{mpsc, Barrier}, + task, time, +}; use tokio_util::sync::CancellationToken; use tracing::{debug, error, info, warn, Instrument as _}; @@ -173,9 +177,9 @@ where let mut tasks_running = 0; while let Some(()) = bps_tasks_logger_receiver.recv().await { tasks_running += 1; - info!("{}/{} tasks running", tasks_running, num_chains); + info!("{}/{} tasks ready to start", tasks_running, num_chains); if tasks_running == num_chains { - info!("All tasks are running"); + info!("All tasks are ready to start"); break; } } @@ -184,6 +188,7 @@ where let mut bps_remainder = bps.unwrap_or_default() % num_chains; let bps_share = bps.map(|bps| bps / num_chains); + let barrier = Arc::new(Barrier::new(num_chains)); let mut join_set = task::JoinSet::>::new(); for (chain_id, operations, key_pair) in blocks_infos { let bps_share = if bps_remainder > 0 { @@ -200,6 +205,7 @@ where let local_node = local_node.clone(); let chain_client = chain_clients[&chain_id].clone(); let bps_tasks_logger_sender = bps_tasks_logger_sender.clone(); + let inner_barrier = barrier.clone(); chain_client.process_inbox().await?; join_set.spawn_blocking(move || { handle.block_on( @@ -215,6 +221,7 @@ where committee, local_node, bps_tasks_logger_sender, + inner_barrier, )) .await?; @@ -503,13 +510,15 @@ where committee: Committee, local_node: LocalNodeClient, bps_tasks_logger_sender: mpsc::Sender<()>, + barrier: Arc, ) -> Result<(), BenchmarkError> { let chain_id = chain_client.chain_id(); + bps_tasks_logger_sender.send(()).await?; + barrier.wait().await; info!( "Starting benchmark at target BPS of {:?}, for chain {:?}", bps, chain_id ); - bps_tasks_logger_sender.send(()).await?; let cross_chain_message_delivery = chain_client.options().cross_chain_message_delivery; let mut num_sent_proposals = 0; let authenticated_signer = Some(AccountOwner::from(key_pair.public()));