@@ -25,6 +25,26 @@ use slog_scope::{info, warn};
25
25
use thiserror:: Error ;
26
26
use tokio:: { select, task:: JoinSet , time:: sleep} ;
27
27
28
+ macro_rules! spin_while_waiting {
29
+ ( $block: block, $timeout: expr, $wait_message: expr, $timeout_message: expr) => { {
30
+ let progress_bar = ProgressBar :: new_spinner( ) . with_message( $wait_message) ;
31
+
32
+ let spinner = async move {
33
+ loop {
34
+ progress_bar. tick( ) ;
35
+ sleep( Duration :: from_millis( 50 ) ) . await ;
36
+ }
37
+ } ;
38
+ let probe = async move { $block } ;
39
+
40
+ select! {
41
+ _ = spinner => Err ( String :: new( ) . into( ) ) ,
42
+ _ = sleep( $timeout) => Err ( $timeout_message. into( ) ) ,
43
+ _ = probe => Ok ( ( ) )
44
+ }
45
+ } } ;
46
+ }
47
+
28
48
#[ derive( Debug , Error ) ]
29
49
pub enum LoadError {
30
50
#[ error( "Registering signer party_id={party_id}, expected HTTP code {expected_http_code} got {got_http_code} with the message: {error_message}." ) ]
@@ -55,8 +75,10 @@ pub fn generate_signer_data(number_of_signers: usize) -> MithrilFixture {
55
75
}
56
76
57
77
/// Generate signer registration
58
- pub fn generate_register_message ( signers_fixture : & MithrilFixture ) -> Vec < RegisterSignerMessage > {
59
- let epoch = Epoch ( 2 ) ;
78
+ pub fn generate_register_message (
79
+ signers_fixture : & MithrilFixture ,
80
+ epoch : Epoch ,
81
+ ) -> Vec < RegisterSignerMessage > {
60
82
signers_fixture
61
83
. signers ( )
62
84
. into_iter ( )
@@ -73,68 +95,131 @@ pub fn generate_register_message(signers_fixture: &MithrilFixture) -> Vec<Regist
73
95
74
96
/// Wait for http response until timeout
75
97
pub async fn wait_for_http_response ( url : & str , timeout : Duration , message : & str ) -> StdResult < ( ) > {
76
- let progress_bar = ProgressBar :: new_spinner ( ) . with_message ( message. to_owned ( ) ) ;
77
- let spinner = async move {
78
- loop {
79
- progress_bar. tick ( ) ;
80
- sleep ( Duration :: from_millis ( 50 ) ) . await ;
81
- }
82
- } ;
83
- let probe = async move {
84
- while reqwest:: get ( url) . await . is_err ( ) {
85
- sleep ( Duration :: from_millis ( 300 ) ) . await ;
86
- }
87
- } ;
88
-
89
- select ! {
90
- _ = spinner => Err ( String :: new( ) . into( ) ) ,
91
- _ = sleep( timeout) => Err ( format!( "Aggregator did not get a response after {timeout:?} from '{url}'" ) . into( ) ) ,
92
- _ = probe => Ok ( ( ) )
93
- }
98
+ spin_while_waiting ! (
99
+ {
100
+ while reqwest:: get( url) . await . is_err( ) {
101
+ sleep( Duration :: from_millis( 300 ) ) . await ;
102
+ }
103
+ } ,
104
+ timeout,
105
+ message. to_owned( ) ,
106
+ format!( "Aggregator did not get a response after {timeout:?} from '{url}'" )
107
+ )
94
108
}
95
109
96
110
/// Wait for a given epoch in the epoch settings until timeout
97
111
pub async fn wait_for_epoch_settings_at_epoch (
98
- url : & str ,
112
+ aggregator : & Aggregator ,
99
113
timeout : Duration ,
100
114
epoch : Epoch ,
101
115
) -> StdResult < ( ) > {
102
- let progress_bar =
103
- ProgressBar :: new_spinner ( ) . with_message ( format ! ( "Waiting for epoch {epoch}" ) ) ;
104
- let spinner = async move {
105
- loop {
106
- progress_bar. tick ( ) ;
107
- sleep ( Duration :: from_millis ( 50 ) ) . await ;
108
- }
109
- } ;
110
- let probe = async move {
111
- while let Ok ( response) = reqwest:: get ( url) . await {
112
- match response. status ( ) {
113
- StatusCode :: OK => {
114
- let epoch_settings = response. json :: < EpochSettingsMessage > ( ) . await . unwrap ( ) ;
116
+ let url = & format ! ( "{}/epoch-settings" , aggregator. endpoint( ) ) ;
117
+ spin_while_waiting ! (
118
+ {
119
+ while let Ok ( response) = reqwest:: get( url) . await {
120
+ match response. status( ) {
121
+ StatusCode :: OK => {
122
+ let epoch_settings = response. json:: <EpochSettingsMessage >( ) . await . unwrap( ) ;
123
+
124
+ if epoch_settings. epoch >= epoch {
125
+ break ;
126
+ }
127
+ sleep( Duration :: from_millis( 300 ) ) . await
128
+ }
129
+ s if s. is_server_error( ) => {
130
+ warn!(
131
+ "Server error while waiting for the Aggregator, http code: {}" ,
132
+ s
133
+ ) ;
134
+ break ;
135
+ }
136
+ _ => sleep( Duration :: from_millis( 300 ) ) . await ,
137
+ }
138
+ }
139
+ } ,
140
+ timeout,
141
+ format!( "Waiting for epoch {epoch}" ) ,
142
+ format!( "Aggregator did not get a response after {timeout:?} from '{url}'" )
143
+ )
144
+ }
115
145
116
- if epoch_settings. epoch >= epoch {
146
+ /// Wait for pending certificate
147
+ pub async fn wait_for_pending_certificate (
148
+ aggregator : & Aggregator ,
149
+ timeout : Duration ,
150
+ ) -> StdResult < ( ) > {
151
+ let url = & format ! ( "{}/certificate-pending" , aggregator. endpoint( ) ) ;
152
+ spin_while_waiting ! (
153
+ {
154
+ while let Ok ( response) = reqwest:: get( url) . await {
155
+ match response. status( ) {
156
+ StatusCode :: OK => {
157
+ break ;
158
+ }
159
+ s if s. is_server_error( ) => {
160
+ warn!(
161
+ "Server error while waiting for the Aggregator, http code: {}" ,
162
+ s
163
+ ) ;
117
164
break ;
118
165
}
119
- sleep ( Duration :: from_millis ( 300 ) ) . await
166
+ _ => sleep( Duration :: from_millis( 300 ) ) . await ,
120
167
}
121
- s if s. is_server_error ( ) => {
122
- warn ! (
123
- "Server error while waiting for the Aggregator, http code: {}" ,
124
- s
125
- ) ;
126
- break ;
168
+ }
169
+ } ,
170
+ timeout,
171
+ format!( "Waiting for pending certificate" ) ,
172
+ format!( "Aggregator did not get a response after {timeout:?} from '{url}'" )
173
+ )
174
+ }
175
+
176
+ pub async fn register_signers_to_aggregator (
177
+ aggregator : & Aggregator ,
178
+ signers_fixture : & MithrilFixture ,
179
+ epoch : Epoch ,
180
+ ) -> StdResult < usize > {
181
+ let register_messages = generate_register_message ( signers_fixture, epoch) ;
182
+
183
+ let mut join_set: JoinSet < StdResult < ( ) > > = JoinSet :: new ( ) ;
184
+ let progress_bar = ProgressBar :: with_draw_target (
185
+ Some ( register_messages. len ( ) as u64 ) ,
186
+ ProgressDrawTarget :: stdout ( ) ,
187
+ ) ;
188
+ for register in register_messages {
189
+ let endpoint = aggregator. endpoint ( ) ;
190
+ join_set. spawn ( async move {
191
+ let response = reqwest:: Client :: new ( )
192
+ . post ( format ! ( "{}/register-signer" , endpoint) )
193
+ . json ( & register)
194
+ . send ( )
195
+ . await
196
+ . unwrap ( ) ;
197
+
198
+ match response. status ( ) {
199
+ StatusCode :: CREATED => Ok ( ( ) ) ,
200
+ status => Err ( LoadError :: SignerRegistrationError {
201
+ expected_http_code : 201 ,
202
+ got_http_code : status. as_u16 ( ) as u32 ,
203
+ party_id : register. party_id ,
204
+ error_message : response. text ( ) . await . unwrap ( ) ,
127
205
}
128
- _ => sleep ( Duration :: from_millis ( 300 ) ) . await ,
206
+ . into ( ) ) ,
129
207
}
130
- }
131
- } ;
208
+ } ) ;
209
+ }
210
+ let mut errors = 0 ;
211
+
212
+ while let Some ( res) = join_set. join_next ( ) . await {
213
+ let res = res. expect ( "Tokio task join failed!" ) ;
214
+ progress_bar. inc ( 1 ) ;
132
215
133
- select ! {
134
- _ = spinner => Err ( String :: new ( ) . into ( ) ) ,
135
- _ = sleep ( timeout ) => Err ( format! ( "Aggregator did not get a response after {timeout:?} from '{url}'" ) . into ( ) ) ,
136
- _ = probe => Ok ( ( ) )
216
+ if res . is_err ( ) {
217
+ // eprintln!("Signer error caught: {res:?}");
218
+ errors += 1 ;
219
+ }
137
220
}
221
+
222
+ Ok ( errors)
138
223
}
139
224
140
225
pub fn write_stake_distribution (
@@ -274,11 +359,11 @@ async fn main() -> StdResult<()> {
274
359
let args = AggregatorParameters :: new ( & opts) ?;
275
360
let mock_stake_distribution_file_path = args. work_dir . join ( "stake_distribution.json" ) ;
276
361
let mock_epoch_file_path = args. work_dir . join ( "epoch.txt" ) ;
362
+ let mut current_epoch = Epoch ( 1 ) ;
277
363
info ! ( ">> Starting stress test with options: {opts:?}" ) ;
278
364
279
365
info ! ( ">> Creation of the Signer Key Registrations payloads" ) ;
280
366
let signers_fixture = generate_signer_data ( opts. num_signers ) ;
281
- let register_messages = generate_register_message ( & signers_fixture) ;
282
367
283
368
info ! ( ">> Launch Aggregator" ) ;
284
369
let mut aggregator = Aggregator :: new (
@@ -291,7 +376,7 @@ async fn main() -> StdResult<()> {
291
376
)
292
377
. unwrap ( ) ;
293
378
294
- write_epoch ( & mock_epoch_file_path, Epoch ( 1 ) ) ;
379
+ write_epoch ( & mock_epoch_file_path, current_epoch ) ;
295
380
write_stake_distribution ( & mock_stake_distribution_file_path, & signers_fixture) ;
296
381
297
382
aggregator. change_run_interval ( Duration :: from_secs ( 6 ) ) ;
@@ -307,56 +392,46 @@ async fn main() -> StdResult<()> {
307
392
)
308
393
. await ?;
309
394
310
- let mut join_set: JoinSet < StdResult < ( ) > > = JoinSet :: new ( ) ;
311
- let progress_bar =
312
- ProgressBar :: with_draw_target ( Some ( opts. num_signers as u64 ) , ProgressDrawTarget :: stdout ( ) ) ;
313
-
314
395
info ! ( ">> Send the Signer Key Registrations payloads" ) ;
315
- for register in register_messages {
316
- let endpoint = aggregator. endpoint ( ) ;
317
- join_set. spawn ( async move {
318
- let response = reqwest:: Client :: new ( )
319
- . post ( format ! ( "{}/register-signer" , endpoint) )
320
- . json ( & register)
321
- . send ( )
322
- . await
323
- . unwrap ( ) ;
396
+ let errors =
397
+ register_signers_to_aggregator ( & aggregator, & signers_fixture, current_epoch + 1 ) . await ?;
398
+ assert_eq ! ( 0 , errors) ;
324
399
325
- match response. status ( ) {
326
- StatusCode :: CREATED => Ok ( ( ) ) ,
327
- status => Err ( LoadError :: SignerRegistrationError {
328
- expected_http_code : 201 ,
329
- got_http_code : status. as_u16 ( ) as u32 ,
330
- party_id : register. party_id ,
331
- error_message : response. text ( ) . await . unwrap ( ) ,
332
- }
333
- . into ( ) ) ,
334
- }
335
- } ) ;
400
+ info ! ( ">> Move one epoch forward in order to issue the genesis certificate" ) ;
401
+ current_epoch += 1 ;
402
+ write_epoch ( & mock_epoch_file_path, current_epoch) ;
403
+ wait_for_epoch_settings_at_epoch ( & aggregator, Duration :: from_secs ( 10 ) , current_epoch) . await ?;
404
+ {
405
+ info ! ( ">> Compute genesis certificate" ) ;
406
+ let mut genesis_aggregator = Aggregator :: copy_configuration ( & aggregator) ;
407
+ genesis_aggregator
408
+ . bootstrap_genesis ( )
409
+ . await
410
+ . expect ( "Genesis aggregator should be able to bootstrap genesis" ) ;
411
+
412
+ sleep ( Duration :: from_secs ( 10 ) ) . await ;
336
413
}
337
- let mut errors = 0 ;
338
414
339
- while let Some ( res) = join_set. join_next ( ) . await {
340
- let res = res. expect ( "Tokio task join failed!" ) ;
341
- progress_bar. inc ( 1 ) ;
415
+ info ! ( ">> Send the Signer Key Registrations payloads" ) ;
416
+ let errors =
417
+ register_signers_to_aggregator ( & aggregator, & signers_fixture, current_epoch + 1 ) . await ?;
418
+ assert_eq ! ( 0 , errors) ;
342
419
343
- if res. is_err ( ) {
344
- // eprintln!("Signer error caught: {res:?}");
345
- errors += 1 ;
346
- }
347
- }
420
+ info ! ( ">> Move one epoch forward in order to start creating certificates" ) ;
421
+ current_epoch += 1 ;
422
+ write_epoch ( & mock_epoch_file_path, current_epoch) ;
423
+ wait_for_epoch_settings_at_epoch ( & aggregator, Duration :: from_secs ( 10 ) , current_epoch) . await ?;
348
424
425
+ info ! ( ">> Send the Signer Key Registrations payloads" ) ;
426
+ let errors =
427
+ register_signers_to_aggregator ( & aggregator, & signers_fixture, current_epoch + 1 ) . await ?;
349
428
assert_eq ! ( 0 , errors) ;
350
429
351
- write_epoch ( & mock_epoch_file_path, Epoch ( 2 ) ) ;
352
- wait_for_epoch_settings_at_epoch (
353
- & format ! ( "{}/epoch-settings" , aggregator. endpoint( ) ) ,
354
- Duration :: from_secs ( 10 ) ,
355
- Epoch ( 2 ) ,
356
- )
357
- . await ?;
430
+ info ! ( ">> Wait for pending certificate to be available" ) ;
431
+ wait_for_pending_certificate ( & aggregator, Duration :: from_secs ( 30 ) ) . await ?;
432
+
433
+ info ! ( ">> All steps executed successfully, stopping all tasks..." ) ;
358
434
359
- // ensure POSTing payload gives 200
360
435
aggregator. stop ( ) . await . unwrap ( ) ;
361
436
Ok ( ( ) )
362
437
}
0 commit comments