diff --git a/crates/database/db/src/db.rs b/crates/database/db/src/db.rs index ac27f3f0..d84f45ef 100644 --- a/crates/database/db/src/db.rs +++ b/crates/database/db/src/db.rs @@ -320,18 +320,6 @@ impl DatabaseWriteOperations for Database { ) } - async fn insert_block( - &self, - block_info: BlockInfo, - batch_info: BatchInfo, - ) -> Result<(), DatabaseError> { - metered!( - DatabaseOperation::InsertBlock, - self, - tx_mut(move |tx| async move { tx.insert_block(block_info, batch_info).await }) - ) - } - async fn insert_genesis_block(&self, genesis_hash: B256) -> Result<(), DatabaseError> { metered!( DatabaseOperation::InsertGenesisBlock, @@ -354,16 +342,16 @@ impl DatabaseWriteOperations for Database { ) } - async fn update_l1_messages_with_l2_block( + async fn update_l1_messages_with_l2_blocks( &self, - block_info: L2BlockInfoWithL1Messages, + blocks: Vec, ) -> Result<(), DatabaseError> { metered!( DatabaseOperation::UpdateL1MessagesWithL2Block, self, tx_mut(move |tx| { - let block_info = block_info.clone(); - async move { tx.update_l1_messages_with_l2_block(block_info).await } + let blocks = blocks.clone(); + async move { tx.update_l1_messages_with_l2_blocks(blocks).await } }) ) } @@ -854,12 +842,14 @@ mod test { let batch_info: BatchInfo = data.clone().into(); db.insert_batch(data).await.unwrap(); + let mut blocks = Vec::new(); for _ in 0..10 { let block_info = BlockInfo { number: block_number, hash: B256::arbitrary(&mut u).unwrap() }; - db.insert_block(block_info, batch_info).await.unwrap(); block_number += 1; + blocks.push(block_info); } + db.insert_blocks(blocks, batch_info).await.unwrap(); // Fetch the highest block for the batch hash and verify number. let highest_block_info = @@ -893,12 +883,14 @@ mod test { db.insert_batch(first_batch).await.unwrap(); db.insert_batch(second_batch).await.unwrap(); + let mut blocks = Vec::new(); for _ in 0..10 { let block_info = BlockInfo { number: block_number, hash: B256::arbitrary(&mut u).unwrap() }; - db.insert_block(block_info, first_batch_info).await.unwrap(); block_number += 1; + blocks.push(block_info); } + db.insert_blocks(blocks, first_batch_info).await.unwrap(); // Fetch the highest block for the batch hash and verify number. let highest_block_info = @@ -1136,10 +1128,9 @@ mod test { let mut block_infos = Vec::new(); for i in 200..205 { let block_info = BlockInfo { number: i, hash: B256::arbitrary(&mut u).unwrap() }; - let l2_block = block_info; block_infos.push(block_info); - db.insert_block(l2_block, batch_info).await.unwrap(); } + db.insert_blocks(block_infos.clone(), batch_info).await.unwrap(); // Test getting existing blocks for expected_block in block_infos { @@ -1177,9 +1168,7 @@ mod test { let safe_block_1 = BlockInfo { number: 200, hash: B256::arbitrary(&mut u).unwrap() }; let safe_block_2 = BlockInfo { number: 201, hash: B256::arbitrary(&mut u).unwrap() }; - db.insert_block(safe_block_1, batch_info).await.unwrap(); - - db.insert_block(safe_block_2, batch_info).await.unwrap(); + db.insert_blocks(vec![safe_block_1, safe_block_2], batch_info).await.unwrap(); // Should return the highest safe block (block 201) let latest_safe = db.get_latest_safe_l2_info().await.unwrap(); @@ -1198,11 +1187,12 @@ mod test { // Insert multiple L2 blocks with batch info let batch_info = BatchInfo { index: 0, hash: B256::default() }; + let mut blocks = Vec::new(); for i in 400..410 { let block_info = BlockInfo { number: i, hash: B256::arbitrary(&mut u).unwrap() }; - - db.insert_block(block_info, batch_info).await.unwrap(); + blocks.push(block_info); } + db.insert_blocks(blocks, batch_info).await.unwrap(); // Delete blocks with number > 405 let deleted_count = db.delete_l2_blocks_gt_block_number(405).await.unwrap(); @@ -1245,10 +1235,9 @@ mod test { for i in 100..110 { let batch_data = db.get_batch_by_index(i).await.unwrap().unwrap(); let batch_info: BatchInfo = batch_data.into(); - let block_info = BlockInfo { number: 500 + i, hash: B256::arbitrary(&mut u).unwrap() }; - db.insert_block(block_info, batch_info).await.unwrap(); + db.insert_blocks(vec![block_info], batch_info).await.unwrap(); } // Delete L2 blocks with batch index > 105 @@ -1304,8 +1293,8 @@ mod test { L2BlockInfoWithL1Messages { block_info, l1_messages: l1_message_hashes.clone() }; // Insert block - db.insert_block(l2_block.block_info, batch_info).await.unwrap(); - db.update_l1_messages_with_l2_block(l2_block).await.unwrap(); + db.insert_blocks(vec![l2_block.block_info], batch_info).await.unwrap(); + db.update_l1_messages_with_l2_blocks(vec![l2_block]).await.unwrap(); // Verify block was inserted let retrieved_block = db.get_l2_block_info_by_number(500).await.unwrap(); @@ -1340,7 +1329,7 @@ mod test { // Insert initial block let block_info = BlockInfo { number: 600, hash: B256::arbitrary(&mut u).unwrap() }; - db.insert_block(block_info, batch_info_1).await.unwrap(); + db.insert_blocks(vec![block_info], batch_info_1).await.unwrap(); // Verify initial insertion let retrieved_block = db.get_l2_block_info_by_number(600).await.unwrap(); @@ -1359,7 +1348,7 @@ mod test { assert_eq!(initial_batch_info, batch_info_1); // Update the same block with different batch info (upsert) - db.insert_block(block_info, batch_info_2).await.unwrap(); + db.insert_blocks(vec![block_info], batch_info_2).await.unwrap(); // Verify the block still exists and was updated let retrieved_block = db.get_l2_block_info_by_number(600).await.unwrap().unwrap(); @@ -1393,15 +1382,14 @@ mod test { let block_1 = BlockInfo { number: 1, hash: B256::arbitrary(&mut u).unwrap() }; let block_2 = BlockInfo { number: 2, hash: B256::arbitrary(&mut u).unwrap() }; db.insert_batch(batch_data_1.clone()).await.unwrap(); - db.insert_block(block_1, batch_data_1.clone().into()).await.unwrap(); - db.insert_block(block_2, batch_data_1.clone().into()).await.unwrap(); + db.insert_blocks(vec![block_1, block_2], batch_data_1.clone().into()).await.unwrap(); // Insert batch 2 and associate it with one block in the database let batch_data_2 = BatchCommitData { index: 2, block_number: 20, ..Arbitrary::arbitrary(&mut u).unwrap() }; let block_3 = BlockInfo { number: 3, hash: B256::arbitrary(&mut u).unwrap() }; db.insert_batch(batch_data_2.clone()).await.unwrap(); - db.insert_block(block_3, batch_data_2.clone().into()).await.unwrap(); + db.insert_blocks(vec![block_3], batch_data_2.clone().into()).await.unwrap(); // Insert batch 3 produced at the same block number as batch 2 and associate it with one // block @@ -1409,7 +1397,7 @@ mod test { BatchCommitData { index: 3, block_number: 20, ..Arbitrary::arbitrary(&mut u).unwrap() }; let block_4 = BlockInfo { number: 4, hash: B256::arbitrary(&mut u).unwrap() }; db.insert_batch(batch_data_3.clone()).await.unwrap(); - db.insert_block(block_4, batch_data_3.clone().into()).await.unwrap(); + db.insert_blocks(vec![block_4], batch_data_3.clone().into()).await.unwrap(); db.set_finalized_l1_block_number(21).await.unwrap(); diff --git a/crates/database/db/src/operations.rs b/crates/database/db/src/operations.rs index b97abecb..ad27900b 100644 --- a/crates/database/db/src/operations.rs +++ b/crates/database/db/src/operations.rs @@ -8,7 +8,7 @@ use rollup_node_primitives::{ }; use scroll_alloy_rpc_types_engine::BlockDataHint; use sea_orm::{ - sea_query::{Expr, OnConflict}, + sea_query::{CaseStatement, Expr, OnConflict}, ColumnTrait, Condition, DbErr, EntityTrait, QueryFilter, QueryOrder, QuerySelect, }; use std::fmt; @@ -99,13 +99,6 @@ pub trait DatabaseWriteOperations { batch_info: BatchInfo, ) -> Result<(), DatabaseError>; - /// Insert a new block in the database. - async fn insert_block( - &self, - block_info: BlockInfo, - batch_info: BatchInfo, - ) -> Result<(), DatabaseError>; - /// Insert the genesis block into the database. async fn insert_genesis_block(&self, genesis_hash: B256) -> Result<(), DatabaseError>; @@ -116,9 +109,9 @@ pub trait DatabaseWriteOperations { ) -> Result<(), DatabaseError>; /// Update the executed L1 messages with the provided L2 block number in the database. - async fn update_l1_messages_with_l2_block( + async fn update_l1_messages_with_l2_blocks( &self, - block_info: L2BlockInfoWithL1Messages, + block_info: Vec, ) -> Result<(), DatabaseError>; /// Purge all L1 message to L2 block mappings from the database for blocks greater or equal to @@ -426,29 +419,18 @@ impl DatabaseWriteOperations for T { &self, blocks: Vec, batch_info: BatchInfo, - ) -> Result<(), DatabaseError> { - for block in blocks { - self.insert_block(block, batch_info).await?; - } - Ok(()) - } - - async fn insert_block( - &self, - block_info: BlockInfo, - batch_info: BatchInfo, ) -> Result<(), DatabaseError> { // We only insert safe blocks into the database, we do not persist unsafe blocks. tracing::trace!( target: "scroll::db", batch_hash = ?batch_info.hash, batch_index = batch_info.index, - block_number = block_info.number, - block_hash = ?block_info.hash, - "Inserting block into database." + blocks = ?blocks, + "Inserting blocks into database." ); - let l2_block: models::l2_block::ActiveModel = (block_info, batch_info).into(); - models::l2_block::Entity::insert(l2_block) + let l2_blocks: Vec = + blocks.into_iter().map(|b| (b, batch_info).into()).collect(); + models::l2_block::Entity::insert_many(l2_blocks) .on_conflict( OnConflict::column(models::l2_block::Column::BlockNumber) .update_columns([ @@ -458,6 +440,7 @@ impl DatabaseWriteOperations for T { ]) .to_owned(), ) + .on_empty_do_nothing() .exec(self.get_connection()) .await?; @@ -467,7 +450,7 @@ impl DatabaseWriteOperations for T { async fn insert_genesis_block(&self, genesis_hash: B256) -> Result<(), DatabaseError> { let genesis_block = BlockInfo::new(0, genesis_hash); let genesis_batch = BatchInfo::new(0, B256::ZERO); - self.insert_block(genesis_block, genesis_batch).await + self.insert_blocks(vec![genesis_block], genesis_batch).await } async fn update_l1_messages_from_l2_blocks( @@ -481,31 +464,63 @@ impl DatabaseWriteOperations for T { .await?; // Then, update the executed L1 messages for each block. - for block in blocks { - self.update_l1_messages_with_l2_block(block).await?; - } + self.update_l1_messages_with_l2_blocks(blocks).await?; + Ok(()) } - async fn update_l1_messages_with_l2_block( + async fn update_l1_messages_with_l2_blocks( &self, - block_info: L2BlockInfoWithL1Messages, + blocks: Vec, ) -> Result<(), DatabaseError> { - tracing::trace!( - target: "scroll::db", - block_number = block_info.block_info.number, - l1_messages = ?block_info.l1_messages, - "Updating executed L1 messages from block with L2 block number in the database." - ); - models::l1_message::Entity::update_many() - .col_expr( - models::l1_message::Column::L2BlockNumber, + if blocks.is_empty() { + return Ok(()); + } + let start = blocks.first().unwrap().block_info.number; + let end = blocks.last().unwrap().block_info.number; + tracing::trace!(target: "scroll::db", start_block = start, end_block = end, "Updating executed L1 messages from blocks with L2 block number in the database."); + + let mut case = CaseStatement::new(); + let mut all_hashes = Vec::new(); + + for block_info in blocks { + if block_info.l1_messages.is_empty() { + continue; + } + + tracing::trace!( + target: "scroll::db", + block_number = block_info.block_info.number, + l1_messages = ?block_info.l1_messages, + "Including L1 messages from block in batch update." + ); + + let hashes: Vec> = block_info.l1_messages.iter().map(|x| x.to_vec()).collect(); + + case = case.case( + models::l1_message::Column::Hash.is_in(hashes.clone()), Expr::value(block_info.block_info.number as i64), - ) - .filter( - models::l1_message::Column::Hash - .is_in(block_info.l1_messages.iter().map(|x| x.to_vec())), - ) + ); + + all_hashes.extend(hashes); + } + + if all_hashes.is_empty() { + return Ok(()); + } + + // query translates to the following sql: + // UPDATE l1_message + // SET l2_block_number = CASE + // WHEN hash IN (block1_hashes) THEN block1_number + // WHEN hash IN (block2_hashes) THEN block2_number + // WHEN hash IN (block3_hashes) THEN block3_number + // ELSE 0 + // END + // WHERE hash IN (all_hashes) + models::l1_message::Entity::update_many() + .col_expr(models::l1_message::Column::L2BlockNumber, case.into()) + .filter(models::l1_message::Column::Hash.is_in(all_hashes)) .exec(self.get_connection()) .await?; @@ -539,10 +554,12 @@ impl DatabaseWriteOperations for T { &self, outcome: BatchConsolidationOutcome, ) -> Result<(), DatabaseError> { - for block in outcome.blocks { - self.insert_block(block.block_info, outcome.batch_info).await?; - self.update_l1_messages_with_l2_block(block).await?; - } + self.insert_blocks( + outcome.blocks.iter().map(|b| b.block_info).collect(), + outcome.batch_info, + ) + .await?; + self.update_l1_messages_with_l2_blocks(outcome.blocks).await?; self.update_skipped_l1_messages(outcome.skipped_l1_messages).await?; Ok(()) } diff --git a/crates/sequencer/tests/e2e.rs b/crates/sequencer/tests/e2e.rs index e2261aab..0fa6cecb 100644 --- a/crates/sequencer/tests/e2e.rs +++ b/crates/sequencer/tests/e2e.rs @@ -439,7 +439,7 @@ async fn can_build_blocks_with_finalized_l1_messages() { assert!(!block.body.transactions.iter().any(|tx| tx.tx_hash() == &unfinalized_message_hash)); // Handle the build block with the sequencer in order to update L1 message queue index. - database.update_l1_messages_with_l2_block((&block).into()).await.unwrap(); + database.update_l1_messages_with_l2_blocks(vec![(&block).into()]).await.unwrap(); // update finalized block number to 3, now both messages should be available database.set_finalized_l1_block_number(3).await.unwrap();