Skip to content

fix(anvil): log's removed field always false during reorg #11298

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion crates/anvil/src/eth/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use crate::{
self,
db::SerializableState,
mem::{MIN_CREATE_GAS, MIN_TRANSACTION_GAS},
notifications::NewBlockNotifications,
notifications::{NewBlockNotifications, ReorgedBlockNotifications},
validate::TransactionValidator,
},
error::{
Expand Down Expand Up @@ -3158,6 +3158,12 @@ impl EthApi {
self.backend.new_block_notifications()
}

/// Returns a new reorged block event stream that yields Notifications when blocks are removed
/// due to reorg
pub fn new_reorged_block_notifications(&self) -> ReorgedBlockNotifications {
self.backend.new_reorged_block_notifications()
}

/// Returns a new listeners for ready transactions
pub fn new_ready_transactions(&self) -> Receiver<TxHash> {
self.pool.add_ready_listener()
Expand Down
86 changes: 83 additions & 3 deletions crates/anvil/src/eth/backend/mem/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,10 @@ use crate::{
state::{storage_root, trie_accounts},
storage::MinedTransactionReceipt,
},
notifications::{NewBlockNotification, NewBlockNotifications},
notifications::{
NewBlockNotification, NewBlockNotifications, ReorgedBlockNotification,
ReorgedBlockNotifications,
},
time::{TimeManager, utc_from_secs},
validate::TransactionValidator,
},
Expand Down Expand Up @@ -224,6 +227,8 @@ pub struct Backend {
genesis: GenesisConfig,
/// Listeners for new blocks that get notified when a new block was imported.
new_block_listeners: Arc<Mutex<Vec<UnboundedSender<NewBlockNotification>>>>,
/// Listeners for reorged blocks that get notified when blocks are removed due to reorg.
reorged_block_listeners: Arc<Mutex<Vec<UnboundedSender<ReorgedBlockNotification>>>>,
/// Keeps track of active state snapshots at a specific block.
active_state_snapshots: Arc<Mutex<HashMap<U256, (u64, B256)>>>,
enable_steps_tracing: bool,
Expand Down Expand Up @@ -362,6 +367,7 @@ impl Backend {
time: TimeManager::new(start_timestamp),
cheats: Default::default(),
new_block_listeners: Default::default(),
reorged_block_listeners: Default::default(),
fees,
genesis,
active_state_snapshots: Arc::new(Mutex::new(Default::default())),
Expand Down Expand Up @@ -3219,6 +3225,14 @@ impl Backend {
rx
}

/// Returns a reorged block event stream
pub fn new_reorged_block_notifications(&self) -> ReorgedBlockNotifications {
let (tx, rx) = unbounded();
self.reorged_block_listeners.lock().push(tx);
trace!(target: "backed", "added new reorged block listener");
rx
}

/// Notifies all `new_block_listeners` about the new block
fn notify_on_new_block(&self, header: Header, hash: B256) {
// cleanup closed notification streams first, if the channel is closed we can remove the
Expand All @@ -3232,6 +3246,33 @@ impl Backend {
.retain(|tx| tx.unbounded_send(notification.clone()).is_ok());
}

/// Notifies all `reorged_block_listeners` about the reorged block with its data
fn notify_on_reorged_block_with_data(
&self,
header: Header,
hash: B256,
block: Block,
receipts: Vec<TypedReceipt>,
) {
// cleanup closed notification streams first, if the channel is closed we can remove the
// sender half for the set
self.reorged_block_listeners.lock().retain(|tx| !tx.is_closed());

let notification =
ReorgedBlockNotification { hash, header: Arc::new(header), block, receipts };

let _sent_count = self
.reorged_block_listeners
.lock()
.iter()
.filter(|tx| tx.unbounded_send(notification.clone()).is_ok())
.count();

self.reorged_block_listeners
.lock()
.retain(|tx| tx.unbounded_send(notification.clone()).is_ok());
}

/// Reorg the chain to a common height and execute blocks to build new chain.
///
/// The state of the chain is rewound using `rewind` to the common block, including the db,
Expand Down Expand Up @@ -3287,12 +3328,51 @@ impl Backend {
}

{
// Unwind the storage back to the common ancestor
self.blockchain
// Collect block data and receipts BEFORE unwinding
let mut blocks_with_receipts = Vec::new();
let current_height = self.blockchain.storage.read().best_number;

// Collect data for blocks that will be removed
for block_num in (common_block.header.number + 1)..=current_height {
if let Some(block_hash) =
self.blockchain.storage.read().hashes.get(&block_num).copied()
&& let Some(block) =
self.blockchain.storage.read().blocks.get(&block_hash).cloned()
{
// Get receipts from transactions
let mut receipts = Vec::new();
for tx_hash in block.transactions.iter().map(|tx| tx.hash()) {
if let Some(mined_tx) =
self.blockchain.storage.read().transactions.get(&tx_hash)
{
receipts.push(mined_tx.receipt.clone());
}
}
if !receipts.is_empty() {
blocks_with_receipts.push((block, receipts));
}
}
}

// Unwind the storage back to the common ancestor and get the removed blocks
let removed_blocks = self
.blockchain
.storage
.write()
.unwind_to(common_block.header.number, common_block.header.hash_slow());

// Notify about each reorged block with its data
for (removed_block, (block_data, receipts)) in
removed_blocks.into_iter().zip(blocks_with_receipts.into_iter())
{
self.notify_on_reorged_block_with_data(
removed_block.header.clone(),
removed_block.header.hash_slow(),
block_data,
receipts,
);
}

// Set environment back to common block
let mut env = self.env.write();
env.evm_env.block_env.number = U256::from(common_block.header.number);
Expand Down
17 changes: 17 additions & 0 deletions crates/anvil/src/eth/backend/notifications.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

use alloy_consensus::Header;
use alloy_primitives::B256;
use anvil_core::eth::{block::Block, transaction::TypedReceipt};
use futures::channel::mpsc::UnboundedReceiver;
use std::sync::Arc;

Expand All @@ -14,5 +15,21 @@ pub struct NewBlockNotification {
pub header: Arc<Header>,
}

/// A notification that's emitted when blocks are removed due to reorg
#[derive(Clone, Debug)]
pub struct ReorgedBlockNotification {
/// Hash of the removed block
pub hash: B256,
/// block header
pub header: Arc<Header>,
/// The removed block data
pub block: Block,
/// The receipts from the removed block
pub receipts: Vec<TypedReceipt>,
}

/// Type alias for a receiver that receives [NewBlockNotification]
pub type NewBlockNotifications = UnboundedReceiver<NewBlockNotification>;

/// Type alias for a receiver that receives [ReorgedBlockNotification]
pub type ReorgedBlockNotifications = UnboundedReceiver<ReorgedBlockNotification>;
2 changes: 1 addition & 1 deletion crates/anvil/src/filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ impl LogsFilter {
let b = self.storage.block(block.hash);
let receipts = self.storage.receipts(block.hash);
if let (Some(receipts), Some(block)) = (receipts, b) {
logs.extend(filter_logs(block, receipts, &self.filter))
logs.extend(filter_logs(block, receipts, &self.filter, false))
}
}
logs
Expand Down
43 changes: 30 additions & 13 deletions crates/anvil/src/pubsub.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
use crate::{
StorageInfo,
eth::{backend::notifications::NewBlockNotifications, error::to_rpc_result},
eth::{
backend::notifications::{NewBlockNotifications, ReorgedBlockNotifications},
error::to_rpc_result,
},
};
use alloy_network::AnyRpcTransaction;
use alloy_primitives::{B256, TxHash};
Expand All @@ -20,6 +23,7 @@ use tokio::sync::mpsc::UnboundedReceiver;
#[derive(Debug)]
pub struct LogsSubscription {
pub blocks: NewBlockNotifications,
pub reorged_blocks: ReorgedBlockNotifications,
pub storage: StorageInfo,
pub filter: FilteredParams,
pub queued: VecDeque<Log>,
Expand All @@ -37,23 +41,31 @@ impl LogsSubscription {
return Poll::Ready(Some(EthSubscriptionResponse::new(params)));
}

if let Some(block) = ready!(self.blocks.poll_next_unpin(cx)) {
// Check for new blocks first
if let Poll::Ready(Some(block)) = self.blocks.poll_next_unpin(cx) {
let b = self.storage.block(block.hash);
let receipts = self.storage.receipts(block.hash);
if let (Some(receipts), Some(block)) = (receipts, b) {
let logs = filter_logs(block, receipts, &self.filter);
if logs.is_empty() {
// this ensures we poll the receiver until it is pending, in which case the
// underlying `UnboundedReceiver` will register the new waker, see
// [`futures::channel::mpsc::UnboundedReceiver::poll_next()`]
continue;
let logs = filter_logs(block, receipts, &self.filter, false);
if !logs.is_empty() {
self.queued.extend(logs);
}
self.queued.extend(logs)
}
} else {
return Poll::Ready(None);
continue;
}

// Check for reorged blocks
if let Poll::Ready(Some(reorged_block)) = self.reorged_blocks.poll_next_unpin(cx) {
// Use the block and receipts from the notification
let logs =
filter_logs(reorged_block.block, reorged_block.receipts, &self.filter, true);
if !logs.is_empty() {
self.queued.extend(logs);
}
continue;
}

// If we reach here, both streams are pending
if self.queued.is_empty() {
return Poll::Pending;
}
Expand Down Expand Up @@ -147,7 +159,12 @@ impl Stream for EthSubscription {
}

/// Returns all the logs that match the given filter
pub fn filter_logs(block: Block, receipts: Vec<TypedReceipt>, filter: &FilteredParams) -> Vec<Log> {
pub fn filter_logs(
block: Block,
receipts: Vec<TypedReceipt>,
filter: &FilteredParams,
removed: bool,
) -> Vec<Log> {
/// Determines whether to add this log
fn add_log(
block_hash: B256,
Expand Down Expand Up @@ -182,7 +199,7 @@ pub fn filter_logs(block: Block, receipts: Vec<TypedReceipt>, filter: &FilteredP
transaction_hash: Some(transaction_hash),
transaction_index: Some(receipt_index as u64),
log_index: Some(log_index as u64),
removed: false,
removed,
block_timestamp: Some(block.header.timestamp),
});
}
Expand Down
2 changes: 2 additions & 0 deletions crates/anvil/src/server/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,9 +76,11 @@ impl PubSubEthRpcHandler {

trace!(target: "rpc::ws", "received logs subscription {:?}", params);
let blocks = self.api.new_block_notifications();
let reorged_blocks = self.api.new_reorged_block_notifications();
let storage = self.api.storage_info();
EthSubscription::Logs(Box::new(LogsSubscription {
blocks,
reorged_blocks,
storage,
filter: params,
queued: Default::default(),
Expand Down
10 changes: 7 additions & 3 deletions crates/anvil/test-data/SimpleStorage.sol
Original file line number Diff line number Diff line change
@@ -1,8 +1,12 @@
pragma solidity >=0.4.24;

contract SimpleStorage {

event ValueChanged(address indexed author, address indexed oldAuthor, string oldValue, string newValue);
event ValueChanged(
address indexed author,
address indexed oldAuthor,
string oldValue,
string newValue
);

address public lastSender;
string _value;
Expand All @@ -13,7 +17,7 @@ contract SimpleStorage {
_value = value;
}

function getValue() view public returns (string memory) {
function getValue() public view returns (string memory) {
return _value;
}

Expand Down
Loading
Loading