Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion crates/rpc/src/base/meter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use std::{sync::Arc, time::Instant};

use alloy_consensus::{BlockHeader, Transaction as _, transaction::SignerRecoverable};
use alloy_primitives::{B256, U256};
use alloy_rpc_types::state::StateOverride;
use base_bundles::{BundleExtensions, BundleTxs, ParsedBundle, TransactionResult};
use eyre::{Result as EyreResult, eyre};
use reth::revm::db::State;
Expand All @@ -14,9 +15,13 @@ const BLOCK_TIME: u64 = 2; // 2 seconds per block

/// Simulates and meters a bundle of transactions
///
/// Takes a state provider, chain spec, decoded transactions, block header, and bundle metadata,
/// Takes a state provider, chain spec, decoded transactions, block header, bundle metadata, and optional state overrides,
/// and executes transactions in sequence to measure gas usage and execution time.
///
/// Note: The state_overrides parameter is currently unused. The correct approach is to use the header
/// from the latest pending flashblock, which ensures the state provider already includes all committed
/// flashblock transactions (including updated nonces).
///
/// Returns a tuple of:
/// - Vector of transaction results
/// - Total gas used
Expand All @@ -28,6 +33,7 @@ pub fn meter_bundle<SP>(
chain_spec: Arc<OpChainSpec>,
bundle: ParsedBundle,
header: &SealedHeader,
_state_overrides: Option<StateOverride>,
) -> EyreResult<(Vec<TransactionResult>, u64, U256, B256, u128)>
where
SP: reth_provider::StateProvider,
Expand Down
74 changes: 46 additions & 28 deletions crates/rpc/src/base/meter_rpc.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,15 @@
use alloy_consensus::Header;
use std::sync::Arc;

use alloy_consensus::{Header, Sealed};
use alloy_eips::BlockNumberOrTag;
use alloy_primitives::{B256, U256};
use base_bundles::{Bundle, MeterBundleResponse, ParsedBundle};
use base_reth_flashblocks::FlashblocksAPI;
use jsonrpsee::core::{RpcResult, async_trait};
use reth::providers::BlockReaderIdExt;
use reth_optimism_chainspec::OpChainSpec;
use reth_optimism_primitives::OpBlock;
use reth_primitives_traits::SealedHeader;
use reth_provider::{BlockReader, ChainSpecProvider, HeaderProvider, StateProviderFactory};
use tracing::{error, info};

