diff --git a/linera-client/src/client_context.rs b/linera-client/src/client_context.rs index e14f6ef6c73..aadbfeee5d1 100644 --- a/linera-client/src/client_context.rs +++ b/linera-client/src/client_context.rs @@ -30,8 +30,10 @@ use thiserror_context::Context; use tracing::{debug, info}; #[cfg(feature = "benchmark")] use { - crate::benchmark::Benchmark, + crate::benchmark::{Benchmark, BenchmarkError}, + futures::{stream, StreamExt, TryStreamExt}, linera_base::{data_types::Amount, identifiers::ApplicationId}, + linera_core::client::ChainClientError, linera_execution::{ committee::{Committee, Epoch}, system::{OpenChainConfig, SystemOperation, OPEN_CHAIN_MESSAGE_INDEX}, @@ -704,6 +706,43 @@ where Ok((chain_clients, epoch, blocks_infos, committee)) } + pub async fn wrap_up_benchmark( + &mut self, + chain_clients: HashMap>, + close_chains: bool, + wrap_up_max_in_flight: usize, + ) -> Result<(), Error> { + if close_chains { + info!("Closing chains..."); + let stream = stream::iter(chain_clients.values().cloned()) + .map(|chain_client| async move { + Benchmark::::close_benchmark_chain(&chain_client).await?; + info!("Closed chain {:?}", chain_client.chain_id()); + Ok::<(), BenchmarkError>(()) + }) + .buffer_unordered(wrap_up_max_in_flight); + stream.try_collect::>().await?; + } else { + info!("Processing inbox for all chains..."); + let stream = stream::iter(chain_clients.values().cloned()) + .map(|chain_client| async move { + chain_client.process_inbox().await?; + info!("Processed inbox for chain {:?}", chain_client.chain_id()); + Ok::<(), ChainClientError>(()) + }) + .buffer_unordered(wrap_up_max_in_flight); + stream.try_collect::>().await?; + + info!("Updating wallet from chain clients..."); + for chain_client in chain_clients.values() { + self.wallet.as_mut().update_from_state(chain_client).await; + } + self.save_wallet().await?; + } + + Ok(()) + } + async fn process_inboxes_and_force_validator_updates(&mut self) { let chain_clients = self .wallet diff --git a/linera-service/src/linera/command.rs b/linera-service/src/linera/command.rs index 096faaee57f..926e5ffe1ef 100644 --- a/linera-service/src/linera/command.rs +++ b/linera-service/src/linera/command.rs @@ -388,6 +388,10 @@ pub enum ClientCommand { /// closing chains. #[arg(long, default_value = "5")] wrap_up_max_in_flight: usize, + + /// Confirm before starting the benchmark. + #[arg(long)] + confirm_before_start: bool, }, /// Create genesis configuration for a Linera deployment. diff --git a/linera-service/src/linera/main.rs b/linera-service/src/linera/main.rs index 491307054bb..c4157a9b1be 100644 --- a/linera-service/src/linera/main.rs +++ b/linera-service/src/linera/main.rs @@ -59,12 +59,6 @@ use serde_json::Value; use tokio::task::JoinSet; use tokio_util::sync::CancellationToken; use tracing::{debug, error, info, warn, Instrument as _}; -#[cfg(feature = "benchmark")] -use { - futures::{stream, TryStreamExt}, - linera_client::benchmark::BenchmarkError, - linera_core::client::ChainClientError, -}; mod command; mod net_up_utils; @@ -750,6 +744,7 @@ impl Runnable for Job { close_chains, health_check_endpoints, wrap_up_max_in_flight, + confirm_before_start, } => { assert!(num_chains > 0, "Number of chains must be greater than 0"); assert!( @@ -773,6 +768,22 @@ impl Runnable for Job { ) .await?; + if confirm_before_start { + info!("Ready to start benchmark. Say 'yes' when you want to proceed. Only 'yes' will be accepted"); + if !std::io::stdin() + .lines() + .next() + .unwrap()? + .eq_ignore_ascii_case("yes") + { + info!("Benchmark cancelled by user"); + context + .wrap_up_benchmark(chain_clients, close_chains, wrap_up_max_in_flight) + .await?; + return Ok(()); + } + } + linera_client::benchmark::Benchmark::::run_benchmark( num_chains, transactions_per_block, @@ -786,40 +797,9 @@ impl Runnable for Job { ) .await?; - if close_chains { - info!("Closing chains..."); - let stream = stream::iter(chain_clients.values().cloned()) - .map(|chain_client| async move { - linera_client::benchmark::Benchmark::::close_benchmark_chain( - &chain_client, - ) - .await?; - info!("Closed chain {:?}", chain_client.chain_id()); - Ok::<(), BenchmarkError>(()) - }) - .buffer_unordered(wrap_up_max_in_flight); - stream.try_collect::>().await?; - } else { - info!("Processing inbox for all chains..."); - let stream = stream::iter(chain_clients.values().cloned()) - .map(|chain_client| async move { - chain_client.process_inbox().await?; - info!("Processed inbox for chain {:?}", chain_client.chain_id()); - Ok::<(), ChainClientError>(()) - }) - .buffer_unordered(wrap_up_max_in_flight); - stream.try_collect::>().await?; - - info!("Updating wallet from chain clients..."); - for chain_client in chain_clients.values() { - context - .wallet - .as_mut() - .update_from_state(chain_client) - .await; - } - context.save_wallet().await?; - } + context + .wrap_up_benchmark(chain_clients, close_chains, wrap_up_max_in_flight) + .await?; } Watch { chain_id, raw } => {