296 changes: 253 additions & 43 deletions chain/ethereum/src/chain.rs
@@ -355,6 +355,38 @@ impl std::fmt::Debug for Chain {
    }
}

/// Walk back from a block pointer by following parent pointers.
/// This is the core logic used as a fallback when the cache doesn't have the
/// ancestor block.
async fn walk_back_ancestor<F, Fut, E>(
    start_ptr: BlockPtr,
    offset: BlockNumber,
    root: Option<BlockHash>,
    mut parent_getter: F,
) -> Result<Option<BlockPtr>, E>
where
    F: FnMut(BlockPtr) -> Fut,
    Fut: std::future::Future<Output = Result<Option<BlockPtr>, E>>,
{
    let mut current_ptr = start_ptr;

    // Take `offset` steps toward genesis, one parent pointer at a time.
    for _ in 0..offset {
        match parent_getter(current_ptr.clone()).await? {
            Some(parent) => {
                // If a root boundary is given, stop at the child of the root
                // rather than stepping onto (or past) the root itself.
                if let Some(root_hash) = &root {
                    if parent.hash == *root_hash {
                        break;
                    }
                }
                current_ptr = parent;
            }
            // Parent unknown: give up and let the caller try another source.
            None => return Ok(None),
        }
    }

    Ok(Some(current_ptr))
}

impl Chain {
    /// Creates a new Ethereum [`Chain`].
    pub fn new(
@@ -1030,6 +1062,20 @@ impl TriggersAdapterTrait<Chain> for TriggersAdapter {
        }
    }

    // Find an ancestor block at the specified offset from the given block pointer.
    // Primarily used for reorg detection, to verify that the indexed position is
    // still on the main chain.
    //
    // Parameters:
    // - ptr: Starting block pointer from which to walk backwards (typically the chain head)
    // - offset: Number of blocks to traverse backwards (0 returns ptr, 1 returns its parent, etc.)
    // - root: Optional block hash that serves as a boundary for the traversal. This is
    //   essential for chains with skipped blocks (e.g., Filecoin EVM) where block numbers
    //   are not consecutive. When provided, traversal stops upon reaching the child of
    //   root, ensuring correct ancestor relationships even with gaps in block numbers.
    //
    // The function consults the database cache first for performance and, on a cache
    // miss, walks back via parent pointers and refetches the block from Firehose or RPC.
    async fn ancestor_block(
        &self,
        ptr: BlockPtr,
@@ -1040,56 +1086,83 @@
        let cached = self
            .chain_store
            .cheap_clone()
            .ancestor_block(ptr.clone(), offset, root.clone())
            .await?;

        // First check if we have the ancestor in cache and can deserialize it.
        let block_ptr = match cached {
            Some((json, ptr)) => {
                // Try to deserialize the cached block.
                match json::from_value::<EthereumBlock>(json.clone()) {
                    Ok(block) => {
                        // Cache hit and deserialization succeeded.
                        return Ok(Some(BlockFinality::NonFinal(EthereumBlockWithCalls {
                            ethereum_block: block,
                            calls: None,
                        })));
                    }
                    Err(e) => {
                        // Cache hit, but deserialization failed.
                        warn!(
                            self.logger,
                            "Failed to deserialize cached ancestor block {} (offset {} from {}): {}. \
                             This may indicate stale cache data from a previous version. \
                             Falling back to Firehose/RPC.",
                            ptr.hash_hex(),
                            offset,
                            ptr_for_log.hash_hex(),
                            e
                        );
                        ptr
                    }
                }
            }
            None => {
                // Cache miss - fall back to walking the chain via parent_ptr() calls.
                // This provides resilience when the block cache is empty (e.g., after truncation).
                debug!(
                    self.logger,
                    "ancestor_block cache miss for {} at offset {}, walking back via parent_ptr",
                    ptr_for_log.hash_hex(),
                    offset
                );

                match walk_back_ancestor(
                    ptr.clone(),
                    offset,
                    root.clone(),
                    |block_ptr| async move { self.parent_ptr(&block_ptr).await },
                )
                .await?
                {
                    Some(ptr) => ptr,
                    None => return Ok(None),
                }
            }
        };

        // Fetch the actual block data for the identified block pointer.
        // This path is taken for both cache misses and deserialization failures.
        match self.chain_client.as_ref() {
            ChainClient::Firehose(endpoints) => {
                let block = self
                    .fetch_block_with_firehose(endpoints, &block_ptr)
                    .await?;
                let ethereum_block: EthereumBlockWithCalls = (&block).try_into()?;
                Ok(Some(BlockFinality::NonFinal(ethereum_block)))
            }
            ChainClient::Rpc(adapters) => {
                match self
                    .fetch_full_block_with_rpc(adapters, &block_ptr)
                    .await?
                {
                    Some(ethereum_block) => {
                        Ok(Some(BlockFinality::NonFinal(EthereumBlockWithCalls {
                            ethereum_block,
                            calls: None,
                        })))
                    }
                    None => Ok(None),
                }
            }
        }
@@ -1185,6 +1258,29 @@ impl TriggersAdapter {

        Ok(blocks.into_iter().next())
    }

    /// Fetch a block header by hash via RPC, then load the full block data for it.
    async fn fetch_full_block_with_rpc(
        &self,
        adapters: &EthereumNetworkAdapters,
        block_ptr: &BlockPtr,
    ) -> Result<Option<EthereumBlock>, Error> {
        let adapter = adapters.cheapest_with(&self.capabilities).await?;

        let block = adapter
            .block_by_hash(&self.logger, block_ptr.hash.as_b256())
            .await?;

        match block {
            Some(block) => {
                let ethereum_block = adapter
                    .load_full_block(&self.logger, block)
                    .await
                    .map_err(|e| anyhow!("Failed to load full block: {}", e))?;
                Ok(Some(ethereum_block))
            }
            None => Ok(None),
        }
    }
}
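
// Illustrative sketch, not code from this diff: the reorg check that
// `ancestor_block` is meant to support. Walk back from the chain head to the
// subgraph's indexed height and compare hashes; a mismatch means the indexed
// segment is no longer on the main chain. The function name and the
// `Block::ptr()` call are assumptions for illustration.
#[cfg(test)]
#[allow(dead_code)]
async fn is_still_on_main_chain(
    adapter: &TriggersAdapter,
    head: BlockPtr,
    indexed: BlockPtr,
) -> Result<bool, Error> {
    use graph::blockchain::Block as _;

    // Offset from the head down to the indexed height.
    let offset = head.number - indexed.number;
    Ok(match adapter.ancestor_block(head, offset, None).await? {
        // Same hash at the indexed height: still on the main chain.
        Some(block) => block.ptr().hash == indexed.hash,
        // Ancestor unknown even after the walk-back fallback: treat as a
        // potential reorg and let the caller decide how to proceed.
        None => false,
    })
}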

pub struct FirehoseMapper {
@@ -1461,4 +1557,118 @@ mod tests {
        assert!(missing.contains(&2));
        assert!(missing.contains(&3));
    }

    #[tokio::test]
    async fn test_walk_back_ancestor() {
        use std::collections::HashMap;

        let block_100_hash = BlockHash("block100".as_bytes().to_vec().into_boxed_slice());
        let block_101_hash = BlockHash("block101".as_bytes().to_vec().into_boxed_slice());
        let block_102_hash = BlockHash("block102".as_bytes().to_vec().into_boxed_slice());
        let block_103_hash = BlockHash("block103".as_bytes().to_vec().into_boxed_slice());
        let block_104_hash = BlockHash("block104".as_bytes().to_vec().into_boxed_slice());
        let block_105_hash = BlockHash("block105".as_bytes().to_vec().into_boxed_slice());

        let block_105 = BlockPtr::new(block_105_hash.clone(), 105);
        let block_104 = BlockPtr::new(block_104_hash.clone(), 104);
        let block_103 = BlockPtr::new(block_103_hash.clone(), 103);
        let block_102 = BlockPtr::new(block_102_hash.clone(), 102);
        let block_101 = BlockPtr::new(block_101_hash.clone(), 101);
        let block_100 = BlockPtr::new(block_100_hash.clone(), 100);

        // Parent links for a contiguous chain 100 <- 101 <- ... <- 105.
        let mut parent_map = HashMap::new();
        parent_map.insert(block_105_hash.clone(), block_104.clone());
        parent_map.insert(block_104_hash.clone(), block_103.clone());
        parent_map.insert(block_103_hash.clone(), block_102.clone());
        parent_map.insert(block_102_hash.clone(), block_101.clone());
        parent_map.insert(block_101_hash.clone(), block_100.clone());

        // Plain walk: two steps back from 105 lands on 103.
        let result = super::walk_back_ancestor(block_105.clone(), 2, None, |block_ptr| {
            let parent = parent_map.get(&block_ptr.hash).cloned();
            async move { Ok::<_, std::convert::Infallible>(parent) }
        })
        .await
        .unwrap();
        assert_eq!(result, Some(block_103.clone()));

        // With root = 102, a walk that would overshoot stops at 103, the
        // child of the root.
        let result = super::walk_back_ancestor(
            block_105.clone(),
            10,
            Some(block_102_hash.clone()),
            |block_ptr| {
                let parent = parent_map.get(&block_ptr.hash).cloned();
                async move { Ok::<_, std::convert::Infallible>(parent) }
            },
        )
        .await
        .unwrap();
        assert_eq!(
            result,
            Some(block_103.clone()),
            "Should stop at child of root"
        );
    }
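
    // A possible additional case (illustrative, not in the diff): offset 0
    // takes no steps, so the starting pointer comes back unchanged and the
    // parent getter's answer does not matter.
    #[tokio::test]
    async fn test_walk_back_ancestor_zero_offset() {
        let head_hash = BlockHash("head".as_bytes().to_vec().into_boxed_slice());
        let head = BlockPtr::new(head_hash, 42);

        let result = super::walk_back_ancestor(head.clone(), 0, None, |_| async move {
            Ok::<_, std::convert::Infallible>(None)
        })
        .await
        .unwrap();
        assert_eq!(result, Some(head), "offset 0 returns the start pointer");
    }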

    #[tokio::test]
    async fn test_walk_back_ancestor_skipped_blocks_with_root() {
        use std::collections::HashMap;

        let block_100_hash = BlockHash("block100".as_bytes().to_vec().into_boxed_slice());
        let block_101_hash = BlockHash("block101".as_bytes().to_vec().into_boxed_slice());
        let block_102_hash = BlockHash("block102".as_bytes().to_vec().into_boxed_slice());
        let block_110_hash = BlockHash("block110".as_bytes().to_vec().into_boxed_slice());
        let block_111_hash = BlockHash("block111".as_bytes().to_vec().into_boxed_slice());
        let block_112_hash = BlockHash("block112".as_bytes().to_vec().into_boxed_slice());
        let block_120_hash = BlockHash("block120".as_bytes().to_vec().into_boxed_slice());

        let block_120 = BlockPtr::new(block_120_hash.clone(), 120);
        let block_112 = BlockPtr::new(block_112_hash.clone(), 112);
        let block_111 = BlockPtr::new(block_111_hash.clone(), 111);
        let block_110 = BlockPtr::new(block_110_hash.clone(), 110);
        let block_102 = BlockPtr::new(block_102_hash.clone(), 102);
        let block_101 = BlockPtr::new(block_101_hash.clone(), 101);
        let block_100 = BlockPtr::new(block_100_hash.clone(), 100);

        // A chain with gaps in the block numbers, as on Filecoin EVM:
        // 100 <- 101 <- 102 <- 110 <- 111 <- 112 <- 120.
        let mut parent_map = HashMap::new();
        parent_map.insert(block_120_hash.clone(), block_112.clone());
        parent_map.insert(block_112_hash.clone(), block_111.clone());
        parent_map.insert(block_111_hash.clone(), block_110.clone());
        parent_map.insert(block_110_hash.clone(), block_102.clone());
        parent_map.insert(block_102_hash.clone(), block_101.clone());
        parent_map.insert(block_101_hash.clone(), block_100.clone());

        let result = super::walk_back_ancestor(
            block_120.clone(),
            10,
            Some(block_110_hash.clone()),
            |block_ptr| {
                let parent = parent_map.get(&block_ptr.hash).cloned();
                async move { Ok::<_, std::convert::Infallible>(parent) }
            },
        )
        .await
        .unwrap();
        assert_eq!(
            result,
            Some(block_111.clone()),
            "root=110: should stop at 111 (child of root)"
        );

        let result = super::walk_back_ancestor(
            block_120.clone(),
            10,
            Some(block_101_hash.clone()),
            |block_ptr| {
                let parent = parent_map.get(&block_ptr.hash).cloned();
                async move { Ok::<_, std::convert::Infallible>(parent) }
            },
        )
        .await
        .unwrap();
        assert_eq!(
            result,
            Some(block_102.clone()),
            "root=101: should stop at 102 (child of root, across skip)"
        );
    }
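
    // A possible additional case (illustrative, not in the diff): an unknown
    // parent makes the walk yield Ok(None), which `ancestor_block` passes on
    // to its caller as an unknown ancestor.
    #[tokio::test]
    async fn test_walk_back_ancestor_unknown_parent() {
        let head_hash = BlockHash("head".as_bytes().to_vec().into_boxed_slice());
        let head = BlockPtr::new(head_hash, 42);

        let result = super::walk_back_ancestor(head, 1, None, |_| async move {
            Ok::<_, std::convert::Infallible>(None)
        })
        .await
        .unwrap();
        assert_eq!(result, None, "unknown parent yields Ok(None)");
    }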
}