@@ -472,7 +472,7 @@ impl Worker {
472
472
msg = self . net. receive( ) => match msg {
473
473
Ok ( ( src, data) ) => {
474
474
match self . on_inbound( src, data) . await {
475
- Ok ( update ) => cache_modified |= update ,
475
+ Ok ( updated ) => cache_modified |= updated ,
476
476
Err ( DecrypterError :: End ( end) ) => return end,
477
477
Err ( err) => warn!( node = %self . label, %err, %src, "error on message" )
478
478
}
@@ -549,7 +549,7 @@ impl Worker {
549
549
/// A message from another node has been received.
550
550
/// Returns true if decryption shares have been updated.
551
551
async fn on_inbound ( & mut self , src : PublicKey , bytes : Bytes ) -> Result < bool > {
552
- trace ! ( node = %src, buf = %bytes. len( ) , "inbound message" ) ;
552
+ trace ! ( node = %self . label , from = % src, buf = %bytes. len( ) , "inbound message" ) ;
553
553
// ignore msg sent to self during broadcast
554
554
if src == self . label {
555
555
return Ok ( false ) ;
@@ -578,7 +578,7 @@ impl Worker {
578
578
) ) ;
579
579
}
580
580
581
- let Some ( committee) = self . committees . get ( committee_id) . cloned ( ) else {
581
+ let Some ( committee) = self . committees . get ( committee_id) else {
582
582
return Err ( DecrypterError :: NoCommittee ( committee_id) ) ;
583
583
} ;
584
584
@@ -622,7 +622,7 @@ impl Worker {
622
622
return Ok ( ( ) ) ;
623
623
} ;
624
624
625
- let Some ( committee) = self . committees . get ( committee_id) . cloned ( ) else {
625
+ let Some ( committee) = self . committees . get ( committee_id) else {
626
626
return Err ( DecrypterError :: NoCommittee ( committee_id) ) ;
627
627
} ;
628
628
committee
@@ -700,10 +700,7 @@ impl Worker {
700
700
return Ok ( ( ) ) ;
701
701
} ;
702
702
703
- if round <= self . last_hatched_round || round < self . oldest_cached_round ( ) {
704
- // shares for which the ciphertexts have already hatched
705
- // or shares that are older than the first ciphertext in
706
- // the local cache are not inserted.
703
+ if round <= self . last_hatched_round {
707
704
return Ok ( ( ) ) ;
708
705
}
709
706
trace ! ( node = %self . label, from=%src, %round, "inserting decrypted shares" ) ;
@@ -734,14 +731,15 @@ impl Worker {
734
731
735
732
async fn on_dkg_request ( & mut self , bundle : DkgBundle ) -> Result < ( ) > {
736
733
let cid = bundle. committee_id ( ) ;
737
- if self . is_dkg_completed ( bundle . committee_id ( ) ) {
734
+ if self . is_dkg_completed ( cid ) {
738
735
trace ! (
739
736
node = %self . label,
740
737
committee_id = %cid,
741
738
"received bundle but dkg already completed"
742
739
) ;
743
740
return Ok ( ( ) ) ;
744
741
}
742
+
745
743
let stores = self . dkg_stores . read ( ) ;
746
744
let Some ( dkg_store) = stores. iter ( ) . find ( |s| s. committee ( ) . id ( ) == * cid) else {
747
745
return Err ( DecrypterError :: Internal ( format ! (
@@ -753,13 +751,14 @@ impl Worker {
753
751
. dkg_tracker
754
752
. entry ( * cid)
755
753
. or_insert_with ( || DkgAccumulator :: new ( dkg_store. to_owned ( ) ) ) ;
754
+ drop ( stores) ;
756
755
757
756
acc. try_add ( bundle)
758
757
. map_err ( |e| DecrypterError :: Dkg ( format ! ( "unable to add dkg bundle: {e}" ) ) ) ?;
759
758
760
759
if let Some ( subset) = acc. try_finalize ( ) {
761
760
if * subset. committe_id ( ) == self . current {
762
- let committee = dkg_store . committee ( ) ;
761
+ let committee = acc . committee ( ) ;
763
762
// TODO:(alex) centralize these constant, redeclared in DkgAccumulator.try_add()
764
763
let aad: & [ u8 ; 3 ] = b"dkg" ;
765
764
let vess = ShoupVess :: new_fast_from ( committee) ;
0 commit comments