diff --git a/chain/ethereum/src/chain.rs b/chain/ethereum/src/chain.rs index 4624675efb8..9114e136950 100644 --- a/chain/ethereum/src/chain.rs +++ b/chain/ethereum/src/chain.rs @@ -355,6 +355,38 @@ impl std::fmt::Debug for Chain { } } +/// Walk back from a block pointer by following parent pointers. +/// This is the core logic used as a fallback when the cache doesn't have ancestor block. +/// +async fn walk_back_ancestor( + start_ptr: BlockPtr, + offset: BlockNumber, + root: Option, + mut parent_getter: F, +) -> Result, E> +where + F: FnMut(BlockPtr) -> Fut, + Fut: std::future::Future, E>>, +{ + let mut current_ptr = start_ptr; + + for _ in 0..offset { + match parent_getter(current_ptr.clone()).await? { + Some(parent) => { + if let Some(root_hash) = &root { + if parent.hash == *root_hash { + break; + } + } + current_ptr = parent; + } + None => return Ok(None), + } + } + + Ok(Some(current_ptr)) +} + impl Chain { /// Creates a new Ethereum [`Chain`]. pub fn new( @@ -1030,6 +1062,20 @@ impl TriggersAdapterTrait for TriggersAdapter { } } + // Find an ancestor block at the specified offset from the given block pointer. + // Primarily used for reorg detection to verify if the indexed position remains + // on the main chain. + // + // Parameters: + // - ptr: Starting block pointer from which to walk backwards (typically the chain head) + // - offset: Number of blocks to traverse backwards (0 returns ptr, 1 returns parent, etc.) + // - root: Optional block hash that serves as a boundary for traversal. This is ESSENTIAL + // for chains with skipped blocks (e.g., Filecoin EVM) where block numbers are not + // consecutive. When provided, traversal stops upon reaching the child of root, + // ensuring correct ancestor relationships even with gaps in block numbers. + // + // The function attempts to use the database cache first for performance, + // with RPC fallback implemented to handle cases where the cache is unavailable. async fn ancestor_block( &self, ptr: BlockPtr, @@ -1040,56 +1086,83 @@ impl TriggersAdapterTrait for TriggersAdapter { let cached = self .chain_store .cheap_clone() - .ancestor_block(ptr, offset, root) + .ancestor_block(ptr.clone(), offset, root.clone()) .await?; - let Some((json_value, block_ptr)) = cached else { - return Ok(None); - }; - - match json::from_value::(json_value.clone()) { - Ok(block) => Ok(Some(BlockFinality::NonFinal(EthereumBlockWithCalls { - ethereum_block: block, - calls: None, - }))), - Err(e) => { - warn!( + // First check if we have the ancestor in cache and can deserialize it + let block_ptr = match cached { + Some((json, ptr)) => { + // Try to deserialize the cached block + match json::from_value::(json.clone()) { + Ok(block) => { + // Successfully cached and deserialized + return Ok(Some(BlockFinality::NonFinal(EthereumBlockWithCalls { + ethereum_block: block, + calls: None, + }))); + } + Err(e) => { + // Cache hit but deserialization failed + warn!( + self.logger, + "Failed to deserialize cached ancestor block {} (offset {} from {}): {}. \ + This may indicate stale cache data from a previous version. \ + Falling back to Firehose/RPC.", + ptr.hash_hex(), + offset, + ptr_for_log.hash_hex(), + e + ); + ptr + } + } + } + None => { + // Cache miss - fall back to walking the chain via parent_ptr() calls. + // This provides resilience when the block cache is empty (e.g., after truncation). + debug!( self.logger, - "Failed to deserialize cached ancestor block {} (offset {} from {}): {}. \ - This may indicate stale cache data from a previous version. \ - Falling back to Firehose/RPC.", - block_ptr.hash_hex(), - offset, + "ancestor_block cache miss for {} at offset {}, walking back via parent_ptr", ptr_for_log.hash_hex(), - e + offset ); - match self.chain_client.as_ref() { - ChainClient::Firehose(endpoints) => { - let block = self - .fetch_block_with_firehose(endpoints, &block_ptr) - .await?; - let ethereum_block: EthereumBlockWithCalls = (&block).try_into()?; - Ok(Some(BlockFinality::NonFinal(ethereum_block))) - } - ChainClient::Rpc(adapters) => { - match self - .fetch_light_block_with_rpc(adapters, &block_ptr) - .await? - { - Some(light_block) => { - let ethereum_block = EthereumBlock { - block: light_block, - transaction_receipts: vec![], - }; - Ok(Some(BlockFinality::NonFinal(EthereumBlockWithCalls { - ethereum_block, - calls: None, - }))) - } - None => Ok(None), - } + match walk_back_ancestor( + ptr.clone(), + offset, + root.clone(), + |block_ptr| async move { self.parent_ptr(&block_ptr).await }, + ) + .await? + { + Some(ptr) => ptr, + None => return Ok(None), + } + } + }; + + // Fetch the actual block data for the identified block pointer. + // This path is taken for both cache misses and deserialization failures. + match self.chain_client.as_ref() { + ChainClient::Firehose(endpoints) => { + let block = self + .fetch_block_with_firehose(endpoints, &block_ptr) + .await?; + let ethereum_block: EthereumBlockWithCalls = (&block).try_into()?; + Ok(Some(BlockFinality::NonFinal(ethereum_block))) + } + ChainClient::Rpc(adapters) => { + match self + .fetch_full_block_with_rpc(adapters, &block_ptr) + .await? + { + Some(ethereum_block) => { + Ok(Some(BlockFinality::NonFinal(EthereumBlockWithCalls { + ethereum_block, + calls: None, + }))) } + None => Ok(None), } } } @@ -1185,6 +1258,29 @@ impl TriggersAdapter { Ok(blocks.into_iter().next()) } + + async fn fetch_full_block_with_rpc( + &self, + adapters: &EthereumNetworkAdapters, + block_ptr: &BlockPtr, + ) -> Result, Error> { + let adapter = adapters.cheapest_with(&self.capabilities).await?; + + let block = adapter + .block_by_hash(&self.logger, block_ptr.hash.as_b256()) + .await?; + + match block { + Some(block) => { + let ethereum_block = adapter + .load_full_block(&self.logger, block) + .await + .map_err(|e| anyhow!("Failed to load full block: {}", e))?; + Ok(Some(ethereum_block)) + } + None => Ok(None), + } + } } pub struct FirehoseMapper { @@ -1461,4 +1557,118 @@ mod tests { assert!(missing.contains(&2)); assert!(missing.contains(&3)); } + + #[tokio::test] + async fn test_walk_back_ancestor() { + use std::collections::HashMap; + + let block_100_hash = BlockHash("block100".as_bytes().to_vec().into_boxed_slice()); + let block_101_hash = BlockHash("block101".as_bytes().to_vec().into_boxed_slice()); + let block_102_hash = BlockHash("block102".as_bytes().to_vec().into_boxed_slice()); + let block_103_hash = BlockHash("block103".as_bytes().to_vec().into_boxed_slice()); + let block_104_hash = BlockHash("block104".as_bytes().to_vec().into_boxed_slice()); + let block_105_hash = BlockHash("block105".as_bytes().to_vec().into_boxed_slice()); + + let block_105 = BlockPtr::new(block_105_hash.clone(), 105); + let block_104 = BlockPtr::new(block_104_hash.clone(), 104); + let block_103 = BlockPtr::new(block_103_hash.clone(), 103); + let block_102 = BlockPtr::new(block_102_hash.clone(), 102); + let block_101 = BlockPtr::new(block_101_hash.clone(), 101); + let block_100 = BlockPtr::new(block_100_hash.clone(), 100); + + let mut parent_map = HashMap::new(); + parent_map.insert(block_105_hash.clone(), block_104.clone()); + parent_map.insert(block_104_hash.clone(), block_103.clone()); + parent_map.insert(block_103_hash.clone(), block_102.clone()); + parent_map.insert(block_102_hash.clone(), block_101.clone()); + parent_map.insert(block_101_hash.clone(), block_100.clone()); + + let result = super::walk_back_ancestor(block_105.clone(), 2, None, |block_ptr| { + let parent = parent_map.get(&block_ptr.hash).cloned(); + async move { Ok::<_, std::convert::Infallible>(parent) } + }) + .await + .unwrap(); + assert_eq!(result, Some(block_103.clone())); + + let result = super::walk_back_ancestor( + block_105.clone(), + 10, + Some(block_102_hash.clone()), + |block_ptr| { + let parent = parent_map.get(&block_ptr.hash).cloned(); + async move { Ok::<_, std::convert::Infallible>(parent) } + }, + ) + .await + .unwrap(); + assert_eq!( + result, + Some(block_103.clone()), + "Should stop at child of root" + ); + } + + #[tokio::test] + async fn test_walk_back_ancestor_skipped_blocks_with_root() { + use std::collections::HashMap; + + let block_100_hash = BlockHash("block100".as_bytes().to_vec().into_boxed_slice()); + let block_101_hash = BlockHash("block101".as_bytes().to_vec().into_boxed_slice()); + let block_102_hash = BlockHash("block102".as_bytes().to_vec().into_boxed_slice()); + let block_110_hash = BlockHash("block110".as_bytes().to_vec().into_boxed_slice()); + let block_111_hash = BlockHash("block111".as_bytes().to_vec().into_boxed_slice()); + let block_112_hash = BlockHash("block112".as_bytes().to_vec().into_boxed_slice()); + let block_120_hash = BlockHash("block120".as_bytes().to_vec().into_boxed_slice()); + + let block_120 = BlockPtr::new(block_120_hash.clone(), 120); + let block_112 = BlockPtr::new(block_112_hash.clone(), 112); + let block_111 = BlockPtr::new(block_111_hash.clone(), 111); + let block_110 = BlockPtr::new(block_110_hash.clone(), 110); + let block_102 = BlockPtr::new(block_102_hash.clone(), 102); + let block_101 = BlockPtr::new(block_101_hash.clone(), 101); + let block_100 = BlockPtr::new(block_100_hash.clone(), 100); + + let mut parent_map = HashMap::new(); + parent_map.insert(block_120_hash.clone(), block_112.clone()); + parent_map.insert(block_112_hash.clone(), block_111.clone()); + parent_map.insert(block_111_hash.clone(), block_110.clone()); + parent_map.insert(block_110_hash.clone(), block_102.clone()); + parent_map.insert(block_102_hash.clone(), block_101.clone()); + parent_map.insert(block_101_hash.clone(), block_100.clone()); + + let result = super::walk_back_ancestor( + block_120.clone(), + 10, + Some(block_110_hash.clone()), + |block_ptr| { + let parent = parent_map.get(&block_ptr.hash).cloned(); + async move { Ok::<_, std::convert::Infallible>(parent) } + }, + ) + .await + .unwrap(); + assert_eq!( + result, + Some(block_111.clone()), + "root=110: should stop at 111 (child of root)" + ); + + let result = super::walk_back_ancestor( + block_120.clone(), + 10, + Some(block_101_hash.clone()), + |block_ptr| { + let parent = parent_map.get(&block_ptr.hash).cloned(); + async move { Ok::<_, std::convert::Infallible>(parent) } + }, + ) + .await + .unwrap(); + assert_eq!( + result, + Some(block_102.clone()), + "root=101: should stop at 102 (child of root, across skip)" + ); + } }