Skip to content

Commit 8dd58d8

Browse files
authored
Merge branch 'main' into tw/consecutive-heights
2 parents 59c4e48 + fee0729 commit 8dd58d8

File tree

16 files changed

+61
-208
lines changed

16 files changed

+61
-208
lines changed

Cargo.toml

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,7 @@ nohash-hasher = "0.2"
7171
parking_lot = "0.12.3"
7272
portpicker = "0.1.1"
7373
prometheus = "0.14"
74-
prost = "0.13.5"
74+
prost = "0.14.1"
7575
quickcheck = "1.0.3"
7676
rand = "0.9"
7777
rayon = "1.10"
@@ -95,8 +95,9 @@ tokio-stream = "0.1.17"
9595
tokio-tungstenite = { version = "0.27.0", features = ["rustls-tls-webpki-roots", "url"] }
9696
tokio-util = "0.7.15"
9797
toml = "0.8.19"
98-
tonic = "0.13.1"
99-
tonic-build = { version = "0.13.1", features = ["prost"] }
98+
tonic = "0.14.1"
99+
tonic-prost = "0.14.1"
100+
tonic-prost-build = "0.14.1"
100101
tracing = "0.1"
101102
tracing-subscriber = { version = "0.3.18", features = ["env-filter", "json"] }
102103
turmoil = "0.6.4"

tests/src/tests/timeboost/block_order.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -74,8 +74,7 @@ async fn block_order() {
7474
rxs.push(rx)
7575
}
7676

77-
// wait until DKG is done
78-
enc_key.wait().await;
77+
enc_key.read().await;
7978
tracing::info!("DKG done");
8079

8180
tasks.spawn(gen_bundles(enc_key, bcast.clone()));

tests/src/tests/timeboost/transaction_order.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -73,8 +73,7 @@ async fn transaction_order() {
7373
rxs.push(rx)
7474
}
7575

76-
// wait until DKG is done
77-
enc_key.wait().await;
76+
enc_key.read().await;
7877
tracing::info!("DKG done");
7978

8079
tasks.spawn(gen_bundles(enc_key, bcast.clone()));

timeboost-crypto/src/prelude.rs

Lines changed: 3 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -78,9 +78,6 @@ pub type ThresholdDecKeyShare = <DecryptionScheme as ThresholdEncScheme>::KeySha
7878

