Skip to content

Commit 41b3ead

Browse files
committed
add l1_watcher_command_rx to addons for testing like l1_watcher_tx
1 parent f4a999e commit 41b3ead

File tree

7 files changed

+38
-195
lines changed

7 files changed

+38
-195
lines changed

crates/chain-orchestrator/src/lib.rs

Lines changed: 0 additions & 176 deletions
Original file line numberDiff line numberDiff line change
@@ -2233,179 +2233,3 @@ async fn compute_l1_message_queue_hash(
22332233
// );
22342234
// }
22352235
// }
2236-
2237-
#[cfg(test)]
2238-
mod tests {
2239-
// use super::*;
2240-
// use alloy_primitives::B256;
2241-
// use rollup_node_primitives::BatchCommitData;
2242-
// use std::sync::Arc;
2243-
2244-
// Commented out due to removal of MockL1WatcherHandle
2245-
// #[tokio::test]
2246-
// async fn test_gap_recovery() {
2247-
// use rollup_node_watcher::MockL1WatcherHandle;
2248-
//
2249-
// // setup a test node
2250-
// let (mut nodes, _tasks, _wallet) = setup(1, false).await.unwrap();
2251-
// let node = nodes.pop().unwrap();
2252-
//
2253-
// // create a fork choice state
2254-
// let genesis_hash = node.inner.chain_spec().genesis_hash();
2255-
// let fcs = ForkchoiceState::new(
2256-
// BlockInfo { hash: genesis_hash, number: 0 },
2257-
// Default::default(),
2258-
// Default::default(),
2259-
// );
2260-
//
2261-
// // create the engine driver connected to the node
2262-
// let auth_client = node.inner.engine_http_client();
2263-
// let engine_client = ScrollAuthApiEngineClient::new(auth_client);
2264-
// let engine = Engine::new(Arc::new(engine_client), fcs);
2265-
//
2266-
// // create a test database
2267-
// let db = Arc::new(setup_test_db().await);
2268-
//
2269-
// // prepare derivation pipeline
2270-
// let mock_l1_provider = MockL1Provider { db: db.clone(), blobs: HashMap::new() };
2271-
// let derivation_pipeline =
2272-
// DerivationPipeline::new(mock_l1_provider, db.clone(), u64::MAX).await;
2273-
//
2274-
// let (scroll_network_manager, scroll_network_handle) =
2275-
// scroll_network::ScrollNetworkManager::new(
2276-
// node.inner.chain_spec().clone(),
2277-
// NetworkConfigBuilder::<ScrollNetworkPrimitives>::with_rng_secret_key()
2278-
// .build_with_noop_provider(node.inner.chain_spec().clone()),
2279-
// ScrollWireConfig::new(true),
2280-
// None,
2281-
// Default::default(),
2282-
// None,
2283-
// )
2284-
// .await;
2285-
// tokio::spawn(scroll_network_manager);
2286-
//
2287-
// // create full block client
2288-
// let block_client = FullBlockClient::new(
2289-
// scroll_network_handle
2290-
// .inner()
2291-
// .fetch_client()
2292-
// .await
2293-
// .expect("failed to fetch block client"),
2294-
// Arc::new(ScrollBeaconConsensus::new(node.inner.chain_spec())),
2295-
// );
2296-
//
2297-
// // create l2 provider
2298-
// let client = RpcClient::builder().http(node.rpc_url());
2299-
// let l2_provider = ProviderBuilder::<_, _, Scroll>::default().connect_client(client);
2300-
// let l2_provider = Arc::new(l2_provider);
2301-
//
2302-
// // prepare L1 notification channel
2303-
// let (l1_notification_tx, l1_notification_rx) = mpsc::channel(100);
2304-
//
2305-
// // create mock L1 watcher handle for testing gap recovery
2306-
// let mock_l1_watcher_handle = MockL1WatcherHandle::new();
2307-
//
2308-
// // initialize database state
2309-
// db.set_latest_l1_block_number(0).await.unwrap();
2310-
//
2311-
// let (chain_orchestrator, _handle) = ChainOrchestrator::new(
2312-
// db.clone(),
2313-
// ChainOrchestratorConfig::new(node.inner.chain_spec().clone(), 0, 0),
2314-
// Arc::new(block_client),
2315-
// l2_provider,
2316-
// l1_notification_rx,
2317-
// Some(mock_l1_watcher_handle.clone()),
2318-
// scroll_network_handle.into_scroll_network().await,
2319-
// Box::new(NoopConsensus::default()),
2320-
// engine,
2321-
// Some(Sequencer::new(
2322-
// Arc::new(MockL1Provider { db: db.clone(), blobs: HashMap::new() }),
2323-
// SequencerConfig {
2324-
// chain_spec: node.inner.chain_spec(),
2325-
// fee_recipient: Address::random(),
2326-
// auto_start: false,
2327-
// payload_building_config: PayloadBuildingConfig {
2328-
// block_gas_limit: 15_000_000,
2329-
// max_l1_messages_per_block: 4,
2330-
// l1_message_inclusion_mode: L1MessageInclusionMode::BlockDepth(0),
2331-
// },
2332-
// block_time: 1,
2333-
// payload_building_duration: 0,
2334-
// allow_empty_blocks: false,
2335-
// },
2336-
// )),
2337-
// None,
2338-
// derivation_pipeline,
2339-
// )
2340-
// .await
2341-
// .unwrap();
2342-
//
2343-
// // Spawn a task that constantly polls chain orchestrator to process L1 notifications
2344-
// let (_signal, shutdown) = shutdown_signal();
2345-
// tokio::spawn(async {
2346-
// let (_signal, inner) = shutdown_signal();
2347-
// let chain_orchestrator = chain_orchestrator.run_until_shutdown(inner);
2348-
// tokio::select! {
2349-
// biased;
2350-
//
2351-
// _ = shutdown => {},
2352-
// _ = chain_orchestrator => {},
2353-
// }
2354-
// });
2355-
//
2356-
// let genesis_batch = create_test_batch(1, 100);
2357-
// l1_notification_tx
2358-
// .send(Arc::new(L1Notification::BatchCommit(genesis_batch)))
2359-
// .await
2360-
// .unwrap();
2361-
// tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;
2362-
//
2363-
// let batch_with_gap = create_test_batch(3, 102);
2364-
// l1_notification_tx
2365-
// .send(Arc::new(L1Notification::BatchCommit(batch_with_gap)))
2366-
// .await
2367-
// .unwrap();
2368-
// tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
2369-
//
2370-
// mock_l1_watcher_handle.assert_reset_to(100);
2371-
//
2372-
// // Insert first L1 message
2373-
// // let l1_msg_0 = create_test_l1_message(0);
2374-
// // l1_notification_tx.send(Arc::new(L1Notification::L1Message {
2375-
// // message: l1_msg_0,
2376-
// // block_number: 105,
2377-
// // block_timestamp: 0,
2378-
// // })).await.unwrap();
2379-
// // tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;
2380-
// //
2381-
// // let l1_msg_with_gap = create_test_l1_message(2);
2382-
// // l1_notification_tx.send(Arc::new(L1Notification::L1Message {
2383-
// // message: l1_msg_with_gap,
2384-
// // block_number: 107,
2385-
// // block_timestamp: 0,
2386-
// // })).await.unwrap();
2387-
// // tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
2388-
// //
2389-
// // // Verify that reset was triggered to block 105 (last known L1 message)
2390-
// // mock_l1_watcher_handle.assert_reset_to(105);
2391-
// }
2392-
2393-
// Helper function to create a simple test batch commit
2394-
// fn create_test_batch(index: u64, block_number: u64) -> BatchCommitData {
2395-
// use alloy_primitives::Bytes;
2396-
// BatchCommitData {
2397-
// index,
2398-
// hash: B256::random(),
2399-
// block_number,
2400-
// block_timestamp: 0,
2401-
// calldata: Arc::new(Bytes::new()),
2402-
// blob_versioned_hash: None,
2403-
// finalized_block_number: None,
2404-
// }
2405-
// }
2406-
2407-
// Helper function to create a simple test L1 message
2408-
// fn create_test_l1_message(queue_index: u64) -> TxL1Message {
2409-
// TxL1Message { queue_index, ..Default::default() }
2410-
// }
2411-
}

crates/node/src/add_ons/handle.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,10 @@ use reth_rpc_eth_api::EthApiTypes;
55
use reth_scroll_node::ScrollNetworkPrimitives;
66
use rollup_node_chain_orchestrator::ChainOrchestratorHandle;
77
#[cfg(feature = "test-utils")]
8+
use tokio::sync::mpsc::UnboundedReceiver;
9+
#[cfg(feature = "test-utils")]
10+
use tokio::sync::Mutex;
11+
#[cfg(feature = "test-utils")]
812
use {rollup_node_watcher::L1Notification, std::sync::Arc, tokio::sync::mpsc::Sender};
913

1014
/// A handle for scroll addons, which includes handles for the rollup manager and RPC server.
@@ -20,6 +24,10 @@ pub struct ScrollAddOnsHandle<
2024
/// An optional channel used to send `L1Watcher` notifications to the `RollupNodeManager`.
2125
#[cfg(feature = "test-utils")]
2226
pub l1_watcher_tx: Option<Sender<Arc<L1Notification>>>,
27+
/// An optional channel used to receive commands from the `RollupNodeManager` to the
28+
/// `L1Watcher`.
29+
#[cfg(feature = "test-utils")]
30+
pub l1_watcher_command_rx: Arc<Mutex<UnboundedReceiver<rollup_node_watcher::L1WatcherCommand>>>,
2331
}
2432

