Skip to content

Commit d45e3cd

Browse files
committed
Use Multiwatcher in verify task.
1 parent 48bd20f commit d45e3cd

File tree

5 files changed

+29
-32
lines changed

5 files changed

+29
-32
lines changed

robusta/src/multiwatcher.rs

Lines changed: 16 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -6,11 +6,9 @@ use futures::{StreamExt, stream::SelectAll};
66
use tokio::{spawn, sync::mpsc, task::JoinHandle};
77
use tokio_stream::wrappers::ReceiverStream;
88
use tracing::{debug, warn};
9-
use url::Url;
109

1110
#[derive(Debug)]
1211
pub struct Multiwatcher {
13-
height: Height,
1412
threshold: usize,
1513
watchers: Vec<JoinHandle<()>>,
1614
headers: BTreeMap<Height, HashMap<Header, HashSet<Id>>>,
@@ -29,11 +27,10 @@ impl Drop for Multiwatcher {
2927
}
3028

3129
impl Multiwatcher {
32-
pub fn new<C, H, I, N>(configs: C, height: H, nsid: N, threshold: usize) -> Self
30+
pub fn new<C, H, N>(configs: C, height: H, nsid: N, threshold: usize) -> Self
3331
where
3432
C: IntoIterator<Item = Config>,
3533
H: Into<Height>,
36-
I: IntoIterator<Item = Url>,
3734
N: Into<NamespaceId>,
3835
{
3936
let height = height.into();
@@ -49,52 +46,48 @@ impl Multiwatcher {
4946
while tx.send((id, w.next().await)).await.is_ok() {}
5047
}));
5148
}
49+
assert!(!watchers.is_empty());
5250
Self {
53-
height,
5451
threshold,
5552
stream,
5653
watchers,
5754
headers: BTreeMap::from_iter([(height, HashMap::new())]),
5855
}
5956
}
6057

