Skip to content

Commit d9ef4ef

Browse files
committed
All tasks wait before starting
1 parent dfa3e44 commit d9ef4ef

File tree

1 file changed

+14
-5
lines changed

1 file changed

+14
-5
lines changed

linera-client/src/benchmark.rs

Lines changed: 14 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
// Copyright (c) Zefchain Labs, Inc.
22
// SPDX-License-Identifier: Apache-2.0
33

4-
use std::{collections::HashMap, iter};
4+
use std::{collections::HashMap, iter, sync::Arc};
55

66
use linera_base::{
77
crypto::{AccountPublicKey, AccountSecretKey},
@@ -25,7 +25,11 @@ use linera_sdk::abis::fungible;
2525
use linera_storage::Storage;
2626
use num_format::{Locale, ToFormattedString};
2727
use prometheus_parse::{HistogramCount, Scrape, Value};
28-
use tokio::{runtime::Handle, sync::mpsc, task, time};
28+
use tokio::{
29+
runtime::Handle,
30+
sync::{mpsc, Barrier},
31+
task, time,
32+
};
2933
use tokio_util::sync::CancellationToken;
3034
use tracing::{debug, error, info, warn, Instrument as _};
3135

@@ -173,9 +177,9 @@ where
173177
let mut tasks_running = 0;
174178
while let Some(()) = bps_tasks_logger_receiver.recv().await {
175179
tasks_running += 1;
176-
info!("{}/{} tasks running", tasks_running, num_chains);
180+
info!("{}/{} tasks ready to start", tasks_running, num_chains);
177181
if tasks_running == num_chains {
178-
info!("All tasks are running");
182+
info!("All tasks are ready to start");
179183
break;
180184
}
181185
}
@@ -184,6 +188,7 @@ where
184188
let mut bps_remainder = bps.unwrap_or_default() % num_chains;
185189
let bps_share = bps.map(|bps| bps / num_chains);
186190

191+
let barrier = Arc::new(Barrier::new(num_chains));
187192
let mut join_set = task::JoinSet::<Result<(), BenchmarkError>>::new();
188193
for (chain_id, operations, key_pair) in blocks_infos {
189194
let bps_share = if bps_remainder > 0 {
@@ -200,6 +205,7 @@ where
200205
let local_node = local_node.clone();
201206
let chain_client = chain_clients[&chain_id].clone();
202207
let bps_tasks_logger_sender = bps_tasks_logger_sender.clone();
208+
let inner_barrier = barrier.clone();
203209
chain_client.process_inbox().await?;
204210
join_set.spawn_blocking(move || {
205211
handle.block_on(
@@ -215,6 +221,7 @@ where
215221
committee,
216222
local_node,
217223
bps_tasks_logger_sender,
224+
inner_barrier,
218225
))
219226
.await?;
220227

@@ -503,13 +510,15 @@ where
503510
committee: Committee,
504511
local_node: LocalNodeClient<S>,
505512
bps_tasks_logger_sender: mpsc::Sender<()>,
513+
barrier: Arc<Barrier>,
506514
) -> Result<(), BenchmarkError> {
507515
let chain_id = chain_client.chain_id();
516+
bps_tasks_logger_sender.send(()).await?;
517+
barrier.wait().await;
508518
info!(
509519
"Starting benchmark at target BPS of {:?}, for chain {:?}",
510520
bps, chain_id
511521
);
512-
bps_tasks_logger_sender.send(()).await?;
513522
let cross_chain_message_delivery = chain_client.options().cross_chain_message_delivery;
514523
let mut num_sent_proposals = 0;
515524
let authenticated_signer = Some(AccountOwner::from(key_pair.public()));

0 commit comments

Comments
 (0)