Skip to content

Commit 9865e89

Browse files
committed
move checks into respective functions
1 parent 90fc085 commit 9865e89

File tree

4 files changed

+203
-94
lines changed

4 files changed

+203
-94
lines changed

crates/chain-orchestrator/src/error.rs

Lines changed: 0 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -36,13 +36,6 @@ pub enum ChainOrchestratorError {
3636
/// An L1 message was not found in the database.
3737
#[error("L1 message not found at {0}")]
3838
L1MessageNotFound(L1MessageKey),
39-
/// A gap was detected in the L1 message queue: the previous message before index {0} is
40-
/// missing.
41-
#[error("L1 message queue gap detected at index {0}, previous L1 message not found")]
42-
L1MessageQueueGap(u64),
43-
/// A duplicate L1 message was detected at index {0}.
44-
#[error("Duplicate L1 message detected at index {0}")]
45-
DuplicateL1Message(u64),
4639
/// An inconsistency was detected when trying to consolidate the chain.
4740
#[error("Chain inconsistency detected")]
4841
ChainInconsistency,
@@ -60,12 +53,6 @@ pub enum ChainOrchestratorError {
6053
/// The actual number of blocks.
6154
actual: usize,
6255
},
63-
/// A gap was detected in batch commit events: the previous batch before index {0} is missing.
64-
#[error("Batch commit gap detected at index {0}, previous batch commit not found")]
65-
BatchCommitGap(u64),
66-
/// A duplicate batch commit was detected at index {0}.
67-
#[error("Duplicate batch commit detected at {0}")]
68-
DuplicateBatchCommit(BatchInfo),
6956
/// An error occurred while making a network request.
7057
#[error("Network request error: {0}")]
7158
NetworkRequestError(#[from] reth_network_p2p::error::RequestError),

crates/chain-orchestrator/src/event.rs

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,13 @@ pub enum ChainOrchestratorEvent {
4343
/// The L1 block number in which the batch was committed.
4444
l1_block_number: u64,
4545
},
46+
/// A gap has been detected in the committed batches.
47+
BatchCommitGap {
48+
missing_index: u64,
49+
l1_block_number_reset: u64,
50+
},
51+
/// A duplicate batch commit has been detected.
52+
BatchCommitDuplicate(u64),
4653
/// A batch has been finalized returning a list of finalized batches.
4754
BatchFinalized {
4855
/// The L1 block info at which the batch finalization event was received.
@@ -57,13 +64,21 @@ pub enum ChainOrchestratorEvent {
5764
/// The new safe head after the revert.
5865
safe_head: BlockInfo,
5966
},
67+
// TODO: revert events
6068
/// A new L1 block has been received returning the L1 block number.
6169
NewL1Block(u64),
6270
/// An L1 block has been finalized returning the L1 block number and the list of finalized
6371
/// batches.
6472
L1BlockFinalized(u64, Vec<BatchInfo>),
6573
/// A `L1Message` event has been committed returning the message queue index.
6674
L1MessageCommitted(u64),
75+
/// A gap has been detected in the L1 message queue.
76+
L1MessageQueueGap{
77+
missing_index: u64,
78+
l1_block_number_reset: u64,
79+
},
80+
/// A duplicate L1 message has been detected.
81+
L1MessageDuplicate(u64),
6782
/// A reorg has occurred on L1, returning the L1 block number of the new L1 head,
6883
/// the L1 message queue index of the new L1 head, and optionally the L2 head and safe block
6984
/// info if the reorg resulted in a new L2 head or safe block.

crates/chain-orchestrator/src/lib.rs

Lines changed: 65 additions & 79 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,7 @@ mod sync;
6363
pub use sync::{SyncMode, SyncState};
6464

6565
mod status;
66+
use crate::ChainOrchestratorEvent::{BatchCommitDuplicate, BatchCommitGap, L1MessageDuplicate, L1MessageQueueGap};
6667
pub use status::ChainOrchestratorStatus;
6768

6869
/// Wraps a future, metering the completion of it.
@@ -551,40 +552,11 @@ impl<
551552
metered!(Task::L1Finalization, self, handle_l1_finalized(*block_number))
552553
}
553554
L1Notification::BatchCommit { block_info, data } => {
554-
match metered!(
555+
metered!(
555556
Task::BatchCommit,
556557
self,
557558
handle_batch_commit(*block_info, data.clone())
558-
) {
559-
Err(ChainOrchestratorError::BatchCommitGap(batch_index)) => {
560-
// Query database for the L1 block of the last known batch
561-
let reset_block =
562-
self.database.get_last_batch_commit_l1_block().await?.unwrap_or(0);
563-
564-
tracing::warn!(
565-
target: "scroll::chain_orchestrator",
566-
"Batch commit gap detected at index {}, last known batch at L1 block {}",
567-
batch_index,
568-
reset_block
569-
);
570-
571-
// Trigger gap recovery
572-
self.l1_watcher_handle.trigger_gap_recovery(reset_block).await;
573-
574-
// Return no event, recovery will re-process
575-
Ok(None)
576-
}
577-
Err(ChainOrchestratorError::DuplicateBatchCommit(batch_info)) => {
578-
tracing::info!(
579-
target: "scroll::chain_orchestrator",
580-
"Duplicate batch commit detected at {:?}, skipping",
581-
batch_info
582-
);
583-
// Return no event, as the batch has already been processed
584-
Ok(None)
585-
}
586-
result => result,
587-
}
559+
)
588560
}
589561
L1Notification::BatchRevert { batch_info, block_info } => {
590562
metered!(
@@ -601,40 +573,11 @@ impl<
601573
)
602574
}
603575
L1Notification::L1Message { message, block_info, block_timestamp: _ } => {
604-
match metered!(
576+
metered!(
605577
Task::L1Message,
606578
self,
607579
handle_l1_message(message.clone(), *block_info)
608-
) {
609-
Err(ChainOrchestratorError::L1MessageQueueGap(queue_index)) => {
610-
// Query database for the L1 block of the last known L1 message
611-
let reset_block =
612-
self.database.get_last_l1_message_l1_block().await?.unwrap_or(0);
613-
614-
tracing::warn!(
615-
target: "scroll::chain_orchestrator",
616-
"L1 message queue gap detected at index {}, last known message at L1 block {}",
617-
queue_index,
618-
reset_block
619-
);
620-
621-
// Trigger gap recovery
622-
self.l1_watcher_handle.trigger_gap_recovery(reset_block).await;
623-
624-
// Return no event, recovery will re-process
625-
Ok(None)
626-
}
627-
Err(ChainOrchestratorError::DuplicateL1Message(queue_index)) => {
628-
tracing::info!(
629-
target: "scroll::chain_orchestrator",
630-
"Duplicate L1 message detected at {:?}, skipping",
631-
queue_index
632-
);
633-
// Return no event, as the message has already been processed
634-
Ok(None)
635-
}
636-
result => result,
637-
}
580+
)
638581
}
639582
L1Notification::Synced => {
640583
tracing::info!(target: "scroll::chain_orchestrator", "L1 is now synced");
@@ -822,17 +765,19 @@ impl<
822765
// Perform a consistency check to ensure the previous commit batch exists in the
823766
// database.
824767
if tx.get_batch_by_index(prev_batch_index).await?.is_none() {
825-
return Err(ChainOrchestratorError::BatchCommitGap(batch.index));
768+
// Query database for the L1 block of the last known batch
769+
let reset_block =
770+
tx.get_last_batch_commit_l1_block().await?.unwrap_or(0);
771+
772+
return Ok(Some(BatchCommitGap{ missing_index: batch_info.index, l1_block_number_reset: reset_block }));
826773
}
827774

828775
// Check if batch already exists in DB.
829776
if let Some(existing_batch) = tx.get_batch_by_index(batch.index).await? {
830777
if existing_batch.hash == batch.hash {
831778
// This means we have already processed this batch commit, we will skip
832779
// it.
833-
return Err(ChainOrchestratorError::DuplicateBatchCommit(
834-
BatchInfo::new(batch.index, batch.hash),
835-
));
780+
return Ok(Some(BatchCommitDuplicate(existing_batch.index)));
836781
}
837782
// TODO: once batch reverts are implemented, we need to handle this
838783
// case.
@@ -856,8 +801,29 @@ impl<
856801
})
857802
.await?;
858803

859-
if self.sync_state.is_synced() {
860-
self.derivation_pipeline.push_batch(batch_info, BatchStatus::Consolidated).await;
804+
match event {
805+
Some(BatchCommitGap {missing_index, l1_block_number_reset}) => {
806+
tracing::warn!(
807+
target: "scroll::chain_orchestrator",
808+
"Batch commit gap detected at index {}, last known batch at L1 block {}",
809+
missing_index,
810+
l1_block_number_reset
811+
);
812+
self.l1_watcher_handle.trigger_gap_recovery(l1_block_number_reset).await;
813+
},
814+
Some(BatchCommitDuplicate(index)) => {
815+
tracing::info!(
816+
target: "scroll::chain_orchestrator",
817+
"Duplicate batch commit detected at {:?}, skipping",
818+
index
819+
);
820+
},
821+
Some(ChainOrchestratorEvent::BatchCommitIndexed {..}) => {
822+
if self.sync_state.is_synced() {
823+
self.derivation_pipeline.push_batch(batch_info, BatchStatus::Consolidated).await;
824+
}
825+
}
826+
_ => { }
861827
}
862828

863829
Ok(event)
@@ -930,11 +896,10 @@ impl<
930896

931897
/// Handles an L1 message by inserting it into the database.
932898
async fn handle_l1_message(
933-
&self,
899+
&mut self,
934900
l1_message: TxL1Message,
935901
l1_block_info: BlockInfo,
936902
) -> Result<Option<ChainOrchestratorEvent>, ChainOrchestratorError> {
937-
let event = ChainOrchestratorEvent::L1MessageCommitted(l1_message.queue_index);
938903
let queue_hash = compute_l1_message_queue_hash(
939904
&self.database,
940905
&l1_message,
@@ -944,7 +909,7 @@ impl<
944909
let l1_message = L1MessageEnvelope::new(l1_message, l1_block_info.number, None, queue_hash);
945910

946911
// Perform a consistency check to ensure the previous L1 message exists in the database.
947-
self.database
912+
let event = self.database
948913
.tx_mut(move |tx| {
949914
let l1_message = l1_message.clone();
950915
async move {
@@ -959,9 +924,11 @@ impl<
959924
.await?
960925
.is_empty()
961926
{
962-
return Err(ChainOrchestratorError::L1MessageQueueGap(
963-
l1_message.transaction.queue_index,
964-
));
927+
// Query database for the L1 block of the last known L1 message
928+
let reset_block =
929+
tx.get_last_l1_message_l1_block().await?.unwrap_or(0);
930+
931+
return Ok::<_, ChainOrchestratorError>(Some(L1MessageQueueGap{ missing_index: l1_message.transaction.queue_index, l1_block_number_reset: reset_block }) );
965932
}
966933

967934
// check if the L1 message already exists in the DB
@@ -979,9 +946,7 @@ impl<
979946
l1_message.transaction.tx_hash()
980947
{
981948
// We have already processed this L1 message, we will skip it.
982-
return Err(ChainOrchestratorError::DuplicateL1Message(
983-
l1_message.transaction.queue_index,
984-
));
949+
return Ok(Some(L1MessageDuplicate(l1_message.transaction.queue_index)));
985950
}
986951

987952
// This should not happen in normal operation as messages should be
@@ -997,12 +962,33 @@ impl<
997962

998963
tx.insert_l1_message(l1_message.clone()).await?;
999964
tx.insert_l1_block_info(l1_block_info).await?;
1000-
Ok::<_, ChainOrchestratorError>(())
965+
966+
Ok(Some(ChainOrchestratorEvent::L1MessageCommitted(l1_message.transaction.queue_index)))
1001967
}
1002968
})
1003969
.await?;
1004970

1005-
Ok(Some(event))
971+
match event {
972+
Some(L1MessageQueueGap{missing_index, l1_block_number_reset}) => {
973+
tracing::warn!(
974+
target: "scroll::chain_orchestrator",
975+
"L1 message queue gap detected at index {}, last known message at L1 block {}",
976+
missing_index,
977+
l1_block_number_reset
978+
);
979+
self.l1_watcher_handle.trigger_gap_recovery(l1_block_number_reset).await;
980+
},
981+
Some(L1MessageDuplicate(index)) => {
982+
tracing::info!(
983+
target: "scroll::chain_orchestrator",
984+
"Duplicate L1 message detected at {:?}, skipping",
985+
index
986+
);
987+
},
988+
_ => {}
989+
}
990+
991+
Ok(event)
1006992
}
1007993

1008994
async fn handle_network_event(

0 commit comments

Comments
 (0)