Skip to content

Commit 6f497e8

Browse files
authored
feat: add revert to l1 block rpc (#463)
* add revert to l1 block rpc * fix tests * fix features * fixes * update fcu logic
1 parent c4927d6 commit 6f497e8

File tree

22 files changed

+473
-134
lines changed

22 files changed

+473
-134
lines changed

crates/chain-orchestrator/src/event.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,8 @@ pub enum ChainOrchestratorEvent {
7777
/// The L2 safe block info.
7878
l2_safe_block_info: Option<BlockInfo>,
7979
},
80+
/// The chain has been unwound to the specified L1 block number.
81+
UnwoundToL1Block(u64),
8082
/// The chain orchestrator has synced to the L1 head.
8183
L1Synced,
8284
/// An L2 block has been committed returning the [`L2BlockInfoWithL1Messages`] and an

crates/chain-orchestrator/src/handle/command.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,8 @@ pub enum ChainOrchestratorCommand<N: FullNetwork<Primitives = ScrollNetworkPrimi
2727
DisableAutomaticSequencing(oneshot::Sender<bool>),
2828
/// Send a database query to the rollup manager.
2929
DatabaseQuery(DatabaseQuery),
30+
/// Revert the rollup node state to the specified L1 block number.
31+
RevertToL1Block((u64, oneshot::Sender<bool>)),
3032
/// Enable gossiping of blocks to peers.
3133
#[cfg(feature = "test-utils")]
3234
SetGossip((bool, oneshot::Sender<()>)),

crates/chain-orchestrator/src/handle/mod.rs

Lines changed: 30 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,13 +22,32 @@ use metrics::ChainOrchestratorHandleMetrics;
2222
pub struct ChainOrchestratorHandle<N: FullNetwork<Primitives = ScrollNetworkPrimitives>> {
2323
/// The channel used to send commands to the rollup manager.
2424
to_manager_tx: mpsc::UnboundedSender<ChainOrchestratorCommand<N>>,
25+
/// The metrics for the handle.
2526
handle_metrics: ChainOrchestratorHandleMetrics,
27+
/// Mock for the L1 Watcher used in tests.
28+
#[cfg(feature = "test-utils")]
29+
pub l1_watcher_mock: Option<rollup_node_watcher::test_utils::L1WatcherMock>,
2630
}
2731

2832
impl<N: FullNetwork<Primitives = ScrollNetworkPrimitives>> ChainOrchestratorHandle<N> {
2933
/// Create a new rollup manager handle.
3034
pub fn new(to_manager_tx: mpsc::UnboundedSender<ChainOrchestratorCommand<N>>) -> Self {
31-
Self { to_manager_tx, handle_metrics: ChainOrchestratorHandleMetrics::default() }
35+
Self {
36+
to_manager_tx,
37+
handle_metrics: ChainOrchestratorHandleMetrics::default(),
38+
#[cfg(feature = "test-utils")]
39+
l1_watcher_mock: None,
40+
}
41+
}
42+
43+
/// Sets the L1 watcher mock for the handle.
44+
#[cfg(feature = "test-utils")]
45+
pub fn with_l1_watcher_mock(
46+
mut self,
47+
l1_watcher_mock: Option<rollup_node_watcher::test_utils::L1WatcherMock>,
48+
) -> Self {
49+
self.l1_watcher_mock = l1_watcher_mock;
50+
self
3251
}
3352

3453
/// Sends a command to the rollup manager.
@@ -103,6 +122,16 @@ impl<N: FullNetwork<Primitives = ScrollNetworkPrimitives>> ChainOrchestratorHand
103122
rx.await
104123
}
105124

125+
/// Revert the rollup node state to the specified L1 block number.
126+
pub async fn revert_to_l1_block(
127+
&self,
128+
block_number: u64,
129+
) -> Result<bool, oneshot::error::RecvError> {
130+
let (tx, rx) = oneshot::channel();
131+
self.send_command(ChainOrchestratorCommand::RevertToL1Block((block_number, tx)));
132+
rx.await
133+
}
134+
106135
/// Sends a command to the rollup manager to enable or disable gossiping of blocks to peers.
107136
#[cfg(feature = "test-utils")]
108137
pub async fn set_gossip(&self, enabled: bool) -> Result<(), oneshot::error::RecvError> {

crates/chain-orchestrator/src/lib.rs

Lines changed: 31 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ use rollup_node_primitives::{
2020
use rollup_node_providers::L1MessageProvider;
2121
use rollup_node_sequencer::{Sequencer, SequencerEvent};
2222
use rollup_node_signer::{SignatureAsBytes, SignerEvent, SignerHandle};
23-
use rollup_node_watcher::L1Notification;
23+
use rollup_node_watcher::{L1Notification, L1WatcherHandle};
2424
use scroll_alloy_consensus::{ScrollTxEnvelope, TxL1Message};
2525
use scroll_alloy_hardforks::ScrollHardforks;
2626
use scroll_alloy_network::Scroll;
@@ -35,7 +35,7 @@ use scroll_network::{
3535
BlockImportOutcome, NewBlockWithPeer, ScrollNetwork, ScrollNetworkManagerEvent,
3636
};
3737
use std::{collections::VecDeque, sync::Arc, time::Instant, vec};
38-
use tokio::sync::mpsc::{self, Receiver, UnboundedReceiver};
38+
use tokio::sync::mpsc::{self, UnboundedReceiver};
3939

4040
mod config;
4141
pub use config::ChainOrchestratorConfig;
@@ -115,8 +115,8 @@ pub struct ChainOrchestrator<
115115
database: Arc<Database>,
116116
/// The current sync state of the [`ChainOrchestrator`].
117117
sync_state: SyncState,
118-
/// A receiver for [`L1Notification`]s from the [`rollup_node_watcher::L1Watcher`].
119-
l1_notification_rx: Receiver<Arc<L1Notification>>,
118+
/// A handle for the [`rollup_node_watcher::L1Watcher`].
119+
l1_watcher: L1WatcherHandle,
120120
/// The network manager that manages the scroll p2p network.
121121
network: ScrollNetwork<N>,
122122
/// The consensus algorithm used by the rollup node.
@@ -150,7 +150,7 @@ impl<
150150
config: ChainOrchestratorConfig<ChainSpec>,
151151
block_client: Arc<FullBlockClient<<N as BlockDownloaderProvider>::Client>>,
152152
l2_provider: L2P,
153-
l1_notification_rx: Receiver<Arc<L1Notification>>,
153+
l1_watcher: L1WatcherHandle,
154154
network: ScrollNetwork<N>,
155155
consensus: Box<dyn Consensus + 'static>,
156156
engine: Engine<EC>,
@@ -167,7 +167,7 @@ impl<
167167
database,
168168
config,
169169
sync_state: SyncState::default(),
170-
l1_notification_rx,
170+
l1_watcher,
171171
network,
172172
consensus,
173173
engine,
@@ -224,7 +224,7 @@ impl<
224224
let res = self.handle_network_event(event).await;
225225
self.handle_outcome(res);
226226
}
227-
Some(notification) = self.l1_notification_rx.recv(), if self.sync_state.l2().is_synced() && self.derivation_pipeline.is_empty() => {
227+
Some(notification) = self.l1_watcher.l1_notification_receiver().recv(), if self.sync_state.l2().is_synced() && self.derivation_pipeline.is_empty() => {
228228
let res = self.handle_l1_notification(notification).await;
229229
self.handle_outcome(res);
230230
}
@@ -401,6 +401,30 @@ impl<
401401
let _ = sender.send(l1_message);
402402
}
403403
},
404+
ChainOrchestratorCommand::RevertToL1Block((block_number, tx)) => {
405+
self.sync_state.l1_mut().set_syncing();
406+
let unwind_result = self.database.unwind(block_number).await?;
407+
408+
// Check if the unwind impacts the fcs safe head.
409+
if let Some(block_info) = unwind_result.l2_safe_block_info {
410+
// If the new safe head is above the current finalized head, update the fcs safe
411+
// head to the new safe head.
412+
if block_info.number >= self.engine.fcs().finalized_block_info().number {
413+
self.engine.update_fcs(None, Some(block_info), None).await?;
414+
} else {
415+
// Otherwise, update the fcs safe head to the finalized head.
416+
self.engine
417+
.update_fcs(None, Some(*self.engine.fcs().finalized_block_info()), None)
418+
.await?;
419+
}
420+
}
421+
422+
// Revert the L1 watcher to the specified block.
423+
self.l1_watcher.revert_to_l1_block(block_number);
424+
425+
self.notify(ChainOrchestratorEvent::UnwoundToL1Block(block_number));
426+
let _ = tx.send(true);
427+
}
404428
#[cfg(feature = "test-utils")]
405429
ChainOrchestratorCommand::SetGossip((enabled, tx)) => {
406430
self.network.handle().set_gossip(enabled).await;

crates/node/src/add_ons/handle.rs

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,6 @@ use reth_node_builder::rpc::{RpcHandle, RpcHandleProvider};
44
use reth_rpc_eth_api::EthApiTypes;
55
use reth_scroll_node::ScrollNetworkPrimitives;
66
use rollup_node_chain_orchestrator::ChainOrchestratorHandle;
7-
#[cfg(feature = "test-utils")]
8-
use {rollup_node_watcher::L1Notification, std::sync::Arc, tokio::sync::mpsc::Sender};
97

108
/// A handle for scroll addons, which includes handles for the rollup manager and RPC server.
119
#[derive(Debug, Clone)]
@@ -17,9 +15,6 @@ pub struct ScrollAddOnsHandle<
1715
pub rollup_manager_handle: ChainOrchestratorHandle<Node::Network>,
1816
/// The handle used to send commands to the RPC server.
1917
pub rpc_handle: RpcHandle<Node, EthApi>,
20-
/// An optional channel used to send `L1Watcher` notifications to the `RollupNodeManager`.
21-
#[cfg(feature = "test-utils")]
22-
pub l1_watcher_tx: Option<Sender<Arc<L1Notification>>>,
2318
}
2419

2520
impl<

crates/node/src/add_ons/mod.rs

Lines changed: 2 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -152,7 +152,7 @@ where
152152
});
153153

