Skip to content

Commit 3278412

Browse files
committed
Increase readability on hatch method.
1 parent b1b55da commit 3278412

File tree

3 files changed

+124
-94
lines changed

3 files changed

+124
-94
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

tests/src/tests/timeboost/timeboost_handover.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ async fn run_handover(
4040
const NUM_OF_BLOCKS_PER_EPOCH: usize = 50;
4141

4242
let tasks = TaskTracker::new();
43-
let (bcast, _) = broadcast::channel(500);
43+
let (bcast, _) = broadcast::channel(1024);
4444
let finish = CancellationToken::new();
4545
let round2block = Arc::new(Round2Block::new());
4646

timeboost-sequencer/src/decrypt.rs

Lines changed: 122 additions & 93 deletions
Original file line numberDiff line numberDiff line change
@@ -98,9 +98,8 @@ enum NextCommittee {
9898
/// As part of the Timeboost protocol, it produces decryption shares by threshold-decrypting these
9999
/// ciphertexts. The shares are exchanged with other decrypters, and once a sufficient number are
100100
/// collected, shares can be combined ("hatching") to obtain the plaintext.
101-
///
102-
/// In addition, the `Decrypter` extracts DKG bundles from candidate lists and combines them to derive
103-
/// the keys used for threshold decryption/combining.
101+
/// In addition, the `Decrypter` extracts DKG bundles from candidate lists and combines them to
102+
/// derive the keys used for threshold decryption/combining.
104103
pub struct Decrypter {
105104
/// Public key of the node.
106105
label: PublicKey,
@@ -398,10 +397,10 @@ enum WorkerState {
398397
/// Obtains the threshold decryption key from DKG bundles.
399398
///
400399
/// A node can recover its threshold decryption key in two ways:
401-
/// 1. **Consensus**: by combining DKG bundles extracted directly from candidate lists
402-
/// produced by peers.
403-
/// 2. **Network**: by receiving and combining an agreed-upon subset of DKG bundles from a
404-
/// designated set of peers.
400+
/// 1. **Consensus**: by combining DKG bundles extracted directly from candidate lists produced
401+
/// by peers.
402+
/// 2. **Network**: by receiving and combining an agreed-upon subset of DKG bundles from a
403+
/// designated set of peers.
405404
///
406405
/// For the initial DKG, method (1) is used.
407406
/// For resharing, method (2) is used, with the source peers being the previous committee.
@@ -598,7 +597,7 @@ impl Worker {
598597
.unwrap_or(RoundNumber::genesis())
599598
}
600599

601-
/// Returns true iff DKG is completed for the committee (ongoing or no pending DKG found)
600+
/// Returns true iff DKG is completed for the given committee.
602601
fn dkg_completed(&self, committee_id: &CommitteeId) -> bool {
603602
if let Some(acc) = self.tracker.get(committee_id) {
604603
acc.completed()
@@ -713,7 +712,7 @@ impl Worker {
713712
let acc = DkgAccumulator::from_subset(current.clone(), subset.to_owned());
714713
self.tracker.insert(committee.id(), acc);
715714
let dec_key = subset
716-
.extract_key(&current, &self.dkg_sk, prev.as_ref())
715+
.extract_key(current, &self.dkg_sk, prev.as_ref())
717716
.map_err(|e| DecrypterError::Dkg(e.to_string()))?;
718717

719718
self.dec_key.set(dec_key);
@@ -895,9 +894,7 @@ impl Worker {
895894
/// NOTE: when a ciphertext is malformed, we will skip decrypting it (treat as garbage) here.
896895
/// but will later be marked as decrypted during `hatch()`
897896
async fn decrypt(&mut self, incl: &InclusionList) -> Result<DecShareBatch> {
898-
let dec_key: DecryptionKey = self.dec_key.get().ok_or_else(|| {
899-
DecrypterError::Internal("Worker running without dec key".to_string())
900-
})?;
897+
let dec_key: DecryptionKey = self.decryption_key()?;
901898

902899
let round = Round::new(incl.round(), self.current);
903900
let ciphertexts: Vec<_> = incl
@@ -963,51 +960,105 @@ impl Worker {
963960
Ok(())
964961
}
965962

966-
/// Attempt to hatch for round, returns Ok(Some(_)) if hatched successfully, Ok(None) if
967-
/// insufficient shares or inclusion list yet received (hatching target arrive later than
968-
/// decrypted shares is possible due to out-of-order delivery).
969-
/// Local cache are garbage collected for hatched rounds.
963+
/// Attempts to decrypt and finalize an inclusion list for the given round number.
970964
async fn hatch(&mut self, round: RoundNumber) -> Result<Option<InclusionList>> {
971-
let dec_key = match &self.state {
972-
WorkerState::Running => self.dec_key.get().ok_or_else(|| {
973-
DecrypterError::Internal("Worker running without dec key".to_string())
974-
})?,
975-
_ => {
976-
return Err(DecrypterError::Dkg(
977-
"(hatching) worker state does not hold decryption key".to_string(),
978-
));
979-
}
980-
};
965+
let dec_key = self.decryption_key()?;
981966

982967
let Some((incl, is_encrypted)) = self.incls.get(&round) else {
983968
return Ok(None);
984969
};
985970
let mut incl = incl.clone();
986971

987-
// return immediately to parent if no encrypted transactions
988972
if !is_encrypted {
989-
self.gc(&round).await?;
990-
self.last_hatched_round = round;
973+
return Ok(Some(self.finalize_hatch(round, incl).await?));
974+
}
991975

992-
self.tx
993-
.send(incl.clone())
994-
.await
995-
.map_err(|_| EndOfPlay::DecrypterDown)?;
996-
return Ok(Some(incl));
976+
let Some(decrypted) = self.decrypt_ciphertexts(round, &incl, &dec_key).await? else {
977+
return Ok(None);
978+
};
979+
980+
self.update_inclusion_list(&mut incl, decrypted)?;
981+
982+
Ok(Some(self.finalize_hatch(round, incl).await?))
983+
}
984+
985+
/// Get the current decryption key
986+
fn decryption_key(&self) -> Result<DecryptionKey> {
987+
match &self.state {
988+
WorkerState::Running => self.dec_key.get().ok_or_else(|| {
989+
DecrypterError::Internal("Worker running without dec key".to_string())
990+
}),
991+
_ => Err(DecrypterError::Dkg(
992+
"(hatching) worker state does not hold decryption key".to_string(),
993+
)),
994+
}
995+
}
996+
997+
/// Get the current key store for the committee.
998+
fn current_store(&self) -> Result<KeyStore> {
999+
let guard = self.key_stores.read();
1000+
guard
1001+
.get(self.current)
1002+
.cloned()
1003+
.ok_or_else(|| DecrypterError::NoCommittee(self.current))
1004+
}
1005+
1006+
/// Update the inclusion list with decrypted plaintexts.
1007+
fn update_inclusion_list(
1008+
&self,
1009+
incl: &mut InclusionList,
1010+
decrypted: Vec<Option<Plaintext>>,
1011+
) -> Result<()> {
1012+
let mut num_encrypted_priority_bundles = 0;
1013+
1014+
// update priority bundles
1015+
incl.priority_bundles_mut()
1016+
.iter_mut()
1017+
.filter(|pb| pb.bundle().is_encrypted())
1018+
.zip(decrypted.clone())
1019+
.for_each(|(pb, opt_plaintext)| {
1020+
num_encrypted_priority_bundles += 1;
1021+
match opt_plaintext {
1022+
Some(pt) => pb.set_data(timeboost_types::Bytes::from(<Vec<u8>>::from(pt))),
1023+
// None means garbage (undecryptable ciphertext), simply mark as decrypted
1024+
None => pb.set_data(pb.bundle().data().clone()),
1025+
}
1026+
});
1027+
1028+
// update regular bundles
1029+
incl.regular_bundles_mut()
1030+
.iter_mut()
1031+
.filter(|b| b.is_encrypted())
1032+
.zip(decrypted[num_encrypted_priority_bundles..].to_vec())
1033+
.for_each(|(bundle, opt_plaintext)| {
1034+
match opt_plaintext {
1035+
Some(pt) => bundle.set_data(timeboost_types::Bytes::from(<Vec<u8>>::from(pt))),
1036+
// None means garbage (undecryptable ciphertext), simply mark as decrypted
1037+
None => bundle.set_data(bundle.data().clone()),
1038+
}
1039+
});
1040+
1041+
if incl.is_encrypted() {
1042+
return Err(DecrypterError::Internal(
1043+
"didn't fully decrypt inclusion list".to_string(),
1044+
));
9971045
}
9981046

1047+
Ok(())
1048+
}
1049+
1050+
/// Decrypt ciphertexts using available shares.
1051+
async fn decrypt_ciphertexts(
1052+
&mut self,
1053+
round: RoundNumber,
1054+
incl: &InclusionList,
1055+
dec_key: &DecryptionKey,
1056+
) -> Result<Option<Vec<Option<Plaintext>>>> {
9991057
let ciphertexts = incl.filter_ciphertexts().map(|b| deserialize(b).ok());
10001058
let Some(dec_shares) = self.dec_shares.get(&round) else {
10011059
return Ok(None);
10021060
};
1003-
let key_store = {
1004-
let guard = self.key_stores.read();
1005-
let Some(key_store) = guard.get(self.current) else {
1006-
error!(node = %self.label, committee = %self.current, "current committee not found");
1007-
return Err(DecrypterError::NoCommittee(self.current));
1008-
};
1009-
key_store.clone()
1010-
};
1061+
let key_store = self.current_store()?;
10111062

10121063
if dec_shares.is_empty()
10131064
|| dec_shares.iter().any(|opt_dec_shares| {
@@ -1020,13 +1071,6 @@ impl Worker {
10201071
return Ok(None);
10211072
}
10221073

1023-
// hatching ciphertext
1024-
// Option<_> uses None to indicate either invalid ciphertext, or 2f+1 invalid decryption
1025-
// share both imply "skip hatching this garbage bundle which will result in no-op
1026-
// during execution"
1027-
1028-
let mut decrypted: Vec<Option<Plaintext>> = vec![];
1029-
10301074
let Some(per_ct_opt_dec_shares) = self.dec_shares.get_mut(&round) else {
10311075
return Ok(None);
10321076
};
@@ -1052,7 +1096,7 @@ impl Worker {
10521096
.into_par_iter()
10531097
.zip(per_ct_opt_dec_shares.par_iter_mut())
10541098
.map(|(maybe_ct, decryption_shares)| {
1055-
// Collect valid decryption shares
1099+
// collect valid decryption shares
10561100
let valid_shares: Vec<_> = decryption_shares
10571101
.iter()
10581102
.filter_map(|s| s.as_ref())
@@ -1080,7 +1124,7 @@ impl Worker {
10801124
) {
10811125
Ok(pt) => CombineResult::Success(pt),
10821126
Err(ThresholdEncError::FaultySubset(wrong_indices)) => {
1083-
CombineResult::FaultySubset(wrong_indices.into())
1127+
CombineResult::FaultySubset(wrong_indices)
10841128
}
10851129
Err(e) => CombineResult::Error(e),
10861130
}
@@ -1090,18 +1134,19 @@ impl Worker {
10901134
})
10911135
.await?;
10921136

1137+
let mut decrypted = Vec::new();
10931138
for (result, decryption_shares) in combine_results.into_iter().zip(per_ct_opt_dec_shares) {
10941139
match result {
10951140
CombineResult::Success(pt) => decrypted.push(Some(pt)),
10961141
CombineResult::FaultySubset(wrong_indices) => {
1097-
// Remove faulty decryption shares
1142+
// remove faulty decryption shares
10981143
decryption_shares.retain(|opt_s| {
10991144
opt_s
11001145
.clone()
11011146
.is_none_or(|s| !wrong_indices.contains(&s.index()))
11021147
});
11031148
warn!(node = %self.label, ?wrong_indices, "combine found faulty subset");
1104-
// Not ready to hatch this ciphertext
1149+
// not ready to hatch this ciphertext
11051150
return Ok(None);
11061151
}
11071152
CombineResult::Error(e) => {
@@ -1114,50 +1159,27 @@ impl Worker {
11141159
}
11151160
}
11161161

1117-
// construct/modify the inclusion list to replace with decrypted payload
1118-
let mut num_encrypted_priority_bundles = 0;
1119-
incl.priority_bundles_mut()
1120-
.iter_mut()
1121-
.filter(|pb| pb.bundle().is_encrypted())
1122-
.zip(decrypted.clone())
1123-
.for_each(|(pb, opt_plaintext)| {
1124-
num_encrypted_priority_bundles += 1;
1125-
match opt_plaintext {
1126-
Some(pt) => pb.set_data(timeboost_types::Bytes::from(<Vec<u8>>::from(pt))),
1127-
// None means garbage (undecryptable ciphertext), simply mark as decrypted
1128-
None => pb.set_data(pb.bundle().data().clone()),
1129-
}
1130-
});
1131-
incl.regular_bundles_mut()
1132-
.iter_mut()
1133-
.filter(|b| b.is_encrypted())
1134-
.zip(decrypted[num_encrypted_priority_bundles..].to_vec())
1135-
.for_each(|(bundle, opt_plaintext)| {
1136-
match opt_plaintext {
1137-
Some(pt) => bundle.set_data(timeboost_types::Bytes::from(<Vec<u8>>::from(pt))),
1138-
// None means garbage (undecryptable ciphertext), simply mark as decrypted
1139-
None => bundle.set_data(bundle.data().clone()),
1140-
}
1141-
});
1142-
if incl.is_encrypted() {
1143-
return Err(DecrypterError::Internal(
1144-
"didn't fully decrypt inclusion list".to_string(),
1145-
));
1146-
}
1162+
Ok(Some(decrypted))
1163+
}
11471164

1148-
// garbage collect hatched rounds
1165+
/// Finalize the hatching process by updating state and sending the inclusion list.
1166+
async fn finalize_hatch(
1167+
&mut self,
1168+
round: RoundNumber,
1169+
incl: InclusionList,
1170+
) -> Result<InclusionList> {
11491171
self.last_hatched_round = round;
1150-
11511172
self.gc(&round).await?;
11521173

11531174
self.tx
11541175
.send(incl.clone())
11551176
.await
11561177
.map_err(|_| EndOfPlay::DecrypterDown)?;
11571178

1158-
Ok(Some(incl))
1179+
Ok(incl)
11591180
}
11601181

1182+
/// Adds a new committee to the worker and updates network connections.
11611183
async fn on_next_committee(&mut self, c: AddressableCommittee, k: KeyStore) -> Result<()> {
11621184
info!(node = %self.label, committee = %c.committee().id(), "add next committee");
11631185
let key_store = {
@@ -1188,6 +1210,17 @@ impl Worker {
11881210
Ok(())
11891211
}
11901212

1213+
/// Prepares for a committee switch at the specified round.
1214+
///
1215+
/// This method is called on nodes that are members of the upcoming committee.
1216+
/// It handles two distinct cases:
1217+
///
1218+
/// 1. **New Committee Member**: If this is the node's first committee, it requests DKG subsets
1219+
/// from the current committee to construct the threshold decryption key.
1220+
///
1221+
/// 2. **Existing Committee Member**: If the node is in both current and next committee, it
1222+
/// generates and caches the new decryption keys immediately, ensuring they're ready for use
1223+
/// when the switch occurs at the specified round.
11911224
async fn on_use_committee(&mut self, round: Round) -> Result<()> {
11921225
info!(node = %self.label, %round, "use committee");
11931226
let committee_id = round.committee();
@@ -1239,6 +1272,7 @@ impl Worker {
12391272
Ok(())
12401273
}
12411274

1275+
/// Switches to the next committee if the clock is past the start round.
12421276
async fn maybe_switch_committee(&mut self) -> Result<()> {
12431277
let NextCommittee::Use(start, next_key) = &self.next_committee else {
12441278
return Ok(());
@@ -1288,6 +1322,7 @@ impl Worker {
12881322
Ok(())
12891323
}
12901324

1325+
/// Removes the old committee and updates network connections.
12911326
async fn remove_old_committee(&mut self) -> Result<()> {
12921327
let NextCommittee::Del(round) = self.next_committee else {
12931328
return Ok(());
@@ -1297,13 +1332,7 @@ impl Worker {
12971332
return Ok(());
12981333
};
12991334

1300-
let key_store = {
1301-
let guard = self.key_stores.read();
1302-
let Some(key_store) = guard.get(self.current) else {
1303-
return Err(DecrypterError::NoCommittee(self.current));
1304-
};
1305-
key_store.clone()
1306-
};
1335+
let key_store = self.current_store()?;
13071336
let old = self
13081337
.net
13091338
.parties()

0 commit comments

Comments
 (0)