Skip to content

Commit 18f2d2f

Browse files
authored
Merge pull request #418 from EspressoSystems/tw/parallel_submission
Concurrently submit and verify blocks.
2 parents e59f09a + b4cadc5 commit 18f2d2f

File tree

20 files changed

+475
-329
lines changed

20 files changed

+475
-329
lines changed

robusta/Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,10 +9,12 @@ rust-version.workspace = true
99
bincode = { workspace = true }
1010
bon = { workspace = true }
1111
data-encoding = { workspace = true }
12+
either = { workspace = true }
1213
espresso-types = { workspace = true }
1314
futures = { workspace = true }
1415
hotshot-query-service = { workspace = true }
1516
hotshot-types = { workspace = true }
17+
multisig = { path = "../multisig" }
1618
reqwest = { workspace = true }
1719
serde = { workspace = true }
1820
serde_json = { workspace = true }

robusta/src/config.rs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,10 @@ const NUM_DELAYS: usize = 5;
77

88
#[derive(Debug, Clone, Builder)]
99
pub struct Config {
10+
/// Log label.
11+
#[builder(into)]
12+
pub(crate) label: String,
13+
1014
/// Espresso network base URL.
1115
#[builder(with = |s: &str| -> Result<_, ParseError> { Url::parse(s) })]
1216
pub(crate) base_url: Url,
@@ -15,9 +19,6 @@ pub struct Config {
1519
#[builder(with = |s: &str| -> Result<_, ParseError> { Url::parse(s) })]
1620
pub(crate) wss_base_url: Url,
1721

18-
#[builder(default = 3)]
19-
pub(crate) max_redirects: usize,
20-
2122
/// The sequence of delays between successive requests.
2223
///
2324
/// The last value is repeated forever.

robusta/src/lib.rs

Lines changed: 86 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -2,15 +2,19 @@ mod config;
22
mod types;
33
mod watcher;
44

5+
use std::iter::empty;
56
use std::time::Duration;
67

8+
use either::Either;
79
use espresso_types::{Header, NamespaceId, Transaction};
8-
use reqwest::{StatusCode, Url, redirect::Policy};
10+
use multisig::{Unchecked, Validated};
11+
use reqwest::{StatusCode, Url};
912
use serde::{Serialize, de::DeserializeOwned};
1013
use serde_json as json;
11-
use timeboost_types::CertifiedBlock;
14+
use timeboost_types::sailfish::CommitteeVec;
15+
use timeboost_types::{BlockNumber, CertifiedBlock};
1216
use tokio::time::sleep;
13-
use tracing::warn;
17+
use tracing::{debug, warn};
1418

1519
use crate::types::{TX, TaggedBase64, TransactionsWithProof, VidCommonResponse};
1620

@@ -20,7 +24,7 @@ pub use config::{Config, ConfigBuilder};
2024
pub use espresso_types;
2125

2226
/// A client for the Espresso network.
23-
#[derive(Debug)]
27+
#[derive(Debug, Clone)]
2428
pub struct Client {
2529
config: Config,
2630
client: reqwest::Client,
@@ -31,7 +35,6 @@ impl Client {
3135
let r = reqwest::Client::builder()
3236
.https_only(true)
3337
.timeout(Duration::from_secs(30))
34-
.redirect(Policy::limited(c.max_redirects))
3538
.build()
3639
.expect("TLS and DNS resolver work");
3740
Self {
@@ -40,45 +43,90 @@ impl Client {
4043
}
4144
}
4245

43-
pub async fn height(&mut self) -> Result<Height, Error> {
46+
pub fn config(&self) -> &Config {
47+
&self.config
48+
}
49+
50+
pub async fn height(&self) -> Result<Height, Error> {
4451
let u = self.config.base_url.join("status/block-height")?;
4552
self.get_with_retry(u).await
4653
}
4754

48-
pub async fn submit(&mut self, cb: &CertifiedBlock) -> Result<(), Error> {
49-
let nid = NamespaceId::from(u64::from(u32::from(cb.data().namespace())));
50-
let trx = Transaction::new(nid, serialize(cb)?);
55+
pub async fn submit<N>(&self, nsid: N, cb: &CertifiedBlock<Validated>) -> Result<(), Error>
56+
where
57+
N: Into<NamespaceId>,
58+
{
59+
let trx = Transaction::new(nsid.into(), serialize(cb)?);
5160
let url = self.config.base_url.join("submit/submit")?;
5261
self.post_with_retry::<_, TaggedBase64<TX>>(url, &trx)
5362
.await?;
5463
Ok(())
5564
}
5665

57-
pub async fn verify(&mut self, h: &Header, cb: &CertifiedBlock) -> Result<(), Error> {
58-
let nsid = NamespaceId::from(u64::from(u32::from(cb.data().namespace())));
59-
60-
let trxs = self.transactions(h.height(), nsid).await?;
66+
pub async fn verified<N, const C: usize>(
67+
&self,
68+
nsid: N,
69+
hdr: &Header,
70+
cvec: &CommitteeVec<C>,
71+
) -> impl Iterator<Item = BlockNumber>
72+
where
73+
N: Into<NamespaceId>,
74+
{
75+
let nsid = nsid.into();
76+
debug!(node = %self.config.label, %nsid, height = %hdr.height(), "verifying blocks");
77+
let Ok(trxs) = self.transactions(hdr.height(), nsid).await else {
78+
debug!(node = %self.config.label, %nsid, height = %hdr.height(), "no transactions");
79+
return Either::Left(empty());
80+
};
6181
let Some(proof) = trxs.proof else {
62-
return Err(ProofError::NoProof.into());
82+
debug!(node = %self.config.label, %nsid, height = %hdr.height(), "no proof");
83+
return Either::Left(empty());
6384
};
64-
if !trxs.transactions.iter().any(|t| matches(t.payload(), cb)) {
65-
return Err(Error::TransactionNotFound);
66-
}
67-
68-
let vidc = self.vid_common(h.height()).await?;
69-
70-
let Some((trxs, ns)) = proof.verify(h.ns_table(), &h.payload_commitment(), &vidc.common)
85+
let Ok(vidc) = self.vid_common(hdr.height()).await else {
86+
debug!(node = %self.config.label, height = %hdr.height(), "no vid common");
87+
return Either::Left(empty());
88+
};
89+
let Some((trxs, ns)) =
90+
proof.verify(hdr.ns_table(), &hdr.payload_commitment(), &vidc.common)
7191
else {
72-
return Err(ProofError::InvalidProof.into());
92+
warn!(node = %self.config.label, %nsid, height = %hdr.height(), "proof verification failed");
93+
return Either::Left(empty());
7394
};
7495
if ns != nsid {
75-
return Err(ProofError::NamespaceMismatch(ns, nsid).into());
76-
}
77-
if !trxs.iter().any(|t| matches(t.payload(), cb)) {
78-
return Err(ProofError::TransactionNotInProof.into());
96+
warn!(node = %self.config.label, a = %nsid, b = %ns, height = %hdr.height(), "namespace mismatch");
97+
return Either::Left(empty());
7998
}
80-
81-
Ok(())
99+
Either::Right(trxs.into_iter().filter_map(move |t| {
100+
match deserialize::<CertifiedBlock<Unchecked>>(t.payload()) {
101+
Ok(b) => {
102+
let Some(c) = cvec.get(b.committee()) else {
103+
warn!(
104+
node = %self.config.label,
105+
height = %hdr.height(),
106+
committee = %b.committee(),
107+
"unknown committee"
108+
);
109+
return None;
110+
};
111+
if let Some(b) = b.validated(c) {
112+
Some(b.cert().data().num())
113+
} else {
114+
warn!(node = %self.config.label, height = %hdr.height(), "invalid block");
115+
None
116+
}
117+
}
118+
Err(err) => {
119+
warn!(
120+
node = %self.config.label,
121+
nsid = %nsid,
122+
height = %hdr.height(),
123+
err = %err,
124+
"could not deserialize block"
125+
);
126+
None
127+
}
128+
}
129+
}))
82130
}
83131

84132
async fn transactions<H, N>(&self, height: H, nsid: N) -> Result<TransactionsWithProof, Error>
@@ -116,7 +164,7 @@ impl Client {
116164
match self.get(url.clone()).await {
117165
Ok(a) => return Ok(a),
118166
Err(err) => {
119-
warn!(%url, %err, "failed to get response");
167+
warn!(node = %self.config.label, %url, %err, "failed to get response");
120168
sleep(delay.next().expect("infinite delay sequence")).await;
121169
}
122170
}
@@ -133,7 +181,7 @@ impl Client {
133181
match self.post(url.clone(), a).await {
134182
Ok(b) => return Ok(b),
135183
Err(err) => {
136-
warn!(%url, %err, "failed to post request");
184+
warn!(node = %self.config.label, %url, %err, "failed to post request");
137185
sleep(delay.next().expect("infinite delay sequence")).await;
138186
}
139187
}
@@ -168,13 +216,6 @@ impl Client {
168216
}
169217
}
170218

171-
fn matches(a: &[u8], b: &CertifiedBlock) -> bool {
172-
let Ok(a) = deserialize::<CertifiedBlock>(a) else {
173-
return false;
174-
};
175-
a.data().hash() == b.data().hash() && a.data().hash() == b.cert().data().hash()
176-
}
177-
178219
/// Errors `Client` can not recover from.
179220
#[derive(Debug, thiserror::Error)]
180221
pub enum Error {
@@ -235,6 +276,9 @@ fn deserialize<T: DeserializeOwned>(d: &[u8]) -> Result<T, Error> {
235276

236277
#[cfg(test)]
237278
mod tests {
279+
use futures::StreamExt;
280+
use tokio::pin;
281+
238282
use super::{Client, Config};
239283

240284
#[tokio::test]
@@ -248,11 +292,13 @@ mod tests {
248292
.unwrap()
249293
.wss_base_url("wss://query.decaf.testnet.espresso.network/v1/")
250294
.unwrap()
295+
.label("decaf_smoke")
251296
.build();
252297

253-
let mut clt = Client::new(cfg.clone());
298+
let clt = Client::new(cfg.clone());
254299
let height = clt.height().await.unwrap();
255-
let header = super::watch(&cfg, height, None).await.unwrap();
256-
assert_eq!(u64::from(height), header.height());
300+
let headers = super::watch(&cfg, height, None).await.unwrap();
301+
pin!(headers);
302+
assert_eq!(u64::from(height), headers.next().await.unwrap().height());
257303
}
258304
}

robusta/src/watcher.rs

Lines changed: 39 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,8 @@
1+
use std::future::ready;
12
use std::str::from_utf8;
23

34
use espresso_types::{Header, NamespaceId};
4-
use futures::{SinkExt, TryStreamExt};
5+
use futures::{SinkExt, Stream, StreamExt, stream};
56
use reqwest::header::LOCATION;
67
use tokio::{net::TcpStream, time::sleep};
78
use tokio_tungstenite::tungstenite::{self, Message, client::IntoClientRequest};
@@ -12,55 +13,70 @@ use crate::{Config, types::Height};
1213

1314
type Ws = WebSocketStream<MaybeTlsStream<TcpStream>>;
1415

15-
pub async fn watch<H, N>(cfg: &Config, height: H, nsid: N) -> Result<Header, WatchError>
16+
pub async fn watch<H, N>(
17+
cfg: &Config,
18+
height: H,
19+
nsid: N,
20+
) -> Result<impl Stream<Item = Header> + use<H, N>, WatchError>
1621
where
1722
H: Into<Height>,
1823
N: Into<Option<NamespaceId>>,
1924
{
2025
let height = height.into();
2126
let nsid = nsid.into();
2227

23-
let mut ws = connect(cfg, height).await?;
24-
25-
while let Some(m) = ws.try_next().await? {
26-
match m {
27-
Message::Binary(_) => {
28-
debug!("bytes received")
28+
let ws = connect(cfg, height).await?;
29+
let ws = stream::unfold(ws, move |mut ws| async move {
30+
match ws.next().await? {
31+
Ok(Message::Binary(_)) => {
32+
debug!("bytes received");
33+
Some((None, ws))
2934
}
30-
Message::Text(text) => match serde_json::from_str::<Header>(text.as_str()) {
35+
Ok(Message::Text(text)) => match serde_json::from_str::<Header>(text.as_str()) {
3136
Ok(hdr) => {
3237
if let Some(id) = nsid {
3338
if hdr.ns_table().find_ns_id(&id).is_some() {
34-
return Ok(hdr);
39+
Some((Some(hdr), ws))
3540
} else {
3641
debug!(height = %hdr.height(), "namespace id not found");
42+
Some((None, ws))
3743
}
3844
} else {
39-
return Ok(hdr);
45+
Some((Some(hdr), ws))
4046
}
4147
}
4248
Err(err) => {
43-
warn!(%err, "could not read text frame as header")
49+
warn!(%err, "could not read text frame as header");
50+
None
4451
}
4552
},
46-
Message::Ping(bytes) => {
53+
Ok(Message::Ping(bytes)) => {
4754
debug!("ping received");
48-
ws.send(Message::Pong(bytes)).await?
55+
if let Err(err) = ws.send(Message::Pong(bytes)).await {
56+
warn!(%err, "failed to answer ping")
57+
}
58+
Some((None, ws))
4959
}
50-
Message::Pong(_) => {
51-
debug!("unexpected pong")
60+
Ok(Message::Pong(_)) => {
61+
debug!("unexpected pong");
62+
Some((None, ws))
5263
}
53-
Message::Close(frame) => {
64+
Ok(Message::Close(frame)) => {
5465
if let Some(f) = frame {
5566
warn!(code = ?f.code, reason = %f.reason.as_str(), "connection closed");
5667
}
57-
return Err(WatchError::Closed);
68+
None
69+
}
70+
Ok(Message::Frame(_)) => {
71+
unreachable!("tungstenite does not produce this while reading")
72+
}
73+
Err(err) => {
74+
warn!(%err, "websocket error");
75+
None
5876
}
59-
Message::Frame(_) => unreachable!("tungestnite does not produce this while reading"),
6077
}
61-
}
62-
63-
Err(WatchError::Closed)
78+
});
79+
Ok(ws.filter_map(ready))
6480
}
6581

6682
async fn connect(cfg: &Config, h: Height) -> Result<Ws, WatchError> {
@@ -69,9 +85,8 @@ async fn connect(cfg: &Config, h: Height) -> Result<Ws, WatchError> {
6985
.join(&format!("availability/stream/headers/{h}"))?;
7086

7187
let mut delay = cfg.delay_iter();
72-
let mut redirect = 0;
7388

74-
while redirect < cfg.max_redirects {
89+
loop {
7590
debug!(%url, "connecting");
7691
let r = (&url).into_client_request()?;
7792
match connect_async(r).await {
@@ -84,7 +99,6 @@ async fn connect(cfg: &Config, h: Height) -> Result<Ws, WatchError> {
8499
&& let Some(loc) = r.headers().get(LOCATION)
85100
&& let Ok(path) = from_utf8(loc.as_bytes())
86101
{
87-
redirect += 1;
88102
url.set_path(path);
89103
debug!(%url, "following redirection");
90104
continue;
@@ -98,8 +112,6 @@ async fn connect(cfg: &Config, h: Height) -> Result<Ws, WatchError> {
98112
let d = delay.next().expect("infinite delay sequence");
99113
sleep(d).await
100114
}
101-
102-
Err(WatchError::TooManyRedirects)
103115
}
104116

105117
#[derive(Debug, thiserror::Error)]
@@ -109,13 +121,4 @@ pub enum WatchError {
109121

110122
#[error("websocket error: {0}")]
111123
Ws(#[from] tungstenite::Error),
112-
113-
#[error("no tls")]
114-
NoTls,
115-
116-
#[error("connection closed")]
117-
Closed,
118-
119-
#[error("too many redirects")]
120-
TooManyRedirects,
121124
}

0 commit comments

Comments
 (0)