Skip to content

Commit 11e47ad

Browse files
committed
address comments
2 parents 7101d3e + 3728a81 commit 11e47ad

File tree

30 files changed

+1013
-534
lines changed

30 files changed

+1013
-534
lines changed

Cargo.toml

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,7 @@ nohash-hasher = "0.2"
7272
parking_lot = "0.12.3"
7373
portpicker = "0.1.1"
7474
prometheus = "0.14"
75-
prost = "0.13.5"
75+
prost = "0.14.1"
7676
quickcheck = "1.0.3"
7777
rand = "0.9"
7878
rayon = "1.10"
@@ -96,8 +96,9 @@ tokio-stream = "0.1.17"
9696
tokio-tungstenite = { version = "0.27.0", features = ["rustls-tls-webpki-roots", "url"] }
9797
tokio-util = "0.7.15"
9898
toml = "0.8.19"
99-
tonic = "0.13.1"
100-
tonic-build = { version = "0.13.1", features = ["prost"] }
99+
tonic = "0.14.1"
100+
tonic-prost = "0.14.1"
101+
tonic-prost-build = "0.14.1"
101102
tracing = "0.1"
102103
tracing-subscriber = { version = "0.3.18", features = ["env-filter", "json"] }
103104
turmoil = "0.6.4"

robusta/src/lib.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -303,7 +303,7 @@ mod tests {
303303
let clt = Client::new(cfg.clone());
304304
let height = clt.height().await.unwrap();
305305
let mut watcher = Watcher::new(cfg, height, None);
306-
let header = watcher.next().await;
306+
let header = watcher.next().await.unwrap_right();
307307
assert_eq!(u64::from(height), header.height());
308308
}
309309
}

robusta/src/multiwatcher.rs

Lines changed: 42 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -4,15 +4,18 @@ use std::{
44
Arc,
55
atomic::{AtomicU64, Ordering},
66
},
7+
time::Duration,
78
};
89

910
use crate::{Config, Height, Watcher};
11+
use either::Either;
1012
use espresso_types::{Header, NamespaceId};
1113
use futures::{StreamExt, stream::SelectAll};
1214
use tokio::{
1315
spawn,
1416
sync::{Barrier, mpsc},
1517
task::JoinHandle,
18+
time::sleep,
1619
};
1720
use tokio_stream::wrappers::ReceiverStream;
1821
use tracing::{debug, warn};
@@ -66,16 +69,47 @@ impl Multiwatcher {
6669
let lower_bound = lower_bound.clone();
6770
watchers.push(spawn(async move {
6871
let i = Id(i);
69-
let mut w = Watcher::new(c, height, nsid);
7072
loop {
71-
let h = w.next().await;
72-
if h.height() <= lower_bound.load(Ordering::Relaxed) {
73-
continue;
73+
let height = lower_bound.load(Ordering::Relaxed);
74+
let mut w = Watcher::new(c.clone(), height, nsid);
75+
let mut expected = height + 1;
76+
loop {
77+
match w.next().await {
78+
Either::Right(hdr) => {
79+
if hdr.height() > expected {
80+
warn!(
81+
url = %c.wss_base_url,
82+
height = %hdr.height(),
83+
expected = %expected,
84+
"unexpected block height"
85+
);
86+
break;
87+
}
88+
expected += 1;
89+
if hdr.height() <= lower_bound.load(Ordering::Relaxed) {
90+
continue;
91+
}
92+
if tx.send((i, hdr)).await.is_err() {
93+
return;
94+
}
95+
}
96+
Either::Left(height) => {
97+
if *height > expected {
98+
warn!(
99+
url = %c.wss_base_url,
100+
height = %height,
101+
expected = %expected,
102+
"unexpected block height"
103+
);
104+
break;
105+
}
106+
expected += 1;
107+
}
108+
}
109+
barrier.wait().await;
74110
}
75-
if tx.send((i, h)).await.is_err() {
76-
break;
77-
}
78-
barrier.wait().await;
111+
drop(w);
112+
sleep(Duration::from_secs(3)).await // wait a little before re-connecting
79113
}
80114
}));
81115
}

robusta/src/types.rs

Lines changed: 41 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,8 @@
1-
use std::{borrow::Cow, fmt};
1+
use std::{
2+
borrow::Cow,
3+
fmt,
4+
ops::{Add, AddAssign, Deref},
5+
};
26

37
use bon::Builder;
48
use data_encoding::BASE64URL_NOPAD;
@@ -42,6 +46,42 @@ macro_rules! Primitive {
4246
self.0.fmt(f)
4347
}
4448
}
49+
50+
impl Deref for $name {
51+
type Target = $t;
52+
53+
fn deref(&self) -> &$t {
54+
&self.0
55+
}
56+
}
57+
58+
impl Add for $name {
59+
type Output = Self;
60+
61+
fn add(self, rhs: Self) -> Self::Output {
62+
Self(self.0 + rhs.0)
63+
}
64+
}
65+
66+
impl Add<$t> for $name {
67+
type Output = Self;
68+
69+
fn add(self, rhs: $t) -> Self {
70+
Self(self.0 + rhs)
71+
}
72+
}
73+
74+
impl AddAssign for $name {
75+
fn add_assign(&mut self, rhs: Self) {
76+
*self = *self + rhs
77+
}
78+
}
79+
80+
impl AddAssign<$t> for $name {
81+
fn add_assign(&mut self, rhs: $t) {
82+
*self = *self + rhs
83+
}
84+
}
4585
};
4686
}
4787

