diff --git a/Cargo.lock b/Cargo.lock index 74867a1b269..01b03534c88 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -9515,6 +9515,7 @@ dependencies = [ "reth-storage-api", "reth-tasks", "ringbuffer", + "serde", "serde_json", "test-case", "tokio", diff --git a/crates/optimism/flashblocks/Cargo.toml b/crates/optimism/flashblocks/Cargo.toml index e0754aab95e..ccaa44dec6e 100644 --- a/crates/optimism/flashblocks/Cargo.toml +++ b/crates/optimism/flashblocks/Cargo.toml @@ -34,11 +34,13 @@ alloy-rpc-types-engine = { workspace = true, features = ["serde"] } alloy-consensus.workspace = true # op-alloy +op-alloy-consensus.workspace = true op-alloy-rpc-types-engine = { workspace = true, features = ["k256"] } # io tokio.workspace = true tokio-tungstenite = { workspace = true, features = ["rustls-tls-native-roots"] } +serde.workspace = true serde_json.workspace = true url.workspace = true futures-util.workspace = true @@ -57,4 +59,3 @@ derive_more.workspace = true [dev-dependencies] test-case.workspace = true alloy-consensus.workspace = true -op-alloy-consensus.workspace = true diff --git a/crates/optimism/flashblocks/src/cache.rs b/crates/optimism/flashblocks/src/cache.rs index 9aeed3435e3..4acd44fca0b 100644 --- a/crates/optimism/flashblocks/src/cache.rs +++ b/crates/optimism/flashblocks/src/cache.rs @@ -5,17 +5,28 @@ use crate::{ sequence::{FlashBlockPendingSequence, SequenceExecutionOutcome}, + traits::{FlashblockDiff, FlashblockPayload, FlashblockPayloadBase}, worker::BuildArgs, - FlashBlock, FlashBlockCompleteSequence, PendingFlashBlock, + FlashBlockCompleteSequence, PendingFlashBlock, }; use alloy_eips::eip2718::WithEncoded; use alloy_primitives::B256; -use reth_primitives_traits::{NodePrimitives, Recovered, SignedTransaction}; +use reth_primitives_traits::{NodePrimitives, Recovered}; use reth_revm::cached::CachedReads; use ringbuffer::{AllocRingBuffer, RingBuffer}; use tokio::sync::broadcast; use tracing::*; +type CachedSequenceEntry

= ( + FlashBlockCompleteSequence

, + Vec::SignedTx>>>, +); + +type SequenceBuildArgs

= BuildArgs< + Vec::SignedTx>>>, +

::Base, +>; + /// Maximum number of cached sequences in the ring buffer. const CACHE_SIZE: usize = 3; /// 200 ms flashblock time. @@ -29,21 +40,21 @@ pub(crate) const FLASHBLOCK_BLOCK_TIME: u64 = 200; /// - Finding the best sequence to build based on local chain tip /// - Broadcasting completed sequences to subscribers #[derive(Debug)] -pub(crate) struct SequenceManager { +pub(crate) struct SequenceManager { /// Current pending sequence being built up from incoming flashblocks - pending: FlashBlockPendingSequence, + pending: FlashBlockPendingSequence

, /// Cached recovered transactions for the pending sequence - pending_transactions: Vec>>, + pending_transactions: Vec>>, /// Ring buffer of recently completed sequences bundled with their decoded transactions (FIFO, /// size 3) - completed_cache: AllocRingBuffer<(FlashBlockCompleteSequence, Vec>>)>, + completed_cache: AllocRingBuffer>, /// Broadcast channel for completed sequences - block_broadcaster: broadcast::Sender, + block_broadcaster: broadcast::Sender>, /// Whether to compute state roots when building blocks compute_state_root: bool, } -impl SequenceManager { +impl SequenceManager

{ /// Creates a new sequence manager. pub(crate) fn new(compute_state_root: bool) -> Self { let (block_broadcaster, _) = broadcast::channel(128); @@ -59,12 +70,14 @@ impl SequenceManager { /// Returns the sender half of the flashblock sequence broadcast channel. pub(crate) const fn block_sequence_broadcaster( &self, - ) -> &broadcast::Sender { + ) -> &broadcast::Sender> { &self.block_broadcaster } /// Gets a subscriber to the flashblock sequences produced. - pub(crate) fn subscribe_block_sequence(&self) -> crate::FlashBlockCompleteSequenceRx { + pub(crate) fn subscribe_block_sequence( + &self, + ) -> broadcast::Receiver> { self.block_broadcaster.subscribe() } @@ -76,12 +89,12 @@ impl SequenceManager { /// with computed `state_root`. /// /// Transactions are recovered once and cached for reuse during block building. - pub(crate) fn insert_flashblock(&mut self, flashblock: FlashBlock) -> eyre::Result<()> { + pub(crate) fn insert_flashblock(&mut self, flashblock: P) -> eyre::Result<()> { // If this starts a new block, finalize and cache the previous sequence BEFORE inserting - if flashblock.index == 0 && self.pending.count() > 0 { + if flashblock.index() == 0 && self.pending.count() > 0 { let completed = self.pending.finalize()?; let block_number = completed.block_number(); - let parent_hash = completed.payload_base().parent_hash; + let parent_hash = completed.payload_base().parent_hash(); trace!( target: "flashblocks", @@ -114,7 +127,7 @@ impl SequenceManager { } /// Returns the current pending sequence for inspection. - pub(crate) const fn pending(&self) -> &FlashBlockPendingSequence { + pub(crate) const fn pending(&self) -> &FlashBlockPendingSequence

{ &self.pending } @@ -129,21 +142,21 @@ impl SequenceManager { &mut self, local_tip_hash: B256, local_tip_timestamp: u64, - ) -> Option>>>> { + ) -> Option> { // Try to find a buildable sequence: (base, last_fb, transactions, cached_state, // source_name) let (base, last_flashblock, transactions, cached_state, source_name) = // Priority 1: Try current pending sequence - if let Some(base) = self.pending.payload_base().filter(|b| b.parent_hash == local_tip_hash) { - let cached_state = self.pending.take_cached_reads().map(|r| (base.parent_hash, r)); - let last_fb = self.pending.last_flashblock()?; + if let Some(base) = self.pending.payload_base().filter(|b| b.parent_hash() == local_tip_hash) { + let cached_state = self.pending.take_cached_reads().map(|r| (base.parent_hash(), r)); + let last_fb = self.pending.last_flashblock()?.clone(); let transactions = self.pending_transactions.clone(); (base, last_fb, transactions, cached_state, "pending") } // Priority 2: Try cached sequence with exact parent match - else if let Some((cached, txs)) = self.completed_cache.iter().find(|(c, _)| c.payload_base().parent_hash == local_tip_hash) { - let base = cached.payload_base().clone(); - let last_fb = cached.last(); + else if let Some((cached, txs)) = self.completed_cache.iter().find(|(c, _)| c.payload_base().parent_hash() == local_tip_hash) { + let base = cached.payload_base(); + let last_fb = cached.last().clone(); let transactions = txs.clone(); let cached_state = None; (base, last_fb, transactions, cached_state, "cached") @@ -179,20 +192,20 @@ impl SequenceManager { // compute the state root, causing FlashblockConsensusClient to lack precomputed state for // engine_newPayload. This is safe: we still have op-node as backstop to maintain // chain progression. - let block_time_ms = (base.timestamp - local_tip_timestamp) * 1000; + let block_time_ms = (base.timestamp() - local_tip_timestamp) * 1000; let expected_final_flashblock = block_time_ms / FLASHBLOCK_BLOCK_TIME; let compute_state_root = self.compute_state_root && - last_flashblock.diff.state_root.is_zero() && - last_flashblock.index >= expected_final_flashblock.saturating_sub(1); + last_flashblock.diff().state_root().is_zero() && + last_flashblock.index() >= expected_final_flashblock.saturating_sub(1); trace!( target: "flashblocks", - block_number = base.block_number, + block_number = base.block_number(), source = source_name, - flashblock_index = last_flashblock.index, + flashblock_index = last_flashblock.index(), expected_final_flashblock, compute_state_root_enabled = self.compute_state_root, - state_root_is_zero = last_flashblock.diff.state_root.is_zero(), + state_root_is_zero = last_flashblock.diff().state_root().is_zero(), will_compute_state_root = compute_state_root, "Building from flashblock sequence" ); @@ -201,8 +214,8 @@ impl SequenceManager { base, transactions, cached_state, - last_flashblock_index: last_flashblock.index, - last_flashblock_hash: last_flashblock.diff.block_hash, + last_flashblock_index: last_flashblock.index(), + last_flashblock_hash: last_flashblock.diff().block_hash(), compute_state_root, }) } @@ -227,7 +240,7 @@ impl SequenceManager { }); // Update pending sequence with execution results - if self.pending.payload_base().is_some_and(|base| base.parent_hash == parent_hash) { + if self.pending.payload_base().is_some_and(|base| base.parent_hash() == parent_hash) { self.pending.set_execution_outcome(execution_outcome); self.pending.set_cached_reads(cached_reads); trace!( @@ -241,7 +254,7 @@ impl SequenceManager { else if let Some((cached, _)) = self .completed_cache .iter_mut() - .find(|(c, _)| c.payload_base().parent_hash == parent_hash) + .find(|(c, _)| c.payload_base().parent_hash() == parent_hash) { // Only re-broadcast if we computed new information (state_root was missing). // If sequencer already provided state_root, we already broadcast in insert_flashblock, @@ -266,19 +279,18 @@ impl SequenceManager { #[cfg(test)] mod tests { use super::*; - use crate::test_utils::TestFlashBlockFactory; + use crate::{test_utils::TestFlashBlockFactory, FlashBlock}; use alloy_primitives::B256; - use op_alloy_consensus::OpTxEnvelope; #[test] fn test_sequence_manager_new() { - let manager: SequenceManager = SequenceManager::new(true); + let manager: SequenceManager = SequenceManager::new(true); assert_eq!(manager.pending().count(), 0); } #[test] fn test_insert_flashblock_creates_pending_sequence() { - let mut manager: SequenceManager = SequenceManager::new(true); + let mut manager: SequenceManager = SequenceManager::new(true); let factory = TestFlashBlockFactory::new(); let fb0 = factory.flashblock_at(0).build(); @@ -290,7 +302,7 @@ mod tests { #[test] fn test_insert_flashblock_caches_completed_sequence() { - let mut manager: SequenceManager = SequenceManager::new(true); + let mut manager: SequenceManager = SequenceManager::new(true); let factory = TestFlashBlockFactory::new(); // Build first sequence @@ -314,7 +326,7 @@ mod tests { #[test] fn test_next_buildable_args_returns_none_when_empty() { - let mut manager: SequenceManager = SequenceManager::new(true); + let mut manager: SequenceManager = SequenceManager::new(true); let local_tip_hash = B256::random(); let local_tip_timestamp = 1000; @@ -324,7 +336,7 @@ mod tests { #[test] fn test_next_buildable_args_matches_pending_parent() { - let mut manager: SequenceManager = SequenceManager::new(true); + let mut manager: SequenceManager = SequenceManager::new(true); let factory = TestFlashBlockFactory::new(); let fb0 = factory.flashblock_at(0).build(); @@ -340,7 +352,7 @@ mod tests { #[test] fn test_next_buildable_args_returns_none_when_parent_mismatch() { - let mut manager: SequenceManager = SequenceManager::new(true); + let mut manager: SequenceManager = SequenceManager::new(true); let factory = TestFlashBlockFactory::new(); let fb0 = factory.flashblock_at(0).build(); @@ -354,7 +366,7 @@ mod tests { #[test] fn test_next_buildable_args_prefers_pending_over_cached() { - let mut manager: SequenceManager = SequenceManager::new(true); + let mut manager: SequenceManager = SequenceManager::new(true); let factory = TestFlashBlockFactory::new(); // Create and finalize first sequence @@ -373,7 +385,7 @@ mod tests { #[test] fn test_next_buildable_args_finds_cached_sequence() { - let mut manager: SequenceManager = SequenceManager::new(true); + let mut manager: SequenceManager = SequenceManager::new(true); let factory = TestFlashBlockFactory::new(); // Build and cache first sequence @@ -396,7 +408,7 @@ mod tests { #[test] fn test_compute_state_root_logic_near_expected_final() { - let mut manager: SequenceManager = SequenceManager::new(true); + let mut manager: SequenceManager = SequenceManager::new(true); let block_time = 2u64; let factory = TestFlashBlockFactory::new().with_block_time(block_time); @@ -420,7 +432,7 @@ mod tests { #[test] fn test_no_compute_state_root_when_provided_by_sequencer() { - let mut manager: SequenceManager = SequenceManager::new(true); + let mut manager: SequenceManager = SequenceManager::new(true); let block_time = 2u64; let factory = TestFlashBlockFactory::new().with_block_time(block_time); @@ -437,7 +449,7 @@ mod tests { #[test] fn test_no_compute_state_root_when_disabled() { - let mut manager: SequenceManager = SequenceManager::new(false); + let mut manager: SequenceManager = SequenceManager::new(false); let block_time = 2u64; let factory = TestFlashBlockFactory::new().with_block_time(block_time); @@ -461,7 +473,7 @@ mod tests { #[test] fn test_cache_ring_buffer_evicts_oldest() { - let mut manager: SequenceManager = SequenceManager::new(true); + let mut manager: SequenceManager = SequenceManager::new(true); let factory = TestFlashBlockFactory::new(); // Fill cache with 4 sequences (cache size is 3, so oldest should be evicted) diff --git a/crates/optimism/flashblocks/src/consensus.rs b/crates/optimism/flashblocks/src/consensus.rs index 0b502c07387..42a03d9945c 100644 --- a/crates/optimism/flashblocks/src/consensus.rs +++ b/crates/optimism/flashblocks/src/consensus.rs @@ -1,7 +1,10 @@ -use crate::{FlashBlockCompleteSequence, FlashBlockCompleteSequenceRx}; +use crate::{ + traits::FlashblockPayloadBase, FlashBlock, FlashBlockCompleteSequence, + FlashBlockCompleteSequenceRx, +}; use alloy_primitives::B256; use alloy_rpc_types_engine::PayloadStatusEnum; -use op_alloy_rpc_types_engine::OpExecutionData; +use op_alloy_rpc_types_engine::{OpExecutionData, OpFlashblockPayload}; use reth_engine_primitives::ConsensusEngineHandle; use reth_optimism_payload_builder::OpPayloadTypes; use reth_payload_primitives::{EngineApiMessageVersion, ExecutionPayload, PayloadTypes}; @@ -22,18 +25,19 @@ where /// Handle to execution client. engine_handle: ConsensusEngineHandle

, /// Receiver for completed flashblock sequences from `FlashBlockService`. - sequence_receiver: FlashBlockCompleteSequenceRx, + sequence_receiver: FlashBlockCompleteSequenceRx, } impl

FlashBlockConsensusClient

where P: PayloadTypes, - P::ExecutionData: for<'a> TryFrom<&'a FlashBlockCompleteSequence, Error: std::fmt::Display>, + P::ExecutionData: + for<'a> TryFrom<&'a FlashBlockCompleteSequence, Error: std::fmt::Display>, { /// Create a new `FlashBlockConsensusClient` with the given Op engine and sequence receiver. pub const fn new( engine_handle: ConsensusEngineHandle

, - sequence_receiver: FlashBlockCompleteSequenceRx, + sequence_receiver: FlashBlockCompleteSequenceRx, ) -> eyre::Result { Ok(Self { engine_handle, sequence_receiver }) } @@ -44,12 +48,12 @@ where /// in which case this returns the `parent_hash` instead to drive the chain forward. /// /// Returns the block hash to use for FCU (either the new block or parent). - async fn submit_new_payload(&self, sequence: &FlashBlockCompleteSequence) -> B256 { + async fn submit_new_payload(&self, sequence: &FlashBlockCompleteSequence) -> B256 { let payload = match P::ExecutionData::try_from(sequence) { Ok(payload) => payload, Err(err) => { trace!(target: "flashblocks", %err, "Failed payload conversion, using parent hash"); - return sequence.payload_base().parent_hash; + return sequence.payload_base().parent_hash(); } }; @@ -93,11 +97,11 @@ where async fn submit_forkchoice_update( &self, head_block_hash: B256, - sequence: &FlashBlockCompleteSequence, + sequence: &FlashBlockCompleteSequence, ) { let block_number = sequence.block_number(); - let safe_hash = sequence.payload_base().parent_hash; - let finalized_hash = sequence.payload_base().parent_hash; + let safe_hash = sequence.payload_base().parent_hash(); + let finalized_hash = sequence.payload_base().parent_hash(); let fcu_state = alloy_rpc_types_engine::ForkchoiceState { head_block_hash, safe_block_hash: safe_hash, @@ -157,10 +161,10 @@ where } } -impl TryFrom<&FlashBlockCompleteSequence> for OpExecutionData { +impl TryFrom<&FlashBlockCompleteSequence> for OpExecutionData { type Error = &'static str; - fn try_from(sequence: &FlashBlockCompleteSequence) -> Result { + fn try_from(sequence: &FlashBlockCompleteSequence) -> Result { let mut data = Self::from_flashblocks_unchecked(sequence); // If execution outcome is available, use the computed state_root and block_hash. @@ -320,7 +324,7 @@ mod tests { assert!(conversion_result.is_err()); // In the actual run loop, submit_new_payload would return parent_hash - assert_eq!(sequence.payload_base().parent_hash, parent_hash); + assert_eq!(sequence.payload_base().parent_hash(), parent_hash); } #[test] @@ -357,7 +361,7 @@ mod tests { let sequence = FlashBlockCompleteSequence::new(vec![fb0], None).unwrap(); // Verify the expected forkchoice state - assert_eq!(sequence.payload_base().parent_hash, parent_hash); + assert_eq!(sequence.payload_base().parent_hash(), parent_hash); } #[test] @@ -389,7 +393,7 @@ mod tests { let sequence = FlashBlockCompleteSequence::new(vec![fb0], None).unwrap(); // The head_block_hash for FCU would be parent_hash (fallback) - assert_eq!(sequence.payload_base().parent_hash, parent_hash); + assert_eq!(sequence.payload_base().parent_hash(), parent_hash); } } @@ -426,7 +430,7 @@ mod tests { assert!(conversion.is_err()); // But FCU should still happen with parent_hash - assert!(sequence.payload_base().parent_hash != B256::ZERO); + assert!(sequence.payload_base().parent_hash() != B256::ZERO); } #[test] diff --git a/crates/optimism/flashblocks/src/lib.rs b/crates/optimism/flashblocks/src/lib.rs index 6c5d9c1e86e..cfe18d2ea88 100644 --- a/crates/optimism/flashblocks/src/lib.rs +++ b/crates/optimism/flashblocks/src/lib.rs @@ -14,6 +14,11 @@ use std::sync::Arc; // Included to enable serde feature for OpReceipt type used transitively use reth_optimism_primitives as _; +pub mod traits; +pub use traits::{FlashblockDiff, FlashblockPayload, FlashblockPayloadBase}; + +mod op_impl; + mod consensus; pub use consensus::FlashBlockConsensusClient; @@ -21,7 +26,9 @@ mod payload; pub use payload::{FlashBlock, PendingFlashBlock}; mod sequence; -pub use sequence::{FlashBlockCompleteSequence, FlashBlockPendingSequence}; +pub use sequence::{ + FlashBlockCompleteSequence, FlashBlockPendingSequence, SequenceExecutionOutcome, +}; mod service; pub use service::{FlashBlockBuildInfo, FlashBlockService}; @@ -34,49 +41,43 @@ mod cache; mod test_utils; mod ws; -pub use ws::{WsConnect, WsFlashBlockStream}; +pub use ws::{FlashBlockDecoder, WsConnect, WsFlashBlockStream}; -/// Receiver of the most recent [`PendingFlashBlock`] built out of [`FlashBlock`]s. -/// -/// [`FlashBlock`]: crate::FlashBlock +/// Receiver of the most recent [`PendingFlashBlock`] built out of flashblocks. pub type PendingBlockRx = tokio::sync::watch::Receiver>>; -/// Receiver of the sequences of [`FlashBlock`]s built. -/// -/// [`FlashBlock`]: crate::FlashBlock -pub type FlashBlockCompleteSequenceRx = - tokio::sync::broadcast::Receiver; +/// Receiver of the sequences of flashblocks built (using OP payload type). +pub type FlashBlockCompleteSequenceRx

= + tokio::sync::broadcast::Receiver>; -/// Receiver of received [`FlashBlock`]s from the (websocket) subscription. -/// -/// [`FlashBlock`]: crate::FlashBlock +/// Receiver of received flashblocks from the (websocket) subscription. pub type FlashBlockRx = tokio::sync::broadcast::Receiver>; -/// Receiver that signals whether a [`FlashBlock`] is currently being built. +/// Receiver that signals whether a flashblock is currently being built. pub type InProgressFlashBlockRx = tokio::sync::watch::Receiver>; /// Container for all flashblocks-related listeners. /// /// Groups together the channels for flashblock-related updates. #[derive(Debug)] -pub struct FlashblocksListeners { - /// Receiver of the most recent executed [`PendingFlashBlock`] built out of [`FlashBlock`]s. +pub struct FlashblocksListeners { + /// Receiver of the most recent executed pending block built from flashblocks. pub pending_block_rx: PendingBlockRx, - /// Subscription channel of the complete sequences of [`FlashBlock`]s built. - pub flashblocks_sequence: tokio::sync::broadcast::Sender, - /// Receiver that signals whether a [`FlashBlock`] is currently being built. + /// Subscription channel of the complete sequences of flashblocks built. + pub flashblocks_sequence: tokio::sync::broadcast::Sender>, + /// Receiver that signals whether a flashblock is currently being built. pub in_progress_rx: InProgressFlashBlockRx, /// Subscription channel for received flashblocks from the (websocket) connection. - pub received_flashblocks: tokio::sync::broadcast::Sender>, + pub received_flashblocks: tokio::sync::broadcast::Sender>, } -impl FlashblocksListeners { +impl FlashblocksListeners { /// Creates a new [`FlashblocksListeners`] with the given channels. pub const fn new( pending_block_rx: PendingBlockRx, - flashblocks_sequence: tokio::sync::broadcast::Sender, + flashblocks_sequence: tokio::sync::broadcast::Sender>, in_progress_rx: InProgressFlashBlockRx, - received_flashblocks: tokio::sync::broadcast::Sender>, + received_flashblocks: tokio::sync::broadcast::Sender>, ) -> Self { Self { pending_block_rx, flashblocks_sequence, in_progress_rx, received_flashblocks } } diff --git a/crates/optimism/flashblocks/src/op_impl.rs b/crates/optimism/flashblocks/src/op_impl.rs new file mode 100644 index 00000000000..3bd840959dd --- /dev/null +++ b/crates/optimism/flashblocks/src/op_impl.rs @@ -0,0 +1,94 @@ +//! Optimism implementation of flashblock traits. + +use crate::traits::{FlashblockDiff, FlashblockPayload, FlashblockPayloadBase}; +use alloy_consensus::crypto::RecoveryError; +use alloy_eips::{eip2718::WithEncoded, eip4895::Withdrawals}; +use alloy_primitives::{Bloom, Bytes, B256}; +use alloy_rpc_types_engine::PayloadId; +use op_alloy_consensus::OpTxEnvelope; +use op_alloy_rpc_types_engine::{ + OpFlashblockPayload, OpFlashblockPayloadBase, OpFlashblockPayloadDelta, +}; +use reth_primitives_traits::Recovered; + +impl FlashblockPayloadBase for OpFlashblockPayloadBase { + fn parent_hash(&self) -> B256 { + self.parent_hash + } + + fn block_number(&self) -> u64 { + self.block_number + } + + fn timestamp(&self) -> u64 { + self.timestamp + } +} + +impl FlashblockDiff for OpFlashblockPayloadDelta { + fn block_hash(&self) -> B256 { + self.block_hash + } + + fn state_root(&self) -> B256 { + self.state_root + } + + fn gas_used(&self) -> u64 { + self.gas_used + } + + fn logs_bloom(&self) -> &Bloom { + &self.logs_bloom + } + + fn receipts_root(&self) -> B256 { + self.receipts_root + } + + fn transactions_raw(&self) -> &[Bytes] { + &self.transactions + } + + fn withdrawals(&self) -> Option<&Withdrawals> { + // TODO: Might not be needed as withdrawals aren't processed in a block except if at start + // or end + None + } + + fn withdrawals_root(&self) -> Option { + Some(self.withdrawals_root) + } +} + +impl FlashblockPayload for OpFlashblockPayload { + type Base = OpFlashblockPayloadBase; + type Diff = OpFlashblockPayloadDelta; + type SignedTx = OpTxEnvelope; + + fn index(&self) -> u64 { + self.index + } + + fn payload_id(&self) -> PayloadId { + self.payload_id + } + + fn base(&self) -> Option { + self.base.clone() + } + + fn diff(&self) -> &Self::Diff { + &self.diff + } + + fn block_number(&self) -> u64 { + Self::block_number(self) + } + + fn recover_transactions( + &self, + ) -> impl Iterator>, RecoveryError>> { + Self::recover_transactions::(self) + } +} diff --git a/crates/optimism/flashblocks/src/sequence.rs b/crates/optimism/flashblocks/src/sequence.rs index abf9e6d514c..0f1afe997e7 100644 --- a/crates/optimism/flashblocks/src/sequence.rs +++ b/crates/optimism/flashblocks/src/sequence.rs @@ -1,9 +1,8 @@ -use crate::{FlashBlock, FlashBlockCompleteSequenceRx}; +use crate::traits::FlashblockPayload; use alloy_primitives::{Bytes, B256}; use alloy_rpc_types_engine::PayloadId; use core::mem; use eyre::{bail, OptionExt}; -use op_alloy_rpc_types_engine::OpFlashblockPayloadBase; use reth_revm::cached::CachedReads; use std::{collections::BTreeMap, ops::Deref}; use tokio::sync::broadcast; @@ -21,23 +20,23 @@ pub struct SequenceExecutionOutcome { pub state_root: B256, } -/// An ordered B-tree keeping the track of a sequence of [`FlashBlock`]s by their indices. +/// An ordered B-tree keeping the track of a sequence of flashblocks by their indices. #[derive(Debug)] -pub struct FlashBlockPendingSequence { +pub struct FlashBlockPendingSequence { /// tracks the individual flashblocks in order - inner: BTreeMap, + inner: BTreeMap, /// Broadcasts flashblocks to subscribers. - block_broadcaster: broadcast::Sender, + block_broadcaster: broadcast::Sender>, /// Optional execution outcome from building the current sequence. execution_outcome: Option, /// Cached state reads for the current block. - /// Current `PendingFlashBlock` is built out of a sequence of `FlashBlocks`, and executed again + /// Current `PendingFlashBlock` is built out of a sequence of flashblocks, and executed again /// when fb received on top of the same block. Avoid redundant I/O across multiple /// executions within the same block. cached_reads: Option, } -impl FlashBlockPendingSequence { +impl FlashBlockPendingSequence

{ /// Create a new pending sequence. pub fn new() -> Self { // Note: if the channel is full, send will not block but rather overwrite the oldest @@ -54,35 +53,35 @@ impl FlashBlockPendingSequence { /// Returns the sender half of the [`FlashBlockCompleteSequence`] channel. pub const fn block_sequence_broadcaster( &self, - ) -> &broadcast::Sender { + ) -> &broadcast::Sender> { &self.block_broadcaster } /// Gets a subscriber to the flashblock sequences produced. - pub fn subscribe_block_sequence(&self) -> FlashBlockCompleteSequenceRx { + pub fn subscribe_block_sequence(&self) -> broadcast::Receiver> { self.block_broadcaster.subscribe() } /// Inserts a new block into the sequence. /// - /// A [`FlashBlock`] with index 0 resets the set. - pub fn insert(&mut self, flashblock: FlashBlock) { - if flashblock.index == 0 { + /// A flashblock with index 0 resets the set. + pub fn insert(&mut self, flashblock: P) { + if flashblock.index() == 0 { trace!(target: "flashblocks", number=%flashblock.block_number(), "Tracking new flashblock sequence"); - self.inner.insert(flashblock.index, flashblock); + self.inner.insert(flashblock.index(), flashblock); return; } // only insert if we previously received the same block and payload, assume we received // index 0 let same_block = self.block_number() == Some(flashblock.block_number()); - let same_payload = self.payload_id() == Some(flashblock.payload_id); + let same_payload = self.payload_id() == Some(flashblock.payload_id()); if same_block && same_payload { - trace!(target: "flashblocks", number=%flashblock.block_number(), index = %flashblock.index, block_count = self.inner.len() ,"Received followup flashblock"); - self.inner.insert(flashblock.index, flashblock); + trace!(target: "flashblocks", number=%flashblock.block_number(), index = %flashblock.index(), block_count = self.inner.len() ,"Received followup flashblock"); + self.inner.insert(flashblock.index(), flashblock); } else { - trace!(target: "flashblocks", number=%flashblock.block_number(), index = %flashblock.index, current=?self.block_number() ,"Ignoring untracked flashblock following"); + trace!(target: "flashblocks", number=%flashblock.block_number(), index = %flashblock.index(), current=?self.block_number() ,"Ignoring untracked flashblock following"); } } @@ -110,8 +109,8 @@ impl FlashBlockPendingSequence { } /// Returns the payload base of the first tracked flashblock. - pub fn payload_base(&self) -> Option { - self.inner.values().next()?.base.clone() + pub fn payload_base(&self) -> Option { + self.inner.values().next()?.base() } /// Returns the number of tracked flashblocks. @@ -120,23 +119,24 @@ impl FlashBlockPendingSequence { } /// Returns the reference to the last flashblock. - pub fn last_flashblock(&self) -> Option<&FlashBlock> { + pub fn last_flashblock(&self) -> Option<&P> { self.inner.last_key_value().map(|(_, b)| b) } /// Returns the current/latest flashblock index in the sequence pub fn index(&self) -> Option { - Some(self.inner.values().last()?.index) + Some(self.inner.values().last()?.index()) } + /// Returns the payload id of the first tracked flashblock in the current sequence. pub fn payload_id(&self) -> Option { - Some(self.inner.values().next()?.payload_id) + Some(self.inner.values().next()?.payload_id()) } /// Finalizes the current pending sequence and returns it as a complete sequence. /// /// Clears the internal state and returns an error if the sequence is empty or validation fails. - pub fn finalize(&mut self) -> eyre::Result { + pub fn finalize(&mut self) -> eyre::Result> { if self.inner.is_empty() { bail!("Cannot finalize empty flashblock sequence"); } @@ -149,12 +149,12 @@ impl FlashBlockPendingSequence { } /// Returns an iterator over all flashblocks in the sequence. - pub fn flashblocks(&self) -> impl Iterator { + pub fn flashblocks(&self) -> impl Iterator { self.inner.values() } } -impl Default for FlashBlockPendingSequence { +impl Default for FlashBlockPendingSequence

{ fn default() -> Self { Self::new() } @@ -166,31 +166,31 @@ impl Default for FlashBlockPendingSequence { /// If this entire sequence of flashblocks was executed on top of latest block, this also includes /// the execution outcome with block hash and state root. #[derive(Debug, Clone)] -pub struct FlashBlockCompleteSequence { - inner: Vec, +pub struct FlashBlockCompleteSequence { + inner: Vec

, /// Optional execution outcome from building the flashblock sequence execution_outcome: Option, } -impl FlashBlockCompleteSequence { +impl FlashBlockCompleteSequence

{ /// Create a complete sequence from a vector of flashblocks. /// Ensure that: /// * vector is not empty /// * first flashblock have the base payload /// * sequence of flashblocks is sound (successive index from 0, same payload id, ...) pub fn new( - blocks: Vec, + blocks: Vec

, execution_outcome: Option, ) -> eyre::Result { let first_block = blocks.first().ok_or_eyre("No flashblocks in sequence")?; // Ensure that first flashblock have base - first_block.base.as_ref().ok_or_eyre("Flashblock at index 0 has no base")?; + first_block.base().ok_or_eyre("Flashblock at index 0 has no base")?; // Ensure that index are successive from 0, have same block number and payload id if !blocks.iter().enumerate().all(|(idx, block)| { - idx == block.index as usize && - block.payload_id == first_block.payload_id && + idx == block.index() as usize && + block.payload_id() == first_block.payload_id() && block.block_number() == first_block.block_number() }) { bail!("Flashblock inconsistencies detected in sequence"); @@ -205,8 +205,8 @@ impl FlashBlockCompleteSequence { } /// Returns the payload base of the first flashblock. - pub fn payload_base(&self) -> &OpFlashblockPayloadBase { - self.inner.first().unwrap().base.as_ref().unwrap() + pub fn payload_base(&self) -> P::Base { + self.inner.first().unwrap().base().unwrap() } /// Returns the number of flashblocks in the sequence. @@ -215,7 +215,7 @@ impl FlashBlockCompleteSequence { } /// Returns the last flashblock in the sequence. - pub fn last(&self) -> &FlashBlock { + pub fn last(&self) -> &P { self.inner.last().unwrap() } @@ -234,21 +234,27 @@ impl FlashBlockCompleteSequence { /// Returns all transactions from all flashblocks in the sequence pub fn all_transactions(&self) -> Vec { - self.inner.iter().flat_map(|fb| fb.diff.transactions.iter().cloned()).collect() + use crate::traits::FlashblockDiff; + self.inner.iter().flat_map(|fb| fb.diff().transactions_raw().iter().cloned()).collect() + } + + /// Returns an iterator over all flashblocks in the sequence. + pub fn flashblocks(&self) -> impl Iterator { + self.inner.iter() } } -impl Deref for FlashBlockCompleteSequence { - type Target = Vec; +impl Deref for FlashBlockCompleteSequence

{ + type Target = Vec

; fn deref(&self) -> &Self::Target { &self.inner } } -impl TryFrom for FlashBlockCompleteSequence { +impl TryFrom> for FlashBlockCompleteSequence

{ type Error = eyre::Error; - fn try_from(sequence: FlashBlockPendingSequence) -> Result { + fn try_from(sequence: FlashBlockPendingSequence

) -> Result { Self::new(sequence.inner.into_values().collect(), sequence.execution_outcome) } } @@ -256,14 +262,15 @@ impl TryFrom for FlashBlockCompleteSequence { #[cfg(test)] mod tests { use super::*; - use crate::test_utils::TestFlashBlockFactory; + use crate::{test_utils::TestFlashBlockFactory, FlashBlock}; mod pending_sequence_insert { use super::*; #[test] fn test_insert_index_zero_creates_new_sequence() { - let mut sequence = FlashBlockPendingSequence::new(); + let mut sequence: FlashBlockPendingSequence = + FlashBlockPendingSequence::new(); let factory = TestFlashBlockFactory::new(); let fb0 = factory.flashblock_at(0).build(); let payload_id = fb0.payload_id; @@ -277,7 +284,8 @@ mod tests { #[test] fn test_insert_followup_same_block_and_payload() { - let mut sequence = FlashBlockPendingSequence::new(); + let mut sequence: FlashBlockPendingSequence = + FlashBlockPendingSequence::new(); let factory = TestFlashBlockFactory::new(); let fb0 = factory.flashblock_at(0).build(); @@ -295,7 +303,8 @@ mod tests { #[test] fn test_insert_ignores_different_block_number() { - let mut sequence = FlashBlockPendingSequence::new(); + let mut sequence: FlashBlockPendingSequence = + FlashBlockPendingSequence::new(); let factory = TestFlashBlockFactory::new(); let fb0 = factory.flashblock_at(0).build(); @@ -311,7 +320,8 @@ mod tests { #[test] fn test_insert_ignores_different_payload_id() { - let mut sequence = FlashBlockPendingSequence::new(); + let mut sequence: FlashBlockPendingSequence = + FlashBlockPendingSequence::new(); let factory = TestFlashBlockFactory::new(); let fb0 = factory.flashblock_at(0).build(); @@ -329,7 +339,8 @@ mod tests { #[test] fn test_insert_maintains_btree_order() { - let mut sequence = FlashBlockPendingSequence::new(); + let mut sequence: FlashBlockPendingSequence = + FlashBlockPendingSequence::new(); let factory = TestFlashBlockFactory::new(); let fb0 = factory.flashblock_at(0).build(); @@ -341,7 +352,7 @@ mod tests { let fb1 = factory.flashblock_after(&fb0).build(); sequence.insert(fb1); - let indices: Vec = sequence.flashblocks().map(|fb| fb.index).collect(); + let indices: Vec = sequence.flashblocks().map(|fb| fb.index()).collect(); assert_eq!(indices, vec![0, 1, 2]); } } @@ -351,7 +362,8 @@ mod tests { #[test] fn test_finalize_empty_sequence_fails() { - let mut sequence = FlashBlockPendingSequence::new(); + let mut sequence: FlashBlockPendingSequence = + FlashBlockPendingSequence::new(); let result = sequence.finalize(); assert!(result.is_err()); @@ -363,7 +375,8 @@ mod tests { #[test] fn test_finalize_clears_pending_state() { - let mut sequence = FlashBlockPendingSequence::new(); + let mut sequence: FlashBlockPendingSequence = + FlashBlockPendingSequence::new(); let factory = TestFlashBlockFactory::new(); let fb0 = factory.flashblock_at(0).build(); @@ -380,7 +393,8 @@ mod tests { #[test] fn test_finalize_preserves_execution_outcome() { - let mut sequence = FlashBlockPendingSequence::new(); + let mut sequence: FlashBlockPendingSequence = + FlashBlockPendingSequence::new(); let factory = TestFlashBlockFactory::new(); let fb0 = factory.flashblock_at(0).build(); @@ -397,7 +411,8 @@ mod tests { #[test] fn test_finalize_clears_cached_reads() { - let mut sequence = FlashBlockPendingSequence::new(); + let mut sequence: FlashBlockPendingSequence = + FlashBlockPendingSequence::new(); let factory = TestFlashBlockFactory::new(); let fb0 = factory.flashblock_at(0).build(); @@ -415,7 +430,8 @@ mod tests { #[test] fn test_finalize_multiple_times_after_refill() { - let mut sequence = FlashBlockPendingSequence::new(); + let mut sequence: FlashBlockPendingSequence = + FlashBlockPendingSequence::new(); let factory = TestFlashBlockFactory::new(); // First sequence @@ -440,7 +456,7 @@ mod tests { #[test] fn test_new_empty_sequence_fails() { - let result = FlashBlockCompleteSequence::new(vec![], None); + let result = FlashBlockCompleteSequence::::new(vec![], None); assert!(result.is_err()); assert_eq!(result.unwrap_err().to_string(), "No flashblocks in sequence"); } @@ -532,7 +548,7 @@ mod tests { let complete = result.unwrap(); assert_eq!(complete.count(), 3); - assert_eq!(complete.last().index, 2); + assert_eq!(complete.last().index(), 2); } #[test] @@ -605,28 +621,30 @@ mod tests { #[test] fn test_try_from_pending_to_complete_valid() { - let mut pending = FlashBlockPendingSequence::new(); + let mut pending: FlashBlockPendingSequence = + FlashBlockPendingSequence::new(); let factory = TestFlashBlockFactory::new(); let fb0 = factory.flashblock_at(0).build(); pending.insert(fb0); - let complete: Result = pending.try_into(); + let complete: Result, _> = pending.try_into(); assert!(complete.is_ok()); assert_eq!(complete.unwrap().count(), 1); } #[test] fn test_try_from_pending_to_complete_empty_fails() { - let pending = FlashBlockPendingSequence::new(); + let pending: FlashBlockPendingSequence = FlashBlockPendingSequence::new(); - let complete: Result = pending.try_into(); + let complete: Result, _> = pending.try_into(); assert!(complete.is_err()); } #[test] fn test_try_from_preserves_execution_outcome() { - let mut pending = FlashBlockPendingSequence::new(); + let mut pending: FlashBlockPendingSequence = + FlashBlockPendingSequence::new(); let factory = TestFlashBlockFactory::new(); let fb0 = factory.flashblock_at(0).build(); @@ -636,7 +654,7 @@ mod tests { SequenceExecutionOutcome { block_hash: B256::random(), state_root: B256::random() }; pending.set_execution_outcome(Some(outcome)); - let complete: FlashBlockCompleteSequence = pending.try_into().unwrap(); + let complete: FlashBlockCompleteSequence = pending.try_into().unwrap(); assert_eq!(complete.execution_outcome(), Some(outcome)); } } @@ -646,7 +664,8 @@ mod tests { #[test] fn test_last_flashblock_returns_highest_index() { - let mut sequence = FlashBlockPendingSequence::new(); + let mut sequence: FlashBlockPendingSequence = + FlashBlockPendingSequence::new(); let factory = TestFlashBlockFactory::new(); let fb0 = factory.flashblock_at(0).build(); @@ -656,12 +675,12 @@ mod tests { sequence.insert(fb1); let last = sequence.last_flashblock().unwrap(); - assert_eq!(last.index, 1); + assert_eq!(last.index(), 1); } #[test] fn test_subscribe_block_sequence_channel() { - let sequence = FlashBlockPendingSequence::new(); + let sequence: FlashBlockPendingSequence = FlashBlockPendingSequence::new(); let mut rx = sequence.subscribe_block_sequence(); // Spawn a task that sends a complete sequence diff --git a/crates/optimism/flashblocks/src/service.rs b/crates/optimism/flashblocks/src/service.rs index 4eed74683f7..6d149495606 100644 --- a/crates/optimism/flashblocks/src/service.rs +++ b/crates/optimism/flashblocks/src/service.rs @@ -1,11 +1,12 @@ use crate::{ - cache::SequenceManager, worker::FlashBlockBuilder, FlashBlock, FlashBlockCompleteSequence, - FlashBlockCompleteSequenceRx, InProgressFlashBlockRx, PendingFlashBlock, + cache::SequenceManager, + traits::{FlashblockPayload, FlashblockPayloadBase}, + worker::FlashBlockBuilder, + FlashBlockCompleteSequence, InProgressFlashBlockRx, PendingFlashBlock, }; use alloy_primitives::B256; use futures_util::{FutureExt, Stream, StreamExt}; use metrics::{Gauge, Histogram}; -use op_alloy_rpc_types_engine::OpFlashblockPayloadBase; use reth_evm::ConfigureEvm; use reth_metrics::Metrics; use reth_primitives_traits::{AlloyBlockHeader, BlockTy, HeaderTy, NodePrimitives, ReceiptTy}; @@ -13,45 +14,45 @@ use reth_revm::cached::CachedReads; use reth_storage_api::{BlockReaderIdExt, StateProviderFactory}; use reth_tasks::TaskExecutor; use std::{sync::Arc, time::Instant}; -use tokio::sync::{oneshot, watch}; +use tokio::sync::{broadcast, oneshot, watch}; use tracing::*; /// The `FlashBlockService` maintains an in-memory [`PendingFlashBlock`] built out of a sequence of -/// [`FlashBlock`]s. +/// flashblocks. #[derive(Debug)] -pub struct FlashBlockService< +pub struct FlashBlockService +where N: NodePrimitives, - S, - EvmConfig: ConfigureEvm + Unpin>, - Provider, -> { + P: FlashblockPayload, + EvmConfig: ConfigureEvm + Unpin>, +{ /// Incoming flashblock stream. incoming_flashblock_rx: S, /// Signals when a block build is in progress. in_progress_tx: watch::Sender>, /// Broadcast channel to forward received flashblocks from the subscription. - received_flashblocks_tx: tokio::sync::broadcast::Sender>, + received_flashblocks_tx: broadcast::Sender>, /// Executes flashblock sequences to build pending blocks. - builder: FlashBlockBuilder, + builder: FlashBlockBuilder, /// Task executor for spawning block build jobs. spawner: TaskExecutor, /// Currently running block build job with start time and result receiver. job: Option>, /// Manages flashblock sequences with caching and intelligent build selection. - sequences: SequenceManager, + sequences: SequenceManager

, /// `FlashBlock` service's metrics metrics: FlashBlockServiceMetrics, } -impl FlashBlockService +impl FlashBlockService where N: NodePrimitives, - S: Stream> + Unpin + 'static, - EvmConfig: ConfigureEvm + Unpin> - + Clone - + 'static, + P: FlashblockPayload, + S: Stream> + Unpin + 'static, + EvmConfig: + ConfigureEvm + Unpin> + Clone + 'static, Provider: StateProviderFactory + BlockReaderIdExt< Header = HeaderTy, @@ -62,7 +63,7 @@ where + Clone + 'static, { - /// Constructs a new `FlashBlockService` that receives [`FlashBlock`]s from `rx` stream. + /// Constructs a new `FlashBlockService` that receives flashblocks from `rx` stream. pub fn new( incoming_flashblock_rx: S, evm_config: EvmConfig, @@ -71,7 +72,7 @@ where compute_state_root: bool, ) -> Self { let (in_progress_tx, _) = watch::channel(None); - let (received_flashblocks_tx, _) = tokio::sync::broadcast::channel(128); + let (received_flashblocks_tx, _) = broadcast::channel(128); Self { incoming_flashblock_rx, in_progress_tx, @@ -85,21 +86,19 @@ where } /// Returns the sender half to the received flashblocks. - pub const fn flashblocks_broadcaster( - &self, - ) -> &tokio::sync::broadcast::Sender> { + pub const fn flashblocks_broadcaster(&self) -> &broadcast::Sender> { &self.received_flashblocks_tx } /// Returns the sender half to the flashblock sequence. pub const fn block_sequence_broadcaster( &self, - ) -> &tokio::sync::broadcast::Sender { + ) -> &broadcast::Sender> { self.sequences.block_sequence_broadcaster() } /// Returns a subscriber to the flashblock sequence. - pub fn subscribe_block_sequence(&self) -> FlashBlockCompleteSequenceRx { + pub fn subscribe_block_sequence(&self) -> broadcast::Receiver> { self.sequences.subscribe_block_sequence() } @@ -181,10 +180,10 @@ where /// Processes a single flashblock: notifies subscribers, records metrics, and inserts into /// sequence. - fn process_flashblock(&mut self, flashblock: FlashBlock) { + fn process_flashblock(&mut self, flashblock: P) { self.notify_received_flashblock(&flashblock); - if flashblock.index == 0 { + if flashblock.index() == 0 { self.metrics.last_flashblock_length.record(self.sequences.pending().count() as f64); } @@ -194,7 +193,7 @@ where } /// Notifies all subscribers about the received flashblock. - fn notify_received_flashblock(&self, flashblock: &FlashBlock) { + fn notify_received_flashblock(&self, flashblock: &P) { if self.received_flashblocks_tx.receiver_count() > 0 { let _ = self.received_flashblocks_tx.send(Arc::new(flashblock.clone())); } @@ -217,9 +216,9 @@ where // Spawn build job let fb_info = FlashBlockBuildInfo { - parent_hash: args.base.parent_hash, + parent_hash: args.base.parent_hash(), index: args.last_flashblock_index, - block_number: args.base.block_number, + block_number: args.base.block_number(), }; self.metrics.current_block_height.set(fb_info.block_number as f64); self.metrics.current_index.set(fb_info.index as f64); diff --git a/crates/optimism/flashblocks/src/traits.rs b/crates/optimism/flashblocks/src/traits.rs new file mode 100644 index 00000000000..291fa0e11b0 --- /dev/null +++ b/crates/optimism/flashblocks/src/traits.rs @@ -0,0 +1,95 @@ +//! Generic traits for flashblock payloads. +//! +//! These traits enable chain-specific flashblock implementations while sharing +//! the core flashblock infrastructure. + +use alloy_consensus::crypto::RecoveryError; +use alloy_eips::eip4895::Withdrawals; +use alloy_primitives::{Bloom, Bytes, B256}; +use alloy_rpc_types_engine::PayloadId; + +/// Base payload information for constructing block environment. +/// +/// Contains all fields needed to configure EVM execution context for the next block. +/// This is present only on the first flashblock (index 0) of a sequence. +pub trait FlashblockPayloadBase: Clone + Send + Sync + std::fmt::Debug + 'static { + /// Parent block hash. + fn parent_hash(&self) -> B256; + /// Block number being built. + fn block_number(&self) -> u64; + /// Block timestamp. + fn timestamp(&self) -> u64; +} + +/// State diff from flashblock execution. +/// +/// Contains the cumulative state changes from executing transactions in this flashblock. +pub trait FlashblockDiff: Clone + Send + Sync + std::fmt::Debug + 'static { + /// Block hash after applying this flashblock. + fn block_hash(&self) -> B256; + /// State root after applying this flashblock. + fn state_root(&self) -> B256; + /// Cumulative gas used. + fn gas_used(&self) -> u64; + /// Bloom filter for logs. + fn logs_bloom(&self) -> &Bloom; + /// Receipts root. + fn receipts_root(&self) -> B256; + /// Raw encoded transactions in this flashblock. + fn transactions_raw(&self) -> &[Bytes]; + + /// Withdrawals included in this flashblock. + fn withdrawals(&self) -> Option<&Withdrawals> { + None + } + + /// Withdrawals root. + fn withdrawals_root(&self) -> Option { + None + } +} + +/// A flashblock payload representing one slice of a block. +/// +/// Flashblocks are incremental updates to block state, allowing for faster +/// pre-confirmations. A complete block is built from a sequence of flashblocks. +pub trait FlashblockPayload: + Clone + Send + Sync + std::fmt::Debug + for<'de> serde::Deserialize<'de> + 'static +{ + /// The base payload type containing block environment configuration. + type Base: FlashblockPayloadBase; + /// The diff type containing state changes. + type Diff: FlashblockDiff; + /// The signed transaction type for this chain. + type SignedTx: reth_primitives_traits::SignedTransaction; + + /// Sequential index of this flashblock within the current block's sequence. + fn index(&self) -> u64; + + /// Unique identifier for the payload being built. + fn payload_id(&self) -> PayloadId; + + /// Base payload (only present on index 0). + fn base(&self) -> Option; + + /// State diff for this flashblock. + fn diff(&self) -> &Self::Diff; + + /// Block number this flashblock belongs to. + fn block_number(&self) -> u64 { + self.base().map(|b| b.block_number()).unwrap_or(0) + } + + /// Recovers transactions from the raw transaction bytes in this flashblock. + /// + /// Each item is a result containing either the recovered transaction with its encoding, + /// or an error if decoding/recovery failed. + fn recover_transactions( + &self, + ) -> impl Iterator< + Item = Result< + alloy_eips::eip2718::WithEncoded>, + RecoveryError, + >, + >; +} diff --git a/crates/optimism/flashblocks/src/worker.rs b/crates/optimism/flashblocks/src/worker.rs index 7d9ab860a58..5b3ea6b5d16 100644 --- a/crates/optimism/flashblocks/src/worker.rs +++ b/crates/optimism/flashblocks/src/worker.rs @@ -1,7 +1,6 @@ -use crate::PendingFlashBlock; +use crate::{traits::FlashblockPayloadBase, PendingFlashBlock}; use alloy_eips::{eip2718::WithEncoded, BlockNumberOrTag}; use alloy_primitives::B256; -use op_alloy_rpc_types_engine::OpFlashblockPayloadBase; use reth_chain_state::{ComputedTrieData, ExecutedBlock}; use reth_errors::RethError; use reth_evm::{ @@ -16,6 +15,7 @@ use reth_revm::{cached::CachedReads, database::StateProviderDatabase, db::State} use reth_rpc_eth_types::{EthApiError, PendingBlock}; use reth_storage_api::{noop::NoopProvider, BlockReaderIdExt, StateProviderFactory}; use std::{ + marker::PhantomData, sync::Arc, time::{Duration, Instant}, }; @@ -23,14 +23,15 @@ use tracing::trace; /// The `FlashBlockBuilder` builds [`PendingBlock`] out of a sequence of transactions. #[derive(Debug)] -pub(crate) struct FlashBlockBuilder { +pub(crate) struct FlashBlockBuilder { evm_config: EvmConfig, provider: Provider, + _base: PhantomData, } -impl FlashBlockBuilder { +impl FlashBlockBuilder { pub(crate) const fn new(evm_config: EvmConfig, provider: Provider) -> Self { - Self { evm_config, provider } + Self { evm_config, provider, _base: PhantomData } } pub(crate) const fn provider(&self) -> &Provider { @@ -38,8 +39,9 @@ impl FlashBlockBuilder { } } -pub(crate) struct BuildArgs { - pub(crate) base: OpFlashblockPayloadBase, +/// Arguments for building a block from flashblocks. +pub(crate) struct BuildArgs { + pub(crate) base: Base, pub(crate) transactions: I, pub(crate) cached_state: Option<(B256, CachedReads)>, pub(crate) last_flashblock_index: u64, @@ -47,10 +49,11 @@ pub(crate) struct BuildArgs { pub(crate) compute_state_root: bool, } -impl FlashBlockBuilder +impl FlashBlockBuilder where N: NodePrimitives, - EvmConfig: ConfigureEvm + Unpin>, + Base: FlashblockPayloadBase, + EvmConfig: ConfigureEvm + Unpin>, Provider: StateProviderFactory + BlockReaderIdExt< Header = HeaderTy, @@ -60,12 +63,12 @@ where > + Unpin, { /// Returns the [`PendingFlashBlock`] made purely out of transactions and - /// [`OpFlashblockPayloadBase`] in `args`. + /// the flashblock payload base in `args`. /// /// Returns `None` if the flashblock doesn't attach to the latest header. pub(crate) fn execute>>>( &self, - mut args: BuildArgs, + mut args: BuildArgs, ) -> eyre::Result, CachedReads)>> { trace!(target: "flashblocks", "Attempting new pending block from flashblocks"); @@ -75,8 +78,8 @@ where .ok_or(EthApiError::HeaderNotFound(BlockNumberOrTag::Latest.into()))?; let latest_hash = latest.hash(); - if args.base.parent_hash != latest_hash { - trace!(target: "flashblocks", flashblock_parent = ?args.base.parent_hash, local_latest=?latest.num_hash(),"Skipping non consecutive flashblock"); + if args.base.parent_hash() != latest_hash { + trace!(target: "flashblocks", flashblock_parent = ?args.base.parent_hash(), local_latest=?latest.num_hash(),"Skipping non consecutive flashblock"); // doesn't attach to the latest block return Ok(None) } @@ -141,8 +144,14 @@ where } } -impl Clone for FlashBlockBuilder { +impl Clone + for FlashBlockBuilder +{ fn clone(&self) -> Self { - Self { evm_config: self.evm_config.clone(), provider: self.provider.clone() } + Self { + evm_config: self.evm_config.clone(), + provider: self.provider.clone(), + _base: PhantomData, + } } } diff --git a/crates/optimism/flashblocks/src/ws/decoding.rs b/crates/optimism/flashblocks/src/ws/decoding.rs index 64d96dc5e3e..615045d82c2 100644 --- a/crates/optimism/flashblocks/src/ws/decoding.rs +++ b/crates/optimism/flashblocks/src/ws/decoding.rs @@ -1,24 +1,28 @@ -use crate::FlashBlock; use alloy_primitives::bytes::Bytes; use std::io; -/// A trait for decoding flashblocks from bytes. -pub trait FlashBlockDecoder: Send + 'static { - /// Decodes `bytes` into a [`FlashBlock`]. - fn decode(&self, bytes: Bytes) -> eyre::Result; +/// A trait for decoding flashblocks from bytes into payload type `F`. +pub trait FlashBlockDecoder: Send + 'static { + /// Decodes `bytes` into a flashblock payload of type `F`. + fn decode(&self, bytes: Bytes) -> eyre::Result; } -/// Default implementation of the decoder. -impl FlashBlockDecoder for () { - fn decode(&self, bytes: Bytes) -> eyre::Result { +impl FlashBlockDecoder for () +where + F: serde::de::DeserializeOwned, +{ + fn decode(&self, bytes: Bytes) -> eyre::Result { decode_flashblock(bytes) } } -pub(crate) fn decode_flashblock(bytes: Bytes) -> eyre::Result { - let bytes = crate::ws::decoding::try_parse_message(bytes)?; +fn decode_flashblock(bytes: Bytes) -> eyre::Result +where + F: serde::de::DeserializeOwned, +{ + let bytes = try_decompress(bytes)?; - let payload: FlashBlock = + let payload: F = serde_json::from_slice(&bytes).map_err(|e| eyre::eyre!("failed to parse message: {e}"))?; Ok(payload) @@ -30,7 +34,7 @@ pub(crate) fn decode_flashblock(bytes: Bytes) -> eyre::Result { /// then it assumes that it is JSON-encoded and returns it as-is. /// /// Otherwise, the `bytes` are passed through a brotli decompressor and returned. -fn try_parse_message(bytes: Bytes) -> eyre::Result { +fn try_decompress(bytes: Bytes) -> eyre::Result { if bytes.trim_ascii_start().starts_with(b"{") { return Ok(bytes); } diff --git a/crates/optimism/flashblocks/src/ws/mod.rs b/crates/optimism/flashblocks/src/ws/mod.rs index 8c8a5910892..651d83c916b 100644 --- a/crates/optimism/flashblocks/src/ws/mod.rs +++ b/crates/optimism/flashblocks/src/ws/mod.rs @@ -1,6 +1,6 @@ pub use stream::{WsConnect, WsFlashBlockStream}; mod decoding; -pub(crate) use decoding::FlashBlockDecoder; +pub use decoding::FlashBlockDecoder; mod stream; diff --git a/crates/optimism/flashblocks/src/ws/stream.rs b/crates/optimism/flashblocks/src/ws/stream.rs index e46fd6d747f..8a5ee7b547d 100644 --- a/crates/optimism/flashblocks/src/ws/stream.rs +++ b/crates/optimism/flashblocks/src/ws/stream.rs @@ -1,4 +1,4 @@ -use crate::{ws::FlashBlockDecoder, FlashBlock}; +use crate::ws::FlashBlockDecoder; use futures_util::{ stream::{SplitSink, SplitStream}, FutureExt, Sink, Stream, StreamExt, @@ -18,23 +18,27 @@ use tokio_tungstenite::{ use tracing::debug; use url::Url; -/// An asynchronous stream of [`FlashBlock`] from a websocket connection. +/// An asynchronous stream of flashblock payloads from a websocket connection. /// -/// The stream attempts to connect to a websocket URL and then decode each received item. +/// The stream attempts to connect to a websocket URL and then decode each received item +/// into the payload type `F`. /// /// If the connection fails, the error is returned and connection retried. The number of retries is /// unbounded. -pub struct WsFlashBlockStream { +pub struct WsFlashBlockStream { ws_url: Url, state: State, connector: Connector, - decoder: Box, + decoder: Box>, connect: ConnectFuture, stream: Option, sink: Option, } -impl WsFlashBlockStream { +impl WsFlashBlockStream +where + F: serde::de::DeserializeOwned, +{ /// Creates a new websocket stream over `ws_url`. pub fn new(ws_url: Url) -> Self { Self { @@ -48,13 +52,16 @@ impl WsFlashBlockStream { } } - /// Sets the [`FlashBlock`] decoder for the websocket stream. - pub fn with_decoder(self, decoder: Box) -> Self { + /// Sets a custom decoder for the websocket stream. + pub fn with_decoder(self, decoder: Box>) -> Self { Self { decoder, ..self } } } -impl WsFlashBlockStream { +impl WsFlashBlockStream +where + F: serde::de::DeserializeOwned, +{ /// Creates a new websocket stream over `ws_url`. pub fn with_connector(ws_url: Url, connector: C) -> Self { Self { @@ -69,13 +76,14 @@ impl WsFlashBlockStream { } } -impl Stream for WsFlashBlockStream +impl Stream for WsFlashBlockStream where Str: Stream> + Unpin, S: Sink + Send + Unpin, C: WsConnect + Clone + Send + 'static + Unpin, + F: 'static, { - type Item = eyre::Result; + type Item = eyre::Result; fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { let this = self.get_mut(); @@ -136,7 +144,7 @@ where } } -impl WsFlashBlockStream +impl WsFlashBlockStream where C: WsConnect + Clone + Send + 'static, { @@ -169,7 +177,7 @@ where } } -impl Debug for WsFlashBlockStream { +impl Debug for WsFlashBlockStream { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { f.debug_struct("FlashBlockStream") .field("ws_url", &self.ws_url) @@ -240,6 +248,7 @@ impl WsConnect for WsConnector { #[cfg(test)] mod tests { use super::*; + use crate::FlashBlock; use alloy_primitives::bytes::Bytes; use brotli::enc::BrotliEncoderParams; use std::{future, iter}; @@ -463,7 +472,8 @@ mod tests { let flashblocks = [flashblock()]; let connector = FakeConnector::from(flashblocks.iter().map(to_message)); let ws_url = "http://localhost".parse().unwrap(); - let stream = WsFlashBlockStream::with_connector(ws_url, connector); + let stream: WsFlashBlockStream<_, _, _, FlashBlock> = + WsFlashBlockStream::with_connector(ws_url, connector); let actual_messages: Vec<_> = stream.take(1).map(Result::unwrap).collect().await; let expected_messages = flashblocks.to_vec(); @@ -478,7 +488,8 @@ mod tests { let flashblock = flashblock(); let connector = FakeConnector::from([Ok(message), to_json_binary_message(&flashblock)]); let ws_url = "http://localhost".parse().unwrap(); - let mut stream = WsFlashBlockStream::with_connector(ws_url, connector); + let mut stream: WsFlashBlockStream<_, _, _, FlashBlock> = + WsFlashBlockStream::with_connector(ws_url, connector); let expected_message = flashblock; let actual_message = @@ -491,7 +502,8 @@ mod tests { async fn test_stream_passes_errors_through() { let connector = FakeConnector::from([Err(Error::AttackAttempt)]); let ws_url = "http://localhost".parse().unwrap(); - let stream = WsFlashBlockStream::with_connector(ws_url, connector); + let stream: WsFlashBlockStream<_, _, _, FlashBlock> = + WsFlashBlockStream::with_connector(ws_url, connector); let actual_messages: Vec<_> = stream.take(1).map(Result::unwrap_err).map(|e| format!("{e}")).collect().await; @@ -506,7 +518,8 @@ mod tests { let error_msg = "test".to_owned(); let connector = FailingConnector(error_msg.clone()); let ws_url = "http://localhost".parse().unwrap(); - let stream = WsFlashBlockStream::with_connector(ws_url, connector); + let stream: WsFlashBlockStream<_, _, _, FlashBlock> = + WsFlashBlockStream::with_connector(ws_url, connector); let actual_errors: Vec<_> = stream.take(tries).map(Result::unwrap_err).map(|e| format!("{e}")).collect().await; @@ -531,7 +544,8 @@ mod tests { let messages = [Ok(msg), to_json_binary_message(&flashblock)]; let connector = FakeConnectorWithSink::from(messages); let ws_url = "http://localhost".parse().unwrap(); - let mut stream = WsFlashBlockStream::with_connector(ws_url, connector); + let mut stream: WsFlashBlockStream<_, _, _, FlashBlock> = + WsFlashBlockStream::with_connector(ws_url, connector); let _ = stream.next().await; diff --git a/crates/optimism/flashblocks/tests/it/stream.rs b/crates/optimism/flashblocks/tests/it/stream.rs index 99e78fee23a..1de1395bc02 100644 --- a/crates/optimism/flashblocks/tests/it/stream.rs +++ b/crates/optimism/flashblocks/tests/it/stream.rs @@ -1,11 +1,12 @@ use futures_util::stream::StreamExt; +use op_alloy_rpc_types_engine::OpFlashblockPayload; use reth_optimism_flashblocks::WsFlashBlockStream; #[tokio::test] async fn test_streaming_flashblocks_from_remote_source_is_successful() { let items = 3; let ws_url = "wss://sepolia.flashblocks.base.org/ws".parse().unwrap(); - let stream = WsFlashBlockStream::new(ws_url); + let stream: WsFlashBlockStream<_, _, _, OpFlashblockPayload> = WsFlashBlockStream::new(ws_url); let blocks: Vec<_> = stream.take(items).collect().await; diff --git a/crates/optimism/rpc/src/eth/block.rs b/crates/optimism/rpc/src/eth/block.rs index 0efd9aea988..9dacc2bbdb8 100644 --- a/crates/optimism/rpc/src/eth/block.rs +++ b/crates/optimism/rpc/src/eth/block.rs @@ -1,23 +1,26 @@ //! Loads and formats OP block RPC response. use crate::{eth::RpcNodeCore, OpEthApi, OpEthApiError}; +use reth_optimism_flashblocks::FlashblockPayload; use reth_rpc_eth_api::{ helpers::{EthBlocks, LoadBlock}, FromEvmError, RpcConvert, }; -impl EthBlocks for OpEthApi +impl EthBlocks for OpEthApi where N: RpcNodeCore, OpEthApiError: FromEvmError, Rpc: RpcConvert, + F: FlashblockPayload, { } -impl LoadBlock for OpEthApi +impl LoadBlock for OpEthApi where N: RpcNodeCore, OpEthApiError: FromEvmError, Rpc: RpcConvert, + F: FlashblockPayload, { } diff --git a/crates/optimism/rpc/src/eth/call.rs b/crates/optimism/rpc/src/eth/call.rs index db96bda83f3..169885eccab 100644 --- a/crates/optimism/rpc/src/eth/call.rs +++ b/crates/optimism/rpc/src/eth/call.rs @@ -1,30 +1,34 @@ use crate::{eth::RpcNodeCore, OpEthApi, OpEthApiError}; +use reth_optimism_flashblocks::FlashblockPayload; use reth_rpc_eth_api::{ helpers::{estimate::EstimateCall, Call, EthCall}, FromEvmError, RpcConvert, }; -impl EthCall for OpEthApi +impl EthCall for OpEthApi where N: RpcNodeCore, OpEthApiError: FromEvmError, Rpc: RpcConvert, + F: FlashblockPayload, { } -impl EstimateCall for OpEthApi +impl EstimateCall for OpEthApi where N: RpcNodeCore, OpEthApiError: FromEvmError, Rpc: RpcConvert, + F: FlashblockPayload, { } -impl Call for OpEthApi +impl Call for OpEthApi where N: RpcNodeCore, OpEthApiError: FromEvmError, Rpc: RpcConvert, + F: FlashblockPayload, { #[inline] fn call_gas_limit(&self) -> u64 { diff --git a/crates/optimism/rpc/src/eth/mod.rs b/crates/optimism/rpc/src/eth/mod.rs index d5f42a5473d..f382f2bd3ee 100644 --- a/crates/optimism/rpc/src/eth/mod.rs +++ b/crates/optimism/rpc/src/eth/mod.rs @@ -16,7 +16,7 @@ use alloy_consensus::BlockHeader; use alloy_primitives::{B256, U256}; use eyre::WrapErr; use op_alloy_network::Optimism; -use op_alloy_rpc_types_engine::OpFlashblockPayloadBase; +use op_alloy_rpc_types_engine::OpFlashblockPayload; pub use receipt::{OpReceiptBuilder, OpReceiptFieldsBuilder}; use reqwest::Url; use reth_chainspec::{EthereumHardforks, Hardforks}; @@ -25,9 +25,10 @@ use reth_node_api::{FullNodeComponents, FullNodeTypes, HeaderTy, NodeTypes}; use reth_node_builder::rpc::{EthApiBuilder, EthApiCtx}; use reth_optimism_flashblocks::{ FlashBlockBuildInfo, FlashBlockCompleteSequence, FlashBlockCompleteSequenceRx, - FlashBlockConsensusClient, FlashBlockRx, FlashBlockService, FlashblocksListeners, - PendingBlockRx, PendingFlashBlock, WsFlashBlockStream, + FlashBlockService, FlashblockPayload, FlashblocksListeners, PendingBlockRx, PendingFlashBlock, + WsFlashBlockStream, }; +use reth_primitives_traits::NodePrimitives; use reth_rpc::eth::core::EthApiInner; use reth_rpc_eth_api::{ helpers::{ @@ -52,6 +53,16 @@ use std::{ use tokio::{sync::watch, time}; use tracing::info; +/// Extension trait for OP-specific RPC types that includes flashblock configuration. +pub trait OpRpcTypes: RpcTypes { + /// The flashblock payload type for this chain. + type Flashblock: FlashblockPayload; +} + +impl OpRpcTypes for Optimism { + type Flashblock = OpFlashblockPayload; +} + /// Maximum duration to wait for a fresh flashblock when one is being built. const MAX_FLASHBLOCK_WAIT_DURATION: Duration = Duration::from_millis(50); @@ -68,24 +79,24 @@ pub type EthApiNodeBackend = EthApiInner; /// /// This type implements the [`FullEthApi`](reth_rpc_eth_api::helpers::FullEthApi) by implemented /// all the `Eth` helper traits and prerequisite traits. -pub struct OpEthApi { +pub struct OpEthApi { /// Gateway to node's core components. - inner: Arc>, + inner: Arc>, } -impl Clone for OpEthApi { +impl Clone for OpEthApi { fn clone(&self) -> Self { Self { inner: self.inner.clone() } } } -impl OpEthApi { +impl OpEthApi { /// Creates a new `OpEthApi`. pub fn new( eth_api: EthApiNodeBackend, sequencer_client: Option, min_suggested_priority_fee: U256, - flashblocks: Option>, + flashblocks: Option>, ) -> Self { let inner = Arc::new(OpEthApiInner { eth_api, @@ -116,12 +127,14 @@ impl OpEthApi { } /// Returns a new subscription to received flashblocks. - pub fn subscribe_received_flashblocks(&self) -> Option { + pub fn subscribe_received_flashblocks( + &self, + ) -> Option>> { self.inner.flashblocks.as_ref().map(|f| f.received_flashblocks.subscribe()) } /// Returns a new subscription to flashblock sequences. - pub fn subscribe_flashblock_sequence(&self) -> Option { + pub fn subscribe_flashblock_sequence(&self) -> Option> { self.inner.flashblocks.as_ref().map(|f| f.flashblocks_sequence.subscribe()) } @@ -185,10 +198,11 @@ impl OpEthApi { } } -impl EthApiTypes for OpEthApi +impl EthApiTypes for OpEthApi where N: RpcNodeCore, Rpc: RpcConvert, + F: FlashblockPayload, { type Error = OpEthApiError; type NetworkTypes = Rpc::Network; @@ -199,10 +213,11 @@ where } } -impl RpcNodeCore for OpEthApi +impl RpcNodeCore for OpEthApi where N: RpcNodeCore, Rpc: RpcConvert, + F: FlashblockPayload, { type Primitives = N::Primitives; type Provider = N::Provider; @@ -231,10 +246,11 @@ where } } -impl RpcNodeCoreExt for OpEthApi +impl RpcNodeCoreExt for OpEthApi where N: RpcNodeCore, Rpc: RpcConvert, + F: FlashblockPayload, { #[inline] fn cache(&self) -> &EthStateCache { @@ -242,10 +258,11 @@ where } } -impl EthApiSpec for OpEthApi +impl EthApiSpec for OpEthApi where N: RpcNodeCore, Rpc: RpcConvert, + F: FlashblockPayload, { #[inline] fn starting_block(&self) -> U256 { @@ -253,10 +270,11 @@ where } } -impl SpawnBlocking for OpEthApi +impl SpawnBlocking for OpEthApi where N: RpcNodeCore, Rpc: RpcConvert, + F: FlashblockPayload, { #[inline] fn io_task_spawner(&self) -> impl TaskSpawner { @@ -274,11 +292,12 @@ where } } -impl LoadFee for OpEthApi +impl LoadFee for OpEthApi where N: RpcNodeCore, OpEthApiError: FromEvmError, Rpc: RpcConvert, + F: FlashblockPayload, { #[inline] fn gas_oracle(&self) -> &GasPriceOracle { @@ -300,18 +319,20 @@ where } } -impl LoadState for OpEthApi +impl LoadState for OpEthApi where N: RpcNodeCore, Rpc: RpcConvert, + F: FlashblockPayload, Self: LoadPendingBlock, { } -impl EthState for OpEthApi +impl EthState for OpEthApi where N: RpcNodeCore, Rpc: RpcConvert, + F: FlashblockPayload, Self: LoadPendingBlock, { #[inline] @@ -320,30 +341,36 @@ where } } -impl EthFees for OpEthApi +impl EthFees for OpEthApi where N: RpcNodeCore, OpEthApiError: FromEvmError, Rpc: RpcConvert, + F: FlashblockPayload, { } -impl Trace for OpEthApi +impl Trace for OpEthApi where N: RpcNodeCore, OpEthApiError: FromEvmError, Rpc: RpcConvert, + F: FlashblockPayload, { } -impl fmt::Debug for OpEthApi { +impl fmt::Debug for OpEthApi { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.debug_struct("OpEthApi").finish_non_exhaustive() } } /// Container type `OpEthApi` -pub struct OpEthApiInner { +pub struct OpEthApiInner< + N: RpcNodeCore, + Rpc: RpcConvert, + F: FlashblockPayload = OpFlashblockPayload, +> { /// Gateway to node's core components. eth_api: EthApiNodeBackend, /// Sequencer client, configured to forward submitted transactions to sequencer of given OP @@ -356,16 +383,18 @@ pub struct OpEthApiInner { /// Flashblocks listeners. /// /// If set, provides receivers for pending blocks, flashblock sequences, and build status. - flashblocks: Option>, + flashblocks: Option>, } -impl fmt::Debug for OpEthApiInner { +impl fmt::Debug + for OpEthApiInner +{ fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { f.debug_struct("OpEthApiInner").finish() } } -impl OpEthApiInner { +impl OpEthApiInner { /// Returns a reference to the [`EthApiNodeBackend`]. const fn eth_api(&self) -> &EthApiNodeBackend { &self.eth_api @@ -472,25 +501,28 @@ where N: FullNodeComponents< Evm: ConfigureEvm< NextBlockEnvCtx: BuildPendingEnv> - + From + + From<::Base> + Unpin, >, Types: NodeTypes< ChainSpec: Hardforks + EthereumHardforks, + Primitives: NodePrimitives< + SignedTx = ::SignedTx, + >, Payload: reth_node_api::PayloadTypes< ExecutionData: for<'a> TryFrom< - &'a FlashBlockCompleteSequence, + &'a FlashBlockCompleteSequence, Error: std::fmt::Display, >, >, >, >, - NetworkT: RpcTypes, + NetworkT: OpRpcTypes, OpRpcConvert: RpcConvert, - OpEthApi>: + OpEthApi, NetworkT::Flashblock>: FullEthApiServer, { - type EthApi = OpEthApi>; + type EthApi = OpEthApi, NetworkT::Flashblock>; async fn build_eth_api(self, ctx: EthApiCtx<'_, N>) -> eyre::Result { let Self { @@ -519,7 +551,8 @@ where info!(target: "reth:cli", %ws_url, "Launching flashblocks service"); let (tx, pending_rx) = watch::channel(None); - let stream = WsFlashBlockStream::new(ws_url); + let stream: WsFlashBlockStream<_, _, _, NetworkT::Flashblock> = + WsFlashBlockStream::new(ws_url); let service = FlashBlockService::new( stream, ctx.components.evm_config().clone(), @@ -535,13 +568,16 @@ where ctx.components.task_executor().spawn(Box::pin(service.run(tx))); if flashblock_consensus { - info!(target: "reth::cli", "Launching FlashBlockConsensusClient"); - let flashblock_client = FlashBlockConsensusClient::new( - ctx.engine_handle.clone(), - flashblocks_sequence.subscribe(), - )?; - ctx.components.task_executor().spawn(Box::pin(flashblock_client.run())); + todo!("Modularize FlashBlockConsensusClient?") } + // if flashblock_consensus { + // info!(target: "reth::cli", "Launching FlashBlockConsensusClient"); + // let flashblock_client = FlashBlockConsensusClient::new( + // ctx.engine_handle.clone(), + // flashblocks_sequence.subscribe(), + // )?; + // ctx.components.task_executor().spawn(Box::pin(flashblock_client.run())); + // } Some(FlashblocksListeners::new( pending_rx, diff --git a/crates/optimism/rpc/src/eth/pending_block.rs b/crates/optimism/rpc/src/eth/pending_block.rs index bf351d7de11..8faf04c835c 100644 --- a/crates/optimism/rpc/src/eth/pending_block.rs +++ b/crates/optimism/rpc/src/eth/pending_block.rs @@ -4,6 +4,7 @@ use crate::{OpEthApi, OpEthApiError}; use alloy_consensus::BlockHeader; use alloy_eips::BlockNumberOrTag; use reth_chain_state::BlockState; +use reth_optimism_flashblocks::FlashblockPayload; use reth_rpc_eth_api::{ helpers::{pending_block::PendingEnvBuilder, LoadPendingBlock, SpawnBlocking}, FromEvmError, RpcConvert, RpcNodeCore, RpcNodeCoreExt, @@ -14,11 +15,12 @@ use reth_rpc_eth_types::{ }; use reth_storage_api::{BlockReaderIdExt, StateProviderBox, StateProviderFactory}; -impl LoadPendingBlock for OpEthApi +impl LoadPendingBlock for OpEthApi where N: RpcNodeCore, OpEthApiError: FromEvmError, Rpc: RpcConvert, + F: FlashblockPayload, { #[inline] fn pending_block(&self) -> &tokio::sync::Mutex>> { diff --git a/crates/optimism/rpc/src/eth/receipt.rs b/crates/optimism/rpc/src/eth/receipt.rs index c04a4d2c72d..05e868a9e82 100644 --- a/crates/optimism/rpc/src/eth/receipt.rs +++ b/crates/optimism/rpc/src/eth/receipt.rs @@ -10,6 +10,7 @@ use op_revm::estimate_tx_compressed_size; use reth_chainspec::ChainSpecProvider; use reth_node_api::NodePrimitives; use reth_optimism_evm::RethL1BlockInfo; +use reth_optimism_flashblocks::FlashblockPayload; use reth_optimism_forks::OpHardforks; use reth_optimism_primitives::OpReceipt; use reth_primitives_traits::SealedBlock; @@ -22,10 +23,11 @@ use reth_rpc_eth_types::{receipt::build_receipt, EthApiError}; use reth_storage_api::BlockReader; use std::fmt::Debug; -impl LoadReceipt for OpEthApi +impl LoadReceipt for OpEthApi where N: RpcNodeCore, Rpc: RpcConvert, + F: FlashblockPayload, { } diff --git a/crates/optimism/rpc/src/eth/transaction.rs b/crates/optimism/rpc/src/eth/transaction.rs index 5dee6e14c5b..018bf964175 100644 --- a/crates/optimism/rpc/src/eth/transaction.rs +++ b/crates/optimism/rpc/src/eth/transaction.rs @@ -6,6 +6,7 @@ use alloy_rpc_types_eth::TransactionInfo; use futures::StreamExt; use op_alloy_consensus::{transaction::OpTransactionInfo, OpTransaction}; use reth_chain_state::CanonStateSubscriptions; +use reth_optimism_flashblocks::FlashblockPayload; use reth_optimism_primitives::DepositReceipt; use reth_primitives_traits::{ BlockBody, Recovered, SignedTransaction, SignerRecoverable, WithEncoded, @@ -27,11 +28,12 @@ use std::{ }; use tokio_stream::wrappers::WatchStream; -impl EthTransactions for OpEthApi +impl EthTransactions for OpEthApi where N: RpcNodeCore, OpEthApiError: FromEvmError, Rpc: RpcConvert, + F: FlashblockPayload, { fn signers(&self) -> &SignersForRpc { self.inner.eth_api.signers() @@ -175,11 +177,12 @@ where } } -impl LoadTransaction for OpEthApi +impl LoadTransaction for OpEthApi where N: RpcNodeCore, OpEthApiError: FromEvmError, Rpc: RpcConvert, + F: FlashblockPayload, { async fn transaction_by_hash( &self, @@ -230,10 +233,11 @@ where } } -impl OpEthApi +impl OpEthApi where N: RpcNodeCore, Rpc: RpcConvert, + F: FlashblockPayload, { /// Returns the [`SequencerClient`] if one is set. pub fn raw_tx_forwarder(&self) -> Option { diff --git a/crates/optimism/rpc/src/lib.rs b/crates/optimism/rpc/src/lib.rs index 10f8ad5dccd..086ad114be8 100644 --- a/crates/optimism/rpc/src/lib.rs +++ b/crates/optimism/rpc/src/lib.rs @@ -21,6 +21,6 @@ pub mod witness; pub use engine::OpEngineApiClient; pub use engine::{OpEngineApi, OpEngineApiServer, OP_ENGINE_CAPABILITIES}; pub use error::{OpEthApiError, OpInvalidTransactionError, SequencerClientError}; -pub use eth::{OpEthApi, OpEthApiBuilder, OpReceiptBuilder}; +pub use eth::{OpEthApi, OpEthApiBuilder, OpReceiptBuilder, OpRpcTypes}; pub use metrics::SequencerMetrics; pub use sequencer::SequencerClient; diff --git a/crates/trie/trie/src/node_iter.rs b/crates/trie/trie/src/node_iter.rs index b57fc2da707..7c2ff216835 100644 --- a/crates/trie/trie/src/node_iter.rs +++ b/crates/trie/trie/src/node_iter.rs @@ -84,16 +84,19 @@ where K: AsRef, { /// Creates a new [`TrieNodeIter`] for the state trie. + #[allow(clippy::missing_const_for_fn)] pub fn state_trie(walker: TrieWalker, hashed_cursor: H) -> Self { Self::new(walker, hashed_cursor, TrieType::State) } /// Creates a new [`TrieNodeIter`] for the storage trie. + #[allow(clippy::missing_const_for_fn)] pub fn storage_trie(walker: TrieWalker, hashed_cursor: H) -> Self { Self::new(walker, hashed_cursor, TrieType::Storage) } /// Creates a new [`TrieNodeIter`]. + #[allow(clippy::missing_const_for_fn)] fn new(walker: TrieWalker, hashed_cursor: H, trie_type: TrieType) -> Self { Self { walker, diff --git a/examples/custom-node/src/engine.rs b/examples/custom-node/src/engine.rs index fceace2d2eb..e7502ef1c28 100644 --- a/examples/custom-node/src/engine.rs +++ b/examples/custom-node/src/engine.rs @@ -1,6 +1,7 @@ use crate::{ chainspec::CustomChainSpec, evm::CustomEvmConfig, + flashblock::CustomFlashblockPayload, primitives::{CustomHeader, CustomNodePrimitives, CustomTransaction}, CustomNode, }; @@ -67,14 +68,15 @@ impl ExecutionPayload for CustomExecutionData { } } -impl TryFrom<&reth_optimism_flashblocks::FlashBlockCompleteSequence> for CustomExecutionData { +impl TryFrom<&reth_optimism_flashblocks::FlashBlockCompleteSequence> + for CustomExecutionData +{ type Error = &'static str; fn try_from( - sequence: &reth_optimism_flashblocks::FlashBlockCompleteSequence, + _sequence: &reth_optimism_flashblocks::FlashBlockCompleteSequence, ) -> Result { - let inner = OpExecutionData::try_from(sequence)?; - Ok(Self { inner, extension: sequence.last().diff.gas_used }) + todo!("convert flashblock sequence to CustomExecutionData") } } diff --git a/examples/custom-node/src/evm/config.rs b/examples/custom-node/src/evm/config.rs index f2bd3326893..7666d25a296 100644 --- a/examples/custom-node/src/evm/config.rs +++ b/examples/custom-node/src/evm/config.rs @@ -2,6 +2,7 @@ use crate::{ chainspec::CustomChainSpec, engine::{CustomExecutionData, CustomPayloadBuilderAttributes}, evm::{alloy::CustomEvmFactory, executor::CustomBlockExecutionCtx, CustomBlockAssembler}, + flashblock::CustomFlashblockPayloadBase, primitives::{Block, CustomHeader, CustomNodePrimitives, CustomTransaction}, }; use alloy_consensus::BlockHeader; @@ -9,7 +10,6 @@ use alloy_eips::{eip2718::WithEncoded, Decodable2718}; use alloy_evm::EvmEnv; use alloy_op_evm::OpBlockExecutionCtx; use alloy_rpc_types_engine::PayloadError; -use op_alloy_rpc_types_engine::flashblock::OpFlashblockPayloadBase; use op_revm::OpSpecId; use reth_engine_primitives::ExecutableTxIterator; use reth_ethereum::{ @@ -143,9 +143,9 @@ pub struct CustomNextBlockEnvAttributes { extension: u64, } -impl From for CustomNextBlockEnvAttributes { - fn from(value: OpFlashblockPayloadBase) -> Self { - Self { inner: value.into(), extension: 0 } +impl From for CustomNextBlockEnvAttributes { + fn from(_value: CustomFlashblockPayloadBase) -> Self { + todo!("map CustomFlashblockPayloadBase fields to CustomNextBlockEnvAttributes") } } diff --git a/examples/custom-node/src/flashblock.rs b/examples/custom-node/src/flashblock.rs new file mode 100644 index 00000000000..c3c7e54de33 --- /dev/null +++ b/examples/custom-node/src/flashblock.rs @@ -0,0 +1,121 @@ +use crate::primitives::CustomTransaction; +use alloy_consensus::{crypto::RecoveryError, transaction::Recovered}; +use alloy_eips::{eip2718::WithEncoded, eip4895::Withdrawals}; +use alloy_primitives::{Bloom, Bytes, B256}; +use alloy_rpc_types_engine::PayloadId; +use reth_optimism_flashblocks::{FlashblockDiff, FlashblockPayload, FlashblockPayloadBase}; +use serde::{Deserialize, Deserializer}; + +#[derive(Debug, Clone, Default)] +pub struct CustomFlashblockPayloadBase { + pub parent_hash: B256, + pub block_number: u64, + pub timestamp: u64, +} + +impl FlashblockPayloadBase for CustomFlashblockPayloadBase { + fn parent_hash(&self) -> B256 { + self.parent_hash + } + + fn block_number(&self) -> u64 { + self.block_number + } + + fn timestamp(&self) -> u64 { + self.timestamp + } +} + +#[derive(Debug, Clone, Default)] +pub struct CustomFlashblockPayloadDiff { + pub block_hash: B256, + pub state_root: B256, + pub gas_used: u64, + pub logs_bloom: Bloom, + pub receipts_root: B256, + pub transactions: Vec, +} + +impl FlashblockDiff for CustomFlashblockPayloadDiff { + fn block_hash(&self) -> B256 { + self.block_hash + } + + fn state_root(&self) -> B256 { + self.state_root + } + + fn gas_used(&self) -> u64 { + self.gas_used + } + + fn logs_bloom(&self) -> &Bloom { + &self.logs_bloom + } + + fn receipts_root(&self) -> B256 { + self.receipts_root + } + + fn transactions_raw(&self) -> &[Bytes] { + &self.transactions + } + + fn withdrawals(&self) -> Option<&Withdrawals> { + None + } + + fn withdrawals_root(&self) -> Option { + None + } +} + +#[derive(Debug, Clone)] +pub struct CustomFlashblockPayload { + pub index: u64, + pub payload_id: PayloadId, + pub base: Option, + pub diff: CustomFlashblockPayloadDiff, +} + +impl<'de> Deserialize<'de> for CustomFlashblockPayload { + fn deserialize(_deserializer: D) -> Result + where + D: Deserializer<'de>, + { + todo!("implement deserialization") + } +} + +impl FlashblockPayload for CustomFlashblockPayload { + type Base = CustomFlashblockPayloadBase; + type Diff = CustomFlashblockPayloadDiff; + type SignedTx = CustomTransaction; + + fn index(&self) -> u64 { + self.index + } + + fn payload_id(&self) -> PayloadId { + self.payload_id + } + + fn base(&self) -> Option { + self.base.clone() + } + + fn diff(&self) -> &Self::Diff { + &self.diff + } + + fn block_number(&self) -> u64 { + self.base.as_ref().map(|b| b.block_number()).unwrap_or(0) + } + + fn recover_transactions( + &self, + ) -> impl Iterator>, RecoveryError>> { + std::iter::from_fn(|| todo!("implement transaction recovery")) + } +} diff --git a/examples/custom-node/src/lib.rs b/examples/custom-node/src/lib.rs index 4210ac9b767..791d25c7dc3 100644 --- a/examples/custom-node/src/lib.rs +++ b/examples/custom-node/src/lib.rs @@ -34,6 +34,7 @@ pub mod chainspec; pub mod engine; pub mod engine_api; pub mod evm; +pub mod flashblock; pub mod pool; pub mod primitives; pub mod rpc; diff --git a/examples/custom-node/src/rpc.rs b/examples/custom-node/src/rpc.rs index b6dc7742d93..f51f46c4533 100644 --- a/examples/custom-node/src/rpc.rs +++ b/examples/custom-node/src/rpc.rs @@ -1,5 +1,6 @@ use crate::{ evm::CustomTxEnv, + flashblock::CustomFlashblockPayload, primitives::{CustomHeader, CustomTransaction}, }; use alloy_consensus::error::ValueError; @@ -7,7 +8,7 @@ use alloy_evm::EvmEnv; use alloy_network::TxSigner; use op_alloy_consensus::OpTxEnvelope; use op_alloy_rpc_types::{OpTransactionReceipt, OpTransactionRequest}; -use reth_op::rpc::RpcTypes; +use reth_op::rpc::{OpRpcTypes, RpcTypes}; use reth_rpc_api::eth::{ EthTxEnvError, SignTxRequestError, SignableTxRequest, TryIntoSimTx, TryIntoTxEnv, }; @@ -17,6 +18,10 @@ use revm::context::BlockEnv; #[non_exhaustive] pub struct CustomRpcTypes; +impl OpRpcTypes for CustomRpcTypes { + type Flashblock = CustomFlashblockPayload; +} + impl RpcTypes for CustomRpcTypes { type Header = alloy_rpc_types_eth::Header; type Receipt = OpTransactionReceipt;