Skip to content

Commit 88f2f4e

Browse files
committed
Use Barrier and atomic lower bound.
Requires a threshold of headers before delivering the next header. This limits the number of headers by which a minority can move ahead. If done maliciously it could cause excessive memory usage. The lower bound is updated on every delivery. Headers up to and including that height are ignored.
1 parent d45e3cd commit 88f2f4e

File tree

1 file changed

+42
-5
lines changed

1 file changed

+42
-5
lines changed

robusta/src/multiwatcher.rs

Lines changed: 42 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,26 @@
1-
use std::collections::{BTreeMap, HashMap, HashSet};
1+
use std::{
2+
collections::{BTreeMap, HashMap, HashSet},
3+
sync::{
4+
Arc,
5+
atomic::{AtomicU64, Ordering},
6+
},
7+
};
28

39
use crate::{Config, Height, Watcher};
410
use espresso_types::{Header, NamespaceId};
511
use futures::{StreamExt, stream::SelectAll};
6-
use tokio::{spawn, sync::mpsc, task::JoinHandle};
12+
use tokio::{
13+
spawn,
14+
sync::{Barrier, mpsc},
15+
task::JoinHandle,
16+
};
717
use tokio_stream::wrappers::ReceiverStream;
818
use tracing::{debug, warn};
919

1020
#[derive(Debug)]
1121
pub struct Multiwatcher {
1222
threshold: usize,
23+
lower_bound: Arc<AtomicU64>,
1324
watchers: Vec<JoinHandle<()>>,
1425
headers: BTreeMap<Height, HashMap<Header, HashSet<Id>>>,
1526
stream: SelectAll<ReceiverStream<(Id, Header)>>,
@@ -35,22 +46,47 @@ impl Multiwatcher {
3546
{
3647
let height = height.into();
3748
let nsid = nsid.into();
49+
50+
// We require `threshold` watchers to deliver the next header.
51+
// Adversaries may produce headers in quick succession, causing
52+
// excessive memory usage.
53+
let barrier = Arc::new(Barrier::new(threshold));
54+
55+
// We track the last delivered height as a lower bound.
56+
// Watchers skip over headers up to and including that height.
57+
let lower_bound = Arc::new(AtomicU64::new(height.into()));
58+
3859
let mut stream = SelectAll::new();
3960
let mut watchers = Vec::new();
61+
4062
for (i, c) in configs.into_iter().enumerate() {
41-
let (tx, rx) = mpsc::channel(32);
63+
let (tx, rx) = mpsc::channel(10);
4264
stream.push(ReceiverStream::new(rx));
65+
let barrier = barrier.clone();
66+
let lower_bound = lower_bound.clone();
4367
watchers.push(spawn(async move {
44-
let id = Id(i);
68+
let i = Id(i);
4569
let mut w = Watcher::new(c, height, nsid);
46-
while tx.send((id, w.next().await)).await.is_ok() {}
70+
loop {
71+
let h = w.next().await;
72+
if h.height() <= lower_bound.load(Ordering::Relaxed) {
73+
continue;
74+
}
75+
if tx.send((i, h)).await.is_err() {
76+
break;
77+
}
78+
barrier.wait().await;
79+
}
4780
}));
4881
}
82+
4983
assert!(!watchers.is_empty());
84+
5085
Self {
5186
threshold,
5287
stream,
5388
watchers,
89+
lower_bound,
5490
headers: BTreeMap::from_iter([(height, HashMap::new())]),
5591
}
5692
}
@@ -71,6 +107,7 @@ impl Multiwatcher {
71107
let votes = counter.get(&hdr).map(|ids| ids.len()).unwrap_or(0) + 1;
72108
if votes >= self.threshold {
73109
self.headers.retain(|k, _| *k > h);
110+
self.lower_bound.store(h.into(), Ordering::Relaxed);
74111
debug!(height = %h, "header available");
75112
return hdr;
76113
}

0 commit comments

Comments
 (0)