Skip to content

Commit 00a8898

Browse files
authored
feat: finalize all batches up to received index (#288)
* feat: finalize all batches up to received index * test: fix finalized block number
1 parent 0c21c64 commit 00a8898

File tree

4 files changed

+65
-49
lines changed

4 files changed

+65
-49
lines changed

crates/chain-orchestrator/src/lib.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -721,8 +721,8 @@ impl<
721721
l1_block_number: Arc<AtomicU64>,
722722
l2_block_number: Arc<AtomicU64>,
723723
) -> Result<Option<ChainOrchestratorEvent>, ChainOrchestratorError> {
724-
// finalized the batch.
725-
database.finalize_batch(batch_hash, block_number).await?;
724+
// finalize all batches up to `batch_index`.
725+
database.finalize_batches_up_to_index(batch_index, block_number).await?;
726726

727727
let mut finalized_block = None;
728728
let mut finalized_batch = None;

crates/database/db/src/db.rs

Lines changed: 39 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -108,33 +108,54 @@ mod test {
108108
}
109109

110110
#[tokio::test]
111-
async fn test_database_finalize_batch_commit() {
111+
async fn test_database_finalize_batch_commits() {
112112
// Set up the test database.
113113
let db = setup_test_db().await;
114114

115115
// Generate unstructured bytes.
116-
let mut bytes = [0u8; 1024];
116+
let mut bytes = [0u8; 2048];
117117
rand::rng().fill(bytes.as_mut_slice());
118118
let mut u = Unstructured::new(&bytes);
119119

120-
// Generate a random BatchCommitData.
121-
let batch_commit = BatchCommitData::arbitrary(&mut u).unwrap();
120+
// Generate 10 finalized batches at L1 block 100.
121+
for i in 0..10 {
122+
let batch_commit = BatchCommitData {
123+
index: i,
124+
calldata: Arc::new(vec![].into()),
125+
finalized_block_number: Some(100),
126+
..Arbitrary::arbitrary(&mut u).unwrap()
127+
};
128+
db.insert_batch(batch_commit.clone()).await.unwrap();
129+
}
130+
// Finalize all batches below batch index 10.
131+
db.finalize_batches_up_to_index(10, 100).await.unwrap();
122132

123-
// Store the batch and finalize it.
124-
let finalized_block_number = u64::arbitrary(&mut u).unwrap();
125-
db.insert_batch(batch_commit.clone()).await.unwrap();
126-
db.finalize_batch(batch_commit.hash, finalized_block_number).await.unwrap();
133+
// Generate 10 commit batches not finalized.
134+
for i in 10..20 {
135+
let batch_commit = BatchCommitData {
136+
index: i,
137+
calldata: Arc::new(vec![].into()),
138+
finalized_block_number: None,
139+
..Arbitrary::arbitrary(&mut u).unwrap()
140+
};
141+
db.insert_batch(batch_commit.clone()).await.unwrap();
142+
}
143+
144+
// Finalize all batches below batch index 15.
145+
db.finalize_batches_up_to_index(15, 200).await.unwrap();
127146

128147
// Verify the finalized_block_number is correctly updated.
129-
let finalized_block_number_from_db = models::batch_commit::Entity::find()
130-
.filter(models::batch_commit::Column::Hash.eq(batch_commit.hash.to_vec()))
131-
.one(db.get_connection())
132-
.await
133-
.unwrap()
134-
.unwrap()
135-
.finalized_block_number
136-
.unwrap();
137-
assert_eq!(finalized_block_number, finalized_block_number_from_db as u64);
148+
let batches = db.get_batches().await.unwrap().collect::<Vec<Result<_, _>>>().await;
149+
for batch in batches {
150+
let batch = batch.unwrap();
151+
if batch.index < 10 {
152+
assert_eq!(batch.finalized_block_number, Some(100));
153+
} else if batch.index <= 15 {
154+
assert_eq!(batch.finalized_block_number, Some(200));
155+
} else {
156+
assert_eq!(batch.finalized_block_number, None);
157+
}
158+
}
138159
}
139160

140161
#[tokio::test]
@@ -281,7 +302,7 @@ mod test {
281302
// Finalize batch up to block number 110.
282303
if block_number <= 110 {
283304
finalized_batches_hashes.push(hash);
284-
db.finalize_batch(hash, block_number).await.unwrap();
305+
db.finalize_batches_up_to_index(batch_index, block_number).await.unwrap();
285306
}
286307

287308
block_number += 1;

crates/database/db/src/operations.rs

Lines changed: 18 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -9,8 +9,7 @@ use rollup_node_primitives::{
99
use scroll_alloy_rpc_types_engine::BlockDataHint;
1010
use sea_orm::{
1111
sea_query::{Expr, OnConflict},
12-
ActiveModelTrait, ColumnTrait, Condition, DbErr, EntityTrait, QueryFilter, QueryOrder,
13-
QuerySelect, Set,
12+
ColumnTrait, Condition, DbErr, EntityTrait, QueryFilter, QueryOrder, QuerySelect,
1413
};
1514
use std::fmt;
1615

@@ -39,34 +38,27 @@ pub trait DatabaseOperations: DatabaseConnectionProvider {
3938
.map(|_| ())?)
4039
}
4140

42-
/// Finalize a [`BatchCommitData`] with the provided `batch_hash` in the database and set the
41+
/// Finalizes all [`BatchCommitData`] up to the provided `batch_index` by setting their
4342
/// finalized block number to the provided block number.
44-
///
45-
/// Errors if the [`BatchCommitData`] associated with the provided `batch_hash` is not found in
46-
/// the database, this method logs and returns an error.
47-
async fn finalize_batch(
43+
async fn finalize_batches_up_to_index(
4844
&self,
49-
batch_hash: B256,
45+
batch_index: u64,
5046
block_number: u64,
5147
) -> Result<(), DatabaseError> {
52-
if let Some(batch) = models::batch_commit::Entity::find()
53-
.filter(models::batch_commit::Column::Hash.eq(batch_hash.to_vec()))
54-
.one(self.get_connection())
55-
.await?
56-
{
57-
tracing::trace!(target: "scroll::db", batch_hash = ?batch_hash, block_number, "Finalizing batch commit in database.");
58-
let mut batch: models::batch_commit::ActiveModel = batch.into();
59-
batch.finalized_block_number = Set(Some(block_number as i64));
60-
batch.update(self.get_connection()).await?;
61-
} else {
62-
tracing::error!(
63-
target: "scroll::db",
64-
batch_hash = ?batch_hash,
65-
block_number,
66-
"Batch not found in DB when trying to finalize."
67-
);
68-
return Err(DatabaseError::BatchNotFound(batch_hash));
69-
}
48+
tracing::trace!(target: "scroll::db", ?batch_index, block_number, "Finalizing batch commits in database up to index.");
49+
50+
models::batch_commit::Entity::update_many()
51+
.filter(
52+
models::batch_commit::Column::Index
53+
.lte(batch_index)
54+
.and(models::batch_commit::Column::FinalizedBlockNumber.is_null()),
55+
)
56+
.col_expr(
57+
models::batch_commit::Column::FinalizedBlockNumber,
58+
Expr::value(Some(block_number as i64)),
59+
)
60+
.exec(self.get_connection())
61+
.await?;
7062

7163
Ok(())
7264
}

crates/watcher/src/lib.rs

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -263,7 +263,7 @@ where
263263
}
264264

265265
// send all notifications on the channel.
266-
self.notify_all(notifications).await;
266+
self.notify_all(notifications).await?;
267267

268268
// update the latest block the l1 watcher has indexed.
269269
self.update_current_block(&latest);
@@ -587,12 +587,13 @@ where
587587
}
588588

589589
/// Send all notifications on the channel.
590-
async fn notify_all(&self, notifications: Vec<L1Notification>) {
590+
async fn notify_all(&self, notifications: Vec<L1Notification>) -> L1WatcherResult<()> {
591591
for notification in notifications {
592592
self.metrics.process_l1_notification(&notification);
593593
tracing::trace!(target: "scroll::watcher", %notification, "sending l1 notification");
594-
let _ = self.notify(notification).await;
594+
self.notify(notification).await?;
595595
}
596+
Ok(())
596597
}
597598

598599
/// Send the notification in the channel.
@@ -654,6 +655,8 @@ where
654655
// block.
655656
filter = filter.from_block(self.current_block_number + 1).to_block(to_block);
656657

658+
tracing::trace!(target: "scroll::watcher", ?filter, "fetching logs");
659+
657660
Ok(self.execution_provider.get_logs(&filter).await?)
658661
}
659662
}

0 commit comments

Comments
 (0)