Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions crates/chain-orchestrator/src/error.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use alloy_json_rpc::RpcError;
use alloy_primitives::B256;
use alloy_transport::TransportErrorKind;
use scroll_db::{DatabaseError, L1MessageStart};
use scroll_db::{DatabaseError, L1MessageKey};

/// A type that represents an error that occurred in the chain orchestrator.
#[derive(Debug, thiserror::Error)]
Expand All @@ -28,7 +28,7 @@ pub enum ChainOrchestratorError {
},
/// An L1 message was not found in the database.
#[error("L1 message not found at {0}")]
L1MessageNotFound(L1MessageStart),
L1MessageNotFound(L1MessageKey),
/// A gap was detected in the L1 message queue: the previous message before index {0} is
/// missing.
#[error("L1 message queue gap detected at index {0}, previous L1 message not found")]
Expand Down
12 changes: 8 additions & 4 deletions crates/chain-orchestrator/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use scroll_alloy_hardforks::ScrollHardforks;
use scroll_alloy_network::Scroll;
use scroll_db::{
Database, DatabaseError, DatabaseReadOperations, DatabaseTransactionProvider,
DatabaseWriteOperations, L1MessageStart, UnwindResult,
DatabaseWriteOperations, L1MessageKey, UnwindResult,
};
use scroll_network::NewBlockWithPeer;
use std::{
Expand Down Expand Up @@ -838,7 +838,7 @@ async fn compute_l1_message_queue_hash(
})
.await?
.map(|m| m.queue_hash)
.ok_or(DatabaseError::L1MessageNotFound(L1MessageStart::Index(index)))?
.ok_or(DatabaseError::L1MessageNotFound(L1MessageKey::QueueIndex(index)))?
.unwrap_or_default()
.to_vec();
input.append(&mut l1_message.tx_hash().to_vec());
Expand Down Expand Up @@ -1078,7 +1078,9 @@ async fn validate_l1_messages(
let l1_message_stream = Retry::default()
.retry("get_l1_messages", || async {
let messages = tx
.get_l1_messages(l1_message_hashes.first().map(|tx| L1MessageStart::Hash(*tx)))
.get_l1_messages(
l1_message_hashes.first().map(|tx| L1MessageKey::TransactionHash(*tx)),
)
.await?;
Ok::<_, ChainOrchestratorError>(messages)
})
Expand All @@ -1093,7 +1095,9 @@ async fn validate_l1_messages(
.await
.map(|m| m.map(|msg| msg.transaction.tx_hash()))
.transpose()?
.ok_or(ChainOrchestratorError::L1MessageNotFound(L1MessageStart::Hash(message_hash)))?;
.ok_or(ChainOrchestratorError::L1MessageNotFound(L1MessageKey::TransactionHash(
message_hash,
)))?;

// If the received and expected L1 messages do not match return an error.
if message_hash != expected_hash {
Expand Down
6 changes: 3 additions & 3 deletions crates/database/db/src/error.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use super::L1MessageStart;
use super::L1MessageKey;
use sea_orm::sqlx::Error as SqlxError;

/// The error type for database operations.
Expand All @@ -17,6 +17,6 @@ pub enum DatabaseError {
#[error("failed to serde metadata value: {0}")]
MetadataSerdeError(#[from] serde_json::Error),
/// The L1 message was not found in database.
#[error("L1 message at index [{0}] not found in database")]
L1MessageNotFound(L1MessageStart),
#[error("L1 message at key [{0}] not found in database")]
L1MessageNotFound(L1MessageKey),
}
4 changes: 1 addition & 3 deletions crates/database/db/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,7 @@ mod models;
pub use models::*;

mod operations;
pub use operations::{
DatabaseReadOperations, DatabaseWriteOperations, L1MessageStart, UnwindResult,
};
pub use operations::{DatabaseReadOperations, DatabaseWriteOperations, L1MessageKey, UnwindResult};

mod transaction;
pub use transaction::{DatabaseTransactionProvider, TXMut, TX};
Expand Down
73 changes: 48 additions & 25 deletions crates/database/db/src/operations.rs
Original file line number Diff line number Diff line change
Expand Up @@ -531,18 +531,34 @@ pub trait DatabaseReadOperations: ReadConnectionProvider + Sync {
/// `start` point.
async fn get_l1_messages<'a>(
&'a self,
start: Option<L1MessageStart>,
start: Option<L1MessageKey>,
) -> Result<impl Stream<Item = Result<L1MessageEnvelope, DatabaseError>> + 'a, DatabaseError>
{
let queue_index = match start {
Some(L1MessageStart::Index(i)) => i,
Some(L1MessageStart::Hash(ref h)) => {
Some(L1MessageKey::QueueIndex(i)) => i,
Some(L1MessageKey::TransactionHash(ref h)) => {
// Lookup message by hash
let record = models::l1_message::Entity::find()
.filter(models::l1_message::Column::Hash.eq(h.to_vec()))
.one(self.get_connection())
.await?
.ok_or_else(|| DatabaseError::L1MessageNotFound(L1MessageStart::Hash(*h)))?;
.ok_or_else(|| {
DatabaseError::L1MessageNotFound(L1MessageKey::TransactionHash(*h))
})?;

record.queue_index as u64
}
Some(L1MessageKey::QueueHash(ref h)) => {
// Lookup message by queue hash
let record = models::l1_message::Entity::find()
.filter(
Condition::all()
.add(models::l1_message::Column::QueueHash.is_not_null())
.add(models::l1_message::Column::QueueHash.eq(h.to_vec())),
)
.one(self.get_connection())
.await?
.ok_or_else(|| DatabaseError::L1MessageNotFound(L1MessageKey::QueueHash(*h)))?;

record.queue_index as u64
}
Expand Down Expand Up @@ -691,36 +707,43 @@ pub trait DatabaseReadOperations: ReadConnectionProvider + Sync {
}
}

/// This type defines the start of an L1 message stream.
/// A key for an L1 message stored in the database.
///
/// It can either be an index, which is the queue index of the first message to return, or a hash,
/// which is the hash of the first message to return.
/// It can either be the queue index, queue hash or the transaction hash.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum L1MessageStart {
/// Start from the provided queue index.
Index(u64),
/// Start from the provided queue hash.
Hash(B256),
pub enum L1MessageKey {
/// The queue index of the message.
QueueIndex(u64),
/// The queue hash of the message.
QueueHash(B256),
/// The transaction hash of the message.
TransactionHash(B256),
}

impl fmt::Display for L1MessageStart {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
Self::Index(index) => write!(f, "Index({index})"),
Self::Hash(hash) => write!(f, "Hash({hash:#x})"),
}
impl L1MessageKey {
/// Create a new [`L1MessageKey`] from a queue index.
pub const fn from_queue_index(index: u64) -> Self {
Self::QueueIndex(index)
}

/// Create a new [`L1MessageKey`] from a queue hash.
pub const fn from_queue_hash(hash: B256) -> Self {
Self::QueueHash(hash)
}
}

impl From<u64> for L1MessageStart {
fn from(value: u64) -> Self {
Self::Index(value)
/// Create a new [`L1MessageKey`] from a transaction hash.
pub const fn from_transaction_hash(hash: B256) -> Self {
Self::TransactionHash(hash)
}
}

impl From<B256> for L1MessageStart {
fn from(value: B256) -> Self {
Self::Hash(value)
impl fmt::Display for L1MessageKey {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
Self::QueueIndex(index) => write!(f, "QueueIndex({index})"),
Self::QueueHash(hash) => write!(f, "QueueHash({hash:#x})"),
Self::TransactionHash(hash) => write!(f, "TransactionHash({hash:#x})"),
}
}
}

Expand Down
16 changes: 11 additions & 5 deletions crates/derivation-pipeline/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ use rollup_node_primitives::{
use rollup_node_providers::{BlockDataProvider, L1Provider};
use scroll_alloy_rpc_types_engine::{BlockDataHint, ScrollPayloadAttributes};
use scroll_codec::{decoding::payload::PayloadData, Codec};
use scroll_db::{Database, DatabaseReadOperations, DatabaseTransactionProvider};
use scroll_db::{Database, DatabaseReadOperations, DatabaseTransactionProvider, L1MessageKey};
use tokio::time::Interval;

/// A future that resolves to a stream of [`ScrollPayloadAttributesWithBatchInfo`].
Expand Down Expand Up @@ -352,25 +352,31 @@ async fn iter_l1_messages_from_payload<L1P: L1Provider>(
let total_l1_messages = data.blocks.iter().map(|b| b.context.num_l1_messages as u64).sum();

let messages = if let Some(index) = data.queue_index_start() {
provider.get_n_messages(index.into(), total_l1_messages).await.map_err(Into::into)?
provider
.get_n_messages(L1MessageKey::from_queue_index(index), total_l1_messages)
.await
.map_err(Into::into)?
} else if let Some(hash) = data.prev_l1_message_queue_hash() {
// If the message queue hash is zero then we should use the V2 L1 message queue start
// index. We must apply this branch logic because we do not have a L1
// message associated with a queue hash of ZERO (we only compute a queue
// hash for the first L1 message of the V2 contract).
if hash == &B256::ZERO {
provider
.get_n_messages(l1_v2_message_queue_start_index.into(), total_l1_messages)
.get_n_messages(
L1MessageKey::from_queue_index(l1_v2_message_queue_start_index),
total_l1_messages,
)
.await
.map_err(Into::into)?
} else {
let mut messages = provider
.get_n_messages((*hash).into(), total_l1_messages + 1)
.get_n_messages(L1MessageKey::from_queue_hash(*hash), total_l1_messages + 1)
.await
.map_err(Into::into)?;
// we skip the first l1 message, as we are interested in the one starting after
// prev_l1_message_queue_hash.
messages.pop();
messages.remove(0);
messages
}
} else {
Expand Down
6 changes: 3 additions & 3 deletions crates/manager/src/manager/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use reth_scroll_primitives::ScrollBlock;
use rollup_node_chain_orchestrator::ChainOrchestratorEvent;
use rollup_node_signer::SignerEvent;
use rollup_node_watcher::L1Notification;
use scroll_db::L1MessageStart;
use scroll_db::L1MessageKey;
use scroll_engine::ConsolidationOutcome;
use scroll_network::NewBlockWithPeer;

Expand Down Expand Up @@ -33,8 +33,8 @@ pub enum RollupManagerEvent {
},
/// A block has been received containing an L1 message that is not in the database.
L1MessageMissingInDatabase {
/// The L1 message start index or hash.
start: L1MessageStart,
/// The L1 message key.
key: L1MessageKey,
},
/// An event was received from the L1 watcher.
L1NotificationEvent(L1Notification),
Expand Down
2 changes: 1 addition & 1 deletion crates/manager/src/manager/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -435,7 +435,7 @@ where

if let Some(event_sender) = self.event_sender.as_ref() {
event_sender.notify(RollupManagerEvent::L1MessageMissingInDatabase {
start: start.clone(),
key: start.clone(),
});
}
}
Expand Down
4 changes: 2 additions & 2 deletions crates/node/tests/e2e.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ use rollup_node_sequencer::L1MessageInclusionMode;
use rollup_node_watcher::L1Notification;
use scroll_alloy_consensus::TxL1Message;
use scroll_alloy_rpc_types::Transaction as ScrollAlloyTransaction;
use scroll_db::{test_utils::setup_test_db, L1MessageStart};
use scroll_db::{test_utils::setup_test_db, L1MessageKey};
use scroll_network::NewBlockWithPeer;
use scroll_wire::{ScrollWireConfig, ScrollWireProtocolHandler};
use std::{
Expand Down Expand Up @@ -1719,7 +1719,7 @@ async fn can_reject_l2_block_with_unknown_l1_message() -> eyre::Result<()> {
wait_for_event_5s(
&mut node1_rnm_events,
RollupManagerEvent::L1MessageMissingInDatabase {
start: L1MessageStart::Hash(b256!(
key: L1MessageKey::TransactionHash(b256!(
"0x0a2f8e75392ab51a26a2af835042c614eb141cd934fe1bdd4934c10f2fe17e98"
)),
},
Expand Down
4 changes: 2 additions & 2 deletions crates/node/tests/sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -423,7 +423,7 @@ async fn test_should_consolidate_after_optimistic_sync() -> eyre::Result<()> {
// message.
wait_n_events(
&mut follower_events,
|e| matches!(e, RollupManagerEvent::L1MessageMissingInDatabase { start: _ }),
|e| matches!(e, RollupManagerEvent::L1MessageMissingInDatabase { key: _ }),
1,
)
.await;
Expand Down Expand Up @@ -599,7 +599,7 @@ async fn test_consolidation() -> eyre::Result<()> {
// Assert that the follower node rejects the new block as it hasn't received the L1 message.
wait_n_events(
&mut follower_events,
|e| matches!(e, RollupManagerEvent::L1MessageMissingInDatabase { start: _ }),
|e| matches!(e, RollupManagerEvent::L1MessageMissingInDatabase { key: _ }),
1,
)
.await;
Expand Down
8 changes: 3 additions & 5 deletions crates/providers/src/l1/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,7 @@ use crate::L1ProviderError;

use futures::{StreamExt, TryStreamExt};
use rollup_node_primitives::L1MessageEnvelope;
use scroll_db::{
DatabaseError, DatabaseReadOperations, DatabaseTransactionProvider, L1MessageStart,
};
use scroll_db::{DatabaseError, DatabaseReadOperations, DatabaseTransactionProvider, L1MessageKey};

/// An instance of the trait can provide L1 messages iterators.
#[async_trait::async_trait]
Expand All @@ -24,7 +22,7 @@ pub trait L1MessageProvider: Send + Sync {
/// avoid capturing the lifetime of `self`.
async fn get_n_messages(
&self,
start: L1MessageStart,
start: L1MessageKey,
n: u64,
) -> Result<Vec<L1MessageEnvelope>, Self::Error>;
}
Expand All @@ -38,7 +36,7 @@ where

async fn get_n_messages(
&self,
start: L1MessageStart,
start: L1MessageKey,
n: u64,
) -> Result<Vec<L1MessageEnvelope>, Self::Error> {
let tx = self.tx().await?;
Expand Down
4 changes: 2 additions & 2 deletions crates/providers/src/l1/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use alloy_eips::eip4844::Blob;
use alloy_primitives::B256;
use alloy_transport::{RpcError, TransportErrorKind};
use rollup_node_primitives::L1MessageEnvelope;
use scroll_db::{DatabaseError, L1MessageStart};
use scroll_db::{DatabaseError, L1MessageKey};

/// An instance of the trait can be used to provide L1 data.
pub trait L1Provider: BlobProvider + L1MessageProvider {}
Expand Down Expand Up @@ -77,7 +77,7 @@ impl<L1MP: L1MessageProvider, BP: Sync + Send> L1MessageProvider for FullL1Provi

async fn get_n_messages(
&self,
start: L1MessageStart,
start: L1MessageKey,
n: u64,
) -> Result<Vec<L1MessageEnvelope>, Self::Error> {
self.l1_message_provider.get_n_messages(start, n).await
Expand Down
4 changes: 2 additions & 2 deletions crates/providers/src/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use std::{collections::HashMap, sync::Arc};
use alloy_eips::eip4844::Blob;
use alloy_primitives::B256;
use rollup_node_primitives::L1MessageEnvelope;
use scroll_db::L1MessageStart;
use scroll_db::L1MessageKey;

/// Implementation of the [`crate::L1Provider`] that never returns blobs.
#[derive(Clone, Default, Debug)]
Expand Down Expand Up @@ -34,7 +34,7 @@ impl<P: L1MessageProvider + Send + Sync> L1MessageProvider for MockL1Provider<P>

async fn get_n_messages(
&self,
start: L1MessageStart,
start: L1MessageKey,
n: u64,
) -> Result<Vec<L1MessageEnvelope>, Self::Error> {
self.l1_messages_provider.get_n_messages(start, n).await
Expand Down
3 changes: 2 additions & 1 deletion crates/sequencer/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ pub use error::SequencerError;

mod metrics;
pub use metrics::SequencerMetrics;
use scroll_db::L1MessageKey;

/// A type alias for the payload building job future.
pub type PayloadBuildingJobFuture =
Expand Down Expand Up @@ -250,7 +251,7 @@ async fn build_payload_attributes<P: L1MessageProvider + Unpin + Send + Sync + '

// Collect L1 messages to include in payload.
let db_l1_messages = provider
.get_n_messages(l1_messages_queue_index.into(), max_l1_messages)
.get_n_messages(L1MessageKey::from_queue_index(l1_messages_queue_index), max_l1_messages)
.await
.map_err(Into::<L1ProviderError>::into)?;

Expand Down