Skip to content

Commit 89af6a6

Browse files
committed
Limit number of tasks.
1 parent 7b6e213 commit 89af6a6

File tree

2 files changed

+20
-8
lines changed

2 files changed

+20
-8
lines changed

timeboost-builder/src/submit.rs

Lines changed: 19 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ use timeboost_types::{
1010
};
1111
use tokio::{
1212
spawn,
13-
sync::Mutex,
13+
sync::{Mutex, OwnedSemaphorePermit, Semaphore},
1414
task::JoinHandle,
1515
time::{Instant, error::Elapsed, sleep, timeout},
1616
};
@@ -19,14 +19,16 @@ use tracing::{debug, warn};
1919

2020
use crate::config::SubmitterConfig;
2121

22-
const CACHE_SIZE: usize = 10_000;
22+
const CACHE_SIZE: usize = 15_000;
23+
const MAX_TASKS: usize = 1000;
2324

2425
pub struct Submitter {
2526
config: SubmitterConfig,
2627
verify_task: JoinHandle<Empty>,
2728
submitters: TaskTracker,
2829
handler: Handler,
2930
committees: Arc<Mutex<CommitteeVec<2>>>,
31+
task_permits: Arc<Semaphore>,
3032
}
3133

3234
impl Drop for Submitter {
@@ -66,6 +68,7 @@ impl Submitter {
6668
verify_task: spawn(verifier.verify(height)),
6769
submitters: TaskTracker::new(),
6870
committees,
71+
task_permits: Arc::new(Semaphore::new(MAX_TASKS)),
6972
};
7073
}
7174
}
@@ -78,9 +81,18 @@ impl Submitter {
7881
self.committees.lock().await.add(c);
7982
}
8083

81-
pub fn submit(&mut self, cb: CertifiedBlock<Validated>) {
82-
debug!(node = %self.public_key(), num = %cb.cert().data().num(), "creating block handler");
83-
self.submitters.spawn(self.handler.clone().handle(cb));
84+
pub async fn submit(&mut self, cb: CertifiedBlock<Validated>) {
85+
let Ok(permit) = Semaphore::acquire_owned(self.task_permits.clone()).await else {
86+
return;
87+
};
88+
debug!(
89+
node = %self.public_key(),
90+
num = %cb.cert().data().num(),
91+
tasks = %self.submitters.len(),
92+
"creating block handler"
93+
);
94+
self.submitters
95+
.spawn(self.handler.clone().handle(permit, cb));
8496
}
8597

8698
pub async fn join(self) {
@@ -133,7 +145,7 @@ struct Handler {
133145
}
134146

135147
impl Handler {
136-
async fn handle(mut self, cb: CertifiedBlock<Validated>) {
148+
async fn handle(mut self, _: OwnedSemaphorePermit, cb: CertifiedBlock<Validated>) {
137149
enum State {
138150
Submit(bool),
139151
Wait(Duration),
@@ -281,7 +293,7 @@ mod tests {
281293

282294
tasks.spawn(async move {
283295
for _ in 0..NODES {
284-
s.submit(g.next());
296+
s.submit(g.next()).await;
285297
}
286298
s.join().await
287299
});

timeboost/src/lib.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -132,7 +132,7 @@ impl Timeboost {
132132
blk = self.certifier.next_block() => match blk {
133133
Ok(b) => {
134134
info!(node = %self.label, block = %b.data().round(), "certified block");
135-
self.submitter.submit(b)
135+
self.submitter.submit(b).await
136136
}
137137
Err(e) => {
138138
let e: CertifierDown = e;

0 commit comments

Comments
 (0)