Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 15 additions & 1 deletion crates/node/src/args.rs
Original file line number Diff line number Diff line change
Expand Up @@ -327,7 +327,18 @@ impl ScrollRollupNodeConfig {
let (l1_notification_tx, l1_notification_rx): (Option<Sender<Arc<L1Notification>>>, _) =
if let Some(provider) = l1_provider.filter(|_| !self.test) {
tracing::info!(target: "scroll::node::args", ?l1_start_block_number, "Starting L1 watcher");
(None, Some(L1Watcher::spawn(provider, l1_start_block_number, node_config).await))
(
None,
Some(
L1Watcher::spawn(
provider,
l1_start_block_number,
node_config,
self.l1_provider_args.logs_query_block_range,
)
.await,
),
)
} else {
// Create a channel for L1 notifications that we can use to inject L1 messages for
// testing
Expand Down Expand Up @@ -604,6 +615,9 @@ pub struct L1ProviderArgs {
/// The initial backoff for the provider.
#[arg(long = "l1.initial-backoff", id = "l1_initial_backoff", value_name = "L1_INITIAL_BACKOFF", default_value_t = constants::L1_PROVIDER_INITIAL_BACKOFF)]
pub initial_backoff: u64,
/// The logs query block range.
#[arg(long = "l1.query-range", id = "l1_query_range", value_name = "L1_QUERY_RANGE", default_value_t = constants::LOGS_QUERY_BLOCK_RANGE)]
pub logs_query_block_range: u64,
}

/// The arguments for the Beacon provider.
Expand Down
3 changes: 3 additions & 0 deletions crates/node/src/constants.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@ pub(crate) const L1_PROVIDER_MAX_RETRIES: u32 = 10;
/// The initial backoff for the L1 provider.
pub(crate) const L1_PROVIDER_INITIAL_BACKOFF: u64 = 100;

/// The block range used to fetch L1 logs.
pub(crate) const LOGS_QUERY_BLOCK_RANGE: u64 = 500;

/// The max retries for the L2 provider.
pub(crate) const L2_PROVIDER_MAX_RETRIES: u32 = u32::MAX;

Expand Down
1 change: 1 addition & 0 deletions crates/node/tests/sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ async fn test_should_consolidate_to_block_15k() -> eyre::Result<()> {
compute_units_per_second: 500,
max_retries: 10,
initial_backoff: 100,
logs_query_block_range: 500,
},
engine_driver_args: EngineDriverArgs { sync_at_startup: false },
sequencer_args: SequencerArgs {
Expand Down
20 changes: 13 additions & 7 deletions crates/watcher/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,6 @@ use std::{
};
use tokio::sync::mpsc;

/// The block range used to fetch L1 logs.
pub const LOGS_QUERY_BLOCK_RANGE: u64 = 500;
/// The maximum count of unfinalized blocks we can have in Ethereum.
pub const MAX_UNFINALIZED_BLOCK_COUNT: usize = 96;

Expand Down Expand Up @@ -84,6 +82,8 @@ pub struct L1Watcher<EP> {
metrics: WatcherMetrics,
/// Whether the watcher is synced to the L1 head.
is_synced: bool,
/// The log query block range.
log_query_block_range: u64,
}

/// The L1 notification type yielded by the [`L1Watcher`].
Expand Down Expand Up @@ -158,10 +158,11 @@ where
execution_provider: EP,
start_block: Option<u64>,
config: Arc<NodeConfig>,
log_query_block_range: u64,
) -> mpsc::Receiver<Arc<L1Notification>> {
tracing::trace!(target: "scroll::watcher", ?start_block, ?config, "spawning L1 watcher");

let (tx, rx) = mpsc::channel(LOGS_QUERY_BLOCK_RANGE as usize);
let (tx, rx) = mpsc::channel(log_query_block_range as usize);

let fetch_block_number = async |tag: BlockNumberOrTag| {
let block = loop {
Expand Down Expand Up @@ -192,6 +193,7 @@ where
config,
metrics: WatcherMetrics::default(),
is_synced: false,
log_query_block_range,
};

// notify at spawn.
Expand Down Expand Up @@ -612,7 +614,7 @@ where
async fn update_current_block(&mut self, latest: &Block) -> L1WatcherResult<()> {
self.current_block_number = self
.current_block_number
.saturating_add(LOGS_QUERY_BLOCK_RANGE)
.saturating_add(self.log_query_block_range)
.min(latest.header.number);
self.notify(L1Notification::Processed(self.current_block_number)).await
}
Expand All @@ -637,7 +639,8 @@ where

/// Returns the next range of logs, for the block range in
/// \[[`current_block`](field@L1Watcher::current_block_number);
/// [`current_block`](field@L1Watcher::current_block_number) + [`LOGS_QUERY_BLOCK_RANGE`]\].
/// [`current_block`](field@L1Watcher::current_block_number) +
/// [`field@L1Watcher::log_query_block_range`]\].
async fn next_filtered_logs(&self, latest_block_number: u64) -> L1WatcherResult<Vec<Log>> {
// set the block range for the query
let address_book = &self.config.address_book;
Expand All @@ -654,7 +657,7 @@ where
]);
let to_block = self
.current_block_number
.saturating_add(LOGS_QUERY_BLOCK_RANGE)
.saturating_add(self.log_query_block_range)
.min(latest_block_number);

// skip a block for `from_block` since `self.current_block_number` is the last indexed
Expand All @@ -679,6 +682,8 @@ mod tests {
use arbitrary::Arbitrary;
use scroll_l1::abi::calls::commitBatchCall;

const LOG_QUERY_BLOCK_RANGE: u64 = 500;

// Returns a L1Watcher along with the receiver end of the L1Notifications.
fn l1_watcher(
unfinalized_blocks: Vec<Header>,
Expand All @@ -699,7 +704,7 @@ mod tests {
vec![latest],
);

let (tx, rx) = mpsc::channel(LOGS_QUERY_BLOCK_RANGE as usize);
let (tx, rx) = mpsc::channel(LOG_QUERY_BLOCK_RANGE as usize);
(
L1Watcher {
execution_provider: provider,
Expand All @@ -710,6 +715,7 @@ mod tests {
config: Arc::new(NodeConfig::mainnet()),
metrics: WatcherMetrics::default(),
is_synced: false,
log_query_block_range: LOG_QUERY_BLOCK_RANGE,
},
rx,
)
Expand Down
4 changes: 3 additions & 1 deletion crates/watcher/tests/indexing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use tokio::select;
async fn test_should_not_index_latest_block_multiple_times() -> eyre::Result<()> {
const CHAIN_LEN: usize = 200;
const HALF_CHAIN_LEN: usize = 100;
const LOGS_QUERY_BLOCK_RANGE: u64 = 500;

// Given
let (finalized, latest, headers) = chain(CHAIN_LEN);
Expand Down Expand Up @@ -58,7 +59,8 @@ async fn test_should_not_index_latest_block_multiple_times() -> eyre::Result<()>
);

// spawn the watcher and verify received notifications are consistent.
let mut l1_watcher = L1Watcher::spawn(mock_provider, None, Arc::new(config)).await;
let mut l1_watcher =
L1Watcher::spawn(mock_provider, None, Arc::new(config), LOGS_QUERY_BLOCK_RANGE).await;
let mut prev_block_number = 0;
let mut ticker = tokio::time::interval(tokio::time::Duration::from_secs(2));
let _ = ticker.tick().await;
Expand Down
4 changes: 3 additions & 1 deletion crates/watcher/tests/logs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ async fn test_should_not_miss_logs_on_reorg() -> eyre::Result<()> {

const CHAIN_LEN: usize = 200;
const HALF_CHAIN_LEN: usize = CHAIN_LEN / 2;
const LOGS_QUERY_BLOCK_RANGE: u64 = 500;

// Given
let (finalized, _, headers) = chain(CHAIN_LEN);
Expand Down Expand Up @@ -62,7 +63,8 @@ async fn test_should_not_miss_logs_on_reorg() -> eyre::Result<()> {
);

// spawn the watcher and verify received notifications are consistent.
let mut l1_watcher = L1Watcher::spawn(mock_provider, None, Arc::new(config)).await;
let mut l1_watcher =
L1Watcher::spawn(mock_provider, None, Arc::new(config), LOGS_QUERY_BLOCK_RANGE).await;
let mut received_logs = Vec::new();
loop {
let notification = l1_watcher.recv().await.map(|notif| (*notif).clone());
Expand Down
11 changes: 7 additions & 4 deletions crates/watcher/tests/reorg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use rollup_node_primitives::NodeConfig;
use rollup_node_watcher::{
random, test_utils::provider::MockProvider, Block, L1Notification, L1Watcher,
};
const LOGS_QUERY_BLOCK_RANGE: u64 = 500;

// Generate a set blocks that will be fed to the l1 watcher.
// Every fork_cycle blocks, generates a small reorg.
Expand Down Expand Up @@ -71,7 +72,8 @@ async fn test_should_detect_reorg() -> eyre::Result<()> {
);

// spawn the watcher and verify received notifications are consistent.
let mut l1_watcher = L1Watcher::spawn(mock_provider, None, Arc::new(config)).await;
let mut l1_watcher =
L1Watcher::spawn(mock_provider, None, Arc::new(config), LOGS_QUERY_BLOCK_RANGE).await;

// skip the first two events
l1_watcher.recv().await.unwrap();
Expand All @@ -92,7 +94,7 @@ async fn test_should_detect_reorg() -> eyre::Result<()> {
}

if latest_number == latest.header.number {
continue
continue;
}

let mut notification = l1_watcher.recv().await.unwrap();
Expand Down Expand Up @@ -172,7 +174,8 @@ async fn test_should_fetch_gap_in_unfinalized_blocks() -> eyre::Result<()> {
);

// spawn the watcher and verify received notifications are consistent.
let mut l1_watcher = L1Watcher::spawn(mock_provider, None, Arc::new(config)).await;
let mut l1_watcher =
L1Watcher::spawn(mock_provider, None, Arc::new(config), LOGS_QUERY_BLOCK_RANGE).await;

// skip the first two events
l1_watcher.recv().await.unwrap();
Expand All @@ -193,7 +196,7 @@ async fn test_should_fetch_gap_in_unfinalized_blocks() -> eyre::Result<()> {
}

if latest_number == latest.header.number {
continue
continue;
}

let mut notification = l1_watcher.recv().await.unwrap();
Expand Down