61-
pub async fn next(&mut self) -> Option<Header> {
58+
pub async fn next(&mut self) -> Header {
6259
loop {
63-
let (i, hdr) = self.stream.next().await?;
60+
let (i, hdr) = self.stream.next().await.expect("watchers never terminate");
6461
let h = Height::from(hdr.height());
6562
if Some(h) < self.headers.first_entry().map(|e| *e.key()) {
66-
debug!(%h, "ignoring header below minimum height");
63+
debug!(height = %h, "ignoring header below minimum height");
6764
continue;
6865
}
6966
if self.has_voted(h, i) {
70-
warn!(%h, "source sent multiple headers for same height");
67+
warn!(height = %h, "source sent multiple headers for same height");
7168
continue;
7269
}
73-
let votes = self.headers.entry(h).or_default();
74-
if let Some(ids) = votes.get(&hdr)
75-
&& ids.len() + 1 >= self.threshold
76-
{
77-
self.gc(h);
78-
return Some(hdr);
70+
let counter = self.headers.entry(h).or_default();
71+
let votes = counter.get(&hdr).map(|ids| ids.len()).unwrap_or(0) + 1;
72+
if votes >= self.threshold {
73+
self.headers.retain(|k, _| *k > h);
74+
debug!(height = %h, "header available");
75+
return hdr;
7976
}
80-
votes.entry(hdr).or_default().insert(i);
77+
debug!(height = %h, %votes, "vote added");
78+
counter.entry(hdr).or_default().insert(i);
8179
}
8280
}
8381

8482
fn has_voted(&self, height: Height, id: Id) -> bool {
8583
let Some(m) = self.headers.get(&height) else {
8684
return false;
8785
};
88-
for v in m.values() {
89-
if v.contains(&id) {
86+
for ids in m.values() {
87+
if ids.contains(&id) {
9088
return true;
9189
}
9290
}
9391
false
9492
}
95-
96-
fn gc(&mut self, height: Height) {
97-
self.headers.retain(|h, _| *h >= height);
98-
self.height = height;
99-
}
10093
}

timeboost-builder/src/config.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,6 @@ pub struct SubmitterConfig {
1818
pub(crate) pubkey: PublicKey,
1919
#[builder(into)]
2020
pub(crate) namespace: NamespaceId,
21-
pub(crate) robusta: robusta::Config,
21+
pub(crate) robusta: (robusta::Config, Vec<robusta::Config>),
2222
pub(crate) committee: Committee,
2323
}

timeboost-builder/src/submit.rs

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ impl Submitter {
4242
where
4343
M: ::metrics::Metrics,
4444
{
45-
let client = robusta::Client::new(cfg.robusta.clone());
45+
let client = robusta::Client::new(cfg.robusta.0.clone());
4646
let verified = Arc::new(Mutex::new(BTreeSet::new()));
4747
let committees = Arc::new(AsyncMutex::new(CommitteeVec::new(cfg.committee.clone())));
4848
let handler = Handler {
@@ -58,10 +58,12 @@ impl Submitter {
5858
client: client.clone(),
5959
verified,
6060
};
61+
let mut configs = vec![cfg.robusta.0.clone()];
62+
configs.extend(cfg.robusta.1.iter().cloned());
6163
Submitter {
6264
handler,
6365
config: cfg,
64-
verify_task: spawn(verifier.verify()),
66+
verify_task: spawn(verifier.verify(configs)),
6567
submitters: TaskTracker::new(),
6668
committees,
6769
task_permits: Arc::new(Semaphore::new(MAX_TASKS)),
@@ -109,7 +111,7 @@ struct Verifier {
109111
}
110112

111113
impl Verifier {
112-
async fn verify(self) -> Empty {
114+
async fn verify(self, configs: Vec<robusta::Config>) -> Empty {
113115
let mut delays = self.client.config().delay_iter();
114116
let height = loop {
115117
if let Ok(h) = self.client.height().await {
@@ -118,7 +120,8 @@ impl Verifier {
118120
let d = delays.next().expect("delay iterator repeats endlessly");
119121
sleep(d).await;
120122
};
121-
let mut watcher = robusta::Watcher::new(self.client.config().clone(), height, self.nsid);
123+
let threshold = 2 * configs.len() / 3 + 1;
124+
let mut watcher = robusta::Multiwatcher::new(configs, height, self.nsid, threshold);
122125
loop {
123126
let h = watcher.next().await;
124127
let committees = self.committees.lock().await;
@@ -297,7 +300,7 @@ mod tests {
297300

298301
let scfg = SubmitterConfig::builder()
299302
.pubkey(k.public_key())
300-
.robusta(rcfg.clone())
303+
.robusta((rcfg.clone(), Vec::new()))
301304
.namespace(10_101u64)
302305
.committee(committee.clone())
303306
.build();

timeboost/src/binaries/timeboost.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -234,13 +234,14 @@ async fn main() -> Result<()> {
234234
.internal_api(my_keyset.internal_address.clone())
235235
.maybe_nitro_addr(my_keyset.nitro_addr.clone())
236236
.recover(is_recover)
237-
.robusta(
237+
.robusta((
238238
robusta::Config::builder()
239239
.base_url(cli.espresso_base_url)
240240
.wss_base_url(cli.espresso_websocket_url)
241241
.label(pubkey.to_string())
242242
.build(),
243-
)
243+
Vec::new(),
244+
))
244245
.namespace(cli.namespace)
245246
.build();
246247

timeboost/src/config.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ pub struct TimeboostConfig {
5555
pub(crate) leash_len: usize,
5656

5757
/// Configuration of espresso network client.
58-
pub(crate) robusta: robusta::Config,
58+
pub(crate) robusta: (robusta::Config, Vec<robusta::Config>),
5959

6060
#[builder(into)]
6161
pub(crate) namespace: NamespaceId,

0 commit comments

Comments
 (0)