2533
impl<

crates/node/src/add_ons/mod.rs

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
33
use super::args::ScrollRollupNodeConfig;
44
use crate::constants;
5+
use std::sync::Arc;
56

67
use reth_evm::{ConfigureEngineEvm, EvmFactory, EvmFactoryFor};
78
use reth_network::NetworkProtocols;
@@ -37,7 +38,7 @@ pub use rpc::{RollupNodeExtApiClient, RollupNodeExtApiServer, RollupNodeRpcExt};
3738
mod rollup;
3839
pub use rollup::IsDevChain;
3940
use rollup::RollupManagerAddOn;
40-
use tokio::sync::mpsc::UnboundedReceiver;
41+
use tokio::sync::{mpsc::UnboundedReceiver, Mutex};
4142

4243
/// Add-ons for the Scroll follower node.
4344
#[derive(Debug)]
@@ -137,7 +138,7 @@ where
137138
}
138139

139140
let rpc_handle = rpc_add_ons.launch_add_ons_with(ctx.clone(), |_| Ok(())).await?;
140-
let (rollup_manager_handle, l1_watcher_tx) =
141+
let (rollup_manager_handle, l1_watcher_tx, l1_watcher_command_rx) =
141142
rollup_node_manager_addon.launch(ctx.clone(), rpc_handle.clone()).await?;
142143

