Skip to content

Commit 08b1018

Browse files
committed
Remove verified block numbers.
Also acquire the initial height in the verify task itself and optimistically check first, if a block has already been verified.
1 parent 89af6a6 commit 08b1018

File tree

3 files changed

+36
-28
lines changed

3 files changed

+36
-28
lines changed

robusta/src/config.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ const NUM_DELAYS: usize = 5;
88
#[derive(Debug, Clone, Builder)]
99
pub struct Config {
1010
/// Log label.
11+
#[builder(into)]
1112
pub(crate) label: String,
1213

1314
/// Espresso network base URL.

robusta/src/lib.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -292,7 +292,7 @@ mod tests {
292292
.unwrap()
293293
.wss_base_url("wss://query.decaf.testnet.espresso.network/v1/")
294294
.unwrap()
295-
.label(String::new())
295+
.label("decaf_smoke")
296296
.build();
297297

298298
let clt = Client::new(cfg.clone());

timeboost-builder/src/submit.rs

Lines changed: 34 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -2,15 +2,15 @@ use std::{cmp::min, collections::BTreeSet, sync::Arc, time::Duration};
22

33
use futures::stream::StreamExt;
44
use multisig::{Committee, PublicKey, Validated};
5-
use parking_lot::RwLock;
6-
use robusta::{Height, espresso_types::NamespaceId};
5+
use parking_lot::Mutex;
6+
use robusta::espresso_types::NamespaceId;
77
use timeboost_types::{
88
BlockNumber, CertifiedBlock,
99
sailfish::{CommitteeVec, Empty},
1010
};
1111
use tokio::{
1212
spawn,
13-
sync::{Mutex, OwnedSemaphorePermit, Semaphore},
13+
sync::{Mutex as AsyncMutex, OwnedSemaphorePermit, Semaphore},
1414
task::JoinHandle,
1515
time::{Instant, error::Elapsed, sleep, timeout},
1616
};
@@ -27,7 +27,7 @@ pub struct Submitter {
2727
verify_task: JoinHandle<Empty>,
2828
submitters: TaskTracker,
2929
handler: Handler,
30-
committees: Arc<Mutex<CommitteeVec<2>>>,
30+
committees: Arc<AsyncMutex<CommitteeVec<2>>>,
3131
task_permits: Arc<Semaphore>,
3232
}
3333

@@ -40,8 +40,8 @@ impl Drop for Submitter {
4040
impl Submitter {
4141
pub async fn create(cfg: SubmitterConfig) -> Self {
4242
let client = robusta::Client::new(cfg.robusta.clone());
43-
let verified = Arc::new(RwLock::new(BTreeSet::new()));
44-
let committees = Arc::new(Mutex::new(CommitteeVec::new(cfg.committee.clone())));
43+
let verified = Arc::new(Mutex::new(BTreeSet::new()));
44+
let committees = Arc::new(AsyncMutex::new(CommitteeVec::new(cfg.committee.clone())));
4545
let handler = Handler {
4646
label: cfg.pubkey,
4747
nsid: cfg.namespace,
@@ -55,21 +55,13 @@ impl Submitter {
5555
client: client.clone(),
5656
verified,
5757
};
58-
let mut delays = cfg.robusta.delay_iter();
59-
loop {
60-
let Ok(height) = client.height().await else {
61-
let d = delays.next().expect("delay iterator repeats");
62-
sleep(d).await;
63-
continue;
64-
};
65-
return Submitter {
66-
handler,
67-
config: cfg,
68-
verify_task: spawn(verifier.verify(height)),
69-
submitters: TaskTracker::new(),
70-
committees,
71-
task_permits: Arc::new(Semaphore::new(MAX_TASKS)),
72-
};
58+
Submitter {
59+
handler,
60+
config: cfg,
61+
verify_task: spawn(verifier.verify()),
62+
submitters: TaskTracker::new(),
63+
committees,
64+
task_permits: Arc::new(Semaphore::new(MAX_TASKS)),
7365
}
7466
}
7567

@@ -105,12 +97,20 @@ struct Verifier {
10597
label: PublicKey,
10698
nsid: NamespaceId,
10799
client: robusta::Client,
108-
committees: Arc<Mutex<CommitteeVec<2>>>,
109-
verified: Arc<RwLock<BTreeSet<BlockNumber>>>,
100+
committees: Arc<AsyncMutex<CommitteeVec<2>>>,
101+
verified: Arc<Mutex<BTreeSet<BlockNumber>>>,
110102
}
111103

112104
impl Verifier {
113-
async fn verify(self, mut height: Height) -> Empty {
105+
async fn verify(self) -> Empty {
106+
let mut delays = self.client.config().delay_iter();
107+
let mut height = loop {
108+
if let Ok(h) = self.client.height().await {
109+
break h;
110+
};
111+
let d = delays.next().expect("delay iterator repeats");
112+
sleep(d).await;
113+
};
114114
loop {
115115
let mut headers;
116116
loop {
@@ -122,7 +122,7 @@ impl Verifier {
122122
while let Some(h) = headers.next().await {
123123
let committees = self.committees.lock().await;
124124
let numbers = self.client.verified(self.nsid, &h, &committees).await;
125-
let mut set = self.verified.write();
125+
let mut set = self.verified.lock();
126126
for n in numbers {
127127
debug!(node = %self.label, num = %n, "verified");
128128
if set.len() == CACHE_SIZE {
@@ -141,7 +141,7 @@ struct Handler {
141141
label: PublicKey,
142142
nsid: NamespaceId,
143143
client: robusta::Client,
144-
verified: Arc<RwLock<BTreeSet<BlockNumber>>>,
144+
verified: Arc<Mutex<BTreeSet<BlockNumber>>>,
145145
}
146146

147147
impl Handler {
@@ -153,6 +153,13 @@ impl Handler {
153153
}
154154

155155
let num = cb.cert().data().num();
156+
157+
// Maybe the block has already been verified?
158+
if self.verified.lock().remove(&num) {
159+
debug!(node = %self.label, %num, "block submission verified");
160+
return;
161+
}
162+
156163
let max_delay = Duration::from_secs(30);
157164
let mut state = State::Submit(false);
158165

@@ -175,7 +182,7 @@ impl Handler {
175182
state = State::Verify(delay.saturating_sub(d))
176183
}
177184
State::Verify(delay) => {
178-
if self.verified.read().contains(&cb.cert().data().num()) {
185+
if self.verified.lock().remove(&num) {
179186
debug!(node = %self.label, %num, "block submission verified");
180187
return;
181188
} else {

0 commit comments

Comments
 (0)