diff --git a/crates/rpc/rpc/Cargo.toml b/crates/rpc/rpc/Cargo.toml index c47c383f057..c166472bf76 100644 --- a/crates/rpc/rpc/Cargo.toml +++ b/crates/rpc/rpc/Cargo.toml @@ -40,6 +40,7 @@ reth-consensus.workspace = true reth-consensus-common.workspace = true reth-node-api.workspace = true reth-trie-common.workspace = true +reth-ethereum-primitives.workspace = true # ethereum alloy-evm = { workspace = true, features = ["overrides"] } @@ -95,7 +96,6 @@ derive_more.workspace = true itertools.workspace = true [dev-dependencies] -reth-ethereum-primitives.workspace = true reth-testing-utils.workspace = true reth-transaction-pool = { workspace = true, features = ["test-utils"] } reth-provider = { workspace = true, features = ["test-utils"] } diff --git a/crates/rpc/rpc/src/eth/pubsub.rs b/crates/rpc/rpc/src/eth/pubsub.rs index 1c7982f80fd..64f3a1aae85 100644 --- a/crates/rpc/rpc/src/eth/pubsub.rs +++ b/crates/rpc/rpc/src/eth/pubsub.rs @@ -2,9 +2,13 @@ use std::sync::Arc; +use alloy_consensus::{BlockHeader, TxReceipt}; +use alloy_eips::eip2718::{Encodable2718, Typed2718}; use alloy_primitives::{TxHash, U256}; use alloy_rpc_types_eth::{ - pubsub::{Params, PubSubSyncStatus, SubscriptionKind, SyncStatusMetadata}, + pubsub::{ + Params, PubSubSyncStatus, SubscriptionKind, SyncStatusMetadata, TransactionReceiptsParams, + }, Filter, Header, Log, }; use futures::StreamExt; @@ -12,12 +16,17 @@ use jsonrpsee::{ server::SubscriptionMessage, types::ErrorObject, PendingSubscriptionSink, SubscriptionSink, }; use reth_chain_state::CanonStateSubscriptions; +use reth_chainspec::{ChainSpecProvider, EthChainSpec}; use reth_network_api::NetworkInfo; -use reth_primitives_traits::NodePrimitives; +use reth_primitives_traits::{ + BlockBody, NodePrimitives, SealedBlock, SignedTransaction, TransactionMeta, +}; +use reth_rpc_convert::{transaction::ConvertReceiptInput, RpcConvert}; use reth_rpc_eth_api::{ - pubsub::EthPubSubApiServer, EthApiTypes, RpcConvert, RpcNodeCore, RpcTransaction, + helpers::receipt::calculate_gas_used_and_next_log_index, pubsub::EthPubSubApiServer, + EthApiTypes, RpcNodeCore, RpcTransaction, }; -use reth_rpc_eth_types::logs_utils; +use reth_rpc_eth_types::{logs_utils, receipt::build_receipt, EthApiError}; use reth_rpc_server_types::result::{internal_rpc_err, invalid_params_rpc_err}; use reth_storage_api::BlockNumReader; use reth_tasks::{TaskSpawner, TokioTaskExecutor}; @@ -27,7 +36,7 @@ use tokio_stream::{ wrappers::{BroadcastStream, ReceiverStream}, Stream, }; -use tracing::error; +use tracing::{error, warn}; /// `Eth` pubsub RPC implementation. /// @@ -94,6 +103,14 @@ where self.inner.log_stream(filter) } + /// Returns a stream that yields all transaction receipts that match the given filter. + pub fn transaction_receipts_stream( + &self, + params: TransactionReceiptsParams, + ) -> impl Stream> { + self.inner.transaction_receipts_stream(params) + } + /// The actual handler for an accepted [`EthPubSub::subscribe`] call. pub async fn handle_accepted( &self, @@ -109,7 +126,7 @@ where // if no params are provided, used default filter params let filter = match params { Some(Params::Logs(filter)) => *filter, - Some(Params::Bool(_)) => { + Some(Params::Bool(_) | Params::TransactionReceipts(_)) => { return Err(invalid_params_rpc_err("Invalid params for logs")) } _ => Default::default(), @@ -144,7 +161,7 @@ where Params::Bool(false) | Params::None => { // only hashes requested } - Params::Logs(_) => { + Params::Logs(_) | Params::TransactionReceipts(_) => { return Err(invalid_params_rpc_err( "Invalid params for newPendingTransactions", )) @@ -199,6 +216,24 @@ where Ok(()) } + SubscriptionKind::TransactionReceipts => { + // Parse transaction receipts parameters + let receipt_params = match params { + Some(Params::TransactionReceipts(params)) => params, + Some(Params::Logs(_) | Params::Bool(_)) => { + return Err(invalid_params_rpc_err( + "Invalid params for transactionReceipts subscription", + )) + } + Some(Params::None) | None => { + // Default to all transaction receipts if no params provided + TransactionReceiptsParams { transaction_hashes: None } + } + }; + + pipe_from_stream(accepted_sink, self.transaction_receipts_stream(receipt_params)) + .await + } } } } @@ -226,7 +261,10 @@ where let sink = pending.accept().await?; let pubsub = self.clone(); self.inner.subscription_task_spawner.spawn(Box::pin(async move { - let _ = pubsub.handle_accepted(sink, kind, params).await; + let result = pubsub.handle_accepted(sink, kind, params).await; + if let Err(err) = result { + warn!(target = "rpc", %err, "Subscription task ended with error"); + } })); Ok(()) @@ -386,4 +424,156 @@ where futures::stream::iter(all_logs) }) } + + /// Returns a stream that yields all transaction receipts that match the given filter. + fn transaction_receipts_stream( + &self, + params: TransactionReceiptsParams, + ) -> impl Stream> { + self.eth_api.provider().canonical_state_stream().filter_map(move |canon_state| { + std::future::ready({ + // Get the committed chain (new blocks) + let chain = canon_state.committed(); + + // Process all transactions across all blocks using a single iterator chain + let all_receipts: Vec<_> = chain + .blocks_and_receipts() + .flat_map(|(block, block_receipts)| { + let block_number = block.number(); + let transactions: Vec<_> = block.body().transactions().iter().collect(); + + // Skip empty blocks + if transactions.is_empty() { + return Vec::new().into_iter(); + } + + // Verify transaction/receipt count match + if transactions.len() != block_receipts.len() { + error!(target = "rpc", + block_number = %block_number, + block_hash = %block.hash(), + tx_count = transactions.len(), + receipt_count = block_receipts.len(), + "Transaction and receipt count mismatch" + ); + return Vec::new().into_iter(); + } + + // Calculate blob params + let blob_params = self + .eth_api + .provider() + .chain_spec() + .blob_params_at_timestamp(block.header().timestamp()); + + // Process all transactions in this block + let processed_receipts: Vec<_> = transactions + .iter() + .zip(block_receipts.iter()) + .enumerate() + .filter_map(|(tx_index, (tx, receipt))| { + let tx_hash = tx.trie_hash(); + + // Apply transaction hash filter + let should_include = match ¶ms.transaction_hashes { + Some(hashes) if !hashes.is_empty() => hashes.contains(&tx_hash), + _ => true, + }; + + if !should_include { + return None; + } + + // Calculate gas used and next log index + let (gas_used_before, next_log_index) = + calculate_gas_used_and_next_log_index( + tx_index as u64, + block_receipts, + ); + + // Convert to RPC receipt + match self.build_rpc_receipt_with_tx_data( + block, + tx_index as u64, + tx, + receipt, + gas_used_before, + next_log_index, + blob_params, + ) { + Ok(rpc_receipt) => Some(rpc_receipt), + Err(err) => { + error!(target = "rpc", %err, tx_hash = %tx_hash, "Failed to convert receipt to RPC format"); + None + } + } + }) + .collect(); + + processed_receipts.into_iter() + }) + .collect(); + + if all_receipts.is_empty() { + None + } else { + Some(all_receipts) + } + }) + }) + } + + /// Converts a receipt to RPC format with transaction data. + #[expect(clippy::too_many_arguments)] + #[inline] + fn build_rpc_receipt_with_tx_data( + &self, + block: &SealedBlock, + tx_index: u64, + tx: &::SignedTx, + receipt: &::Receipt, + gas_used_before: u64, + next_log_index: usize, + blob_params: Option, + ) -> Result { + // Recover signer + let signer = tx.try_recover().map_err(|_| EthApiError::InvalidTransactionSignature)?; + let recovered_tx = tx.clone().with_signer(signer); + + let meta = TransactionMeta { + tx_hash: tx.trie_hash(), + index: tx_index, + block_hash: block.hash(), + block_number: block.number(), + base_fee: block.header().base_fee_per_gas(), + excess_blob_gas: block.header().excess_blob_gas(), + timestamp: block.header().timestamp(), + }; + + let convert_input: ConvertReceiptInput<'_, N> = ConvertReceiptInput { + receipt: receipt.clone(), + tx: recovered_tx.as_recovered_ref(), + gas_used: receipt.cumulative_gas_used() - gas_used_before, + next_log_index, + meta, + }; + + let rpc_receipt = build_receipt( + convert_input, + blob_params, + |receipt, next_log_index, meta| { + alloy_consensus::ReceiptEnvelope::from(reth_ethereum_primitives::RpcReceipt { + tx_type: alloy_consensus::TxType::try_from(receipt.ty()).unwrap_or_else(|_| { + warn!(target: "rpc", tx_hash = %meta.tx_hash, tx_type = receipt.ty(), "Unknown tx type, fallback to Legacy"); + alloy_consensus::TxType::Legacy + }), + success: receipt.status(), + cumulative_gas_used: receipt.cumulative_gas_used(), + logs: Log::collect_for_receipt(next_log_index, meta, receipt.logs().iter().cloned()), + }) + }, + ); + + Ok(rpc_receipt) + } }