Skip to content

Commit ccad3cb

Browse files
committed
clean up
1 parent c1a0500 commit ccad3cb

File tree

13 files changed

+131
-64
lines changed

13 files changed

+131
-64
lines changed

crates/chain-orchestrator/src/lib.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -439,7 +439,7 @@ impl<
439439
let finalized_block_info = batch_reconciliation_result
440440
.target_status
441441
.is_finalized()
442-
.then(|| block_info.block_info);
442+
.then_some(block_info.block_info);
443443
self.engine
444444
.update_fcs(None, Some(block_info.block_info), finalized_block_info)
445445
.await?;
@@ -478,7 +478,7 @@ impl<
478478
let finalized_block_info = batch_reconciliation_result
479479
.target_status
480480
.is_finalized()
481-
.then(|| block_info.block_info);
481+
.then_some(block_info.block_info);
482482
self.engine
483483
.update_fcs(
484484
Some(block_info.block_info),
@@ -750,7 +750,7 @@ impl<
750750
self.engine.update_fcs(None, None, finalized_block_info).await?;
751751
}
752752

753-
for batch in triggered_batches.iter() {
753+
for batch in &triggered_batches {
754754
self.derivation_pipeline.push_batch(*batch, BatchStatus::Finalized).await;
755755
}
756756

crates/database/db/src/db.rs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,8 @@ use crate::{
88
};
99
use alloy_primitives::{Signature, B256};
1010
use rollup_node_primitives::{
11-
BatchCommitData, BatchConsolidationOutcome, BatchInfo, BlockInfo, L1MessageEnvelope,
12-
L2BlockInfoWithL1Messages,
11+
BatchCommitData, BatchConsolidationOutcome, BatchInfo, BlockInfo, L1BlockStartupInfo,
12+
L1MessageEnvelope, L2BlockInfoWithL1Messages,
1313
};
1414
use scroll_alloy_rpc_types_engine::BlockDataHint;
1515
use sea_orm::{
@@ -375,11 +375,11 @@ impl DatabaseWriteOperations for Database {
375375
)
376376
}
377377

378-
async fn prepare_on_startup(&self) -> Result<(Vec<BlockInfo>, Option<u64>), DatabaseError> {
378+
async fn prepare_l1_watcher_start_info(&self) -> Result<L1BlockStartupInfo, DatabaseError> {
379379
metered!(
380380
DatabaseOperation::PrepareOnStartup,
381381
self,
382-
tx_mut(move |tx| async move { tx.prepare_on_startup().await })
382+
tx_mut(move |tx| async move { tx.prepare_l1_watcher_start_info().await })
383383
)
384384
}
385385

@@ -1572,10 +1572,10 @@ mod test {
15721572
assert_eq!(retried_block_4, block_4);
15731573

15741574
// Call prepare_on_startup which should not error
1575-
let result = db.prepare_on_startup().await.unwrap();
1575+
let result = db.prepare_l1_watcher_start_info().await.unwrap();
15761576

15771577
// verify the result
1578-
assert_eq!(result, (vec![l1_block_info_3], Some(l1_block_info_3.number)));
1578+
assert_eq!(result, L1BlockStartupInfo::UnsafeBlocks(vec![l1_block_info_3]));
15791579
}
15801580

15811581
#[tokio::test]

crates/database/db/src/operations.rs

Lines changed: 6 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ use crate::{ReadConnectionProvider, WriteConnectionProvider};
44
use alloy_primitives::{Signature, B256};
55
use rollup_node_primitives::{
66
BatchCommitData, BatchConsolidationOutcome, BatchInfo, BatchStatus, BlockInfo,
7-
L1MessageEnvelope, L2BlockInfoWithL1Messages, Metadata,
7+
L1BlockStartupInfo, L1MessageEnvelope, L2BlockInfoWithL1Messages, Metadata,
88
};
99
use scroll_alloy_rpc_types_engine::BlockDataHint;
1010
use sea_orm::{
@@ -118,16 +118,8 @@ pub trait DatabaseWriteOperations {
118118
l1_block_number: u64,
119119
) -> Result<Vec<L1MessageEnvelope>, DatabaseError>;
120120

121-
/// Prepare the database on startup and return metadata used for other components in the
122-
/// rollup-node.
123-
///
124-
/// This method first unwinds the database to the finalized L1 block. It then fetches the batch
125-
/// info for the latest safe L2 block. It takes note of the L1 block number at which
126-
/// this batch was produced (currently the finalized block for the batch until we implement
127-
/// issue #273). It then retrieves the latest block for the previous batch (i.e., the batch
128-
/// before the latest safe block). It returns a tuple of this latest fetched block and the
129-
/// L1 block number of the batch.
130-
async fn prepare_on_startup(&self) -> Result<(Vec<BlockInfo>, Option<u64>), DatabaseError>;
121+
/// Returns the L1 block info required to start the L1 watcher on startup.
122+
async fn prepare_l1_watcher_start_info(&self) -> Result<L1BlockStartupInfo, DatabaseError>;
131123

132124
/// Delete all L2 blocks with a block number greater than the provided block number.
133125
async fn delete_l2_blocks_gt_block_number(
@@ -598,7 +590,7 @@ impl<T: WriteConnectionProvider + ?Sized + Sync> DatabaseWriteOperations for T {
598590
Ok(removed_messages.into_iter().map(Into::into).collect())
599591
}
600592

601-
async fn prepare_on_startup(&self) -> Result<(Vec<BlockInfo>, Option<u64>), DatabaseError> {
593+
async fn prepare_l1_watcher_start_info(&self) -> Result<L1BlockStartupInfo, DatabaseError> {
602594
tracing::trace!(target: "scroll::db", "Fetching startup safe block from database.");
603595

604596
// set all batches with processing status back to committed
@@ -608,7 +600,7 @@ impl<T: WriteConnectionProvider + ?Sized + Sync> DatabaseWriteOperations for T {
608600
let l1_block_infos = self.get_l1_block_info().await?;
609601
let latest_l1_block_info = self.get_latest_indexed_event_l1_block_number().await?;
610602

611-
Ok((l1_block_infos, latest_l1_block_info))
603+
Ok(L1BlockStartupInfo::new(l1_block_infos, latest_l1_block_info))
612604
}
613605

614606
async fn delete_l2_blocks_gt_block_number(
@@ -1041,13 +1033,7 @@ impl<T: ReadConnectionProvider + Sync + ?Sized> DatabaseReadOperations for T {
10411033
.into_tuple::<(Option<i64>, Option<i64>, Option<i64>)>()
10421034
.one(self.get_connection())
10431035
.await?
1044-
.map(|(block_number, finalized_block_number, reverted_block_number)| {
1045-
[block_number, finalized_block_number, reverted_block_number]
1046-
.into_iter()
1047-
.flatten()
1048-
.max()
1049-
})
1050-
.flatten();
1036+
.and_then(|tuple| <[Option<i64>; 3]>::from(tuple).into_iter().flatten().max());
10511037

10521038
let latest_l1_block_number =
10531039
[latest_l1_message, latest_batch_event].into_iter().flatten().max();

crates/derivation-pipeline/benches/pipeline.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -127,7 +127,7 @@ fn benchmark_pipeline_derivation_in_file_blobs(c: &mut Criterion) {
127127
// commit 253 batches.
128128
for index in BATCHES_START_INDEX..=BATCHES_STOP_INDEX {
129129
let batch_info = BatchInfo { index, hash: Default::default() };
130-
pipeline.push_batch(batch_info.into(), BatchStatus::Committed).await;
130+
pipeline.push_batch(batch_info, BatchStatus::Committed).await;
131131
}
132132

133133
tx.send(pipeline).unwrap();
@@ -163,7 +163,7 @@ fn benchmark_pipeline_derivation_s3_blobs(c: &mut Criterion) {
163163
// commit 15 batches.
164164
for index in BATCHES_START_INDEX..=BATCHES_START_INDEX + 15 {
165165
let batch_info = BatchInfo { index, hash: Default::default() };
166-
pipeline.push_batch(batch_info.clone(), BatchStatus::Committed).await;
166+
pipeline.push_batch(batch_info, BatchStatus::Committed).await;
167167
}
168168

169169
tx.send(pipeline).unwrap();

crates/derivation-pipeline/src/lib.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -515,7 +515,7 @@ mod tests {
515515
// as long as we don't call `push_batch`, pipeline should not return attributes.
516516
pipeline
517517
.push_batch(
518-
BatchInfo { index: 12, hash: Default::default() }.into(),
518+
BatchInfo { index: 12, hash: Default::default() },
519519
BatchStatus::Consolidated,
520520
)
521521
.await;

crates/node/src/args.rs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -258,16 +258,16 @@ impl ScrollRollupNodeConfig {
258258
let mut fcs =
259259
ForkchoiceState::from_provider(&l2_provider).await.unwrap_or_else(chain_spec_fcs);
260260

261-
let (l1_start_block_number, mut l2_head_block_number) = db
261+
let (l1_block_startup_info, mut l2_head_block_number) = db
262262
.tx_mut(move |tx| async move {
263263
// On startup we replay the latest batch of blocks from the database as such we set
264264
// the safe block hash to the latest block hash associated with the
265265
// previous consolidated batch in the database.
266-
let (_startup_safe_block, l1_start_block_number) = tx.prepare_on_startup().await?;
266+
let l1_block_startup_info = tx.prepare_l1_watcher_start_info().await?;
267267

268268
let l2_head_block_number = tx.get_l2_head_block_number().await?;
269269

270-
Ok::<_, DatabaseError>((l1_start_block_number, l2_head_block_number))
270+
Ok::<_, DatabaseError>((l1_block_startup_info, l2_head_block_number))
271271
})
272272
.await?;
273273

@@ -342,13 +342,13 @@ impl ScrollRollupNodeConfig {
342342

343343
let (l1_notification_tx, l1_notification_rx): (Option<Sender<Arc<L1Notification>>>, _) =
344344
if let Some(provider) = l1_provider.filter(|_| !self.test) {
345-
tracing::info!(target: "scroll::node::args", ?l1_start_block_number, "Starting L1 watcher");
345+
tracing::info!(target: "scroll::node::args", ?l1_block_startup_info, "Starting L1 watcher");
346346
(
347347
None,
348348
Some(
349349
L1Watcher::spawn(
350350
provider,
351-
l1_start_block_number,
351+
l1_block_startup_info,
352352
node_config,
353353
self.l1_provider_args.logs_query_block_range,
354354
)

crates/primitives/src/batch.rs

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -86,11 +86,11 @@ impl BatchStatus {
8686
impl core::fmt::Display for BatchStatus {
8787
fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
8888
match self {
89-
BatchStatus::Committed => write!(f, "committed"),
90-
BatchStatus::Processing => write!(f, "processing"),
91-
BatchStatus::Consolidated => write!(f, "consolidated"),
92-
BatchStatus::Reverted => write!(f, "reverted"),
93-
BatchStatus::Finalized => write!(f, "finalized"),
89+
Self::Committed => write!(f, "committed"),
90+
Self::Processing => write!(f, "processing"),
91+
Self::Consolidated => write!(f, "consolidated"),
92+
Self::Reverted => write!(f, "reverted"),
93+
Self::Finalized => write!(f, "finalized"),
9494
}
9595
}
9696
}
@@ -100,11 +100,11 @@ impl core::str::FromStr for BatchStatus {
100100

101101
fn from_str(s: &str) -> Result<Self, Self::Err> {
102102
match s {
103-
"committed" => Ok(BatchStatus::Committed),
104-
"processing" => Ok(BatchStatus::Processing),
105-
"consolidated" => Ok(BatchStatus::Consolidated),
106-
"reverted" => Ok(BatchStatus::Reverted),
107-
"finalized" => Ok(BatchStatus::Finalized),
103+
"committed" => Ok(Self::Committed),
104+
"processing" => Ok(Self::Processing),
105+
"consolidated" => Ok(Self::Consolidated),
106+
"reverted" => Ok(Self::Reverted),
107+
"finalized" => Ok(Self::Finalized),
108108
_ => Err(()),
109109
}
110110
}

crates/primitives/src/block.rs

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,31 @@ pub struct BlockInfo {
2525
pub hash: B256,
2626
}
2727

28+
/// The startup configuration for the L1 watcher.
29+
#[derive(Debug, PartialEq, Eq)]
30+
pub enum L1BlockStartupInfo {
31+
/// The L1 block infos of the unsafe blocks stored in the database.
32+
UnsafeBlocks(Vec<BlockInfo>),
33+
/// The finalized block number to start from.
34+
FinalizedBlockNumber(u64),
35+
/// No startup information available.
36+
None,
37+
}
38+
39+
impl L1BlockStartupInfo {
40+
/// Creates a new [`L1BlockStartupInfo`] from the given unsafe blocks and finalized block
41+
/// number.
42+
pub fn new(unsafe_blocks: Vec<BlockInfo>, finalized_block_number: Option<u64>) -> Self {
43+
if !unsafe_blocks.is_empty() {
44+
Self::UnsafeBlocks(unsafe_blocks)
45+
} else if let Some(number) = finalized_block_number {
46+
Self::FinalizedBlockNumber(number)
47+
} else {
48+
Self::None
49+
}
50+
}
51+
}
52+
2853
impl PartialOrd for BlockInfo {
2954
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
3055
self.number.partial_cmp(&other.number)

crates/primitives/src/lib.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,8 +9,9 @@ pub use attributes::ScrollPayloadAttributesWithBatchInfo;
99

1010
mod block;
1111
pub use block::{
12-
BlockInfo, L2BlockInfoWithL1Messages, WithBatchInfo, WithBlockNumber, WithCommittedBatchInfo,
13-
WithFinalizedBatchInfo, WithFinalizedBlockNumber, DEFAULT_BLOCK_DIFFICULTY,
12+
BlockInfo, L1BlockStartupInfo, L2BlockInfoWithL1Messages, WithBatchInfo, WithBlockNumber,
13+
WithCommittedBatchInfo, WithFinalizedBatchInfo, WithFinalizedBlockNumber,
14+
DEFAULT_BLOCK_DIFFICULTY,
1415
};
1516

1617
mod batch;

crates/watcher/src/lib.rs

Lines changed: 39 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,8 @@ use alloy_sol_types::SolEvent;
1818
use error::L1WatcherResult;
1919
use itertools::Itertools;
2020
use rollup_node_primitives::{
21-
BatchCommitData, BatchInfo, BlockInfo, BoundedVec, ConsensusUpdate, NodeConfig,
21+
BatchCommitData, BatchInfo, BlockInfo, BoundedVec, ConsensusUpdate, L1BlockStartupInfo,
22+
NodeConfig,
2223
};
2324
use rollup_node_providers::SystemContractProvider;
2425
use scroll_alloy_consensus::TxL1Message;
@@ -192,11 +193,11 @@ where
192193
/// returning [`L1Notification`] in the returned channel.
193194
pub async fn spawn(
194195
execution_provider: EP,
195-
start_block: Option<u64>,
196+
l1_block_startup_info: L1BlockStartupInfo,
196197
config: Arc<NodeConfig>,
197198
log_query_block_range: u64,
198199
) -> mpsc::Receiver<Arc<L1Notification>> {
199-
tracing::trace!(target: "scroll::watcher", ?start_block, ?config, "spawning L1 watcher");
200+
tracing::trace!(target: "scroll::watcher", ?l1_block_startup_info, ?config, "spawning L1 watcher");
200201

201202
let (tx, rx) = mpsc::channel(log_query_block_range as usize);
202203

@@ -219,11 +220,39 @@ where
219220
finalized: fetch_block_info(BlockNumberOrTag::Finalized).await,
220221
};
221222

223+
let (reorg, start_block) = match l1_block_startup_info {
224+
L1BlockStartupInfo::UnsafeBlocks(blocks) => {
225+
let mut reorg = true;
226+
let mut start_block =
227+
blocks.first().expect("at least one unsafe block").number.saturating_sub(1);
228+
for (i, block) in blocks.into_iter().rev().enumerate() {
229+
let current_block =
230+
fetch_block_info(BlockNumberOrTag::Number(block.number)).await;
231+
if current_block.hash == block.hash {
232+
tracing::info!(target: "scroll::watcher", ?block, "found reorg block from unsafe blocks");
233+
reorg = i != 0;
234+
start_block = current_block.number;
235+
}
236+
}
237+
238+
(reorg, start_block)
239+
}
240+
L1BlockStartupInfo::FinalizedBlockNumber(number) => {
241+
tracing::info!(target: "scroll::watcher", ?number, "starting from finalized block number");
242+
243+
(false, number)
244+
}
245+
L1BlockStartupInfo::None => {
246+
tracing::info!(target: "scroll::watcher", "no L1 startup info, starting from config start block");
247+
(false, config.start_l1_block)
248+
}
249+
};
250+
222251
// init the watcher.
223252
let watcher = Self {
224253
execution_provider,
225254
unfinalized_blocks: BoundedVec::new(HEADER_CAPACITY),
226-
current_block_number: start_block.unwrap_or(config.start_l1_block).saturating_sub(1),
255+
current_block_number: start_block.saturating_sub(1),
227256
l1_state,
228257
sender: tx,
229258
config,
@@ -233,6 +262,12 @@ where
233262
};
234263

235264
// notify at spawn.
265+
if reorg {
266+
watcher
267+
.notify(L1Notification::Reorg(start_block))
268+
.await
269+
.expect("channel is open in this context");
270+
}
236271
watcher
237272
.notify(L1Notification::Finalized(watcher.l1_state.finalized))
238273
.await

0 commit comments

Comments
 (0)