Expand All @@ -15,11 +19,12 @@ use super::{

/// Implementation of the metering RPC API
#[derive(Debug)]
pub struct MeteringApiImpl<Provider> {
pub struct MeteringApiImpl<Provider, FB> {
provider: Provider,
flashblocks_state: Arc<FB>,
}

impl<Provider> MeteringApiImpl<Provider>
impl<Provider, FB> MeteringApiImpl<Provider, FB>
where
Provider: StateProviderFactory
+ ChainSpecProvider<ChainSpec = OpChainSpec>
Expand All @@ -29,13 +34,13 @@ where
+ Clone,
{
/// Creates a new instance of MeteringApi
pub const fn new(provider: Provider) -> Self {
Self { provider }
pub const fn new(provider: Provider, flashblocks_state: Arc<FB>) -> Self {
Self { provider, flashblocks_state }
}
}

#[async_trait]
impl<Provider> MeteringApiServer for MeteringApiImpl<Provider>
impl<Provider, FB> MeteringApiServer for MeteringApiImpl<Provider, FB>
where
Provider: StateProviderFactory
+ ChainSpecProvider<ChainSpec = OpChainSpec>
Expand All @@ -46,6 +51,7 @@ where
+ Send
+ Sync
+ 'static,
FB: FlashblocksAPI + Send + Sync + 'static,
{
async fn meter_bundle(&self, bundle: Bundle) -> RpcResult<MeterBundleResponse> {
info!(
Expand All @@ -54,24 +60,33 @@ where
"Starting bundle metering"
);

// Get the latest header
let header = self
.provider
.sealed_header_by_number_or_tag(BlockNumberOrTag::Latest)
.map_err(|e| {
jsonrpsee::types::ErrorObjectOwned::owned(
jsonrpsee::types::ErrorCode::InternalError.code(),
format!("Failed to get latest header: {}", e),
None::<()>,
)
})?
.ok_or_else(|| {
jsonrpsee::types::ErrorObjectOwned::owned(
jsonrpsee::types::ErrorCode::InternalError.code(),
"Latest block not found".to_string(),
None::<()>,
)
})?;
// Get pending blocks from flashblocks to get the latest header including pending state
let pending_blocks = self.flashblocks_state.get_pending_blocks();

// Use the latest pending header if available, otherwise fallback to latest finalized
let header: SealedHeader<Header> = if let Some(ref pb) = *pending_blocks {
info!("Using pending block header for bundle metering");
let latest_header: Sealed<Header> = pb.latest_header();
SealedHeader::new(latest_header.inner().clone(), latest_header.hash())
} else {
info!("No pending blocks, using latest finalized header for bundle metering");
self.provider
.sealed_header_by_number_or_tag(BlockNumberOrTag::Latest)
.map_err(|e| {
jsonrpsee::types::ErrorObjectOwned::owned(
jsonrpsee::types::ErrorCode::InternalError.code(),
format!("Failed to get latest header: {}", e),
None::<()>,
)
})?
.ok_or_else(|| {
jsonrpsee::types::ErrorObjectOwned::owned(
jsonrpsee::types::ErrorCode::InternalError.code(),
"Latest block not found".to_string(),
None::<()>,
)
})?
};

let parsed_bundle = ParsedBundle::try_from(bundle).map_err(|e| {
jsonrpsee::types::ErrorObjectOwned::owned(
Expand All @@ -81,8 +96,9 @@ where
)
})?;

// Get state provider for the block
let state_provider = self.provider.state_by_block_hash(header.hash()).map_err(|e| {
// Get state provider for the parent block (to simulate on top of)
let parent_hash = header.parent_hash;
let state_provider = self.provider.state_by_block_hash(parent_hash).map_err(|e| {
error!(error = %e, "Failed to get state provider");
jsonrpsee::types::ErrorObjectOwned::owned(
jsonrpsee::types::ErrorCode::InternalError.code(),
Expand All @@ -92,8 +108,9 @@ where
})?;

// Meter bundle using utility function
// The state provider already includes all committed flashblock state
let (results, total_gas_used, total_gas_fees, bundle_hash, total_execution_time) =
meter_bundle(state_provider, self.provider.chain_spec(), parsed_bundle, &header)
meter_bundle(state_provider, self.provider.chain_spec(), parsed_bundle, &header, None)
.map_err(|e| {
error!(error = %e, "Bundle metering failed");
jsonrpsee::types::ErrorObjectOwned::owned(
Expand Down Expand Up @@ -209,7 +226,7 @@ where
}
}

impl<Provider> MeteringApiImpl<Provider>
impl<Provider, FB> MeteringApiImpl<Provider, FB>
where
Provider: StateProviderFactory
+ ChainSpecProvider<ChainSpec = OpChainSpec>
Expand All @@ -220,6 +237,7 @@ where
+ Send
+ Sync
+ 'static,
FB: FlashblocksAPI + Send + Sync + 'static,
{
/// Internal helper to meter a block's execution
fn meter_block_internal(&self, block: &OpBlock) -> RpcResult<MeterBlockResponse> {
Expand Down
24 changes: 21 additions & 3 deletions crates/rpc/tests/meter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,13 @@ fn meter_bundle_empty_transactions() -> eyre::Result<()> {
let parsed_bundle = create_parsed_bundle(Vec::new())?;

let (results, total_gas_used, total_gas_fees, bundle_hash, total_execution_time) =
meter_bundle(state_provider, harness.chain_spec.clone(), parsed_bundle, &harness.header)?;
meter_bundle(
state_provider,
harness.chain_spec.clone(),
parsed_bundle,
&harness.header,
None,
)?;

assert!(results.is_empty());
assert_eq!(total_gas_used, 0);
Expand Down Expand Up @@ -179,7 +185,13 @@ fn meter_bundle_single_transaction() -> eyre::Result<()> {
let parsed_bundle = create_parsed_bundle(vec![envelope.clone()])?;

let (results, total_gas_used, total_gas_fees, bundle_hash, total_execution_time) =
meter_bundle(state_provider, harness.chain_spec.clone(), parsed_bundle, &harness.header)?;
meter_bundle(
state_provider,
harness.chain_spec.clone(),
parsed_bundle,
&harness.header,
None,
)?;

assert_eq!(results.len(), 1);
let result = &results[0];
Expand Down Expand Up @@ -256,7 +268,13 @@ fn meter_bundle_multiple_transactions() -> eyre::Result<()> {
let parsed_bundle = create_parsed_bundle(vec![envelope_1.clone(), envelope_2.clone()])?;

let (results, total_gas_used, total_gas_fees, bundle_hash, total_execution_time) =
meter_bundle(state_provider, harness.chain_spec.clone(), parsed_bundle, &harness.header)?;
meter_bundle(
state_provider,
harness.chain_spec.clone(),
parsed_bundle,
&harness.header,
None,
)?;

assert_eq!(results.len(), 2);
assert!(total_execution_time > 0);
Expand Down
23 changes: 22 additions & 1 deletion crates/rpc/tests/meter_rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,10 @@ use std::{any::Any, net::SocketAddr, sync::Arc};
use alloy_eips::Encodable2718;
use alloy_primitives::{Bytes, U256, address, b256, bytes};
use alloy_rpc_client::RpcClient;
use alloy_rpc_types::state::StateOverride;
use arc_swap::Guard;
use base_bundles::{Bundle, MeterBundleResponse};
use base_reth_flashblocks::{FlashblocksAPI, PendingBlocks};
use base_reth_rpc::{MeteringApiImpl, MeteringApiServer};
use base_reth_test_utils::{init_silenced_tracing, load_genesis};
use op_alloy_consensus::OpTxEnvelope;
Expand All @@ -21,6 +24,23 @@ use reth_optimism_node::{OpNode, args::RollupArgs};
use reth_optimism_primitives::OpTransactionSigned;
use reth_provider::providers::BlockchainProvider;
use reth_transaction_pool::test_utils::TransactionBuilder;
use tokio::sync::broadcast;

/// Mock FlashblocksAPI implementation for testing
#[derive(Debug, Clone)]
struct MockFlashblocksAPI;

impl FlashblocksAPI for MockFlashblocksAPI {
fn get_pending_blocks(&self) -> Guard<Option<Arc<PendingBlocks>>> {
Guard::from_inner(arc_swap::Guard::into_inner(
arc_swap::ArcSwapOption::<PendingBlocks>::new(None).load(),
))
}

fn subscribe_to_flashblocks(&self) -> broadcast::Receiver<Arc<PendingBlocks>> {
broadcast::channel(1).1
}
}

struct NodeContext {
http_api_addr: SocketAddr,
Expand Down Expand Up @@ -84,7 +104,8 @@ async fn setup_node() -> eyre::Result<NodeContext> {
.with_components(node.components_builder())
.with_add_ons(node.add_ons())
.extend_rpc_modules(move |ctx| {
let metering_api = MeteringApiImpl::new(ctx.provider().clone());
let mock_flashblocks = Arc::new(MockFlashblocksAPI);
let metering_api = MeteringApiImpl::new(ctx.provider().clone(), mock_flashblocks);
ctx.modules.merge_configured(metering_api.into_rpc())?;
Ok(())
})
Expand Down
18 changes: 12 additions & 6 deletions crates/runner/src/extensions/rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,12 +49,6 @@ impl BaseNodeExtension for BaseRpcExtension {
let sequencer_rpc = self.sequencer_rpc;

builder.extend_rpc_modules(move |ctx| {
if metering_enabled {
info!(message = "Starting Metering RPC");
let metering_api = MeteringApiImpl::new(ctx.provider().clone());
ctx.modules.merge_configured(metering_api.into_rpc())?;
}

let proxy_api =
TransactionStatusApiImpl::new(sequencer_rpc.clone(), ctx.pool().clone())
.expect("Failed to create transaction status proxy");
Expand All @@ -77,6 +71,13 @@ impl BaseNodeExtension for BaseRpcExtension {
let mut flashblocks_client = FlashblocksSubscriber::new(fb.clone(), ws_url);
flashblocks_client.start();

// Register metering RPC with flashblocks state
if metering_enabled {
info!(message = "Starting Metering RPC with flashblocks support");
let metering_api = MeteringApiImpl::new(ctx.provider().clone(), fb.clone());
ctx.modules.merge_configured(metering_api.into_rpc())?;
}

let api_ext = EthApiExt::new(
ctx.registry.eth_api().clone(),
ctx.registry.eth_handlers().filter.clone(),
Expand All @@ -91,6 +92,11 @@ impl BaseNodeExtension for BaseRpcExtension {
ctx.modules.replace_configured(eth_pubsub.into_rpc())?;
} else {
info!(message = "flashblocks integration is disabled");

// If flashblocks is disabled but metering is enabled, we can't support it
if metering_enabled {
info!(message = "Metering RPC requires flashblocks to be enabled, skipping");
}
}

Ok(())
Expand Down
Loading