Skip to content

Commit d35eba1

Browse files
committed
implement gap and skip detection for revert events
1 parent 2f2960c commit d35eba1

File tree

6 files changed

+238
-37
lines changed

6 files changed

+238
-37
lines changed

crates/chain-orchestrator/src/event.rs

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,15 @@ pub enum ChainOrchestratorEvent {
6666
/// The new safe head after the revert.
6767
safe_head: BlockInfo,
6868
},
69-
// TODO: revert events
69+
/// A gap has been detected in the reverted batches.
70+
BatchRevertGap {
71+
/// The missing batch index.
72+
missing_index: u64,
73+
/// The latest known L1 block number to reset to before the gap.
74+
l1_block_number_reset: u64,
75+
},
76+
/// A duplicate batch revert has been detected.
77+
BatchRevertDuplicate(u64),
7078
/// A new L1 block has been received returning the L1 block number.
7179
NewL1Block(u64),
7280
/// An L1 block has been finalized returning the L1 block number and the list of finalized

crates/chain-orchestrator/src/lib.rs

Lines changed: 61 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,10 @@ use scroll_alloy_consensus::{ScrollTxEnvelope, TxL1Message};
2626
use scroll_alloy_hardforks::ScrollHardforks;
2727
use scroll_alloy_network::Scroll;
2828
use scroll_alloy_provider::ScrollEngineApi;
29-
use scroll_db::{Database, DatabaseError, DatabaseReadOperations, DatabaseWriteOperations, L1MessageKey, TXMut, UnwindResult};
29+
use scroll_db::{
30+
Database, DatabaseError, DatabaseReadOperations, DatabaseWriteOperations, L1MessageKey, TXMut,
31+
UnwindResult,
32+
};
3033
use scroll_derivation_pipeline::{BatchDerivationResult, DerivationPipeline};
3134
use scroll_engine::Engine;
3235
use scroll_network::{
@@ -61,7 +64,8 @@ pub use sync::{SyncMode, SyncState};
6164

6265
mod status;
6366
use crate::ChainOrchestratorEvent::{
64-
BatchCommitDuplicate, BatchCommitGap, L1MessageDuplicate, L1MessageGap,
67+
BatchCommitDuplicate, BatchCommitGap, BatchRevertDuplicate, BatchRevertGap, L1MessageDuplicate,
68+
L1MessageGap,
6569
};
6670
pub use status::ChainOrchestratorStatus;
6771

@@ -755,7 +759,14 @@ impl<
755759

756760
// Perform a consistency check to ensure the previous commit batch exists in the
757761
// database.
758-
if tx.get_batch_by_index(prev_batch_index).await?.is_none() {
762+
if tx
763+
.get_batch_by_index(prev_batch_index)
764+
.await?
765+
.iter()
766+
.filter(|x| x.reverted_block_number.is_none())
767+
.count() ==
768+
0
769+
{
759770
// Query database for the L1 block of the last known batch
760771
let reset_block = tx.get_last_batch_commit_l1_block().await?.unwrap_or(0);
761772

@@ -766,16 +777,20 @@ impl<
766777
}
767778

768779
// Check if batch already exists in DB.
769-
if let Some(existing_batch) = tx.get_batch_by_index(batch.index).await? {
780+
for existing_batch in tx.get_batch_by_index(batch.index).await? {
770781
if existing_batch.hash == batch.hash {
771782
// This means we have already processed this batch commit, we will skip
772783
// it.
773784
return Ok(Some(BatchCommitDuplicate(existing_batch.index)));
785+
} else if existing_batch.reverted_block_number.is_none() {
786+
// This means we have received a different batch commit at the same
787+
// index which has not been reverted yet. ->
788+
// we missed a revert a event
789+
return Ok(Some(BatchRevertGap {
790+
missing_index: batch.index,
791+
l1_block_number_reset: existing_batch.block_number,
792+
}));
774793
}
775-
// TODO: once batch reverts are implemented, we need to handle this
776-
// case.
777-
// If we have a batch at the same index in the DB this means we have
778-
// missed a batch revert event.
779794
}
780795

781796
let event = ChainOrchestratorEvent::BatchCommitIndexed {
@@ -804,6 +819,16 @@ impl<
804819
);
805820
self.l1_watcher_handle.trigger_gap_recovery(l1_block_number_reset).await;
806821
}
822+
Some(BatchRevertGap { missing_index, l1_block_number_reset }) => {
823+
tracing::warn!(
824+
target: "scroll::chain_orchestrator",
825+
"Batch revert gap detected at index {}, last known batch at L1 block {}",
826+
missing_index,
827+
l1_block_number_reset
828+
);
829+
// TODO: getting channel closed here
830+
// self.l1_watcher_handle.trigger_gap_recovery(l1_block_number_reset).await;
831+
}
807832
Some(BatchCommitDuplicate(index)) => {
808833
tracing::info!(
809834
target: "scroll::chain_orchestrator",
@@ -867,6 +892,33 @@ impl<
867892
end_index: u64,
868893
l1_block_info: BlockInfo,
869894
) -> Result<Option<ChainOrchestratorEvent>, ChainOrchestratorError> {
895+
let event = self
896+
.database
897+
.tx(move |tx| async move {
898+
// Check if we received this revert already.
899+
// If any of the batches with same index is reverted with the same L1 block then the
900+
// event is duplicate
901+
for existing_batch in tx.get_batch_by_index(end_index).await? {
902+
if existing_batch.reverted_block_number == Some(l1_block_info.number) {
903+
return Ok::<_, ChainOrchestratorError>(Some(BatchRevertDuplicate(
904+
existing_batch.index,
905+
)));
906+
}
907+
}
908+
909+
Ok(None)
910+
})
911+
.await?;
912+
913+
if let Some(BatchRevertDuplicate(index)) = event {
914+
tracing::info!(
915+
target: "scroll::chain_orchestrator",
916+
"Duplicate batch revert detected at {:?}, skipping",
917+
index
918+
);
919+
return Ok(event);
920+
}
921+
870922
let (safe_block_info, batch_info) = self
871923
.database
872924
.tx_mut(move |tx| async move {
@@ -895,8 +947,7 @@ impl<
895947
l1_message: TxL1Message,
896948
l1_block_info: BlockInfo,
897949
) -> Result<Option<ChainOrchestratorEvent>, ChainOrchestratorError> {
898-
let l1_v2_message_queue_start_index =
899-
self.config.l1_v2_message_queue_start_index();
950+
let l1_v2_message_queue_start_index = self.config.l1_v2_message_queue_start_index();
900951

901952
let event = self.database
902953
.tx_mut(move |tx| {

crates/database/db/src/db.rs

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -523,7 +523,7 @@ impl DatabaseReadOperations for Database {
523523
async fn get_batch_by_index(
524524
&self,
525525
batch_index: u64,
526-
) -> Result<Option<BatchCommitData>, DatabaseError> {
526+
) -> Result<Vec<BatchCommitData>, DatabaseError> {
527527
metered!(
528528
DatabaseOperation::GetBatchByIndex,
529529
self,
@@ -881,8 +881,7 @@ mod test {
881881

882882
// Round trip the BatchCommitData through the database.
883883
db.insert_batch(batch_commit.clone()).await.unwrap();
884-
let batch_commit_from_db =
885-
db.get_batch_by_index(batch_commit.index).await.unwrap().unwrap();
884+
let batch_commit_from_db = db.get_batch_by_hash(batch_commit.hash).await.unwrap().unwrap();
886885

887886
assert_eq!(batch_commit, batch_commit_from_db);
888887
}
@@ -1402,7 +1401,7 @@ mod test {
14021401

14031402
// Insert L2 blocks with different batch indices
14041403
for i in 100..110 {
1405-
let batch_data = db.get_batch_by_index(i).await.unwrap().unwrap();
1404+
let batch_data = db.get_batch_by_index(i).await.unwrap().first().unwrap().clone();
14061405
let batch_info: BatchInfo = batch_data.into();
14071406
let block_info = BlockInfo { number: 500 + i, hash: B256::arbitrary(&mut u).unwrap() };
14081407

@@ -1579,9 +1578,9 @@ mod test {
15791578
db.set_finalized_l1_block_number(21).await.unwrap();
15801579

15811580
// Verify the batches and blocks were inserted correctly
1582-
let retrieved_batch_1 = db.get_batch_by_index(1).await.unwrap().unwrap();
1583-
let retrieved_batch_2 = db.get_batch_by_index(2).await.unwrap().unwrap();
1584-
let retrieved_batch_3 = db.get_batch_by_index(3).await.unwrap().unwrap();
1581+
let retrieved_batch_1 = db.get_batch_by_index(1).await.unwrap().first().unwrap().clone();
1582+
let retrieved_batch_2 = db.get_batch_by_index(2).await.unwrap().first().unwrap().clone();
1583+
let retrieved_batch_3 = db.get_batch_by_index(3).await.unwrap().first().unwrap().clone();
15851584
let retried_block_1 = db.get_l2_block_info_by_number(1).await.unwrap().unwrap();
15861585
let retried_block_2 = db.get_l2_block_info_by_number(2).await.unwrap().unwrap();
15871586
let retried_block_3 = db.get_l2_block_info_by_number(3).await.unwrap().unwrap();

crates/database/db/src/operations.rs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -915,7 +915,7 @@ pub trait DatabaseReadOperations {
915915
async fn get_batch_by_index(
916916
&self,
917917
batch_index: u64,
918-
) -> Result<Option<BatchCommitData>, DatabaseError>;
918+
) -> Result<Vec<BatchCommitData>, DatabaseError>;
919919

920920
/// Get a [`BatchCommitData`] from the database by its batch hash.
921921
async fn get_batch_by_hash(
@@ -1010,16 +1010,15 @@ impl<T: ReadConnectionProvider + Sync + ?Sized> DatabaseReadOperations for T {
10101010
async fn get_batch_by_index(
10111011
&self,
10121012
batch_index: u64,
1013-
) -> Result<Option<BatchCommitData>, DatabaseError> {
1013+
) -> Result<Vec<BatchCommitData>, DatabaseError> {
10141014
Ok(models::batch_commit::Entity::find()
10151015
.filter(
10161016
models::batch_commit::Column::Index
1017-
.eq(TryInto::<i64>::try_into(batch_index).expect("index should fit in i64"))
1018-
.and(models::batch_commit::Column::RevertedBlockNumber.is_null()),
1017+
.eq(TryInto::<i64>::try_into(batch_index).expect("index should fit in i64")),
10191018
)
1020-
.one(self.get_connection())
1019+
.all(self.get_connection())
10211020
.await
1022-
.map(|x| x.map(Into::into))?)
1021+
.map(|x| x.into_iter().map(Into::into).collect())?)
10231022
}
10241023

10251024
async fn get_batch_by_hash(
@@ -1151,6 +1150,7 @@ impl<T: ReadConnectionProvider + Sync + ?Sized> DatabaseReadOperations for T {
11511150

11521151
async fn get_last_batch_commit_l1_block(&self) -> Result<Option<u64>, DatabaseError> {
11531152
Ok(models::batch_commit::Entity::find()
1153+
.filter(models::batch_commit::Column::RevertedBlockNumber.is_null())
11541154
.order_by_desc(models::batch_commit::Column::BlockNumber)
11551155
.select_only()
11561156
.column(models::batch_commit::Column::BlockNumber)

crates/derivation-pipeline/src/lib.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -218,7 +218,7 @@ where
218218

219219
// get the batch commit data.
220220
let batch = db
221-
.get_batch_by_index(batch_info.index)
221+
.get_batch_by_hash(batch_info.hash)
222222
.await
223223
.map_err(|err| (request.clone(), err.into()))?
224224
.ok_or((

0 commit comments

Comments
 (0)