Skip to content

Commit 20babe4

Browse files
committed
All tasks wait before starting
1 parent 8206c44 commit 20babe4

File tree

1 file changed

+14
-5
lines changed

1 file changed

+14
-5
lines changed

linera-client/src/benchmark.rs

Lines changed: 14 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
// Copyright (c) Zefchain Labs, Inc.
22
// SPDX-License-Identifier: Apache-2.0
33

4-
use std::{collections::HashMap, iter};
4+
use std::{collections::HashMap, iter, sync::Arc};
55

66
use linera_base::{
77
crypto::{AccountPublicKey, AccountSecretKey},
@@ -26,7 +26,11 @@ use linera_sdk::abis::fungible;
2626
use linera_storage::Storage;
2727
use num_format::{Locale, ToFormattedString};
2828
use prometheus_parse::{HistogramCount, Scrape, Value};
29-
use tokio::{runtime::Handle, sync::mpsc, task, time};
29+
use tokio::{
30+
runtime::Handle,
31+
sync::{mpsc, Barrier},
32+
task, time,
33+
};
3034
use tokio_util::sync::CancellationToken;
3135
use tracing::{debug, error, info, warn, Instrument as _};
3236

@@ -174,9 +178,9 @@ where
174178
let mut tasks_running = 0;
175179
while let Some(()) = bps_tasks_logger_receiver.recv().await {
176180
tasks_running += 1;
177-
info!("{}/{} tasks running", tasks_running, num_chains);
181+
info!("{}/{} tasks ready to start", tasks_running, num_chains);
178182
if tasks_running == num_chains {
179-
info!("All tasks are running");
183+
info!("All tasks are ready to start");
180184
break;
181185
}
182186
}
@@ -185,6 +189,7 @@ where
185189
let mut bps_remainder = bps.unwrap_or_default() % num_chains;
186190
let bps_share = bps.map(|bps| bps / num_chains);
187191

192+
let barrier = Arc::new(Barrier::new(num_chains));
188193
let mut join_set = task::JoinSet::<Result<(), BenchmarkError>>::new();
189194
for (chain_id, operations, key_pair) in blocks_infos {
190195
let bps_share = if bps_remainder > 0 {
@@ -201,6 +206,7 @@ where
201206
let local_node = local_node.clone();
202207
let chain_client = chain_clients[&chain_id].clone();
203208
let bps_tasks_logger_sender = bps_tasks_logger_sender.clone();
209+
let inner_barrier = barrier.clone();
204210
chain_client.process_inbox().await?;
205211
join_set.spawn_blocking(move || {
206212
handle.block_on(
@@ -216,6 +222,7 @@ where
216222
committee,
217223
local_node,
218224
bps_tasks_logger_sender,
225+
inner_barrier,
219226
))
220227
.await?;
221228

@@ -504,13 +511,15 @@ where
504511
committee: Committee,
505512
local_node: LocalNodeClient<S>,
506513
bps_tasks_logger_sender: mpsc::Sender<()>,
514+
barrier: Arc<Barrier>,
507515
) -> Result<(), BenchmarkError> {
508516
let chain_id = chain_client.chain_id();
517+
bps_tasks_logger_sender.send(()).await?;
518+
barrier.wait().await;
509519
info!(
510520
"Starting benchmark at target BPS of {:?}, for chain {:?}",
511521
bps, chain_id
512522
);
513-
bps_tasks_logger_sender.send(()).await?;
514523
let cross_chain_message_delivery = chain_client.options().cross_chain_message_delivery;
515524
let mut num_sent_proposals = 0;
516525
let authenticated_signer = Some(AccountOwner::from(key_pair.public()));

0 commit comments

Comments
 (0)