Skip to content

Commit 975c806

Browse files
greged93frisitano
andauthored
feat: handle batch reverts (#187)
* feat: update batch header struct * feat: add skipped l1 messages bitmap in derivation pipeline * feat: comments * feat: handle batch revert * test: add testing * feat: order commit logs in watcher by block number first --------- Co-authored-by: frisitano <[email protected]>
1 parent ad8f0a7 commit 975c806

File tree

9 files changed

+354
-89
lines changed

9 files changed

+354
-89
lines changed

crates/database/db/src/db.rs

Lines changed: 77 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -172,9 +172,14 @@ mod test {
172172
block_number += 1;
173173
}
174174

175+
// Fetch the highest block for the batch hash and verify number.
176+
let highest_block_info =
177+
db.get_highest_block_for_batch_hash(batch_info.hash).await.unwrap().unwrap();
178+
assert_eq!(highest_block_info.number, block_number - 1);
179+
175180
// Fetch the highest block for the batch and verify number.
176181
let highest_block_info =
177-
db.get_highest_block_for_batch(batch_info.hash).await.unwrap().unwrap();
182+
db.get_highest_block_for_batch_index(batch_info.index).await.unwrap().unwrap();
178183
assert_eq!(highest_block_info.number, block_number - 1);
179184
}
180185

@@ -211,9 +216,14 @@ mod test {
211216
block_number += 1;
212217
}
213218

214-
// Fetch the highest block for the batch and verify number.
219+
// Fetch the highest block for the batch hash and verify number.
215220
let highest_block_info =
216-
db.get_highest_block_for_batch(second_batch_info.hash).await.unwrap().unwrap();
221+
db.get_highest_block_for_batch_hash(second_batch_info.hash).await.unwrap().unwrap();
222+
assert_eq!(highest_block_info.number, block_number - 1);
223+
224+
// Fetch the highest block for the batch index and verify number.
225+
let highest_block_info =
226+
db.get_highest_block_for_batch_index(second_batch_info.index).await.unwrap().unwrap();
217227
assert_eq!(highest_block_info.number, block_number - 1);
218228
}
219229

@@ -480,7 +490,7 @@ mod test {
480490
}
481491

