Skip to content

Commit 9766da9

Browse files
committed
Ask confirmation to start benchmark
1 parent 425dc10 commit 9766da9

File tree

3 files changed

+64
-41
lines changed

3 files changed

+64
-41
lines changed

linera-client/src/client_context.rs

Lines changed: 40 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,8 +30,10 @@ use thiserror_context::Context;
3030
use tracing::{debug, info};
3131
#[cfg(feature = "benchmark")]
3232
use {
33-
crate::benchmark::Benchmark,
33+
crate::benchmark::{Benchmark, BenchmarkError},
34+
futures::{stream, StreamExt, TryStreamExt},
3435
linera_base::{data_types::Amount, identifiers::ApplicationId},
36+
linera_core::client::ChainClientError,
3537
linera_execution::{
3638
committee::{Committee, Epoch},
3739
system::{OpenChainConfig, SystemOperation, OPEN_CHAIN_MESSAGE_INDEX},
@@ -704,6 +706,43 @@ where
704706
Ok((chain_clients, epoch, blocks_infos, committee))
705707
}
706708

709+
pub async fn wrap_up_benchmark(
710+
&mut self,
711+
chain_clients: HashMap<ChainId, ChainClient<NodeProvider, S>>,
712+
close_chains: bool,
713+
wrap_up_max_in_flight: usize,
714+
) -> Result<(), Error> {
715+
if close_chains {
716+
info!("Closing chains...");
717+
let stream = stream::iter(chain_clients.values().cloned())
718+
.map(|chain_client| async move {
719+
Benchmark::<S>::close_benchmark_chain(&chain_client).await?;
720+
info!("Closed chain {:?}", chain_client.chain_id());
721+
Ok::<(), BenchmarkError>(())
722+
})
723+
.buffer_unordered(wrap_up_max_in_flight);
724+
stream.try_collect::<Vec<_>>().await?;
725+
} else {
726+
info!("Processing inbox for all chains...");
727+
let stream = stream::iter(chain_clients.values().cloned())
728+
.map(|chain_client| async move {
729+
chain_client.process_inbox().await?;
730+
info!("Processed inbox for chain {:?}", chain_client.chain_id());
731+
Ok::<(), ChainClientError>(())
732+
})
733+
.buffer_unordered(wrap_up_max_in_flight);
734+
stream.try_collect::<Vec<_>>().await?;
735+
736+
info!("Updating wallet from chain clients...");
737+
for chain_client in chain_clients.values() {
738+
self.wallet.as_mut().update_from_state(chain_client).await;
739+
}
740+
self.save_wallet().await?;
741+
}
742+
743+
Ok(())
744+
}
745+
707746
async fn process_inboxes_and_force_validator_updates(&mut self) {
708747
let chain_clients = self
709748
.wallet

linera-service/src/linera/command.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -388,6 +388,10 @@ pub enum ClientCommand {
388388
/// closing chains.
389389
#[arg(long, default_value = "5")]
390390
wrap_up_max_in_flight: usize,
391+
392+
/// Confirm before starting the benchmark.
393+
#[arg(long)]
394+
confirm_before_start: bool,
391395
},
392396

393397
/// Create genesis configuration for a Linera deployment.

linera-service/src/linera/main.rs

Lines changed: 20 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -59,12 +59,6 @@ use serde_json::Value;
5959
use tokio::task::JoinSet;
6060
use tokio_util::sync::CancellationToken;
6161
use tracing::{debug, error, info, warn, Instrument as _};
62-
#[cfg(feature = "benchmark")]
63-
use {
64-
futures::{stream, TryStreamExt},
65-
linera_client::benchmark::BenchmarkError,
66-
linera_core::client::ChainClientError,
67-
};
6862

6963
mod command;
7064
mod net_up_utils;
@@ -750,6 +744,7 @@ impl Runnable for Job {
750744
close_chains,
751745
health_check_endpoints,
752746
wrap_up_max_in_flight,
747+
confirm_before_start,
753748
} => {
754749
assert!(num_chains > 0, "Number of chains must be greater than 0");
755750
assert!(
@@ -773,6 +768,22 @@ impl Runnable for Job {
773768
)
774769
.await?;
775770

771+
if confirm_before_start {
772+
info!("Ready to start benchmark. Say 'yes' when you want to proceed. Only 'yes' will be accepted");
773+
if !std::io::stdin()
774+
.lines()
775+
.next()
776+
.unwrap()?
777+
.eq_ignore_ascii_case("yes")
778+
{
779+
info!("Benchmark cancelled by user");
780+
context
781+
.wrap_up_benchmark(chain_clients, close_chains, wrap_up_max_in_flight)
782+
.await?;
783+
return Ok(());
784+
}
785+
}
786+
776787
linera_client::benchmark::Benchmark::<S>::run_benchmark(
777788
num_chains,
778789
transactions_per_block,
@@ -786,40 +797,9 @@ impl Runnable for Job {
786797
)
787798
.await?;
788799

789-
if close_chains {
790-
info!("Closing chains...");
791-
let stream = stream::iter(chain_clients.values().cloned())
792-
.map(|chain_client| async move {
793-
linera_client::benchmark::Benchmark::<S>::close_benchmark_chain(
794-
&chain_client,
795-
)
796-
.await?;
797-
info!("Closed chain {:?}", chain_client.chain_id());
798-
Ok::<(), BenchmarkError>(())
799-
})
800-
.buffer_unordered(wrap_up_max_in_flight);
801-
stream.try_collect::<Vec<_>>().await?;
802-
} else {
803-
info!("Processing inbox for all chains...");
804-
let stream = stream::iter(chain_clients.values().cloned())
805-
.map(|chain_client| async move {
806-
chain_client.process_inbox().await?;
807-
info!("Processed inbox for chain {:?}", chain_client.chain_id());
808-
Ok::<(), ChainClientError>(())
809-
})
810-
.buffer_unordered(wrap_up_max_in_flight);
811-
stream.try_collect::<Vec<_>>().await?;
812-
813-
info!("Updating wallet from chain clients...");
814-
for chain_client in chain_clients.values() {
815-
context
816-
.wallet
817-
.as_mut()
818-
.update_from_state(chain_client)
819-
.await;
820-
}
821-
context.save_wallet().await?;
822-
}
800+
context
801+
.wrap_up_benchmark(chain_clients, close_chains, wrap_up_max_in_flight)
802+
.await?;
823803
}
824804

825805
Watch { chain_id, raw } => {

0 commit comments

Comments
 (0)