Skip to content

Commit 98a6c18

Browse files
committed
Merge branch 'ax/crypto-update' into ax/reshare-core
2 parents 48c3764 + 21ce388 commit 98a6c18

File tree

26 files changed

+759
-417
lines changed

26 files changed

+759
-417
lines changed

robusta/Cargo.toml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,17 +8,21 @@ rust-version.workspace = true
88
[dependencies]
99
bincode = { workspace = true }
1010
bon = { workspace = true }
11+
bytes = { workspace = true }
1112
data-encoding = { workspace = true }
13+
either = { workspace = true }
1214
espresso-types = { workspace = true }
1315
futures = { workspace = true }
1416
hotshot-query-service = { workspace = true }
1517
hotshot-types = { workspace = true }
18+
multisig = { path = "../multisig" }
1619
reqwest = { workspace = true }
1720
serde = { workspace = true }
1821
serde_json = { workspace = true }
1922
thiserror = { workspace = true }
2023
timeboost-types = { path = "../timeboost-types" }
2124
tokio = { workspace = true }
25+
tokio-stream = { workspace = true }
2226
tokio-tungstenite = { workspace = true }
2327
tracing = { workspace = true }
2428
url = { workspace = true }

robusta/src/config.rs

Lines changed: 15 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,35 +1,40 @@
1-
use std::{iter::repeat, time::Duration};
1+
use std::{iter::repeat, num::NonZeroUsize, time::Duration};
22

33
use bon::Builder;
4-
use url::{ParseError, Url};
4+
use url::Url;
55

6-
const NUM_DELAYS: usize = 5;
6+
const NUM_DELAYS: NonZeroUsize = NonZeroUsize::new(5).expect("5 > 0");
77

88
#[derive(Debug, Clone, Builder)]
99
pub struct Config {
10+
/// Log label.
11+
#[builder(into)]
12+
pub(crate) label: String,
13+
1014
/// Espresso network base URL.
11-
#[builder(with = |s: &str| -> Result<_, ParseError> { Url::parse(s) })]
1215
pub(crate) base_url: Url,
1316

1417
/// Espresso network websocket base URL.
15-
#[builder(with = |s: &str| -> Result<_, ParseError> { Url::parse(s) })]
1618
pub(crate) wss_base_url: Url,
1719

18-
#[builder(default = 3)]
19-
pub(crate) max_redirects: usize,
20-
2120
/// The sequence of delays between successive requests.
2221
///
2322
/// The last value is repeated forever.
2423
#[builder(default = [1, 3, 5, 10, 15])]
25-
pub(crate) delays: [u8; NUM_DELAYS],
24+
pub(crate) delays: [u8; NUM_DELAYS.get()],
2625
}
2726

2827
impl Config {
28+
pub fn with_websocket_base_url(&self, u: Url) -> Self {
29+
let mut c = self.clone();
30+
c.wss_base_url = u;
31+
c
32+
}
33+
2934
pub fn delay_iter(&self) -> impl Iterator<Item = Duration> + use<> {
3035
self.delays
3136
.into_iter()
32-
.chain(repeat(self.delays[NUM_DELAYS - 1]))
37+
.chain(repeat(self.delays[NUM_DELAYS.get() - 1]))
3338
.map(|n| Duration::from_secs(n.into()))
3439
}
3540
}

robusta/src/lib.rs

Lines changed: 96 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -1,26 +1,32 @@
11
mod config;
2+
mod multiwatcher;
23
mod types;
34
mod watcher;
45

6+
use std::iter::empty;
57
use std::time::Duration;
68

9+
use either::Either;
710
use espresso_types::{Header, NamespaceId, Transaction};
8-
use reqwest::{StatusCode, Url, redirect::Policy};
11+
use multisig::{Unchecked, Validated};
12+
use reqwest::{StatusCode, Url};
913
use serde::{Serialize, de::DeserializeOwned};
1014
use serde_json as json;
11-
use timeboost_types::CertifiedBlock;
15+
use timeboost_types::sailfish::CommitteeVec;
16+
use timeboost_types::{BlockNumber, CertifiedBlock};
1217
use tokio::time::sleep;
13-
use tracing::warn;
18+
use tracing::{debug, warn};
1419

1520
use crate::types::{TX, TaggedBase64, TransactionsWithProof, VidCommonResponse};
1621

