Skip to content

Commit 4807944

Browse files
authored
Merge pull request #423 from EspressoSystems/tw/multiwatch
Add `Multiwatcher`.
2 parents 18f2d2f + 54b9bad commit 4807944

File tree

11 files changed

+342
-144
lines changed

11 files changed

+342
-144
lines changed

robusta/Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ rust-version.workspace = true
88
[dependencies]
99
bincode = { workspace = true }
1010
bon = { workspace = true }
11+
bytes = { workspace = true }
1112
data-encoding = { workspace = true }
1213
either = { workspace = true }
1314
espresso-types = { workspace = true }
@@ -21,6 +22,7 @@ serde_json = { workspace = true }
2122
thiserror = { workspace = true }
2223
timeboost-types = { path = "../timeboost-types" }
2324
tokio = { workspace = true }
25+
tokio-stream = { workspace = true }
2426
tokio-tungstenite = { workspace = true }
2527
tracing = { workspace = true }
2628
url = { workspace = true }

robusta/src/config.rs

Lines changed: 11 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,9 @@
1-
use std::{iter::repeat, time::Duration};
1+
use std::{iter::repeat, num::NonZeroUsize, time::Duration};
22

33
use bon::Builder;
4-
use url::{ParseError, Url};
4+
use url::Url;
55

6-
const NUM_DELAYS: usize = 5;
6+
const NUM_DELAYS: NonZeroUsize = NonZeroUsize::new(5).expect("5 > 0");
77