154154
let rpc_handle = rpc_add_ons.launch_add_ons_with(ctx.clone(), |_| Ok(())).await?;
155-
let (rollup_manager_handle, l1_watcher_tx) =
155+
let rollup_manager_handle =
156156
rollup_node_manager_addon.launch(ctx.clone(), rpc_handle.clone()).await?;
157157

158158
// Only send handle if RPC is enabled
@@ -161,12 +161,7 @@ where
161161
.map_err(|_| eyre::eyre!("failed to send rollup manager handle"))?;
162162
}
163163

164-
Ok(ScrollAddOnsHandle {
165-
rollup_manager_handle,
166-
rpc_handle,
167-
#[cfg(feature = "test-utils")]
168-
l1_watcher_tx,
169-
})
164+
Ok(ScrollAddOnsHandle { rollup_manager_handle, rpc_handle })
170165
}
171166
}
172167

crates/node/src/add_ons/rollup.rs

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -9,11 +9,9 @@ use reth_rpc_eth_api::EthApiTypes;
99
use reth_scroll_chainspec::{ChainConfig, ScrollChainConfig, ScrollChainSpec};
1010
use reth_scroll_node::ScrollNetworkPrimitives;
1111
use rollup_node_chain_orchestrator::ChainOrchestratorHandle;
12-
use rollup_node_watcher::L1Notification;
1312
use scroll_alloy_hardforks::ScrollHardforks;
1413
use scroll_wire::ScrollWireEvent;
15-
use std::sync::Arc;
16-
use tokio::sync::mpsc::{Sender, UnboundedReceiver};
14+
use tokio::sync::mpsc::UnboundedReceiver;
1715

