@@ -259,6 +259,7 @@ impl Decrypter {
259
259
let ( ct, cm) = vess
260
260
. encrypt_shares ( store. committee ( ) , store. sorted_keys ( ) , secret, DKG_AAD )
261
261
. ok ( ) ?;
262
+ debug ! ( node = %self . label, curr = %self . current, "produced dkg bundle" ) ;
262
263
Some ( DkgBundle :: new ( ( node_idx, self . label ) , self . current , ct, cm) )
263
264
}
264
265
@@ -293,7 +294,7 @@ impl Decrypter {
293
294
DKG_AAD ,
294
295
)
295
296
. ok ( ) ?;
296
- info ! ( node = %self . label, curr = %self . current, next = %committee_id, "produced dkg bundle" ) ;
297
+ debug ! ( node = %self . label, curr = %self . current, next = %committee_id, "produced resharing bundle" ) ;
297
298
Some ( DkgBundle :: new ( ( node_idx, self . label ) , committee_id, ct, cm) )
298
299
}
299
300
@@ -467,8 +468,7 @@ struct Worker {
467
468
#[ builder( default ) ]
468
469
dec_shares : DecShareCache ,
469
470
470
- /// Map of received decryption shares for each round.
471
- /// Useful to prevent DOS or DecShareBatch flooding by malicious peers.
471
+ /// Map of received decryption shares for each round (DOS prevention).
472
472
#[ builder( default ) ]
473
473
acks : BTreeMap < RoundNumber , HashSet < PublicKey > > ,
474
474
@@ -609,7 +609,7 @@ impl Worker {
609
609
/// A message from another node has been received.
610
610
/// Returns true if decryption shares have been updated.
611
611
async fn on_inbound ( & mut self , src : PublicKey , bytes : Bytes ) -> Result < bool > {
612
- // ignore msg sent to self during broadcast
612
+ // ignore msg sent to self
613
613
if src == self . label {
614
614
return Ok ( false ) ;
615
615
}
@@ -710,21 +710,23 @@ impl Worker {
710
710
711
711
if let Some ( ( & subset, _) ) = counts. iter ( ) . find ( |( _, count) | * * count >= threshold) {
712
712
let acc = DkgAccumulator :: from_subset ( current. clone ( ) , subset. to_owned ( ) ) ;
713
+ let mode = acc. mode ( ) . clone ( ) ;
713
714
self . tracker . insert ( committee. id ( ) , acc) ;
714
715
let dec_key = subset
715
716
. extract_key ( current, & self . dkg_sk , prev. as_ref ( ) )
716
717
. map_err ( |e| DecrypterError :: Dkg ( e. to_string ( ) ) ) ?;
717
718
718
719
self . dec_key . set ( dec_key) ;
719
720
self . state = WorkerState :: Running ;
720
- info ! ( node = %self . label, committee_id = %committee. id( ) , "dkg finished" ) ;
721
+ info ! ( node = %self . label, committee_id = %committee. id( ) , "{} finished" , mode ) ;
721
722
}
722
723
723
724
Ok ( ( ) )
724
725
}
725
726
726
727
/// A batch of decryption shares from another node has been received.
727
728
async fn on_batch_msg ( & mut self , src : PublicKey , batch : DecShareBatch ) -> Result < ( ) > {
729
+ trace ! ( node = %self . label, from=%src, round=%batch. round, "received batch message" ) ;
728
730
let round = batch. round . num ( ) ;
729
731
let committee_id = batch. round . committee ( ) ;
730
732
if committee_id != self . current {
@@ -748,7 +750,6 @@ impl Worker {
748
750
if round <= self . last_hatched_round {
749
751
return Ok ( ( ) ) ;
750
752
}
751
- trace ! ( node = %self . label, from=%src, %round, "inserting decrypted shares" ) ;
752
753
753
754
self . insert_shares ( batch) ?;
754
755
@@ -772,39 +773,38 @@ impl Worker {
772
773
return Ok ( ( ) ) ;
773
774
}
774
775
775
- if is_resharing && self . dec_key . get ( ) . is_none ( ) {
776
- warn ! ( node = %self . label, "initial DKG incomplete" ) ;
777
- return Ok ( ( ) ) ;
778
- }
776
+ let mode = if is_resharing {
777
+ if let Some ( dec_key) = self . dec_key . get ( ) {
778
+ AccumulatorMode :: Resharing ( dec_key. combkey ( ) . to_owned ( ) )
779
+ } else {
780
+ return Err ( DecrypterError :: Dkg ( "initial DKG incomplete" . into ( ) ) ) ;
781
+ }
782
+ } else {
783
+ AccumulatorMode :: Dkg
784
+ } ;
779
785
780
786
let Some ( key_store) = self . key_stores . read ( ) . get ( * committee_id) . cloned ( ) else {
781
787
return Err ( DecrypterError :: NoCommittee ( * committee_id) ) ;
782
788
} ;
783
789
784
- let acc = if is_resharing {
785
- let dec_key = self . dec_key . get ( ) . expect ( "dec_key exists for resharing" ) ;
786
- self . tracker . entry ( * committee_id) . or_insert_with ( || {
787
- DkgAccumulator :: new_resharing ( key_store. clone ( ) , dec_key. combkey ( ) . clone ( ) )
788
- } )
789
- } else {
790
- self . tracker
791
- . entry ( * committee_id)
792
- . or_insert_with ( || DkgAccumulator :: new_dkg ( key_store. clone ( ) ) )
793
- } ;
790
+ let acc = self
791
+ . tracker
792
+ . entry ( * committee_id)
793
+ . or_insert_with ( || DkgAccumulator :: new ( key_store. clone ( ) , mode. clone ( ) ) ) ;
794
794
795
795
acc. try_add ( bundle)
796
796
. await
797
797
. map_err ( |e| DecrypterError :: Dkg ( format ! ( "failed to add bundle: {e}" ) ) ) ?;
798
798
799
799
// for initial DKG, try to finalize if we have enough bundles
800
- if !is_resharing {
800
+ if matches ! ( mode , AccumulatorMode :: Dkg ) {
801
801
if let Some ( subset) = acc. try_finalize ( ) {
802
802
let dec_key = subset
803
803
. extract_key ( & key_store. clone ( ) , & self . dkg_sk , None )
804
804
. map_err ( |e| DecrypterError :: Dkg ( e. to_string ( ) ) ) ?;
805
805
self . dec_key . set ( dec_key) ;
806
806
self . state = WorkerState :: Running ;
807
- info ! ( committee_id = %key_store. committee( ) . id( ) , node = %self . label, "DKG completed" ) ;
807
+ info ! ( committee_id = %key_store. committee( ) . id( ) , node = %self . label, "initial DKG completed" ) ;
808
808
}
809
809
}
810
810
@@ -1106,7 +1106,6 @@ impl Worker {
1106
1106
return CombineResult :: InsufficientShares ;
1107
1107
}
1108
1108
1109
- // skip if no ciphertext
1110
1109
let Some ( ct) = maybe_ct else {
1111
1110
return CombineResult :: InsufficientShares ;
1112
1111
} ;
@@ -1179,18 +1178,7 @@ impl Worker {
1179
1178
/// Adds a new committee to the worker and updates network connections.
1180
1179
async fn on_next_committee ( & mut self , c : AddressableCommittee , k : KeyStore ) -> Result < ( ) > {
1181
1180
info ! ( node = %self . label, committee = %c. committee( ) . id( ) , "add next committee" ) ;
1182
- let key_store = {
1183
- let key_stores = self . key_stores . read ( ) ;
1184
- if key_stores. contains ( c. committee ( ) . id ( ) ) {
1185
- warn ! ( node = %self . label, committee = %c. committee( ) . id( ) , "committee already added" ) ;
1186
- return Ok ( ( ) ) ;
1187
- }
1188
- let Some ( key_store) = key_stores. get ( self . current ) else {
1189
- error ! ( node = %self . label, committee = %self . current, "current committee not found" ) ;
1190
- return Err ( DecrypterError :: NoCommittee ( self . current ) ) ;
1191
- } ;
1192
- key_store. clone ( )
1193
- } ;
1181
+ let key_store = self . current_store ( ) ?;
1194
1182
let mut additional = Vec :: new ( ) ;
1195
1183
for ( k, x, a) in c
1196
1184
. entries ( )
@@ -1253,7 +1241,7 @@ impl Worker {
1253
1241
1254
1242
self . next_committee =
1255
1243
NextCommittee :: Use ( round, Some ( NextKey :: new ( new_dkg_sk, new_dec_key) ) ) ;
1256
- debug ! ( node = %self . label, %round, "completed key extraction" ) ;
1244
+ trace ! ( node = %self . label, %round, "completed key extraction" ) ;
1257
1245
} else {
1258
1246
// not in new committee - request DKG subset from current committee
1259
1247
let dest: Vec < _ > = old. committee ( ) . parties ( ) . copied ( ) . collect ( ) ;
@@ -1264,7 +1252,7 @@ impl Worker {
1264
1252
. map_err ( |e| DecrypterError :: End ( e. into ( ) ) ) ?;
1265
1253
1266
1254
self . next_committee = NextCommittee :: Use ( round, None ) ;
1267
- debug ! ( node = %self . label, %round, "requested DKG subset from current" ) ;
1255
+ trace ! ( node = %self . label, %round, "requested DKG subset from current" ) ;
1268
1256
}
1269
1257
Ok ( ( ) )
1270
1258
}
0 commit comments