@@ -12,14 +12,16 @@ use clap::Parser;
12
12
13
13
use indicatif:: { ProgressBar , ProgressDrawTarget } ;
14
14
use mithril_common:: {
15
- digesters:: DummyImmutablesDbBuilder ,
15
+ digesters:: {
16
+ CardanoImmutableDigester , DummyImmutableDb , DummyImmutablesDbBuilder , ImmutableDigester ,
17
+ } ,
16
18
entities:: {
17
- Epoch , PartyId , ProtocolMessage , ProtocolParameters , SignedEntityType , Signer ,
18
- SingleSignatures ,
19
+ Beacon , Epoch , PartyId , ProtocolMessage , ProtocolMessagePartKey , ProtocolParameters ,
20
+ SignedEntityType , Signer , SingleSignatures ,
19
21
} ,
20
22
messages:: {
21
23
CertificateListItemMessage , EpochSettingsMessage , MithrilStakeDistributionListItemMessage ,
22
- RegisterSignatureMessage , RegisterSignerMessage ,
24
+ RegisterSignatureMessage , RegisterSignerMessage , SnapshotListItemMessage ,
23
25
} ,
24
26
test_utils:: { MithrilFixture , MithrilFixtureBuilder } ,
25
27
StdResult ,
@@ -213,7 +215,10 @@ pub async fn wait_for_pending_certificate(
213
215
}
214
216
215
217
#[ async_recursion]
216
- async fn request_first_list_item < I > ( url : & str ) -> Result < I , String >
218
+ async fn request_first_list_item_with_expected_size < I > (
219
+ url : & str ,
220
+ expected_size : usize ,
221
+ ) -> Result < I , String >
217
222
where
218
223
for < ' a > I : Deserialize < ' a > + Sync + Send + Clone ,
219
224
{
@@ -222,8 +227,12 @@ where
222
227
match reqwest:: get ( url) . await {
223
228
Ok ( response) => match response. status ( ) {
224
229
StatusCode :: OK => match response. json :: < Vec < I > > ( ) . await . as_deref ( ) {
225
- Ok ( [ first_item, ..] ) => Ok ( first_item. to_owned ( ) ) ,
226
- Ok ( & [ ] ) => request_first_list_item :: < I > ( url) . await ,
230
+ Ok ( list) if list. len ( ) == expected_size => Ok ( list. first ( ) . unwrap ( ) . clone ( ) ) ,
231
+ Ok ( list) if list. len ( ) > expected_size => Err ( format ! (
232
+ "Invalid size, expected {expected_size}, got {}" ,
233
+ list. len( )
234
+ ) ) ,
235
+ Ok ( _) => request_first_list_item_with_expected_size :: < I > ( url, expected_size) . await ,
227
236
Err ( err) => Err ( format ! ( "Invalid list body : {err}" ) ) ,
228
237
} ,
229
238
s if s. is_server_error ( ) => {
@@ -234,7 +243,7 @@ where
234
243
warn ! ( "{message}" ) ;
235
244
Err ( message)
236
245
}
237
- _ => request_first_list_item :: < I > ( url) . await ,
246
+ _ => request_first_list_item_with_expected_size :: < I > ( url, expected_size ) . await ,
238
247
} ,
239
248
Err ( err) => Err ( format ! ( "Request to `{url}` failed: {err}" ) ) ,
240
249
}
@@ -271,15 +280,59 @@ pub async fn precompute_mithril_stake_distribution_signatures(
271
280
)
272
281
}
273
282
274
- /// Wait for certificates
283
+ /// Compute all signers single signatures for the given fixture
284
+ pub async fn compute_immutable_files_signatures (
285
+ immutable_db : & DummyImmutableDb ,
286
+ epoch : Epoch ,
287
+ signers_fixture : & MithrilFixture ,
288
+ timeout : Duration ,
289
+ ) -> StdResult < ( Beacon , Vec < SingleSignatures > ) > {
290
+ spin_while_waiting ! (
291
+ {
292
+ let beacon = Beacon :: new(
293
+ "devnet" . to_string( ) ,
294
+ * epoch,
295
+ // Minus one because the last immutable isn't "finished"
296
+ immutable_db. last_immutable_number( ) . unwrap( ) - 1 ,
297
+ ) ;
298
+ let digester = CardanoImmutableDigester :: new( None , slog_scope:: logger( ) ) ;
299
+ let digest = digester. compute_digest( & immutable_db. dir, & beacon) . await ?;
300
+ let signers_fixture = signers_fixture. clone( ) ;
301
+
302
+ let signatures = tokio:: task:: spawn_blocking( move || -> Vec <SingleSignatures > {
303
+ let mithril_stake_distribution_message = {
304
+ let mut message = ProtocolMessage :: new( ) ;
305
+ message. set_message_part( ProtocolMessagePartKey :: SnapshotDigest , digest) ;
306
+ message. set_message_part(
307
+ ProtocolMessagePartKey :: NextAggregateVerificationKey ,
308
+ signers_fixture. compute_and_encode_avk( ) ,
309
+ ) ;
310
+
311
+ message
312
+ } ;
313
+
314
+ signers_fixture. sign_all( & mithril_stake_distribution_message)
315
+ } )
316
+ . await ?;
317
+
318
+ Ok ( ( beacon, signatures) )
319
+ } ,
320
+ timeout,
321
+ format!( "Precompute signatures for CardanoImmutableFiles signed entity" ) ,
322
+ format!( "Precomputing signatures timeout after {timeout:?}" )
323
+ )
324
+ }
325
+
326
+ /// Wait for the given number of certificates, return the latest certificate
275
327
pub async fn wait_for_certificates (
276
328
aggregator : & Aggregator ,
329
+ total : usize ,
277
330
timeout : Duration ,
278
331
) -> StdResult < CertificateListItemMessage > {
279
332
let url = & format ! ( "{}/certificates" , aggregator. endpoint( ) ) ;
280
333
spin_while_waiting ! (
281
334
{
282
- request_first_list_item :: <CertificateListItemMessage >( url)
335
+ request_first_list_item_with_expected_size :: <CertificateListItemMessage >( url, total )
283
336
. await
284
337
. map_err( |e| e. into( ) )
285
338
} ,
@@ -300,12 +353,32 @@ pub async fn wait_for_mithril_stake_distribution_artifacts(
300
353
) ;
301
354
spin_while_waiting ! (
302
355
{
303
- request_first_list_item:: <MithrilStakeDistributionListItemMessage >( url)
356
+ request_first_list_item_with_expected_size:: <MithrilStakeDistributionListItemMessage >(
357
+ url, 1 ,
358
+ )
359
+ . await
360
+ . map_err( |e| e. into( ) )
361
+ } ,
362
+ timeout,
363
+ format!( "Waiting for mithril stake distribution artifacts" ) ,
364
+ format!( "Aggregator did not get a response after {timeout:?} from '{url}'" )
365
+ )
366
+ }
367
+
368
+ /// Wait for Cardano Immutable Files artifacts
369
+ pub async fn wait_for_immutable_files_artifacts (
370
+ aggregator : & Aggregator ,
371
+ timeout : Duration ,
372
+ ) -> StdResult < SnapshotListItemMessage > {
373
+ let url = & format ! ( "{}/artifact/snapshots" , aggregator. endpoint( ) ) ;
374
+ spin_while_waiting ! (
375
+ {
376
+ request_first_list_item_with_expected_size:: <SnapshotListItemMessage >( url, 1 )
304
377
. await
305
378
. map_err( |e| e. into( ) )
306
379
} ,
307
380
timeout,
308
- format!( "Waiting for mithril stake distribution artifacts" ) ,
381
+ format!( "Waiting for immutable files artifacts" ) ,
309
382
format!( "Aggregator did not get a response after {timeout:?} from '{url}'" )
310
383
)
311
384
}
@@ -638,10 +711,6 @@ async fn bootstrap_aggregator(
638
711
Ok ( aggregator)
639
712
}
640
713
641
- fn add_new_immutable_file ( aggregator : & Aggregator ) -> StdResult < ( ) > {
642
- todo ! ( )
643
- }
644
-
645
714
#[ tokio:: main( flavor = "multi_thread" ) ]
646
715
async fn main ( ) -> StdResult < ( ) > {
647
716
let opts = MainOpts :: parse ( ) ;
@@ -695,7 +764,7 @@ async fn main() -> StdResult<()> {
695
764
assert_eq ! ( 0 , errors) ;
696
765
697
766
info ! ( ">> Wait for certificates to be available..." ) ;
698
- wait_for_certificates ( & aggregator, Duration :: from_secs ( 30 ) ) . await ?;
767
+ wait_for_certificates ( & aggregator, 1 , Duration :: from_secs ( 30 ) ) . await ?;
699
768
700
769
info ! ( ">> Wait for artifacts to be available..." ) ;
701
770
wait_for_mithril_stake_distribution_artifacts ( & aggregator, Duration :: from_secs ( 30 ) ) . await ?;
@@ -706,6 +775,34 @@ async fn main() -> StdResult<()> {
706
775
info ! ( ">> Wait for pending certificate to be available" ) ;
707
776
wait_for_pending_certificate ( & aggregator, Duration :: from_secs ( 30 ) ) . await ?;
708
777
778
+ info ! ( ">> Compute the immutable files signature" ) ;
779
+ let ( current_beacon, immutable_files_signatures) = compute_immutable_files_signatures (
780
+ & immutable_db,
781
+ current_epoch,
782
+ & signers_fixture,
783
+ Duration :: from_secs ( 30 ) ,
784
+ )
785
+ . await
786
+ . unwrap ( ) ;
787
+
788
+ info ! (
789
+ ">> Send the Signer Signatures payloads for CardanoImmutableFiles({:?})" ,
790
+ current_beacon
791
+ ) ;
792
+ let errors = register_signatures_to_aggregator (
793
+ & aggregator,
794
+ & immutable_files_signatures,
795
+ SignedEntityType :: CardanoImmutableFilesFull ( current_beacon) ,
796
+ )
797
+ . await ?;
798
+ assert_eq ! ( 0 , errors) ;
799
+
800
+ info ! ( ">> Wait for certificates to be available..." ) ;
801
+ wait_for_certificates ( & aggregator, 2 , Duration :: from_secs ( 30 ) ) . await ?;
802
+
803
+ info ! ( ">> Wait for artifacts to be available..." ) ;
804
+ wait_for_immutable_files_artifacts ( & aggregator, Duration :: from_secs ( 30 ) ) . await ?;
805
+
709
806
info ! ( ">> All steps executed successfully, stopping all tasks..." ) ;
710
807
711
808
aggregator. stop ( ) . await . unwrap ( ) ;
0 commit comments