Skip to content

Commit 2f2960c

Browse files
committed
implement test for gap detection for batch and L1 messages. fix issues with L1 message queue hash calculation
1 parent 9865e89 commit 2f2960c

File tree

4 files changed

+222
-132
lines changed

4 files changed

+222
-132
lines changed

crates/chain-orchestrator/src/event.rs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,9 @@ pub enum ChainOrchestratorEvent {
4545
},
4646
/// A gap has been detected in the committed batches.
4747
BatchCommitGap {
48+
/// The missing batch index.
4849
missing_index: u64,
50+
/// The latest known L1 block number to reset to before the gap.
4951
l1_block_number_reset: u64,
5052
},
5153
/// A duplicate batch commit has been detected.
@@ -73,8 +75,10 @@ pub enum ChainOrchestratorEvent {
7375
/// A `L1Message` event has been committed returning the message queue index.
7476
L1MessageCommitted(u64),
7577
/// A gap has been detected in the L1 message queue.
76-
L1MessageQueueGap{
78+
L1MessageGap {
79+
/// The missing L1 message queue index.
7780
missing_index: u64,
81+
/// The latest known L1 block number to reset to before the gap.
7882
l1_block_number_reset: u64,
7983
},
8084
/// A duplicate L1 message has been detected.

crates/chain-orchestrator/src/lib.rs

Lines changed: 64 additions & 64 deletions
Original file line numberDiff line numberDiff line change
@@ -26,10 +26,7 @@ use scroll_alloy_consensus::{ScrollTxEnvelope, TxL1Message};
2626
use scroll_alloy_hardforks::ScrollHardforks;
2727
use scroll_alloy_network::Scroll;
2828
use scroll_alloy_provider::ScrollEngineApi;
29-
use scroll_db::{
30-
Database, DatabaseError, DatabaseReadOperations, DatabaseWriteOperations, L1MessageKey,
31-
UnwindResult,
32-
};
29+
use scroll_db::{Database, DatabaseError, DatabaseReadOperations, DatabaseWriteOperations, L1MessageKey, TXMut, UnwindResult};
3330
use scroll_derivation_pipeline::{BatchDerivationResult, DerivationPipeline};
3431
use scroll_engine::Engine;
3532
use scroll_network::{
@@ -63,7 +60,9 @@ mod sync;
6360
pub use sync::{SyncMode, SyncState};
6461

6562
mod status;
66-
use crate::ChainOrchestratorEvent::{BatchCommitDuplicate, BatchCommitGap, L1MessageDuplicate, L1MessageQueueGap};
63+
use crate::ChainOrchestratorEvent::{
64+
BatchCommitDuplicate, BatchCommitGap, L1MessageDuplicate, L1MessageGap,
65+
};
6766
pub use status::ChainOrchestratorStatus;
6867

6968
/// Wraps a future, metering the completion of it.
@@ -552,11 +551,7 @@ impl<
552551
metered!(Task::L1Finalization, self, handle_l1_finalized(*block_number))
553552
}
554553
L1Notification::BatchCommit { block_info, data } => {
555-
metered!(
556-
Task::BatchCommit,
557-
self,
558-
handle_batch_commit(*block_info, data.clone())
559-
)
554+
metered!(Task::BatchCommit, self, handle_batch_commit(*block_info, data.clone()))
560555
}
561556
L1Notification::BatchRevert { batch_info, block_info } => {
562557
metered!(
@@ -573,11 +568,7 @@ impl<
573568
)
574569
}
575570
L1Notification::L1Message { message, block_info, block_timestamp: _ } => {
576-
metered!(
577-
Task::L1Message,
578-
self,
579-
handle_l1_message(message.clone(), *block_info)
580-
)
571+
metered!(Task::L1Message, self, handle_l1_message(message.clone(), *block_info))
581572
}
582573
L1Notification::Synced => {
583574
tracing::info!(target: "scroll::chain_orchestrator", "L1 is now synced");
@@ -766,10 +757,12 @@ impl<
766757
// database.
767758
if tx.get_batch_by_index(prev_batch_index).await?.is_none() {
768759
// Query database for the L1 block of the last known batch
769-
let reset_block =
770-
tx.get_last_batch_commit_l1_block().await?.unwrap_or(0);
760+
let reset_block = tx.get_last_batch_commit_l1_block().await?.unwrap_or(0);
771761

772-
return Ok(Some(BatchCommitGap{ missing_index: batch_info.index, l1_block_number_reset: reset_block }));
762+
return Ok(Some(BatchCommitGap {
763+
missing_index: batch_info.index,
764+
l1_block_number_reset: reset_block,
765+
}));
773766
}
774767

775768
// Check if batch already exists in DB.
@@ -802,28 +795,30 @@ impl<
802795
.await?;
803796

804797
match event {
805-
Some(BatchCommitGap {missing_index, l1_block_number_reset}) => {
798+
Some(BatchCommitGap { missing_index, l1_block_number_reset }) => {
806799
tracing::warn!(
807-
target: "scroll::chain_orchestrator",
808-
"Batch commit gap detected at index {}, last known batch at L1 block {}",
809-
missing_index,
810-
l1_block_number_reset
811-
);
800+
target: "scroll::chain_orchestrator",
801+
"Batch commit gap detected at index {}, last known batch at L1 block {}",
802+
missing_index,
803+
l1_block_number_reset
804+
);
812805
self.l1_watcher_handle.trigger_gap_recovery(l1_block_number_reset).await;
813-
},
806+
}
814807
Some(BatchCommitDuplicate(index)) => {
815808
tracing::info!(
816-
target: "scroll::chain_orchestrator",
817-
"Duplicate batch commit detected at {:?}, skipping",
818-
index
819-
);
820-
},
821-
Some(ChainOrchestratorEvent::BatchCommitIndexed {..}) => {
809+
target: "scroll::chain_orchestrator",
810+
"Duplicate batch commit detected at {:?}, skipping",
811+
index
812+
);
813+
}
814+
Some(ChainOrchestratorEvent::BatchCommitIndexed { .. }) => {
822815
if self.sync_state.is_synced() {
823-
self.derivation_pipeline.push_batch(batch_info, BatchStatus::Consolidated).await;
816+
self.derivation_pipeline
817+
.push_batch(batch_info, BatchStatus::Consolidated)
818+
.await;
824819
}
825820
}
826-
_ => { }
821+
_ => {}
827822
}
828823

829824
Ok(event)
@@ -900,24 +895,19 @@ impl<
900895
l1_message: TxL1Message,
901896
l1_block_info: BlockInfo,
902897
) -> Result<Option<ChainOrchestratorEvent>, ChainOrchestratorError> {
903-
let queue_hash = compute_l1_message_queue_hash(
904-
&self.database,
905-
&l1_message,
906-
self.config.l1_v2_message_queue_start_index(),
907-
)
908-
.await?;
909-
let l1_message = L1MessageEnvelope::new(l1_message, l1_block_info.number, None, queue_hash);
910-
911-
// Perform a consistency check to ensure the previous L1 message exists in the database.
898+
let l1_v2_message_queue_start_index =
899+
self.config.l1_v2_message_queue_start_index();
900+
912901
let event = self.database
913902
.tx_mut(move |tx| {
914903
let l1_message = l1_message.clone();
904+
915905
async move {
916906
// check for gaps in the L1 message queue
917-
if l1_message.transaction.queue_index > 0 &&
907+
if l1_message.queue_index > 0 &&
918908
tx.get_n_l1_messages(
919909
Some(L1MessageKey::from_queue_index(
920-
l1_message.transaction.queue_index - 1,
910+
l1_message.queue_index - 1,
921911
)),
922912
1,
923913
)
@@ -928,38 +918,48 @@ impl<
928918
let reset_block =
929919
tx.get_last_l1_message_l1_block().await?.unwrap_or(0);
930920

931-
return Ok::<_, ChainOrchestratorError>(Some(L1MessageQueueGap{ missing_index: l1_message.transaction.queue_index, l1_block_number_reset: reset_block }) );
921+
return Ok::<_, ChainOrchestratorError>(Some(L1MessageGap { missing_index: l1_message.queue_index, l1_block_number_reset: reset_block }) );
932922
}
933923

934924
// check if the L1 message already exists in the DB
935925
if let Some(existing_message) = tx
936926
.get_n_l1_messages(
937927
Some(L1MessageKey::from_queue_index(
938-
l1_message.transaction.queue_index,
928+
l1_message.queue_index,
939929
)),
940930
1,
941931
)
942932
.await?
943933
.pop()
944934
{
945935
if existing_message.transaction.tx_hash() ==
946-
l1_message.transaction.tx_hash()
936+
l1_message.tx_hash()
947937
{
948938
// We have already processed this L1 message, we will skip it.
949-
return Ok(Some(L1MessageDuplicate(l1_message.transaction.queue_index)));
939+
return Ok(Some(L1MessageDuplicate(l1_message.queue_index)));
950940
}
951941

952942
// This should not happen in normal operation as messages should be
953943
// deleted when a L1 reorg is handled, log warning.
954944
tracing::warn!(
955945
target: "scroll::chain_orchestrator",
956946
"L1 message queue index {} already exists with different hash in DB {:?} vs {:?}",
957-
l1_message.transaction.queue_index,
947+
l1_message.queue_index,
958948
existing_message.transaction.tx_hash(),
959-
l1_message.transaction.tx_hash()
949+
l1_message.tx_hash()
960950
);
961951
}
962952

953+
// We compute the queue hash at the end as it requires the previous message.
954+
let queue_hash = compute_l1_message_queue_hash(
955+
&tx,
956+
&l1_message,
957+
l1_v2_message_queue_start_index,
958+
)
959+
.await?;
960+
961+
let l1_message = L1MessageEnvelope::new(l1_message, l1_block_info.number, None, queue_hash);
962+
963963
tx.insert_l1_message(l1_message.clone()).await?;
964964
tx.insert_l1_block_info(l1_block_info).await?;
965965

@@ -969,22 +969,22 @@ impl<
969969
.await?;
970970

971971
match event {
972-
Some(L1MessageQueueGap{missing_index, l1_block_number_reset}) => {
972+
Some(L1MessageGap { missing_index, l1_block_number_reset }) => {
973973
tracing::warn!(
974-
target: "scroll::chain_orchestrator",
975-
"L1 message queue gap detected at index {}, last known message at L1 block {}",
976-
missing_index,
977-
l1_block_number_reset
978-
);
974+
target: "scroll::chain_orchestrator",
975+
"L1 message queue gap detected at index {}, last known message at L1 block {}",
976+
missing_index,
977+
l1_block_number_reset
978+
);
979979
self.l1_watcher_handle.trigger_gap_recovery(l1_block_number_reset).await;
980-
},
980+
}
981981
Some(L1MessageDuplicate(index)) => {
982982
tracing::info!(
983-
target: "scroll::chain_orchestrator",
984-
"Duplicate L1 message detected at {:?}, skipping",
985-
index
986-
);
987-
},
983+
target: "scroll::chain_orchestrator",
984+
"Duplicate L1 message detected at {:?}, skipping",
985+
index
986+
);
987+
}
988988
_ => {}
989989
}
990990

