Skip to content

Commit 0c4e4fb

Browse files
authored
fix: handle batch revert (#275)
* feat: update the chain orchestrator to return finalized batches on finalized events * feat: update manager to only push finalized batches in the derivation pipeline * feat: update engine to correctly set finalized head * test: fix graceful shutdown test * fix: clippy * fix: codespell
1 parent 95237eb commit 0c4e4fb

File tree

12 files changed

+279
-123
lines changed

12 files changed

+279
-123
lines changed

crates/chain-orchestrator/src/event.rs

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,9 @@ use alloy_consensus::Header;
22
use alloy_primitives::{Signature, B256};
33
use reth_network_peers::PeerId;
44
use reth_scroll_primitives::ScrollBlock;
5-
use rollup_node_primitives::{BatchInfo, BlockInfo, ChainImport, L2BlockInfoWithL1Messages};
5+
use rollup_node_primitives::{
6+
BatchInfo, BlockInfo, ChainImport, L2BlockInfoWithL1Messages, WithFinalizedBlockNumber,
7+
};
68

79
/// An event emitted by the `ChainOrchestrator`.
810
#[derive(Debug, Clone, PartialEq, Eq)]
@@ -38,12 +40,12 @@ pub enum ChainOrchestratorEvent {
3840
/// The safe L2 block info.
3941
safe_head: Option<BlockInfo>,
4042
},
41-
/// A batch has been finalized returning the batch hash and new an optional finalized
42-
/// L2 block.
43-
BatchFinalized(B256, Option<BlockInfo>),
44-
/// An L1 block has been finalized returning the L1 block number and an optional
45-
/// finalized L2 block.
46-
L1BlockFinalized(u64, Option<BlockInfo>),
43+
/// A batch has been finalized returning an optional finalized L2 block. Also returns a
44+
/// [`BatchInfo`] if the finalized event occurred in a finalized L1 block.
45+
BatchFinalized(Option<WithFinalizedBlockNumber<BatchInfo>>, Option<BlockInfo>),
46+
/// An L1 block has been finalized returning the L1 block number, the list of finalized batches
47+
/// and an optional finalized L2 block.
48+
L1BlockFinalized(u64, Vec<BatchInfo>, Option<BlockInfo>),
4749
/// A `L1Message` event has been committed returning the message queue index.
4850
L1MessageCommitted(u64),
4951
/// A reorg has occurred on L1, returning the L1 block number of the new L1 head,

crates/chain-orchestrator/src/lib.rs

Lines changed: 31 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ use reth_network_p2p::{BlockClient, BodiesClient};
1111
use reth_scroll_primitives::ScrollBlock;
1212
use rollup_node_primitives::{
1313
BatchCommitData, BatchInfo, BlockInfo, BoundedVec, ChainImport, L1MessageEnvelope,
14-
L2BlockInfoWithL1Messages,
14+
L2BlockInfoWithL1Messages, WithBlockNumber,
1515
};
1616
use rollup_node_watcher::L1Notification;
1717
use scroll_alloy_consensus::TxL1Message;
@@ -21,6 +21,7 @@ use scroll_db::{Database, DatabaseError, DatabaseOperations, L1MessageStart, Unw
2121
use scroll_network::NewBlockWithPeer;
2222
use std::{
2323
collections::{HashMap, VecDeque},
24+
ops::Add,
2425
pin::Pin,
2526
sync::{
2627
atomic::{AtomicU64, Ordering},
@@ -545,12 +546,13 @@ impl<
545546
)),
546547
))
547548
}
548-
L1Notification::BatchFinalization { hash, block_number, .. } => {
549+
L1Notification::BatchFinalization { hash, index, block_number } => {
549550
ChainOrchestratorFuture::HandleBatchFinalization(self.handle_metered(
550551
ChainOrchestratorItem::BatchFinalization,
551552
Box::pin(Self::handle_batch_finalization(
552553
self.database.clone(),
553554
hash,
555+
index,
554556
block_number,
555557
self.l1_finalized_block_number.clone(),
556558
self.l2_finalized_block_number.clone(),
@@ -602,8 +604,8 @@ impl<
602604
}))
603605
}
604606

605-
/// Handles a finalized event by updating the chain orchestrator L1 finalized block and
606-
/// returning the new finalized L2 chain block.
607+
/// Handles a finalized event by updating the chain orchestrator L1 finalized block, returning
608+
/// the new finalized L2 chain block and the list of finalized batches.
607609
async fn handle_finalized(
608610
database: Arc<Database>,
609611
block_number: u64,
@@ -613,20 +615,30 @@ impl<
613615
// Set the latest finalized L1 block in the database.
614616
database.set_latest_finalized_l1_block_number(block_number).await?;
615617

616-
// get the newest finalized batch.
617-
let batch_hash = database.get_finalized_batch_hash_at_height(block_number).await?;
618+
// get the finalized batch infos.
619+
// we add 1 to the low finalized l1 block number to avoid fetching the last finalized batch
620+
// a second time.
621+
let low_finalized_l1_block_number =
622+
l1_block_number.load(Ordering::Relaxed).add(1).max(block_number);
623+
let finalized_batches = database
624+
.get_batches_by_finalized_block_range(low_finalized_l1_block_number, block_number)
625+
.await?;
618626

619627
// get the finalized block for the batch.
620-
let finalized_block = if let Some(hash) = batch_hash {
621-
Self::fetch_highest_finalized_block(database, hash, l2_block_number).await?
628+
let finalized_block = if let Some(info) = finalized_batches.last() {
629+
Self::fetch_highest_finalized_block(database, info.hash, l2_block_number).await?
622630
} else {
623631
None
624632
};
625633

626634
// update the chain orchestrator l1 block number.
627635
l1_block_number.store(block_number, Ordering::Relaxed);
628636

629-
Ok(Some(ChainOrchestratorEvent::L1BlockFinalized(block_number, finalized_block)))
637+
Ok(Some(ChainOrchestratorEvent::L1BlockFinalized(
638+
block_number,
639+
finalized_batches,
640+
finalized_block,
641+
)))
630642
}
631643

632644
/// Handles an L1 message by inserting it into the database.
@@ -699,23 +711,30 @@ impl<
699711
async fn handle_batch_finalization(
700712
database: Arc<Database>,
701713
batch_hash: B256,
714+
batch_index: u64,
702715
block_number: u64,
703716
l1_block_number: Arc<AtomicU64>,
704717
l2_block_number: Arc<AtomicU64>,
705718
) -> Result<Option<ChainOrchestratorEvent>, ChainOrchestratorError> {
706719
// finalized the batch.
707720
database.finalize_batch(batch_hash, block_number).await?;
708721

709-
// check if the block where the batch was finalized is finalized on L1.
710722
let mut finalized_block = None;
723+
let mut finalized_batch = None;
724+
725+
// check if the block where the batch was finalized is finalized on L1.
711726
let l1_block_number_value = l1_block_number.load(Ordering::Relaxed);
712-
if l1_block_number_value > block_number {
727+
if l1_block_number_value >= block_number {
713728
// fetch the finalized block.
714729
finalized_block =
715730
Self::fetch_highest_finalized_block(database, batch_hash, l2_block_number).await?;
731+
732+
// set the finalized batch info.
733+
finalized_batch =
734+
Some(WithBlockNumber::new(block_number, BatchInfo::new(batch_index, batch_hash)));
716735
}
717736

718-
let event = ChainOrchestratorEvent::BatchFinalized(batch_hash, finalized_block);
737+
let event = ChainOrchestratorEvent::BatchFinalized(finalized_batch, finalized_block);
719738
Ok(Some(event))
720739
}
721740

crates/database/db/src/db.rs

Lines changed: 11 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -228,7 +228,7 @@ mod test {
228228
}
229229

230230
#[tokio::test]
231-
async fn test_database_finalized_batch_hash_at_height() {
231+
async fn test_database_batches_by_finalized_block_range() {
232232
// Set up the test database.
233233
let db = setup_test_db().await;
234234

@@ -240,7 +240,7 @@ mod test {
240240
// Generate randoms BatchInfoCommitData, insert in database and finalize.
241241
let mut block_number = 100;
242242
let mut batch_index = 100;
243-
let mut highest_finalized_batch_hash = B256::ZERO;
243+
let mut finalized_batches_hashes = vec![];
244244

245245
for _ in 0..20 {
246246
let data = BatchCommitData {
@@ -251,13 +251,9 @@ mod test {
251251
let hash = data.hash;
252252
db.insert_batch(data).await.unwrap();
253253

254-
// save batch hash finalized at block number 109.
255-
if block_number == 109 {
256-
highest_finalized_batch_hash = hash;
257-
}
258-
259254
// Finalize batch up to block number 110.
260255
if block_number <= 110 {
256+
finalized_batches_hashes.push(hash);
261257
db.finalize_batch(hash, block_number).await.unwrap();
262258
}
263259

@@ -266,9 +262,14 @@ mod test {
266262
}
267263

268264
// Fetch the finalized batch for provided height and verify number.
269-
let highest_batch_hash_from_db =
270-
db.get_finalized_batch_hash_at_height(109).await.unwrap().unwrap();
271-
assert_eq!(highest_finalized_batch_hash, highest_batch_hash_from_db);
265+
let batch_infos = db
266+
.get_batches_by_finalized_block_range(100, 110)
267+
.await
268+
.unwrap()
269+
.into_iter()
270+
.map(|b| b.hash)
271+
.collect::<Vec<_>>();
272+
assert_eq!(finalized_batches_hashes, batch_infos);
272273
}
273274

274275
#[tokio::test]

crates/database/db/src/operations.rs

Lines changed: 16 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -115,24 +115,31 @@ pub trait DatabaseOperations: DatabaseConnectionProvider {
115115
.map(|x| x.and_then(|x| x.parse::<u64>().ok()))?)
116116
}
117117

118-
/// Get the newest finalized batch hash up to or at the provided height.
119-
async fn get_finalized_batch_hash_at_height(
118+
/// Get the finalized batches between the provided range \[low; high\].
119+
async fn get_batches_by_finalized_block_range(
120120
&self,
121-
height: u64,
122-
) -> Result<Option<B256>, DatabaseError> {
121+
low: u64,
122+
high: u64,
123+
) -> Result<Vec<BatchInfo>, DatabaseError> {
123124
Ok(models::batch_commit::Entity::find()
124125
.filter(
125126
Condition::all()
126127
.add(models::batch_commit::Column::FinalizedBlockNumber.is_not_null())
127-
.add(models::batch_commit::Column::FinalizedBlockNumber.lte(height)),
128+
.add(models::batch_commit::Column::FinalizedBlockNumber.gte(low))
129+
.add(models::batch_commit::Column::FinalizedBlockNumber.lte(high)),
128130
)
129-
.order_by_desc(models::batch_commit::Column::Index)
131+
.order_by_asc(models::batch_commit::Column::Index)
130132
.select_only()
133+
.column(models::batch_commit::Column::Index)
131134
.column(models::batch_commit::Column::Hash)
132-
.into_tuple::<Vec<u8>>()
133-
.one(self.get_connection())
135+
.into_tuple::<(i64, Vec<u8>)>()
136+
.all(self.get_connection())
134137
.await
135-
.map(|x| x.map(|x| B256::from_slice(&x)))?)
138+
.map(|x| {
139+
x.into_iter()
140+
.map(|(index, hash)| BatchInfo::new(index as u64, B256::from_slice(&hash)))
141+
.collect()
142+
})?)
136143
}
137144

138145
/// Delete all [`BatchCommitData`]s with a block number greater than the provided block number.

crates/derivation-pipeline/benches/pipeline.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -84,7 +84,7 @@ fn benchmark_pipeline_derivation(c: &mut Criterion) {
8484

8585
// commit 1000 batches.
8686
for _ in 0..1000 {
87-
pipeline.handle_batch_commit(batch_info, 0);
87+
pipeline.push_batch(batch_info, 0);
8888
}
8989

9090
tx.send(pipeline).unwrap();

0 commit comments

Comments
 (0)