143144
tx.send(rollup_manager_handle.clone())
@@ -148,6 +149,10 @@ where
148149
rpc_handle,
149150
#[cfg(feature = "test-utils")]
150151
l1_watcher_tx,
152+
#[cfg(feature = "test-utils")]
153+
l1_watcher_command_rx: Arc::new(Mutex::new(
154+
l1_watcher_command_rx.expect("l1_watcher_command_rx must exist in test utils"),
155+
)),
151156
})
152157
}
153158
}

crates/node/src/add_ons/rollup.rs

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ use reth_rpc_eth_api::EthApiTypes;
99
use reth_scroll_chainspec::{ChainConfig, ScrollChainConfig, ScrollChainSpec};
1010
use reth_scroll_node::ScrollNetworkPrimitives;
1111
use rollup_node_chain_orchestrator::ChainOrchestratorHandle;
12-
use rollup_node_watcher::L1Notification;
12+
use rollup_node_watcher::{L1Notification, L1WatcherCommand};
1313
use scroll_alloy_hardforks::ScrollHardforks;
1414
use scroll_wire::ScrollWireEvent;
1515
use std::sync::Arc;
@@ -55,13 +55,17 @@ impl RollupManagerAddOn {
5555
self,
5656
ctx: AddOnsContext<'_, N>,
5757
rpc: RpcHandle<N, EthApi>,
58-
) -> eyre::Result<(ChainOrchestratorHandle<N::Network>, Option<Sender<Arc<L1Notification>>>)>
58+
) -> eyre::Result<(
59+
ChainOrchestratorHandle<N::Network>,
60+
Option<Sender<Arc<L1Notification>>>,
61+
Option<UnboundedReceiver<L1WatcherCommand>>,
62+
)>
5963
where
6064
<<N as FullNodeTypes>::Types as NodeTypes>::ChainSpec:
6165
ChainConfig<Config = ScrollChainConfig> + ScrollHardforks + IsDevChain,
6266
N::Network: NetworkProtocols + FullNetwork<Primitives = ScrollNetworkPrimitives>,
6367
{
64-
let (chain_orchestrator, handle, l1_notification_tx) = self
68+
let (chain_orchestrator, handle, l1_notification_tx, l1_watcher_command_rx) = self
6569
.config
6670
.build((&ctx).into(), self.scroll_wire_event, rpc.rpc_server_handles)
6771
.await?;
@@ -70,6 +74,6 @@ impl RollupManagerAddOn {
7074
.spawn_critical_with_shutdown_signal("rollup_node_manager", |shutdown| {
7175
chain_orchestrator.run_until_shutdown(shutdown)
7276
});
73-
Ok((handle, l1_notification_tx))
77+
Ok((handle, l1_notification_tx, l1_watcher_command_rx))
7478
}
7579
}

