Skip to content

Commit e59f09a

Browse files
authored
Merge pull request #377 from EspressoSystems/tw/robusta
Add espresso network client.
2 parents bd0ef25 + e8fb582 commit e59f09a

File tree

18 files changed

+917
-19
lines changed

18 files changed

+917
-19
lines changed

Cargo.toml

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ members = [
33
"cliquenet",
44
"metrics",
55
"multisig",
6+
"robusta",
67
"sailfish",
78
"sailfish-consensus",
89
"sailfish-rbc",
@@ -76,7 +77,7 @@ prost = "0.13.5"
7677
quickcheck = "1.0.3"
7778
rand = "0.9"
7879
rayon = "1.10"
79-
reqwest = { version = "0.12" }
80+
reqwest = { version = "0.12", features = ["json"] }
8081
secp256k1 = { version = "0.31.0", features = ["global-context", "hashes", "rand", "serde"] }
8182
serde = { version = "1", features = ["derive", "rc"] }
8283
serde_bytes = "0.11.15"
@@ -93,16 +94,22 @@ thiserror = "2.0"
9394
tide-disco = "0.9.3"
9495
tokio = { version = "1", default-features = false, features = ["full"] }
9596
tokio-stream = "0.1.17"
97+
tokio-tungstenite = { version = "0.27.0", features = ["rustls-tls-webpki-roots", "url"] }
9698
tokio-util = "0.7.15"
9799
toml = "0.8.19"
98100
tonic = "0.13.1"
99101
tonic-build = { version = "0.13.1", features = ["prost"] }
100102
tracing = "0.1"
101103
tracing-subscriber = { version = "0.3.18", features = ["env-filter", "json"] }
102104
turmoil = "0.6.4"
105+
url = "2.5.4"
103106
vbs = "0.1"
104107
zeroize = { version = "1.8", features = ["zeroize_derive"] }
105108

109+
espresso-types = { git = "https://github.com/EspressoSystems/espresso-network.git" }
110+
hotshot-query-service = { git = "https://github.com/EspressoSystems/espresso-network.git" }
111+
hotshot-types = { git = "https://github.com/EspressoSystems/espresso-network.git" }
112+
106113
[profile.test]
107114
codegen-units = 16
108115
incremental = false

docker/timeboost.Dockerfile

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,8 @@ FROM debian:bullseye-slim
1212

1313
WORKDIR /app
1414

15+
RUN apt update && apt-get install -y libcurl4
16+
1517
# Create non-root user and group
1618
RUN groupadd -r appgroup && useradd -r -g appgroup timeboostuser
1719

robusta/Cargo.toml

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
[package]
2+
name = "robusta"
3+
description = "espresso client"
4+
version.workspace = true
5+
edition.workspace = true
6+
rust-version.workspace = true
7+
8+
[dependencies]
9+
bincode = { workspace = true }
10+
bon = { workspace = true }
11+
data-encoding = { workspace = true }
12+
espresso-types = { workspace = true }
13+
futures = { workspace = true }
14+
hotshot-query-service = { workspace = true }
15+
hotshot-types = { workspace = true }
16+
reqwest = { workspace = true }
17+
serde = { workspace = true }
18+
serde_json = { workspace = true }
19+
thiserror = { workspace = true }
20+
timeboost-types = { path = "../timeboost-types" }
21+
tokio = { workspace = true }
22+
tokio-tungstenite = { workspace = true }
23+
tracing = { workspace = true }
24+
url = { workspace = true }
25+
26+
[dev-dependencies]
27+
tracing-subscriber = { workspace = true }

robusta/src/config.rs

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
use std::{iter::repeat, time::Duration};
2+
3+
use bon::Builder;
4+
use url::{ParseError, Url};
5+
6+
const NUM_DELAYS: usize = 5;
7+
8+
#[derive(Debug, Clone, Builder)]
9+
pub struct Config {
10+
/// Espresso network base URL.
11+
#[builder(with = |s: &str| -> Result<_, ParseError> { Url::parse(s) })]
12+
pub(crate) base_url: Url,
13+
14+
/// Espresso network websocket base URL.
15+
#[builder(with = |s: &str| -> Result<_, ParseError> { Url::parse(s) })]
16+
pub(crate) wss_base_url: Url,
17+
18+
#[builder(default = 3)]
19+
pub(crate) max_redirects: usize,
20+
21+
/// The sequence of delays between successive requests.
22+
///
23+
/// The last value is repeated forever.
24+
#[builder(default = [1, 3, 5, 10, 15])]
25+
pub(crate) delays: [u8; NUM_DELAYS],
26+
}
27+
28+
impl Config {
29+
pub fn delay_iter(&self) -> impl Iterator<Item = Duration> + use<> {
30+
self.delays
31+
.into_iter()
32+
.chain(repeat(self.delays[NUM_DELAYS - 1]))
33+
.map(|n| Duration::from_secs(n.into()))
34+
}
35+
}

robusta/src/lib.rs

Lines changed: 258 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,258 @@
1+
mod config;
2+
mod types;
3+
mod watcher;
4+
5+
use std::time::Duration;
6+
7+
use espresso_types::{Header, NamespaceId, Transaction};
8+
use reqwest::{StatusCode, Url, redirect::Policy};
9+
use serde::{Serialize, de::DeserializeOwned};
10+
use serde_json as json;
11+
use timeboost_types::CertifiedBlock;
12+
use tokio::time::sleep;
13+
use tracing::warn;
14+
15+
use crate::types::{TX, TaggedBase64, TransactionsWithProof, VidCommonResponse};
16+
17+
pub use crate::types::Height;
18+
pub use crate::watcher::{WatchError, watch};
19+
pub use config::{Config, ConfigBuilder};
20+
pub use espresso_types;
21+
22+
/// A client for the Espresso network.
23+
#[derive(Debug)]
24+
pub struct Client {
25+
config: Config,
26+
client: reqwest::Client,
27+
}
28+
29+
impl Client {
30+
pub fn new(c: Config) -> Self {
31+
let r = reqwest::Client::builder()
32+
.https_only(true)
33+
.timeout(Duration::from_secs(30))
34+
.redirect(Policy::limited(c.max_redirects))
35+
.build()
36+
.expect("TLS and DNS resolver work");
37+
Self {
38+
config: c,
39+
client: r,
40+
}
41+
}
42+
43+
pub async fn height(&mut self) -> Result<Height, Error> {
44+
let u = self.config.base_url.join("status/block-height")?;
45+
self.get_with_retry(u).await
46+
}
47+
48+
pub async fn submit(&mut self, cb: &CertifiedBlock) -> Result<(), Error> {
49+
let nid = NamespaceId::from(u64::from(u32::from(cb.data().namespace())));
50+
let trx = Transaction::new(nid, serialize(cb)?);
51+
let url = self.config.base_url.join("submit/submit")?;
52+
self.post_with_retry::<_, TaggedBase64<TX>>(url, &trx)
53+
.await?;
54+
Ok(())
55+
}
56+
57+
pub async fn verify(&mut self, h: &Header, cb: &CertifiedBlock) -> Result<(), Error> {
58+
let nsid = NamespaceId::from(u64::from(u32::from(cb.data().namespace())));
59+
60+
let trxs = self.transactions(h.height(), nsid).await?;
61+
let Some(proof) = trxs.proof else {
62+
return Err(ProofError::NoProof.into());
63+
};
64+
if !trxs.transactions.iter().any(|t| matches(t.payload(), cb)) {
65+
return Err(Error::TransactionNotFound);
66+
}
67+
68+
let vidc = self.vid_common(h.height()).await?;
69+
70+
let Some((trxs, ns)) = proof.verify(h.ns_table(), &h.payload_commitment(), &vidc.common)
71+
else {
72+
return Err(ProofError::InvalidProof.into());
73+
};
74+
if ns != nsid {
75+
return Err(ProofError::NamespaceMismatch(ns, nsid).into());
76+
}
77+
if !trxs.iter().any(|t| matches(t.payload(), cb)) {
78+
return Err(ProofError::TransactionNotInProof.into());
79+
}
80+
81+
Ok(())
82+
}
83+
84+
async fn transactions<H, N>(&self, height: H, nsid: N) -> Result<TransactionsWithProof, Error>
85+
where
86+
H: Into<Height>,
87+
N: Into<NamespaceId>,
88+
{
89+
let h = height.into();
90+
let n = nsid.into();
91+
let u = self
92+
.config
93+
.base_url
94+
.join(&format!("availability/block/{h}/namespace/{n}"))?;
95+
self.get_with_retry(u).await
96+
}
97+
98+
async fn vid_common<H>(&self, height: H) -> Result<VidCommonResponse, Error>
99+
where
100+
H: Into<Height>,
101+
{
102+
let h = height.into();
103+
let u = self
104+
.config
105+
.base_url
106+
.join(&format!("availability/vid/common/{h}"))?;
107+
self.get_with_retry(u).await
108+
}
109+
110+
async fn get_with_retry<A>(&self, url: Url) -> Result<A, Error>
111+
where
112+
A: DeserializeOwned,
113+
{
114+
let mut delay = self.config.delay_iter();
115+
loop {
116+
match self.get(url.clone()).await {
117+
Ok(a) => return Ok(a),
118+
Err(err) => {
119+
warn!(%url, %err, "failed to get response");
120+
sleep(delay.next().expect("infinite delay sequence")).await;
121+
}
122+
}
123+
}
124+
}
125+
126+
async fn post_with_retry<A, B>(&self, url: Url, a: &A) -> Result<B, Error>
127+
where
128+
A: Serialize,
129+
B: DeserializeOwned,
130+
{
131+
let mut delay = self.config.delay_iter();
132+
loop {
133+
match self.post(url.clone(), a).await {
134+
Ok(b) => return Ok(b),
135+
Err(err) => {
136+
warn!(%url, %err, "failed to post request");
137+
sleep(delay.next().expect("infinite delay sequence")).await;
138+
}
139+
}
140+
}
141+
}
142+
143+
async fn get<A>(&self, url: Url) -> Result<A, InternalError>
144+
where
145+
A: DeserializeOwned,
146+
{
147+
let res = self.client.get(url).send().await?;
148+
149+
if !res.status().is_success() {
150+
return Err(InternalError::Status(res.status()));
151+
}
152+
153+
Ok(res.json().await?)
154+
}
155+
156+
async fn post<A, B>(&self, u: Url, t: &A) -> Result<B, InternalError>
157+
where
158+
A: Serialize,
159+
B: DeserializeOwned,
160+
{
161+
let res = self.client.post(u).json(t).send().await?;
162+
163+
if !res.status().is_success() {
164+
return Err(InternalError::Status(res.status()));
165+
}
166+
167+
Ok(res.json().await?)
168+
}
169+
}
170+
171+
fn matches(a: &[u8], b: &CertifiedBlock) -> bool {
172+
let Ok(a) = deserialize::<CertifiedBlock>(a) else {
173+
return false;
174+
};
175+
a.data().hash() == b.data().hash() && a.data().hash() == b.cert().data().hash()
176+
}
177+
178+
/// Errors `Client` can not recover from.
179+
#[derive(Debug, thiserror::Error)]
180+
pub enum Error {
181+
#[error("json error: {0}")]
182+
Json(#[from] json::Error),
183+
184+
#[error("bincode encode error: {0}")]
185+
BincodeEncode(#[from] bincode::error::EncodeError),
186+
187+
#[error("bincode decode error: {0}")]
188+
BincodeDecode(#[from] bincode::error::DecodeError),
189+
190+
#[error("url error: {0}")]
191+
Url(#[from] url::ParseError),
192+
193+
#[error("proof error: {0}")]
194+
Proof(#[from] ProofError),
195+
196+
#[error("transaction not found")]
197+
TransactionNotFound,
198+
}
199+
200+
#[derive(Debug, thiserror::Error)]
201+
pub enum ProofError {
202+
#[error("no proof term")]
203+
NoProof,
204+
#[error("proof verification failed")]
205+
InvalidProof,
206+
#[error("namespace mismatch: {0} != {1}")]
207+
NamespaceMismatch(NamespaceId, NamespaceId),
208+
#[error("transaction not found in proof")]
209+
TransactionNotInProof,
210+
}
211+
212+
/// Internal, hopefully transient errors.
213+
#[derive(Debug, thiserror::Error)]
214+
enum InternalError {
215+
#[error("json error: {0}")]
216+
Json(#[from] json::Error),
217+
218+
#[error("http error: {0}")]
219+
Http(#[from] reqwest::Error),
220+
221+
#[error("api status: {0}")]
222+
Status(StatusCode),
223+
}
224+
225+
fn serialize<T: Serialize>(d: &T) -> Result<Vec<u8>, Error> {
226+
let v = bincode::serde::encode_to_vec(d, bincode::config::standard())?;
227+
Ok(v)
228+
}
229+
230+
fn deserialize<T: DeserializeOwned>(d: &[u8]) -> Result<T, Error> {
231+
bincode::serde::decode_from_slice(d, bincode::config::standard())
232+
.map(|(msg, _)| msg)
233+
.map_err(Into::into)
234+
}
235+
236+
#[cfg(test)]
237+
mod tests {
238+
use super::{Client, Config};
239+
240+
#[tokio::test]
241+
async fn decaf_smoke() {
242+
let _ = tracing_subscriber::fmt()
243+
.with_env_filter("robusta=debug")
244+
.try_init();
245+
246+
let cfg = Config::builder()
247+
.base_url("https://query.decaf.testnet.espresso.network/v1/")
248+
.unwrap()
249+
.wss_base_url("wss://query.decaf.testnet.espresso.network/v1/")
250+
.unwrap()
251+
.build();
252+
253+
let mut clt = Client::new(cfg.clone());
254+
let height = clt.height().await.unwrap();
255+
let header = super::watch(&cfg, height, None).await.unwrap();
256+
assert_eq!(u64::from(height), header.height());
257+
}
258+
}

0 commit comments

Comments
 (0)