Skip to content

Commit e84830a

Browse files
committed
simplify DkgState, only Pending and Ready
1 parent 311608f commit e84830a

File tree

2 files changed

+24
-24
lines changed

2 files changed

+24
-24
lines changed

tests/src/tests/timeboost.rs

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -128,8 +128,6 @@ where
128128

129129
/// Generate random bundles at a fixed frequency.
130130
async fn gen_bundles(enc_key: ThresholdEncKeyCell, tx: broadcast::Sender<BundleVariant>) {
131-
// allow time for DKG to settle so recovering nodes may catchup
132-
sleep(Duration::from_secs(20)).await;
133131
loop {
134132
let Ok(b) = make_bundle(&enc_key) else {
135133
warn!("Failed to generate bundle");

timeboost-sequencer/src/decrypt.rs

Lines changed: 24 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -159,7 +159,7 @@ impl Decrypter {
159159
.dkg_stores(dkg_stores.clone())
160160
.current(committee.id())
161161
.net(Overlay::new(net))
162-
.dkg_state(DkgState::Genesis)
162+
.dkg_state(DkgState::default())
163163
.tx(dec_tx)
164164
.rx(cmd_rx)
165165
.enc_key(cfg.threshold_enc_key.clone())
@@ -350,21 +350,23 @@ impl Drop for Decrypter {
350350

351351
#[derive(Debug, Clone)]
352352
enum DkgState {
353-
/// The node has not yet received sufficient DkgBundles to obtain the threshold
354-
/// decryption key material.
355-
Genesis,
356-
357353
/// The node received inclusion lists with encrypted bundles but did not receive enough
358354
/// DkgBundles to obtain the key. This suggests that the node is catching up.
359355
///
360356
/// In this state, the node will broadcast a DKG subset request and will obtain the key
361357
/// once sufficient subsets have been received from remote nodes over the network.
362-
Recover(HashMap<PublicKey, Subset>),
358+
Pending(HashMap<PublicKey, Subset>),
363359

364360
/// The node has completed DKG and can now combine/produce threshold decryption shares.
365361
Completed(DecryptionKey),
366362
}
367363

364+
impl Default for DkgState {
365+
fn default() -> Self {
366+
Self::Pending(HashMap::new())
367+
}
368+
}
369+
368370
/// Worker is responsible for "hatching" ciphertexts.
369371
///
370372
/// When ciphertexts in a round have received t+1 decryption shares
@@ -450,6 +452,12 @@ impl Worker {
450452
pub async fn go(mut self) -> EndOfPlay {
451453
let node = self.label;
452454

455+
// always try to catchup first, if other nodes haven't finished, they simply won't respond
456+
if let Err(e) = self.dkg_catchup().await {
457+
debug!("err during dkg_catchup: {:?}", e);
458+
return EndOfPlay::NetworkDown;
459+
}
460+
453461
loop {
454462
let mut cache_modified = false;
455463
// process any pending inclusion lists received during recovery
@@ -557,7 +565,7 @@ impl Worker {
557565
let conf = bincode::config::standard().with_limit::<MAX_MESSAGE_SIZE>();
558566
match bincode::serde::decode_from_slice(&bytes, conf)?.0 {
559567
Protocol::GetRequest(cid) => self.on_get_request(src, cid).await?,
560-
Protocol::GetResponse(subset) => self.on_get_response(src, subset).await?,
568+
Protocol::GetResponse(res) => self.on_get_response(src, res).await?,
561569
Protocol::Batch(batch) => {
562570
self.on_batch_msg(src, batch).await?;
563571
return Ok(true);
@@ -612,12 +620,12 @@ impl Worker {
612620
}
613621

614622
/// A get response for DKG subset has been received.
615-
async fn on_get_response(&mut self, src: PublicKey, subset: SubsetResponse) -> Result<()> {
616-
let SubsetResponse { round, subset } = subset;
623+
async fn on_get_response(&mut self, src: PublicKey, res: SubsetResponse) -> Result<()> {
624+
let SubsetResponse { round, subset } = res;
617625
let (round_num, committee_id) = round.into_parts();
618626
trace!(node = %self.label, from=%src, %committee_id, round=%round_num, "received get_response");
619627

620-
let DkgState::Recover(ref mut subsets) = self.dkg_state else {
628+
let DkgState::Pending(ref mut subsets) = self.dkg_state else {
621629
trace!("received get_response but not in a recovering state");
622630
return Ok(());
623631
};
@@ -823,11 +831,12 @@ impl Worker {
823831
Ok(())
824832
}
825833

826-
/// The node entered recover state and will catchup with the help of remote nodes.
827-
async fn recover(&mut self, incl: InclusionList) -> Result<()> {
828-
let req = Protocol::GetRequest(Round::new(incl.round(), self.current));
834+
/// The node will always try to catchup with the help of remote nodes first.
835+
async fn dkg_catchup(&mut self) -> Result<()> {
836+
let round = self.first_requested_round.unwrap_or_default();
837+
let req = Protocol::GetRequest(Round::new(round, self.current));
829838
self.net
830-
.broadcast(incl.round().u64(), serialize(&req)?)
839+
.broadcast(round.u64(), serialize(&req)?)
831840
.await
832841
.map_err(|e| DecrypterError::End(e.into()))?;
833842
Ok(())
@@ -877,14 +886,7 @@ impl Worker {
877886
/// but will later be marked as decrypted during `hatch()`
878887
async fn decrypt(&mut self, incl: &InclusionList) -> Result<DecShareBatch> {
879888
let dec_sk = match &self.dkg_state {
880-
DkgState::Genesis => {
881-
// received encrypted bundles but haven't received enough DkgBundles.
882-
self.recover(incl.clone()).await?;
883-
self.pending.insert(incl.round(), incl.clone());
884-
self.dkg_state = DkgState::Recover(HashMap::default());
885-
return Err(DecrypterError::DkgPending);
886-
}
887-
DkgState::Recover(_) => {
889+
DkgState::Pending(_) => {
888890
// we already initiated catchup; awaiting response from remote nodes.
889891
self.pending.insert(incl.round(), incl.clone());
890892
return Err(DecrypterError::DkgPending);

0 commit comments

Comments
 (0)