Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion crates/rpc/rpc/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ reth-consensus.workspace = true
reth-consensus-common.workspace = true
reth-node-api.workspace = true
reth-trie-common.workspace = true
reth-ethereum-primitives.workspace = true

# ethereum
alloy-evm = { workspace = true, features = ["overrides"] }
Expand Down Expand Up @@ -95,7 +96,6 @@ derive_more.workspace = true
itertools.workspace = true

[dev-dependencies]
reth-ethereum-primitives.workspace = true
reth-testing-utils.workspace = true
reth-transaction-pool = { workspace = true, features = ["test-utils"] }
reth-provider = { workspace = true, features = ["test-utils"] }
Expand Down
206 changes: 198 additions & 8 deletions crates/rpc/rpc/src/eth/pubsub.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,22 +2,31 @@

use std::sync::Arc;

use alloy_consensus::{BlockHeader, TxReceipt};
use alloy_eips::eip2718::{Encodable2718, Typed2718};
use alloy_primitives::{TxHash, U256};
use alloy_rpc_types_eth::{
pubsub::{Params, PubSubSyncStatus, SubscriptionKind, SyncStatusMetadata},
pubsub::{
Params, PubSubSyncStatus, SubscriptionKind, SyncStatusMetadata, TransactionReceiptsParams,
},
Filter, Header, Log,
};
use futures::StreamExt;
use jsonrpsee::{
server::SubscriptionMessage, types::ErrorObject, PendingSubscriptionSink, SubscriptionSink,
};
use reth_chain_state::CanonStateSubscriptions;
use reth_chainspec::{ChainSpecProvider, EthChainSpec};
use reth_network_api::NetworkInfo;
use reth_primitives_traits::NodePrimitives;
use reth_primitives_traits::{
BlockBody, NodePrimitives, SealedBlock, SignedTransaction, TransactionMeta,
};
use reth_rpc_convert::{transaction::ConvertReceiptInput, RpcConvert};
use reth_rpc_eth_api::{
pubsub::EthPubSubApiServer, EthApiTypes, RpcConvert, RpcNodeCore, RpcTransaction,
helpers::receipt::calculate_gas_used_and_next_log_index, pubsub::EthPubSubApiServer,
EthApiTypes, RpcNodeCore, RpcTransaction,
};
use reth_rpc_eth_types::logs_utils;
use reth_rpc_eth_types::{logs_utils, receipt::build_receipt, EthApiError};
use reth_rpc_server_types::result::{internal_rpc_err, invalid_params_rpc_err};
use reth_storage_api::BlockNumReader;
use reth_tasks::{TaskSpawner, TokioTaskExecutor};
Expand All @@ -27,7 +36,7 @@ use tokio_stream::{
wrappers::{BroadcastStream, ReceiverStream},
Stream,
};
use tracing::error;
use tracing::{error, warn};

/// `Eth` pubsub RPC implementation.
///
Expand Down Expand Up @@ -94,6 +103,14 @@ where
self.inner.log_stream(filter)
}

/// Returns a stream that yields all transaction receipts that match the given filter.
pub fn transaction_receipts_stream(
&self,
params: TransactionReceiptsParams,
) -> impl Stream<Item = Vec<alloy_rpc_types_eth::TransactionReceipt>> {
self.inner.transaction_receipts_stream(params)
}

/// The actual handler for an accepted [`EthPubSub::subscribe`] call.
pub async fn handle_accepted(
&self,
Expand All @@ -109,7 +126,7 @@ where
// if no params are provided, used default filter params
let filter = match params {
Some(Params::Logs(filter)) => *filter,
Some(Params::Bool(_)) => {
Some(Params::Bool(_) | Params::TransactionReceipts(_)) => {
return Err(invalid_params_rpc_err("Invalid params for logs"))
}
_ => Default::default(),
Expand Down Expand Up @@ -144,7 +161,7 @@ where
Params::Bool(false) | Params::None => {
// only hashes requested
}
Params::Logs(_) => {
Params::Logs(_) | Params::TransactionReceipts(_) => {
return Err(invalid_params_rpc_err(
"Invalid params for newPendingTransactions",
))
Expand Down Expand Up @@ -199,6 +216,24 @@ where

Ok(())
}
SubscriptionKind::TransactionReceipts => {
// Parse transaction receipts parameters
let receipt_params = match params {
Some(Params::TransactionReceipts(params)) => params,
Some(Params::Logs(_) | Params::Bool(_)) => {
return Err(invalid_params_rpc_err(
"Invalid params for transactionReceipts subscription",
))
}
Some(Params::None) | None => {
// Default to all transaction receipts if no params provided
TransactionReceiptsParams { transaction_hashes: None }
}
};

pipe_from_stream(accepted_sink, self.transaction_receipts_stream(receipt_params))
.await
}
}
}
}
Expand Down Expand Up @@ -226,7 +261,10 @@ where
let sink = pending.accept().await?;
let pubsub = self.clone();
self.inner.subscription_task_spawner.spawn(Box::pin(async move {
let _ = pubsub.handle_accepted(sink, kind, params).await;
let result = pubsub.handle_accepted(sink, kind, params).await;
if let Err(err) = result {
warn!(target = "rpc", %err, "Subscription task ended with error");
}
}));

Ok(())
Expand Down Expand Up @@ -386,4 +424,156 @@ where
futures::stream::iter(all_logs)
})
}

