Skip to content

Commit 23b2c93

Browse files
committed
Ensure header heights are consecutive.
In a `Multiwatcher`, the individual watchers only make progress when a threshold has delivered a value. Each watcher is required to deliver consecutive and strictly monotonic block heights, otherwise the watcher is dropped and re-connects after some time, starting at the current lower bound of heights.
1 parent 8bc6758 commit 23b2c93

File tree

4 files changed

+90
-15
lines changed

4 files changed

+90
-15
lines changed

robusta/src/lib.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -303,7 +303,7 @@ mod tests {
303303
let clt = Client::new(cfg.clone());
304304
let height = clt.height().await.unwrap();
305305
let mut watcher = Watcher::new(cfg, height, None);
306-
let header = watcher.next().await;
306+
let header = watcher.next().await.unwrap_right();
307307
assert_eq!(u64::from(height), header.height());
308308
}
309309
}

robusta/src/multiwatcher.rs

Lines changed: 42 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -4,15 +4,18 @@ use std::{
44
Arc,
55
atomic::{AtomicU64, Ordering},
66
},
7+
time::Duration,
78
};
89

910
use crate::{Config, Height, Watcher};
11+
use either::Either;
1012
use espresso_types::{Header, NamespaceId};
1113
use futures::{StreamExt, stream::SelectAll};
1214
use tokio::{
1315
spawn,
1416
sync::{Barrier, mpsc},
1517
task::JoinHandle,
18+
time::sleep,
1619
};
1720
use tokio_stream::wrappers::ReceiverStream;
1821
use tracing::{debug, warn};
@@ -66,16 +69,47 @@ impl Multiwatcher {
6669
let lower_bound = lower_bound.clone();
6770
watchers.push(spawn(async move {
6871
let i = Id(i);
69-
let mut w = Watcher::new(c, height, nsid);
7072
loop {
71-
let h = w.next().await;
72-
if h.height() <= lower_bound.load(Ordering::Relaxed) {
73-
continue;
73+
let height = lower_bound.load(Ordering::Relaxed);
74+
let mut w = Watcher::new(c.clone(), height, nsid);
75+
let mut expected = height + 1;
76+
loop {
77+
match w.next().await {
78+
Either::Right(hdr) => {
79+
if hdr.height() > expected {
80+
warn!(
81+
url = %c.wss_base_url,
82+
height = %hdr.height(),
83+
expected = %expected,
84+
"unexpected block height"
85+
);
86+
break;
87+
}
88+
expected += 1;
89+
if hdr.height() <= lower_bound.load(Ordering::Relaxed) {
90+
continue;
91+
}
92+
if tx.send((i, hdr)).await.is_err() {
93+
return;
94+
}
95+
}
96+
Either::Left(height) => {
97+
if *height > expected {
98+
warn!(
99+
url = %c.wss_base_url,
100+
height = %height,
101+
expected = %expected,
102+
"unexpected block height"
103+
);
104+
break;
105+
}
106+
expected += 1;
107+
}
108+
}
109+
barrier.wait().await;
74110
}
75-
if tx.send((i, h)).await.is_err() {
76-
break;
77-
}
78-
barrier.wait().await;
111+
drop(w);
112+
sleep(Duration::from_secs(3)).await // wait a little before re-connecting
79113
}
80114
}));
81115
}

robusta/src/types.rs

Lines changed: 41 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,8 @@
1-
use std::{borrow::Cow, fmt};
1+
use std::{
2+
borrow::Cow,
3+
fmt,
4+
ops::{Add, AddAssign, Deref},
5+
};
26

37
use bon::Builder;
48
use data_encoding::BASE64URL_NOPAD;
@@ -42,6 +46,42 @@ macro_rules! Primitive {
4246
self.0.fmt(f)
4347
}
4448
}
49+
50+
impl Deref for $name {
51+
type Target = $t;
52+
53+
fn deref(&self) -> &$t {
54+
&self.0
55+
}
56+
}
57+
58+
impl Add for $name {
59+
type Output = Self;
60+
61+
fn add(self, rhs: Self) -> Self::Output {
62+
Self(self.0 + rhs.0)
63+
}
64+
}
65+
66+
impl Add<$t> for $name {
67+
type Output = Self;
68+
69+
fn add(self, rhs: $t) -> Self {
70+
Self(self.0 + rhs)
71+
}
72+
}
73+
74+
impl AddAssign for $name {
75+
fn add_assign(&mut self, rhs: Self) {
76+
*self = *self + rhs
77+
}
78+
}
79+
80+
impl AddAssign<$t> for $name {
81+
fn add_assign(&mut self, rhs: $t) {
82+
*self = *self + rhs
83+
}
84+
}
4585
};
4686
}
4787

robusta/src/watcher.rs

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
use std::str::from_utf8;
22

33
use bytes::Bytes;
4+
use either::Either;
45
use espresso_types::{Header, NamespaceId};
56
use futures::{SinkExt, StreamExt};
67
use reqwest::header::LOCATION;
@@ -37,7 +38,7 @@ impl Watcher {
3738
}
3839
}
3940

40-
pub async fn next(&mut self) -> Header {
41+
pub async fn next(&mut self) -> Either<Height, Header> {
4142
'main: loop {
4243
let ws = if let Some(w) = &mut self.websocket {
4344
w
@@ -74,16 +75,16 @@ impl Watcher {
7475
Some(Ok(Message::Text(text))) => {
7576
match serde_json::from_str::<Header>(text.as_str()) {
7677
Ok(hdr) => {
78+
self.height = hdr.height().into();
7779
if let Some(id) = &self.namespace {
7880
if hdr.ns_table().find_ns_id(id).is_some() {
79-
self.height = hdr.height().into();
80-
return hdr;
81+
return Either::Right(hdr);
8182
} else {
8283
debug!(height = %hdr.height(), "namespace id not found");
84+
return Either::Left(self.height);
8385
}
8486
} else {
85-
self.height = hdr.height().into();
86-
return hdr;
87+
return Either::Right(hdr);
8788
}
8889
}
8990
Err(err) => {

0 commit comments

Comments
 (0)