Skip to content

Commit 0ea4ef7

Browse files
committed
make sure that there's no deadlock with command receiver as L1Watcher blocks if the send channel is full
1 parent b509314 commit 0ea4ef7

File tree

1 file changed

+27
-8
lines changed

1 file changed

+27
-8
lines changed

crates/watcher/src/lib.rs

Lines changed: 27 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ use std::{
3030
sync::Arc,
3131
time::Duration,
3232
};
33-
use tokio::sync::mpsc;
33+
use tokio::{select, sync::mpsc};
3434

3535
/// The maximum count of unfinalized blocks we can have in Ethereum.
3636
pub const MAX_UNFINALIZED_BLOCK_COUNT: usize = 96;
@@ -191,7 +191,7 @@ where
191191
};
192192

193193
// init the watcher.
194-
let watcher = Self {
194+
let mut watcher = Self {
195195
execution_provider,
196196
unfinalized_blocks: BoundedVec::new(HEADER_CAPACITY),
197197
current_block_number: start_block.unwrap_or(config.start_l1_block).saturating_sub(1),
@@ -657,7 +657,7 @@ where
657657
}
658658

659659
/// Send all notifications on the channel.
660-
async fn notify_all(&self, notifications: Vec<L1Notification>) -> L1WatcherResult<()> {
660+
async fn notify_all(&mut self, notifications: Vec<L1Notification>) -> L1WatcherResult<()> {
661661
for notification in notifications {
662662
self.metrics.process_l1_notification(&notification);
663663
tracing::trace!(target: "scroll::watcher", %notification, "sending l1 notification");
@@ -667,11 +667,30 @@ where
667667
}
668668

669669
/// Send the notification in the channel.
670-
async fn notify(&self, notification: L1Notification) -> L1WatcherResult<()> {
671-
// TODO: make sure that this is not blocking if the channel is full.
672-
Ok(self.sender.send(Arc::new(notification)).await.inspect_err(
673-
|err| tracing::error!(target: "scroll::watcher", ?err, "failed to send notification"),
674-
)?)
670+
async fn notify(&mut self, notification: L1Notification) -> L1WatcherResult<()> {
671+
select! {
672+
biased;
673+
674+
Some(command) = self.command_rx.recv() => {
675+
// If a command is received while trying to send a notification,
676+
// we prioritize handling the command first.
677+
// This prevents potential deadlocks if the channel is full.
678+
tracing::trace!(target: "scroll::watcher", "command received while sending notification, prioritizing command handling");
679+
680+
if let Err(err) = self.handle_command(command).await {
681+
tracing::error!(target: "scroll::watcher", ?err, "error handling command");
682+
}
683+
684+
return Ok(());
685+
}
686+
result = self.sender.send(Arc::new(notification)) => {
687+
result.inspect_err(
688+
|err| tracing::error!(target: "scroll::watcher", ?err, "failed to send notification"),
689+
)?;
690+
}
691+
}
692+
693+
Ok(())
675694
}
676695

677696
/// Updates the current block number, saturating at the head of the chain.

0 commit comments

Comments
 (0)