1816
/// Implementing the trait allows the type to return whether it is configured for dev chain.
1917
#[auto_impl::auto_impl(Arc)]
@@ -55,13 +53,13 @@ impl RollupManagerAddOn {
5553
self,
5654
ctx: AddOnsContext<'_, N>,
5755
rpc: RpcHandle<N, EthApi>,
58-
) -> eyre::Result<(ChainOrchestratorHandle<N::Network>, Option<Sender<Arc<L1Notification>>>)>
56+
) -> eyre::Result<ChainOrchestratorHandle<N::Network>>
5957
where
6058
<<N as FullNodeTypes>::Types as NodeTypes>::ChainSpec:
6159
ChainConfig<Config = ScrollChainConfig> + ScrollHardforks + IsDevChain,
6260
N::Network: NetworkProtocols + FullNetwork<Primitives = ScrollNetworkPrimitives>,
6361
{
64-
let (chain_orchestrator, handle, l1_notification_tx) = self
62+
let (chain_orchestrator, handle) = self
6563
.config
6664
.build((&ctx).into(), self.scroll_wire_event, rpc.rpc_server_handles)
6765
.await?;
@@ -70,6 +68,6 @@ impl RollupManagerAddOn {
7068
.spawn_critical_with_shutdown_signal("rollup_node_manager", |shutdown| {
7169
chain_orchestrator.run_until_shutdown(shutdown)
7270
});
73-
Ok((handle, l1_notification_tx))
71+
Ok(handle)
7472
}
7573
}

