diff --git a/.github/workflows/lint.yaml b/.github/workflows/lint.yaml index 044e22ab..47f97d17 100644 --- a/.github/workflows/lint.yaml +++ b/.github/workflows/lint.yaml @@ -128,11 +128,11 @@ jobs: - type: wasm target: wasm32-unknown-unknown exclude: | - scroll-engine,scroll-wire,rollup-node,scroll-network,rollup-node-manager,rollup-node-watcher,scroll-db,scroll-migration,rollup-node-indexer,scroll-derivation-pipeline + scroll-engine,scroll-wire,rollup-node,scroll-network,rollup-node-manager,rollup-node-watcher,scroll-db,scroll-migration,rollup-node-indexer,scroll-derivation-pipeline,rollup-node-providers - type: riscv target: riscv32imac-unknown-none-elf exclude: | - scroll-engine,scroll-wire,rollup-node,scroll-network,rollup-node-manager,rollup-node-watcher,scroll-db,scroll-migration,rollup-node-indexer,scroll-codec,scroll-derivation-pipeline + scroll-engine,scroll-wire,rollup-node,scroll-network,rollup-node-manager,rollup-node-watcher,scroll-db,scroll-migration,rollup-node-indexer,scroll-codec,scroll-derivation-pipeline,rollup-node-providers steps: - uses: actions/checkout@v4 - uses: rui314/setup-mold@v1 diff --git a/Cargo.lock b/Cargo.lock index 557eb6e2..6bb9519f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2718,6 +2718,15 @@ dependencies = [ "zeroize", ] +[[package]] +name = "encoding_rs" +version = "0.8.35" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "75030f3c4f45dafd7586dd6780965a8c7e8e285a5ecb86713e63a79c5b2766f3" +dependencies = [ + "cfg-if", +] + [[package]] name = "enr" version = "0.13.0" @@ -3675,6 +3684,22 @@ dependencies = [ "webpki-roots", ] +[[package]] +name = "hyper-tls" +version = "0.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "70206fc6890eaca9fde8a0bf71caa2ddfc9fe045ac9e5c70df101a7dbde866e0" +dependencies = [ + "bytes", + "http-body-util", + "hyper", + "hyper-util", + "native-tls", + "tokio", + "tokio-native-tls", + "tower-service", +] + [[package]] name = "hyper-util" version = "0.1.10" @@ -6092,19 +6117,23 @@ checksum = "d19c46a6fdd48bc4dab94b6103fccc55d34c67cc0ad04653aad4ea2a07cd7bbb" dependencies = [ "base64 0.22.1", "bytes", + "encoding_rs", "futures-channel", "futures-core", "futures-util", + "h2", "http", "http-body", "http-body-util", "hyper", "hyper-rustls", + "hyper-tls", "hyper-util", "ipnet", "js-sys", "log", "mime", + "native-tls", "once_cell", "percent-encoding", "pin-project-lite", @@ -6117,7 +6146,9 @@ dependencies = [ "serde_json", "serde_urlencoded", "sync_wrapper", + "system-configuration", "tokio", + "tokio-native-tls", "tokio-rustls", "tower 0.5.2", "tower-service", @@ -9247,6 +9278,7 @@ dependencies = [ "reth-tokio-util", "rollup-node-indexer", "rollup-node-primitives", + "rollup-node-providers", "rollup-node-watcher", "scroll-alloy-network", "scroll-alloy-provider", @@ -9270,6 +9302,26 @@ dependencies = [ "scroll-alloy-consensus", ] +[[package]] +name = "rollup-node-providers" +version = "0.0.1" +dependencies = [ + "alloy-eips", + "alloy-primitives", + "alloy-rpc-types-beacon", + "alloy-rpc-types-engine", + "alloy-serde", + "async-trait", + "eyre", + "lru 0.13.0", + "reqwest", + "scroll-alloy-consensus", + "scroll-db", + "serde", + "thiserror 2.0.12", + "tokio", +] + [[package]] name = "rollup-node-watcher" version = "0.0.1" @@ -9743,13 +9795,16 @@ dependencies = [ "alloy-eips", "alloy-primitives", "alloy-rpc-types-engine", + "async-trait", "eyre", "reth-scroll-chainspec", "rollup-node-primitives", + "rollup-node-providers", "scroll-alloy-consensus", "scroll-alloy-rpc-types-engine", "scroll-codec", "thiserror 2.0.12", + "tokio", ] [[package]] @@ -9770,6 +9825,7 @@ dependencies = [ "reth-scroll-primitives", "reth-testing-utils", "rollup-node-primitives", + "rollup-node-providers", "scroll-alloy-network", "scroll-alloy-provider", "scroll-alloy-rpc-types-engine", @@ -10818,6 +10874,27 @@ dependencies = [ "windows 0.57.0", ] +[[package]] +name = "system-configuration" +version = "0.6.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3c879d448e9d986b661742763247d3693ed13609438cf3d006f51f5368a5ba6b" +dependencies = [ + "bitflags 2.9.0", + "core-foundation 0.9.4", + "system-configuration-sys", +] + +[[package]] +name = "system-configuration-sys" +version = "0.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8e1d1b10ced5ca923a1fcb8d03e96b8d3268065d724548c0211415ff6ac6bac4" +dependencies = [ + "core-foundation-sys", + "libc", +] + [[package]] name = "tagptr" version = "0.2.0" @@ -10998,6 +11075,16 @@ dependencies = [ "syn 2.0.100", ] +[[package]] +name = "tokio-native-tls" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bbae76ab933c85776efabc971569dd6119c580d8f5d448769dec1764bf796ef2" +dependencies = [ + "native-tls", + "tokio", +] + [[package]] name = "tokio-rustls" version = "0.26.2" diff --git a/Cargo.toml b/Cargo.toml index 80718f64..c5332e73 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -18,6 +18,7 @@ members = [ "crates/node", "crates/network", "crates/primitives", + "crates/providers", "crates/scroll-wire", "crates/watcher", ] @@ -152,6 +153,7 @@ reth-scroll-primitives = { git = "https://github.com/scroll-tech/reth.git", defa rollup-node-indexer = { path = "crates/indexer" } rollup-node-manager = { path = "crates/node" } rollup-node-primitives = { path = "crates/primitives" } +rollup-node-providers = { path = "crates/providers" } rollup-node-watcher = { path = "crates/watcher" } scroll-codec = { path = "crates/codec" } scroll-db = { path = "crates/database/db" } diff --git a/crates/database/db/src/models/batch_commit.rs b/crates/database/db/src/models/batch_commit.rs index 2553e9c9..f061140c 100644 --- a/crates/database/db/src/models/batch_commit.rs +++ b/crates/database/db/src/models/batch_commit.rs @@ -11,6 +11,7 @@ pub struct Model { index: i64, hash: Vec, block_number: i64, + block_timestamp: i64, calldata: Vec, blob_hash: Option>, finalized_block_number: Option, @@ -33,6 +34,9 @@ impl From for ActiveModel { block_number: ActiveValue::Set( batch_commit.block_number.try_into().expect("block number should fit in i64"), ), + block_timestamp: ActiveValue::Set( + batch_commit.block_timestamp.try_into().expect("block timestamp should fit in i64"), + ), calldata: ActiveValue::Set(batch_commit.calldata.0.to_vec()), blob_hash: ActiveValue::Set(batch_commit.blob_versioned_hash.map(|b| b.to_vec())), finalized_block_number: ActiveValue::Unchanged(None), @@ -46,6 +50,7 @@ impl From for BatchCommitData { hash: value.hash.as_slice().try_into().expect("data persisted in database is valid"), index: value.index as u64, block_number: value.block_number as u64, + block_timestamp: value.block_timestamp as u64, calldata: Arc::new(value.calldata.into()), blob_versioned_hash: value .blob_hash diff --git a/crates/database/migration/src/m20220101_000001_create_batch_commit_table.rs b/crates/database/migration/src/m20220101_000001_create_batch_commit_table.rs index 226a5b49..bbb41f1d 100644 --- a/crates/database/migration/src/m20220101_000001_create_batch_commit_table.rs +++ b/crates/database/migration/src/m20220101_000001_create_batch_commit_table.rs @@ -17,6 +17,7 @@ impl MigrationTrait for Migration { .col(pk_auto(BatchCommit::Index)) .col(binary_len(BatchCommit::Hash, HASH_LENGTH)) .col(big_unsigned(BatchCommit::BlockNumber)) + .col(big_unsigned(BatchCommit::BlockTimestamp)) .col(binary(BatchCommit::Calldata)) .col(binary_len_null(BatchCommit::BlobHash, HASH_LENGTH)) .col(boolean_null(BatchCommit::FinalizedBlockNumber)) @@ -36,6 +37,7 @@ enum BatchCommit { Index, Hash, BlockNumber, + BlockTimestamp, Calldata, BlobHash, FinalizedBlockNumber, diff --git a/crates/derivation-pipeline/Cargo.toml b/crates/derivation-pipeline/Cargo.toml index d5b82cbe..2d3aaec6 100644 --- a/crates/derivation-pipeline/Cargo.toml +++ b/crates/derivation-pipeline/Cargo.toml @@ -22,14 +22,17 @@ scroll-alloy-rpc-types-engine.workspace = true # rollup node rollup-node-primitives.workspace = true +rollup-node-providers.workspace = true scroll-codec.workspace = true # misc thiserror.workspace = true [dev-dependencies] +async-trait.workspace = true eyre.workspace = true scroll-codec = { workspace = true, features = ["test-utils"] } +tokio = { workspace = true, features = ["macros"] } [features] default = ["std"] diff --git a/crates/derivation-pipeline/src/error.rs b/crates/derivation-pipeline/src/error.rs index 4023e63e..f3f762ec 100644 --- a/crates/derivation-pipeline/src/error.rs +++ b/crates/derivation-pipeline/src/error.rs @@ -1,3 +1,4 @@ +use rollup_node_providers::L1ProviderError; use scroll_codec::CodecError; /// An error occurred during the derivation process. @@ -9,4 +10,10 @@ pub enum DerivationPipelineError { /// Missing L1 messages cursor. #[error("missing l1 message queue cursor")] MissingL1MessageQueueCursor, + /// Missing L1 message. + #[error("missing l1 message")] + MissingL1Message, + /// An error at the L1 provider. + #[error(transparent)] + L1Provider(#[from] L1ProviderError), } diff --git a/crates/derivation-pipeline/src/lib.rs b/crates/derivation-pipeline/src/lib.rs index a4e13896..6efb1000 100644 --- a/crates/derivation-pipeline/src/lib.rs +++ b/crates/derivation-pipeline/src/lib.rs @@ -11,49 +11,30 @@ mod error; #[cfg(not(feature = "std"))] extern crate alloc as std; +use crate::{data_source::CodecDataSource, error::DerivationPipelineError}; use std::vec::Vec; -use crate::{data_source::CodecDataSource, error::DerivationPipelineError}; -use alloy_eips::eip4844::Blob; use alloy_primitives::B256; use alloy_rpc_types_engine::PayloadAttributes; use reth_scroll_chainspec::SCROLL_FEE_VAULT_ADDRESS; use rollup_node_primitives::BatchCommitData; -use scroll_alloy_consensus::TxL1Message; +use rollup_node_providers::L1Provider; use scroll_alloy_rpc_types_engine::ScrollPayloadAttributes; use scroll_codec::Codec; -/// An instance of the trait can provide L1 messages using a cursor approach. Set the cursor for the -/// provider using the queue index or hash and then call [`L1MessageProvider::next_l1_message`] to -/// iterate the queue. -pub trait L1MessageProvider { - /// Returns the L1 message at the current cursor and advances the cursor. - fn next_l1_message(&self) -> TxL1Message; - /// Set the index cursor for the provider. - fn set_index_cursor(&mut self, index: u64); - /// Set the hash cursor for the provider. - fn set_hash_cursor(&mut self, hash: B256); -} - -/// An instance of the trait can be used to fetch L1 blob data. -pub trait L1BlobProvider { - /// Returns corresponding blob data for the provided hash. - fn blob(&self, hash: B256) -> Option; -} - -/// An instance of the trait can be used to provide L1 data. -pub trait L1Provider: L1BlobProvider + L1MessageProvider {} -impl L1Provider for T where T: L1BlobProvider + L1MessageProvider {} - -/// Returns an iterator over [`ScrollPayloadAttributes`] from the [`BatchCommitData`] and a +/// Returns a vector of [`ScrollPayloadAttributes`] from the [`BatchCommitData`] and a /// [`L1Provider`]. -pub fn derive( +pub async fn derive( batch: BatchCommitData, l1_provider: &mut P, -) -> Result + use<'_, P>, DerivationPipelineError> { +) -> Result, DerivationPipelineError> { // fetch the blob then decode the input batch. - let blob = batch.blob_versioned_hash.and_then(|hash| l1_provider.blob(hash)); - let data = CodecDataSource { calldata: batch.calldata.as_ref(), blob: blob.as_ref() }; + let blob = if let Some(hash) = batch.blob_versioned_hash { + l1_provider.blob(batch.block_timestamp, hash).await? + } else { + None + }; + let data = CodecDataSource { calldata: batch.calldata.as_ref(), blob: blob.as_deref() }; let decoded = Codec::decode(&data)?; // set the cursor for the l1 provider. @@ -64,65 +45,89 @@ pub fn derive( l1_provider.set_hash_cursor(*hash); // we skip the first l1 message, as we are interested in the one starting after // prev_l1_message_queue_hash. - let _ = l1_provider.next_l1_message(); + let _ = l1_provider.next_l1_message().await.map_err(Into::into)?; } else { return Err(DerivationPipelineError::MissingL1MessageQueueCursor) } - let iter = decoded.data.into_l2_blocks().into_iter().map(|mut block| { + let blocks = decoded.data.into_l2_blocks(); + let mut attributes = Vec::with_capacity(blocks.len()); + for mut block in blocks { // query the appropriate amount of l1 messages. - let mut txs = (0..block.context.num_l1_messages) - .map(|_| l1_provider.next_l1_message()) - .map(|tx| { - let mut bytes = Vec::new(); - tx.eip2718_encode(&mut bytes); - bytes.into() - }) - .collect::>(); + let mut txs = Vec::with_capacity(block.context.num_l1_messages as usize); + for _ in 0..block.context.num_l1_messages { + let l1_message = l1_provider + .next_l1_message() + .await + .map_err(Into::into)? + .ok_or(DerivationPipelineError::MissingL1Message)?; + let mut bytes = Vec::with_capacity(l1_message.eip2718_encoded_length()); + l1_message.eip2718_encode(&mut bytes); + txs.push(bytes.into()); + } // add the block transactions. txs.append(&mut block.transactions); // construct the payload attributes. - ScrollPayloadAttributes { + let attribute = ScrollPayloadAttributes { payload_attributes: PayloadAttributes { timestamp: block.context.timestamp, - prev_randao: B256::ZERO, // TODO: this should be based off the current configuration value. suggested_fee_recipient: SCROLL_FEE_VAULT_ADDRESS, + prev_randao: B256::ZERO, withdrawals: None, parent_beacon_block_root: None, }, transactions: Some(txs), no_tx_pool: true, - } - }); + }; + attributes.push(attribute); + } - Ok(iter) + Ok(attributes) } #[cfg(test)] mod tests { use super::*; - use core::cell::RefCell; use std::sync::Arc; + use alloy_eips::eip4844::Blob; use alloy_primitives::{address, b256, bytes, U256}; + use rollup_node_providers::{L1BlobProvider, L1MessageProvider, L1ProviderError}; + use scroll_alloy_consensus::TxL1Message; use scroll_codec::decoding::test_utils::read_to_bytes; + use tokio::sync::Mutex; struct TestL1MessageProvider { - messages: RefCell>, + messages: Arc>>, + } + + struct Infallible; + impl From for L1ProviderError { + fn from(_value: Infallible) -> Self { + Self::Other("infallible") + } } + #[async_trait::async_trait] impl L1BlobProvider for TestL1MessageProvider { - fn blob(&self, _hash: B256) -> Option { - None + async fn blob( + &self, + _block_timestamp: u64, + _hash: B256, + ) -> Result>, L1ProviderError> { + Ok(None) } } + #[async_trait::async_trait] impl L1MessageProvider for TestL1MessageProvider { - fn next_l1_message(&self) -> TxL1Message { - self.messages.borrow_mut().remove(0) + type Error = Infallible; + + async fn next_l1_message(&self) -> Result, Self::Error> { + Ok(Some(self.messages.try_lock().expect("lock is free").remove(0))) } fn set_index_cursor(&mut self, _index: u64) {} @@ -130,14 +135,15 @@ mod tests { fn set_hash_cursor(&mut self, _hash: B256) {} } - #[test] - fn test_should_derive_batch() -> eyre::Result<()> { + #[tokio::test] + async fn test_should_derive_batch() -> eyre::Result<()> { // https://etherscan.io/tx/0x8f4f0fcab656aa81589db5b53255094606c4624bfd99702b56b2debaf6211f48 let raw_calldata = read_to_bytes("./testdata/calldata_v0.bin")?; let batch_data = BatchCommitData { hash: b256!("7f26edf8e3decbc1620b4d2ba5f010a6bdd10d6bb16430c4f458134e36ab3961"), index: 12, block_number: 18319648, + block_timestamp: 1696935971, calldata: Arc::new(raw_calldata), blob_versioned_hash: None, }; @@ -157,37 +163,34 @@ mod tests { sender: address!("7885BcBd5CeCEf1336b5300fb5186A12DDD8c478"), input: bytes!("8ef1332e0000000000000000000000007f2b8c31f88b6006c382775eea88297ec1e3e9050000000000000000000000006ea73e05adc79974b931123675ea8f78ffdacdf000000000000000000000000000000000000000000000000000470de4df820000000000000000000000000000000000000000000000000000000000000000002200000000000000000000000000000000000000000000000000000000000000a000000000000000000000000000000000000000000000000000000000000000a4232e8748000000000000000000000000982fe4a7cbd74bb3422ebe46333c3e8046c12c7f000000000000000000000000982fe4a7cbd74bb3422ebe46333c3e8046c12c7f00000000000000000000000000000000000000000000000000470de4df8200000000000000000000000000000000000000000000000000000000000000000080000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000"), }]; - let mut provider = TestL1MessageProvider { messages: RefCell::new(l1_messages) }; + let mut provider = TestL1MessageProvider { messages: Arc::new(Mutex::new(l1_messages)) }; - let mut attributes = derive(batch_data, &mut provider)?; - let attribute = attributes.find(|a| a.payload_attributes.timestamp == 1696935384).unwrap(); + let attributes = derive(batch_data, &mut provider).await?; + let attribute = + attributes.iter().find(|a| a.payload_attributes.timestamp == 1696935384).unwrap(); let expected = ScrollPayloadAttributes{ payload_attributes: PayloadAttributes{ timestamp: 1696935384, - prev_randao: B256::ZERO, suggested_fee_recipient: SCROLL_FEE_VAULT_ADDRESS, - withdrawals: None, - parent_beacon_block_root: None, + ..Default::default() }, transactions: Some(vec![bytes!("7ef901b7218302904094781e90f1c8fc4611c9b7497c3b47f99ef6969cbc80b901848ef1332e0000000000000000000000007f2b8c31f88b6006c382775eea88297ec1e3e9050000000000000000000000006ea73e05adc79974b931123675ea8f78ffdacdf0000000000000000000000000000000000000000000000000006a94d74f430000000000000000000000000000000000000000000000000000000000000000002100000000000000000000000000000000000000000000000000000000000000a000000000000000000000000000000000000000000000000000000000000000a4232e8748000000000000000000000000ca266224613396a0e8d4c2497dbc4f33dd6cdeff000000000000000000000000ca266224613396a0e8d4c2497dbc4f33dd6cdeff000000000000000000000000000000000000000000000000006a94d74f4300000000000000000000000000000000000000000000000000000000000000000080000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000947885bcbd5cecef1336b5300fb5186a12ddd8c478"), bytes!("7ef901b7228302904094781e90f1c8fc4611c9b7497c3b47f99ef6969cbc80b901848ef1332e0000000000000000000000007f2b8c31f88b6006c382775eea88297ec1e3e9050000000000000000000000006ea73e05adc79974b931123675ea8f78ffdacdf000000000000000000000000000000000000000000000000000470de4df820000000000000000000000000000000000000000000000000000000000000000002200000000000000000000000000000000000000000000000000000000000000a000000000000000000000000000000000000000000000000000000000000000a4232e8748000000000000000000000000982fe4a7cbd74bb3422ebe46333c3e8046c12c7f000000000000000000000000982fe4a7cbd74bb3422ebe46333c3e8046c12c7f00000000000000000000000000000000000000000000000000470de4df8200000000000000000000000000000000000000000000000000000000000000000080000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000947885bcbd5cecef1336b5300fb5186a12ddd8c478")]), no_tx_pool: true, }; - assert_eq!(attribute, expected); + assert_eq!(attribute, &expected); let attribute = attributes.last().unwrap(); let expected = ScrollPayloadAttributes{ payload_attributes: PayloadAttributes{ timestamp: 1696935657, - prev_randao: B256::ZERO, suggested_fee_recipient: SCROLL_FEE_VAULT_ADDRESS, - withdrawals: None, - parent_beacon_block_root: None, + ..Default::default() }, transactions: Some(vec![bytes!("f88c8202658417d7840082a4f294530000000000000000000000000000000000000280a4bede39b500000000000000000000000000000000000000000000000000000001669aa2f583104ec4a07461e6555f927393ebdf5f183738450c3842bc3b86a1db7549d9bee21fadd0b1a06d7ba96897bd9fb8e838a327d3ca34be66da11955f10d1fb2264949071e9e8cd")]), no_tx_pool: true, }; - assert_eq!(attribute, expected); + assert_eq!(attribute, &expected); Ok(()) } diff --git a/crates/engine/Cargo.toml b/crates/engine/Cargo.toml index d7a113e0..d8ad9c3f 100644 --- a/crates/engine/Cargo.toml +++ b/crates/engine/Cargo.toml @@ -32,6 +32,7 @@ reth-scroll-engine-primitives = { git = "https://github.com/scroll-tech/reth.git # rollup-node rollup-node-primitives.workspace = true +rollup-node-providers.workspace = true # misc async-trait.workspace = true diff --git a/crates/engine/src/engine.rs b/crates/engine/src/engine.rs index d13dba5a..429d74fa 100644 --- a/crates/engine/src/engine.rs +++ b/crates/engine/src/engine.rs @@ -1,5 +1,5 @@ use super::error::EngineDriverError; -use crate::{payload::matching_payloads, ExecutionPayloadProvider}; +use crate::payload::matching_payloads; use alloy_rpc_types_engine::{ ExecutionPayload, ExecutionPayloadV1, ForkchoiceState, ForkchoiceUpdated, PayloadId, @@ -9,8 +9,8 @@ use eyre::Result; use reth_payload_primitives::PayloadTypes; use reth_scroll_engine_primitives::ScrollEngineTypes; use rollup_node_primitives::BlockInfo; +use rollup_node_providers::ExecutionPayloadProvider; use scroll_alloy_provider::ScrollEngineApi; - use tokio::time::Duration; use tracing::{debug, error, info, instrument, trace}; diff --git a/crates/engine/src/lib.rs b/crates/engine/src/lib.rs index 1fd6ceeb..10958f9d 100644 --- a/crates/engine/src/lib.rs +++ b/crates/engine/src/lib.rs @@ -1,17 +1,16 @@ //! Engine Driver for the Scroll Rollup Node. The [`EngineDriver`] exposes the main interface for //! the Rollup Node to the Engine API. -mod engine; pub use engine::EngineDriver; +mod engine; -mod error; pub use error::EngineDriverError; +mod error; -mod fcs; pub use fcs::ForkchoiceState; +mod fcs; mod payload; -pub use payload::ExecutionPayloadProvider; #[cfg(any(test, feature = "test-utils"))] pub mod test_utils; diff --git a/crates/engine/src/payload.rs b/crates/engine/src/payload.rs index 0f56b80c..583e0ff2 100644 --- a/crates/engine/src/payload.rs +++ b/crates/engine/src/payload.rs @@ -1,12 +1,9 @@ -use alloy_eips::BlockId; use alloy_primitives::B256; use alloy_rpc_types_engine::ExecutionPayload; use scroll_alloy_rpc_types_engine::ScrollPayloadAttributes; use tracing::debug; -use crate::EngineDriverError; - /// Returns true if the [`ScrollPayloadAttributes`] matches the [`ExecutionPayload`]: /// - provided parent hash matches the parent hash of the [`ExecutionPayload`] /// - all transactions match @@ -64,16 +61,6 @@ pub(crate) fn matching_payloads( true } -/// Implementers of the trait can provide the L2 execution payload for a block id. -#[async_trait::async_trait] -pub trait ExecutionPayloadProvider { - /// Returns the [`ExecutionPayload`] for the provided [`BlockId`], or [None]. - async fn execution_payload_by_block( - &self, - block_id: BlockId, - ) -> Result, EngineDriverError>; -} - #[cfg(test)] mod tests { use super::*; diff --git a/crates/engine/src/test_utils.rs b/crates/engine/src/test_utils.rs index 6921806c..3ba1a150 100644 --- a/crates/engine/src/test_utils.rs +++ b/crates/engine/src/test_utils.rs @@ -2,8 +2,8 @@ use crate::EngineDriverError; -use super::ExecutionPayloadProvider; use alloy_rpc_types_engine::ExecutionPayload; +use rollup_node_providers::ExecutionPayloadProvider; /// A default execution payload for testing that returns `Ok(None)` for all block IDs. #[derive(Debug)] @@ -11,10 +11,12 @@ pub struct NoopExecutionPayloadProvider; #[async_trait::async_trait] impl ExecutionPayloadProvider for NoopExecutionPayloadProvider { + type Error = EngineDriverError; + async fn execution_payload_by_block( &self, _block_id: alloy_eips::BlockId, - ) -> Result, EngineDriverError> { + ) -> Result, Self::Error> { Ok(None) } } diff --git a/crates/node/Cargo.toml b/crates/node/Cargo.toml index 86e8e1ac..e4b63588 100644 --- a/crates/node/Cargo.toml +++ b/crates/node/Cargo.toml @@ -27,8 +27,9 @@ scroll-network.workspace = true scroll-wire.workspace = true # rollup node -rollup-node-primitives.workspace = true rollup-node-indexer.workspace = true +rollup-node-primitives.workspace = true +rollup-node-providers.workspace = true rollup-node-watcher.workspace = true # misc diff --git a/crates/node/src/lib.rs b/crates/node/src/lib.rs index 91f96a59..a0a04d82 100644 --- a/crates/node/src/lib.rs +++ b/crates/node/src/lib.rs @@ -10,7 +10,7 @@ use rollup_node_indexer::Indexer; use rollup_node_watcher::L1Notification; use scroll_alloy_network::Scroll as ScrollNetwork; use scroll_alloy_provider::ScrollEngineApi; -use scroll_engine::{EngineDriver, EngineDriverError, ExecutionPayloadProvider, ForkchoiceState}; +use scroll_engine::{EngineDriver, EngineDriverError, ForkchoiceState}; use scroll_network::{ BlockImportError, BlockImportOutcome, BlockValidation, BlockValidationError, NetworkManager, NetworkManagerEvent, NewBlockWithPeer, @@ -33,6 +33,7 @@ mod consensus; use consensus::Consensus; pub use consensus::PoAConsensus; use rollup_node_primitives::BlockInfo; +use rollup_node_providers::ExecutionPayloadProvider; /// The size of the event channel. const EVENT_CHANNEL_SIZE: usize = 100; diff --git a/crates/primitives/src/batch.rs b/crates/primitives/src/batch.rs index 38465cfc..ce1943cf 100644 --- a/crates/primitives/src/batch.rs +++ b/crates/primitives/src/batch.rs @@ -1,6 +1,6 @@ use std::sync::Arc; -use alloy_primitives::{BlockNumber, Bytes, B256}; +use alloy_primitives::{Bytes, B256}; /// The input data for a batch. /// @@ -12,8 +12,10 @@ pub struct BatchCommitData { pub hash: B256, /// The index of the batch. pub index: u64, - /// The block number the batch was committed at. - pub block_number: BlockNumber, + /// The block number in which the batch was committed. + pub block_number: u64, + /// The block timestamp in which the batch was committed. + pub block_timestamp: u64, /// The commit transaction calldata. pub calldata: Arc, /// The optional blob hash for the commit. @@ -29,6 +31,7 @@ mod arbitrary_impl { let batch_index = u.arbitrary::()? as u64; let batch_hash = u.arbitrary::()?; let block_number = u.arbitrary::()? as u64; + let block_timestamp = u.arbitrary::()? as u64; let bytes = u.arbitrary::()?; let blob_hash = u.arbitrary::()?.then_some(u.arbitrary::()?); @@ -36,6 +39,7 @@ mod arbitrary_impl { hash: batch_hash, index: batch_index, block_number, + block_timestamp, calldata: Arc::new(bytes), blob_versioned_hash: blob_hash, }) diff --git a/crates/providers/Cargo.toml b/crates/providers/Cargo.toml new file mode 100644 index 00000000..8981360c --- /dev/null +++ b/crates/providers/Cargo.toml @@ -0,0 +1,34 @@ +[package] +name = "rollup-node-providers" +version.workspace = true +edition.workspace = true +rust-version.workspace = true +license.workspace = true +exclude.workspace = true + +[lints] +workspace = true + +[dependencies] +# alloy +alloy-eips = { workspace = true, features = ["kzg"] } +alloy-primitives.workspace = true +alloy-rpc-types-beacon = "0.12" +alloy-rpc-types-engine.workspace = true +alloy-serde = "0.12" + +# scroll +scroll-alloy-consensus.workspace = true +scroll-db.workspace = true + +# misc +async-trait.workspace = true +lru = "0.13.0" +reqwest = { version = "0.12", features = ["json"] } +serde = { version = "1.0", features = ["derive"] } +thiserror.workspace = true +tokio = { workspace = true, default-features = false } + +[dev-dependencies] +eyre.workspace = true +tokio = { workspace = true, features = ["macros"] } diff --git a/crates/providers/src/beacon_client.rs b/crates/providers/src/beacon_client.rs new file mode 100644 index 00000000..eef459a9 --- /dev/null +++ b/crates/providers/src/beacon_client.rs @@ -0,0 +1,114 @@ +//! Contains an implementation of a Beacon client. +//! Credit to + +use alloy_rpc_types_beacon::sidecar::{BeaconBlobBundle, BlobData}; +use reqwest::Client; +use std::{format, vec::Vec}; + +/// The config spec engine api method. +const SPEC_METHOD: &str = "eth/v1/config/spec"; + +/// The beacon genesis engine api method. +const GENESIS_METHOD: &str = "eth/v1/beacon/genesis"; + +/// The blob sidecars engine api method prefix. +const SIDECARS_METHOD_PREFIX: &str = "eth/v1/beacon/blob_sidecars"; + +/// An API response. +#[derive(Debug, Default, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)] +pub struct APIResponse { + /// The data. + pub data: T, +} + +/// A reduced genesis data. +#[derive(Debug, Default, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)] +pub struct ReducedGenesisData { + /// The genesis time. + #[serde(rename = "genesis_time")] + #[serde(with = "alloy_serde::quantity")] + pub genesis_time: u64, +} + +/// A reduced config data. +#[derive(Debug, Default, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)] +pub struct ReducedConfigData { + /// The seconds per slot. + #[serde(rename = "SECONDS_PER_SLOT")] + #[serde(with = "alloy_serde::quantity")] + pub seconds_per_slot: u64, +} + +/// An online implementation of a Beacon client. +#[derive(Debug, Clone)] +pub struct OnlineBeaconClient { + /// The base URL of the beacon API. + pub base: String, + /// The inner reqwest client. + pub inner: Client, +} + +impl OnlineBeaconClient { + /// Creates a new [`OnlineBeaconClient`] from the provided base url. + pub fn new_http(mut base: String) -> Self { + // If base ends with a slash, remove it + if base.ends_with('/') { + base.remove(base.len() - 1); + } + Self { base, inner: Client::new() } + } +} + +impl OnlineBeaconClient { + /// Returns the reduced configuration data for the Beacon client. + pub async fn config_spec(&self) -> Result, reqwest::Error> { + let first = self.inner.get(format!("{}/{}", self.base, SPEC_METHOD)).send().await?; + first.json::>().await + } + + /// Returns the Beacon genesis information. + pub async fn beacon_genesis(&self) -> Result, reqwest::Error> { + let first = self.inner.get(format!("{}/{}", self.base, GENESIS_METHOD)).send().await?; + first.json::>().await + } + + /// Returns the blobs for the provided slot. + pub async fn blobs(&self, slot: u64) -> Result, reqwest::Error> { + let raw_response = self + .inner + .get(format!("{}/{}/{}", self.base, SIDECARS_METHOD_PREFIX, slot)) + .send() + .await?; + let raw_response = raw_response.json::().await?; + + Ok(raw_response.data) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + // + const BEACON_CLIENT_URL: &str = "https://eth-beacon-chain.drpc.org/rest/"; + + #[tokio::test] + async fn test_should_return_genesis() -> eyre::Result<()> { + let client = OnlineBeaconClient::new_http(BEACON_CLIENT_URL.to_string()); + let genesis = client.beacon_genesis().await?; + + assert_eq!(genesis.data.genesis_time, 1606824023); + + Ok(()) + } + + #[tokio::test] + async fn test_should_return_config() -> eyre::Result<()> { + let client = OnlineBeaconClient::new_http(BEACON_CLIENT_URL.to_string()); + let config = client.config_spec().await?; + + assert_eq!(config.data.seconds_per_slot, 12); + + Ok(()) + } +} diff --git a/crates/providers/src/execution_payload.rs b/crates/providers/src/execution_payload.rs new file mode 100644 index 00000000..076df16d --- /dev/null +++ b/crates/providers/src/execution_payload.rs @@ -0,0 +1,15 @@ +use alloy_eips::BlockId; +use alloy_rpc_types_engine::ExecutionPayload; + +/// Implementers of the trait can provide the L2 execution payload for a block id. +#[async_trait::async_trait] +pub trait ExecutionPayloadProvider { + /// The error returned by the provider. + type Error; + + /// Returns the [`ExecutionPayload`] for the provided [`BlockId`], or [None]. + async fn execution_payload_by_block( + &self, + block_id: BlockId, + ) -> Result, Self::Error>; +} diff --git a/crates/providers/src/l1/blob.rs b/crates/providers/src/l1/blob.rs new file mode 100644 index 00000000..fbf5e818 --- /dev/null +++ b/crates/providers/src/l1/blob.rs @@ -0,0 +1,15 @@ +use crate::L1ProviderError; +use alloy_eips::eip4844::Blob; +use alloy_primitives::B256; +use std::sync::Arc; + +/// An instance of the trait can be used to fetch L1 blob data. +#[async_trait::async_trait] +pub trait L1BlobProvider { + /// Returns corresponding blob data for the provided hash. + async fn blob( + &self, + block_timestamp: u64, + hash: B256, + ) -> Result>, L1ProviderError>; +} diff --git a/crates/providers/src/l1/message.rs b/crates/providers/src/l1/message.rs new file mode 100644 index 00000000..c234aaa0 --- /dev/null +++ b/crates/providers/src/l1/message.rs @@ -0,0 +1,59 @@ +use crate::L1ProviderError; + +use alloy_primitives::B256; +use scroll_alloy_consensus::TxL1Message; +use scroll_db::{DatabaseConnectionProvider, DatabaseOperations}; + +/// An instance of the trait can provide L1 messages using a cursor approach. Set the cursor for the +/// provider using the queue index or hash and then call [`L1MessageProvider::next_l1_message`] to +/// iterate the queue. +#[async_trait::async_trait] +pub trait L1MessageProvider { + /// The error type for the provider. + type Error: Into; + + /// Returns the L1 message at the current cursor and advances the cursor. + async fn next_l1_message(&self) -> Result, Self::Error>; + /// Set the index cursor for the provider. + fn set_index_cursor(&mut self, index: u64); + /// Set the hash cursor for the provider. + fn set_hash_cursor(&mut self, hash: B256); +} + +/// Implements [`L1MessageProvider`] via a database connection. +#[derive(Debug)] +pub struct DatabaseL1MessageProvider { + /// A connection to the database. + database_connection: DB, + /// The current L1 message index. + index: u64, +} + +impl DatabaseL1MessageProvider { + /// Returns a new instance of the [`DatabaseL1MessageProvider`]. + pub const fn new(db: DB, index: u64) -> Self { + Self { database_connection: db, index } + } +} + +#[async_trait::async_trait] +impl L1MessageProvider for DatabaseL1MessageProvider { + type Error = L1ProviderError; + + async fn next_l1_message(&self) -> Result, Self::Error> { + Ok(self + .database_connection + .get_l1_message(self.index) + .await + .map(|tx| tx.map(|tx| tx.transaction))?) + } + + fn set_index_cursor(&mut self, index: u64) { + self.index = index; + } + + fn set_hash_cursor(&mut self, _hash: B256) { + // TODO: issue 43 + todo!() + } +} diff --git a/crates/providers/src/l1/mod.rs b/crates/providers/src/l1/mod.rs new file mode 100644 index 00000000..54d262dd --- /dev/null +++ b/crates/providers/src/l1/mod.rs @@ -0,0 +1,151 @@ +pub(crate) mod blob; +pub(crate) mod message; + +use crate::{beacon_client::OnlineBeaconClient, l1::message::L1MessageProvider, L1BlobProvider}; +use std::{num::NonZeroUsize, sync::Arc}; + +use alloy_eips::eip4844::{Blob, BlobTransactionSidecarItem}; +use alloy_primitives::B256; +use lru::LruCache; +use scroll_alloy_consensus::TxL1Message; +use scroll_db::DatabaseError; +use tokio::sync::Mutex; + +/// An instance of the trait can be used to provide L1 data. +pub trait L1Provider: L1BlobProvider + L1MessageProvider {} +impl L1Provider for T where T: L1BlobProvider + L1MessageProvider {} + +/// An error occurring at the [`L1Provider`]. +#[derive(Debug, thiserror::Error)] +pub enum L1ProviderError { + /// Invalid timestamp for slot. + #[error("Beacon client error: {0}")] + BeaconClient(#[from] reqwest::Error), + /// Invalid timestamp for slot. + #[error("invalid block timestamp: genesis {0}, provided {1}")] + InvalidBlockTimestamp(u64, u64), + /// Database error. + #[error(transparent)] + Database(#[from] DatabaseError), + /// Other error. + #[error("{0}")] + Other(&'static str), +} + +/// An online implementation of the [`L1Provider`] trait. +#[derive(Debug)] +pub struct OnlineL1Provider

{ + /// The Beacon client. + beacon_client: OnlineBeaconClient, + /// The cache for blobs from similar blocks. + cache: Arc>>>, + /// The L1 message provider + l1_message_provider: P, + /// The genesis timestamp for the Beacon chain. + genesis_timestamp: u64, + /// The slot interval for the Beacon chain. + slot_interval: u64, +} + +impl

OnlineL1Provider

{ + /// Returns a new [`OnlineBeaconClient`] from the provided [`OnlineBeaconClient`], blob capacity + /// and [`L1MessageProvider`]. + pub async fn new( + client: OnlineBeaconClient, + blob_capacity: usize, + l1_message_provider: P, + ) -> Self { + let cache = Arc::new(Mutex::new(LruCache::new( + NonZeroUsize::new(blob_capacity).expect("cache requires non-zero capacity"), + ))); + let config = + client.config_spec().await.expect("failed to fetch Beacon chain configuration"); + let genesis = + client.beacon_genesis().await.expect("failed to fetch Beacon chain genesis info"); + + Self { + beacon_client: client, + cache, + l1_message_provider, + genesis_timestamp: genesis.data.genesis_time, + slot_interval: config.data.seconds_per_slot, + } + } + + const fn slot(&self, block_timestamp: u64) -> Result { + if block_timestamp < self.genesis_timestamp { + return Err(L1ProviderError::InvalidBlockTimestamp( + self.genesis_timestamp, + block_timestamp, + )) + } + + Ok((block_timestamp - self.genesis_timestamp) / self.slot_interval) + } +} + +#[async_trait::async_trait] +impl L1BlobProvider for OnlineL1Provider

{ + /// Returns the requested blob corresponding to the passed hash. + async fn blob( + &self, + block_timestamp: u64, + hash: B256, + ) -> Result>, L1ProviderError> { + // check if the requested blob is in the cache. + let mut cache = self.cache.lock().await; + if let Some(blob) = cache.get(&hash) { + return Ok(Some(blob.clone())); + } + // avoid holding the lock over the blob request. + drop(cache); + + // query the blobs with the client, return target blob and store all others in cache. + let slot = self.slot(block_timestamp)?; + let mut blobs = self + .beacon_client + .blobs(slot) + .await? + .into_iter() + .map(|blob| BlobTransactionSidecarItem { + index: blob.index, + blob: blob.blob, + kzg_commitment: blob.kzg_commitment, + kzg_proof: blob.kzg_proof, + }) + .collect::>(); + + // if we find a blob, timestamp is valid. + // cache the other blobs and return the matched blob. + let maybe_blob = blobs.iter().position(|blob| blob.to_kzg_versioned_hash() == hash.0); + if let Some(position) = maybe_blob { + let blob = Arc::new(*blobs.remove(position).blob); + let mut cache = self.cache.lock().await; + for (hash, blob) in + blobs.iter().map(|b| (b.to_kzg_versioned_hash().into(), Arc::new(*b.blob))) + { + cache.put(hash, blob); + } + return Ok(Some(blob)) + } + + Ok(None) + } +} + +#[async_trait::async_trait] +impl L1MessageProvider for OnlineL1Provider

{ + type Error =

::Error; + + async fn next_l1_message(&self) -> Result, Self::Error> { + self.l1_message_provider.next_l1_message().await + } + + fn set_index_cursor(&mut self, index: u64) { + self.l1_message_provider.set_index_cursor(index) + } + + fn set_hash_cursor(&mut self, hash: B256) { + self.l1_message_provider.set_hash_cursor(hash) + } +} diff --git a/crates/providers/src/lib.rs b/crates/providers/src/lib.rs new file mode 100644 index 00000000..319478c9 --- /dev/null +++ b/crates/providers/src/lib.rs @@ -0,0 +1,15 @@ +//! The crate exposes various Providers along with their implementations for usage across the rollup +//! node. + +pub use beacon_client::OnlineBeaconClient; +mod beacon_client; + +pub use execution_payload::ExecutionPayloadProvider; +mod execution_payload; + +pub use l1::{ + blob::L1BlobProvider, + message::{DatabaseL1MessageProvider, L1MessageProvider}, + L1Provider, L1ProviderError, OnlineL1Provider, +}; +mod l1; diff --git a/crates/watcher/src/error.rs b/crates/watcher/src/error.rs index 8b0b94ab..3af0fede 100644 --- a/crates/watcher/src/error.rs +++ b/crates/watcher/src/error.rs @@ -36,6 +36,9 @@ pub enum FilterLogError { /// The log is missing a block number. #[error("missing block number for log")] MissingBlockNumber, + /// The log is missing a block timestamp. + #[error("missing block timestamp for log")] + MissingBlockTimestamp, /// The log is missing a transaction hash. #[error("unknown transaction hash for log")] MissingTransactionHash, diff --git a/crates/watcher/src/lib.rs b/crates/watcher/src/lib.rs index eb2ac88b..4fcd8d02 100644 --- a/crates/watcher/src/lib.rs +++ b/crates/watcher/src/lib.rs @@ -314,6 +314,7 @@ where let groups: Vec<_> = groups.into_iter().map(|(hash, group)| (hash, group.collect::>())).collect(); + // iterate each group of commits for (tx_hash, group) in groups { // fetch the commit transaction. let transaction = self @@ -327,9 +328,12 @@ where transaction.blob_versioned_hashes().unwrap_or(&[]).iter().copied(); let input = Arc::new(transaction.input().clone()); + // iterate the logs emitted in the group for (raw_log, decoded_log, _) in group { let block_number = raw_log.block_number.ok_or(FilterLogError::MissingBlockNumber)?; + let block_timestamp = + raw_log.block_timestamp.ok_or(FilterLogError::MissingBlockTimestamp)?; let batch_index = decoded_log.batch_index.uint_try_to().expect("u256 to u64 conversion error"); @@ -338,6 +342,7 @@ where hash: decoded_log.batch_hash, index: batch_index, block_number, + block_timestamp, calldata: input.clone(), blob_versioned_hash: blob_versioned_hashes.next(), })) @@ -785,6 +790,7 @@ mod tests { batch_commit.inner = inner_log; batch_commit.transaction_hash = Some(*tx.inner.tx_hash()); batch_commit.block_number = Some(random!(u64)); + batch_commit.block_timestamp = Some(random!(u64)); logs.push(batch_commit); // When