From dd5c9a87277f96027b808308eae2e84ce77906fa Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jos=C3=A9=20Vel=C3=A1zquez?= Date: Thu, 14 Aug 2025 14:12:49 -0600 Subject: [PATCH] fix(anvil): log's removed field always false during reorg - Add ReorgedBlockNotification system for proper reorg handling - Collect block data before unwinding storage during rollbacks - Update LogsSubscription to emit logs with removed=true for reorged blocks - Add comprehensive test using SimpleStorage contract Fixes #9933 --- crates/anvil/src/eth/api.rs | 8 +- crates/anvil/src/eth/backend/mem/mod.rs | 86 ++++++++++++++++++- crates/anvil/src/eth/backend/notifications.rs | 17 ++++ crates/anvil/src/filter.rs | 2 +- crates/anvil/src/pubsub.rs | 43 +++++++--- crates/anvil/src/server/handler.rs | 2 + crates/anvil/test-data/SimpleStorage.sol | 10 ++- crates/anvil/tests/it/anvil_api.rs | 86 +++++++++++++++++++ 8 files changed, 233 insertions(+), 21 deletions(-) diff --git a/crates/anvil/src/eth/api.rs b/crates/anvil/src/eth/api.rs index 4dee08b450475..de4201df6c269 100644 --- a/crates/anvil/src/eth/api.rs +++ b/crates/anvil/src/eth/api.rs @@ -9,7 +9,7 @@ use crate::{ self, db::SerializableState, mem::{MIN_CREATE_GAS, MIN_TRANSACTION_GAS}, - notifications::NewBlockNotifications, + notifications::{NewBlockNotifications, ReorgedBlockNotifications}, validate::TransactionValidator, }, error::{ @@ -3158,6 +3158,12 @@ impl EthApi { self.backend.new_block_notifications() } + /// Returns a new reorged block event stream that yields Notifications when blocks are removed + /// due to reorg + pub fn new_reorged_block_notifications(&self) -> ReorgedBlockNotifications { + self.backend.new_reorged_block_notifications() + } + /// Returns a new listeners for ready transactions pub fn new_ready_transactions(&self) -> Receiver { self.pool.add_ready_listener() diff --git a/crates/anvil/src/eth/backend/mem/mod.rs b/crates/anvil/src/eth/backend/mem/mod.rs index 9b6110e058ec2..225de377aa1cf 100644 --- a/crates/anvil/src/eth/backend/mem/mod.rs +++ b/crates/anvil/src/eth/backend/mem/mod.rs @@ -17,7 +17,10 @@ use crate::{ state::{storage_root, trie_accounts}, storage::MinedTransactionReceipt, }, - notifications::{NewBlockNotification, NewBlockNotifications}, + notifications::{ + NewBlockNotification, NewBlockNotifications, ReorgedBlockNotification, + ReorgedBlockNotifications, + }, time::{TimeManager, utc_from_secs}, validate::TransactionValidator, }, @@ -224,6 +227,8 @@ pub struct Backend { genesis: GenesisConfig, /// Listeners for new blocks that get notified when a new block was imported. new_block_listeners: Arc>>>, + /// Listeners for reorged blocks that get notified when blocks are removed due to reorg. + reorged_block_listeners: Arc>>>, /// Keeps track of active state snapshots at a specific block. active_state_snapshots: Arc>>, enable_steps_tracing: bool, @@ -362,6 +367,7 @@ impl Backend { time: TimeManager::new(start_timestamp), cheats: Default::default(), new_block_listeners: Default::default(), + reorged_block_listeners: Default::default(), fees, genesis, active_state_snapshots: Arc::new(Mutex::new(Default::default())), @@ -3219,6 +3225,14 @@ impl Backend { rx } + /// Returns a reorged block event stream + pub fn new_reorged_block_notifications(&self) -> ReorgedBlockNotifications { + let (tx, rx) = unbounded(); + self.reorged_block_listeners.lock().push(tx); + trace!(target: "backed", "added new reorged block listener"); + rx + } + /// Notifies all `new_block_listeners` about the new block fn notify_on_new_block(&self, header: Header, hash: B256) { // cleanup closed notification streams first, if the channel is closed we can remove the @@ -3232,6 +3246,33 @@ impl Backend { .retain(|tx| tx.unbounded_send(notification.clone()).is_ok()); } + /// Notifies all `reorged_block_listeners` about the reorged block with its data + fn notify_on_reorged_block_with_data( + &self, + header: Header, + hash: B256, + block: Block, + receipts: Vec, + ) { + // cleanup closed notification streams first, if the channel is closed we can remove the + // sender half for the set + self.reorged_block_listeners.lock().retain(|tx| !tx.is_closed()); + + let notification = + ReorgedBlockNotification { hash, header: Arc::new(header), block, receipts }; + + let _sent_count = self + .reorged_block_listeners + .lock() + .iter() + .filter(|tx| tx.unbounded_send(notification.clone()).is_ok()) + .count(); + + self.reorged_block_listeners + .lock() + .retain(|tx| tx.unbounded_send(notification.clone()).is_ok()); + } + /// Reorg the chain to a common height and execute blocks to build new chain. /// /// The state of the chain is rewound using `rewind` to the common block, including the db, @@ -3287,12 +3328,51 @@ impl Backend { } { - // Unwind the storage back to the common ancestor - self.blockchain + // Collect block data and receipts BEFORE unwinding + let mut blocks_with_receipts = Vec::new(); + let current_height = self.blockchain.storage.read().best_number; + + // Collect data for blocks that will be removed + for block_num in (common_block.header.number + 1)..=current_height { + if let Some(block_hash) = + self.blockchain.storage.read().hashes.get(&block_num).copied() + && let Some(block) = + self.blockchain.storage.read().blocks.get(&block_hash).cloned() + { + // Get receipts from transactions + let mut receipts = Vec::new(); + for tx_hash in block.transactions.iter().map(|tx| tx.hash()) { + if let Some(mined_tx) = + self.blockchain.storage.read().transactions.get(&tx_hash) + { + receipts.push(mined_tx.receipt.clone()); + } + } + if !receipts.is_empty() { + blocks_with_receipts.push((block, receipts)); + } + } + } + + // Unwind the storage back to the common ancestor and get the removed blocks + let removed_blocks = self + .blockchain .storage .write() .unwind_to(common_block.header.number, common_block.header.hash_slow()); + // Notify about each reorged block with its data + for (removed_block, (block_data, receipts)) in + removed_blocks.into_iter().zip(blocks_with_receipts.into_iter()) + { + self.notify_on_reorged_block_with_data( + removed_block.header.clone(), + removed_block.header.hash_slow(), + block_data, + receipts, + ); + } + // Set environment back to common block let mut env = self.env.write(); env.evm_env.block_env.number = U256::from(common_block.header.number); diff --git a/crates/anvil/src/eth/backend/notifications.rs b/crates/anvil/src/eth/backend/notifications.rs index 795de0cca9a55..acae2f47709e3 100644 --- a/crates/anvil/src/eth/backend/notifications.rs +++ b/crates/anvil/src/eth/backend/notifications.rs @@ -2,6 +2,7 @@ use alloy_consensus::Header; use alloy_primitives::B256; +use anvil_core::eth::{block::Block, transaction::TypedReceipt}; use futures::channel::mpsc::UnboundedReceiver; use std::sync::Arc; @@ -14,5 +15,21 @@ pub struct NewBlockNotification { pub header: Arc
, } +/// A notification that's emitted when blocks are removed due to reorg +#[derive(Clone, Debug)] +pub struct ReorgedBlockNotification { + /// Hash of the removed block + pub hash: B256, + /// block header + pub header: Arc
, + /// The removed block data + pub block: Block, + /// The receipts from the removed block + pub receipts: Vec, +} + /// Type alias for a receiver that receives [NewBlockNotification] pub type NewBlockNotifications = UnboundedReceiver; + +/// Type alias for a receiver that receives [ReorgedBlockNotification] +pub type ReorgedBlockNotifications = UnboundedReceiver; diff --git a/crates/anvil/src/filter.rs b/crates/anvil/src/filter.rs index 9d1e0958e99fe..3b9b0d6fc7a70 100644 --- a/crates/anvil/src/filter.rs +++ b/crates/anvil/src/filter.rs @@ -168,7 +168,7 @@ impl LogsFilter { let b = self.storage.block(block.hash); let receipts = self.storage.receipts(block.hash); if let (Some(receipts), Some(block)) = (receipts, b) { - logs.extend(filter_logs(block, receipts, &self.filter)) + logs.extend(filter_logs(block, receipts, &self.filter, false)) } } logs diff --git a/crates/anvil/src/pubsub.rs b/crates/anvil/src/pubsub.rs index b6d8e93a53de4..e84accd73621b 100644 --- a/crates/anvil/src/pubsub.rs +++ b/crates/anvil/src/pubsub.rs @@ -1,6 +1,9 @@ use crate::{ StorageInfo, - eth::{backend::notifications::NewBlockNotifications, error::to_rpc_result}, + eth::{ + backend::notifications::{NewBlockNotifications, ReorgedBlockNotifications}, + error::to_rpc_result, + }, }; use alloy_network::AnyRpcTransaction; use alloy_primitives::{B256, TxHash}; @@ -20,6 +23,7 @@ use tokio::sync::mpsc::UnboundedReceiver; #[derive(Debug)] pub struct LogsSubscription { pub blocks: NewBlockNotifications, + pub reorged_blocks: ReorgedBlockNotifications, pub storage: StorageInfo, pub filter: FilteredParams, pub queued: VecDeque, @@ -37,23 +41,31 @@ impl LogsSubscription { return Poll::Ready(Some(EthSubscriptionResponse::new(params))); } - if let Some(block) = ready!(self.blocks.poll_next_unpin(cx)) { + // Check for new blocks first + if let Poll::Ready(Some(block)) = self.blocks.poll_next_unpin(cx) { let b = self.storage.block(block.hash); let receipts = self.storage.receipts(block.hash); if let (Some(receipts), Some(block)) = (receipts, b) { - let logs = filter_logs(block, receipts, &self.filter); - if logs.is_empty() { - // this ensures we poll the receiver until it is pending, in which case the - // underlying `UnboundedReceiver` will register the new waker, see - // [`futures::channel::mpsc::UnboundedReceiver::poll_next()`] - continue; + let logs = filter_logs(block, receipts, &self.filter, false); + if !logs.is_empty() { + self.queued.extend(logs); } - self.queued.extend(logs) } - } else { - return Poll::Ready(None); + continue; } + // Check for reorged blocks + if let Poll::Ready(Some(reorged_block)) = self.reorged_blocks.poll_next_unpin(cx) { + // Use the block and receipts from the notification + let logs = + filter_logs(reorged_block.block, reorged_block.receipts, &self.filter, true); + if !logs.is_empty() { + self.queued.extend(logs); + } + continue; + } + + // If we reach here, both streams are pending if self.queued.is_empty() { return Poll::Pending; } @@ -147,7 +159,12 @@ impl Stream for EthSubscription { } /// Returns all the logs that match the given filter -pub fn filter_logs(block: Block, receipts: Vec, filter: &FilteredParams) -> Vec { +pub fn filter_logs( + block: Block, + receipts: Vec, + filter: &FilteredParams, + removed: bool, +) -> Vec { /// Determines whether to add this log fn add_log( block_hash: B256, @@ -182,7 +199,7 @@ pub fn filter_logs(block: Block, receipts: Vec, filter: &FilteredP transaction_hash: Some(transaction_hash), transaction_index: Some(receipt_index as u64), log_index: Some(log_index as u64), - removed: false, + removed, block_timestamp: Some(block.header.timestamp), }); } diff --git a/crates/anvil/src/server/handler.rs b/crates/anvil/src/server/handler.rs index e880e5e5628a9..108e34f85163a 100644 --- a/crates/anvil/src/server/handler.rs +++ b/crates/anvil/src/server/handler.rs @@ -76,9 +76,11 @@ impl PubSubEthRpcHandler { trace!(target: "rpc::ws", "received logs subscription {:?}", params); let blocks = self.api.new_block_notifications(); + let reorged_blocks = self.api.new_reorged_block_notifications(); let storage = self.api.storage_info(); EthSubscription::Logs(Box::new(LogsSubscription { blocks, + reorged_blocks, storage, filter: params, queued: Default::default(), diff --git a/crates/anvil/test-data/SimpleStorage.sol b/crates/anvil/test-data/SimpleStorage.sol index 85f92db687ce9..911837e62d274 100644 --- a/crates/anvil/test-data/SimpleStorage.sol +++ b/crates/anvil/test-data/SimpleStorage.sol @@ -1,8 +1,12 @@ pragma solidity >=0.4.24; contract SimpleStorage { - - event ValueChanged(address indexed author, address indexed oldAuthor, string oldValue, string newValue); + event ValueChanged( + address indexed author, + address indexed oldAuthor, + string oldValue, + string newValue + ); address public lastSender; string _value; @@ -13,7 +17,7 @@ contract SimpleStorage { _value = value; } - function getValue() view public returns (string memory) { + function getValue() public view returns (string memory) { return _value; } diff --git a/crates/anvil/tests/it/anvil_api.rs b/crates/anvil/tests/it/anvil_api.rs index 77089e4bdb080..5cabc5632caf3 100644 --- a/crates/anvil/tests/it/anvil_api.rs +++ b/crates/anvil/tests/it/anvil_api.rs @@ -32,6 +32,7 @@ use anvil_core::{ }, types::{ReorgOptions, TransactionData}, }; +use futures::StreamExt; use revm::primitives::hardfork::SpecId; use std::{ str::FromStr, @@ -1147,3 +1148,88 @@ async fn test_anvil_reset_fork_to_non_fork() { let new_block = provider.get_block(BlockId::latest()).await.unwrap().unwrap(); assert_eq!(new_block.header.number, 1); } + +#[tokio::test(flavor = "multi_thread")] +async fn test_reorg_logs_removed_field() { + use crate::utils::connect_pubsub; + use alloy_rpc_types::Filter; + + let (api, handle) = spawn(NodeConfig::test()).await; + let provider = handle.http_provider(); + let accounts = handle.dev_wallets().collect::>(); + + // Deploy SimpleStorage contract that emits events + let storage = abi::SimpleStorage::deploy(&provider, "initial value".to_string()).await.unwrap(); + + // Emit some events in different blocks by calling setValue + let _ = storage + .setValue("value1".to_string()) + .from(accounts[0].address()) + .send() + .await + .unwrap() + .get_receipt() + .await + .unwrap(); + api.mine_one().await; + + let _ = storage + .setValue("value2".to_string()) + .from(accounts[1].address()) + .send() + .await + .unwrap() + .get_receipt() + .await + .unwrap(); + api.mine_one().await; + + let _ = storage + .setValue("value3".to_string()) + .from(accounts[2].address()) + .send() + .await + .unwrap() + .get_receipt() + .await + .unwrap(); + api.mine_one().await; + + // Set up a log subscription for ValueChanged events using connect_pubsub + let ws_provider = connect_pubsub(&handle.ws_endpoint()).await; + let filter = Filter::new().address(*storage.address()); + let logs_sub = ws_provider.subscribe_logs(&filter).await.unwrap(); + let mut sub = logs_sub.into_stream(); + + // Perform a reorg to depth 2 (this should remove the last 2 blocks with events) + api.anvil_reorg(ReorgOptions { depth: 2, tx_block_pairs: vec![] }).await.unwrap(); + + // Wait for logs with removed: true + let mut removed_logs = vec![]; + let mut timeout_count = 0; + let max_timeout = 50; // Increased timeout + + while removed_logs.len() < 2 && timeout_count < max_timeout { + tokio::select! { + Some(log) = sub.next() => { + if log.removed { + removed_logs.push(log); + } + } + _ = tokio::time::sleep(std::time::Duration::from_millis(100)) => { + timeout_count += 1; + } + } + } + + // Assert that we received logs with removed: true for the reorged blocks + assert_eq!(removed_logs.len(), 2, "Should receive 2 logs with removed: true"); + + // Verify the logs are for the reorged ValueChanged events + for log in &removed_logs { + assert!(log.removed, "Log should have removed: true"); + assert_eq!(log.inner.address, *storage.address()); + // Verify it's an event log (has topics) + assert!(!log.topics().is_empty(), "Log should have topics"); + } +}