Skip to content

Commit 476d906

Browse files
committed
use select in watcher main loop
1 parent ba20206 commit 476d906

File tree

1 file changed

+28
-19
lines changed

1 file changed

+28
-19
lines changed

crates/watcher/src/lib.rs

Lines changed: 28 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -214,29 +214,40 @@ where
214214
.await
215215
.expect("channel is open in this context");
216216

217-
tokio::spawn(watcher.run());
217+
tokio::spawn(async move { watcher.run().await });
218218

219219
(rx, handle)
220220
}
221221

222222
/// Main execution loop for the [`L1Watcher`].
223-
pub async fn run(mut self) {
223+
pub async fn run(&mut self) {
224224
loop {
225-
// Poll for commands first (non-blocking check)
226-
match self.command_rx.try_recv() {
227-
Ok(command) => {
228-
if let Err(err) = self.handle_command(command).await {
229-
tracing::error!(target: "scroll::watcher", ?err, "error handling command");
225+
// Determine sleep duration based on sync state
226+
let sleep_duration = if self.is_synced {
227+
SLOW_SYNC_INTERVAL
228+
} else {
229+
Duration::ZERO
230+
};
231+
232+
// Select between receiving commands and sleeping
233+
select! {
234+
result = self.command_rx.recv() => {
235+
match result {
236+
Some(command) => {
237+
if let Err(err) = self.handle_command(command).await {
238+
tracing::error!(target: "scroll::watcher", ?err, "error handling command");
239+
}
240+
// Continue to process commands without stepping, in case there are more
241+
continue;
242+
}
243+
None => {
244+
tracing::warn!(target: "scroll::watcher", "command channel closed, stopping the watcher");
245+
break;
246+
}
230247
}
231-
// Continue to process commands without stepping, in case there are more
232-
continue;
233248
}
234-
Err(mpsc::error::TryRecvError::Empty) => {
235-
// No commands, proceed with normal operation
236-
}
237-
Err(mpsc::error::TryRecvError::Disconnected) => {
238-
tracing::warn!(target: "scroll::watcher", "command channel closed, stopping the watcher");
239-
break;
249+
_ = tokio::time::sleep(sleep_duration) => {
250+
// Sleep completed, proceed to step
240251
}
241252
}
242253

@@ -250,10 +261,8 @@ where
250261
break;
251262
}
252263

253-
// sleep if we are synced.
254-
if self.is_synced {
255-
tokio::time::sleep(SLOW_SYNC_INTERVAL).await;
256-
} else if self.current_block_number == self.l1_state.head {
264+
// Check if we just synced to the head
265+
if !self.is_synced && self.current_block_number == self.l1_state.head {
257266
// if we have synced to the head of the L1, notify the channel and set the
258267
// `is_synced`` flag.
259268
if let Err(L1WatcherError::SendError(_)) = self.notify(L1Notification::Synced).await

0 commit comments

Comments
 (0)