Skip to content

Commit 9d65609

Browse files
committed
feat: database support retry
1 parent 15de9e3 commit 9d65609

File tree

4 files changed

+288
-88
lines changed

4 files changed

+288
-88
lines changed

crates/chain-orchestrator/src/lib.rs

Lines changed: 116 additions & 87 deletions
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,7 @@ use scroll_alloy_consensus::TxL1Message;
1818
use scroll_alloy_hardforks::ScrollHardforks;
1919
use scroll_alloy_network::Scroll;
2020
use scroll_db::{
21-
Database, DatabaseError, DatabaseReadOperations, DatabaseTransactionProvider,
22-
DatabaseWriteOperations, L1MessageStart, UnwindResult,
21+
retry_with_defaults, Database, DatabaseError, DatabaseReadOperations, DatabaseTransactionProvider, DatabaseWriteOperations, L1MessageStart, UnwindResult
2322
};
2423
use scroll_network::NewBlockWithPeer;
2524
use std::{
@@ -211,9 +210,12 @@ impl<
211210
let database = ctx.database.clone();
212211
let block_info: L2BlockInfoWithL1Messages = (&block_with_peer.block).into();
213212
Self::do_handle_block_from_peer(ctx, block_with_peer).await?;
214-
let tx = database.tx_mut().await?;
215-
tx.update_l1_messages_with_l2_block(block_info.clone()).await?;
216-
tx.commit().await?;
213+
retry_with_defaults("update_l1_messages_with_l2_block", || async {
214+
let tx = database.tx_mut().await?;
215+
tx.update_l1_messages_with_l2_block(block_info.clone()).await?;
216+
tx.commit().await?;
217+
Ok::<_, DatabaseError>(())
218+
}).await?;
217219
Ok(ChainOrchestratorEvent::L2ChainCommitted(block_info, None, true))
218220
}
219221

@@ -292,10 +294,12 @@ impl<
292294
let mut received_chain_headers = VecDeque::from(vec![received_block.header.clone()]);
293295

294296
// We should never have a re-org that is deeper than the current safe head.
295-
let tx = database.tx().await?;
296-
let (latest_safe_block, _) =
297-
tx.get_latest_safe_l2_info().await?.expect("safe block must exist");
298-
drop(tx);
297+
let (latest_safe_block, _) = retry_with_defaults("get_latest_safe_l2_info", || async {
298+
let tx = database.tx().await?;
299+
let (latest_safe_block, batch_info) =
300+
tx.get_latest_safe_l2_info().await?.expect("safe block must exist");
301+
Ok::<_, DatabaseError>((latest_safe_block, batch_info))
302+
}).await?;
299303

