Skip to content

Commit 195a6ee

Browse files
ghubertpaloAlenar
authored andcommitted
make separate binary for load tests
1 parent 5f34af9 commit 195a6ee

File tree

9 files changed

+289
-168
lines changed

9 files changed

+289
-168
lines changed

Cargo.lock

Lines changed: 2 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

mithril-aggregator/src/dependency_injection/builder.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -973,12 +973,12 @@ impl DependenciesBuilder {
973973
.get_current_epoch()
974974
.await
975975
.map_err(|e| DependenciesBuilderError::Initialization {
976-
message: "cannot create aggregator runner".to_string(),
976+
message: "cannot create aggregator runner: failed to retrieve current epoch."
977+
.to_string(),
977978
error: Some(e.into()),
978979
})?
979980
.ok_or(DependenciesBuilderError::Initialization {
980-
message: "cannot build aggregator runner: impossible to retrieve current epoch"
981-
.to_string(),
981+
message: "cannot build aggregator runner: no epoch returned.".to_string(),
982982
error: None,
983983
})?;
984984
let (work_epoch, epoch_to_sign, next_epoch_to_sign) = match current_epoch {

mithril-test-lab/mithril-end-to-end/Cargo.toml

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,11 +8,18 @@ homepage = { workspace = true }
88
license = { workspace = true }
99
repository = { workspace = true }
1010

11+
[[bin]]
12+
name = "load-aggregator"
13+
test = false
14+
bench = false
15+
1116
[dependencies]
17+
anyhow = "1.0.71"
1218
async-trait = "0.1.52"
1319
clap = { version = "4.0.18", features = ["derive"] }
1420
glob = "0.3"
1521
hex = "0.4.3"
22+
indicatif = { version = "0.17.5", features = ["tokio"] }
1623
mithril-common = { path = "../../mithril-common" }
1724
reqwest = { version = "0.11", features = ["json"] }
1825
serde = { version = "1.0", features = ["derive"] }
Lines changed: 253 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,253 @@
1+
use std::{path::PathBuf, sync::Arc, time::Duration};
2+
3+
use anyhow::Context;
4+
use clap::Parser;
5+
6+
use indicatif::{ProgressBar, ProgressDrawTarget};
7+
use mithril_common::{
8+
digesters::DummyImmutablesDbBuilder,
9+
entities::{Epoch, PartyId, ProtocolParameters},
10+
messages::RegisterSignerMessage,
11+
test_utils::{MithrilFixture, MithrilFixtureBuilder},
12+
StdResult,
13+
};
14+
15+
use mithril_end_to_end::{Aggregator, BftNode};
16+
use reqwest::StatusCode;
17+
use slog::Level;
18+
use thiserror::Error;
19+
use tokio::{select, task::JoinSet, time::sleep};
20+
21+
#[derive(Debug, Error)]
22+
pub enum LoadError {
23+
#[error("Registering signer party_id={party_id}, expected HTTP code {expected_http_code} got {got_http_code} with the message: {error_message}.")]
24+
SignerRegistrationError {
25+
party_id: PartyId,
26+
expected_http_code: u32,
27+
got_http_code: u32,
28+
error_message: String,
29+
},
30+
}
31+
32+
fn init_logger(opts: &MainOpts) -> slog_scope::GlobalLoggerGuard {
33+
use slog::Drain;
34+
35+
let decorator = slog_term::TermDecorator::new().build();
36+
let drain = slog_term::FullFormat::new(decorator).build().fuse();
37+
let drain = slog_async::Async::new(drain).build().fuse();
38+
let drain = slog::LevelFilter::new(drain, opts.log_level()).fuse();
39+
40+
slog_scope::set_global_logger(slog::Logger::root(Arc::new(drain), slog::o!()))
41+
}
42+
43+
pub fn generate_signer_data(number_of_signers: usize) -> MithrilFixture {
44+
MithrilFixtureBuilder::default()
45+
.with_signers(number_of_signers)
46+
.build()
47+
}
48+
49+
/// Generate signer registration
50+
pub fn generate_register_message(signers_fixture: &MithrilFixture) -> Vec<RegisterSignerMessage> {
51+
let epoch = Epoch(2);
52+
signers_fixture
53+
.signers()
54+
.into_iter()
55+
.map(|signer| RegisterSignerMessage {
56+
epoch: Some(epoch),
57+
party_id: signer.party_id,
58+
verification_key: signer.verification_key,
59+
verification_key_signature: signer.verification_key_signature,
60+
operational_certificate: signer.operational_certificate,
61+
kes_period: signer.kes_period,
62+
})
63+
.collect::<Vec<_>>()
64+
}
65+
66+
#[derive(Debug, Parser)]
67+
#[command(author, version, about, long_about = None)]
68+
pub struct MainOpts {
69+
/// Location of the Cardano CLI binary
70+
#[arg(short, long)]
71+
pub cardano_cli_path: PathBuf,
72+
73+
/// Temporary location for logs, databases etc.
74+
#[arg(short, long)]
75+
pub temporary_path: Option<PathBuf>,
76+
77+
/// Path of the Aggregator binary
78+
#[arg(short, long, default_value = "./target/debug")]
79+
pub aggregator_dir: PathBuf,
80+
81+
/// Number of concurrent signers
82+
#[arg(long, default_value = "2")]
83+
pub num_signers: usize,
84+
85+
/// Mithril technical Era
86+
#[arg(long, default_value = "thales")]
87+
pub mithril_era: String,
88+
89+
/// Aggregator HTTP port
90+
#[arg(short = 'p', long, default_value = "8888")]
91+
server_port: u32,
92+
93+
/// Log level
94+
#[arg(short='v', action = clap::ArgAction::Count)]
95+
verbose: u8,
96+
}
97+
98+
impl MainOpts {
99+
/// get log level from parameters
100+
pub fn log_level(&self) -> Level {
101+
match self.verbose {
102+
0 => Level::Warning,
103+
1 => Level::Info,
104+
2 => Level::Debug,
105+
_ => Level::Trace,
106+
}
107+
}
108+
}
109+
110+
#[derive(Debug)]
111+
pub struct AggregatorParameters {
112+
server_port: u32,
113+
bft_node: BftNode,
114+
cardano_cli_path: PathBuf,
115+
work_dir: PathBuf,
116+
bin_dir: PathBuf,
117+
mithril_era: String,
118+
}
119+
120+
impl AggregatorParameters {
121+
fn new(opts: &MainOpts) -> StdResult<Self> {
122+
// configure a dummy immutable db
123+
let immutable_db = DummyImmutablesDbBuilder::new("load-tester")
124+
.with_immutables(&[1, 2, 3])
125+
.append_immutable_trio()
126+
.build();
127+
128+
let bft_node = BftNode {
129+
db_path: immutable_db.dir,
130+
socket_path: PathBuf::new(),
131+
};
132+
let tmp_dir = opts
133+
.temporary_path
134+
.as_ref()
135+
.cloned()
136+
.unwrap_or_else(|| std::env::temp_dir().join("load-aggregator"));
137+
138+
if tmp_dir.exists() {
139+
std::fs::remove_dir_all(&tmp_dir).with_context(|| {
140+
format!(
141+
"Could not remove existing temp directory '{}'.",
142+
tmp_dir.display()
143+
)
144+
})?;
145+
}
146+
std::fs::create_dir_all(&tmp_dir)
147+
.with_context(|| format!("Could not create temp directory '{}'.", tmp_dir.display()))?;
148+
149+
let cardano_cli_path = {
150+
if !opts.cardano_cli_path.exists() {
151+
Err(format!(
152+
"Given cardano-cli path does not exist: {}",
153+
opts.cardano_cli_path.display()
154+
))?
155+
}
156+
157+
opts.cardano_cli_path.canonicalize().with_context(|| {
158+
format!(
159+
"Could not canonicalize path to the cardano-cli, path: {}",
160+
opts.cardano_cli_path.display()
161+
)
162+
})?
163+
};
164+
165+
let aggregator_parameters = AggregatorParameters {
166+
bft_node,
167+
bin_dir: opts.aggregator_dir.clone(),
168+
cardano_cli_path,
169+
server_port: opts.server_port,
170+
work_dir: tmp_dir,
171+
mithril_era: opts.mithril_era.clone(),
172+
};
173+
174+
Ok(aggregator_parameters)
175+
}
176+
}
177+
178+
#[tokio::main]
179+
async fn main() -> StdResult<()> {
180+
let opts = MainOpts::parse();
181+
let _logger = init_logger(&opts);
182+
let args = AggregatorParameters::new(&opts)?;
183+
println!("OPTIONS={opts:?}");
184+
let signers_fixture = generate_signer_data(opts.num_signers);
185+
let register_messages = generate_register_message(&signers_fixture);
186+
let mut aggregator = Aggregator::new(
187+
args.server_port as u64,
188+
&args.bft_node,
189+
&args.cardano_cli_path,
190+
&args.work_dir,
191+
&args.bin_dir,
192+
&args.mithril_era,
193+
)
194+
.unwrap();
195+
let progress_bar = ProgressBar::new_spinner().with_message("starting Aggregator process…");
196+
aggregator.set_protocol_parameters(&ProtocolParameters::default());
197+
aggregator.serve().unwrap();
198+
let spinner = async move {
199+
loop {
200+
progress_bar.tick();
201+
sleep(Duration::from_millis(50)).await;
202+
}
203+
};
204+
205+
select! {
206+
_ = spinner => (),
207+
_ = sleep(Duration::from_secs(10)) => (),
208+
}
209+
210+
let mut join_set: JoinSet<StdResult<()>> = JoinSet::new();
211+
let progress_bar =
212+
ProgressBar::with_draw_target(Some(opts.num_signers as u64), ProgressDrawTarget::stdout());
213+
214+
for register in register_messages {
215+
let endpoint = aggregator.endpoint();
216+
join_set.spawn(async move {
217+
let response = reqwest::Client::new()
218+
.post(format!("{}/register-signer", endpoint))
219+
.json(&register)
220+
.send()
221+
.await
222+
.unwrap();
223+
224+
match response.status() {
225+
StatusCode::CREATED => Ok(()),
226+
status => Err(LoadError::SignerRegistrationError {
227+
expected_http_code: 201,
228+
got_http_code: status.as_u16() as u32,
229+
party_id: register.party_id,
230+
error_message: response.text().await.unwrap(),
231+
}
232+
.into()),
233+
}
234+
});
235+
}
236+
let mut errors = 0;
237+
238+
while let Some(res) = join_set.join_next().await {
239+
let res = res.expect("Tokio task join failed!");
240+
progress_bar.inc(1);
241+
242+
if res.is_err() {
243+
// eprintln!("Signer error caught: {res:?}");
244+
errors += 1;
245+
}
246+
}
247+
248+
assert_eq!(opts.num_signers - 1, errors);
249+
250+
// ensure POSTing payload gives 200
251+
aggregator.stop().await.unwrap();
252+
Ok(())
253+
}

mithril-test-lab/mithril-end-to-end/src/lib.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ mod mithril;
55
mod run_only;
66
mod utils;
77

8-
pub use devnet::Devnet;
8+
pub use devnet::*;
99
pub use end_to_end_spec::Spec;
1010
pub use mithril::*;
1111
pub use run_only::RunOnly;

0 commit comments

Comments
 (0)