Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion crates/chain-orchestrator/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -426,6 +426,7 @@ impl<
let batch_info = batch.batch_info;
tracing::info!(target: "scroll::chain_orchestrator", batch_info = ?batch_info, num_blocks = batch.attributes.len(), "Handling derived batch");

let skipped_l1_messages = batch.skipped_l1_messages.clone();
let batch_reconciliation_result =
reconcile_batch(&self.l2_client, batch, self.engine.fcs()).await?;
let aggregated_actions = batch_reconciliation_result.aggregate_actions();
Expand Down Expand Up @@ -493,7 +494,9 @@ impl<
batch_reconciliation_result.into_batch_consolidation_outcome(reorg_results).await?;

// Insert the batch consolidation outcome into the database.
let consolidation_outcome = batch_consolidation_outcome.clone();
let mut consolidation_outcome = batch_consolidation_outcome.clone();
consolidation_outcome.with_skipped_l1_messages(skipped_l1_messages);

self.database.insert_batch_consolidation_outcome(consolidation_outcome).await?;

Ok(Some(ChainOrchestratorEvent::BatchConsolidated(batch_consolidation_outcome)))
Expand Down
14 changes: 10 additions & 4 deletions crates/database/db/src/db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -167,10 +167,8 @@ impl DatabaseWriteOperations for Database {
&self,
block_number: u64,
) -> Result<u64, DatabaseError> {
self.tx_mut(
move |tx| async move { tx.delete_l2_blocks_gt_block_number(block_number).await },
)
.await
self.tx_mut(move |tx| async move { tx.delete_batches_gt_block_number(block_number).await })
.await
}

