@@ -91,7 +91,7 @@ const EVENT_CHANNEL_SIZE: usize = 5000;
9191/// based on data consolidated from L1 and the data received over the p2p network.
9292#[ derive( Debug ) ]
9393pub struct ChainOrchestrator <
94- N : FullNetwork < Primitives = ScrollNetworkPrimitives > ,
94+ N : FullNetwork < Primitives = ScrollNetworkPrimitives > ,
9595 ChainSpec ,
9696 L1MP ,
9797 L2P ,
@@ -132,12 +132,12 @@ pub struct ChainOrchestrator<
132132}
133133
134134impl <
135- N : FullNetwork < Primitives = ScrollNetworkPrimitives > + Send + Sync + ' static ,
136- ChainSpec : ScrollHardforks + EthChainSpec + Send + Sync + ' static ,
137- L1MP : L1MessageProvider + Unpin + Clone + Send + Sync + ' static ,
138- L2P : Provider < Scroll > + ' static ,
139- EC : ScrollEngineApi + Sync + Send + ' static ,
140- > ChainOrchestrator < N , ChainSpec , L1MP , L2P , EC >
135+ N : FullNetwork < Primitives = ScrollNetworkPrimitives > + Send + Sync + ' static ,
136+ ChainSpec : ScrollHardforks + EthChainSpec + Send + Sync + ' static ,
137+ L1MP : L1MessageProvider + Unpin + Clone + Send + Sync + ' static ,
138+ L2P : Provider < Scroll > + ' static ,
139+ EC : ScrollEngineApi + Sync + Send + ' static ,
140+ > ChainOrchestrator < N , ChainSpec , L1MP , L2P , EC >
141141{
142142 /// Creates a new chain orchestrator.
143143 #[ allow( clippy:: too_many_arguments) ]
@@ -816,7 +816,7 @@ impl<
816816 & l1_message,
817817 self . config . l1_v2_message_queue_start_index ( ) ,
818818 )
819- . await ?;
819+ . await ?;
820820 let l1_message = L1MessageEnvelope :: new ( l1_message, l1_block_number, None , queue_hash) ;
821821
822822 // Perform a consistency check to ensure the previous L1 message exists in the database.
@@ -2123,3 +2123,196 @@ async fn compute_l1_message_queue_hash(
21232123// );
21242124// }
21252125// }
2126+
2127+ #[ cfg( test) ]
2128+ mod tests {
2129+ use super :: * ;
2130+ use alloy_primitives:: { Address , B256 } ;
2131+ use alloy_provider:: ProviderBuilder ;
2132+ use alloy_rpc_client:: RpcClient ;
2133+ use reth_scroll_consensus:: ScrollBeaconConsensus ;
2134+ use reth_scroll_node:: test_utils:: setup;
2135+ use rollup_node_primitives:: BatchCommitData ;
2136+ use rollup_node_providers:: test_utils:: MockL1Provider ;
2137+ use rollup_node_sequencer:: { L1MessageInclusionMode , PayloadBuildingConfig , SequencerConfig } ;
2138+ use scroll_alloy_consensus:: TxL1Message ;
2139+ use scroll_alloy_provider:: ScrollAuthApiEngineClient ;
2140+ use scroll_db:: test_utils:: setup_test_db;
2141+ use scroll_engine:: ForkchoiceState ;
2142+ use scroll_network:: ScrollNetworkHandle ;
2143+ use std:: collections:: HashMap ;
2144+ use std:: sync:: { Arc , Mutex } ;
2145+ use tokio:: sync:: mpsc;
2146+
2147+ /// Mock command handler for L1Watcher that tracks all reset_to_block calls.
2148+ /// Returns a real L1WatcherHandle and a tracker for verifying calls.
2149+ #[ derive( Clone ) ]
2150+ struct MockL1WatcherCommandTracker {
2151+ inner : Arc < Mutex < Vec < ( u64 , usize ) > > > , // (block_number, channel_capacity)
2152+ }
2153+
2154+ impl MockL1WatcherCommandTracker {
2155+ fn new ( ) -> Self {
2156+ Self { inner : Arc :: new ( Mutex :: new ( Vec :: new ( ) ) ) }
2157+ }
2158+
2159+ fn track_reset ( & self , block : u64 , capacity : usize ) {
2160+ self . inner . lock ( ) . unwrap ( ) . push ( ( block, capacity) ) ;
2161+ }
2162+
2163+ fn get_reset_calls ( & self ) -> Vec < ( u64 , usize ) > {
2164+ self . inner . lock ( ) . unwrap ( ) . clone ( )
2165+ }
2166+
2167+ fn assert_reset_called_with ( & self , block : u64 ) {
2168+ let calls = self . get_reset_calls ( ) ;
2169+ assert ! (
2170+ calls. iter( ) . any( |( b, _) | * b == block) ,
2171+ "Expected reset_to_block to be called with block {}, but got calls: {:?}" ,
2172+ block,
2173+ calls
2174+ ) ;
2175+ }
2176+
2177+ fn assert_not_called ( & self ) {
2178+ let calls = self . get_reset_calls ( ) ;
2179+ assert ! ( calls. is_empty( ) , "Expected no reset_to_block calls, but got: {:?}" , calls) ;
2180+ }
2181+ }
2182+
2183+ /// Creates a real L1WatcherHandle backed by a mock command handler.
2184+ /// Returns the handle and a tracker for verifying calls.
2185+ fn create_mock_l1_watcher_handle ( ) -> (
2186+ rollup_node_watcher:: L1WatcherHandle ,
2187+ MockL1WatcherCommandTracker ,
2188+ tokio:: task:: JoinHandle < ( ) > ,
2189+ ) {
2190+ use rollup_node_watcher:: { L1WatcherCommand , L1WatcherHandle } ;
2191+
2192+ let ( command_tx, mut command_rx) = mpsc:: unbounded_channel ( ) ;
2193+ let handle = L1WatcherHandle :: new ( command_tx) ;
2194+ let tracker = MockL1WatcherCommandTracker :: new ( ) ;
2195+ let tracker_clone = tracker. clone ( ) ;
2196+
2197+ // Spawn task to handle commands
2198+ let join_handle = tokio:: spawn ( async move {
2199+ while let Some ( command) = command_rx. recv ( ) . await {
2200+ match command {
2201+ L1WatcherCommand :: ResetToBlock { block, new_sender, response_sender } => {
2202+ let capacity = new_sender. max_capacity ( ) ;
2203+ tracker_clone. track_reset ( block, capacity) ;
2204+ // Respond success
2205+ let _ = response_sender. send ( ( ) ) ;
2206+ }
2207+ }
2208+ }
2209+ } ) ;
2210+
2211+ ( handle, tracker, join_handle)
2212+ }
2213+
2214+ #[ tokio:: test]
2215+ async fn test_gap_recovery ( )
2216+ {
2217+ // setup a test node
2218+ let ( mut nodes, _tasks, _wallet) = setup ( 1 , false ) . await . unwrap ( ) ;
2219+ let node = nodes. pop ( ) . unwrap ( ) ;
2220+
2221+ // create a fork choice state
2222+ let genesis_hash = node. inner . chain_spec ( ) . genesis_hash ( ) ;
2223+ let fcs = ForkchoiceState :: new (
2224+ BlockInfo { hash : genesis_hash, number : 0 } ,
2225+ Default :: default ( ) ,
2226+ Default :: default ( ) ,
2227+ ) ;
2228+
2229+ // create the engine driver connected to the node
2230+ let auth_client = node. inner . engine_http_client ( ) ;
2231+ let engine_client = ScrollAuthApiEngineClient :: new ( auth_client) ;
2232+ let engine = Engine :: new ( Arc :: new ( engine_client) , fcs) ;
2233+
2234+ // create a test database
2235+ let db = Arc :: new ( setup_test_db ( ) . await ) ;
2236+
2237+ // prepare derivation pipeline
2238+ let mock_l1_provider = MockL1Provider { db : db. clone ( ) , blobs : HashMap :: new ( ) } ;
2239+ let derivation_pipeline = DerivationPipeline :: new ( mock_l1_provider, db. clone ( ) , u64:: MAX ) . await ;
2240+
2241+ // create Scroll network
2242+ let ( tx, _rx) = mpsc:: unbounded_channel ( ) ;
2243+ let network_handle = ScrollNetworkHandle :: new ( tx, node. inner . clone ( ) . network ) ;
2244+
2245+ // create full block client
2246+ let block_client = FullBlockClient :: new (
2247+ network_handle
2248+ . inner ( )
2249+ . fetch_client ( )
2250+ . await
2251+ . expect ( "failed to fetch block client" ) ,
2252+ Arc :: new ( ScrollBeaconConsensus :: new ( node. inner . chain_spec ( ) . clone ( ) ) ) ,
2253+ ) ;
2254+
2255+ // create l2 provider
2256+ let client = RpcClient :: builder ( ) . http ( node. rpc_url ( ) ) ;
2257+ let l2_provider = ProviderBuilder :: < _ , _ , Scroll > :: default ( ) . connect_client ( client) ;
2258+ let l2_provider = Arc :: new ( l2_provider) ;
2259+
2260+ // prepare L1 notification channel
2261+ let ( l1_notification_tx, l1_notification_rx) = mpsc:: channel ( 100 ) ;
2262+
2263+
2264+ // initialize database state
2265+ db. set_latest_l1_block_number ( 0 ) . await . unwrap ( ) ;
2266+
2267+ let chain_orchestrator = ChainOrchestrator :: new (
2268+ db. clone ( ) ,
2269+ ChainOrchestratorConfig :: new ( node. inner . chain_spec ( ) . clone ( ) , 0 , 0 ) ,
2270+ Arc :: new ( block_client) ,
2271+ l2_provider,
2272+ l1_notification_rx,
2273+ None , // TODO: set handle
2274+ network_handle. into_scroll_network ( ) . await ,
2275+ Box :: new ( NoopConsensus :: default ( ) ) ,
2276+ engine,
2277+ Some ( Sequencer :: new ( Arc :: new ( MockL1Provider { db : db. clone ( ) , blobs : HashMap :: new ( ) } ) , SequencerConfig {
2278+ chain_spec : node. inner . chain_spec ( ) ,
2279+ fee_recipient : Address :: random ( ) ,
2280+ auto_start : false ,
2281+ payload_building_config : PayloadBuildingConfig {
2282+ block_gas_limit : 15_000_000 ,
2283+ max_l1_messages_per_block : 4 ,
2284+ l1_message_inclusion_mode : L1MessageInclusionMode :: BlockDepth ( 0 ) ,
2285+ } ,
2286+ block_time : 1 ,
2287+ payload_building_duration : 0 ,
2288+ allow_empty_blocks : false ,
2289+ } ) ) ,
2290+ None ,
2291+ derivation_pipeline,
2292+ )
2293+ . await
2294+ . unwrap ( ) ;
2295+ }
2296+
2297+ // Helper function to create a simple test batch commit
2298+ fn create_test_batch ( index : u64 , block_number : u64 ) -> BatchCommitData {
2299+ use alloy_primitives:: Bytes ;
2300+ BatchCommitData {
2301+ index,
2302+ hash : B256 :: random ( ) ,
2303+ block_number,
2304+ block_timestamp : 0 ,
2305+ calldata : Arc :: new ( Bytes :: new ( ) ) ,
2306+ blob_versioned_hash : None ,
2307+ finalized_block_number : None ,
2308+ }
2309+ }
2310+
2311+ // Helper function to create a simple test L1 message
2312+ fn create_test_l1_message ( queue_index : u64 ) -> TxL1Message {
2313+ TxL1Message { queue_index, ..Default :: default ( ) }
2314+ }
2315+
2316+ #[ tokio:: test]
2317+ async fn test_batch_commit_gap_triggers_recovery ( ) { }
2318+ }
0 commit comments