/// Returns a stream that yields all transaction receipts that match the given filter.
fn transaction_receipts_stream(
&self,
params: TransactionReceiptsParams,
) -> impl Stream<Item = Vec<alloy_rpc_types_eth::TransactionReceipt>> {
self.eth_api.provider().canonical_state_stream().filter_map(move |canon_state| {
std::future::ready({
// Get the committed chain (new blocks)
let chain = canon_state.committed();

// Process all transactions across all blocks using a single iterator chain
let all_receipts: Vec<_> = chain
.blocks_and_receipts()
.flat_map(|(block, block_receipts)| {
let block_number = block.number();
let transactions: Vec<_> = block.body().transactions().iter().collect();

// Skip empty blocks
if transactions.is_empty() {
return Vec::new().into_iter();
}

// Verify transaction/receipt count match
if transactions.len() != block_receipts.len() {
error!(target = "rpc",
block_number = %block_number,
block_hash = %block.hash(),
tx_count = transactions.len(),
receipt_count = block_receipts.len(),
"Transaction and receipt count mismatch"
);
return Vec::new().into_iter();
}

// Calculate blob params
let blob_params = self
.eth_api
.provider()
.chain_spec()
.blob_params_at_timestamp(block.header().timestamp());

// Process all transactions in this block
let processed_receipts: Vec<_> = transactions
.iter()
.zip(block_receipts.iter())
.enumerate()
.filter_map(|(tx_index, (tx, receipt))| {
let tx_hash = tx.trie_hash();

// Apply transaction hash filter
let should_include = match &params.transaction_hashes {
Some(hashes) if !hashes.is_empty() => hashes.contains(&tx_hash),
_ => true,
};

if !should_include {
return None;
}

// Calculate gas used and next log index
let (gas_used_before, next_log_index) =
calculate_gas_used_and_next_log_index(
tx_index as u64,
block_receipts,
);

// Convert to RPC receipt
match self.build_rpc_receipt_with_tx_data(
block,
tx_index as u64,
tx,
receipt,
gas_used_before,
next_log_index,
blob_params,
) {
Ok(rpc_receipt) => Some(rpc_receipt),
Err(err) => {
error!(target = "rpc", %err, tx_hash = %tx_hash, "Failed to convert receipt to RPC format");
None
}
}
})
.collect();

processed_receipts.into_iter()
})
.collect();

if all_receipts.is_empty() {
None
} else {
Some(all_receipts)
}
})
})
}

/// Converts a receipt to RPC format with transaction data.
#[expect(clippy::too_many_arguments)]
#[inline]
fn build_rpc_receipt_with_tx_data(
&self,
block: &SealedBlock<N::Block>,
tx_index: u64,
tx: &<N as NodePrimitives>::SignedTx,
receipt: &<N as NodePrimitives>::Receipt,
gas_used_before: u64,
next_log_index: usize,
blob_params: Option<alloy_eips::eip7840::BlobParams>,
) -> Result<alloy_rpc_types_eth::TransactionReceipt, EthApiError> {
// Recover signer
let signer = tx.try_recover().map_err(|_| EthApiError::InvalidTransactionSignature)?;
let recovered_tx = tx.clone().with_signer(signer);

let meta = TransactionMeta {
tx_hash: tx.trie_hash(),
index: tx_index,
block_hash: block.hash(),
block_number: block.number(),
base_fee: block.header().base_fee_per_gas(),
excess_blob_gas: block.header().excess_blob_gas(),
timestamp: block.header().timestamp(),
};

let convert_input: ConvertReceiptInput<'_, N> = ConvertReceiptInput {
receipt: receipt.clone(),
tx: recovered_tx.as_recovered_ref(),
gas_used: receipt.cumulative_gas_used() - gas_used_before,
next_log_index,
meta,
};

let rpc_receipt = build_receipt(
convert_input,
blob_params,
|receipt, next_log_index, meta| {
alloy_consensus::ReceiptEnvelope::from(reth_ethereum_primitives::RpcReceipt {
tx_type: alloy_consensus::TxType::try_from(receipt.ty()).unwrap_or_else(|_| {
warn!(target: "rpc", tx_hash = %meta.tx_hash, tx_type = receipt.ty(), "Unknown tx type, fallback to Legacy");
alloy_consensus::TxType::Legacy
}),
success: receipt.status(),
cumulative_gas_used: receipt.cumulative_gas_used(),
logs: Log::collect_for_receipt(next_log_index, meta, receipt.logs().iter().cloned()),
})
},
);

Ok(rpc_receipt)
}
}