Skip to content

Commit ecea201

Browse files
committed
Make submit a method of Submitter.
1 parent 4f900fc commit ecea201

File tree

3 files changed

+150
-158
lines changed

3 files changed

+150
-158
lines changed

timeboost-builder/src/lib.rs

Lines changed: 0 additions & 152 deletions
Original file line numberDiff line numberDiff line change
@@ -7,155 +7,3 @@ pub use config::{CertifierConfig, CertifierConfigBuilder};
77
pub use config::{SubmitterConfig, SubmitterConfigBuilder};
88
pub use robusta;
99
pub use submit::Submitter;
10-
11-
use std::time::Duration;
12-
13-
use robusta::Height;
14-
use timeboost_types::CertifiedBlock;
15-
use tokio::time::error::Elapsed;
16-
use tokio::time::timeout;
17-
use tracing::debug;
18-
19-
pub async fn submit(s: &mut Submitter<Height>, cb: CertifiedBlock) {
20-
enum State {
21-
Submit(bool),
22-
Verify,
23-
}
24-
25-
let delay = Duration::from_secs(30);
26-
let mut state = State::Submit(false);
27-
28-
loop {
29-
match state {
30-
State::Submit(force) => match timeout(delay, s.submit(&cb, force)).await {
31-
Ok(()) => state = State::Verify,
32-
Err(e) => {
33-
debug!(
34-
node = %s.public_key(),
35-
num = %cb.cert().data().num(),
36-
"block submission timeout"
37-
);
38-
let _: Elapsed = e;
39-
state = State::Submit(true)
40-
}
41-
},
42-
State::Verify => match timeout(delay, s.verify(&cb)).await {
43-
Ok(Ok(())) => {
44-
debug!(
45-
node = %s.public_key(),
46-
num = %cb.cert().data().num(),
47-
"block submission verified"
48-
);
49-
return;
50-
}
51-
Ok(Err(())) => {
52-
debug!(
53-
node = %s.public_key(),
54-
num = %cb.cert().data().num(),
55-
"block submission verification failed"
56-
);
57-
state = State::Submit(true)
58-
}
59-
Err(e) => {
60-
debug!(
61-
node = %s.public_key(),
62-
num = %cb.cert().data().num(),
63-
"block submission verification timeout"
64-
);
65-
let _: Elapsed = e;
66-
state = State::Submit(true)
67-
}
68-
},
69-
}
70-
}
71-
}
72-
73-
#[cfg(test)]
74-
mod tests {
75-
use bytes::Bytes;
76-
use multisig::{Committee, Keypair, PublicKey, Signed, VoteAccumulator};
77-
use timeboost_types::{Block, BlockHash, BlockInfo, BlockNumber, NamespaceId, sailfish::Round};
78-
use tokio::{task::JoinSet, time::sleep};
79-
80-
use super::*;
81-
82-
struct BlockGen {
83-
p: PublicKey,
84-
n: NamespaceId,
85-
r: Round,
86-
i: BlockNumber,
87-
k: Vec<Keypair>,
88-
c: Committee,
89-
}
90-
91-
impl BlockGen {
92-
fn next(&mut self) -> CertifiedBlock {
93-
let i = BlockInfo::new(self.i, self.r, BlockHash::default());
94-
self.i = self.i + 1;
95-
self.r.set_num(self.r.num() + 1);
96-
let mut a = VoteAccumulator::new(self.c.clone());
97-
for k in &self.k {
98-
if a.add(Signed::new(i.clone(), k)).unwrap().is_some() {
99-
break;
100-
}
101-
}
102-
let b = Block::new(self.n, i.round().num(), *i.hash(), Bytes::new());
103-
let l = self.c.leader(*i.round().num() as usize) == self.p;
104-
CertifiedBlock::new(a.certificate().cloned().unwrap(), b, l)
105-
}
106-
}
107-
108-
#[tokio::test]
109-
async fn submit_random_block() {
110-
const NODES: usize = 5;
111-
112-
let _ = tracing_subscriber::fmt()
113-
.with_env_filter("timeboost_builder=debug,robusta=debug")
114-
.try_init();
115-
116-
let keys: Vec<Keypair> = (0..NODES).map(|_| Keypair::generate()).collect();
117-
118-
let committee = Committee::new(
119-
0,
120-
keys.iter()
121-
.enumerate()
122-
.map(|(i, k)| (i as u8, k.public_key())),
123-
);
124-
125-
let rcfg = robusta::Config::builder()
126-
.base_url("https://query.decaf.testnet.espresso.network/v1/")
127-
.unwrap()
128-
.wss_base_url("wss://query.decaf.testnet.espresso.network/v1/")
129-
.unwrap()
130-
.build();
131-
132-
let mut tasks = JoinSet::new();
133-
134-
for k in &keys {
135-
let mut g = BlockGen {
136-
p: k.public_key(),
137-
n: NamespaceId::from(10_101),
138-
r: Round::new(1, 0),
139-
i: BlockNumber::from(1),
140-
k: keys.clone(),
141-
c: committee.clone(),
142-
};
143-
144-
let scfg = SubmitterConfig::builder()
145-
.pubkey(k.public_key())
146-
.robusta(rcfg.clone())
147-
.build();
148-
149-
let mut s = Submitter::new(scfg).init().await;
150-
151-
tasks.spawn(async move {
152-
for _ in 0..3 {
153-
submit(&mut s, g.next()).await;
154-
sleep(Duration::from_secs(rand::random_range(0..5))).await
155-
}
156-
});
157-
}
158-
159-
tasks.join_all().await;
160-
}
161-
}