crates/node/src/add_ons/rpc.rs

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -115,6 +115,10 @@ pub trait RollupNodeAdminApi {
115115
/// Disables automatic sequencing in the rollup node.
116116
#[method(name = "disableAutomaticSequencing")]
117117
async fn disable_automatic_sequencing(&self) -> RpcResult<bool>;
118+
119+
/// Reverts the rollup node state to a specified L1 block number.
120+
#[method(name = "revertToL1Block")]
121+
async fn revert_to_l1_block(&self, block_number: u64) -> RpcResult<bool>;
118122
}
119123

120124
#[async_trait]
@@ -220,6 +224,24 @@ where
220224
)
221225
})
222226
}
227+
228+
async fn revert_to_l1_block(&self, block_number: u64) -> RpcResult<bool> {
229+
let handle = self.rollup_manager_handle().await.map_err(|e| {
230+
ErrorObjectOwned::owned(
231+
error::INTERNAL_ERROR_CODE,
232+
format!("Failed to get rollup manager handle: {}", e),
233+
None::<()>,
234+
)
235+
})?;
236+
237+
handle.revert_to_l1_block(block_number).await.map_err(|e| {
238+
ErrorObjectOwned::owned(
239+
error::INTERNAL_ERROR_CODE,
240+
format!("Failed to revert to L1 block {}: {}", block_number, e),
241+
None::<()>,
242+
)
243+
})
244+
}
223245
}
224246

225247
// Implement RollupNodeApiServer for Arc<RollupNodeRpcExt<N>> to allow shared ownership
@@ -257,4 +279,8 @@ where
257279
async fn disable_automatic_sequencing(&self) -> RpcResult<bool> {
258280
(**self).disable_automatic_sequencing().await
259281
}
282+
283+
async fn revert_to_l1_block(&self, block_number: u64) -> RpcResult<bool> {
284+
(**self).revert_to_l1_block(block_number).await
285+
}
260286
}

crates/node/src/args.rs