@@ -1441,7 +1441,7 @@ impl<
14411441
///
14421442
/// The solidity contract (`L1MessageQueueV2.sol`) implementation is defined here: <https://github.com/scroll-tech/scroll-contracts/blob/67c1bde19c1d3462abf8c175916a2bb3c89530e4/src/L1/rollup/L1MessageQueueV2.sol#L379-L403>
14431443
async fn compute_l1_message_queue_hash(
1444-
database: &Arc<Database>,
1444+
tx: &Arc<TXMut>,
14451445
l1_message: &TxL1Message,
14461446
l1_v2_message_queue_start_index: u64,
14471447
) -> Result<Option<alloy_primitives::FixedBytes<32>>, ChainOrchestratorError> {
@@ -1451,7 +1451,7 @@ async fn compute_l1_message_queue_hash(
14511451
Some(keccak256(input) & L1_MESSAGE_QUEUE_HASH_MASK)
14521452
} else if l1_message.queue_index > l1_v2_message_queue_start_index {
14531453
let index = l1_message.queue_index - 1;
1454-
let mut input = database
1454+
let mut input = tx
14551455
.get_n_l1_messages(Some(L1MessageKey::from_queue_index(index)), 1)
14561456
.await?
14571457
.first()

0 commit comments

Comments
 (0)