Skip to content

Commit b0e1e94

Browse files
committed
add test cases
1 parent ccad3cb commit b0e1e94

File tree

9 files changed

+259
-43
lines changed

9 files changed

+259
-43
lines changed

crates/chain-orchestrator/src/event.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ pub enum ChainOrchestratorEvent {
5454
},
5555
/// A batch has been reverted returning the batch info and the new safe head.
5656
BatchReverted {
57-
/// The batch info of the reverted batch.
57+
/// The latest batch info after the revert.
5858
batch_info: BatchInfo,
5959
/// The new safe head after the revert.
6060
safe_head: BlockInfo,

crates/chain-orchestrator/src/handle/command.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,9 @@ pub enum ChainOrchestratorCommand<N: FullNetwork<Primitives = ScrollNetworkPrimi
3030
/// Enable gossiping of blocks to peers.
3131
#[cfg(feature = "test-utils")]
3232
SetGossip((bool, oneshot::Sender<()>)),
33+
/// Returns a database handle for direct database access.
34+
#[cfg(feature = "test-utils")]
35+
DatabaseHandle(oneshot::Sender<std::sync::Arc<scroll_db::Database>>),
3336
}
3437

3538
/// The database queries that can be sent to the rollup manager.

crates/chain-orchestrator/src/handle/mod.rs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -110,4 +110,14 @@ impl<N: FullNetwork<Primitives = ScrollNetworkPrimitives>> ChainOrchestratorHand
110110
self.send_command(ChainOrchestratorCommand::SetGossip((enabled, tx)));
111111
rx.await
112112
}
113+
114+
/// Sends a command to the rollup manager to get a database handle for direct database access.
115+
#[cfg(feature = "test-utils")]
116+
pub async fn get_database_handle(
117+
&self,
118+
) -> Result<std::sync::Arc<scroll_db::Database>, oneshot::error::RecvError> {
119+
let (tx, rx) = oneshot::channel();
120+
self.send_command(ChainOrchestratorCommand::DatabaseHandle(tx));
121+
rx.await
122+
}
113123
}

crates/chain-orchestrator/src/lib.rs

Lines changed: 21 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -390,6 +390,10 @@ impl<
390390
self.network.handle().set_gossip(enabled).await;
391391
let _ = tx.send(());
392392
}
393+
#[cfg(feature = "test-utils")]
394+
ChainOrchestratorCommand::DatabaseHandle(tx) => {
395+
let _ = tx.send(self.database.clone());
396+
}
393397
}
394398

395399
Ok(())
@@ -708,7 +712,7 @@ impl<
708712
.await?;
709713

710714
if self.sync_state.is_synced() {
711-
self.derivation_pipeline.push_batch(batch_info, BatchStatus::Committed).await;
715+
self.derivation_pipeline.push_batch(batch_info, BatchStatus::Consolidated).await;
712716
}
713717

714718
Ok(event)
@@ -763,12 +767,12 @@ impl<
763767

764768
/// Handles a batch revert event by updating the database.
765769
async fn handle_batch_revert(
766-
&self,
770+
&mut self,
767771
start_index: u64,
768772
end_index: u64,
769773
l1_block_info: BlockInfo,
770774
) -> Result<Option<ChainOrchestratorEvent>, ChainOrchestratorError> {
771-
let event = self
775+
let (safe_block_info, batch_info) = self
772776
.database
773777
.tx_mut(move |tx| async move {
774778
tx.insert_l1_block_info(l1_block_info).await?;
@@ -780,15 +784,14 @@ impl<
780784
.await?;
781785

782786
// handle the case of a batch revert.
783-
let (safe_head, batch_info) = tx.get_latest_safe_l2_info().await?;
784-
785-
let event = ChainOrchestratorEvent::BatchReverted { batch_info, safe_head };
786-
787-
Ok::<_, ChainOrchestratorError>(Some(event))
787+
Ok::<_, ChainOrchestratorError>(tx.get_latest_safe_l2_info().await?)
788788
})
789789
.await?;
790790

791-
Ok(event)
791+
// Update the forkchoice state to the new safe block.
792+
self.engine.update_fcs(None, Some(safe_block_info), None).await?;
793+
794+
Ok(Some(ChainOrchestratorEvent::BatchReverted { batch_info, safe_head: safe_block_info }))
792795
}
793796

