Skip to content

Commit 51100a5

Browse files
committed
add L1WatcherHandleTrait for easier testability
1 parent 10bc36c commit 51100a5

File tree

4 files changed

+126
-78
lines changed

4 files changed

+126
-78
lines changed

crates/chain-orchestrator/Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,10 +69,12 @@ alloy-transport.workspace = true
6969
# rollup-node
7070
scroll-db = { workspace = true, features = ["test-utils"] }
7171
rollup-node-primitives = { workspace = true, features = ["arbitrary"] }
72+
rollup-node-watcher = { workspace = true, features = ["test-utils"] }
7273

7374
# scroll
7475
reth-scroll-chainspec.workspace = true
7576
reth-scroll-forks.workspace = true
77+
reth-scroll-node = { workspace = true, features = ["test-utils"] }
7678

7779
# reth
7880
reth-eth-wire-types.workspace = true

crates/chain-orchestrator/src/lib.rs

Lines changed: 21 additions & 77 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ use rollup_node_primitives::{
2020
use rollup_node_providers::L1MessageProvider;
2121
use rollup_node_sequencer::{Sequencer, SequencerEvent};
2222
use rollup_node_signer::{SignatureAsBytes, SignerEvent, SignerHandle};
23-
use rollup_node_watcher::L1Notification;
23+
use rollup_node_watcher::{L1Notification, L1WatcherHandle, L1WatcherHandleTrait};
2424
use scroll_alloy_consensus::TxL1Message;
2525
use scroll_alloy_hardforks::ScrollHardforks;
2626
use scroll_alloy_network::Scroll;
@@ -96,6 +96,7 @@ pub struct ChainOrchestrator<
9696
L1MP,
9797
L2P,
9898
EC,
99+
H: L1WatcherHandleTrait = L1WatcherHandle,
99100
> {
100101
/// The configuration for the chain orchestrator.
101102
config: ChainOrchestratorConfig<ChainSpec>,
@@ -112,7 +113,7 @@ pub struct ChainOrchestrator<
112113
/// A receiver for [`L1Notification`]s from the [`rollup_node_watcher::L1Watcher`].
113114
l1_notification_rx: Receiver<Arc<L1Notification>>,
114115
/// Handle to send commands to the L1 watcher (e.g., for gap recovery).
115-
l1_watcher_handle: Option<rollup_node_watcher::L1WatcherHandle>,
116+
l1_watcher_handle: Option<H>,
116117
/// The network manager that manages the scroll p2p network.
117118
network: ScrollNetwork<N>,
118119
/// The consensus algorithm used by the rollup node.
@@ -137,7 +138,8 @@ impl<
137138
L1MP: L1MessageProvider + Unpin + Clone + Send + Sync + 'static,
138139
L2P: Provider<Scroll> + 'static,
139140
EC: ScrollEngineApi + Sync + Send + 'static,
140-
> ChainOrchestrator<N, ChainSpec, L1MP, L2P, EC>
141+
H: L1WatcherHandleTrait,
142+
> ChainOrchestrator<N, ChainSpec, L1MP, L2P, EC, H>
141143
{
142144
/// Creates a new chain orchestrator.
143145
#[allow(clippy::too_many_arguments)]
@@ -147,7 +149,7 @@ impl<
147149
block_client: Arc<FullBlockClient<<N as BlockDownloaderProvider>::Client>>,
148150
l2_provider: L2P,
149151
l1_notification_rx: Receiver<Arc<L1Notification>>,
150-
l1_watcher_handle: Option<rollup_node_watcher::L1WatcherHandle>,
152+
l1_watcher_handle: Option<H>,
151153
network: ScrollNetwork<N>,
152154
consensus: Box<dyn Consensus + 'static>,
153155
engine: Engine<EC>,
@@ -2141,79 +2143,14 @@ mod tests {
21412143
use scroll_engine::ForkchoiceState;
21422144
use scroll_network::ScrollNetworkHandle;
21432145
use std::collections::HashMap;
2144-
use std::sync::{Arc, Mutex};
2146+
use std::sync::Arc;
21452147
use tokio::sync::mpsc;
21462148

2147-
/// Mock command handler for L1Watcher that tracks all reset_to_block calls.
2148-
/// Returns a real L1WatcherHandle and a tracker for verifying calls.
2149-
#[derive(Clone)]
2150-
struct MockL1WatcherCommandTracker {
2151-
inner: Arc<Mutex<Vec<(u64, usize)>>>, // (block_number, channel_capacity)
2152-
}
2153-
2154-
impl MockL1WatcherCommandTracker {
2155-
fn new() -> Self {
2156-
Self { inner: Arc::new(Mutex::new(Vec::new())) }
2157-
}
2158-
2159-
fn track_reset(&self, block: u64, capacity: usize) {
2160-
self.inner.lock().unwrap().push((block, capacity));
2161-
}
2162-
2163-
fn get_reset_calls(&self) -> Vec<(u64, usize)> {
2164-
self.inner.lock().unwrap().clone()
2165-
}
2166-
2167-
fn assert_reset_called_with(&self, block: u64) {
2168-
let calls = self.get_reset_calls();
2169-
assert!(
2170-
calls.iter().any(|(b, _)| *b == block),
2171-
"Expected reset_to_block to be called with block {}, but got calls: {:?}",
2172-
block,
2173-
calls
2174-
);
2175-
}
2176-
2177-
fn assert_not_called(&self) {
2178-
let calls = self.get_reset_calls();
2179-
assert!(calls.is_empty(), "Expected no reset_to_block calls, but got: {:?}", calls);
2180-
}
2181-
}
2182-
2183-
/// Creates a real L1WatcherHandle backed by a mock command handler.
2184-
/// Returns the handle and a tracker for verifying calls.
2185-
fn create_mock_l1_watcher_handle() -> (
2186-
rollup_node_watcher::L1WatcherHandle,
2187-
MockL1WatcherCommandTracker,
2188-
tokio::task::JoinHandle<()>,
2189-
) {
2190-
use rollup_node_watcher::{L1WatcherCommand, L1WatcherHandle};
2191-
2192-
let (command_tx, mut command_rx) = mpsc::unbounded_channel();
2193-
let handle = L1WatcherHandle::new(command_tx);
2194-
let tracker = MockL1WatcherCommandTracker::new();
2195-
let tracker_clone = tracker.clone();
2196-
2197-
// Spawn task to handle commands
2198-
let join_handle = tokio::spawn(async move {
2199-
while let Some(command) = command_rx.recv().await {
2200-
match command {
2201-
L1WatcherCommand::ResetToBlock { block, new_sender, response_sender } => {
2202-
let capacity = new_sender.max_capacity();
2203-
tracker_clone.track_reset(block, capacity);
2204-
// Respond success
2205-
let _ = response_sender.send(());
2206-
}
2207-
}
2208-
}
2209-
});
2210-
2211-
(handle, tracker, join_handle)
2212-
}
2213-
22142149
#[tokio::test]
22152150
async fn test_gap_recovery()
22162151
{
2152+
use rollup_node_watcher::MockL1WatcherHandle;
2153+
22172154
// setup a test node
22182155
let (mut nodes, _tasks, _wallet) = setup(1, false).await.unwrap();
22192156
let node = nodes.pop().unwrap();
@@ -2260,17 +2197,20 @@ mod tests {
22602197
// prepare L1 notification channel
22612198
let (l1_notification_tx, l1_notification_rx) = mpsc::channel(100);
22622199

2200+
// create mock L1 watcher handle for testing gap recovery
2201+
let mock_l1_watcher_handle = MockL1WatcherHandle::new();
22632202

22642203
// initialize database state
22652204
db.set_latest_l1_block_number(0).await.unwrap();
22662205

2267-
let chain_orchestrator = ChainOrchestrator::new(
2206+
println!("done");
2207+
let (mut chain_orchestrator, handle) = ChainOrchestrator::new(
22682208
db.clone(),
22692209
ChainOrchestratorConfig::new(node.inner.chain_spec().clone(), 0, 0),
22702210
Arc::new(block_client),
22712211
l2_provider,
22722212
l1_notification_rx,
2273-
None, // TODO: set handle
2213+
Some(mock_l1_watcher_handle.clone()),
22742214
network_handle.into_scroll_network().await,
22752215
Box::new(NoopConsensus::default()),
22762216
engine,
@@ -2292,6 +2232,13 @@ mod tests {
22922232
)
22932233
.await
22942234
.unwrap();
2235+
2236+
2237+
// chain_orchestrator.run_until_shutdown(None)
2238+
// TODO: Implement test scenarios:
2239+
// 1. Insert batches with non-sequential indices to trigger gap detection
2240+
// 2. Feed L1 notifications that trigger gap detection
2241+
// 3. Use mock_l1_watcher_handle.assert_reset_to() to verify gap recovery was triggered
22952242
}
22962243

22972244
// Helper function to create a simple test batch commit
@@ -2312,7 +2259,4 @@ mod tests {
23122259
fn create_test_l1_message(queue_index: u64) -> TxL1Message {
23132260
TxL1Message { queue_index, ..Default::default() }
23142261
}
2315-
2316-
#[tokio::test]
2317-
async fn test_batch_commit_gap_triggers_recovery() {}
23182262
}

crates/watcher/src/handle/mod.rs

Lines changed: 100 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,31 @@ use crate::L1Notification;
88
use std::sync::Arc;
99
use tokio::sync::{mpsc, mpsc::UnboundedSender, oneshot};
1010

11+
/// Trait for interacting with the L1 Watcher.
12+
///
13+
/// This trait allows the chain orchestrator to send commands to the L1 watcher,
14+
/// primarily for gap recovery scenarios.
15+
#[async_trait::async_trait]
16+
pub trait L1WatcherHandleTrait: Send + Sync + 'static {
17+
/// Reset the L1 Watcher to a specific block number with a fresh notification channel.
18+
///
19+
/// This is used for gap recovery when the chain orchestrator detects missing L1 events.
20+
/// The watcher will reset its state to the specified block and begin sending notifications
21+
/// through the new channel.
22+
///
23+
/// # Arguments
24+
/// * `block` - The L1 block number to reset to
25+
/// * `new_sender` - A fresh channel sender for L1 notifications
26+
///
27+
/// # Returns
28+
/// `Ok(())` if the reset was successful, or an error if the command failed
29+
async fn reset_to_block(
30+
&self,
31+
block: u64,
32+
new_sender: mpsc::Sender<Arc<L1Notification>>,
33+
) -> Result<(), oneshot::error::RecvError>;
34+
}
35+
1136
/// Handle to interact with the L1 Watcher.
1237
#[derive(Debug)]
1338
pub struct L1WatcherHandle {
@@ -45,3 +70,78 @@ impl L1WatcherHandle {
4570
rx.await
4671
}
4772
}
73+
74+
#[async_trait::async_trait]
75+
impl L1WatcherHandleTrait for L1WatcherHandle {
76+
async fn reset_to_block(
77+
&self,
78+
block: u64,
79+
new_sender: mpsc::Sender<Arc<L1Notification>>,
80+
) -> Result<(), oneshot::error::RecvError> {
81+
self.reset_to_block(block, new_sender).await
82+
}
83+
}
84+
85+
#[cfg(any(test, feature = "test-utils"))]
86+
/// Mock implementation of L1WatcherHandleTrait for testing.
87+
///
88+
/// This mock tracks all reset calls for test assertions and always succeeds.
89+
#[derive(Debug, Clone)]
90+
pub struct MockL1WatcherHandle {
91+
/// Track reset calls as (block_number, channel_capacity)
92+
resets: Arc<std::sync::Mutex<Vec<(u64, usize)>>>,
93+
}
94+
95+
#[cfg(any(test, feature = "test-utils"))]
96+
impl MockL1WatcherHandle {
97+
/// Create a new mock handle.
98+
pub fn new() -> Self {
99+
Self {
100+
resets: Arc::new(std::sync::Mutex::new(Vec::new())),
101+
}
102+
}
103+
104+
/// Get all recorded reset calls as (block_number, channel_capacity).
105+
pub fn get_resets(&self) -> Vec<(u64, usize)> {
106+
self.resets.lock().unwrap().clone()
107+
}
108+
109+
/// Assert that reset_to_block was called with the specified block number.
110+
pub fn assert_reset_to(&self, expected_block: u64) {
111+
let resets = self.get_resets();
112+
assert!(
113+
resets.iter().any(|(block, _)| *block == expected_block),
114+
"Expected reset to block {}, but got resets: {:?}",
115+
expected_block,
116+
resets
117+
);
118+
}
119+
120+
/// Assert that no reset calls were made.
121+
pub fn assert_no_resets(&self) {
122+
let resets = self.get_resets();
123+
assert!(
124+
resets.is_empty(),
125+
"Expected no reset calls, but got: {:?}",
126+
resets
127+
);
128+
}
129+
}
130+
#[cfg(any(test, feature = "test-utils"))]
131+
#[async_trait::async_trait]
132+
impl L1WatcherHandleTrait for MockL1WatcherHandle {
133+
async fn reset_to_block(
134+
&self,
135+
block: u64,
136+
new_sender: mpsc::Sender<Arc<L1Notification>>,
137+
) -> Result<(), oneshot::error::RecvError> {
138+
// Track the reset call
139+
self.resets
140+
.lock()
141+
.unwrap()
142+
.push((block, new_sender.max_capacity()));
143+
144+
// Mock always succeeds
145+
Ok(())
146+
}
147+
}

crates/watcher/src/lib.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,9 @@ mod error;
44
pub use error::{EthRequestError, FilterLogError, L1WatcherError};
55

66
pub mod handle;
7-
pub use handle::{L1WatcherCommand, L1WatcherHandle};
7+
pub use handle::{L1WatcherCommand, L1WatcherHandle, L1WatcherHandleTrait};
8+
#[cfg(any(test, feature = "test-utils"))]
9+
pub use handle::MockL1WatcherHandle;
810

911
mod metrics;
1012
pub use metrics::WatcherMetrics;

0 commit comments

Comments (0)