Skip to content

Commit 5670af8

Browse files
committed
feat: add skipping logic for duplicate L1 messages and batch commits in ChainOrchestrator
1 parent 0ea4ef7 commit 5670af8

File tree

2 files changed

+71
-2
lines changed

2 files changed

+71
-2
lines changed

crates/chain-orchestrator/src/error.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,9 @@ pub enum ChainOrchestratorError {
4040
/// missing.
4141
#[error("L1 message queue gap detected at index {0}, previous L1 message not found")]
4242
L1MessageQueueGap(u64),
43+
/// A duplicate L1 message was detected at index {0}.
44+
#[error("Duplicate L1 message detected at index {0}")]
45+
DuplicateL1Message(u64),
4346
/// An inconsistency was detected when trying to consolidate the chain.
4447
#[error("Chain inconsistency detected")]
4548
ChainInconsistency,
@@ -60,6 +63,9 @@ pub enum ChainOrchestratorError {
6063
/// A gap was detected in batch commit events: the previous batch before index {0} is missing.
6164
#[error("Batch commit gap detected at index {0}, previous batch commit not found")]
6265
BatchCommitGap(u64),
66+
/// A duplicate batch commit was detected at index {0}.
67+
#[error("Duplicate batch commit detected at {0}")]
68+
DuplicateBatchCommit(BatchInfo),
6369
/// An error occurred while making a network request.
6470
#[error("Network request error: {0}")]
6571
NetworkRequestError(#[from] reth_network_p2p::error::RequestError),

crates/chain-orchestrator/src/lib.rs

Lines changed: 65 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -546,6 +546,15 @@ impl<
546546
// Return no event, recovery will re-process
547547
Ok(None)
548548
}
549+
Err(ChainOrchestratorError::DuplicateBatchCommit(batch_info)) => {
550+
tracing::info!(
551+
target: "scroll::chain_orchestrator",
552+
"Duplicate batch commit detected at {:?}, skipping",
553+
batch_info
554+
);
555+
// Return no event, as the batch has already been processed
556+
Ok(None)
557+
}
549558
result => result,
550559
}
551560
}
@@ -574,6 +583,15 @@ impl<
574583
// Return no event, recovery will re-process
575584
Ok(None)
576585
}
586+
Err(ChainOrchestratorError::DuplicateL1Message(queue_index)) => {
587+
tracing::info!(
588+
target: "scroll::chain_orchestrator",
589+
"Duplicate L1 message detected at {:?}, skipping",
590+
queue_index
591+
);
592+
// Return no event, as the message has already been processed
593+
Ok(None)
594+
}
577595
result => result,
578596
}
579597
}
@@ -705,8 +723,21 @@ impl<
705723
return Err(ChainOrchestratorError::BatchCommitGap(batch_clone.index));
706724
}
707725

708-
// TODO: check for duplicate batch commit and skip if same hash
709-
// -> if different hash then we missed a batch revert event.
726+
// Check if batch already exists in DB.
727+
if let Some(existing_batch) = tx.get_batch_by_index(batch_clone.index).await? {
728+
if existing_batch.hash == batch_clone.hash {
729+
// This means we have already processed this batch commit, we will skip
730+
// it.
731+
return Err(ChainOrchestratorError::DuplicateBatchCommit(
732+
BatchInfo::new(batch_clone.index, batch_clone.hash),
733+
));
734+
} else {
735+
// TODO: once batch reverts are implemented, we need to handle this
736+
// case.
737+
// If we have a batch at the same index in the DB this means we have
738+
// missed a batch revert event.
739+
}
740+
}
710741

711742
// remove any batches with an index greater than the previous batch.
712743
let affected = tx.delete_batches_gt_batch_index(prev_batch_index).await?;
@@ -795,6 +826,7 @@ impl<
795826
// TODO: check for duplicate L1 message and skip if same hash
796827
let l1_message = l1_message.clone();
797828
async move {
829+
// check for gaps in the L1 message queue
798830
if l1_message.transaction.queue_index > 0 &&
799831
tx.get_n_l1_messages(
800832
Some(L1MessageKey::from_queue_index(
@@ -810,6 +842,37 @@ impl<
810842
));
811843
}
812844

845+
// check if the L1 message already exists in the DB
846+
if let Some(existing_message) = tx
847+
.get_n_l1_messages(
848+
Some(L1MessageKey::from_queue_index(
849+
l1_message.transaction.queue_index,
850+
)),
851+
1,
852+
)
853+
.await?
854+
.pop()
855+
{
856+
if existing_message.transaction.tx_hash() ==
857+
l1_message.transaction.tx_hash()
858+
{
859+
// We have already processed this L1 message, we will skip it.
860+
return Err(ChainOrchestratorError::DuplicateL1Message(
861+
l1_message.transaction.queue_index,
862+
));
863+
} else {
864+
// This should not happen in normal operation as messages should be
865+
// deleted when a L1 reorg is handled, log warning.
866+
tracing::warn!(
867+
target: "scroll::chain_orchestrator",
868+
"L1 message queue index {} already exists with different hash in DB {:?} vs {:?}",
869+
l1_message.transaction.queue_index,
870+
existing_message.transaction.tx_hash(),
871+
l1_message.transaction.tx_hash()
872+
);
873+
}
874+
}
875+
813876
tx.insert_l1_message(l1_message.clone()).await?;
814877
Ok::<_, ChainOrchestratorError>(())
815878
}

0 commit comments

Comments
 (0)