timeboost-builder/src/submit.rs

Lines changed: 149 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ use std::{iter::repeat, time::Duration};
33
use multisig::PublicKey;
44
use robusta::{Error, Height, espresso_types::NamespaceId};
55
use timeboost_types::CertifiedBlock;
6-
use tokio::time::sleep;
6+
use tokio::time::{error::Elapsed, sleep, timeout};
77
use tracing::{debug, warn};
88

99
use crate::config::SubmitterConfig;
@@ -48,7 +48,61 @@ impl<H> Submitter<H> {
4848
}
4949

5050
impl Submitter<Height> {
51-
pub async fn submit(&mut self, cb: &CertifiedBlock, force: bool) {
51+
pub async fn submit(&mut self, cb: CertifiedBlock) {
52+
enum State {
53+
Submit(bool),
54+
Verify,
55+
}
56+
57+
let delay = Duration::from_secs(30);
58+
let mut state = State::Submit(false);
59+
60+
loop {
61+
match state {
62+
State::Submit(force) => match timeout(delay, self.submit_block(&cb, force)).await {
63+
Ok(()) => state = State::Verify,
64+
Err(e) => {
65+
debug!(
66+
node = %self.public_key(),
67+
num = %cb.cert().data().num(),
68+
"block submission timeout"
69+
);
70+
let _: Elapsed = e;
71+
state = State::Submit(true)
72+
}
73+
},
74+
State::Verify => match timeout(delay, self.verify_inclusion(&cb)).await {
75+
Ok(Ok(())) => {
76+
debug!(
77+
node = %self.public_key(),
78+
num = %cb.cert().data().num(),
79+
"block submission verified"
80+
);
81+
return;
82+
}
83+
Ok(Err(())) => {
84+
debug!(
85+
node = %self.public_key(),
86+
num = %cb.cert().data().num(),
87+
"block submission verification failed"
88+
);
89+
state = State::Submit(true)
90+
}
91+
Err(e) => {
92+
debug!(
93+
node = %self.public_key(),
94+
num = %cb.cert().data().num(),
95+
"block submission verification timeout"
96+
);
97+
let _: Elapsed = e;
98+
state = State::Submit(true)
99+
}
100+
},
101+
}
102+
}
103+
}
104+
105+
pub async fn submit_block(&mut self, cb: &CertifiedBlock, force: bool) {
52106
if !(cb.is_leader() || force) {
53107
return;
54108
}
@@ -68,12 +122,12 @@ impl Submitter<Height> {
68122
}
69123
}
70124

71-
pub async fn verify(&mut self, cb: &CertifiedBlock) -> Result<(), ()> {
125+
pub async fn verify_inclusion(&mut self, cb: &CertifiedBlock) -> Result<(), ()> {
72126
debug!(
73127
node = %self.public_key(),
74128
num = %cb.cert().data().num(),
75129
round = %cb.cert().data().round(),
76-
"verifying block submission"
130+
"verifying block inclusion"
77131
);
78132
let nsid = NamespaceId::from(u64::from(u32::from(cb.data().namespace())));
79133
let mut delays = delay_iter();
@@ -98,7 +152,7 @@ impl Submitter<Height> {
98152
return Err(());
99153
}
100154
Err(err) => {
101-
warn!(node = %self.config.pubkey, %err, "error during validation");
155+
warn!(node = %self.config.pubkey, %err, "error during verification");
102156
let d = delays.next().expect("delay iterator repeats");
103157
sleep(d).await
104158
}
@@ -113,3 +167,93 @@ fn delay_iter() -> impl Iterator<Item = Duration> {
113167
.chain(repeat(5))
114168
.map(Duration::from_secs)
115169
}
170+
171+
#[cfg(test)]
172+
mod tests {
173+
use bytes::Bytes;
174+
use multisig::{Committee, Keypair, PublicKey, Signed, VoteAccumulator};
175+
use timeboost_types::{Block, BlockHash, BlockInfo, BlockNumber, NamespaceId, sailfish::Round};
176+
use tokio::{task::JoinSet, time::sleep};
177+
178+
use super::*;
179+
180+
struct BlockGen {
181+
p: PublicKey,
182+
n: NamespaceId,
183+
r: Round,
184+
i: BlockNumber,
185+
k: Vec<Keypair>,
186+
c: Committee,
187+
}
188+
189+
impl BlockGen {
190+
fn next(&mut self) -> CertifiedBlock {
191+
let i = BlockInfo::new(self.i, self.r, BlockHash::default());
192+
self.i = self.i + 1;
193+
self.r.set_num(self.r.num() + 1);
194+
let mut a = VoteAccumulator::new(self.c.clone());
195+
for k in &self.k {
196+
if a.add(Signed::new(i.clone(), k)).unwrap().is_some() {
197+
break;
198+
}
199+
}
200+
let b = Block::new(self.n, i.round().num(), *i.hash(), Bytes::new());
201+
let l = self.c.leader(*i.round().num() as usize) == self.p;
202+
CertifiedBlock::new(a.certificate().cloned().unwrap(), b, l)
203+
}
204+
}
205+
206+
#[tokio::test]
207+
async fn submit_random_block() {
208+
const NODES: usize = 5;
209+
210+
let _ = tracing_subscriber::fmt()
211+
.with_env_filter("timeboost_builder=debug,robusta=debug")
212+
.try_init();
213+
214+
let keys: Vec<Keypair> = (0..NODES).map(|_| Keypair::generate()).collect();
215+
216+
let committee = Committee::new(
217+
0,
218+
keys.iter()
219+
.enumerate()
220+
.map(|(i, k)| (i as u8, k.public_key())),
221+
);
222+
223+
let rcfg = robusta::Config::builder()
224+
.base_url("https://query.decaf.testnet.espresso.network/v1/")
225+
.unwrap()
226+
.wss_base_url("wss://query.decaf.testnet.espresso.network/v1/")
227+
.unwrap()
228+
.build();
229+
230+
let mut tasks = JoinSet::new();
231+
232+
for k in &keys {
233+
let mut g = BlockGen {
234+
p: k.public_key(),
235+
n: NamespaceId::from(10_101),
236+
r: Round::new(1, 0),
237+
i: BlockNumber::from(1),
238+
k: keys.clone(),
239+
c: committee.clone(),
240+
};
241+
242+
let scfg = SubmitterConfig::builder()
243+
.pubkey(k.public_key())
244+
.robusta(rcfg.clone())
245+
.build();
246+
247+
let mut s = Submitter::new(scfg).init().await;
248+
249+
tasks.spawn(async move {
250+
for _ in 0..3 {
251+
s.submit(g.next()).await;
252+
sleep(Duration::from_secs(rand::random_range(0..5))).await
253+
}
254+
});
255+
}
256+
257+
tasks.join_all().await;
258+
}
259+
}

timeboost/src/lib.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -95,7 +95,7 @@ impl Timeboost {
9595
internal_api: spawn(internal_api),
9696
submitter_task: spawn(async move {
9797
while let Some(cb) = submit_rx.recv().await {
98-
timeboost_builder::submit(&mut submitter, cb).await
98+
submitter.submit(cb).await
9999
}
100100
}),
101101
submit_queue: submit_tx,

0 commit comments

Comments
 (0)