Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 14 additions & 5 deletions linera-client/src/benchmark.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
// Copyright (c) Zefchain Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

use std::{collections::HashMap, iter};
use std::{collections::HashMap, iter, sync::Arc};

use linera_base::{
crypto::{AccountPublicKey, AccountSecretKey},
Expand All @@ -25,7 +25,11 @@ use linera_sdk::abis::fungible;
use linera_storage::Storage;
use num_format::{Locale, ToFormattedString};
use prometheus_parse::{HistogramCount, Scrape, Value};
use tokio::{runtime::Handle, sync::mpsc, task, time};
use tokio::{
runtime::Handle,
sync::{mpsc, Barrier},
task, time,
};
use tokio_util::sync::CancellationToken;
use tracing::{debug, error, info, warn, Instrument as _};

Expand Down Expand Up @@ -173,9 +177,9 @@ where
let mut tasks_running = 0;
while let Some(()) = bps_tasks_logger_receiver.recv().await {
tasks_running += 1;
info!("{}/{} tasks running", tasks_running, num_chains);
info!("{}/{} tasks ready to start", tasks_running, num_chains);
if tasks_running == num_chains {
info!("All tasks are running");
info!("All tasks are ready to start");
break;
}
}
Expand All @@ -184,6 +188,7 @@ where
let mut bps_remainder = bps.unwrap_or_default() % num_chains;
let bps_share = bps.map(|bps| bps / num_chains);

let barrier = Arc::new(Barrier::new(num_chains));
let mut join_set = task::JoinSet::<Result<(), BenchmarkError>>::new();
for (chain_id, operations, key_pair) in blocks_infos {
let bps_share = if bps_remainder > 0 {
Expand All @@ -200,6 +205,7 @@ where
let local_node = local_node.clone();
let chain_client = chain_clients[&chain_id].clone();
let bps_tasks_logger_sender = bps_tasks_logger_sender.clone();
let inner_barrier = barrier.clone();
chain_client.process_inbox().await?;
join_set.spawn_blocking(move || {
handle.block_on(
Expand All @@ -215,6 +221,7 @@ where
committee,
local_node,
bps_tasks_logger_sender,
inner_barrier,
))
.await?;

Expand Down Expand Up @@ -503,13 +510,15 @@ where
committee: Committee,
local_node: LocalNodeClient<S>,
bps_tasks_logger_sender: mpsc::Sender<()>,
barrier: Arc<Barrier>,
) -> Result<(), BenchmarkError> {
let chain_id = chain_client.chain_id();
bps_tasks_logger_sender.send(()).await?;
barrier.wait().await;
info!(
"Starting benchmark at target BPS of {:?}, for chain {:?}",
bps, chain_id
);
bps_tasks_logger_sender.send(()).await?;
let cross_chain_message_delivery = chain_client.options().cross_chain_message_delivery;
let mut num_sent_proposals = 0;
let authenticated_signer = Some(AccountOwner::from(key_pair.public()));
Expand Down