Skip to content

Commit e008127

Browse files
committed
add revert to l1 block rpc
1 parent c4927d6 commit e008127

File tree

21 files changed

+417
-119
lines changed

21 files changed

+417
-119
lines changed

crates/chain-orchestrator/src/event.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,8 @@ pub enum ChainOrchestratorEvent {
7777
/// The L2 safe block info.
7878
l2_safe_block_info: Option<BlockInfo>,
7979
},
80+
/// The chain has been unwound to the specified L1 block number.
81+
UnwoundToL1Block(u64),
8082
/// The chain orchestrator has synced to the L1 head.
8183
L1Synced,
8284
/// An L2 block has been committed returning the [`L2BlockInfoWithL1Messages`] and an

crates/chain-orchestrator/src/handle/command.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,8 @@ pub enum ChainOrchestratorCommand<N: FullNetwork<Primitives = ScrollNetworkPrimi
2727
DisableAutomaticSequencing(oneshot::Sender<bool>),
2828
/// Send a database query to the rollup manager.
2929
DatabaseQuery(DatabaseQuery),
30+
/// Revert the rollup node state to the specified L1 block number.
31+
RevertToL1Block((u64, oneshot::Sender<bool>)),
3032
/// Enable gossiping of blocks to peers.
3133
#[cfg(feature = "test-utils")]
3234
SetGossip((bool, oneshot::Sender<()>)),

crates/chain-orchestrator/src/handle/mod.rs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -103,6 +103,16 @@ impl<N: FullNetwork<Primitives = ScrollNetworkPrimitives>> ChainOrchestratorHand
103103
rx.await
104104
}
105105

