@@ -15,6 +15,7 @@ use mithril_common::{
15
15
use mithril_end_to_end:: { Aggregator , BftNode } ;
16
16
use reqwest:: StatusCode ;
17
17
use slog:: Level ;
18
+ use slog_scope:: info;
18
19
use thiserror:: Error ;
19
20
use tokio:: { select, task:: JoinSet , time:: sleep} ;
20
21
@@ -40,6 +41,7 @@ fn init_logger(opts: &MainOpts) -> slog_scope::GlobalLoggerGuard {
40
41
slog_scope:: set_global_logger ( slog:: Logger :: root ( Arc :: new ( drain) , slog:: o!( ) ) )
41
42
}
42
43
44
+ /// Generate signer data
43
45
pub fn generate_signer_data ( number_of_signers : usize ) -> MithrilFixture {
44
46
MithrilFixtureBuilder :: default ( )
45
47
. with_signers ( number_of_signers)
@@ -63,6 +65,28 @@ pub fn generate_register_message(signers_fixture: &MithrilFixture) -> Vec<Regist
63
65
. collect :: < Vec < _ > > ( )
64
66
}
65
67
68
+ /// Wait for http response until timeout
69
+ pub async fn wait_for_http_response ( url : & str , timeout : Duration , message : & str ) -> StdResult < ( ) > {
70
+ let progress_bar = ProgressBar :: new_spinner ( ) . with_message ( message. to_owned ( ) ) ;
71
+ let spinner = async move {
72
+ loop {
73
+ progress_bar. tick ( ) ;
74
+ sleep ( Duration :: from_millis ( 50 ) ) . await ;
75
+ }
76
+ } ;
77
+ let probe = async move {
78
+ while reqwest:: get ( url) . await . is_err ( ) {
79
+ sleep ( Duration :: from_millis ( 300 ) ) . await ;
80
+ }
81
+ } ;
82
+
83
+ select ! {
84
+ _ = spinner => Err ( String :: new( ) . into( ) ) ,
85
+ _ = sleep( timeout) => Err ( format!( "Aggregator did not get a response after {timeout:?} from '{url}'" ) . into( ) ) ,
86
+ _ = probe => Ok ( ( ) )
87
+ }
88
+ }
89
+
66
90
#[ derive( Debug , Parser ) ]
67
91
#[ command( author, version, about, long_about = None ) ]
68
92
pub struct MainOpts {
@@ -180,9 +204,13 @@ async fn main() -> StdResult<()> {
180
204
let opts = MainOpts :: parse ( ) ;
181
205
let _logger = init_logger ( & opts) ;
182
206
let args = AggregatorParameters :: new ( & opts) ?;
183
- println ! ( "OPTIONS={opts:?}" ) ;
207
+ info ! ( ">> Starting stress test with options: {opts:?}" ) ;
208
+
209
+ info ! ( ">> Creation of the Signer Key Registrations payloads" ) ;
184
210
let signers_fixture = generate_signer_data ( opts. num_signers ) ;
185
211
let register_messages = generate_register_message ( & signers_fixture) ;
212
+
213
+ info ! ( ">> Launch Aggregator" ) ;
186
214
let mut aggregator = Aggregator :: new (
187
215
args. server_port as u64 ,
188
216
& args. bft_node ,
@@ -192,25 +220,22 @@ async fn main() -> StdResult<()> {
192
220
& args. mithril_era ,
193
221
)
194
222
. unwrap ( ) ;
195
- let progress_bar = ProgressBar :: new_spinner ( ) . with_message ( "starting Aggregator process…" ) ;
223
+
196
224
aggregator. set_protocol_parameters ( & ProtocolParameters :: default ( ) ) ;
197
225
aggregator. serve ( ) . unwrap ( ) ;
198
- let spinner = async move {
199
- loop {
200
- progress_bar. tick ( ) ;
201
- sleep ( Duration :: from_millis ( 50 ) ) . await ;
202
- }
203
- } ;
204
226
205
- select ! {
206
- _ = spinner => ( ) ,
207
- _ = sleep( Duration :: from_secs( 10 ) ) => ( ) ,
208
- }
227
+ wait_for_http_response (
228
+ & format ! ( "{}/epoch-settings" , aggregator. endpoint( ) ) ,
229
+ Duration :: from_secs ( 10 ) ,
230
+ "Waiting for the aggregator to start" ,
231
+ )
232
+ . await ?;
209
233
210
234
let mut join_set: JoinSet < StdResult < ( ) > > = JoinSet :: new ( ) ;
211
235
let progress_bar =
212
236
ProgressBar :: with_draw_target ( Some ( opts. num_signers as u64 ) , ProgressDrawTarget :: stdout ( ) ) ;
213
237
238
+ info ! ( ">> Send the Signer Key Registrations payloads" ) ;
214
239
for register in register_messages {
215
240
let endpoint = aggregator. endpoint ( ) ;
216
241
join_set. spawn ( async move {
0 commit comments