@@ -7,6 +7,7 @@ use std::{
7
7
} ;
8
8
9
9
use anyhow:: Context ;
10
+ use async_recursion:: async_recursion;
10
11
use clap:: Parser ;
11
12
12
13
use indicatif:: { ProgressBar , ProgressDrawTarget } ;
@@ -15,13 +16,17 @@ use mithril_common::{
15
16
entities:: {
16
17
Epoch , PartyId , ProtocolMessage , ProtocolParameters , SignedEntityType , SingleSignatures ,
17
18
} ,
18
- messages:: { EpochSettingsMessage , RegisterSignatureMessage , RegisterSignerMessage } ,
19
+ messages:: {
20
+ CertificateListItemMessage , EpochSettingsMessage , MithrilStakeDistributionListItemMessage ,
21
+ RegisterSignatureMessage , RegisterSignerMessage ,
22
+ } ,
19
23
test_utils:: { MithrilFixture , MithrilFixtureBuilder } ,
20
24
StdResult ,
21
25
} ;
22
26
23
27
use mithril_end_to_end:: { Aggregator , BftNode } ;
24
28
use reqwest:: StatusCode ;
29
+ use serde:: Deserialize ;
25
30
use slog:: Level ;
26
31
use slog_scope:: { info, warn} ;
27
32
use thiserror:: Error ;
@@ -40,9 +45,9 @@ macro_rules! spin_while_waiting {
40
45
let probe = async move { $block } ;
41
46
42
47
select! {
43
- _ = spinner => Err ( String :: new( ) . into( ) ) ,
44
- _ = sleep( $timeout) => Err ( $timeout_message. into( ) ) ,
45
- _ = probe => Ok ( ( ) )
48
+ _ = spinner => Err ( String :: new( ) . into( ) ) ,
49
+ _ = sleep( $timeout) => Err ( $timeout_message. into( ) ) ,
50
+ res = probe => res
46
51
}
47
52
} } ;
48
53
}
@@ -129,6 +134,7 @@ pub async fn wait_for_http_response(url: &str, timeout: Duration, message: &str)
129
134
while reqwest:: get( url) . await . is_err( ) {
130
135
sleep( Duration :: from_millis( 300 ) ) . await ;
131
136
}
137
+ Ok ( ( ) )
132
138
} ,
133
139
timeout,
134
140
message. to_owned( ) ,
@@ -165,6 +171,7 @@ pub async fn wait_for_epoch_settings_at_epoch(
165
171
_ => sleep( Duration :: from_millis( 300 ) ) . await ,
166
172
}
167
173
}
174
+ Ok ( ( ) )
168
175
} ,
169
176
timeout,
170
177
format!( "Waiting for epoch {epoch}" ) ,
@@ -195,13 +202,81 @@ pub async fn wait_for_pending_certificate(
195
202
_ => sleep( Duration :: from_millis( 300 ) ) . await ,
196
203
}
197
204
}
205
+ Ok ( ( ) )
198
206
} ,
199
207
timeout,
200
208
format!( "Waiting for pending certificate" ) ,
201
209
format!( "Aggregator did not get a response after {timeout:?} from '{url}'" )
202
210
)
203
211
}
204
212
213
+ #[ async_recursion]
214
+ async fn request_first_list_item < I > ( url : & str ) -> Result < I , String >
215
+ where
216
+ for < ' a > I : Deserialize < ' a > + Sync + Send + Clone ,
217
+ {
218
+ sleep ( Duration :: from_millis ( 300 ) ) . await ;
219
+
220
+ match reqwest:: get ( url) . await {
221
+ Ok ( response) => match response. status ( ) {
222
+ StatusCode :: OK => match response. json :: < Vec < I > > ( ) . await . as_deref ( ) {
223
+ Ok ( [ first_item, ..] ) => Ok ( first_item. to_owned ( ) ) ,
224
+ Ok ( & [ ] ) => request_first_list_item :: < I > ( url) . await ,
225
+ Err ( err) => Err ( format ! ( "Invalid list body : {err}" ) ) ,
226
+ } ,
227
+ s if s. is_server_error ( ) => {
228
+ let message = format ! (
229
+ "Server error while waiting for the Aggregator, http code: {}" ,
230
+ s
231
+ ) ;
232
+ warn ! ( "{message}" ) ;
233
+ Err ( message)
234
+ }
235
+ _ => request_first_list_item :: < I > ( url) . await ,
236
+ } ,
237
+ Err ( err) => Err ( format ! ( "Request to `{url}` failed: {err}" ) ) ,
238
+ }
239
+ }
240
+
241
+ /// Wait for certificates
242
+ pub async fn wait_for_certificates (
243
+ aggregator : & Aggregator ,
244
+ timeout : Duration ,
245
+ ) -> StdResult < CertificateListItemMessage > {
246
+ let url = & format ! ( "{}/certificates" , aggregator. endpoint( ) ) ;
247
+ spin_while_waiting ! (
248
+ {
249
+ request_first_list_item:: <CertificateListItemMessage >( url)
250
+ . await
251
+ . map_err( |e| e. into( ) )
252
+ } ,
253
+ timeout,
254
+ format!( "Waiting for certificates" ) ,
255
+ format!( "Aggregator did not get a response after {timeout:?} from '{url}'" )
256
+ )
257
+ }
258
+
259
+ /// Wait for Mithril Stake Distribution artifacts
260
+ pub async fn wait_for_mithril_stake_distribution_artifacts (
261
+ aggregator : & Aggregator ,
262
+ timeout : Duration ,
263
+ ) -> StdResult < MithrilStakeDistributionListItemMessage > {
264
+ let url = & format ! (
265
+ "{}/artifact/mithril-stake-distributions" ,
266
+ aggregator. endpoint( )
267
+ ) ;
268
+ spin_while_waiting ! (
269
+ {
270
+ request_first_list_item:: <MithrilStakeDistributionListItemMessage >( url)
271
+ . await
272
+ . map_err( |e| e. into( ) )
273
+ } ,
274
+ timeout,
275
+ format!( "Waiting for mithril stake distribution artifacts" ) ,
276
+ format!( "Aggregator did not get a response after {timeout:?} from '{url}'" )
277
+ )
278
+ }
279
+
205
280
pub async fn register_signers_to_aggregator (
206
281
aggregator : & Aggregator ,
207
282
signers_fixture : & MithrilFixture ,
@@ -538,6 +613,12 @@ async fn main() -> StdResult<()> {
538
613
. await ?;
539
614
assert_eq ! ( 0 , errors) ;
540
615
616
+ info ! ( ">> Wait for certificates to be available..." ) ;
617
+ wait_for_certificates ( & aggregator, Duration :: from_secs ( 30 ) ) . await ?;
618
+
619
+ info ! ( ">> Wait for artifacts to be available..." ) ;
620
+ wait_for_mithril_stake_distribution_artifacts ( & aggregator, Duration :: from_secs ( 30 ) ) . await ?;
621
+
541
622
info ! ( ">> All steps executed successfully, stopping all tasks..." ) ;
542
623
543
624
aggregator. stop ( ) . await . unwrap ( ) ;
0 commit comments