Skip to content

Commit 5ecdc01

Browse files
committed
Only enqueue DKG bundles if the decrypter has capacity.
1 parent 0e35ea7 commit 5ecdc01

File tree

3 files changed

+38
-71
lines changed

3 files changed

+38
-71
lines changed

timeboost-sequencer/src/decrypt.rs

Lines changed: 14 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ use multisig::{Committee, CommitteeId, PublicKey};
1010
use parking_lot::RwLock;
1111
use sailfish::types::{CommitteeVec, Evidence, Round, RoundNumber};
1212
use serde::{Deserialize, Serialize};
13-
use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet, VecDeque};
13+
use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet};
1414
use std::result::Result as StdResult;
1515
use std::sync::Arc;
1616
use timeboost_crypto::prelude::{LabeledDkgDecKey, ThresholdEncKeyCell, Vess, Vss};
@@ -232,18 +232,13 @@ impl Decrypter {
232232
Ok(())
233233
}
234234

235-
/// Send the received DKG bundles to worker
236-
pub async fn enqueue_dkg(
237-
&self,
238-
pending_dkgs: &mut VecDeque<DkgBundle>,
239-
) -> StdResult<(), DecrypterDown> {
240-
while let Some(b) = pending_dkgs.pop_front() {
241-
self.worker_tx
242-
.send(Command::Dkg(b))
243-
.await
244-
.map_err(|_| DecrypterDown(()))?;
245-
debug!(node = %self.label, "enqueued one dkg bundle");
246-
}
235+
/// Send the received DKG bundle to worker
236+
pub async fn enqueue_dkg(&self, dkg: DkgBundle) -> StdResult<(), DecrypterDown> {
237+
self.worker_tx
238+
.send(Command::Dkg(dkg))
239+
.await
240+
.map_err(|_| DecrypterDown(()))?;
241+
debug!(node = %self.label, "enqueued one dkg bundle");
247242
Ok(())
248243
}
249244

@@ -1621,11 +1616,12 @@ mod tests {
16211616

16221617
// enqueuing them all to decrypters
16231618
for decrypter in decrypters.iter_mut() {
1624-
let mut pending_dkgs = dkg_bundles.clone();
1625-
decrypter
1626-
.enqueue_dkg(&mut pending_dkgs)
1627-
.await
1628-
.expect("DKG bundles should be enqueued successfully");
1619+
for dkg in dkg_bundles.clone() {
1620+
decrypter
1621+
.enqueue_dkg(dkg)
1622+
.await
1623+
.expect("DKG bundles should be enqueued successfully");
1624+
}
16291625
}
16301626
}
16311627

timeboost-sequencer/src/lib.rs

Lines changed: 16 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -261,23 +261,22 @@ impl Task {
261261
// processing its actions continues unhindered.
262262
async fn go(mut self) -> Result<()> {
263263
let mut pending = None;
264-
let mut pending_dkgs = VecDeque::new();
264+
let mut dkg_bundles = VecDeque::new();
265265
let mut candidates = Candidates::new();
266266

267267
if !self.sailfish.is_init() {
268268
let actions = self.sailfish.init();
269269
candidates = self.execute(actions).await?;
270+
}
270271

271-
// DKG dealing generation
272-
// TODO: move/copy to main loop when resharing
273-
if let Some(bundle) = self.decrypter.gen_dkg_bundle() {
274-
self.bundles.add_bundles(once(BundleVariant::Dkg(bundle)));
275-
}
272+
// TODO: move/copy to main loop when resharing
273+
if let Some(bundle) = self.decrypter.gen_dkg_bundle() {
274+
self.bundles.add_bundles(once(BundleVariant::Dkg(bundle)));
276275
}
277276

278277
loop {
279278
if pending.is_none() {
280-
while let Some(ilist) = self.next_inclusion(&mut candidates, &mut pending_dkgs) {
279+
while let Some(ilist) = self.next_inclusion(&mut candidates, &mut dkg_bundles) {
281280
if !self.decrypter.has_capacity() {
282281
pending = Some(ilist);
283282
break;
@@ -288,12 +287,15 @@ impl Task {
288287
}
289288
}
290289

291-
// always sync DKG bundles
292-
self.next_dkg(&mut candidates, &mut pending_dkgs);
293-
if !pending_dkgs.is_empty() {
294-
tracing::debug!(num_bundles = %pending_dkgs.len(), "enqueuing dkg bundles");
295-
if let Err(err) = self.decrypter.enqueue_dkg(&mut pending_dkgs).await {
296-
error!(node = %self.label, %err, "dkg enqueue error");
290+
if pending.is_none() {
291+
while let Some(dkg) = dkg_bundles.pop_front() {
292+
if !self.decrypter.has_capacity() {
293+
dkg_bundles.push_front(dkg);
294+
break;
295+
}
296+
if let Err(err) = self.decrypter.enqueue_dkg(dkg).await {
297+
error!(node = %self.label, %err, "dkg enqueue error");
298+
}
297299
}
298300
}
299301

@@ -435,7 +437,7 @@ impl Task {
435437
// preprocess the candidate list to pull out the DKG bundles first
436438
for cl in lists.iter() {
437439
if let Some(dkg) = cl.dkg_bundle() {
438-
pending_dkgs.push_back(dkg);
440+
pending_dkgs.push_back(dkg.clone());
439441
}
440442
}
441443
// then process it to construct the next inclusion list
@@ -456,19 +458,6 @@ impl Task {
456458
}
457459
None
458460
}
459-
460-
/// Handle candidate lists and "pull" out the DKG bundles, it won't touch or drop or consume
461-
/// regular/priority bundles, but only consume/take the DKG bundles inside `candidates` then
462-
/// append them to `pending_dkgs`.
463-
fn next_dkg(&mut self, candidates: &mut Candidates, pending_dkgs: &mut VecDeque<DkgBundle>) {
464-
for (_, _, list) in candidates.iter_mut() {
465-
for cl in list.iter_mut() {
466-
if let Some(dkg) = cl.take_dkg_bundle() {
467-
pending_dkgs.push_back(dkg);
468-
}
469-
}
470-
}
471-
}
472461
}
473462

474463
#[derive(Debug, thiserror::Error)]

timeboost-types/src/candidate_list.rs

Lines changed: 8 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
1+
use std::ops::Deref;
12
use std::sync::Arc;
2-
use std::{ops::Deref, sync::RwLock};
33

44
use bytes::{BufMut, Bytes, BytesMut};
55
use committable::{Commitment, Committable, RawCommitmentBuilder};
@@ -20,7 +20,7 @@ struct Inner {
2020
index: DelayedInboxIndex,
2121
priority: Vec<SignedPriorityBundle>,
2222
regular: Vec<Bundle>,
23-
dkg: RwLock<Option<DkgBundle>>,
23+
dkg: Option<DkgBundle>,
2424
}
2525

2626
#[derive(Debug)]
@@ -29,7 +29,7 @@ pub struct Builder {
2929
index: DelayedInboxIndex,
3030
priority: Vec<SignedPriorityBundle>,
3131
regular: Vec<Bundle>,
32-
dkg: RwLock<Option<DkgBundle>>,
32+
dkg: Option<DkgBundle>,
3333
}
3434

3535
impl Builder {
@@ -44,7 +44,7 @@ impl Builder {
4444
}
4545

4646
pub fn with_dkg(mut self, d: Option<DkgBundle>) -> Self {
47-
self.dkg = RwLock::new(d);
47+
self.dkg = d;
4848
self
4949
}
5050

@@ -69,7 +69,7 @@ impl CandidateList {
6969
index: i.into(),
7070
regular: Vec::new(),
7171
priority: Vec::new(),
72-
dkg: RwLock::new(None),
72+
dkg: None,
7373
}
7474
}
7575

@@ -112,26 +112,8 @@ impl CandidateList {
112112
self.0.index
113113
}
114114

115-
pub fn dkg_bundle(&self) -> Option<DkgBundle> {
116-
match self.0.dkg.read() {
117-
Ok(dkg) => dkg.clone(),
118-
Err(poisoned) => {
119-
tracing::error!("RwLock on DkgBundle poisoned during read");
120-
let dkg = poisoned.into_inner();
121-
dkg.clone()
122-
}
123-
}
124-
}
125-
126-
pub fn take_dkg_bundle(&self) -> Option<DkgBundle> {
127-
match self.0.dkg.write() {
128-
Ok(mut dkg) => dkg.take(),
129-
Err(poisoned) => {
130-
tracing::error!("RwLock on DkgBundle poisoned during write");
131-
let mut dkg = poisoned.into_inner();
132-
dkg.take()
133-
}
134-
}
115+
pub fn dkg_bundle(&self) -> Option<&DkgBundle> {
116+
self.0.dkg.as_ref()
135117
}
136118
}
137119

@@ -140,7 +122,7 @@ impl Committable for CandidateList {
140122
let mut builder = RawCommitmentBuilder::new("CandidateList")
141123
.u64_field("time", self.0.time.into())
142124
.u64_field("index", self.0.index.into())
143-
.optional("dkg", &self.dkg_bundle())
125+
.optional("dkg", &self.0.dkg)
144126
.u64_field("priority", self.0.priority.len() as u64);
145127
builder = self
146128
.0

0 commit comments

Comments
 (0)