Skip to content

Commit 5124634

Browse files
authored
Generate operations dynamically (#4345)
## Motivation Right now we always precalculate the operations for each block. However, the performance gain we get from it should be minimal, and it limits us on what we can do. ## Proposal Generate the operations dynamically instead, giving more flexibility on what we can do. ## Test Plan Tested against a local network, works as expected ## Release Plan - Nothing to do / These changes follow the usual release cycle.
1 parent 64bbc65 commit 5124634

File tree

3 files changed

+133
-106
lines changed

3 files changed

+133
-106
lines changed

linera-client/src/benchmark.rs

Lines changed: 114 additions & 83 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ use linera_execution::{system::SystemOperation, Operation};
2323
use linera_sdk::abis::fungible::FungibleOperation;
2424
use num_format::{Locale, ToFormattedString};
2525
use prometheus_parse::{HistogramCount, Scrape, Value};
26-
use rand::{seq::SliceRandom, thread_rng};
26+
use rand::{rngs::SmallRng, seq::SliceRandom, thread_rng, SeedableRng};
2727
use serde::{Deserialize, Serialize};
2828
use tokio::{
2929
sync::{mpsc, Barrier, Notify},
@@ -73,6 +73,8 @@ pub enum BenchmarkError {
7373
ConfigLoadError(#[from] anyhow::Error),
7474
#[error("Could not find enough chains in wallet alone: needed {0}, but only found {1}")]
7575
NotEnoughChainsInWallet(usize, usize),
76+
#[error("Random number generator error: {0}")]
77+
RandError(#[from] rand::Error),
7678
}
7779

7880
#[derive(Debug)]
@@ -109,17 +111,18 @@ pub struct Benchmark<Env: Environment> {
109111
impl<Env: Environment> Benchmark<Env> {
110112
#[expect(clippy::too_many_arguments)]
111113
pub async fn run_benchmark<C: ClientContext<Environment = Env> + 'static>(
112-
num_chains: usize,
113-
transactions_per_block: usize,
114114
bps: usize,
115115
chain_clients: Vec<ChainClient<Env>>,
116-
blocks_infos: Vec<Vec<Operation>>,
116+
all_chains: Vec<ChainId>,
117+
transactions_per_block: usize,
118+
fungible_application_id: Option<ApplicationId>,
117119
health_check_endpoints: Option<String>,
118120
runtime_in_seconds: Option<u64>,
119121
delay_between_chains_ms: Option<u64>,
120122
chain_listener: ChainListener<C>,
121123
shutdown_notifier: &CancellationToken,
122124
) -> Result<(), BenchmarkError> {
125+
let num_chains = chain_clients.len();
123126
let bps_counts = (0..num_chains)
124127
.map(|_| Arc::new(AtomicUsize::new(0)))
125128
.collect::<Vec<_>>();
@@ -150,31 +153,30 @@ impl<Env: Environment> Benchmark<Env> {
150153
let bps_initial_share = bps / num_chains;
151154
let mut bps_remainder = bps % num_chains;
152155
let mut join_set = task::JoinSet::<Result<(), BenchmarkError>>::new();
153-
for (chain_idx, (block_info, chain_client)) in blocks_infos
154-
.into_iter()
155-
.zip(chain_clients.into_iter())
156-
.enumerate()
157-
{
156+
for (chain_idx, chain_client) in chain_clients.into_iter().enumerate() {
157+
let chain_id = chain_client.chain_id();
158158
let shutdown_notifier_clone = shutdown_notifier.clone();
159159
let barrier_clone = barrier.clone();
160160
let bps_count_clone = bps_counts[chain_idx].clone();
161161
let notifier_clone = notifier.clone();
162162
let runtime_control_sender_clone = runtime_control_sender.clone();
163+
let all_chains_clone = all_chains.clone();
163164
let bps_share = if bps_remainder > 0 {
164165
bps_remainder -= 1;
165166
bps_initial_share + 1
166167
} else {
167168
bps_initial_share
168169
};
169-
let chain_id = chain_client.chain_id();
170170
join_set.spawn(
171171
async move {
172172
Box::pin(Self::run_benchmark_internal(
173173
chain_idx,
174174
chain_id,
175175
bps_share,
176-
block_info,
177176
chain_client,
177+
all_chains_clone,
178+
transactions_per_block,
179+
fungible_application_id,
178180
shutdown_notifier_clone,
179181
bps_count_clone,
180182
barrier_clone,
@@ -554,8 +556,10 @@ impl<Env: Environment> Benchmark<Env> {
554556
chain_idx: usize,
555557
chain_id: ChainId,
556558
bps: usize,
557-
operations: Vec<Operation>,
558559
chain_client: ChainClient<Env>,
560+
all_chains: Vec<ChainId>,
561+
transactions_per_block: usize,
562+
fungible_application_id: Option<ApplicationId>,
559563
shutdown_notifier: CancellationToken,
560564
bps_count: Arc<AtomicUsize>,
561565
barrier: Arc<Barrier>,
@@ -576,6 +580,12 @@ impl<Env: Environment> Benchmark<Env> {
576580
runtime_control_sender.send(()).await?;
577581
}
578582

583+
let owner = chain_client
584+
.identity()
585+
.await
586+
.map_err(BenchmarkError::ChainClient)?;
587+
let mut destination_manager = ChainDestinationManager::new(chain_id, all_chains)?;
588+
579589
loop {
580590
tokio::select! {
581591
biased;
@@ -584,7 +594,15 @@ impl<Env: Environment> Benchmark<Env> {
584594
info!("Shutdown signal received, stopping benchmark");
585595
break;
586596
}
587-
result = chain_client.execute_operations(operations.clone(), vec![]) => {
597+
result = chain_client.execute_operations(
598+
Self::generate_operations(
599+
owner,
600+
transactions_per_block,
601+
fungible_application_id,
602+
&mut destination_manager,
603+
),
604+
vec![]
605+
) => {
588606
result
589607
.map_err(BenchmarkError::ChainClient)?
590608
.expect("should execute block with operations");
@@ -601,6 +619,47 @@ impl<Env: Environment> Benchmark<Env> {
601619
Ok(())
602620
}
603621

622+
fn create_operation(
623+
fungible_application_id: Option<ApplicationId>,
624+
recipient_chain_id: ChainId,
625+
owner: AccountOwner,
626+
amount: Amount,
627+
) -> Operation {
628+
match fungible_application_id {
629+
Some(application_id) => {
630+
Self::fungible_transfer(application_id, recipient_chain_id, owner, owner, amount)
631+
}
632+
None => Operation::system(SystemOperation::Transfer {
633+
owner: AccountOwner::CHAIN,
634+
recipient: Account::chain(recipient_chain_id),
635+
amount,
636+
}),
637+
}
638+
}
639+
640+
/// Generate operations for a single block, randomizing destinations after each full cycle
641+
fn generate_operations(
642+
owner: AccountOwner,
643+
transactions_per_block: usize,
644+
fungible_application_id: Option<ApplicationId>,
645+
destination_manager: &mut ChainDestinationManager,
646+
) -> Vec<Operation> {
647+
let mut operations = Vec::with_capacity(transactions_per_block);
648+
let amount = Amount::from_attos(1);
649+
650+
for _ in 0..transactions_per_block {
651+
let recipient_chain_id = destination_manager.get_next_destination();
652+
operations.push(Self::create_operation(
653+
fungible_application_id,
654+
recipient_chain_id,
655+
owner,
656+
amount,
657+
));
658+
}
659+
660+
operations
661+
}
662+
604663
/// Closes the chain that was created for the benchmark.
605664
pub async fn close_benchmark_chain(
606665
chain_client: &ChainClient<Env>,
@@ -640,76 +699,6 @@ impl<Env: Environment> Benchmark<Env> {
640699
Ok(all_chains)
641700
}
642701

643-
pub fn make_benchmark_block_info(
644-
benchmark_chains: Vec<(ChainId, AccountOwner)>,
645-
transactions_per_block: usize,
646-
fungible_application_id: Option<ApplicationId>,
647-
all_chains: Vec<ChainId>,
648-
) -> Result<Vec<Vec<Operation>>, BenchmarkError> {
649-
let mut blocks_infos = Vec::new();
650-
let amount = Amount::from_attos(1);
651-
652-
for (current_chain_id, owner) in benchmark_chains.iter() {
653-
let mut operations = Vec::new();
654-
655-
let mut other_chains: Vec<_> = if all_chains.len() == 1 {
656-
// If there's only one chain, just have it send to itself.
657-
all_chains.clone()
658-
} else {
659-
// If there's more than one chain, have it send to all other chains, and don't
660-
// send to self.
661-
all_chains
662-
.iter()
663-
.filter(|chain_id| **chain_id != *current_chain_id)
664-
.copied()
665-
.collect()
666-
};
667-
668-
other_chains.shuffle(&mut thread_rng());
669-
670-
// Calculate adjusted transactions_per_block to ensure even distribution
671-
let num_destinations = other_chains.len();
672-
let adjusted_transactions_per_block = if transactions_per_block % num_destinations != 0
673-
{
674-
let adjusted = transactions_per_block.div_ceil(num_destinations) * num_destinations;
675-
warn!(
676-
"Requested transactions_per_block ({}) is not evenly divisible by number of destination chains ({}). \
677-
Adjusting to {} transactions per block to ensure transfers cancel each other out.",
678-
transactions_per_block, num_destinations, adjusted
679-
);
680-
adjusted
681-
} else {
682-
transactions_per_block
683-
};
684-
685-
for recipient_chain_id in other_chains {
686-
let operation = match fungible_application_id {
687-
Some(application_id) => Self::fungible_transfer(
688-
application_id,
689-
recipient_chain_id,
690-
*owner,
691-
*owner,
692-
amount,
693-
),
694-
None => Operation::system(SystemOperation::Transfer {
695-
owner: AccountOwner::CHAIN,
696-
recipient: Account::chain(recipient_chain_id),
697-
amount,
698-
}),
699-
};
700-
operations.push(operation);
701-
}
702-
703-
let operations = operations
704-
.into_iter()
705-
.cycle()
706-
.take(adjusted_transactions_per_block)
707-
.collect();
708-
blocks_infos.push(operations);
709-
}
710-
Ok(blocks_infos)
711-
}
712-
713702
/// Creates a fungible token transfer operation.
714703
pub fn fungible_transfer(
715704
application_id: ApplicationId,
@@ -734,3 +723,45 @@ impl<Env: Environment> Benchmark<Env> {
734723
}
735724
}
736725
}
726+
727+
struct ChainDestinationManager {
728+
source_chain_id: ChainId,
729+
destination_index: usize,
730+
destination_chains: Vec<ChainId>,
731+
rng: SmallRng,
732+
}
733+
734+
impl ChainDestinationManager {
735+
fn new(
736+
source_chain_id: ChainId,
737+
mut destination_chains: Vec<ChainId>,
738+
) -> Result<Self, BenchmarkError> {
739+
let mut rng = SmallRng::from_rng(thread_rng())?;
740+
destination_chains.shuffle(&mut rng);
741+
742+
Ok(Self {
743+
source_chain_id,
744+
destination_index: 0,
745+
destination_chains,
746+
rng,
747+
})
748+
}
749+
750+
fn get_next_destination(&mut self) -> ChainId {
751+
// Check if we've gone through all destinations
752+
if self.destination_index >= self.destination_chains.len() {
753+
// Reshuffle the destinations for the next cycle
754+
self.destination_chains.shuffle(&mut self.rng);
755+
self.destination_index = 0;
756+
}
757+
758+
let destination_chain_id = self.destination_chains[self.destination_index];
759+
self.destination_index += 1;
760+
761+
if destination_chain_id == self.source_chain_id {
762+
self.get_next_destination()
763+
} else {
764+
destination_chain_id
765+
}
766+
}
767+
}

linera-client/src/client_context.rs

Lines changed: 15 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -671,12 +671,11 @@ where
671671
pub async fn prepare_for_benchmark(
672672
&mut self,
673673
num_chains: usize,
674-
transactions_per_block: usize,
675674
tokens_per_chain: Amount,
676675
fungible_application_id: Option<ApplicationId>,
677676
pub_keys: Vec<AccountPublicKey>,
678677
chains_config_path: Option<&Path>,
679-
) -> Result<(Vec<ChainClient<Env>>, Vec<Vec<Operation>>), Error> {
678+
) -> Result<(Vec<ChainClient<Env>>, Vec<ChainId>), Error> {
680679
let start = Instant::now();
681680
// Below all block proposals are supposed to succeed without retries, we
682681
// must make sure that all incoming payments have been accepted on-chain
@@ -736,14 +735,7 @@ where
736735
}
737736
}
738737

739-
let blocks_infos = Benchmark::<Env>::make_benchmark_block_info(
740-
benchmark_chains,
741-
transactions_per_block,
742-
fungible_application_id,
743-
all_chains,
744-
)?;
745-
746-
Ok((chain_clients, blocks_infos))
738+
Ok((chain_clients, all_chains))
747739
}
748740

749741
pub async fn wrap_up_benchmark(
@@ -841,20 +833,19 @@ where
841833
if chains_found_in_wallet == num_chains {
842834
break;
843835
}
844-
// This should never panic, because `owned_chain_ids` only returns the owned chains that
845-
// we have a key pair for.
846-
let owner = self
847-
.wallet
848-
.get(chain_id)
849-
.and_then(|chain| chain.owner)
850-
.unwrap();
851836
let chain_client = self.make_chain_client(chain_id);
852837
let ownership = chain_client.chain_info().await?.manager.ownership;
853838
if !ownership.owners.is_empty() || ownership.super_owners.len() != 1 {
854839
continue;
855840
}
856841
chain_client.process_inbox().await?;
857-
benchmark_chains.push((chain_id, owner));
842+
benchmark_chains.push((
843+
chain_id,
844+
*ownership
845+
.super_owners
846+
.first()
847+
.expect("should have a super owner"),
848+
));
858849
chain_clients.push(chain_client);
859850
chains_found_in_wallet += 1;
860851
}
@@ -940,6 +931,12 @@ where
940931
info!("Processing default chain inbox");
941932
default_chain_client.process_inbox().await?;
942933

934+
assert_eq!(
935+
benchmark_chains.len(),
936+
chain_clients.len(),
937+
"benchmark_chains and chain_clients must have the same size"
938+
);
939+
943940
Ok((benchmark_chains, chain_clients))
944941
}
945942

linera-service/src/cli/main.rs

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -841,10 +841,9 @@ impl Runnable for Job {
841841
wallet,
842842
signer.into_value(),
843843
);
844-
let (chain_clients, blocks_infos) = context
844+
let (chain_clients, all_chains) = context
845845
.prepare_for_benchmark(
846846
num_chains,
847-
transactions_per_block,
848847
tokens_per_chain,
849848
fungible_application_id,
850849
pub_keys,
@@ -883,11 +882,11 @@ impl Runnable for Job {
883882
shutdown_notifier.clone(),
884883
);
885884
linera_client::benchmark::Benchmark::run_benchmark(
886-
num_chains,
887-
transactions_per_block,
888885
bps,
889886
chain_clients.clone(),
890-
blocks_infos,
887+
all_chains,
888+
transactions_per_block,
889+
fungible_application_id,
891890
health_check_endpoints.clone(),
892891
runtime_in_seconds,
893892
delay_between_chains_ms,

0 commit comments

Comments
 (0)