106+
/// Revert the rollup node state to the specified L1 block number.
107+
pub async fn revert_to_l1_block(
108+
&self,
109+
block_number: u64,
110+
) -> Result<bool, oneshot::error::RecvError> {
111+
let (tx, rx) = oneshot::channel();
112+
self.send_command(ChainOrchestratorCommand::RevertToL1Block((block_number, tx)));
113+
rx.await
114+
}
115+
106116
/// Sends a command to the rollup manager to enable or disable gossiping of blocks to peers.
107117
#[cfg(feature = "test-utils")]
108118
pub async fn set_gossip(&self, enabled: bool) -> Result<(), oneshot::error::RecvError> {

crates/chain-orchestrator/src/lib.rs

Lines changed: 26 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ use rollup_node_primitives::{
2020
use rollup_node_providers::L1MessageProvider;
2121
use rollup_node_sequencer::{Sequencer, SequencerEvent};
2222
use rollup_node_signer::{SignatureAsBytes, SignerEvent, SignerHandle};
23-
use rollup_node_watcher::L1Notification;
23+
use rollup_node_watcher::{L1Notification, L1WatcherHandle};
2424
use scroll_alloy_consensus::{ScrollTxEnvelope, TxL1Message};
2525
use scroll_alloy_hardforks::ScrollHardforks;
2626
use scroll_alloy_network::Scroll;
@@ -35,7 +35,7 @@ use scroll_network::{
3535
BlockImportOutcome, NewBlockWithPeer, ScrollNetwork, ScrollNetworkManagerEvent,
3636
};
3737
use std::{collections::VecDeque, sync::Arc, time::Instant, vec};
38-
use tokio::sync::mpsc::{self, Receiver, UnboundedReceiver};
38+
use tokio::sync::mpsc::{self, UnboundedReceiver};
3939

4040
mod config;
4141
pub use config::ChainOrchestratorConfig;
@@ -115,8 +115,8 @@ pub struct ChainOrchestrator<
115115
database: Arc<Database>,
116116
/// The current sync state of the [`ChainOrchestrator`].
117117
sync_state: SyncState,
118-
/// A receiver for [`L1Notification`]s from the [`rollup_node_watcher::L1Watcher`].
119-
l1_notification_rx: Receiver<Arc<L1Notification>>,
118+
/// A handle for the [`rollup_node_watcher::L1Watcher`].
119+
l1_watcher: L1WatcherHandle,
120120
/// The network manager that manages the scroll p2p network.
121121
network: ScrollNetwork<N>,
122122
/// The consensus algorithm used by the rollup node.
@@ -150,7 +150,7 @@ impl<
150150
config: ChainOrchestratorConfig<ChainSpec>,
151151
block_client: Arc<FullBlockClient<<N as BlockDownloaderProvider>::Client>>,
152152
l2_provider: L2P,
153-
l1_notification_rx: Receiver<Arc<L1Notification>>,
153+
l1_watcher: L1WatcherHandle,
154154
network: ScrollNetwork<N>,
155155
consensus: Box<dyn Consensus + 'static>,
156156
engine: Engine<EC>,
@@ -167,7 +167,7 @@ impl<
167167
database,
168168
config,
169169
sync_state: SyncState::default(),
170-
l1_notification_rx,
170+
l1_watcher,
171171
network,
172172
consensus,
173173
engine,
@@ -224,7 +224,7 @@ impl<
224224
let res = self.handle_network_event(event).await;
225225
self.handle_outcome(res);
226226
}
227-
Some(notification) = self.l1_notification_rx.recv(), if self.sync_state.l2().is_synced() && self.derivation_pipeline.is_empty() => {
227+
Some(notification) = self.l1_watcher.l1_notification_receiver().recv(), if self.sync_state.l2().is_synced() && self.derivation_pipeline.is_empty() => {
228228
let res = self.handle_l1_notification(notification).await;
229229
self.handle_outcome(res);
230230
}
@@ -401,6 +401,25 @@ impl<
401401
let _ = sender.send(l1_message);
402402
}
403403
},
404+
ChainOrchestratorCommand::RevertToL1Block((block_number, tx)) => {
405+
self.sync_state.l1_mut().set_syncing();
406+
let unwind_result = self.database.unwind(block_number).await?;
407+
408+
println!("Unwind result: {:?}", unwind_result);
409+
410+
// Check if the unwind impacts the fcs safe head.
411+
if let Some(block_info) = unwind_result.l2_safe_block_info {
412+
// If the safe head was unwound and is above or equal to the finalized head,
413+
// update the fcs.
414+
if block_info.number != self.engine.fcs().safe_block_info().number &&
415+
block_info.number >= self.engine.fcs().finalized_block_info().number
416+
{
417+
self.engine.update_fcs(None, Some(block_info), None).await?;
418+
}
419+
}
420+
self.notify(ChainOrchestratorEvent::UnwoundToL1Block(block_number));
421+
let _ = tx.send(true);
422+
}
404423
#[cfg(feature = "test-utils")]
405424
ChainOrchestratorCommand::SetGossip((enabled, tx)) => {
406425
self.network.handle().set_gossip(enabled).await;

crates/database/db/src/operations.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -852,6 +852,7 @@ impl<T: WriteConnectionProvider + ?Sized + Sync> DatabaseWriteOperations for T {
852852
// delete batch commits, l1 messages and batch finalization effects greater than the
853853
// provided l1 block number
854854
let batches_removed = self.delete_batches_gt_block_number(l1_block_number).await?;
855+
println!("Deleted {} batches", batches_removed);
855856
let deleted_messages = self.delete_l1_messages_gt(l1_block_number).await?;
856857
self.delete_batch_finalization_gt_block_number(l1_block_number).await?;
857858
let batch_reverts_removed: u64 =

crates/node/src/add_ons/handle.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,11 @@
1+
#[cfg(feature = "test-utils")]
2+
use crate::test_utils::l1_helpers::L1WatcherMock;
13
use reth_network_api::FullNetwork;
24
use reth_node_api::FullNodeComponents;
35
use reth_node_builder::rpc::{RpcHandle, RpcHandleProvider};
46
use reth_rpc_eth_api::EthApiTypes;
57
use reth_scroll_node::ScrollNetworkPrimitives;
68
use rollup_node_chain_orchestrator::ChainOrchestratorHandle;
7-
#[cfg(feature = "test-utils")]
8-
use {rollup_node_watcher::L1Notification, std::sync::Arc, tokio::sync::mpsc::Sender};
99

1010
/// A handle for scroll addons, which includes handles for the rollup manager and RPC server.
1111
#[derive(Debug, Clone)]
@@ -19,7 +19,7 @@ pub struct ScrollAddOnsHandle<
1919
pub rpc_handle: RpcHandle<Node, EthApi>,
2020
/// An optional channel used to send `L1Watcher` notifications to the `RollupNodeManager`.
2121
#[cfg(feature = "test-utils")]
22-
pub l1_watcher_tx: Option<Sender<Arc<L1Notification>>>,
22+
pub l1_watcher_tx: Option<L1WatcherMock>,
2323
}
2424

2525
impl<

crates/node/src/add_ons/rollup.rs

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
use crate::args::ScrollRollupNodeConfig;
1+
use crate::{args::ScrollRollupNodeConfig, test_utils::l1_helpers::L1WatcherMock};
22

33
use reth_chainspec::NamedChain;
44
use reth_network::NetworkProtocols;
@@ -9,11 +9,9 @@ use reth_rpc_eth_api::EthApiTypes;
99
use reth_scroll_chainspec::{ChainConfig, ScrollChainConfig, ScrollChainSpec};
1010
use reth_scroll_node::ScrollNetworkPrimitives;
1111
use rollup_node_chain_orchestrator::ChainOrchestratorHandle;
12-
use rollup_node_watcher::L1Notification;
1312
use scroll_alloy_hardforks::ScrollHardforks;
1413
use scroll_wire::ScrollWireEvent;
15-
use std::sync::Arc;
16-
use tokio::sync::mpsc::{Sender, UnboundedReceiver};
14+
use tokio::sync::mpsc::UnboundedReceiver;
1715

1816
/// Implementing the trait allows the type to return whether it is configured for dev chain.
1917
#[auto_impl::auto_impl(Arc)]
@@ -55,13 +53,13 @@ impl RollupManagerAddOn {
5553
self,
5654
ctx: AddOnsContext<'_, N>,
5755
rpc: RpcHandle<N, EthApi>,
58-
) -> eyre::Result<(ChainOrchestratorHandle<N::Network>, Option<Sender<Arc<L1Notification>>>)>
56+
) -> eyre::Result<(ChainOrchestratorHandle<N::Network>, Option<L1WatcherMock>)>
5957
where
6058
<<N as FullNodeTypes>::Types as NodeTypes>::ChainSpec:
6159
ChainConfig<Config = ScrollChainConfig> + ScrollHardforks + IsDevChain,
6260
N::Network: NetworkProtocols + FullNetwork<Primitives = ScrollNetworkPrimitives>,
6361
{
64-
let (chain_orchestrator, handle, l1_notification_tx) = self
62+
let (chain_orchestrator, handle, l1_watcher_mock) = self
6563
.config
6664
.build((&ctx).into(), self.scroll_wire_event, rpc.rpc_server_handles)
6765
.await?;
@@ -70,6 +68,6 @@ impl RollupManagerAddOn {
7068
.spawn_critical_with_shutdown_signal("rollup_node_manager", |shutdown| {
7169
chain_orchestrator.run_until_shutdown(shutdown)
7270
});
73-
Ok((handle, l1_notification_tx))
71+
Ok((handle, l1_watcher_mock))
7472
}
7573
}

