Skip to content

Commit 9642498

Browse files
frisitanogreged93
andauthored
introduce watcher back pressure in the rollup node manager (#278)
Co-authored-by: greg <[email protected]>
1 parent 892d276 commit 9642498

File tree

3 files changed

+29
-1
lines changed

3 files changed

+29
-1
lines changed

crates/chain-orchestrator/src/lib.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -125,6 +125,11 @@ impl<
125125
})
126126
}
127127

128+
/// Returns the number of pending futures.
129+
pub fn pending_futures_len(&self) -> usize {
130+
self.pending_futures.len()
131+
}
132+
128133
/// Wraps a pending chain orchestrator future, metering the completion of it.
129134
pub fn handle_metered(
130135
&mut self,

crates/engine/src/driver.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,13 @@ where
8888
}
8989
}
9090

91+
/// Returns the number of pending futures in the queue.
92+
///
93+
/// This only considers the length of the L1 payload attributes and chain import queues.
94+
pub fn pending_futures_len(&self) -> usize {
95+
self.l1_payload_attributes.len() + self.chain_imports.len()
96+
}
97+
9198
/// Sets the finalized block info.
9299
pub fn set_finalized_block_info(&mut self, block_info: BlockInfo) {
93100
self.fcs.update_finalized_block_info(block_info);

crates/manager/src/manager/mod.rs

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,14 @@ pub use handle::RollupManagerHandle;
5959
/// The size of the event channel.
6060
const EVENT_CHANNEL_SIZE: usize = 100;
6161

62+
/// The maximum capacity of the pending futures queue in the chain orchestrator for acceptance of
63+
/// new events from the L1 notification channel.
64+
const CHAIN_ORCHESTRATOR_MAX_PENDING_FUTURES: usize = 5000;
65+
66+
/// The maximum number of pending futures in the engine driver for acceptance of new events from the
67+
/// L1 notification channel.
68+
const ENGINE_MAX_PENDING_FUTURES: usize = 5000;
69+
6270
/// The main manager for the rollup node.
6371
///
6472
/// This is an endless [`Future`] that drives the state of the entire network forward and includes
@@ -444,6 +452,14 @@ where
444452

445453
drop(graceful_guard);
446454
}
455+
456+
/// Returns true if the manager has capacity to accept new L1 notifications.
457+
pub fn has_capacity_for_l1_notifications(&self) -> bool {
458+
let chain_orchestrator_has_capacity = self.chain.pending_futures_len() <
459+
CHAIN_ORCHESTRATOR_MAX_PENDING_FUTURES - L1_NOTIFICATION_CHANNEL_BUDGET as usize;
460+
let engine_has_capacity = self.engine.pending_futures_len() < ENGINE_MAX_PENDING_FUTURES;
461+
chain_orchestrator_has_capacity && engine_has_capacity
462+
}
447463
}
448464

449465
impl<N, EC, P, L1P, L1MP, CS> Future for RollupNodeManager<N, EC, P, L1P, L1MP, CS>
@@ -516,7 +532,7 @@ where
516532

517533
let mut maybe_more_l1_rx_events = false;
518534
proceed_if!(
519-
en_synced,
535+
en_synced && this.has_capacity_for_l1_notifications(),
520536
maybe_more_l1_rx_events = poll_nested_stream_with_budget!(
521537
"l1_notification_rx",
522538
"L1Notification channel",

0 commit comments

Comments
 (0)