From 23d37c856eee934db7ade20e4f27de04ba61de65 Mon Sep 17 00:00:00 2001 From: Gregory Edison Date: Tue, 21 Oct 2025 14:10:43 +0200 Subject: [PATCH 1/3] feat: skipped l1 messages --- crates/chain-orchestrator/src/lib.rs | 5 +- crates/database/db/src/db.rs | 8 ++ crates/database/db/src/models/l1_message.rs | 2 + crates/database/db/src/operations.rs | 19 ++- crates/database/migration/src/lib.rs | 2 + .../src/m20250304_125946_add_l1_msg_table.rs | 1 + .../m20251021_070729_add_skipped_column.rs | 53 +++++++ crates/derivation-pipeline/src/lib.rs | 16 ++- crates/primitives/src/batch.rs | 9 +- crates/sequencer/tests/e2e.rs | 133 +++++++++++++++++- 10 files changed, 239 insertions(+), 9 deletions(-) create mode 100644 crates/database/migration/src/m20251021_070729_add_skipped_column.rs diff --git a/crates/chain-orchestrator/src/lib.rs b/crates/chain-orchestrator/src/lib.rs index 576ba7d4..b3004f4d 100644 --- a/crates/chain-orchestrator/src/lib.rs +++ b/crates/chain-orchestrator/src/lib.rs @@ -493,7 +493,10 @@ impl< batch_reconciliation_result.into_batch_consolidation_outcome(reorg_results).await?; // Insert the batch consolidation outcome into the database. - let consolidation_outcome = batch_consolidation_outcome.clone(); + let mut consolidation_outcome = batch_consolidation_outcome.clone(); + consolidation_outcome + .with_skipped_l1_messages(batch_consolidation_outcome.skipped_l1_messages.clone()); + self.database.insert_batch_consolidation_outcome(consolidation_outcome).await?; Ok(Some(ChainOrchestratorEvent::BatchConsolidated(batch_consolidation_outcome))) diff --git a/crates/database/db/src/db.rs b/crates/database/db/src/db.rs index 1ac5f401..dc7248d3 100644 --- a/crates/database/db/src/db.rs +++ b/crates/database/db/src/db.rs @@ -186,6 +186,14 @@ impl DatabaseWriteOperations for Database { .await } + async fn update_skipped_l1_messages(&self, indexes: Vec) -> Result<(), DatabaseError> { + self.tx_mut(move |tx| { + let indexes = indexes.clone(); + async move { tx.update_skipped_l1_messages(indexes).await } + }) + .await + } + async fn delete_l1_messages_gt( &self, l1_block_number: u64, diff --git a/crates/database/db/src/models/l1_message.rs b/crates/database/db/src/models/l1_message.rs index 373d4cfc..bb9eb126 100644 --- a/crates/database/db/src/models/l1_message.rs +++ b/crates/database/db/src/models/l1_message.rs @@ -18,6 +18,7 @@ pub struct Model { sender: Vec, input: Vec, pub(crate) l2_block_number: Option, + skipped: bool, } /// The relation for the L1 message model. @@ -40,6 +41,7 @@ impl From for ActiveModel { sender: ActiveValue::Set(value.transaction.sender.to_vec()), input: ActiveValue::Set(value.transaction.input.to_vec()), l2_block_number: ActiveValue::Set(value.l2_block_number.map(|b| b as i64)), + skipped: ActiveValue::Set(false), } } } diff --git a/crates/database/db/src/operations.rs b/crates/database/db/src/operations.rs index 09ec3ed0..a3703844 100644 --- a/crates/database/db/src/operations.rs +++ b/crates/database/db/src/operations.rs @@ -58,6 +58,9 @@ pub trait DatabaseWriteOperations { /// Insert an [`L1MessageEnvelope`] into the database. async fn insert_l1_message(&self, l1_message: L1MessageEnvelope) -> Result<(), DatabaseError>; + /// Sets the `skipped` column to true for the provided list of L1 messages queue indexes. + async fn update_skipped_l1_messages(&self, indexes: Vec) -> Result<(), DatabaseError>; + /// Delete all [`L1MessageEnvelope`]s with a block number greater than the provided block /// number and return them. async fn delete_l1_messages_gt( @@ -329,6 +332,15 @@ impl DatabaseWriteOperations for T { } } + async fn update_skipped_l1_messages(&self, indexes: Vec) -> Result<(), DatabaseError> { + Ok(models::l1_message::Entity::update_many() + .col_expr(models::l1_message::Column::Skipped, Expr::value(true)) + .filter(models::l1_message::Column::QueueIndex.is_in(indexes.iter().map(|&x| x as i64))) + .exec(self.get_connection()) + .await + .map(|_| ())?) + } + async fn delete_l1_messages_gt( &self, l1_block_number: u64, @@ -531,6 +543,7 @@ impl DatabaseWriteOperations for T { self.insert_block(block.block_info, outcome.batch_info).await?; self.update_l1_messages_with_l2_block(block).await?; } + self.update_skipped_l1_messages(outcome.skipped_l1_messages).await?; Ok(()) } @@ -867,9 +880,10 @@ impl DatabaseReadOperations for T { // Create a filter condition for messages that have an L1 block number less than or // equal to the finalized block number and have not been included in an L2 block - // (i.e. L2BlockNumber is null). + // (i.e. L2BlockNumber is null) nor skipped. let condition = Condition::all() .add(models::l1_message::Column::L1BlockNumber.lte(target_block_number as i64)) + .add(models::l1_message::Column::Skipped.eq(false)) .add(models::l1_message::Column::L2BlockNumber.is_null()); // Yield n messages matching the condition ordered by increasing queue index. Ok(models::l1_message::Entity::find() @@ -899,9 +913,10 @@ impl DatabaseReadOperations for T { }; // Create a filter condition for messages that have an L1 block number less than // or equal to the target block number and have not been included in an L2 block - // (i.e. L2BlockNumber is null). + // (i.e. L2BlockNumber is null) nor skipped. let condition = Condition::all() .add(models::l1_message::Column::L1BlockNumber.lte(target_block_number as i64)) + .add(models::l1_message::Column::Skipped.eq(false)) .add(models::l1_message::Column::L2BlockNumber.is_null()); // Yield n messages matching the condition ordered by increasing queue index. Ok(models::l1_message::Entity::find() diff --git a/crates/database/migration/src/lib.rs b/crates/database/migration/src/lib.rs index 9a8bff99..fb7fda90 100644 --- a/crates/database/migration/src/lib.rs +++ b/crates/database/migration/src/lib.rs @@ -15,6 +15,7 @@ mod m20250929_161536_add_additional_indexes; mod m20251001_125444_add_index_processed; mod m20251005_160938_add_initial_l1_block_numbers; mod m20251013_140946_add_initial_l1_processed_block_number; +mod m20251021_070729_add_skipped_column; mod migration_info; pub use migration_info::{ @@ -42,6 +43,7 @@ impl MigratorTrait for Migrator { Box::new(m20251001_125444_add_index_processed::Migration), Box::new(m20251005_160938_add_initial_l1_block_numbers::Migration), Box::new(m20251013_140946_add_initial_l1_processed_block_number::Migration), + Box::new(m20251021_070729_add_skipped_column::Migration), ] } } diff --git a/crates/database/migration/src/m20250304_125946_add_l1_msg_table.rs b/crates/database/migration/src/m20250304_125946_add_l1_msg_table.rs index 8583e33a..3c6c367e 100644 --- a/crates/database/migration/src/m20250304_125946_add_l1_msg_table.rs +++ b/crates/database/migration/src/m20250304_125946_add_l1_msg_table.rs @@ -46,4 +46,5 @@ pub(crate) enum L1Message { Value, Sender, Input, + Skipped, } diff --git a/crates/database/migration/src/m20251021_070729_add_skipped_column.rs b/crates/database/migration/src/m20251021_070729_add_skipped_column.rs new file mode 100644 index 00000000..fa962bdc --- /dev/null +++ b/crates/database/migration/src/m20251021_070729_add_skipped_column.rs @@ -0,0 +1,53 @@ +use crate::m20250304_125946_add_l1_msg_table::L1Message; +use sea_orm_migration::prelude::*; + +#[derive(DeriveMigrationName)] +pub struct Migration; + +#[async_trait::async_trait] +impl MigrationTrait for Migration { + async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> { + // Add the `skipped` column to the `l1_message` table. + manager + .alter_table( + Table::alter() + .table(L1Message::Table) + .add_column( + ColumnDef::new(L1Message::Skipped).boolean().not_null().default(false), + ) + .to_owned(), + ) + .await?; + + // Add index on `skipped` for the `l1_message` table. + manager + .create_index( + Index::create() + .name("idx_l1_message_skipped") + .col(L1Message::Skipped) + .table(L1Message::Table) + .to_owned(), + ) + .await?; + + Ok(()) + } + + async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> { + // drop the `skipped` column on the `l1_message` table. + manager + .alter_table( + Table::alter().table(L1Message::Table).drop_column(L1Message::Skipped).to_owned(), + ) + .await?; + + // Drop index `skipped` for the `l1_message` table. + manager + .drop_index( + Index::drop().name("idx_l1_message_skipped").table(L1Message::Table).to_owned(), + ) + .await?; + + Ok(()) + } +} diff --git a/crates/derivation-pipeline/src/lib.rs b/crates/derivation-pipeline/src/lib.rs index 461740b6..ed245e89 100644 --- a/crates/derivation-pipeline/src/lib.rs +++ b/crates/derivation-pipeline/src/lib.rs @@ -244,6 +244,8 @@ pub struct BatchDerivationResult { pub attributes: Vec, /// The batch info associated with the derived attributes. pub batch_info: BatchInfo, + /// The list of skipped L1 messages indexes. + pub skipped_l1_messages: Vec, } /// The derived attributes along with the block number they correspond to. @@ -287,8 +289,11 @@ pub async fn derive( iter_l1_messages_from_payload(&l1_provider, payload_data, l1_v2_message_queue_start_index) .await?; - let skipped_l1_messages = decoded.data.skipped_l1_message_bitmap.clone().unwrap_or_default(); - let mut skipped_l1_messages = skipped_l1_messages.into_iter(); + let skipped_l1_messages_bitmap = + decoded.data.skipped_l1_message_bitmap.clone().unwrap_or_default(); + let mut skipped_l1_messages_bitmap = skipped_l1_messages_bitmap.into_iter(); + let mut skipped_l1_messages = Vec::new(); + let blocks = decoded.data.into_l2_blocks(); let mut attributes = Vec::with_capacity(blocks.len()); @@ -304,8 +309,10 @@ pub async fn derive( let mut txs = Vec::with_capacity(block.context.num_transactions as usize); for _ in 0..block.context.num_l1_messages { // check if the next l1 message should be skipped. - if matches!(skipped_l1_messages.next(), Some(bit) if bit) { - let _ = l1_messages_iter.next(); + if matches!(skipped_l1_messages_bitmap.next(), Some(bit) if bit) { + if let Some(msg) = l1_messages_iter.next() { + skipped_l1_messages.push(msg.transaction.queue_index) + } continue; } @@ -346,6 +353,7 @@ pub async fn derive( Ok(BatchDerivationResult { attributes, batch_info: BatchInfo { index: batch.index, hash: batch.hash }, + skipped_l1_messages, }) } diff --git a/crates/primitives/src/batch.rs b/crates/primitives/src/batch.rs index 016c5a31..54aa981e 100644 --- a/crates/primitives/src/batch.rs +++ b/crates/primitives/src/batch.rs @@ -60,18 +60,25 @@ pub struct BatchConsolidationOutcome { pub batch_info: BatchInfo, /// The consolidation outcomes for each block in the batch. pub blocks: Vec, + /// The list of skipped L1 messages index. + pub skipped_l1_messages: Vec, } impl BatchConsolidationOutcome { /// Creates a new empty batch consolidation outcome for the given batch info. pub const fn new(batch_info: BatchInfo) -> Self { - Self { batch_info, blocks: Vec::new() } + Self { batch_info, blocks: Vec::new(), skipped_l1_messages: Vec::new() } } /// Pushes a block consolidation outcome to the batch. pub fn push_block(&mut self, block: L2BlockInfoWithL1Messages) { self.blocks.push(block); } + + /// Adds the skipped L1 messages indexes. + pub fn with_skipped_l1_messages(&mut self, skipped: Vec) { + self.skipped_l1_messages = skipped; + } } /// The outcome of consolidating a block with the L2 chain. diff --git a/crates/sequencer/tests/e2e.rs b/crates/sequencer/tests/e2e.rs index 0b8db65f..997dcf52 100644 --- a/crates/sequencer/tests/e2e.rs +++ b/crates/sequencer/tests/e2e.rs @@ -20,7 +20,7 @@ use rollup_node_sequencer::{ L1MessageInclusionMode, PayloadBuildingConfig, Sequencer, SequencerConfig, SequencerEvent, }; use rollup_node_watcher::L1Notification; -use scroll_alloy_consensus::TxL1Message; +use scroll_alloy_consensus::{ScrollTransaction, TxL1Message}; use scroll_alloy_provider::ScrollAuthApiEngineClient; use scroll_db::{test_utils::setup_test_db, DatabaseWriteOperations}; use scroll_engine::{Engine, ForkchoiceState}; @@ -956,3 +956,134 @@ async fn should_limit_l1_message_cumulative_gas() { assert_eq!(block.body.transactions.len(), 1); assert_eq!(block.header.gas_used(), 21_000); } + +#[tokio::test] +async fn should_not_add_skipped_messages() { + reth_tracing::init_test_tracing(); + + // setup a test node + let chain_spec = SCROLL_DEV.clone(); + let (mut nodes, _tasks, wallet) = + setup_engine(default_test_scroll_rollup_node_config(), 1, chain_spec, false, false) + .await + .unwrap(); + let node = nodes.pop().unwrap(); + let wallet = Arc::new(Mutex::new(wallet)); + + // create a forkchoice state + let genesis_hash = node.inner.chain_spec().genesis_hash(); + let fcs = ForkchoiceState::new( + BlockInfo { hash: genesis_hash, number: 0 }, + Default::default(), + Default::default(), + ); + + // create the engine driver connected to the node + let auth_client = node.inner.engine_http_client(); + let engine_client = ScrollAuthApiEngineClient::new(auth_client); + let mut engine = Engine::new(Arc::new(engine_client), fcs); + + // create a test database + let database = Arc::new(setup_test_db().await); + let provider = database.clone(); + + // Set the latest and finalized block number + database.set_latest_l1_block_number(5).await.unwrap(); + database.set_finalized_l1_block_number(1).await.unwrap(); + + // create a sequencer + let config = SequencerConfig { + chain_spec: node.inner.chain_spec(), + fee_recipient: Address::random(), + auto_start: false, + payload_building_config: PayloadBuildingConfig { + block_gas_limit: SCROLL_GAS_LIMIT, + max_l1_messages_per_block: 4, + l1_message_inclusion_mode: L1MessageInclusionMode::FinalizedWithBlockDepth(0), + }, + block_time: 0, + payload_building_duration: 0, + allow_empty_blocks: true, + }; + let mut sequencer = Sequencer::new(provider, config); + + // add L1 messages to database + let wallet_lock = wallet.lock().await; + let l1_messages = [ + L1MessageEnvelope { + l1_block_number: 1, + l2_block_number: None, + queue_hash: None, + transaction: TxL1Message { + queue_index: 0, + gas_limit: 100_000, + to: Address::random(), + value: U256::from(1), + sender: wallet_lock.inner.address(), + input: vec![].into(), + }, + }, + L1MessageEnvelope { + l1_block_number: 1, + l2_block_number: None, + queue_hash: None, + transaction: TxL1Message { + queue_index: 1, + gas_limit: 100_000, + to: Address::random(), + value: U256::from(1), + sender: wallet_lock.inner.address(), + input: vec![].into(), + }, + }, + L1MessageEnvelope { + l1_block_number: 1, + l2_block_number: None, + queue_hash: None, + transaction: TxL1Message { + queue_index: 2, + gas_limit: 100_000, + to: Address::random(), + value: U256::from(1), + sender: wallet_lock.inner.address(), + input: vec![].into(), + }, + }, + L1MessageEnvelope { + l1_block_number: 1, + l2_block_number: None, + queue_hash: None, + transaction: TxL1Message { + queue_index: 3, + gas_limit: 100_000, + to: Address::random(), + value: U256::from(1), + sender: wallet_lock.inner.address(), + input: vec![].into(), + }, + }, + ]; + for l1_message in l1_messages { + database.insert_l1_message(l1_message).await.unwrap(); + } + // mark the first two messages as skipped. + database.update_skipped_l1_messages(vec![0, 1]).await.unwrap(); + + // build payload, should only include the last two messages. + sequencer.start_payload_building(&mut engine).await.unwrap(); + let block = if let SequencerEvent::PayloadReady(payload_id) = sequencer.next().await.unwrap() { + let block = sequencer.finalize_payload_building(payload_id, &mut engine).await.unwrap(); + assert!(block.is_some(), "expected a new payload, but got: {:?}", block); + block.unwrap() + } else { + panic!("expected a payload ready event"); + }; + + // verify only one L1 message is included + assert_eq!(block.body.transactions.len(), 2); + assert_eq!( + block.body.transactions.into_iter().filter_map(|x| x.queue_index()).collect::>(), + vec![2, 3] + ); + assert_eq!(block.header.gas_used, 42_000); +} From 40cb106a16bdcc0d4f3372ae5c30a7ac431bdb4b Mon Sep 17 00:00:00 2001 From: Gregory Edison Date: Tue, 21 Oct 2025 14:28:04 +0200 Subject: [PATCH 2/3] fix: comments --- crates/chain-orchestrator/src/lib.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/crates/chain-orchestrator/src/lib.rs b/crates/chain-orchestrator/src/lib.rs index b3004f4d..70782b01 100644 --- a/crates/chain-orchestrator/src/lib.rs +++ b/crates/chain-orchestrator/src/lib.rs @@ -426,6 +426,7 @@ impl< let batch_info = batch.batch_info; tracing::info!(target: "scroll::chain_orchestrator", batch_info = ?batch_info, num_blocks = batch.attributes.len(), "Handling derived batch"); + let skipped_l1_messages = batch.skipped_l1_messages.clone(); let batch_reconciliation_result = reconcile_batch(&self.l2_client, batch, self.engine.fcs()).await?; let aggregated_actions = batch_reconciliation_result.aggregate_actions(); @@ -494,8 +495,7 @@ impl< // Insert the batch consolidation outcome into the database. let mut consolidation_outcome = batch_consolidation_outcome.clone(); - consolidation_outcome - .with_skipped_l1_messages(batch_consolidation_outcome.skipped_l1_messages.clone()); + consolidation_outcome.with_skipped_l1_messages(skipped_l1_messages); self.database.insert_batch_consolidation_outcome(consolidation_outcome).await?; From a0e7d00ff1473eac29eb63b33d421fa9a067d3e5 Mon Sep 17 00:00:00 2001 From: Gregory Edison Date: Tue, 21 Oct 2025 14:46:51 +0200 Subject: [PATCH 3/3] fix: function proxy issue --- crates/database/db/src/db.rs | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/crates/database/db/src/db.rs b/crates/database/db/src/db.rs index dc7248d3..f28ae5bc 100644 --- a/crates/database/db/src/db.rs +++ b/crates/database/db/src/db.rs @@ -167,10 +167,8 @@ impl DatabaseWriteOperations for Database { &self, block_number: u64, ) -> Result { - self.tx_mut( - move |tx| async move { tx.delete_l2_blocks_gt_block_number(block_number).await }, - ) - .await + self.tx_mut(move |tx| async move { tx.delete_batches_gt_block_number(block_number).await }) + .await } async fn delete_batches_gt_batch_index(&self, batch_index: u64) -> Result {