Skip to content

Commit affd510

Browse files
committed
cleaner WorkerState
1 parent ed6f01e commit affd510

File tree

1 file changed

+58
-39
lines changed

1 file changed

+58
-39
lines changed

timeboost-sequencer/src/decrypt.rs

Lines changed: 58 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -153,7 +153,10 @@ impl Decrypter {
153153
}
154154
None => {
155155
let kv = KeyStoreVec::new(key_store);
156-
(Arc::new(RwLock::new(kv)), WorkerState::default())
156+
(
157+
Arc::new(RwLock::new(kv)),
158+
WorkerState::DkgPending(HashMap::new()),
159+
)
157160
}
158161
};
159162

@@ -366,33 +369,40 @@ impl Drop for Decrypter {
366369

367370
/// The operational state of the Worker.
368371
///
369-
/// State Machine Flow:
370-
/// - DkgPending -> Running <-> ResharingComplete -> ShuttingDown
371-
/// - HandoverPending -> HandoverComplete -> Running <-> ResharingComplete -> ShuttingDown
372+
/// # State Machine Flow: (epoch e1 is special, e2 onwards are the same)
373+
///
374+
/// note: ShuttingDown and ResharingComplete will trigger send_handover_msg,
375+
/// Running is triggered by maybe_switch_committee except in epoch 1
376+
///
377+
/// #1: "e1 -> e2, in C1, but not C2"
378+
/// DkgPending -> Running -> ShuttingDown
379+
///
380+
/// #2: "e1 -> e2, in C1 and C2"
381+
/// DkgPending -> Running -> ResharingComplete -> Running (in e2)
382+
///
383+
/// #3: "ex -> ex+1, in Cx, but not Cx+1"
384+
/// HandoverPending -> HandoverComplete -> Running (in ex) -> ShuttingDown
385+
///
386+
/// #4: "ex -> ex+1, in Cx and Cx+1"
387+
/// HandoverPending -> HandoverComplete -> Running (in ex) -> ResharingComplete -> Running (in ex+1)
372388
#[derive(Debug, Clone)]
373389
#[allow(clippy::large_enum_variant)]
374390
enum WorkerState {
375391
/// Awaiting resharing messages from the previous committee.
376392
HandoverPending(HashMap<PublicKey, ResharingSubset>),
377-
/// Received enough resharing messages to complete the handover.
378-
HandoverComplete(DecryptionKey),
393+
/// Received enough resharing messages to complete the handover, but yet actively running.
394+
HandoverComplete,
379395
/// Expects to obtain the initial DKG key through DKG bundles.
380396
///
381397
/// Upon startup the Worker requests DKG messages from remote nodes
382398
/// such that, if the local node is behind, it will catchup immediately.
383399
DkgPending(HashMap<PublicKey, DkgSubset>),
384-
/// Already completed at least one instance of DKG. Ready for resharing.
385-
Running(DecryptionKey),
386-
/// Obtained keys for both the current and next committee.
387-
ResharingComplete(DecryptionKey, DecryptionKey),
400+
/// Active mode with decryption key ready.
401+
Running,
402+
/// Obtained decryption key for the next committee also as a member (see case #2 and #4)
403+
ResharingComplete(DecryptionKey),
388404
/// Completed resharing and handover but is not a member of next committee.
389-
ShuttingDown(DecryptionKey),
390-
}
391-
392-
impl Default for WorkerState {
393-
fn default() -> Self {
394-
Self::DkgPending(HashMap::new())
395-
}
405+
ShuttingDown,
396406
}
397407

398408
/// Worker is responsible for "hatching" ciphertexts.
@@ -493,7 +503,7 @@ impl Worker {
493503
loop {
494504
let mut cache_modified = false;
495505
// process pending inclusion lists received during catchup
496-
if !self.pending.is_empty() && matches!(self.state, WorkerState::Running(_)) {
506+
if !self.pending.is_empty() && matches!(self.state, WorkerState::Running) {
497507
for incl in std::mem::take(&mut self.pending).into_values() {
498508
match self.on_decrypt_request(incl, true).await {
499509
Ok(()) => {}
@@ -586,7 +596,7 @@ impl Worker {
586596
Err(err) => warn!(node = %self.label, %round, %err, "error on hatch"),
587597
}
588598

589-
if matches!(self.state, WorkerState::ShuttingDown(_)) {
599+
if matches!(self.state, WorkerState::ShuttingDown) {
590600
// graceful shut down
591601
if let Some(next_committee) = self.next_committee {
592602
if next_committee.num() - 1 == self.last_hatched_round {
@@ -726,7 +736,7 @@ impl Worker {
726736
.map_err(|e| DecrypterError::Dkg(e.to_string()))?;
727737

728738
self.enc_key.set(dec_sk.clone());
729-
self.state = WorkerState::Running(dec_sk);
739+
self.state = WorkerState::Running;
730740
info!(node = %self.label, committee_id = %committee.id(), "dkg finished (catchup successful)");
731741
}
732742

@@ -794,7 +804,7 @@ impl Worker {
794804
.map_err(|e| DecrypterError::Dkg(e.to_string()))?;
795805

796806
info!(committee_id = %committee.id(), node = %self.label, "handover finished");
797-
self.state = WorkerState::HandoverComplete(next_dec_key.clone());
807+
self.state = WorkerState::HandoverComplete;
798808
self.enc_key.set(next_dec_key);
799809
self.dkg_completed.insert(committee.id());
800810
}
@@ -875,8 +885,8 @@ impl Worker {
875885
.result()
876886
.map_err(|e| DecrypterError::Dkg(e.to_string()))?;
877887

878-
self.enc_key.set(dec_sk.clone());
879-
self.state = WorkerState::Running(dec_sk);
888+
self.enc_key.set(dec_sk);
889+
self.state = WorkerState::Running;
880890
self.dkg_completed.insert(committee.id());
881891
info!(committee_id = %committee.id(), node = %self.label, "dkg finished");
882892
}
@@ -961,10 +971,10 @@ impl Worker {
961971
)
962972
.map_err(|e| DecrypterError::Dkg(e.to_string()))?;
963973

964-
self.state = WorkerState::ResharingComplete(dec_key.clone(), next_dec_key.clone());
974+
self.state = WorkerState::ResharingComplete(next_dec_key);
965975
} else {
966976
// resharing complete; node will shut down at next committee switch
967-
self.state = WorkerState::ShuttingDown(dec_key.clone());
977+
self.state = WorkerState::ShuttingDown;
968978
}
969979

970980
trace!(committee_id = %committee.id(), node = %self.label, "resharing complete; handing over");
@@ -1100,7 +1110,7 @@ impl Worker {
11001110
/// NOTE: when a ciphertext is malformed, we will skip decrypting it (treat as garbage) here.
11011111
/// but will later be marked as decrypted during `hatch()`
11021112
async fn decrypt(&mut self, incl: &InclusionList) -> Result<DecShareBatch> {
1103-
let dec_sk = match &self.state {
1113+
let dec_sk: DecryptionKey = match &self.state {
11041114
WorkerState::DkgPending(_) => {
11051115
self.pending.insert(incl.round(), incl.clone());
11061116
return Err(DecrypterError::DkgPending);
@@ -1110,10 +1120,16 @@ impl Worker {
11101120
"Worker state does not hold decryption key".to_string(),
11111121
));
11121122
}
1113-
WorkerState::Running(dec_key)
1114-
| WorkerState::ResharingComplete(dec_key, _)
1115-
| WorkerState::HandoverComplete(dec_key)
1116-
| WorkerState::ShuttingDown(dec_key) => dec_key,
1123+
WorkerState::ResharingComplete(_) => {
1124+
return Err(DecrypterError::Dkg(format!(
1125+
"resharing completed, but Worker not active: label={}, round={}",
1126+
self.label,
1127+
incl.round()
1128+
)));
1129+
}
1130+
_ => self.enc_key.get().ok_or_else(|| {
1131+
DecrypterError::Internal("Worker running without dec key".to_string())
1132+
})?,
11171133
};
11181134

11191135
let round = Round::new(incl.round(), self.current);
@@ -1201,9 +1217,11 @@ impl Worker {
12011217
}
12021218

12031219
let dec_sk = match &self.state {
1204-
WorkerState::Running(dec_key)
1205-
| WorkerState::ResharingComplete(dec_key, _)
1206-
| WorkerState::ShuttingDown(dec_key) => dec_key,
1220+
WorkerState::Running
1221+
| WorkerState::ResharingComplete(_)
1222+
| WorkerState::ShuttingDown => self.enc_key.get().ok_or_else(|| {
1223+
DecrypterError::Internal("Worker running without dec key".to_string())
1224+
})?,
12071225
_ => {
12081226
return Err(DecrypterError::Dkg(
12091227
"(hatching) worker state does not hold decryption key".to_string(),
@@ -1412,17 +1430,18 @@ impl Worker {
14121430

14131431
// update state machine
14141432
self.state = match &self.state {
1415-
WorkerState::HandoverComplete(decryption_key) => {
1433+
WorkerState::HandoverComplete => {
14161434
info!(node = %self.label, committee = %self.current, "(new node) successful committee switch");
1417-
WorkerState::Running(decryption_key.clone())
1435+
WorkerState::Running
14181436
}
1419-
WorkerState::ResharingComplete(_, next_key) => {
1437+
WorkerState::ResharingComplete(next_key) => {
14201438
info!(node = %self.label, committee = %self.current, "(old node) successful committee switch");
1421-
WorkerState::Running(next_key.clone())
1439+
self.enc_key.set(next_key.clone());
1440+
WorkerState::Running
14221441
}
1423-
WorkerState::ShuttingDown(dec_key) => {
1442+
WorkerState::ShuttingDown => {
14241443
info!("(old node) not a member of new committee. ready for shut down");
1425-
WorkerState::ShuttingDown(dec_key.clone())
1444+
WorkerState::ShuttingDown
14261445
}
14271446
_ => {
14281447
return Err(DecrypterError::Internal(

0 commit comments

Comments
 (0)