88
#[derive(Debug, Clone, Builder)]
99
pub struct Config {
@@ -12,25 +12,29 @@ pub struct Config {
1212
pub(crate) label: String,
1313

1414
/// Espresso network base URL.
15-
#[builder(with = |s: &str| -> Result<_, ParseError> { Url::parse(s) })]
1615
pub(crate) base_url: Url,
1716

1817
/// Espresso network websocket base URL.
19-
#[builder(with = |s: &str| -> Result<_, ParseError> { Url::parse(s) })]
2018
pub(crate) wss_base_url: Url,
2119

2220
/// The sequence of delays between successive requests.
2321
///
2422
/// The last value is repeated forever.
2523
#[builder(default = [1, 3, 5, 10, 15])]
26-
pub(crate) delays: [u8; NUM_DELAYS],
24+
pub(crate) delays: [u8; NUM_DELAYS.get()],
2725
}
2826

2927
impl Config {
28+
pub fn with_websocket_base_url(&self, u: Url) -> Self {
29+
let mut c = self.clone();
30+
c.wss_base_url = u;
31+
c
32+
}
33+
3034
pub fn delay_iter(&self) -> impl Iterator<Item = Duration> + use<> {
3135
self.delays
3236
.into_iter()
33-
.chain(repeat(self.delays[NUM_DELAYS - 1]))
37+
.chain(repeat(self.delays[NUM_DELAYS.get() - 1]))
3438
.map(|n| Duration::from_secs(n.into()))
3539
}
3640
}

robusta/src/lib.rs

Lines changed: 17 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
mod config;
2+
mod multiwatcher;
23
mod types;
34
mod watcher;
45

@@ -18,8 +19,9 @@ use tracing::{debug, warn};
1819

1920
use crate::types::{TX, TaggedBase64, TransactionsWithProof, VidCommonResponse};
2021

22+
pub use crate::multiwatcher::Multiwatcher;
2123
pub use crate::types::Height;
22-
pub use crate::watcher::{WatchError, watch};
24+
pub use crate::watcher::{WatchError, Watcher};
2325
pub use config::{Config, ConfigBuilder};
2426
pub use espresso_types;
2527

@@ -276,10 +278,7 @@ fn deserialize<T: DeserializeOwned>(d: &[u8]) -> Result<T, Error> {
276278

277279
#[cfg(test)]
278280
mod tests {
279-
use futures::StreamExt;
280-
use tokio::pin;
281-
282-
use super::{Client, Config};
281+
use super::{Client, Config, Watcher};
283282

284283
#[tokio::test]
285284
async fn decaf_smoke() {
@@ -288,17 +287,23 @@ mod tests {
288287
.try_init();
289288

290289
let cfg = Config::builder()
291-
.base_url("https://query.decaf.testnet.espresso.network/v1/")
292-
.unwrap()
293-
.wss_base_url("wss://query.decaf.testnet.espresso.network/v1/")
294-
.unwrap()
290+
.base_url(
291+
"https://query.decaf.testnet.espresso.network/v1/"
292+
.parse()
293+
.unwrap(),
294+
)
295+
.wss_base_url(
296+
"wss://query.decaf.testnet.espresso.network/v1/"
297+
.parse()
298+
.unwrap(),
299+
)
295300
.label("decaf_smoke")
296301
.build();
297302

298303
let clt = Client::new(cfg.clone());
299304
let height = clt.height().await.unwrap();
300-
let headers = super::watch(&cfg, height, None).await.unwrap();
301-
pin!(headers);
302-
assert_eq!(u64::from(height), headers.next().await.unwrap().height());
305+
let mut watcher = Watcher::new(cfg, height, None);
306+
let header = watcher.next().await;
307+
assert_eq!(u64::from(height), header.height());
303308
}
304309
}

robusta/src/multiwatcher.rs

Lines changed: 130 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,130 @@
1+
use std::{
2+
collections::{BTreeMap, HashMap, HashSet},
3+
sync::{
4+
Arc,
5+
atomic::{AtomicU64, Ordering},
6+
},
7+
};
8+
9+
use crate::{Config, Height, Watcher};
10+
use espresso_types::{Header, NamespaceId};
11+
use futures::{StreamExt, stream::SelectAll};
12+
use tokio::{
13+
spawn,
14+
sync::{Barrier, mpsc},
15+
task::JoinHandle,
16+
};
17+
use tokio_stream::wrappers::ReceiverStream;
18+
use tracing::{debug, warn};
19+
20+
#[derive(Debug)]
21+
pub struct Multiwatcher {
22+
threshold: usize,
23+
lower_bound: Arc<AtomicU64>,
24+
watchers: Vec<JoinHandle<()>>,
25+
headers: BTreeMap<Height, HashMap<Header, HashSet<Id>>>,
26+
stream: SelectAll<ReceiverStream<(Id, Header)>>,
27+
}
28+
29+
#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)]
30+
struct Id(usize);
31+
32+
impl Drop for Multiwatcher {
33+
fn drop(&mut self) {
34+
for w in &self.watchers {
35+
w.abort();
36+
}
37+
}
38+
}
39+
40+
impl Multiwatcher {
41+
pub fn new<C, H, N>(configs: C, height: H, nsid: N, threshold: usize) -> Self
42+
where
43+
C: IntoIterator<Item = Config>,
44+
H: Into<Height>,
45+
N: Into<NamespaceId>,
46+
{
47+
let height = height.into();
48+
let nsid = nsid.into();
49+
50+
// We require `threshold` watchers to deliver the next header.
51+
// Adversaries may produce headers in quick succession, causing
52+
// excessive memory usage.
53+
let barrier = Arc::new(Barrier::new(threshold));
54+
55+
// We track the last delivered height as a lower bound.
56+
// Watchers skip over headers up to and including that height.
57+
let lower_bound = Arc::new(AtomicU64::new(height.into()));
58+
59+
let mut stream = SelectAll::new();
60+
let mut watchers = Vec::new();
61+
62+
for (i, c) in configs.into_iter().enumerate() {
63+
let (tx, rx) = mpsc::channel(10);
64+
stream.push(ReceiverStream::new(rx));
65+
let barrier = barrier.clone();
66+
let lower_bound = lower_bound.clone();
67+
watchers.push(spawn(async move {
68+
let i = Id(i);
69+
let mut w = Watcher::new(c, height, nsid);
70+
loop {
71+
let h = w.next().await;
72+
if h.height() <= lower_bound.load(Ordering::Relaxed) {
73+
continue;
74+
}
75+
if tx.send((i, h)).await.is_err() {
76+
break;
77+
}
78+
barrier.wait().await;
79+
}
80+
}));
81+
}
82+
83+
assert!(!watchers.is_empty());
84+
85+
Self {
86+
threshold,
87+
stream,
88+
watchers,
89+
lower_bound,
90+
headers: BTreeMap::from_iter([(height, HashMap::new())]),
91+
}
92+
}
93+
94+
pub async fn next(&mut self) -> Header {
95+
loop {
96+
let (i, hdr) = self.stream.next().await.expect("watchers never terminate");
97+
let h = Height::from(hdr.height());
98+
if Some(h) < self.headers.first_entry().map(|e| *e.key()) {
99+
debug!(height = %h, "ignoring header below minimum height");
100+
continue;
101+
}
102+
if self.has_voted(h, i) {
103+
warn!(height = %h, "source sent multiple headers for same height");
104+
continue;
105+
}
106+
let counter = self.headers.entry(h).or_default();
107+
let votes = counter.get(&hdr).map(|ids| ids.len()).unwrap_or(0) + 1;
108+
if votes >= self.threshold {
109+
self.headers.retain(|k, _| *k > h);
110+
self.lower_bound.store(h.into(), Ordering::Relaxed);
111+
debug!(height = %h, "header available");
112+
return hdr;
113+
}
114+
debug!(height = %h, %votes, "vote added");
115+
counter.entry(hdr).or_default().insert(i);
116+
}
117+
}
118+
119+
fn has_voted(&self, height: Height, id: Id) -> bool {
120+
let Some(m) = self.headers.get(&height) else {
121+
return false;
122+
};
123+
for ids in m.values() {
124+
if ids.contains(&id) {
125+
return true;
126+
}
127+
}
128+
false
129+
}
130+
}

robusta/src/types.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,9 @@ pub(crate) struct VidCommonResponse {
1919

2020
macro_rules! Primitive {
2121
($name:ident, $t:ty) => {
22-
#[derive(Debug, Copy, Clone, Deserialize, Serialize)]
22+
#[derive(
23+
Debug, Copy, Clone, PartialEq, Eq, PartialOrd, Ord, Hash, Deserialize, Serialize,
24+
)]
2325
#[serde(transparent)]
2426
pub struct $name($t);
2527

0 commit comments

Comments
 (0)