diff --git a/crates/chain-orchestrator/src/error.rs b/crates/chain-orchestrator/src/error.rs index e4338bbc..1f2529ef 100644 --- a/crates/chain-orchestrator/src/error.rs +++ b/crates/chain-orchestrator/src/error.rs @@ -1,7 +1,7 @@ use alloy_json_rpc::RpcError; use alloy_primitives::B256; use alloy_transport::TransportErrorKind; -use scroll_db::{DatabaseError, L1MessageStart}; +use scroll_db::{DatabaseError, L1MessageKey}; /// A type that represents an error that occurred in the chain orchestrator. #[derive(Debug, thiserror::Error)] @@ -28,7 +28,7 @@ pub enum ChainOrchestratorError { }, /// An L1 message was not found in the database. #[error("L1 message not found at {0}")] - L1MessageNotFound(L1MessageStart), + L1MessageNotFound(L1MessageKey), /// A gap was detected in the L1 message queue: the previous message before index {0} is /// missing. #[error("L1 message queue gap detected at index {0}, previous L1 message not found")] diff --git a/crates/chain-orchestrator/src/lib.rs b/crates/chain-orchestrator/src/lib.rs index aa5b2389..176f0213 100644 --- a/crates/chain-orchestrator/src/lib.rs +++ b/crates/chain-orchestrator/src/lib.rs @@ -19,7 +19,7 @@ use scroll_alloy_hardforks::ScrollHardforks; use scroll_alloy_network::Scroll; use scroll_db::{ Database, DatabaseError, DatabaseReadOperations, DatabaseTransactionProvider, - DatabaseWriteOperations, L1MessageStart, UnwindResult, + DatabaseWriteOperations, L1MessageKey, UnwindResult, }; use scroll_network::NewBlockWithPeer; use std::{ @@ -838,7 +838,7 @@ async fn compute_l1_message_queue_hash( }) .await? .map(|m| m.queue_hash) - .ok_or(DatabaseError::L1MessageNotFound(L1MessageStart::Index(index)))? + .ok_or(DatabaseError::L1MessageNotFound(L1MessageKey::QueueIndex(index)))? .unwrap_or_default() .to_vec(); input.append(&mut l1_message.tx_hash().to_vec()); @@ -1078,7 +1078,9 @@ async fn validate_l1_messages( let l1_message_stream = Retry::default() .retry("get_l1_messages", || async { let messages = tx - .get_l1_messages(l1_message_hashes.first().map(|tx| L1MessageStart::Hash(*tx))) + .get_l1_messages( + l1_message_hashes.first().map(|tx| L1MessageKey::TransactionHash(*tx)), + ) .await?; Ok::<_, ChainOrchestratorError>(messages) }) @@ -1093,7 +1095,9 @@ async fn validate_l1_messages( .await .map(|m| m.map(|msg| msg.transaction.tx_hash())) .transpose()? - .ok_or(ChainOrchestratorError::L1MessageNotFound(L1MessageStart::Hash(message_hash)))?; + .ok_or(ChainOrchestratorError::L1MessageNotFound(L1MessageKey::TransactionHash( + message_hash, + )))?; // If the received and expected L1 messages do not match return an error. if message_hash != expected_hash { diff --git a/crates/database/db/src/error.rs b/crates/database/db/src/error.rs index 7702cb2f..f7bf3f32 100644 --- a/crates/database/db/src/error.rs +++ b/crates/database/db/src/error.rs @@ -1,4 +1,4 @@ -use super::L1MessageStart; +use super::L1MessageKey; use sea_orm::sqlx::Error as SqlxError; /// The error type for database operations. @@ -17,6 +17,6 @@ pub enum DatabaseError { #[error("failed to serde metadata value: {0}")] MetadataSerdeError(#[from] serde_json::Error), /// The L1 message was not found in database. - #[error("L1 message at index [{0}] not found in database")] - L1MessageNotFound(L1MessageStart), + #[error("L1 message at key [{0}] not found in database")] + L1MessageNotFound(L1MessageKey), } diff --git a/crates/database/db/src/lib.rs b/crates/database/db/src/lib.rs index 80325faa..dcae77bc 100644 --- a/crates/database/db/src/lib.rs +++ b/crates/database/db/src/lib.rs @@ -15,9 +15,7 @@ mod models; pub use models::*; mod operations; -pub use operations::{ - DatabaseReadOperations, DatabaseWriteOperations, L1MessageStart, UnwindResult, -}; +pub use operations::{DatabaseReadOperations, DatabaseWriteOperations, L1MessageKey, UnwindResult}; mod transaction; pub use transaction::{DatabaseTransactionProvider, TXMut, TX}; diff --git a/crates/database/db/src/operations.rs b/crates/database/db/src/operations.rs index 5eee0609..b11835fb 100644 --- a/crates/database/db/src/operations.rs +++ b/crates/database/db/src/operations.rs @@ -531,18 +531,34 @@ pub trait DatabaseReadOperations: ReadConnectionProvider + Sync { /// `start` point. async fn get_l1_messages<'a>( &'a self, - start: Option, + start: Option, ) -> Result> + 'a, DatabaseError> { let queue_index = match start { - Some(L1MessageStart::Index(i)) => i, - Some(L1MessageStart::Hash(ref h)) => { + Some(L1MessageKey::QueueIndex(i)) => i, + Some(L1MessageKey::TransactionHash(ref h)) => { // Lookup message by hash let record = models::l1_message::Entity::find() .filter(models::l1_message::Column::Hash.eq(h.to_vec())) .one(self.get_connection()) .await? - .ok_or_else(|| DatabaseError::L1MessageNotFound(L1MessageStart::Hash(*h)))?; + .ok_or_else(|| { + DatabaseError::L1MessageNotFound(L1MessageKey::TransactionHash(*h)) + })?; + + record.queue_index as u64 + } + Some(L1MessageKey::QueueHash(ref h)) => { + // Lookup message by queue hash + let record = models::l1_message::Entity::find() + .filter( + Condition::all() + .add(models::l1_message::Column::QueueHash.is_not_null()) + .add(models::l1_message::Column::QueueHash.eq(h.to_vec())), + ) + .one(self.get_connection()) + .await? + .ok_or_else(|| DatabaseError::L1MessageNotFound(L1MessageKey::QueueHash(*h)))?; record.queue_index as u64 } @@ -691,36 +707,43 @@ pub trait DatabaseReadOperations: ReadConnectionProvider + Sync { } } -/// This type defines the start of an L1 message stream. +/// A key for an L1 message stored in the database. /// -/// It can either be an index, which is the queue index of the first message to return, or a hash, -/// which is the hash of the first message to return. +/// It can either be the queue index, queue hash or the transaction hash. #[derive(Debug, Clone, PartialEq, Eq)] -pub enum L1MessageStart { - /// Start from the provided queue index. - Index(u64), - /// Start from the provided queue hash. - Hash(B256), +pub enum L1MessageKey { + /// The queue index of the message. + QueueIndex(u64), + /// The queue hash of the message. + QueueHash(B256), + /// The transaction hash of the message. + TransactionHash(B256), } -impl fmt::Display for L1MessageStart { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - match self { - Self::Index(index) => write!(f, "Index({index})"), - Self::Hash(hash) => write!(f, "Hash({hash:#x})"), - } +impl L1MessageKey { + /// Create a new [`L1MessageKey`] from a queue index. + pub const fn from_queue_index(index: u64) -> Self { + Self::QueueIndex(index) + } + + /// Create a new [`L1MessageKey`] from a queue hash. + pub const fn from_queue_hash(hash: B256) -> Self { + Self::QueueHash(hash) } -} -impl From for L1MessageStart { - fn from(value: u64) -> Self { - Self::Index(value) + /// Create a new [`L1MessageKey`] from a transaction hash. + pub const fn from_transaction_hash(hash: B256) -> Self { + Self::TransactionHash(hash) } } -impl From for L1MessageStart { - fn from(value: B256) -> Self { - Self::Hash(value) +impl fmt::Display for L1MessageKey { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + Self::QueueIndex(index) => write!(f, "QueueIndex({index})"), + Self::QueueHash(hash) => write!(f, "QueueHash({hash:#x})"), + Self::TransactionHash(hash) => write!(f, "TransactionHash({hash:#x})"), + } } } diff --git a/crates/derivation-pipeline/src/lib.rs b/crates/derivation-pipeline/src/lib.rs index 7fee71bc..9ce63dee 100644 --- a/crates/derivation-pipeline/src/lib.rs +++ b/crates/derivation-pipeline/src/lib.rs @@ -30,7 +30,7 @@ use rollup_node_primitives::{ use rollup_node_providers::{BlockDataProvider, L1Provider}; use scroll_alloy_rpc_types_engine::{BlockDataHint, ScrollPayloadAttributes}; use scroll_codec::{decoding::payload::PayloadData, Codec}; -use scroll_db::{Database, DatabaseReadOperations, DatabaseTransactionProvider}; +use scroll_db::{Database, DatabaseReadOperations, DatabaseTransactionProvider, L1MessageKey}; use tokio::time::Interval; /// A future that resolves to a stream of [`ScrollPayloadAttributesWithBatchInfo`]. @@ -352,7 +352,10 @@ async fn iter_l1_messages_from_payload( let total_l1_messages = data.blocks.iter().map(|b| b.context.num_l1_messages as u64).sum(); let messages = if let Some(index) = data.queue_index_start() { - provider.get_n_messages(index.into(), total_l1_messages).await.map_err(Into::into)? + provider + .get_n_messages(L1MessageKey::from_queue_index(index), total_l1_messages) + .await + .map_err(Into::into)? } else if let Some(hash) = data.prev_l1_message_queue_hash() { // If the message queue hash is zero then we should use the V2 L1 message queue start // index. We must apply this branch logic because we do not have a L1 @@ -360,17 +363,20 @@ async fn iter_l1_messages_from_payload( // hash for the first L1 message of the V2 contract). if hash == &B256::ZERO { provider - .get_n_messages(l1_v2_message_queue_start_index.into(), total_l1_messages) + .get_n_messages( + L1MessageKey::from_queue_index(l1_v2_message_queue_start_index), + total_l1_messages, + ) .await .map_err(Into::into)? } else { let mut messages = provider - .get_n_messages((*hash).into(), total_l1_messages + 1) + .get_n_messages(L1MessageKey::from_queue_hash(*hash), total_l1_messages + 1) .await .map_err(Into::into)?; // we skip the first l1 message, as we are interested in the one starting after // prev_l1_message_queue_hash. - messages.pop(); + messages.remove(0); messages } } else { diff --git a/crates/manager/src/manager/event.rs b/crates/manager/src/manager/event.rs index dfa970d8..61749ea8 100644 --- a/crates/manager/src/manager/event.rs +++ b/crates/manager/src/manager/event.rs @@ -3,7 +3,7 @@ use reth_scroll_primitives::ScrollBlock; use rollup_node_chain_orchestrator::ChainOrchestratorEvent; use rollup_node_signer::SignerEvent; use rollup_node_watcher::L1Notification; -use scroll_db::L1MessageStart; +use scroll_db::L1MessageKey; use scroll_engine::ConsolidationOutcome; use scroll_network::NewBlockWithPeer; @@ -33,8 +33,8 @@ pub enum RollupManagerEvent { }, /// A block has been received containing an L1 message that is not in the database. L1MessageMissingInDatabase { - /// The L1 message start index or hash. - start: L1MessageStart, + /// The L1 message key. + key: L1MessageKey, }, /// An event was received from the L1 watcher. L1NotificationEvent(L1Notification), diff --git a/crates/manager/src/manager/mod.rs b/crates/manager/src/manager/mod.rs index 1896b59d..1af74f22 100644 --- a/crates/manager/src/manager/mod.rs +++ b/crates/manager/src/manager/mod.rs @@ -435,7 +435,7 @@ where if let Some(event_sender) = self.event_sender.as_ref() { event_sender.notify(RollupManagerEvent::L1MessageMissingInDatabase { - start: start.clone(), + key: start.clone(), }); } } diff --git a/crates/node/tests/e2e.rs b/crates/node/tests/e2e.rs index 44322970..bf187b31 100644 --- a/crates/node/tests/e2e.rs +++ b/crates/node/tests/e2e.rs @@ -37,7 +37,7 @@ use rollup_node_sequencer::L1MessageInclusionMode; use rollup_node_watcher::L1Notification; use scroll_alloy_consensus::TxL1Message; use scroll_alloy_rpc_types::Transaction as ScrollAlloyTransaction; -use scroll_db::{test_utils::setup_test_db, L1MessageStart}; +use scroll_db::{test_utils::setup_test_db, L1MessageKey}; use scroll_network::NewBlockWithPeer; use scroll_wire::{ScrollWireConfig, ScrollWireProtocolHandler}; use std::{ @@ -1719,7 +1719,7 @@ async fn can_reject_l2_block_with_unknown_l1_message() -> eyre::Result<()> { wait_for_event_5s( &mut node1_rnm_events, RollupManagerEvent::L1MessageMissingInDatabase { - start: L1MessageStart::Hash(b256!( + key: L1MessageKey::TransactionHash(b256!( "0x0a2f8e75392ab51a26a2af835042c614eb141cd934fe1bdd4934c10f2fe17e98" )), }, diff --git a/crates/node/tests/sync.rs b/crates/node/tests/sync.rs index 9c48ff5b..0b391ae0 100644 --- a/crates/node/tests/sync.rs +++ b/crates/node/tests/sync.rs @@ -423,7 +423,7 @@ async fn test_should_consolidate_after_optimistic_sync() -> eyre::Result<()> { // message. wait_n_events( &mut follower_events, - |e| matches!(e, RollupManagerEvent::L1MessageMissingInDatabase { start: _ }), + |e| matches!(e, RollupManagerEvent::L1MessageMissingInDatabase { key: _ }), 1, ) .await; @@ -599,7 +599,7 @@ async fn test_consolidation() -> eyre::Result<()> { // Assert that the follower node rejects the new block as it hasn't received the L1 message. wait_n_events( &mut follower_events, - |e| matches!(e, RollupManagerEvent::L1MessageMissingInDatabase { start: _ }), + |e| matches!(e, RollupManagerEvent::L1MessageMissingInDatabase { key: _ }), 1, ) .await; diff --git a/crates/providers/src/l1/message.rs b/crates/providers/src/l1/message.rs index 2136ac53..07ca0f3b 100644 --- a/crates/providers/src/l1/message.rs +++ b/crates/providers/src/l1/message.rs @@ -2,9 +2,7 @@ use crate::L1ProviderError; use futures::{StreamExt, TryStreamExt}; use rollup_node_primitives::L1MessageEnvelope; -use scroll_db::{ - DatabaseError, DatabaseReadOperations, DatabaseTransactionProvider, L1MessageStart, -}; +use scroll_db::{DatabaseError, DatabaseReadOperations, DatabaseTransactionProvider, L1MessageKey}; /// An instance of the trait can provide L1 messages iterators. #[async_trait::async_trait] @@ -24,7 +22,7 @@ pub trait L1MessageProvider: Send + Sync { /// avoid capturing the lifetime of `self`. async fn get_n_messages( &self, - start: L1MessageStart, + start: L1MessageKey, n: u64, ) -> Result, Self::Error>; } @@ -38,7 +36,7 @@ where async fn get_n_messages( &self, - start: L1MessageStart, + start: L1MessageKey, n: u64, ) -> Result, Self::Error> { let tx = self.tx().await?; diff --git a/crates/providers/src/l1/mod.rs b/crates/providers/src/l1/mod.rs index c001474d..230e626c 100644 --- a/crates/providers/src/l1/mod.rs +++ b/crates/providers/src/l1/mod.rs @@ -9,7 +9,7 @@ use alloy_eips::eip4844::Blob; use alloy_primitives::B256; use alloy_transport::{RpcError, TransportErrorKind}; use rollup_node_primitives::L1MessageEnvelope; -use scroll_db::{DatabaseError, L1MessageStart}; +use scroll_db::{DatabaseError, L1MessageKey}; /// An instance of the trait can be used to provide L1 data. pub trait L1Provider: BlobProvider + L1MessageProvider {} @@ -77,7 +77,7 @@ impl L1MessageProvider for FullL1Provi async fn get_n_messages( &self, - start: L1MessageStart, + start: L1MessageKey, n: u64, ) -> Result, Self::Error> { self.l1_message_provider.get_n_messages(start, n).await diff --git a/crates/providers/src/test_utils.rs b/crates/providers/src/test_utils.rs index b8d7f5dd..fd37a45a 100644 --- a/crates/providers/src/test_utils.rs +++ b/crates/providers/src/test_utils.rs @@ -6,7 +6,7 @@ use std::{collections::HashMap, sync::Arc}; use alloy_eips::eip4844::Blob; use alloy_primitives::B256; use rollup_node_primitives::L1MessageEnvelope; -use scroll_db::L1MessageStart; +use scroll_db::L1MessageKey; /// Implementation of the [`crate::L1Provider`] that never returns blobs. #[derive(Clone, Default, Debug)] @@ -34,7 +34,7 @@ impl L1MessageProvider for MockL1Provider

async fn get_n_messages( &self, - start: L1MessageStart, + start: L1MessageKey, n: u64, ) -> Result, Self::Error> { self.l1_messages_provider.get_n_messages(start, n).await diff --git a/crates/sequencer/src/lib.rs b/crates/sequencer/src/lib.rs index 58ef4979..86d4f283 100644 --- a/crates/sequencer/src/lib.rs +++ b/crates/sequencer/src/lib.rs @@ -26,6 +26,7 @@ pub use error::SequencerError; mod metrics; pub use metrics::SequencerMetrics; +use scroll_db::L1MessageKey; /// A type alias for the payload building job future. pub type PayloadBuildingJobFuture = @@ -250,7 +251,7 @@ async fn build_payload_attributes::into)?;