794797
/// Handles an L1 message by inserting it into the database.
@@ -1144,7 +1147,7 @@ impl<
11441147
///
11451148
/// This involves validating the L1 messages in the blocks against the expected L1 messages
11461149
/// synced from L1.
1147-
async fn consolidate_chain(&self) -> Result<(), ChainOrchestratorError> {
1150+
async fn consolidate_chain(&mut self) -> Result<(), ChainOrchestratorError> {
11481151
tracing::trace!(target: "scroll::chain_orchestrator", fcs = ?self.engine.fcs(), "Consolidating chain from safe to head");
11491152

11501153
let safe_block_number = self.engine.fcs().safe_block_info().number;
@@ -1181,6 +1184,14 @@ impl<
11811184
// transactions into the transaction pool.
11821185
self.network.handle().inner().update_sync_state(RethSyncState::Idle);
11831186

1187+
// Fetch all unprocessed committed batches and push them to the derivation pipeline as
1188+
// consolidated.
1189+
let committed_batches =
1190+
self.database.fetch_and_update_unprocessed_committed_batches().await?;
1191+
for batch_commit in committed_batches {
1192+
self.derivation_pipeline.push_batch(batch_commit, BatchStatus::Consolidated).await;
1193+
}
1194+
11841195
self.notify(ChainOrchestratorEvent::ChainConsolidated {
11851196
from: safe_block_number,
11861197
to: head_block_number,

crates/database/db/src/db.rs

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -323,6 +323,18 @@ impl DatabaseWriteOperations for Database {
323323
)
324324
}
325325

326+
async fn fetch_and_update_unprocessed_committed_batches(
327+
&self,
328+
) -> Result<Vec<BatchInfo>, DatabaseError> {
329+
metered!(
330+
DatabaseOperation::FetchAndUpdateUnprocessedCommittedBatches,
331+
self,
332+
tx_mut(
333+
move |tx| async move { tx.fetch_and_update_unprocessed_committed_batches().await }
334+
)
335+
)
336+
}
337+
326338
async fn delete_batches_gt_block_number(
327339
&self,
328340
block_number: u64,

crates/database/db/src/metrics.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ pub(crate) enum DatabaseOperation {
3333
SetProcessedL1BlockNumber,
3434
SetL2HeadBlockNumber,
3535
FetchAndUpdateUnprocessedFinalizedBatches,
36+
FetchAndUpdateUnprocessedCommittedBatches,
3637
DeleteBatchesGtBlockNumber,
3738
DeleteBatchesGtBatchIndex,
3839
InsertL1Message,
@@ -98,6 +99,9 @@ impl DatabaseOperation {
9899
Self::FetchAndUpdateUnprocessedFinalizedBatches => {
99100
"fetch_and_update_unprocessed_finalized_batches"
100101
}
102+
Self::FetchAndUpdateUnprocessedCommittedBatches => {
103+
"fetch_and_update_unprocessed_committed_batches"
104+
}
101105
Self::DeleteBatchesGtBlockNumber => "delete_batches_gt_block_number",
102106
Self::DeleteBatchesGtBatchIndex => "delete_batches_gt_batch_index",
103107
Self::InsertL1Message => "insert_l1_message",

crates/database/db/src/operations.rs

Lines changed: 35 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,12 +53,17 @@ pub trait DatabaseWriteOperations {
5353
async fn set_l2_head_block_number(&self, number: u64) -> Result<(), DatabaseError>;
5454

5555
/// Fetches unprocessed batches up to the provided finalized L1 block number and updates their
56-
/// status.
56+
/// status to processing.
5757
async fn fetch_and_update_unprocessed_finalized_batches(
5858
&self,
5959
finalized_l1_block_number: u64,
6060
) -> Result<Vec<BatchInfo>, DatabaseError>;
6161

62+
/// Fetches unprocessed committed batches and updates their status to processing.
63+
async fn fetch_and_update_unprocessed_committed_batches(
64+
&self,
65+
) -> Result<Vec<BatchInfo>, DatabaseError>;
66+
6267
/// Delete all [`BatchCommitData`]s with a block number greater than the provided block number.
6368
async fn delete_batches_gt_block_number(&self, block_number: u64)
6469
-> Result<u64, DatabaseError>;
@@ -520,6 +525,35 @@ impl<T: WriteConnectionProvider + ?Sized + Sync> DatabaseWriteOperations for T {
520525
Ok(batches)
521526
}
522527

528+
async fn fetch_and_update_unprocessed_committed_batches(
529+
&self,
530+
) -> Result<Vec<BatchInfo>, DatabaseError> {
531+
let conditions = Condition::all().add(models::batch_commit::Column::Status.eq("committed"));
532+
533+
let batches = models::batch_commit::Entity::find()
534+
.filter(conditions.clone())
535+
.order_by_asc(models::batch_commit::Column::Index)
536+
.select_only()
537+
.column(models::batch_commit::Column::Index)
538+
.column(models::batch_commit::Column::Hash)
539+
.into_tuple::<(i64, Vec<u8>)>()
540+
.all(self.get_connection())
541+
.await
542+
.map(|x| {
543+
x.into_iter()
544+
.map(|(index, hash)| BatchInfo::new(index as u64, B256::from_slice(&hash)))
545+
.collect()
546+
})?;
547+
548+
models::batch_commit::Entity::update_many()
549+
.col_expr(models::batch_commit::Column::Status, Expr::value("processing"))
550+
.filter(conditions)
551+
.exec(self.get_connection())
552+
.await?;
553+
554+
Ok(batches)
555+
}
556+
523557
async fn delete_batches_gt_block_number(
524558
&self,
525559
block_number: u64,

0 commit comments

Comments
 (0)