crates/node/src/add_ons/rpc.rs

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -115,6 +115,10 @@ pub trait RollupNodeAdminApi {
115115
/// Disables automatic sequencing in the rollup node.
116116
#[method(name = "disableAutomaticSequencing")]
117117
async fn disable_automatic_sequencing(&self) -> RpcResult<bool>;
118+
119+
/// Reverts the rollup node state to a specified L1 block number.
120+
#[method(name = "revertToL1Block")]
121+
async fn revert_to_l1_block(&self, block_number: u64) -> RpcResult<bool>;
118122
}
119123

120124
#[async_trait]
@@ -220,6 +224,24 @@ where
220224
)
221225
})
222226
}
227+
228+
async fn revert_to_l1_block(&self, block_number: u64) -> RpcResult<bool> {
229+
let handle = self.rollup_manager_handle().await.map_err(|e| {
230+
ErrorObjectOwned::owned(
231+
error::INTERNAL_ERROR_CODE,
232+
format!("Failed to get rollup manager handle: {}", e),
233+
None::<()>,
234+
)
235+
})?;
236+
237+
handle.revert_to_l1_block(block_number).await.map_err(|e| {
238+
ErrorObjectOwned::owned(
239+
error::INTERNAL_ERROR_CODE,
240+
format!("Failed to revert to L1 block {}: {}", block_number, e),
241+
None::<()>,
242+
)
243+
})
244+
}
223245
}
224246

