@@ -26,7 +26,7 @@ use timeboost_types::{
26
26
} ;
27
27
use tokio:: spawn;
28
28
use tokio:: sync:: mpsc:: { Receiver , Sender , channel} ;
29
- use tokio:: task:: { JoinError , JoinHandle } ;
29
+ use tokio:: task:: { JoinError , JoinHandle , spawn_blocking } ;
30
30
use tracing:: { debug, error, info, trace, warn} ;
31
31
32
32
use crate :: config:: DecrypterConfig ;
@@ -242,23 +242,49 @@ impl Decrypter {
242
242
/// # Returns
243
243
/// - `Some(DkgBundle)` if a new dealing was successfully created for the current committee.
244
244
/// - `None` if already submitted or if encryption key is missing.
245
- pub fn gen_dkg_bundle ( & mut self ) -> Option < DkgBundle > {
246
- let guard = self . key_stores . read ( ) ;
247
- let Some ( store) = guard. get ( self . current ) else {
245
+ pub async fn gen_dkg_bundle ( & mut self ) -> Option < DkgBundle > {
246
+ let Some ( current_store) = self . key_stores . read ( ) . get ( self . current ) . cloned ( ) else {
248
247
warn ! ( node = %self . label, committee = %self . current, "missing current key store" ) ;
249
248
return None ;
250
249
} ;
251
- let Some ( node_idx) = store . committee ( ) . get_index ( & self . label ) else {
250
+ let Some ( node_idx) = current_store . committee ( ) . get_index ( & self . label ) else {
252
251
warn ! ( node = %self . label, committee = %self . current, "local key not in store" ) ;
253
252
return None ;
254
253
} ;
255
254
256
- let vess = Vess :: new_fast ( ) ;
257
- let mut rng = thread_rng ( ) ;
258
- let secret = <Vss as VerifiableSecretSharing >:: Secret :: rand ( & mut rng) ;
259
- let ( ct, cm) = vess
260
- . encrypt_shares ( store. committee ( ) , store. sorted_keys ( ) , secret, DKG_AAD )
261
- . ok ( ) ?;
255
+ let ( ct, cm) = match spawn_blocking ( move || {
256
+ let vess = Vess :: new_fast ( ) ;
257
+ let mut rng = thread_rng ( ) ;
258
+ let secret = <Vss as VerifiableSecretSharing >:: Secret :: rand ( & mut rng) ;
259
+ vess. encrypt_shares (
260
+ current_store. committee ( ) ,
261
+ current_store. sorted_keys ( ) ,
262
+ secret,
263
+ DKG_AAD ,
264
+ )
265
+ } )
266
+ . await
267
+ {
268
+ Ok ( Ok ( result) ) => result,
269
+ Ok ( Err ( e) ) => {
270
+ warn ! (
271
+ node = %self . label,
272
+ committee = %self . current,
273
+ error = ?e,
274
+ "failed to produce dkg bundle"
275
+ ) ;
276
+ return None ;
277
+ }
278
+ Err ( e) => {
279
+ warn ! (
280
+ node = %self . label,
281
+ committee = %self . current,
282
+ error = ?e,
283
+ "task failed producing dkg bundle"
284
+ ) ;
285
+ return None ;
286
+ }
287
+ } ;
262
288
debug ! ( node = %self . label, curr = %self . current, "produced dkg bundle" ) ;
263
289
Some ( DkgBundle :: new ( ( node_idx, self . label ) , self . current , ct, cm) )
264
290
}
@@ -268,10 +294,9 @@ impl Decrypter {
268
294
/// # Returns
269
295
/// - `Some(DkgBundle)` if a Resharing dealing was successfully created.
270
296
/// - `None` if already submitted or if encryption key is missing.
271
- pub fn gen_resharing_bundle ( & mut self , next_store : KeyStore ) -> Option < DkgBundle > {
297
+ pub async fn gen_resharing_bundle ( & mut self , next_store : KeyStore ) -> Option < DkgBundle > {
272
298
let committee_id = next_store. committee ( ) . id ( ) ;
273
- let guard = self . key_stores . read ( ) ;
274
- let Some ( current_store) = guard. get ( self . current ) else {
299
+ let Some ( current_store) = self . key_stores . read ( ) . get ( self . current ) . cloned ( ) else {
275
300
warn ! ( node = %self . label, committee = %self . current, "missing current key store" ) ;
276
301
return None ;
277
302
} ;
@@ -285,15 +310,38 @@ impl Decrypter {
285
310
warn ! ( node = %self . label, committee = %committee_id, "no existing key to reshare" ) ;
286
311
return None ;
287
312
} ;
288
- let vess = Vess :: new_fast ( ) ;
289
- let ( ct, cm) = vess
290
- . encrypt_reshares (
313
+
314
+ let ( ct, cm) = match spawn_blocking ( move || {
315
+ let vess = Vess :: new_fast ( ) ;
316
+ vess. encrypt_reshares (
291
317
next_store. committee ( ) ,
292
318
next_store. sorted_keys ( ) ,
293
319
* dec_key. privkey ( ) . share ( ) ,
294
320
DKG_AAD ,
295
321
)
296
- . ok ( ) ?;
322
+ } )
323
+ . await
324
+ {
325
+ Ok ( Ok ( result) ) => result,
326
+ Ok ( Err ( e) ) => {
327
+ warn ! (
328
+ node = %self . label,
329
+ committee = %committee_id,
330
+ error = ?e,
331
+ "failed to produce reshare bundle"
332
+ ) ;
333
+ return None ;
334
+ }
335
+ Err ( e) => {
336
+ warn ! (
337
+ node = %self . label,
338
+ committee = %committee_id,
339
+ error = ?e,
340
+ "task failed producing resharing bundle"
341
+ ) ;
342
+ return None ;
343
+ }
344
+ } ;
297
345
debug ! ( node = %self . label, curr = %self . current, next = %committee_id, "produced resharing bundle" ) ;
298
346
Some ( DkgBundle :: new ( ( node_idx, self . label ) , committee_id, ct, cm) )
299
347
}
@@ -979,8 +1027,8 @@ impl Worker {
979
1027
980
1028
/// Get the key store for the current committee.
981
1029
fn current_store ( & self ) -> Result < KeyStore > {
982
- let guard = self . key_stores . read ( ) ;
983
- guard
1030
+ self . key_stores
1031
+ . read ( )
984
1032
. get ( self . current )
985
1033
. cloned ( )
986
1034
. ok_or_else ( || DecrypterError :: NoCommittee ( self . current ) )
@@ -1068,7 +1116,7 @@ impl Worker {
1068
1116
}
1069
1117
1070
1118
// process each ciphertext in parallel using spawn_blocking
1071
- let combine_results = tokio :: task :: spawn_blocking ( {
1119
+ let combine_results = spawn_blocking ( {
1072
1120
let key_store = key_store. clone ( ) ;
1073
1121
let dec_key = dec_key. clone ( ) ;
1074
1122
let ciphertexts: Vec < _ > = ciphertexts. collect ( ) ;
@@ -2162,26 +2210,24 @@ mod tests {
2162
2210
2163
2211
/// Generate all DKG bundle (one per decrypter) then enqueue all bundles at all decrypters
2164
2212
async fn enqueue_all_dkg_bundles ( decrypters : & mut [ Decrypter ] , key_store : Option < KeyStore > ) {
2165
- let bundles = if let Some ( key_store) = key_store {
2166
- decrypters
2167
- . iter_mut ( )
2168
- . map ( |decrypter| {
2169
- decrypter
2170
- . gen_resharing_bundle ( key_store . clone ( ) )
2171
- . expect ( "DKG bundle should be generated" )
2172
- } )
2173
- . collect :: < VecDeque < _ > > ( )
2213
+ let bundles: VecDeque < _ > = if let Some ( key_store) = key_store {
2214
+ futures :: future :: join_all (
2215
+ decrypters
2216
+ . iter_mut ( )
2217
+ . map ( |d| d . gen_resharing_bundle ( key_store . clone ( ) ) ) ,
2218
+ )
2219
+ . await
2220
+ . into_iter ( )
2221
+ . collect ( )
2174
2222
} else {
2175
- decrypters
2176
- . iter_mut ( )
2177
- . map ( |decrypter| {
2178
- decrypter
2179
- . gen_dkg_bundle ( )
2180
- . expect ( "DKG bundle should be generated" )
2181
- } )
2182
- . collect :: < VecDeque < _ > > ( )
2223
+ futures:: future:: join_all ( decrypters. iter_mut ( ) . map ( Decrypter :: gen_dkg_bundle) )
2224
+ . await
2225
+ . into_iter ( )
2226
+ . collect ( )
2183
2227
} ;
2184
2228
2229
+ let bundles: VecDeque < _ > = bundles. into_iter ( ) . flatten ( ) . collect ( ) ;
2230
+
2185
2231
// enqueuing them all to decrypters
2186
2232
for decrypter in decrypters. iter_mut ( ) {
2187
2233
for dkg in bundles. clone ( ) {
0 commit comments