22+
pub use crate::multiwatcher::Multiwatcher;
1723
pub use crate::types::Height;
18-
pub use crate::watcher::{WatchError, watch};
24+
pub use crate::watcher::{WatchError, Watcher};
1925
pub use config::{Config, ConfigBuilder};
2026
pub use espresso_types;
2127

2228
/// A client for the Espresso network.
23-
#[derive(Debug)]
29+
#[derive(Debug, Clone)]
2430
pub struct Client {
2531
config: Config,
2632
client: reqwest::Client,
@@ -31,7 +37,6 @@ impl Client {
3137
let r = reqwest::Client::builder()
3238
.https_only(true)
3339
.timeout(Duration::from_secs(30))
34-
.redirect(Policy::limited(c.max_redirects))
3540
.build()
3641
.expect("TLS and DNS resolver work");
3742
Self {
@@ -40,45 +45,90 @@ impl Client {
4045
}
4146
}
4247

43-
pub async fn height(&mut self) -> Result<Height, Error> {
48+
pub fn config(&self) -> &Config {
49+
&self.config
50+
}
51+
52+
pub async fn height(&self) -> Result<Height, Error> {
4453
let u = self.config.base_url.join("status/block-height")?;
4554
self.get_with_retry(u).await
4655
}
4756

48-
pub async fn submit(&mut self, cb: &CertifiedBlock) -> Result<(), Error> {
49-
let nid = NamespaceId::from(u64::from(u32::from(cb.data().namespace())));
50-
let trx = Transaction::new(nid, serialize(cb)?);
57+
pub async fn submit<N>(&self, nsid: N, cb: &CertifiedBlock<Validated>) -> Result<(), Error>
58+
where
59+
N: Into<NamespaceId>,
60+
{
61+
let trx = Transaction::new(nsid.into(), serialize(cb)?);
5162
let url = self.config.base_url.join("submit/submit")?;
5263
self.post_with_retry::<_, TaggedBase64<TX>>(url, &trx)
5364
.await?;
5465
Ok(())
5566
}
5667

57-
pub async fn verify(&mut self, h: &Header, cb: &CertifiedBlock) -> Result<(), Error> {
58-
let nsid = NamespaceId::from(u64::from(u32::from(cb.data().namespace())));
59-
60-
let trxs = self.transactions(h.height(), nsid).await?;
68+
pub async fn verified<N, const C: usize>(
69+
&self,
70+
nsid: N,
71+
hdr: &Header,
72+
cvec: &CommitteeVec<C>,
73+
) -> impl Iterator<Item = BlockNumber>
74+
where
75+
N: Into<NamespaceId>,
76+
{
77+
let nsid = nsid.into();
78+
debug!(node = %self.config.label, %nsid, height = %hdr.height(), "verifying blocks");
79+
let Ok(trxs) = self.transactions(hdr.height(), nsid).await else {
80+
debug!(node = %self.config.label, %nsid, height = %hdr.height(), "no transactions");
81+
return Either::Left(empty());
82+
};
6183
let Some(proof) = trxs.proof else {
62-
return Err(ProofError::NoProof.into());
84+
debug!(node = %self.config.label, %nsid, height = %hdr.height(), "no proof");
85+
return Either::Left(empty());
6386
};
64-
if !trxs.transactions.iter().any(|t| matches(t.payload(), cb)) {
65-
return Err(Error::TransactionNotFound);
66-
}
67-
68-
let vidc = self.vid_common(h.height()).await?;
69-
70-
let Some((trxs, ns)) = proof.verify(h.ns_table(), &h.payload_commitment(), &vidc.common)
87+
let Ok(vidc) = self.vid_common(hdr.height()).await else {
88+
debug!(node = %self.config.label, height = %hdr.height(), "no vid common");
89+
return Either::Left(empty());
90+
};
91+
let Some((trxs, ns)) =
92+
proof.verify(hdr.ns_table(), &hdr.payload_commitment(), &vidc.common)
7193
else {
72-
return Err(ProofError::InvalidProof.into());
94+
warn!(node = %self.config.label, %nsid, height = %hdr.height(), "proof verification failed");
95+
return Either::Left(empty());
7396
};
7497
if ns != nsid {
75-
return Err(ProofError::NamespaceMismatch(ns, nsid).into());
76-
}
77-
if !trxs.iter().any(|t| matches(t.payload(), cb)) {
78-
return Err(ProofError::TransactionNotInProof.into());
98+
warn!(node = %self.config.label, a = %nsid, b = %ns, height = %hdr.height(), "namespace mismatch");
99+
return Either::Left(empty());
79100
}
80-
81-
Ok(())
101+
Either::Right(trxs.into_iter().filter_map(move |t| {
102+
match deserialize::<CertifiedBlock<Unchecked>>(t.payload()) {
103+
Ok(b) => {
104+
let Some(c) = cvec.get(b.committee()) else {
105+
warn!(
106+
node = %self.config.label,
107+
height = %hdr.height(),
108+
committee = %b.committee(),
109+
"unknown committee"
110+
);
111+
return None;
112+
};
113+
if let Some(b) = b.validated(c) {
114+
Some(b.cert().data().num())
115+
} else {
116+
warn!(node = %self.config.label, height = %hdr.height(), "invalid block");
117+
None
118+
}
119+
}
120+
Err(err) => {
121+
warn!(
122+
node = %self.config.label,
123+
nsid = %nsid,
124+
height = %hdr.height(),
125+
err = %err,
126+
"could not deserialize block"
127+
);
128+
None
129+
}
130+
}
131+
}))
82132
}
83133

84134
async fn transactions<H, N>(&self, height: H, nsid: N) -> Result<TransactionsWithProof, Error>
@@ -116,7 +166,7 @@ impl Client {
116166
match self.get(url.clone()).await {
117167
Ok(a) => return Ok(a),
118168
Err(err) => {
119-
warn!(%url, %err, "failed to get response");
169+
warn!(node = %self.config.label, %url, %err, "failed to get response");
120170
sleep(delay.next().expect("infinite delay sequence")).await;
121171
}
122172
}
@@ -133,7 +183,7 @@ impl Client {
133183
match self.post(url.clone(), a).await {
134184
Ok(b) => return Ok(b),
135185
Err(err) => {
136-
warn!(%url, %err, "failed to post request");
186+
warn!(node = %self.config.label, %url, %err, "failed to post request");
137187
sleep(delay.next().expect("infinite delay sequence")).await;
138188
}
139189
}
@@ -168,13 +218,6 @@ impl Client {
168218
}
169219
}
170220

171-
fn matches(a: &[u8], b: &CertifiedBlock) -> bool {
172-
let Ok(a) = deserialize::<CertifiedBlock>(a) else {
173-
return false;
174-
};
175-
a.data().hash() == b.data().hash() && a.data().hash() == b.cert().data().hash()
176-
}
177-
178221
/// Errors `Client` can not recover from.
179222
#[derive(Debug, thiserror::Error)]
180223
pub enum Error {
@@ -235,7 +278,7 @@ fn deserialize<T: DeserializeOwned>(d: &[u8]) -> Result<T, Error> {
235278

236279
#[cfg(test)]
237280
mod tests {
238-
use super::{Client, Config};
281+
use super::{Client, Config, Watcher};
239282

240283
#[tokio::test]
241284
async fn decaf_smoke() {
@@ -244,15 +287,23 @@ mod tests {
244287
.try_init();
245288

246289
let cfg = Config::builder()
247-
.base_url("https://query.decaf.testnet.espresso.network/v1/")
248-
.unwrap()
249-
.wss_base_url("wss://query.decaf.testnet.espresso.network/v1/")
250-
.unwrap()
290+
.base_url(
291+
"https://query.decaf.testnet.espresso.network/v1/"
292+
.parse()
293+
.unwrap(),
294+
)
295+
.wss_base_url(
296+
"wss://query.decaf.testnet.espresso.network/v1/"
297+
.parse()
298+
.unwrap(),
299+
)
300+
.label("decaf_smoke")
251301
.build();
252302

253-
let mut clt = Client::new(cfg.clone());
303+
let clt = Client::new(cfg.clone());
254304
let height = clt.height().await.unwrap();
255-
let header = super::watch(&cfg, height, None).await.unwrap();
305+
let mut watcher = Watcher::new(cfg, height, None);
306+
let header = watcher.next().await;
256307
assert_eq!(u64::from(height), header.height());
257308
}
258309
}

0 commit comments

Comments
 (0)