482492
#[tokio::test]
483-
async fn test_delete_l2_blocks_gt() {
493+
async fn test_delete_l2_blocks_gt_block_number() {
484494
// Set up the test database.
485495
let db = setup_test_db().await;
486496

@@ -499,7 +509,7 @@ mod test {
499509
}
500510

501511
// Delete blocks with number > 405
502-
let deleted_count = db.delete_l2_blocks_gt(405).await.unwrap();
512+
let deleted_count = db.delete_l2_blocks_gt_block_number(405).await.unwrap();
503513
assert_eq!(deleted_count, 4); // Blocks 406, 407, 408, 409
504514

505515
// Verify remaining blocks still exist
@@ -515,6 +525,68 @@ mod test {
515525
}
516526
}
517527

528+
#[tokio::test]
529+
async fn test_delete_l2_blocks_gt_batch_index() {
530+
// Set up the test database.
531+
let db = setup_test_db().await;
532+
533+
// Generate unstructured bytes.
534+
let mut bytes = [0u8; 1024];
535+
rand::rng().fill(bytes.as_mut_slice());
536+
let mut u = Unstructured::new(&bytes);
537+
538+
// Insert multiple batches
539+
for i in 100..110 {
540+
let batch_data = BatchCommitData {
541+
index: i,
542+
calldata: Arc::new(vec![].into()),
543+
..Arbitrary::arbitrary(&mut u).unwrap()
544+
};
545+
db.insert_batch(batch_data).await.unwrap();
546+
}
547+
548+
// Insert L2 blocks with different batch indices
549+
for i in 100..110 {
550+
let batch_data = db.get_batch_by_index(i).await.unwrap().unwrap();
551+
let batch_info: BatchInfo = batch_data.into();
552+
553+
let block_info = BlockInfo { number: 500 + i, hash: B256::arbitrary(&mut u).unwrap() };
554+
let l2_block = L2BlockInfoWithL1Messages { block_info, l1_messages: vec![] };
555+
556+
db.insert_block(l2_block, Some(batch_info)).await.unwrap();
557+
}
558+
559+
// Insert some blocks without batch index (should not be deleted)
560+
for i in 0..3 {
561+
let block_info = BlockInfo { number: 600 + i, hash: B256::arbitrary(&mut u).unwrap() };
562+
let l2_block = L2BlockInfoWithL1Messages { block_info, l1_messages: vec![] };
563+
564+
db.insert_block(l2_block, None).await.unwrap();
565+
}
566+
567+
// Delete L2 blocks with batch index > 105
568+
let deleted_count = db.delete_l2_blocks_gt_batch_index(105).await.unwrap();
569+
assert_eq!(deleted_count, 4); // Blocks with batch indices 106, 107, 108, 109
570+
571+
// Verify remaining blocks with batch index <= 105 still exist
572+
for i in 100..=105 {
573+
let block = db.get_l2_block_info_by_number(500 + i).await.unwrap();
574+
assert!(block.is_some());
575+
}
576+
577+
// Verify deleted blocks with batch index > 105 no longer exist
578+
for i in 106..110 {
579+
let block = db.get_l2_block_info_by_number(500 + i).await.unwrap();
580+
assert!(block.is_none());
581+
}
582+
583+
// Verify blocks without batch index are still there (not affected by batch index filter)
584+
for i in 0..3 {
585+
let block = db.get_l2_block_info_by_number(600 + i).await.unwrap();
586+
assert!(block.is_some());
587+
}
588+
}
589+
518590
#[tokio::test]
519591
async fn test_insert_block_with_l1_messages() {
520592
// Set up the test database.

crates/database/db/src/operations.rs

Lines changed: 59 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
use super::{models, DatabaseError};
22
use crate::DatabaseConnectionProvider;
3+
34
use alloy_primitives::B256;
45
use futures::{Stream, StreamExt};
56
use rollup_node_primitives::{
@@ -26,6 +27,8 @@ pub trait DatabaseOperations: DatabaseConnectionProvider {
2627
models::batch_commit::Column::Hash,
2728
models::batch_commit::Column::BlockNumber,
2829
models::batch_commit::Column::BlockTimestamp,
30+
models::batch_commit::Column::Calldata,
31+
models::batch_commit::Column::BlobHash,
2932
models::batch_commit::Column::FinalizedBlockNumber,
3033
])
3134
.to_owned(),
@@ -132,7 +135,10 @@ pub trait DatabaseOperations: DatabaseConnectionProvider {
132135
}
133136

134137
/// Delete all [`BatchCommitData`]s with a block number greater than the provided block number.
135-
async fn delete_batches_gt(&self, block_number: u64) -> Result<u64, DatabaseError> {
138+
async fn delete_batches_gt_block_number(
139+
&self,
140+
block_number: u64,
141+
) -> Result<u64, DatabaseError> {
136142
tracing::trace!(target: "scroll::db", block_number, "Deleting batch inputs greater than block number.");
137143
Ok(models::batch_commit::Entity::delete_many()
138144
.filter(models::batch_commit::Column::BlockNumber.gt(block_number as i64))
@@ -141,6 +147,16 @@ pub trait DatabaseOperations: DatabaseConnectionProvider {
141147
.map(|x| x.rows_affected)?)
142148
}
143149

150+
/// Delete all [`BatchCommitData`]s with a batch index greater than the provided index.
151+
async fn delete_batches_gt_batch_index(&self, batch_index: u64) -> Result<u64, DatabaseError> {
152+
tracing::trace!(target: "scroll::db", batch_index, "Deleting batch inputs greater than batch index.");
153+
Ok(models::batch_commit::Entity::delete_many()
154+
.filter(models::batch_commit::Column::Index.gt(batch_index as i64))
155+
.exec(self.get_connection())
156+
.await
157+
.map(|x| x.rows_affected)?)
158+
}
159+
144160
/// Get an iterator over all [`BatchCommitData`]s in the database.
145161
async fn get_batches<'a>(
146162
&'a self,
@@ -327,7 +343,7 @@ pub trait DatabaseOperations: DatabaseConnectionProvider {
327343
.get_batch_by_index(batch_info.index - 1)
328344
.await?
329345
.expect("Batch info must be present due to database query arguments");
330-
let l2_block = self.get_highest_block_for_batch(previous_batch.hash).await?;
346+
let l2_block = self.get_highest_block_for_batch_hash(previous_batch.hash).await?;
331347
(l2_block, Some(batch.block_number))
332348
} else {
333349
(None, None)
@@ -337,7 +353,10 @@ pub trait DatabaseOperations: DatabaseConnectionProvider {
337353
}
338354

339355
/// Delete all L2 blocks with a block number greater than the provided block number.
340-
async fn delete_l2_blocks_gt(&self, block_number: u64) -> Result<u64, DatabaseError> {
356+
async fn delete_l2_blocks_gt_block_number(
357+
&self,
358+
block_number: u64,
359+
) -> Result<u64, DatabaseError> {
341360
tracing::trace!(target: "scroll::db", block_number, "Deleting L2 blocks greater than provided block number.");
342361
Ok(models::l2_block::Entity::delete_many()
343362
.filter(models::l2_block::Column::BlockNumber.gt(block_number as i64))
@@ -346,6 +365,23 @@ pub trait DatabaseOperations: DatabaseConnectionProvider {
346365
.map(|x| x.rows_affected)?)
347366
}
348367

368+
/// Delete all L2 blocks with a batch index greater than the batch index.
369+
async fn delete_l2_blocks_gt_batch_index(
370+
&self,
371+
batch_index: u64,
372+
) -> Result<u64, DatabaseError> {
373+
tracing::trace!(target: "scroll::db", batch_index, "Deleting L2 blocks greater than provided batch index.");
374+
Ok(models::l2_block::Entity::delete_many()
375+
.filter(
376+
Condition::all()
377+
.add(models::l2_block::Column::BatchIndex.is_not_null())
378+
.add(models::l2_block::Column::BatchIndex.gt(batch_index as i64)),
379+
)
380+
.exec(self.get_connection())
381+
.await
382+
.map(|x| x.rows_affected)?)
383+
}
384+
349385
/// Insert a new block in the database.
350386
async fn insert_block(
351387
&self,
@@ -365,6 +401,7 @@ pub trait DatabaseOperations: DatabaseConnectionProvider {
365401
.on_conflict(
366402
OnConflict::column(models::l2_block::Column::BlockNumber)
367403
.update_columns([
404+
models::l2_block::Column::BlockHash,
368405
models::l2_block::Column::BatchHash,
369406
models::l2_block::Column::BatchIndex,
370407
])
@@ -396,7 +433,7 @@ pub trait DatabaseOperations: DatabaseConnectionProvider {
396433

397434
/// Returns the highest L2 block originating from the provided `batch_hash` or the highest block
398435
/// for the batch's index.
399-
async fn get_highest_block_for_batch(
436+
async fn get_highest_block_for_batch_hash(
400437
&self,
401438
batch_hash: B256,
402439
) -> Result<Option<BlockInfo>, DatabaseError> {
@@ -420,14 +457,28 @@ pub trait DatabaseOperations: DatabaseConnectionProvider {
420457
}
421458
}
422459

460+
/// Returns the highest L2 block originating from the provided `batch_index` or the highest
461+
/// block for the batch's index.
462+
async fn get_highest_block_for_batch_index(
463+
&self,
464+
batch_index: u64,
465+
) -> Result<Option<BlockInfo>, DatabaseError> {
466+
Ok(models::l2_block::Entity::find()
467+
.filter(models::l2_block::Column::BatchIndex.lte(batch_index))
468+
.order_by_desc(models::l2_block::Column::BlockNumber)
469+
.one(self.get_connection())
470+
.await?
471+
.map(|model| model.block_info()))
472+
}
473+
423474
/// Unwinds the indexer by deleting all indexed data greater than the provided L1 block number.
424475
async fn unwind(
425476
&self,
426477
genesis_hash: B256,
427478
l1_block_number: u64,
428479
) -> Result<UnwindResult, DatabaseError> {
429480
// delete batch inputs and l1 messages
430-
let batches_removed = self.delete_batches_gt(l1_block_number).await?;
481+
let batches_removed = self.delete_batches_gt_block_number(l1_block_number).await?;
431482
let deleted_messages = self.delete_l1_messages_gt(l1_block_number).await?;
432483

433484
// filter and sort the executed L1 messages
@@ -441,10 +492,10 @@ pub trait DatabaseOperations: DatabaseConnectionProvider {
441492
if let Some(msg) = removed_executed_l1_messages.first() {
442493
let l2_reorg_block_number = msg
443494
.l2_block_number
444-
.expect("we guarantee that this is Some(u64) due to the filter above") -
445-
1;
495+
.expect("we guarantee that this is Some(u64) due to the filter above")
496+
.saturating_sub(1);
446497
let l2_block_info = self.get_l2_block_info_by_number(l2_reorg_block_number).await?;
447-
self.delete_l2_blocks_gt(l2_reorg_block_number).await?;
498+
self.delete_l2_blocks_gt_block_number(l2_reorg_block_number).await?;
448499
(Some(msg.transaction.queue_index), l2_block_info)
449500
} else {
450501
(None, None)

crates/derivation-pipeline/src/lib.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -123,6 +123,13 @@ where
123123
}
124124
None
125125
}
126+
127+
/// Flushes all the data in the pipeline.
128+
pub fn flush(&mut self) {
129+
self.attributes_queue.clear();
130+
self.batch_queue.clear();
131+
self.pipeline_futures = FuturesOrdered::new();
132+
}
126133
}
127134

128135
impl<P> Stream for DerivationPipeline<P>

crates/engine/src/driver.rs

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -108,6 +108,19 @@ where
108108
self.block_building_duration = block_building_duration;
109109
}
110110

111+
/// Clear the l1 attributes queue.
112+
pub fn clear_l1_payload_attributes(&mut self) {
113+
// clear the L1 attributes queue.
114+
self.l1_payload_attributes.clear();
115+
116+
// drop the engine future if it is a L1 consolidation.
117+
if let Some(MeteredFuture { fut: EngineFuture::L1Consolidation(_), .. }) =
118+
self.engine_future
119+
{
120+
self.engine_future.take();
121+
}
122+
}
123+
111124
/// Handles a block import request by adding it to the queue and waking up the driver.
112125
pub fn handle_block_import(&mut self, block_with_peer: NewBlockWithPeer) {
113126
tracing::trace!(target: "scroll::engine", ?block_with_peer, "new block import request received");

crates/indexer/src/event.rs

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,14 @@ use rollup_node_primitives::{BatchInfo, BlockInfo, L2BlockInfoWithL1Messages};
44
/// An event emitted by the indexer.
55
#[derive(Debug, Clone, PartialEq, Eq)]
66
pub enum IndexerEvent {
7-
/// A `BatchCommit` event has been indexed returning the batch info.
8-
BatchCommitIndexed(BatchInfo),
7+
/// A `BatchCommit` event has been indexed returning the batch info and the L2 block info to
8+
/// revert to due to a batch revert.
9+
BatchCommitIndexed {
10+
/// The batch info.
11+
batch_info: BatchInfo,
12+
/// The safe L2 block info.
13+
safe_head: Option<BlockInfo>,
14+
},
915
/// A `BatchFinalization` event has been indexed returning the batch hash and new finalized L2
1016
/// block.
1117
BatchFinalizationIndexed(B256, Option<BlockInfo>),

0 commit comments

Comments
 (0)