diff --git a/crates/chain-orchestrator/src/lib.rs b/crates/chain-orchestrator/src/lib.rs index 660fe5f6..0845c18c 100644 --- a/crates/chain-orchestrator/src/lib.rs +++ b/crates/chain-orchestrator/src/lib.rs @@ -8,7 +8,7 @@ use alloy_rpc_types_engine::ExecutionPayloadV1; use futures::StreamExt; use reth_chainspec::EthChainSpec; use reth_network_api::{BlockDownloaderProvider, FullNetwork}; -use reth_network_p2p::FullBlockClient; +use reth_network_p2p::{sync::SyncState as RethSyncState, FullBlockClient}; use reth_scroll_node::ScrollNetworkPrimitives; use reth_scroll_primitives::ScrollBlock; use reth_tasks::shutdown::Shutdown; @@ -1081,36 +1081,34 @@ impl< if head_block_number == safe_block_number { tracing::trace!(target: "scroll::chain_orchestrator", "No unsafe blocks to consolidate"); + } else { + let start_block_number = safe_block_number + 1; + // TODO: Make fetching parallel but ensure concurrency limits are respected. + let mut blocks_to_validate = vec![]; + for block_number in start_block_number..=head_block_number { + let block = self + .l2_client + .get_block_by_number(block_number.into()) + .full() + .await? + .ok_or(ChainOrchestratorError::L2BlockNotFoundInL2Client(block_number))? + .into_consensus() + .map_transactions(|tx| tx.inner.into_inner()); + blocks_to_validate.push(block); + } - self.notify(ChainOrchestratorEvent::ChainConsolidated { - from: safe_block_number, - to: head_block_number, - }); - return Ok(()); - } + self.validate_l1_messages(&blocks_to_validate).await?; - let start_block_number = safe_block_number + 1; - // TODO: Make fetching parallel but ensure concurrency limits are respected. - let mut blocks_to_validate = vec![]; - for block_number in start_block_number..=head_block_number { - let block = self - .l2_client - .get_block_by_number(block_number.into()) - .full() - .await? - .ok_or(ChainOrchestratorError::L2BlockNotFoundInL2Client(block_number))? - .into_consensus() - .map_transactions(|tx| tx.inner.into_inner()); - blocks_to_validate.push(block); - } - - self.validate_l1_messages(&blocks_to_validate).await?; + self.database + .update_l1_messages_from_l2_blocks( + blocks_to_validate.into_iter().map(|b| (&b).into()).collect(), + ) + .await?; + }; - self.database - .update_l1_messages_from_l2_blocks( - blocks_to_validate.into_iter().map(|b| (&b).into()).collect(), - ) - .await?; + // send a notification to the network that the chain is synced such that it accepts + // transactions into the transaction pool. + self.network.handle().inner().update_sync_state(RethSyncState::Idle); self.notify(ChainOrchestratorEvent::ChainConsolidated { from: safe_block_number, diff --git a/crates/node/tests/sync.rs b/crates/node/tests/sync.rs index 04d036e6..2d2686a2 100644 --- a/crates/node/tests/sync.rs +++ b/crates/node/tests/sync.rs @@ -10,7 +10,7 @@ use reth_tokio_util::EventStream; use rollup_node::{ test_utils::{ default_sequencer_test_scroll_rollup_node_config, default_test_scroll_rollup_node_config, - setup_engine, + generate_tx, setup_engine, }, BlobProviderArgs, ChainOrchestratorArgs, ConsensusArgs, EngineDriverArgs, L1ProviderArgs, RollupNodeDatabaseArgs, RollupNodeGasPriceOracleArgs, RollupNodeNetworkArgs, RpcArgs, @@ -98,6 +98,68 @@ async fn test_should_consolidate_to_block_15k() -> eyre::Result<()> { Ok(()) } +#[allow(clippy::large_stack_frames)] +#[tokio::test] +async fn test_node_produces_block_on_startup() -> eyre::Result<()> { + reth_tracing::init_test_tracing(); + + let mut sequencer_node_config = default_sequencer_test_scroll_rollup_node_config(); + sequencer_node_config.sequencer_args.auto_start = true; + sequencer_node_config.sequencer_args.allow_empty_blocks = false; + + let (mut nodes, _tasks, wallet) = + setup_engine(sequencer_node_config, 2, (*SCROLL_DEV).clone(), false, false).await?; + + let follower = nodes.pop().unwrap(); + let mut follower_events = + follower.inner.add_ons_handle.rollup_manager_handle.get_event_listener().await?; + let follower_l1_watcher_tx = follower.inner.add_ons_handle.l1_watcher_tx.clone().unwrap(); + + let sequencer = nodes.pop().unwrap(); + let mut sequencer_events = + sequencer.inner.add_ons_handle.rollup_manager_handle.get_event_listener().await?; + let sequencer_l1_watcher_tx = sequencer.inner.add_ons_handle.l1_watcher_tx.clone().unwrap(); + + // Send a notification to the sequencer and follower nodes that the L1 watcher is synced. + sequencer_l1_watcher_tx.send(Arc::new(L1Notification::Synced)).await.unwrap(); + follower_l1_watcher_tx.send(Arc::new(L1Notification::Synced)).await.unwrap(); + + // wait for both nodes to be synced. + wait_n_events( + &mut sequencer_events, + |e| matches!(e, ChainOrchestratorEvent::ChainConsolidated { from: _, to: _ }), + 1, + ) + .await; + wait_n_events( + &mut follower_events, + |e| matches!(e, ChainOrchestratorEvent::ChainConsolidated { from: _, to: _ }), + 1, + ) + .await; + + // construct a transaction and send it to the follower node. + let wallet = Arc::new(tokio::sync::Mutex::new(wallet)); + let handle = tokio::spawn(async move { + loop { + let tx = generate_tx(wallet.clone()).await; + follower.rpc.inject_tx(tx).await.unwrap(); + } + }); + + // Assert that the follower node receives the new block. + wait_n_events( + &mut follower_events, + |e| matches!(e, ChainOrchestratorEvent::ChainExtended(_)), + 1, + ) + .await; + + drop(handle); + + Ok(()) +} + /// We test if the syncing of the RN is correctly triggered and released when the EN reaches sync. #[allow(clippy::large_stack_frames)] #[tokio::test]