robusta/src/watcher.rs

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
use std::str::from_utf8;
22

33
use bytes::Bytes;
4+
use either::Either;
45
use espresso_types::{Header, NamespaceId};
56
use futures::{SinkExt, StreamExt};
67
use reqwest::header::LOCATION;
@@ -37,7 +38,7 @@ impl Watcher {
3738
}
3839
}
3940

40-
pub async fn next(&mut self) -> Header {
41+
pub async fn next(&mut self) -> Either<Height, Header> {
4142
'main: loop {
4243
let ws = if let Some(w) = &mut self.websocket {
4344
w
@@ -74,16 +75,16 @@ impl Watcher {
7475
Some(Ok(Message::Text(text))) => {
7576
match serde_json::from_str::<Header>(text.as_str()) {
7677
Ok(hdr) => {
78+
self.height = hdr.height().into();
7779
if let Some(id) = &self.namespace {
7880
if hdr.ns_table().find_ns_id(id).is_some() {
79-
self.height = hdr.height().into();
80-
return hdr;
81+
return Either::Right(hdr);
8182
} else {
8283
debug!(height = %hdr.height(), "namespace id not found");
84+
return Either::Left(self.height);
8385
}
8486
} else {
85-
self.height = hdr.height().into();
86-
return hdr;
87+
return Either::Right(hdr);
8788
}
8889
}
8990
Err(err) => {

tests/src/tests/timeboost/block_order.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -74,8 +74,7 @@ async fn block_order() {
7474
rxs.push(rx)
7575
}
7676

77-
// wait until DKG is done
78-
enc_key.wait().await;
77+
enc_key.read().await;
7978
tracing::info!("DKG done");
8079

8180
tasks.spawn(gen_bundles(enc_key, bcast.clone()));

tests/src/tests/timeboost/transaction_order.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -73,8 +73,7 @@ async fn transaction_order() {
7373
rxs.push(rx)
7474
}
7575

76-
// wait until DKG is done
77-
enc_key.wait().await;
76+
enc_key.read().await;
7877
tracing::info!("DKG done");
7978

8079
tasks.spawn(gen_bundles(enc_key, bcast.clone()));

timeboost-crypto/src/feldman.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ use ark_serialize::{CanonicalSerialize, SerializationError, serialize_to_vec};
66
use ark_std::marker::PhantomData;
77
use ark_std::rand::Rng;
88
use derive_more::{Deref, From, IntoIterator};
9+
use multisig::Committee;
910
use rayon::prelude::*;
1011
use serde::{Deserialize, Serialize};
1112
use serde_with::serde_as;
@@ -33,6 +34,10 @@ impl FeldmanVssPublicParam {
3334
Self { t, n }
3435
}
3536

37+
pub fn from(c: &Committee) -> Self {
38+
Self::new(c.one_honest_threshold(), c.size())
39+
}
40+
3641
pub fn threshold(&self) -> usize {
3742
self.t.get()
3843
}

timeboost-crypto/src/prelude.rs

Lines changed: 3 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -78,9 +78,6 @@ pub type ThresholdDecKeyShare = <DecryptionScheme as ThresholdEncScheme>::KeySha
7878

7979
/// `ThresholdEncKeyCell` is a thread-safe container for an optional `ThresholdEncKey`
8080
/// that allows asynchronous notification when the key is set.
81-
///
82-
/// Internally, it uses an `RwLock<Option<ThresholdEncKey>>` to guard the key,
83-
/// and a `Notify` to wake up tasks waiting for the key to become available.
8481
#[derive(Debug, Clone, Default)]
8582
pub struct ThresholdEncKeyCell {
8683
key: Arc<RwLock<Option<ThresholdEncKey>>>,
@@ -105,18 +102,13 @@ impl ThresholdEncKeyCell {
105102
self.key.read()
106103
}
107104

108-
/// Asynchronously waits for the key to become available, then returns it.
109-
///
110-
/// If the key is already present, it is returned immediately.
111-
/// Otherwise, the current task is suspended until `set()` is called.
112-
///
113-
/// The returned key is a clone of the stored key.
114-
pub async fn wait(&self) -> ThresholdEncKey {
105+
pub async fn read(&self) -> ThresholdEncKey {
115106
loop {
107+
let fut = self.notify.notified();
116108
if let Some(k) = self.get() {
117109
return k;
118110
}
119-
self.notify.notified().await;
111+
fut.await;
120112
}
121113
}
122114
}

0 commit comments

Comments
 (0)