diff --git a/crates/node/src/args.rs b/crates/node/src/args.rs index be3d48ac..74869662 100644 --- a/crates/node/src/args.rs +++ b/crates/node/src/args.rs @@ -327,7 +327,18 @@ impl ScrollRollupNodeConfig { let (l1_notification_tx, l1_notification_rx): (Option>>, _) = if let Some(provider) = l1_provider.filter(|_| !self.test) { tracing::info!(target: "scroll::node::args", ?l1_start_block_number, "Starting L1 watcher"); - (None, Some(L1Watcher::spawn(provider, l1_start_block_number, node_config).await)) + ( + None, + Some( + L1Watcher::spawn( + provider, + l1_start_block_number, + node_config, + self.l1_provider_args.logs_query_block_range, + ) + .await, + ), + ) } else { // Create a channel for L1 notifications that we can use to inject L1 messages for // testing @@ -604,6 +615,9 @@ pub struct L1ProviderArgs { /// The initial backoff for the provider. #[arg(long = "l1.initial-backoff", id = "l1_initial_backoff", value_name = "L1_INITIAL_BACKOFF", default_value_t = constants::L1_PROVIDER_INITIAL_BACKOFF)] pub initial_backoff: u64, + /// The logs query block range. + #[arg(long = "l1.query-range", id = "l1_query_range", value_name = "L1_QUERY_RANGE", default_value_t = constants::LOGS_QUERY_BLOCK_RANGE)] + pub logs_query_block_range: u64, } /// The arguments for the Beacon provider. diff --git a/crates/node/src/constants.rs b/crates/node/src/constants.rs index 9c470557..148c0b2c 100644 --- a/crates/node/src/constants.rs +++ b/crates/node/src/constants.rs @@ -8,6 +8,9 @@ pub(crate) const L1_PROVIDER_MAX_RETRIES: u32 = 10; /// The initial backoff for the L1 provider. pub(crate) const L1_PROVIDER_INITIAL_BACKOFF: u64 = 100; +/// The block range used to fetch L1 logs. +pub(crate) const LOGS_QUERY_BLOCK_RANGE: u64 = 500; + /// The max retries for the L2 provider. pub(crate) const L2_PROVIDER_MAX_RETRIES: u32 = u32::MAX; diff --git a/crates/node/tests/sync.rs b/crates/node/tests/sync.rs index c5d1efba..04d036e6 100644 --- a/crates/node/tests/sync.rs +++ b/crates/node/tests/sync.rs @@ -53,6 +53,7 @@ async fn test_should_consolidate_to_block_15k() -> eyre::Result<()> { compute_units_per_second: 500, max_retries: 10, initial_backoff: 100, + logs_query_block_range: 500, }, engine_driver_args: EngineDriverArgs { sync_at_startup: false }, sequencer_args: SequencerArgs { diff --git a/crates/watcher/src/lib.rs b/crates/watcher/src/lib.rs index 9d455daa..9471d738 100644 --- a/crates/watcher/src/lib.rs +++ b/crates/watcher/src/lib.rs @@ -29,8 +29,6 @@ use std::{ }; use tokio::sync::mpsc; -/// The block range used to fetch L1 logs. -pub const LOGS_QUERY_BLOCK_RANGE: u64 = 500; /// The maximum count of unfinalized blocks we can have in Ethereum. pub const MAX_UNFINALIZED_BLOCK_COUNT: usize = 96; @@ -84,6 +82,8 @@ pub struct L1Watcher { metrics: WatcherMetrics, /// Whether the watcher is synced to the L1 head. is_synced: bool, + /// The log query block range. + log_query_block_range: u64, } /// The L1 notification type yielded by the [`L1Watcher`]. @@ -158,10 +158,11 @@ where execution_provider: EP, start_block: Option, config: Arc, + log_query_block_range: u64, ) -> mpsc::Receiver> { tracing::trace!(target: "scroll::watcher", ?start_block, ?config, "spawning L1 watcher"); - let (tx, rx) = mpsc::channel(LOGS_QUERY_BLOCK_RANGE as usize); + let (tx, rx) = mpsc::channel(log_query_block_range as usize); let fetch_block_number = async |tag: BlockNumberOrTag| { let block = loop { @@ -192,6 +193,7 @@ where config, metrics: WatcherMetrics::default(), is_synced: false, + log_query_block_range, }; // notify at spawn. @@ -612,7 +614,7 @@ where async fn update_current_block(&mut self, latest: &Block) -> L1WatcherResult<()> { self.current_block_number = self .current_block_number - .saturating_add(LOGS_QUERY_BLOCK_RANGE) + .saturating_add(self.log_query_block_range) .min(latest.header.number); self.notify(L1Notification::Processed(self.current_block_number)).await } @@ -637,7 +639,8 @@ where /// Returns the next range of logs, for the block range in /// \[[`current_block`](field@L1Watcher::current_block_number); - /// [`current_block`](field@L1Watcher::current_block_number) + [`LOGS_QUERY_BLOCK_RANGE`]\]. + /// [`current_block`](field@L1Watcher::current_block_number) + + /// [`field@L1Watcher::log_query_block_range`]\]. async fn next_filtered_logs(&self, latest_block_number: u64) -> L1WatcherResult> { // set the block range for the query let address_book = &self.config.address_book; @@ -654,7 +657,7 @@ where ]); let to_block = self .current_block_number - .saturating_add(LOGS_QUERY_BLOCK_RANGE) + .saturating_add(self.log_query_block_range) .min(latest_block_number); // skip a block for `from_block` since `self.current_block_number` is the last indexed @@ -679,6 +682,8 @@ mod tests { use arbitrary::Arbitrary; use scroll_l1::abi::calls::commitBatchCall; + const LOG_QUERY_BLOCK_RANGE: u64 = 500; + // Returns a L1Watcher along with the receiver end of the L1Notifications. fn l1_watcher( unfinalized_blocks: Vec
, @@ -699,7 +704,7 @@ mod tests { vec![latest], ); - let (tx, rx) = mpsc::channel(LOGS_QUERY_BLOCK_RANGE as usize); + let (tx, rx) = mpsc::channel(LOG_QUERY_BLOCK_RANGE as usize); ( L1Watcher { execution_provider: provider, @@ -710,6 +715,7 @@ mod tests { config: Arc::new(NodeConfig::mainnet()), metrics: WatcherMetrics::default(), is_synced: false, + log_query_block_range: LOG_QUERY_BLOCK_RANGE, }, rx, ) diff --git a/crates/watcher/tests/indexing.rs b/crates/watcher/tests/indexing.rs index 1da4ff00..dc224a83 100644 --- a/crates/watcher/tests/indexing.rs +++ b/crates/watcher/tests/indexing.rs @@ -18,6 +18,7 @@ use tokio::select; async fn test_should_not_index_latest_block_multiple_times() -> eyre::Result<()> { const CHAIN_LEN: usize = 200; const HALF_CHAIN_LEN: usize = 100; + const LOGS_QUERY_BLOCK_RANGE: u64 = 500; // Given let (finalized, latest, headers) = chain(CHAIN_LEN); @@ -58,7 +59,8 @@ async fn test_should_not_index_latest_block_multiple_times() -> eyre::Result<()> ); // spawn the watcher and verify received notifications are consistent. - let mut l1_watcher = L1Watcher::spawn(mock_provider, None, Arc::new(config)).await; + let mut l1_watcher = + L1Watcher::spawn(mock_provider, None, Arc::new(config), LOGS_QUERY_BLOCK_RANGE).await; let mut prev_block_number = 0; let mut ticker = tokio::time::interval(tokio::time::Duration::from_secs(2)); let _ = ticker.tick().await; diff --git a/crates/watcher/tests/logs.rs b/crates/watcher/tests/logs.rs index 962700b9..3a41ca05 100644 --- a/crates/watcher/tests/logs.rs +++ b/crates/watcher/tests/logs.rs @@ -20,6 +20,7 @@ async fn test_should_not_miss_logs_on_reorg() -> eyre::Result<()> { const CHAIN_LEN: usize = 200; const HALF_CHAIN_LEN: usize = CHAIN_LEN / 2; + const LOGS_QUERY_BLOCK_RANGE: u64 = 500; // Given let (finalized, _, headers) = chain(CHAIN_LEN); @@ -62,7 +63,8 @@ async fn test_should_not_miss_logs_on_reorg() -> eyre::Result<()> { ); // spawn the watcher and verify received notifications are consistent. - let mut l1_watcher = L1Watcher::spawn(mock_provider, None, Arc::new(config)).await; + let mut l1_watcher = + L1Watcher::spawn(mock_provider, None, Arc::new(config), LOGS_QUERY_BLOCK_RANGE).await; let mut received_logs = Vec::new(); loop { let notification = l1_watcher.recv().await.map(|notif| (*notif).clone()); diff --git a/crates/watcher/tests/reorg.rs b/crates/watcher/tests/reorg.rs index 3fc6b2b9..fdb32c2f 100644 --- a/crates/watcher/tests/reorg.rs +++ b/crates/watcher/tests/reorg.rs @@ -10,6 +10,7 @@ use rollup_node_primitives::NodeConfig; use rollup_node_watcher::{ random, test_utils::provider::MockProvider, Block, L1Notification, L1Watcher, }; +const LOGS_QUERY_BLOCK_RANGE: u64 = 500; // Generate a set blocks that will be fed to the l1 watcher. // Every fork_cycle blocks, generates a small reorg. @@ -71,7 +72,8 @@ async fn test_should_detect_reorg() -> eyre::Result<()> { ); // spawn the watcher and verify received notifications are consistent. - let mut l1_watcher = L1Watcher::spawn(mock_provider, None, Arc::new(config)).await; + let mut l1_watcher = + L1Watcher::spawn(mock_provider, None, Arc::new(config), LOGS_QUERY_BLOCK_RANGE).await; // skip the first two events l1_watcher.recv().await.unwrap(); @@ -92,7 +94,7 @@ async fn test_should_detect_reorg() -> eyre::Result<()> { } if latest_number == latest.header.number { - continue + continue; } let mut notification = l1_watcher.recv().await.unwrap(); @@ -172,7 +174,8 @@ async fn test_should_fetch_gap_in_unfinalized_blocks() -> eyre::Result<()> { ); // spawn the watcher and verify received notifications are consistent. - let mut l1_watcher = L1Watcher::spawn(mock_provider, None, Arc::new(config)).await; + let mut l1_watcher = + L1Watcher::spawn(mock_provider, None, Arc::new(config), LOGS_QUERY_BLOCK_RANGE).await; // skip the first two events l1_watcher.recv().await.unwrap(); @@ -193,7 +196,7 @@ async fn test_should_fetch_gap_in_unfinalized_blocks() -> eyre::Result<()> { } if latest_number == latest.header.number { - continue + continue; } let mut notification = l1_watcher.recv().await.unwrap();