7979
/// `ThresholdEncKeyCell` is a thread-safe container for an optional `ThresholdEncKey`
8080
/// that allows asynchronous notification when the key is set.
81-
///
82-
/// Internally, it uses an `RwLock<Option<ThresholdEncKey>>` to guard the key,
83-
/// and a `Notify` to wake up tasks waiting for the key to become available.
8481
#[derive(Debug, Clone, Default)]
8582
pub struct ThresholdEncKeyCell {
8683
key: Arc<RwLock<Option<ThresholdEncKey>>>,
@@ -105,18 +102,13 @@ impl ThresholdEncKeyCell {
105102
self.key.read()
106103
}
107104

108-
/// Asynchronously waits for the key to become available, then returns it.
109-
///
110-
/// If the key is already present, it is returned immediately.
111-
/// Otherwise, the current task is suspended until `set()` is called.
112-
///
113-
/// The returned key is a clone of the stored key.
114-
pub async fn wait(&self) -> ThresholdEncKey {
105+
pub async fn read(&self) -> ThresholdEncKey {
115106
loop {
107+
let fut = self.notify.notified();
116108
if let Some(k) = self.get() {
117109
return k;
118110
}
119-
self.notify.notified().await;
111+
fut.await;
120112
}
121113
}
122114
}

timeboost-proto/Cargo.toml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ rust-version.workspace = true
99
prost = { workspace = true }
1010
thiserror = { workspace = true }
1111
tonic = { workspace = true }
12+
tonic-prost = { workspace = true }
1213

1314
[build-dependencies]
14-
tonic-build = { workspace = true }
15+
tonic-prost-build = { workspace = true }

timeboost-proto/build.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,11 @@
11
use std::error::Error;
22

33
fn main() -> Result<(), Box<dyn Error>> {
4-
tonic_build::configure()
4+
tonic_prost_build::configure()
55
.build_server(true)
66
.out_dir("src")
7-
.bytes([".block.Block.payload", ".inclusion.Transaction.encoded_txn"])
7+
.bytes(".block.Block.payload")
8+
.bytes(".inclusion.Transaction.encoded_txn")
89
.compile_protos(
910
&[
1011
"protos/inclusion_list.proto",

timeboost-sequencer/src/decrypt.rs

Lines changed: 14 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ use multisig::{Committee, CommitteeId, PublicKey};
1010
use parking_lot::RwLock;
1111
use sailfish::types::{CommitteeVec, Evidence, Round, RoundNumber};
1212
use serde::{Deserialize, Serialize};
13-
use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet, VecDeque};
13+
use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet};
1414
use std::result::Result as StdResult;
1515
use std::sync::Arc;
1616
use timeboost_crypto::prelude::{LabeledDkgDecKey, ThresholdEncKeyCell, Vess, Vss};
@@ -232,18 +232,13 @@ impl Decrypter {
232232
Ok(())
233233
}
234234

235-
/// Send the received DKG bundles to worker
236-
pub async fn enqueue_dkg(
237-
&self,
238-
pending_dkgs: &mut VecDeque<DkgBundle>,
239-
) -> StdResult<(), DecrypterDown> {
240-
while let Some(b) = pending_dkgs.pop_front() {
241-
self.worker_tx
242-
.send(Command::Dkg(b))
243-
.await
244-
.map_err(|_| DecrypterDown(()))?;
245-
debug!(node = %self.label, "enqueued one dkg bundle");
246-
}
235+
/// Send the received DKG bundle to worker
236+
pub async fn enqueue_dkg(&self, dkg: DkgBundle) -> StdResult<(), DecrypterDown> {
237+
self.worker_tx
238+
.send(Command::Dkg(dkg))
239+
.await
240+
.map_err(|_| DecrypterDown(()))?;
241+
debug!(node = %self.label, "enqueued one dkg bundle");
247242
Ok(())
248243
}
249244

@@ -1621,11 +1616,12 @@ mod tests {
16211616

16221617
// enqueuing them all to decrypters
16231618
for decrypter in decrypters.iter_mut() {
1624-
let mut pending_dkgs = dkg_bundles.clone();
1625-
decrypter
1626-
.enqueue_dkg(&mut pending_dkgs)
1627-
.await
1628-
.expect("DKG bundles should be enqueued successfully");
1619+
for dkg in dkg_bundles.clone() {
1620+
decrypter
1621+
.enqueue_dkg(dkg)
1622+
.await
1623+
.expect("DKG bundles should be enqueued successfully");
1624+
}
16291625
}
16301626
}
16311627

timeboost-sequencer/src/lib.rs

Lines changed: 16 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -261,23 +261,22 @@ impl Task {
261261
// processing its actions continues unhindered.
262262
async fn go(mut self) -> Result<()> {
263263
let mut pending = None;
264-
let mut pending_dkgs = VecDeque::new();
264+
let mut dkg_bundles = VecDeque::new();
265265
let mut candidates = Candidates::new();
266266

267267
if !self.sailfish.is_init() {
268268
let actions = self.sailfish.init();
269269
candidates = self.execute(actions).await?;
270+
}
270271

271-
// DKG dealing generation
272-
// TODO: move/copy to main loop when resharing
273-
if let Some(bundle) = self.decrypter.gen_dkg_bundle() {
274-
self.bundles.add_bundles(once(BundleVariant::Dkg(bundle)));
275-
}
272+
// TODO: move/copy to main loop when resharing
273+
if let Some(bundle) = self.decrypter.gen_dkg_bundle() {
274+
self.bundles.add_bundles(once(BundleVariant::Dkg(bundle)));
276275
}
277276

278277
loop {
279278
if pending.is_none() {
280-
while let Some(ilist) = self.next_inclusion(&mut candidates, &mut pending_dkgs) {
279+
while let Some(ilist) = self.next_inclusion(&mut candidates, &mut dkg_bundles) {
281280
if !self.decrypter.has_capacity() {
282281
pending = Some(ilist);
283282
break;
@@ -288,12 +287,15 @@ impl Task {
288287
}
289288
}
290289

291-
// always sync DKG bundles
292-
self.next_dkg(&mut candidates, &mut pending_dkgs);
293-
if !pending_dkgs.is_empty() {
294-
tracing::debug!(num_bundles = %pending_dkgs.len(), "enqueuing dkg bundles");
295-
if let Err(err) = self.decrypter.enqueue_dkg(&mut pending_dkgs).await {
296-
error!(node = %self.label, %err, "dkg enqueue error");
290+
if pending.is_none() {
291+
while let Some(dkg) = dkg_bundles.pop_front() {
292+
if !self.decrypter.has_capacity() {
293+
dkg_bundles.push_front(dkg);
294+
break;
295+
}
296+
if let Err(err) = self.decrypter.enqueue_dkg(dkg).await {
297+
error!(node = %self.label, %err, "dkg enqueue error");
298+
}
297299
}
298300
}
299301

@@ -435,7 +437,7 @@ impl Task {
435437
// preprocess the candidate list to pull out the DKG bundles first
436438
for cl in lists.iter() {
437439
if let Some(dkg) = cl.dkg_bundle() {
438-
pending_dkgs.push_back(dkg);
440+
pending_dkgs.push_back(dkg.clone());
439441
}
440442
}
441443
// then process it to construct the next inclusion list
@@ -456,19 +458,6 @@ impl Task {
456458
}
457459
None
458460
}
459-
460-
/// Handle candidate lists and "pull" out the DKG bundles, it won't touch or drop or consume
461-
/// regular/priority bundles, but only consume/take the DKG bundles inside `candidates` then
462-
/// append them to `pending_dkgs`.
463-
fn next_dkg(&mut self, candidates: &mut Candidates, pending_dkgs: &mut VecDeque<DkgBundle>) {
464-
for (_, _, list) in candidates.iter_mut() {
465-
for cl in list.iter_mut() {
466-
if let Some(dkg) = cl.take_dkg_bundle() {
467-
pending_dkgs.push_back(dkg);
468-
}
469-
}
470-
}
471-
}
472461
}
473462

474463
#[derive(Debug, thiserror::Error)]

timeboost-sequencer/src/queue.rs

Lines changed: 5 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -191,17 +191,14 @@ impl DataSource for BundleQueue {
191191
inner.set_time(time);
192192

193193
if r.is_genesis() || inner.mode.is_passive() {
194-
// allow DKG in passive mode (first 8 rounds from genesis)
195-
let candidate_list = CandidateList::builder(Timestamp::now(), inner.index)
196-
.with_dkg(inner.dkg.clone())
194+
return CandidateList::builder(Timestamp::now(), inner.index)
195+
.with_dkg(inner.dkg.take())
197196
.finish()
198197
.try_into()
199198
.unwrap_or_else(|err| {
200199
error!(%err, "candidate list serialization error");
201200
CandidateListBytes::default()
202201
});
203-
inner.dkg = None;
204-
return candidate_list;
205202
}
206203

207204
let mut size_budget = inner.max_len;
@@ -229,19 +226,16 @@ impl DataSource for BundleQueue {
229226
regular.push(b.clone())
230227
}
231228

232-
let candidate_list = CandidateList::builder(inner.time, inner.index)
229+
CandidateList::builder(inner.time, inner.index)
233230
.with_priority_bundles(priority)
234231
.with_regular_bundles(regular)
235-
.with_dkg(inner.dkg.clone())
232+
.with_dkg(inner.dkg.take())
236233
.finish()
237234
.try_into()
238235
.unwrap_or_else(|err| {
239236
error!(%err, "candidate list serialization error");
240237
CandidateListBytes::default()
241-
});
242-
243-
inner.dkg = None;
244-
candidate_list
238+
})
245239
}
246240
}
247241

timeboost-types/src/candidate_list.rs

Lines changed: 8 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
1+
use std::ops::Deref;
12
use std::sync::Arc;
2-
use std::{ops::Deref, sync::RwLock};
33

44
use bytes::{BufMut, Bytes, BytesMut};
55
use committable::{Commitment, Committable, RawCommitmentBuilder};
@@ -20,7 +20,7 @@ struct Inner {
2020
index: DelayedInboxIndex,
2121
priority: Vec<SignedPriorityBundle>,
2222
regular: Vec<Bundle>,
23-
dkg: RwLock<Option<DkgBundle>>,
23+
dkg: Option<DkgBundle>,
2424
}
2525

2626
#[derive(Debug)]
@@ -29,7 +29,7 @@ pub struct Builder {
2929
index: DelayedInboxIndex,
3030
priority: Vec<SignedPriorityBundle>,
3131
regular: Vec<Bundle>,
32-
dkg: RwLock<Option<DkgBundle>>,
32+
dkg: Option<DkgBundle>,
3333
}
3434

3535
impl Builder {
@@ -44,7 +44,7 @@ impl Builder {
4444
}
4545

4646
pub fn with_dkg(mut self, d: Option<DkgBundle>) -> Self {
47-
self.dkg = RwLock::new(d);
47+
self.dkg = d;
4848
self
4949
}
5050

@@ -69,7 +69,7 @@ impl CandidateList {
6969
index: i.into(),
7070
regular: Vec::new(),
7171
priority: Vec::new(),
72-
dkg: RwLock::new(None),
72+
dkg: None,
7373
}
7474
}
7575

@@ -112,26 +112,8 @@ impl CandidateList {
112112
self.0.index
113113
}
114114

115-
pub fn dkg_bundle(&self) -> Option<DkgBundle> {
116-
match self.0.dkg.read() {
117-
Ok(dkg) => dkg.clone(),
118-
Err(poisoned) => {
119-
tracing::error!("RwLock on DkgBundle poisoned during read");
120-
let dkg = poisoned.into_inner();
121-
dkg.clone()
122-
}
123-
}
124-
}
125-
126-
pub fn take_dkg_bundle(&self) -> Option<DkgBundle> {
127-
match self.0.dkg.write() {
128-
Ok(mut dkg) => dkg.take(),
129-
Err(poisoned) => {
130-
tracing::error!("RwLock on DkgBundle poisoned during write");
131-
let mut dkg = poisoned.into_inner();
132-
dkg.take()
133-
}
134-
}
115+
pub fn dkg_bundle(&self) -> Option<&DkgBundle> {
116+
self.0.dkg.as_ref()
135117
}
136118
}
137119

@@ -140,7 +122,7 @@ impl Committable for CandidateList {
140122
let mut builder = RawCommitmentBuilder::new("CandidateList")
141123
.u64_field("time", self.0.time.into())
142124
.u64_field("index", self.0.index.into())
143-
.optional("dkg", &self.dkg_bundle())
125+
.optional("dkg", &self.0.dkg)
144126
.u64_field("priority", self.0.priority.len() as u64);
145127
builder = self
146128
.0

0 commit comments

Comments
 (0)