@@ -12,7 +12,7 @@ use serde::{Deserialize, Serialize};
12
12
use std:: collections:: { BTreeMap , BTreeSet , HashMap , HashSet , VecDeque } ;
13
13
use std:: result:: Result as StdResult ;
14
14
use std:: sync:: Arc ;
15
- use timeboost_crypto:: prelude:: { LabeledDkgDecKey , Vess , Vss } ;
15
+ use timeboost_crypto:: prelude:: { DkgDecKey , LabeledDkgDecKey , Vess , Vss } ;
16
16
use timeboost_crypto:: traits:: dkg:: VerifiableSecretSharing ;
17
17
use timeboost_crypto:: traits:: threshold_enc:: { ThresholdEncError , ThresholdEncScheme } ;
18
18
use timeboost_crypto:: { DecryptionScheme , Plaintext } ;
@@ -96,8 +96,6 @@ pub struct Decrypter {
96
96
worker_rx : Receiver < InclusionList > ,
97
97
/// Worker task handle.
98
98
worker : JoinHandle < EndOfPlay > ,
99
- /// Set of committees for which DKG bundles have already been submitted.
100
- submitted : BTreeSet < CommitteeId > ,
101
99
/// Pending threshold encryption key material
102
100
enc_key : DecryptionKeyCell ,
103
101
/// Key stores (shared with Worker)
@@ -150,7 +148,7 @@ impl Decrypter {
150
148
net. add ( new_peers. collect ( ) ) . await ?;
151
149
(
152
150
Arc :: new ( RwLock :: new ( kv) ) ,
153
- WorkerState :: AwaitingHandover ( HashMap :: new ( ) ) ,
151
+ WorkerState :: HandoverPending ( HashMap :: new ( ) ) ,
154
152
)
155
153
}
156
154
None => {
@@ -177,7 +175,6 @@ impl Decrypter {
177
175
incls : VecDeque :: new ( ) ,
178
176
worker_tx : cmd_tx,
179
177
worker_rx : dec_rx,
180
- submitted : BTreeSet :: default ( ) ,
181
178
worker : spawn ( worker. go ( ) ) ,
182
179
enc_key : cfg. threshold_enc_key ,
183
180
key_stores : key_stores. clone ( ) ,
@@ -236,10 +233,6 @@ impl Decrypter {
236
233
/// - `Some(DkgBundle)` if a new dealing was successfully created for the current committee.
237
234
/// - `None` if already submitted or if encryption key is missing.
238
235
pub fn gen_dkg_bundle ( & mut self ) -> Option < DkgBundle > {
239
- if self . submitted . contains ( & self . current ) {
240
- trace ! ( node = %self . label, committee = %self . current, "dkg bundle already submitted" ) ;
241
- return None ;
242
- }
243
236
let guard = self . key_stores . read ( ) ;
244
237
let Some ( store) = guard. get ( self . current ) else {
245
238
warn ! ( node = %self . label, committee = %self . current, "missing current key store" ) ;
@@ -256,7 +249,6 @@ impl Decrypter {
256
249
let ( ct, cm) = vess
257
250
. encrypt_shares ( store. committee ( ) , store. sorted_keys ( ) , secret, DKG_AAD )
258
251
. ok ( ) ?;
259
- self . submitted . insert ( self . current ) ;
260
252
Some ( DkgBundle :: new ( ( node_idx, self . label ) , self . current , ct, cm) )
261
253
}
262
254
@@ -267,10 +259,6 @@ impl Decrypter {
267
259
/// - `None` if already submitted or if encryption key is missing.
268
260
pub fn gen_resharing_bundle ( & mut self , next_store : KeyStore ) -> Option < DkgBundle > {
269
261
let committee_id = next_store. committee ( ) . id ( ) ;
270
- if self . submitted . contains ( & committee_id) {
271
- trace ! ( node = %self . label, committee = %committee_id, "resharing bundle already submitted" ) ;
272
- return None ;
273
- }
274
262
let guard = self . key_stores . read ( ) ;
275
263
let Some ( current_store) = guard. get ( self . current ) else {
276
264
warn ! ( node = %self . label, committee = %self . current, "missing current key store" ) ;
@@ -287,17 +275,14 @@ impl Decrypter {
287
275
return None ;
288
276
} ;
289
277
let vess = Vess :: new_fast ( ) ;
290
- let share = dec_sk. privkey ( ) . share ( ) ;
291
- let secret = <Vss as VerifiableSecretSharing >:: Secret :: from ( * share) ;
292
278
let ( ct, cm) = vess
293
279
. encrypt_reshares (
294
280
next_store. committee ( ) ,
295
281
next_store. sorted_keys ( ) ,
296
- secret ,
282
+ * dec_sk . privkey ( ) . share ( ) ,
297
283
DKG_AAD ,
298
284
)
299
285
. ok ( ) ?;
300
- self . submitted . insert ( committee_id) ;
301
286
Some ( DkgBundle :: new ( ( node_idx, self . label ) , committee_id, ct, cm) )
302
287
}
303
288
@@ -338,12 +323,12 @@ impl Decrypter {
338
323
339
324
return Ok ( dec_incl) ;
340
325
} else {
341
- warn ! (
326
+ error ! (
342
327
node = %self . label,
343
328
%round,
344
- "received unexpected inclusion list (no rounds in queue) "
329
+ "received unexpected inclusion list"
345
330
) ;
346
- return Ok ( dec_incl ) ;
331
+ return Err ( DecrypterDown ( ( ) ) ) ;
347
332
}
348
333
}
349
334
}
@@ -380,11 +365,15 @@ impl Drop for Decrypter {
380
365
}
381
366
382
367
/// The operational state of the Worker.
368
+ ///
369
+ /// State Machine Flow:
370
+ /// - DkgPending -> Running <-> ResharingComplete -> ShuttingDown
371
+ /// - HandoverPending -> HandoverComplete -> Running <-> ResharingComplete -> ShuttingDown
383
372
#[ derive( Debug , Clone ) ]
384
373
#[ allow( clippy:: large_enum_variant) ]
385
374
enum WorkerState {
386
375
/// Awaiting resharing messages from the previous committee.
387
- AwaitingHandover ( HashMap < PublicKey , ResharingSubset > ) ,
376
+ HandoverPending ( HashMap < PublicKey , ResharingSubset > ) ,
388
377
/// Received enough resharing messages to complete the handover.
389
378
HandoverComplete ( DecryptionKey ) ,
390
379
/// Expects to obtain the initial DKG key through DKG bundles.
@@ -393,7 +382,7 @@ enum WorkerState {
393
382
/// such that, if the local node is behind, it will catchup immediately.
394
383
DkgPending ( HashMap < PublicKey , DkgSubset > ) ,
395
384
/// Already completed at least one instance of DKG. Ready for resharing.
396
- ResharingPending ( DecryptionKey ) ,
385
+ Running ( DecryptionKey ) ,
397
386
/// Obtained keys for both the current and next committee.
398
387
ResharingComplete ( DecryptionKey , DecryptionKey ) ,
399
388
/// Completed resharing and handover but is not a member of next committee.
@@ -488,9 +477,11 @@ struct Worker {
488
477
489
478
impl Worker {
490
479
pub async fn go ( mut self ) -> EndOfPlay {
491
- let node = self . label ;
492
-
493
- if !matches ! ( self . state, WorkerState :: AwaitingHandover ( _) ) {
480
+ debug_assert ! ( matches!(
481
+ self . state,
482
+ WorkerState :: HandoverPending ( _) | WorkerState :: DkgPending ( _)
483
+ ) ) ;
484
+ if matches ! ( self . state, WorkerState :: DkgPending ( _) ) {
494
485
// immediately try to catchup first
495
486
match self . dkg_catchup ( ) . await {
496
487
Ok ( ( ) ) => { }
@@ -502,7 +493,7 @@ impl Worker {
502
493
loop {
503
494
let mut cache_modified = false ;
504
495
// process pending inclusion lists received during catchup
505
- if !self . pending . is_empty ( ) && matches ! ( self . state, WorkerState :: ResharingPending ( _) ) {
496
+ if !self . pending . is_empty ( ) && matches ! ( self . state, WorkerState :: Running ( _) ) {
506
497
for incl in std:: mem:: take ( & mut self . pending ) . into_values ( ) {
507
498
match self . on_decrypt_request ( incl, true ) . await {
508
499
Ok ( ( ) ) => { }
@@ -550,7 +541,7 @@ impl Worker {
550
541
} ,
551
542
Some ( Command :: Decrypt ( ( incl, is_encrypted) ) ) => {
552
543
let round = incl. round( ) ;
553
- trace!( % node, %round, "decrypt request" ) ;
544
+ trace!( node = % self . label , %round, "decrypt request" ) ;
554
545
match self . on_decrypt_request( incl, is_encrypted) . await {
555
546
Ok ( ( ) ) => { cache_modified = true }
556
547
Err ( DecrypterError :: End ( end) ) => return end,
@@ -592,7 +583,7 @@ impl Worker {
592
583
match self . hatch ( round) . await {
593
584
Ok ( _) => { }
594
585
Err ( DecrypterError :: End ( end) ) => return end,
595
- Err ( err) => warn ! ( % node, %round, %err, "error on hatch" ) ,
586
+ Err ( err) => warn ! ( node = % self . label , %round, %err, "error on hatch" ) ,
596
587
}
597
588
598
589
if matches ! ( self . state, WorkerState :: ShuttingDown ( _) ) {
@@ -735,7 +726,7 @@ impl Worker {
735
726
. map_err ( |e| DecrypterError :: Dkg ( e. to_string ( ) ) ) ?;
736
727
737
728
self . enc_key . set ( dec_sk. clone ( ) ) ;
738
- self . state = WorkerState :: ResharingPending ( dec_sk) ;
729
+ self . state = WorkerState :: Running ( dec_sk) ;
739
730
info ! ( node = %self . label, committee_id = %committee. id( ) , "dkg finished (catchup successful)" ) ;
740
731
}
741
732
@@ -751,7 +742,7 @@ impl Worker {
751
742
}
752
743
753
744
let subsets = match & mut self . state {
754
- WorkerState :: AwaitingHandover ( subsets) => subsets,
745
+ WorkerState :: HandoverPending ( subsets) => subsets,
755
746
_ => {
756
747
trace ! ( node = %self . label, current = %self . current, %msg. committee_id, "not awaiting handover" ) ;
757
748
return Ok ( ( ) ) ;
@@ -885,7 +876,7 @@ impl Worker {
885
876
. map_err ( |e| DecrypterError :: Dkg ( e. to_string ( ) ) ) ?;
886
877
887
878
self . enc_key . set ( dec_sk. clone ( ) ) ;
888
- self . state = WorkerState :: ResharingPending ( dec_sk) ;
879
+ self . state = WorkerState :: Running ( dec_sk) ;
889
880
self . dkg_completed . insert ( committee. id ( ) ) ;
890
881
info ! ( committee_id = %committee. id( ) , node = %self . label, "dkg finished" ) ;
891
882
}
@@ -938,7 +929,7 @@ impl Worker {
938
929
if let Some ( next_node_idx) = committee. get_index ( & self . label ) . map ( |idx| idx. into ( ) ) {
939
930
// node is a member of the next committee; decrypting reshares immediately
940
931
let vess = Vess :: new_fast ( ) ;
941
- self . dkg_sk = self . dkg_sk . with_node_idx ( next_node_idx) ;
932
+ self . dkg_sk = DkgDecKey :: from ( self . dkg_sk . clone ( ) ) . label ( next_node_idx) ;
942
933
let dealings: Vec < _ > = subset
943
934
. bundles ( )
944
935
. iter ( )
@@ -1114,12 +1105,12 @@ impl Worker {
1114
1105
self . pending . insert ( incl. round ( ) , incl. clone ( ) ) ;
1115
1106
return Err ( DecrypterError :: DkgPending ) ;
1116
1107
}
1117
- WorkerState :: AwaitingHandover ( _) => {
1108
+ WorkerState :: HandoverPending ( _) => {
1118
1109
return Err ( DecrypterError :: Dkg (
1119
1110
"Worker state does not hold decryption key" . to_string ( ) ,
1120
1111
) ) ;
1121
1112
}
1122
- WorkerState :: ResharingPending ( dec_key)
1113
+ WorkerState :: Running ( dec_key)
1123
1114
| WorkerState :: ResharingComplete ( dec_key, _)
1124
1115
| WorkerState :: HandoverComplete ( dec_key)
1125
1116
| WorkerState :: ShuttingDown ( dec_key) => dec_key,
@@ -1210,7 +1201,7 @@ impl Worker {
1210
1201
}
1211
1202
1212
1203
let dec_sk = match & self . state {
1213
- WorkerState :: ResharingPending ( dec_key)
1204
+ WorkerState :: Running ( dec_key)
1214
1205
| WorkerState :: ResharingComplete ( dec_key, _)
1215
1206
| WorkerState :: ShuttingDown ( dec_key) => dec_key,
1216
1207
_ => {
@@ -1423,11 +1414,11 @@ impl Worker {
1423
1414
self . state = match & self . state {
1424
1415
WorkerState :: HandoverComplete ( decryption_key) => {
1425
1416
info ! ( node = %self . label, committee = %self . current, "(new node) successful committee switch" ) ;
1426
- WorkerState :: ResharingPending ( decryption_key. clone ( ) )
1417
+ WorkerState :: Running ( decryption_key. clone ( ) )
1427
1418
}
1428
1419
WorkerState :: ResharingComplete ( _, next_key) => {
1429
1420
info ! ( node = %self . label, committee = %self . current, "(old node) successful committee switch" ) ;
1430
- WorkerState :: ResharingPending ( next_key. clone ( ) )
1421
+ WorkerState :: Running ( next_key. clone ( ) )
1431
1422
}
1432
1423
WorkerState :: ShuttingDown ( dec_key) => {
1433
1424
info ! ( "(old node) not a member of new committee. ready for shut down" ) ;
@@ -1598,7 +1589,7 @@ mod tests {
1598
1589
collections:: VecDeque ,
1599
1590
net:: { Ipv4Addr , SocketAddr } ,
1600
1591
sync:: Arc ,
1601
- time:: { Duration , Instant } ,
1592
+ time:: Instant ,
1602
1593
} ;
1603
1594
1604
1595
use timeboost_utils:: types:: logging;
@@ -1628,7 +1619,6 @@ mod tests {
1628
1619
const TEST_EPOCH : u64 = 42 ;
1629
1620
const TEST_CHAIN_ID : u64 = 0 ;
1630
1621
const TEST_SEQNO : u64 = 10 ;
1631
- const NETWORK_SETUP_DELAY_SECS : u64 = 1 ;
1632
1622
const RETAIN_ROUNDS : usize = 100 ;
1633
1623
const COM1 : u64 = 1 ;
1634
1624
const COM2 : u64 = 2 ;
@@ -1913,7 +1903,6 @@ mod tests {
1913
1903
Some ( com1_setup. clone ( ) ) ,
1914
1904
)
1915
1905
. await ;
1916
- tokio:: time:: sleep ( Duration :: from_secs ( NETWORK_SETUP_DELAY_SECS ) ) . await ;
1917
1906
1918
1907
let com1_round = RoundNumber :: new ( DECRYPTION_ROUND ) ;
1919
1908
let com2_round = RoundNumber :: new ( DECRYPTION_ROUND + 1 ) ;
@@ -1955,8 +1944,6 @@ mod tests {
1955
1944
. expect ( "use committee event succeeds" ) ;
1956
1945
}
1957
1946
1958
- tokio:: time:: sleep ( Duration :: from_secs ( NETWORK_SETUP_DELAY_SECS ) ) . await ;
1959
-
1960
1947
let priority_tx_message = b"Priority message for old committee" ;
1961
1948
let regular_tx_message = b"Non-priority message for old committee" ;
1962
1949
@@ -2397,9 +2384,6 @@ mod tests {
2397
2384
encryption_key_cells. push ( encryption_key_cell) ;
2398
2385
}
2399
2386
2400
- // Allow time for network setup
2401
- tokio:: time:: sleep ( Duration :: from_secs ( NETWORK_SETUP_DELAY_SECS ) ) . await ;
2402
-
2403
2387
(
2404
2388
decrypters,
2405
2389
DecrypterSetup :: new (
0 commit comments