Lines changed: 24 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ use rollup_node_providers::{
3838
use rollup_node_sequencer::{
3939
L1MessageInclusionMode, PayloadBuildingConfig, Sequencer, SequencerConfig,
4040
};
41-
use rollup_node_watcher::{L1Notification, L1Watcher};
41+
use rollup_node_watcher::{L1Watcher, L1WatcherHandle};
4242
use scroll_alloy_hardforks::ScrollHardforks;
4343
use scroll_alloy_network::Scroll;
4444
use scroll_alloy_provider::{ScrollAuthApiEngineClient, ScrollEngineApi};
@@ -51,7 +51,7 @@ use scroll_engine::{Engine, ForkchoiceState};
5151
use scroll_migration::traits::ScrollMigrator;
5252
use scroll_network::ScrollNetworkManager;
5353
use scroll_wire::ScrollWireEvent;
54-
use tokio::sync::mpsc::{Sender, UnboundedReceiver};
54+
use tokio::sync::mpsc::UnboundedReceiver;
5555

5656
/// A struct that represents the arguments for the rollup node.
5757
#[derive(Debug, Clone, clap::Args)]
@@ -166,7 +166,6 @@ impl ScrollRollupNodeConfig {
166166
impl ScrollEngineApi,
167167
>,
168168
ChainOrchestratorHandle<N>,
169-
Option<Sender<Arc<L1Notification>>>,
170169
)>
171170
where
172171
N: FullNetwork<Primitives = ScrollNetworkPrimitives> + NetworkProtocols,
@@ -350,7 +349,14 @@ impl ScrollRollupNodeConfig {
350349
};
351350
let consensus = self.consensus_args.consensus(authorized_signer)?;
352351

353-
let (l1_notification_tx, l1_notification_rx): (Option<Sender<Arc<L1Notification>>>, _) =
352+
// Define some types to support definitions of return type of following function in no_std.
353+
#[cfg(feature = "test-utils")]
354+
type L1WatcherMockOpt = Option<rollup_node_watcher::test_utils::L1WatcherMock>;
355+
356+
#[cfg(not(feature = "test-utils"))]
357+
type L1WatcherMockOpt = Option<std::convert::Infallible>;
358+
359+
let (_l1_watcher_mock, l1_watcher_handle): (L1WatcherMockOpt, Option<L1WatcherHandle>) =
354360
if let Some(provider) = l1_provider.filter(|_| !self.test) {
355361
tracing::info!(target: "scroll::node::args", ?l1_block_startup_info, "Starting L1 watcher");
356362
(
@@ -370,8 +376,15 @@ impl ScrollRollupNodeConfig {
370376
// testing
371377
#[cfg(feature = "test-utils")]
372378
{
373-
let (tx, rx) = tokio::sync::mpsc::channel(1000);
374-
(Some(tx), Some(rx))
379+
let (notification_tx, notification_rx) = tokio::sync::mpsc::channel(1000);
380+
let (command_tx, command_rx) = tokio::sync::mpsc::unbounded_channel();
381+
let handle =
382+
rollup_node_watcher::L1WatcherHandle::new(command_tx, notification_rx);
383+
let watcher_mock = rollup_node_watcher::test_utils::L1WatcherMock {
384+
command_rx: Arc::new(tokio::sync::Mutex::new(command_rx)),
385+
notification_tx,
386+
};
387+
(Some(watcher_mock), Some(handle))
375388
}
376389

377390
#[cfg(not(feature = "test-utils"))]
@@ -457,7 +470,7 @@ impl ScrollRollupNodeConfig {
457470
config,
458471
Arc::new(block_client),
459472
l2_provider,
460-
l1_notification_rx.expect("L1 notification receiver should be set"),
473+
l1_watcher_handle.expect("L1 notification receiver should be set"),
461474
scroll_network_handle.into_scroll_network().await,
462475
consensus,
463476
engine,
@@ -467,7 +480,10 @@ impl ScrollRollupNodeConfig {
467480
)
468481
.await?;
469482

470-
Ok((chain_orchestrator, handle, l1_notification_tx))
483+
#[cfg(feature = "test-utils")]
484+
let handle = handle.with_l1_watcher_mock(_l1_watcher_mock);
485+
486+
Ok((chain_orchestrator, handle))
471487
}
472488
}
473489

crates/node/src/test_utils/event_utils.rs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -130,6 +130,15 @@ impl<'a> EventWaiter<'a> {
130130
Ok(())
131131
}
132132

133+
/// Wait for chain unwound event on all specified nodes.
134+
pub async fn revert_to_l1_block(self) -> eyre::Result<()> {
135+
self.wait_for_event_on_all(|e| {
136+
matches!(e, ChainOrchestratorEvent::UnwoundToL1Block(_)).then_some(())
137+
})
138+
.await?;
139+
Ok(())
140+
}
141+
133142
/// Wait for block consolidated event on all specified nodes.
134143
pub async fn block_consolidated(self, target_block: u64) -> eyre::Result<()> {
135144
self.wait_for_event_on_all(|e| {

0 commit comments

Comments
 (0)