225247
// Implement RollupNodeApiServer for Arc<RollupNodeRpcExt<N>> to allow shared ownership
@@ -257,4 +279,8 @@ where
257279
async fn disable_automatic_sequencing(&self) -> RpcResult<bool> {
258280
(**self).disable_automatic_sequencing().await
259281
}
282+
283+
async fn revert_to_l1_block(&self, block_number: u64) -> RpcResult<bool> {
284+
(**self).revert_to_l1_block(block_number).await
285+
}
260286
}

crates/node/src/args.rs

Lines changed: 42 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ use crate::{
66
use scroll_migration::MigratorTrait;
77
use std::{fs, path::PathBuf, sync::Arc};
88

9+
use super::test_utils::l1_helpers::L1WatcherMock;
910
use alloy_chains::NamedChain;
1011
use alloy_primitives::{hex, Address, U128};
1112
use alloy_provider::{layers::CacheLayer, Provider, ProviderBuilder};
@@ -38,7 +39,7 @@ use rollup_node_providers::{
3839
use rollup_node_sequencer::{
3940
L1MessageInclusionMode, PayloadBuildingConfig, Sequencer, SequencerConfig,
4041
};
41-
use rollup_node_watcher::{L1Notification, L1Watcher};
42+
use rollup_node_watcher::{L1Watcher, L1WatcherHandle};
4243
use scroll_alloy_hardforks::ScrollHardforks;
4344
use scroll_alloy_network::Scroll;
4445
use scroll_alloy_provider::{ScrollAuthApiEngineClient, ScrollEngineApi};
@@ -51,7 +52,7 @@ use scroll_engine::{Engine, ForkchoiceState};
5152
use scroll_migration::traits::ScrollMigrator;
5253
use scroll_network::ScrollNetworkManager;
5354
use scroll_wire::ScrollWireEvent;
54-
use tokio::sync::mpsc::{Sender, UnboundedReceiver};
55+
use tokio::sync::mpsc::UnboundedReceiver;
5556

5657
/// A struct that represents the arguments for the rollup node.
5758
#[derive(Debug, Clone, clap::Args)]
@@ -166,7 +167,7 @@ impl ScrollRollupNodeConfig {
166167
impl ScrollEngineApi,
167168
>,
168169
ChainOrchestratorHandle<N>,
169-
Option<Sender<Arc<L1Notification>>>,
170+
Option<L1WatcherMock>,
170171
)>
171172
where
172173
N: FullNetwork<Primitives = ScrollNetworkPrimitives> + NetworkProtocols,
@@ -350,35 +351,42 @@ impl ScrollRollupNodeConfig {
350351
};
351352
let consensus = self.consensus_args.consensus(authorized_signer)?;
352353

353-
let (l1_notification_tx, l1_notification_rx): (Option<Sender<Arc<L1Notification>>>, _) =
354-
if let Some(provider) = l1_provider.filter(|_| !self.test) {
355-
tracing::info!(target: "scroll::node::args", ?l1_block_startup_info, "Starting L1 watcher");
356-
(
357-
None,
358-
Some(
359-
L1Watcher::spawn(
360-
provider,
361-
l1_block_startup_info,
362-
node_config,
363-
self.l1_provider_args.logs_query_block_range,
364-
)
365-
.await,
366-
),
367-
)
368-
} else {
369-
// Create a channel for L1 notifications that we can use to inject L1 messages for
370-
// testing
371-
#[cfg(feature = "test-utils")]
372-
{
373-
let (tx, rx) = tokio::sync::mpsc::channel(1000);
374-
(Some(tx), Some(rx))
375-
}
376-
377-
#[cfg(not(feature = "test-utils"))]
378-
{
379-
(None, None)
380-
}
381-
};
354+
let (l1_watcher_mock, l1_watcher_handle) = if let Some(provider) =
355+
l1_provider.filter(|_| !self.test)
356+
{
357+
tracing::info!(target: "scroll::node::args", ?l1_block_startup_info, "Starting L1 watcher");
358+
(
359+
None,
360+
Some(
361+
L1Watcher::spawn(
362+
provider,
363+
l1_block_startup_info,
364+
node_config,
365+
self.l1_provider_args.logs_query_block_range,
366+
)
367+
.await,
368+
),
369+
)
370+
} else {
371+
// Create a channel for L1 notifications that we can use to inject L1 messages for
372+
// testing
373+
#[cfg(feature = "test-utils")]
374+
{
375+
let (notification_tx, notification_rx) = tokio::sync::mpsc::channel(1000);
376+
let (command_tx, command_rx) = tokio::sync::mpsc::unbounded_channel();
377+
let handle = L1WatcherHandle::new(command_tx, notification_rx);
378+
let watcher_mock = L1WatcherMock {
379+
command_rx: Arc::new(tokio::sync::Mutex::new(command_rx)),
380+
notification_tx,
381+
};
382+
(Some(watcher_mock), Some(handle))
383+
}
384+
385+
#[cfg(not(feature = "test-utils"))]
386+
{
387+
(None, None)
388+
}
389+
};
382390

383391
// Construct the l1 provider.
384392
let l1_messages_provider = db.clone();
@@ -457,7 +465,7 @@ impl ScrollRollupNodeConfig {
457465
config,
458466
Arc::new(block_client),
459467
l2_provider,
460-
l1_notification_rx.expect("L1 notification receiver should be set"),
468+
l1_watcher_handle.expect("L1 notification receiver should be set"),
461469
scroll_network_handle.into_scroll_network().await,
462470
consensus,
463471
engine,
@@ -467,7 +475,7 @@ impl ScrollRollupNodeConfig {
467475
)
468476
.await?;
469477

470-
Ok((chain_orchestrator, handle, l1_notification_tx))
478+
Ok((chain_orchestrator, handle, l1_watcher_mock))
471479
}
472480
}
473481

crates/node/src/test_utils/event_utils.rs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -130,6 +130,15 @@ impl<'a> EventWaiter<'a> {
130130
Ok(())
131131
}
132132

133+
/// Wait for chain unwound event on all specified nodes.
134+
pub async fn revert_to_l1_block(self) -> eyre::Result<()> {
135+
self.wait_for_event_on_all(|e| {
136+
matches!(e, ChainOrchestratorEvent::UnwoundToL1Block(_)).then_some(())
137+
})
138+
.await?;
139+
Ok(())
140+
}
141+
133142
/// Wait for block consolidated event on all specified nodes.
134143
pub async fn block_consolidated(self, target_block: u64) -> eyre::Result<()> {
135144
self.wait_for_event_on_all(|e| {

0 commit comments

Comments
 (0)