async fn delete_batches_gt_batch_index(&self, batch_index: u64) -> Result<u64, DatabaseError> {
Expand All @@ -186,6 +184,14 @@ impl DatabaseWriteOperations for Database {
.await
}

async fn update_skipped_l1_messages(&self, indexes: Vec<u64>) -> Result<(), DatabaseError> {
self.tx_mut(move |tx| {
let indexes = indexes.clone();
async move { tx.update_skipped_l1_messages(indexes).await }
})
.await
}

async fn delete_l1_messages_gt(
&self,
l1_block_number: u64,
Expand Down
2 changes: 2 additions & 0 deletions crates/database/db/src/models/l1_message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ pub struct Model {
sender: Vec<u8>,
input: Vec<u8>,
pub(crate) l2_block_number: Option<i64>,
skipped: bool,
}

/// The relation for the L1 message model.
Expand All @@ -40,6 +41,7 @@ impl From<L1MessageEnvelope> for ActiveModel {
sender: ActiveValue::Set(value.transaction.sender.to_vec()),
input: ActiveValue::Set(value.transaction.input.to_vec()),
l2_block_number: ActiveValue::Set(value.l2_block_number.map(|b| b as i64)),
skipped: ActiveValue::Set(false),
}
}
}
Expand Down
19 changes: 17 additions & 2 deletions crates/database/db/src/operations.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,9 @@ pub trait DatabaseWriteOperations {
/// Insert an [`L1MessageEnvelope`] into the database.
async fn insert_l1_message(&self, l1_message: L1MessageEnvelope) -> Result<(), DatabaseError>;

/// Sets the `skipped` column to true for the provided list of L1 messages queue indexes.
async fn update_skipped_l1_messages(&self, indexes: Vec<u64>) -> Result<(), DatabaseError>;

/// Delete all [`L1MessageEnvelope`]s with a block number greater than the provided block
/// number and return them.
async fn delete_l1_messages_gt(
Expand Down Expand Up @@ -329,6 +332,15 @@ impl<T: WriteConnectionProvider + ?Sized + Sync> DatabaseWriteOperations for T {
}
}

async fn update_skipped_l1_messages(&self, indexes: Vec<u64>) -> Result<(), DatabaseError> {
Ok(models::l1_message::Entity::update_many()
.col_expr(models::l1_message::Column::Skipped, Expr::value(true))
.filter(models::l1_message::Column::QueueIndex.is_in(indexes.iter().map(|&x| x as i64)))
.exec(self.get_connection())
.await
.map(|_| ())?)
}

async fn delete_l1_messages_gt(
&self,
l1_block_number: u64,
Expand Down Expand Up @@ -531,6 +543,7 @@ impl<T: WriteConnectionProvider + ?Sized + Sync> DatabaseWriteOperations for T {
self.insert_block(block.block_info, outcome.batch_info).await?;
self.update_l1_messages_with_l2_block(block).await?;
}
self.update_skipped_l1_messages(outcome.skipped_l1_messages).await?;
Ok(())
}

Expand Down Expand Up @@ -867,9 +880,10 @@ impl<T: ReadConnectionProvider + Sync + ?Sized> DatabaseReadOperations for T {

// Create a filter condition for messages that have an L1 block number less than or
// equal to the finalized block number and have not been included in an L2 block
// (i.e. L2BlockNumber is null).
// (i.e. L2BlockNumber is null) nor skipped.
let condition = Condition::all()
.add(models::l1_message::Column::L1BlockNumber.lte(target_block_number as i64))
.add(models::l1_message::Column::Skipped.eq(false))
.add(models::l1_message::Column::L2BlockNumber.is_null());
// Yield n messages matching the condition ordered by increasing queue index.
Ok(models::l1_message::Entity::find()
Expand Down Expand Up @@ -899,9 +913,10 @@ impl<T: ReadConnectionProvider + Sync + ?Sized> DatabaseReadOperations for T {
};
// Create a filter condition for messages that have an L1 block number less than
// or equal to the target block number and have not been included in an L2 block
// (i.e. L2BlockNumber is null).
// (i.e. L2BlockNumber is null) nor skipped.
let condition = Condition::all()
.add(models::l1_message::Column::L1BlockNumber.lte(target_block_number as i64))
.add(models::l1_message::Column::Skipped.eq(false))
.add(models::l1_message::Column::L2BlockNumber.is_null());
// Yield n messages matching the condition ordered by increasing queue index.
Ok(models::l1_message::Entity::find()
Expand Down
2 changes: 2 additions & 0 deletions crates/database/migration/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ mod m20250929_161536_add_additional_indexes;
mod m20251001_125444_add_index_processed;
mod m20251005_160938_add_initial_l1_block_numbers;
mod m20251013_140946_add_initial_l1_processed_block_number;
mod m20251021_070729_add_skipped_column;

mod migration_info;
pub use migration_info::{
Expand Down Expand Up @@ -42,6 +43,7 @@ impl<MI: MigrationInfo + Send + Sync + 'static> MigratorTrait for Migrator<MI> {
Box::new(m20251001_125444_add_index_processed::Migration),
Box::new(m20251005_160938_add_initial_l1_block_numbers::Migration),
Box::new(m20251013_140946_add_initial_l1_processed_block_number::Migration),
Box::new(m20251021_070729_add_skipped_column::Migration),
]
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,4 +46,5 @@ pub(crate) enum L1Message {
Value,
Sender,
Input,
Skipped,
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
use crate::m20250304_125946_add_l1_msg_table::L1Message;
use sea_orm_migration::prelude::*;

#[derive(DeriveMigrationName)]
pub struct Migration;

#[async_trait::async_trait]
impl MigrationTrait for Migration {
async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> {
// Add the `skipped` column to the `l1_message` table.
manager
.alter_table(
Table::alter()
.table(L1Message::Table)
.add_column(
ColumnDef::new(L1Message::Skipped).boolean().not_null().default(false),
)
.to_owned(),
)
.await?;

// Add index on `skipped` for the `l1_message` table.
manager
.create_index(
Index::create()
.name("idx_l1_message_skipped")
.col(L1Message::Skipped)
.table(L1Message::Table)
.to_owned(),
)
.await?;

Ok(())
}

async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> {
// drop the `skipped` column on the `l1_message` table.
manager
.alter_table(
Table::alter().table(L1Message::Table).drop_column(L1Message::Skipped).to_owned(),
)
.await?;

// Drop index `skipped` for the `l1_message` table.
manager
.drop_index(
Index::drop().name("idx_l1_message_skipped").table(L1Message::Table).to_owned(),
)
.await?;

Ok(())
}
}
16 changes: 12 additions & 4 deletions crates/derivation-pipeline/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -244,6 +244,8 @@ pub struct BatchDerivationResult {
pub attributes: Vec<DerivedAttributes>,
/// The batch info associated with the derived attributes.
pub batch_info: BatchInfo,
/// The list of skipped L1 messages indexes.
pub skipped_l1_messages: Vec<u64>,
}

/// The derived attributes along with the block number they correspond to.
Expand Down Expand Up @@ -287,8 +289,11 @@ pub async fn derive<L1P: L1Provider + Sync + Send, DB: DatabaseReadOperations>(
iter_l1_messages_from_payload(&l1_provider, payload_data, l1_v2_message_queue_start_index)
.await?;

let skipped_l1_messages = decoded.data.skipped_l1_message_bitmap.clone().unwrap_or_default();
let mut skipped_l1_messages = skipped_l1_messages.into_iter();
let skipped_l1_messages_bitmap =
decoded.data.skipped_l1_message_bitmap.clone().unwrap_or_default();
let mut skipped_l1_messages_bitmap = skipped_l1_messages_bitmap.into_iter();
let mut skipped_l1_messages = Vec::new();

let blocks = decoded.data.into_l2_blocks();
let mut attributes = Vec::with_capacity(blocks.len());

Expand All @@ -304,8 +309,10 @@ pub async fn derive<L1P: L1Provider + Sync + Send, DB: DatabaseReadOperations>(
let mut txs = Vec::with_capacity(block.context.num_transactions as usize);
for _ in 0..block.context.num_l1_messages {
// check if the next l1 message should be skipped.
if matches!(skipped_l1_messages.next(), Some(bit) if bit) {
let _ = l1_messages_iter.next();
if matches!(skipped_l1_messages_bitmap.next(), Some(bit) if bit) {
if let Some(msg) = l1_messages_iter.next() {
skipped_l1_messages.push(msg.transaction.queue_index)
}
continue;
}

Expand Down Expand Up @@ -346,6 +353,7 @@ pub async fn derive<L1P: L1Provider + Sync + Send, DB: DatabaseReadOperations>(
Ok(BatchDerivationResult {
attributes,
batch_info: BatchInfo { index: batch.index, hash: batch.hash },
skipped_l1_messages,
})
}

Expand Down
9 changes: 8 additions & 1 deletion crates/primitives/src/batch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,18 +60,25 @@ pub struct BatchConsolidationOutcome {
pub batch_info: BatchInfo,
/// The consolidation outcomes for each block in the batch.
pub blocks: Vec<L2BlockInfoWithL1Messages>,
/// The list of skipped L1 messages index.
pub skipped_l1_messages: Vec<u64>,
}

impl BatchConsolidationOutcome {
/// Creates a new empty batch consolidation outcome for the given batch info.
pub const fn new(batch_info: BatchInfo) -> Self {
Self { batch_info, blocks: Vec::new() }
Self { batch_info, blocks: Vec::new(), skipped_l1_messages: Vec::new() }
}

/// Pushes a block consolidation outcome to the batch.
pub fn push_block(&mut self, block: L2BlockInfoWithL1Messages) {
self.blocks.push(block);
}

/// Adds the skipped L1 messages indexes.
pub fn with_skipped_l1_messages(&mut self, skipped: Vec<u64>) {
self.skipped_l1_messages = skipped;
}
}

/// The outcome of consolidating a block with the L2 chain.
Expand Down
Loading
Loading