Skip to content

Commit 526cc3c

Browse files
authored
feat: add budget to L1 notification channel (#265)
* add budget to L1 notification channel * add budget.rs
1 parent aa1e977 commit 526cc3c

File tree

2 files changed

+47
-6
lines changed

2 files changed

+47
-6
lines changed
Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
/// The budget for the L1 notification channel.
2+
pub(crate) const L1_NOTIFICATION_CHANNEL_BUDGET: u32 = 10;
3+
4+
/// Polls the given stream. Breaks with `true` if there maybe is more work.
5+
#[macro_export]
6+
macro_rules! poll_nested_stream_with_budget {
7+
($target:literal, $label:literal, $budget:ident, $poll_stream:expr, $on_ready_some:expr $(, $on_ready_none:expr;)? $(,)?) => {{
8+
let mut budget: u32 = $budget;
9+
10+
loop {
11+
match $poll_stream {
12+
Poll::Ready(Some(item)) => {
13+
$on_ready_some(item);
14+
15+
budget -= 1;
16+
if budget == 0 {
17+
break true
18+
}
19+
}
20+
Poll::Ready(None) => {
21+
$($on_ready_none;)? // todo: handle error case with $target and $label
22+
break false
23+
}
24+
Poll::Pending => break false,
25+
}
26+
}
27+
}};
28+
}

crates/manager/src/manager/mod.rs

Lines changed: 19 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
//! responsible for handling events from these components and coordinating their actions.
44
55
use super::Consensus;
6+
use crate::poll_nested_stream_with_budget;
67
use alloy_primitives::Signature;
78
use alloy_provider::Provider;
89
use futures::StreamExt;
@@ -41,6 +42,9 @@ use rollup_node_providers::{L1MessageProvider, L1Provider};
4142
use scroll_db::Database;
4243
use scroll_derivation_pipeline::DerivationPipeline;
4344

45+
mod budget;
46+
use budget::L1_NOTIFICATION_CHANNEL_BUDGET;
47+
4448
mod command;
4549
pub use command::RollupManagerCommand;
4650

@@ -473,14 +477,19 @@ where
473477
}
474478
);
475479

480+
let mut maybe_more_l1_rx_events = false;
476481
proceed_if!(
477482
en_synced,
478-
// Drain all L1 notifications.
479-
while let Some(Poll::Ready(Some(event))) =
480-
this.l1_notification_rx.as_mut().map(|rx| rx.poll_next_unpin(cx))
481-
{
482-
this.handle_l1_notification((*event).clone());
483-
}
483+
maybe_more_l1_rx_events = poll_nested_stream_with_budget!(
484+
"l1_notification_rx",
485+
"L1Notification channel",
486+
L1_NOTIFICATION_CHANNEL_BUDGET,
487+
this.l1_notification_rx
488+
.as_mut()
489+
.map(|rx| rx.poll_next_unpin(cx))
490+
.unwrap_or(Poll::Ready(None)),
491+
|event: Arc<L1Notification>| this.handle_l1_notification((*event).clone()),
492+
)
484493
);
485494

486495
// Drain all Indexer events.
@@ -551,6 +560,10 @@ where
551560
this.handle_network_manager_event(event);
552561
}
553562

563+
if maybe_more_l1_rx_events {
564+
cx.waker().wake_by_ref();
565+
}
566+
554567
Poll::Pending
555568
}
556569
}

0 commit comments

Comments
 (0)