crates/node/src/args.rs

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ use rollup_node_providers::{
3838
use rollup_node_sequencer::{
3939
L1MessageInclusionMode, PayloadBuildingConfig, Sequencer, SequencerConfig,
4040
};
41-
use rollup_node_watcher::{L1Notification, L1Watcher, L1WatcherHandle};
41+
use rollup_node_watcher::{L1Notification, L1Watcher, L1WatcherCommand, L1WatcherHandle};
4242
use scroll_alloy_hardforks::ScrollHardforks;
4343
use scroll_alloy_network::Scroll;
4444
use scroll_alloy_provider::{ScrollAuthApiEngineClient, ScrollEngineApi};
@@ -167,6 +167,7 @@ impl ScrollRollupNodeConfig {
167167
>,
168168
ChainOrchestratorHandle<N>,
169169
Option<Sender<Arc<L1Notification>>>,
170+
Option<UnboundedReceiver<L1WatcherCommand>>,
170171
)>
171172
where
172173
N: FullNetwork<Primitives = ScrollNetworkPrimitives> + NetworkProtocols,
@@ -340,9 +341,11 @@ impl ScrollRollupNodeConfig {
340341
};
341342
let consensus = self.consensus_args.consensus(authorized_signer)?;
342343

343-
let (l1_notification_tx, l1_watcher_handle): (
344+
#[allow(clippy::type_complexity)]
345+
let (l1_notification_tx, l1_watcher_handle, l1_watcher_command_rx): (
344346
Option<Sender<Arc<L1Notification>>>,
345347
Option<L1WatcherHandle>,
348+
Option<UnboundedReceiver<L1WatcherCommand>>,
346349
) = if let Some(provider) = l1_provider.filter(|_| !self.test) {
347350
tracing::info!(target: "scroll::node::args", ?l1_block_startup_info, "Starting L1 watcher");
348351
let handle = L1Watcher::spawn(
@@ -352,24 +355,23 @@ impl ScrollRollupNodeConfig {
352355
self.l1_provider_args.logs_query_block_range,
353356
)
354357
.await;
355-
(None, Some(handle))
358+
(None, Some(handle), None)
356359
} else {
357360
// Create a channel for L1 notifications that we can use to inject L1 messages for
358361
// testing
359362
#[cfg(feature = "test-utils")]
360363
{
361364
let (tx, rx) = tokio::sync::mpsc::channel(1000);
362365

363-
// TODO: expose command_rx to allow for tests to assert commands sent to the watcher
364-
let (command_tx, _command_rx) = tokio::sync::mpsc::unbounded_channel();
366+
let (command_tx, command_rx) = tokio::sync::mpsc::unbounded_channel();
365367
let handle = L1WatcherHandle::new(command_tx, rx);
366368

367-
(Some(tx), Some(handle))
369+
(Some(tx), Some(handle), Some(command_rx))
368370
}
369371

370372
#[cfg(not(feature = "test-utils"))]
371373
{
372-
(None, None)
374+
(None, None, None)
373375
}
374376
};
375377

@@ -460,7 +462,7 @@ impl ScrollRollupNodeConfig {
460462
)
461463
.await?;
462464

463-
Ok((chain_orchestrator, handle, l1_notification_tx))
465+
Ok((chain_orchestrator, handle, l1_notification_tx, l1_watcher_command_rx))
464466
}
465467
}
466468

crates/node/tests/e2e.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -839,7 +839,7 @@ async fn shutdown_consolidates_most_recent_batch_on_startup() -> eyre::Result<()
839839
config.hydrate(node.inner.config.clone()).await?;
840840

841841
let (_, events) = ScrollWireProtocolHandler::new(ScrollWireConfig::new(true));
842-
let (chain_orchestrator, handle, l1_notification_tx) = config
842+
let (chain_orchestrator, handle, l1_notification_tx, _) = config
843843
.clone()
844844
.build(
845845
RollupNodeContext::new(
@@ -989,7 +989,7 @@ async fn shutdown_consolidates_most_recent_batch_on_startup() -> eyre::Result<()
989989

990990
// Start the RNM again.
991991
let (_, events) = ScrollWireProtocolHandler::new(ScrollWireConfig::new(true));
992-
let (chain_orchestrator, handle, l1_notification_tx) = config
992+
let (chain_orchestrator, handle, l1_notification_tx, _) = config
993993
.clone()
994994
.build(
995995
RollupNodeContext::new(
@@ -1119,7 +1119,7 @@ async fn graceful_shutdown_sets_fcs_to_latest_signed_block_in_db_on_start_up() -
11191119
config.hydrate(node.inner.config.clone()).await?;
11201120

11211121
let (_, events) = ScrollWireProtocolHandler::new(ScrollWireConfig::new(true));
1122-
let (rnm, handle, l1_watcher_tx) = config
1122+
let (rnm, handle, l1_watcher_tx, _) = config
11231123
.clone()
11241124
.build(
11251125
RollupNodeContext::new(
@@ -1192,7 +1192,7 @@ async fn graceful_shutdown_sets_fcs_to_latest_signed_block_in_db_on_start_up() -
11921192

11931193
// Start the RNM again.
11941194
let (_, events) = ScrollWireProtocolHandler::new(ScrollWireConfig::new(true));
1195-
let (rnm, handle, _) = config
1195+
let (rnm, handle, _, _) = config
11961196
.clone()
11971197
.build(
11981198
RollupNodeContext::new(

crates/watcher/src/handle/command.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ use std::sync::Arc;
33
use tokio::sync::mpsc;
44

55
/// Commands that can be sent to the L1 Watcher.
6-
#[derive(Debug)]
6+
#[derive(Debug, Clone)]
77
pub enum L1WatcherCommand {
88
/// Reset the watcher to a specific L1 block number.
99
///

0 commit comments

Comments
 (0)