300304
// We search for the re-org index in the in-memory chain.
301305
const BATCH_FETCH_SIZE: usize = 50;
@@ -455,11 +459,14 @@ impl<
455459
ChainOrchestratorItem::InsertConsolidatedL2Blocks,
456460
Box::pin(async move {
457461
let head = block_infos.last().expect("block info must not be empty").clone();
458-
let tx = database.tx_mut().await?;
459-
for block in block_infos {
460-
tx.insert_block(block, batch_info).await?;
461-
}
462-
tx.commit().await?;
462+
retry_with_defaults("insert_block", || async {
463+
let tx = database.tx_mut().await?;
464+
for block in block_infos.clone() {
465+
tx.insert_block(block, batch_info).await?;
466+
}
467+
tx.commit().await?;
468+
Ok::<_, DatabaseError>(())
469+
}).await?;
463470
Result::<_, ChainOrchestratorError>::Ok(Some(
464471
ChainOrchestratorEvent::L2ConsolidatedBlockCommitted(head),
465472
))
@@ -503,9 +510,12 @@ impl<
503510

504511
// Insert the blocks into the database.
505512
let head = block_info.last().expect("block info must not be empty").clone();
506-
let tx = database.tx_mut().await?;
507-
tx.update_l1_messages_from_l2_blocks(block_info).await?;
508-
tx.commit().await?;
513+
retry_with_defaults("update_l1_messages_from_l2_blocks", || async {
514+
let tx = database.tx_mut().await?;
515+
tx.update_l1_messages_from_l2_blocks(block_info.clone()).await?;
516+
tx.commit().await?;
517+
Ok::<_, DatabaseError>(())
518+
}).await?;
509519

510520
Result::<_, ChainOrchestratorError>::Ok(Some(
511521
ChainOrchestratorEvent::L2ChainCommitted(head, None, consolidated),
@@ -589,10 +599,13 @@ impl<
589599
l2_client: Arc<P>,
590600
current_chain: Arc<Mutex<Chain>>,
591601
) -> Result<Option<ChainOrchestratorEvent>, ChainOrchestratorError> {
592-
let txn = database.tx_mut().await?;
593-
let UnwindResult { l1_block_number, queue_index, l2_head_block_number, l2_safe_block_info } =
594-
txn.unwind(chain_spec.genesis_hash(), l1_block_number).await?;
595-
txn.commit().await?;
602+
let UnwindResult { l1_block_number, queue_index, l2_head_block_number, l2_safe_block_info } = retry_with_defaults("unwind", || async {
603+
let txn = database.tx_mut().await?;
604+
let UnwindResult { l1_block_number, queue_index, l2_head_block_number, l2_safe_block_info } =
605+
txn.unwind(chain_spec.genesis_hash(), l1_block_number).await?;
606+
txn.commit().await?;
607+
Ok::<_, DatabaseError>(UnwindResult { l1_block_number, queue_index, l2_head_block_number, l2_safe_block_info })
608+
}).await?;
596609
let l2_head_block_info = if let Some(block_number) = l2_head_block_number {
597610
// Fetch the block hash of the new L2 head block.
598611
let block_hash = l2_client
@@ -623,16 +636,21 @@ impl<
623636
block_number: u64,
624637
l1_block_number: Arc<AtomicU64>,
625638
) -> Result<Option<ChainOrchestratorEvent>, ChainOrchestratorError> {
626-
let tx = database.tx_mut().await?;
639+
let finalized_batches = retry_with_defaults("set_latest_finalized_l1_block_number", || async {
640+
let tx = database.tx_mut().await?;
627641

628-
// Set the latest finalized L1 block in the database.
629-
tx.set_latest_finalized_l1_block_number(block_number).await?;
642+
// Set the latest finalized L1 block in the database.
643+
tx.set_latest_finalized_l1_block_number(block_number).await?;
644+
645+
// Get all unprocessed batches that have been finalized by this L1 block finalization.
646+
let finalized_batches =
647+
tx.fetch_and_update_unprocessed_finalized_batches(block_number).await?;
648+
649+
tx.commit().await?;
630650

631-
// Get all unprocessed batches that have been finalized by this L1 block finalization.
632-
let finalized_batches =
633-
tx.fetch_and_update_unprocessed_finalized_batches(block_number).await?;
651+
Ok::<_, DatabaseError>(finalized_batches)
652+
}).await?;
634653

635-
tx.commit().await?;
636654

637655
// Update the chain orchestrator L1 block number.
638656
l1_block_number.store(block_number, Ordering::Relaxed);
@@ -654,17 +672,20 @@ impl<
654672
let l1_message = L1MessageEnvelope::new(l1_message, l1_block_number, None, queue_hash);
655673

656674
// Perform a consistency check to ensure the previous L1 message exists in the database.
657-
let tx = database.tx_mut().await?;
658-
if l1_message.transaction.queue_index > 0 &&
659-
tx.get_l1_message_by_index(l1_message.transaction.queue_index - 1).await?.is_none()
660-
{
661-
return Err(ChainOrchestratorError::L1MessageQueueGap(
662-
l1_message.transaction.queue_index,
663-
))
664-
}
675+
let _ = retry_with_defaults("handle_l1_message", || async {
676+
let tx = database.tx_mut().await?;
677+
if l1_message.transaction.queue_index > 0 &&
678+
tx.get_l1_message_by_index(l1_message.transaction.queue_index - 1).await?.is_none()
679+
{
680+
return Err(ChainOrchestratorError::L1MessageQueueGap(
681+
l1_message.transaction.queue_index,
682+
))
683+
}
665684

666-
tx.insert_l1_message(l1_message).await?;
667-
tx.commit().await?;
685+
tx.insert_l1_message(l1_message.clone()).await?;
686+
tx.commit().await?;
687+
Ok::<_, ChainOrchestratorError>(())
688+
}).await;
668689
Ok(Some(event))
669690
}
670691

@@ -673,36 +694,41 @@ impl<
673694
database: Arc<Database>,
674695
batch: BatchCommitData,
675696
) -> Result<Option<ChainOrchestratorEvent>, ChainOrchestratorError> {
676-
let tx = database.tx_mut().await?;
677-
let prev_batch_index = batch.index - 1;
678-
679-
// Perform a consistency check to ensure the previous commit batch exists in the database.
680-
if tx.get_batch_by_index(prev_batch_index).await?.is_none() {
681-
return Err(ChainOrchestratorError::BatchCommitGap(batch.index))
682-
}
683-
684-
// remove any batches with an index greater than the previous batch.
685-
let affected = tx.delete_batches_gt_batch_index(prev_batch_index).await?;
686-
687-
// handle the case of a batch revert.
688-
let new_safe_head = if affected > 0 {
689-
tx.delete_l2_blocks_gt_batch_index(prev_batch_index).await?;
690-
tx.get_highest_block_for_batch_index(prev_batch_index).await?
691-
} else {
692-
None
693-
};
694-
695-
let event = ChainOrchestratorEvent::BatchCommitIndexed {
696-
batch_info: BatchInfo::new(batch.index, batch.hash),
697-
l1_block_number: batch.block_number,
698-
safe_head: new_safe_head,
699-
};
697+
let event = retry_with_defaults("handle_batch_commit", || async {
698+
let tx = database.tx_mut().await?;
699+
let batch_clone = batch.clone();
700+
let prev_batch_index = batch_clone.clone().index - 1;
701+
702+
// Perform a consistency check to ensure the previous commit batch exists in the database.
703+
if tx.get_batch_by_index(prev_batch_index).await?.is_none() {
704+
return Err(ChainOrchestratorError::BatchCommitGap(batch_clone.index))
705+
}
706+
707+
// remove any batches with an index greater than the previous batch.
708+
let affected = tx.delete_batches_gt_batch_index(prev_batch_index).await?;
709+
710+
// handle the case of a batch revert.
711+
let new_safe_head = if affected > 0 {
712+
tx.delete_l2_blocks_gt_batch_index(prev_batch_index).await?;
713+
tx.get_highest_block_for_batch_index(prev_batch_index).await?
714+
} else {
715+
None
716+
};
717+
718+
let event = ChainOrchestratorEvent::BatchCommitIndexed {
719+
batch_info: BatchInfo::new(batch_clone.index, batch_clone.hash),
720+
l1_block_number: batch.block_number,
721+
safe_head: new_safe_head,
722+
};
723+
724+
// insert the batch and commit the transaction.
725+
tx.insert_batch(batch_clone).await?;
726+
tx.commit().await?;
700727

701-
// insert the batch and commit the transaction.
702-
tx.insert_batch(batch).await?;
703-
tx.commit().await?;
728+
Ok::<_, ChainOrchestratorError>(Some(event))
729+
}).await?;
704730

705-
Ok(Some(event))
731+
Ok(event)
706732
}
707733

708734
/// Handles a batch finalization event by updating the batch input in the database.
@@ -712,22 +738,24 @@ impl<
712738
block_number: u64,
713739
finalized_block_number: Arc<AtomicU64>,
714740
) -> Result<Option<ChainOrchestratorEvent>, ChainOrchestratorError> {
715-
let tx = database.tx_mut().await?;
741+
retry_with_defaults("handle_batch_finalization", || async {
742+
let tx = database.tx_mut().await?;
716743

717-
// finalize all batches up to `batch_index`.
718-
tx.finalize_batches_up_to_index(batch_index, block_number).await?;
744+
// finalize all batches up to `batch_index`.
745+
tx.finalize_batches_up_to_index(batch_index, block_number).await?;
719746

720-
// Get all unprocessed batches that have been finalized by this L1 block finalization.
721-
let finalized_block_number = finalized_block_number.load(Ordering::Relaxed);
722-
if finalized_block_number >= block_number {
723-
let finalized_batches =
724-
tx.fetch_and_update_unprocessed_finalized_batches(finalized_block_number).await?;
725-
tx.commit().await?;
726-
return Ok(Some(ChainOrchestratorEvent::BatchFinalized(block_number, finalized_batches)))
727-
}
747+
// Get all unprocessed batches that have been finalized by this L1 block finalization.
748+
let finalized_block_number = finalized_block_number.load(Ordering::Relaxed);
749+
if finalized_block_number >= block_number {
750+
let finalized_batches =
751+
tx.fetch_and_update_unprocessed_finalized_batches(finalized_block_number).await?;
752+
tx.commit().await?;
753+
return Ok(Some(ChainOrchestratorEvent::BatchFinalized(block_number, finalized_batches)))
754+
}
728755

729-
tx.commit().await?;
730-
Ok(None)
756+
tx.commit().await?;
757+
Ok::<_, ChainOrchestratorError>(None)
758+
}).await
731759
}
732760
}
733761

@@ -772,8 +800,8 @@ async fn init_chain_from_db<P: Provider<Scroll> + 'static>(
772800
) -> Result<BoundedVec<Header>, ChainOrchestratorError> {
773801
let blocks = {
774802
let mut blocks = Vec::with_capacity(chain_buffer_size);
775-
let tx = database.tx().await?;
776-
let blocks_stream = tx.get_l2_blocks().await?.take(chain_buffer_size);
803+
let tx = retry_with_defaults("get_l2_blocks_new_tx", || database.tx()).await?;
804+
let blocks_stream = retry_with_defaults("get_l2_blocks", || tx.get_l2_blocks()).await?.take(chain_buffer_size);
777805
pin_mut!(blocks_stream);
778806
while let Some(block_info) = blocks_stream.try_next().await? {
779807
let header = l2_client
@@ -869,9 +897,11 @@ async fn consolidate_chain<P: Provider<Scroll> + 'static>(
869897

870898
// Fetch the safe head from the database. We use this as a trust anchor to reconcile the chain
871899
// back to.
872-
let tx = database.tx().await?;
873-
let safe_head = tx.get_latest_safe_l2_info().await?.expect("safe head must exist").0;
874-
drop(tx);
900+
let safe_head = retry_with_defaults("get_latest_safe_l2_info", || async {
901+
let tx = database.tx().await?;
902+
let safe_head = tx.get_latest_safe_l2_info().await?.expect("safe head must exist").0;
903+
Ok::<_, DatabaseError>(safe_head)
904+
}).await?;
875905

876906
// If the in-memory chain contains the safe head, we check if the safe hash from the
877907
// database (L1 consolidation) matches the in-memory value. If it does not match, we return an
@@ -970,9 +1000,8 @@ async fn validate_l1_messages(
9701000
// TODO: instead of using `l1_message_hashes.first().map(|tx| L1MessageStart::Hash(*tx))` to
9711001
// determine the start of the L1 message stream, we should use a more robust method to determine
9721002
// the start of the L1 message stream.
973-
let tx = database.tx().await?;
974-
let l1_message_stream =
975-
tx.get_l1_messages(l1_message_hashes.first().map(|tx| L1MessageStart::Hash(*tx))).await?;
1003+
let tx = retry_with_defaults("get_l1_messages_new_tx", || database.tx()).await?;
1004+
let l1_message_stream = retry_with_defaults("get_l1_messages", || tx.get_l1_messages(l1_message_hashes.first().map(|tx| L1MessageStart::Hash(*tx)))).await?;
9761005
pin_mut!(l1_message_stream);
9771006

9781007
for message_hash in l1_message_hashes {

crates/database/db/src/db.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -136,7 +136,7 @@ impl From<DatabaseConnection> for Database {
136136
}
137137
}
138138

139-
#[cfg(test)]
139+
#[cfg(all(test, feature = "test-utils"))]
140140
mod test {
141141
use super::*;
142142
use crate::{

crates/database/db/src/lib.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,9 @@ pub use operations::{
1919
DatabaseReadOperations, DatabaseWriteOperations, L1MessageStart, UnwindResult,
2020
};
2121

22+
mod retry;
23+
pub use retry::{RetryConfig, retry_config, retry_operation_with_name, retry_with_defaults};
24+
2225
mod transaction;
2326
pub use transaction::{DatabaseTransactionProvider, TXMut, TX};
2427

0 commit comments

Comments
 (0)