diff --git a/chain/ethereum/src/chain.rs b/chain/ethereum/src/chain.rs index 3f827220b93..4624675efb8 100644 --- a/chain/ethereum/src/chain.rs +++ b/chain/ethereum/src/chain.rs @@ -10,7 +10,7 @@ use graph::blockchain::{ use graph::components::network_provider::ChainName; use graph::components::store::{DeploymentCursorTracker, SourceableStore}; use graph::data::subgraph::UnifiedMappingApiVersion; -use graph::firehose::{FirehoseEndpoint, ForkStep}; +use graph::firehose::{FirehoseEndpoint, FirehoseEndpoints, ForkStep}; use graph::futures03::TryStreamExt; use graph::prelude::{ retry, BlockHash, ComponentLoggerConfig, ElasticComponentLoggerConfig, EthereumBlock, @@ -1036,20 +1036,63 @@ impl TriggersAdapterTrait for TriggersAdapter { offset: BlockNumber, root: Option, ) -> Result, Error> { - let block: Option = self + let ptr_for_log = ptr.clone(); + let cached = self .chain_store .cheap_clone() .ancestor_block(ptr, offset, root) - .await? - .map(|x| x.0) - .map(json::from_value) - .transpose()?; - Ok(block.map(|block| { - BlockFinality::NonFinal(EthereumBlockWithCalls { + .await?; + + let Some((json_value, block_ptr)) = cached else { + return Ok(None); + }; + + match json::from_value::(json_value.clone()) { + Ok(block) => Ok(Some(BlockFinality::NonFinal(EthereumBlockWithCalls { ethereum_block: block, calls: None, - }) - })) + }))), + Err(e) => { + warn!( + self.logger, + "Failed to deserialize cached ancestor block {} (offset {} from {}): {}. \ + This may indicate stale cache data from a previous version. \ + Falling back to Firehose/RPC.", + block_ptr.hash_hex(), + offset, + ptr_for_log.hash_hex(), + e + ); + + match self.chain_client.as_ref() { + ChainClient::Firehose(endpoints) => { + let block = self + .fetch_block_with_firehose(endpoints, &block_ptr) + .await?; + let ethereum_block: EthereumBlockWithCalls = (&block).try_into()?; + Ok(Some(BlockFinality::NonFinal(ethereum_block))) + } + ChainClient::Rpc(adapters) => { + match self + .fetch_light_block_with_rpc(adapters, &block_ptr) + .await? + { + Some(light_block) => { + let ethereum_block = EthereumBlock { + block: light_block, + transaction_receipts: vec![], + }; + Ok(Some(BlockFinality::NonFinal(EthereumBlockWithCalls { + ethereum_block, + calls: None, + }))) + } + None => Ok(None), + } + } + } + } + } } async fn parent_ptr(&self, block: &BlockPtr) -> Result, Error> { @@ -1060,60 +1103,90 @@ impl TriggersAdapterTrait for TriggersAdapter { let chain_store = self.chain_store.cheap_clone(); // First try to get the block from the store if let Ok(blocks) = chain_store.blocks(vec![block.hash.clone()]).await { - if let Some(block) = blocks.first() { - if let Ok(block) = json::from_value::(block.clone()) { - return Ok(block.parent_ptr()); + if let Some(cached_json) = blocks.first() { + match json::from_value::(cached_json.clone()) { + Ok(block) => { + return Ok(block.parent_ptr()); + } + Err(e) => { + warn!( + self.logger, + "Failed to deserialize cached block {}: {}. \ + This may indicate stale cache data from a previous version. \ + Falling back to Firehose.", + block.hash_hex(), + e + ); + } } } } // If not in store, fetch from Firehose - let endpoint = endpoints.endpoint().await?; - let logger = self.logger.clone(); - let retry_log_message = - format!("get_block_by_ptr for block {} with firehose", block); - let block = block.clone(); - - retry(retry_log_message, &logger) - .limit(ENV_VARS.request_retries) - .timeout_secs(ENV_VARS.json_rpc_timeout.as_secs()) - .run(move || { - let endpoint = endpoint.cheap_clone(); - let logger = logger.cheap_clone(); - let block = block.clone(); - async move { - endpoint - .get_block_by_ptr::(&block, &logger) - .await - .context(format!( - "Failed to fetch block by ptr {} from firehose", - block - )) - } - }) + self.fetch_block_with_firehose(endpoints, block) .await? .parent_ptr() } - ChainClient::Rpc(adapters) => { - let blocks = adapters - .cheapest_with(&self.capabilities) - .await? - .load_blocks( - self.logger.cheap_clone(), - self.chain_store.cheap_clone(), - HashSet::from_iter(Some(block.hash.as_b256())), - ) - .await?; - assert_eq!(blocks.len(), 1); - - blocks[0].parent_ptr() - } + ChainClient::Rpc(adapters) => self + .fetch_light_block_with_rpc(adapters, block) + .await? + .expect("block must exist for parent_ptr") + .parent_ptr(), }; Ok(block) } } +impl TriggersAdapter { + async fn fetch_block_with_firehose( + &self, + endpoints: &FirehoseEndpoints, + block_ptr: &BlockPtr, + ) -> Result { + let endpoint = endpoints.endpoint().await?; + let logger = self.logger.clone(); + let retry_log_message = format!("fetch_block_with_firehose {}", block_ptr); + let block_ptr = block_ptr.clone(); + + let block = retry(retry_log_message, &logger) + .limit(ENV_VARS.request_retries) + .timeout_secs(ENV_VARS.json_rpc_timeout.as_secs()) + .run(move || { + let endpoint = endpoint.cheap_clone(); + let logger = logger.cheap_clone(); + let block_ptr = block_ptr.clone(); + async move { + endpoint + .get_block_by_ptr::(&block_ptr, &logger) + .await + .context(format!("Failed to fetch block {} from firehose", block_ptr)) + } + }) + .await?; + + Ok(block) + } + + async fn fetch_light_block_with_rpc( + &self, + adapters: &EthereumNetworkAdapters, + block_ptr: &BlockPtr, + ) -> Result>, Error> { + let blocks = adapters + .cheapest_with(&self.capabilities) + .await? + .load_blocks( + self.logger.cheap_clone(), + self.chain_store.cheap_clone(), + HashSet::from_iter(Some(block_ptr.hash.as_b256())), + ) + .await?; + + Ok(blocks.into_iter().next()) + } +} + pub struct FirehoseMapper { adapter: Arc>, filter: Arc, diff --git a/chain/ethereum/src/codec.rs b/chain/ethereum/src/codec.rs index 2a402d4f5b2..f7d1af103bc 100644 --- a/chain/ethereum/src/codec.rs +++ b/chain/ethereum/src/codec.rs @@ -268,19 +268,21 @@ impl<'a> TryInto> for TransactionTraceAt<'a> { .trace .access_list .iter() - .map(|access_tuple| { - let address = Address::from_slice(&access_tuple.address); + .map(|access_tuple| -> Result<_, Error> { + let address = access_tuple + .address + .try_decode_proto("access tuple address")?; let storage_keys = access_tuple .storage_keys .iter() - .map(|key| B256::from_slice(key)) - .collect(); - AccessListItem { + .map(|key| key.try_decode_proto("storage key")) + .collect::, _>>()?; + Ok(AccessListItem { address, storage_keys, - } + }) }) - .collect::>() + .collect::, Error>>()? .into(); // Extract actual signature components from trace @@ -359,8 +361,8 @@ impl<'a> TryInto> for TransactionTraceAt<'a> { .trace .blob_hashes .iter() - .map(|hash| B256::from_slice(hash)) - .collect(); + .map(|hash| hash.try_decode_proto("blob hash")) + .collect::, _>>()?; let max_fee_per_blob_gas_u128 = self.trace.blob_gas_fee_cap.as_ref().map_or(0u128, |x| { @@ -401,10 +403,10 @@ impl<'a> TryInto> for TransactionTraceAt<'a> { .trace .set_code_authorizations .iter() - .map(|auth| { + .map(|auth| -> Result<_, Error> { let inner = alloy::eips::eip7702::Authorization { chain_id: U256::from_be_slice(&auth.chain_id), - address: Address::from_slice(&auth.address), + address: auth.address.try_decode_proto("authorization address")?, nonce: auth.nonce, }; @@ -412,11 +414,11 @@ impl<'a> TryInto> for TransactionTraceAt<'a> { let s = U256::from_be_slice(&auth.s); let y_parity = auth.v as u8; - alloy::eips::eip7702::SignedAuthorization::new_unchecked( + Ok(alloy::eips::eip7702::SignedAuthorization::new_unchecked( inner, y_parity, r, s, - ) + )) }) - .collect(); + .collect::, Error>>()?; let tx = TxEip7702 { // Firehose protobuf doesn't provide chain_id for transactions. diff --git a/chain/ethereum/src/ethereum_adapter.rs b/chain/ethereum/src/ethereum_adapter.rs index bd0febbf56c..6cc2ec674d8 100644 --- a/chain/ethereum/src/ethereum_adapter.rs +++ b/chain/ethereum/src/ethereum_adapter.rs @@ -1627,7 +1627,19 @@ impl EthereumAdapterTrait for EthereumAdapter { .map_err(|e| error!(&logger, "Error accessing block cache {}", e)) .unwrap_or_default() .into_iter() - .filter_map(|value| json::from_value(value).ok()) + .filter_map(|value| { + json::from_value(value.clone()) + .map_err(|e| { + warn!( + &logger, + "Failed to deserialize cached block: {}. \ + This may indicate stale cache data from a previous version. \ + Block will be re-fetched from RPC.", + e + ); + }) + .ok() + }) .map(|b| Arc::new(LightEthereumBlock::new(b))) .collect(); diff --git a/chain/ethereum/src/runtime/abi.rs b/chain/ethereum/src/runtime/abi.rs index 5641a501a6e..75717106ac6 100644 --- a/chain/ethereum/src/runtime/abi.rs +++ b/chain/ethereum/src/runtime/abi.rs @@ -6,6 +6,7 @@ use anyhow::anyhow; use async_trait::async_trait; use graph::abi; use graph::prelude::alloy; +use graph::prelude::alloy::consensus::TxReceipt; use graph::prelude::alloy::network::ReceiptResponse; use graph::prelude::alloy::rpc::types::{Log, TransactionReceipt}; use graph::prelude::alloy::serde::WithOtherFields; @@ -508,10 +509,7 @@ impl<'a> ToAscObj for EthereumTransactionData<'a> hash: asc_new(heap, &self.hash(), gas).await?, index: asc_new(heap, &BigInt::from(self.index()), gas).await?, from: asc_new(heap, &self.from(), gas).await?, - to: match self.to() { - Some(to) => asc_new(heap, &to, gas).await?, - None => AscPtr::null(), - }, + to: asc_new_or_null(heap, &self.to(), gas).await?, value: asc_new(heap, &BigInt::from_unsigned_u256(&self.value()), gas).await?, gas_limit: asc_new(heap, &BigInt::from(self.gas_limit()), gas).await?, gas_price: asc_new(heap, &BigInt::from(self.gas_price()), gas).await?, @@ -530,10 +528,7 @@ impl<'a> ToAscObj for EthereumTransactionData<'a> hash: asc_new(heap, &self.hash(), gas).await?, index: asc_new(heap, &BigInt::from(self.index()), gas).await?, from: asc_new(heap, &self.from(), gas).await?, - to: match self.to() { - Some(to) => asc_new(heap, &to, gas).await?, - None => AscPtr::null(), - }, + to: asc_new_or_null(heap, &self.to(), gas).await?, value: asc_new(heap, &BigInt::from_unsigned_u256(&self.value()), gas).await?, gas_limit: asc_new(heap, &BigInt::from(self.gas_limit()), gas).await?, gas_price: asc_new(heap, &BigInt::from(self.gas_price()), gas).await?, @@ -675,8 +670,8 @@ impl ToAscObj for Log { transaction_hash: asc_new_or_null(heap, &self.transaction_hash, gas).await?, transaction_index: asc_new_or_null_u64(heap, &self.transaction_index, gas).await?, log_index: asc_new_or_null_u64(heap, &self.log_index, gas).await?, - transaction_log_index: AscPtr::null(), // TODO(alloy): figure out how to get transaction log index - log_type: AscPtr::null(), // TODO(alloy): figure out how to get log type + transaction_log_index: AscPtr::null(), // Non-standard field, not available in alloy + log_type: AscPtr::null(), // Non-standard field, not available in alloy removed: asc_new( heap, &AscWrapped { @@ -703,16 +698,21 @@ impl ToAscObj .ok_or(HostExportError::Unknown(anyhow!( "Transaction index is missing" )))?; + let status = match self.inner.status_or_post_state().as_eip658() { + Some(success) => asc_new(heap, &BigInt::from(success as u64), gas).await?, + None => AscPtr::null(), // Pre-EIP-658 (pre-Byzantium) receipt + }; Ok(AscEthereumTransactionReceipt { transaction_hash: asc_new(heap, &self.transaction_hash, gas).await?, transaction_index: asc_new(heap, &BigInt::from(transaction_index), gas).await?, block_hash: asc_new_or_null(heap, &self.block_hash, gas).await?, block_number: asc_new_or_null_u64(heap, &self.block_number, gas).await?, - cumulative_gas_used: asc_new(heap, &BigInt::from(self.gas_used), gas).await?, + cumulative_gas_used: asc_new(heap, &BigInt::from(self.cumulative_gas_used()), gas) + .await?, gas_used: asc_new(heap, &BigInt::from(self.gas_used), gas).await?, contract_address: asc_new_or_null(heap, &self.contract_address, gas).await?, logs: asc_new(heap, &self.logs(), gas).await?, - status: asc_new(heap, &BigInt::from(self.status() as u64), gas).await?, + status, root: asc_new_or_null(heap, &self.state_root(), gas).await?, logs_bloom: asc_new(heap, self.inner.bloom().as_slice(), gas).await?, }) diff --git a/graph/src/data_source/common.rs b/graph/src/data_source/common.rs index 8090f9334b7..6149742c3ee 100644 --- a/graph/src/data_source/common.rs +++ b/graph/src/data_source/common.rs @@ -24,6 +24,80 @@ use slog::Logger; use std::collections::HashMap; use std::{str::FromStr, sync::Arc}; +/// Normalizes ABI JSON to handle compatibility issues between the legacy `ethabi`/`rust-web3` +/// parser and the stricter `alloy` parser. +/// +/// Some deployed subgraph ABIs contain non-standard constructs that `ethabi` accepted but +/// `alloy` rejects. This function patches these issues to maintain backward compatibility: +/// +/// 1. **`stateMutability: "undefined"`** - Some ABIs use "undefined" which is not a valid +/// Solidity state mutability. We replace it with "nonpayable". +/// +/// 2. **Duplicate constructors** - Some ABIs contain multiple constructor definitions. +/// We keep only the first one. +/// +/// 3. **Duplicate fallback functions** - Similar to constructors, some ABIs have multiple +/// fallback definitions. We keep only the first one. +/// +/// 4. **`indexed` field in non-event params** - The `indexed` field is only valid for event +/// parameters, but some ABIs include it on function inputs/outputs. We strip it from +/// non-event items. +/// +/// These issues were identified by validating ABIs across deployed subgraphs in production +/// before the migration to alloy. +fn normalize_abi_json(json_bytes: &[u8]) -> Result, anyhow::Error> { + let mut value: serde_json::Value = serde_json::from_slice(json_bytes)?; + + if let Some(array) = value.as_array_mut() { + let mut found_constructor = false; + let mut found_fallback = false; + let mut indices_to_remove = Vec::new(); + + for (index, item) in array.iter_mut().enumerate() { + if let Some(obj) = item.as_object_mut() { + if let Some(state_mutability) = obj.get_mut("stateMutability") { + if let Some(s) = state_mutability.as_str() { + if s == "undefined" { + *state_mutability = serde_json::Value::String("nonpayable".to_string()); + } + } + } + + let item_type = obj.get("type").and_then(|t| t.as_str()); + + match item_type { + Some("constructor") if found_constructor => indices_to_remove.push(index), + Some("constructor") => found_constructor = true, + Some("fallback") if found_fallback => indices_to_remove.push(index), + Some("fallback") => found_fallback = true, + _ => {} + } + + if item_type != Some("event") { + strip_indexed_from_params(obj.get_mut("inputs")); + strip_indexed_from_params(obj.get_mut("outputs")); + } + } + } + + for index in indices_to_remove.iter().rev() { + array.remove(*index); + } + } + + Ok(serde_json::to_vec(&value)?) +} + +fn strip_indexed_from_params(params: Option<&mut serde_json::Value>) { + if let Some(serde_json::Value::Array(arr)) = params { + for param in arr.iter_mut() { + if let Some(obj) = param.as_object_mut() { + obj.remove("indexed"); + } + } + } +} + #[derive(Clone, Debug, PartialEq)] pub struct MappingABI { pub name: String, @@ -364,11 +438,16 @@ impl UnresolvedMappingABI { self.name, self.file.link ) })?; - let contract = serde_json::from_slice(&*contract_bytes) + // Normalize the ABI to handle compatibility issues between ethabi and alloy parsers. + // See `normalize_abi_json` for details on the specific issues being addressed. + let normalized_bytes = normalize_abi_json(&contract_bytes) + .with_context(|| format!("failed to normalize ABI JSON for {}", self.name))?; + + let contract = serde_json::from_slice(&normalized_bytes) .with_context(|| format!("failed to load ABI {}", self.name))?; // Parse ABI JSON for on-demand struct field extraction - let abi_json = AbiJson::new(&contract_bytes) + let abi_json = AbiJson::new(&normalized_bytes) .with_context(|| format!("Failed to parse ABI JSON for {}", self.name))?; Ok(( @@ -2103,6 +2182,146 @@ mod tests { assert!(error_msg.contains("is not a struct")); } + #[test] + fn test_normalize_abi_json_with_undefined_state_mutability() { + let abi_with_undefined = r#"[ + { + "type": "function", + "name": "testFunction", + "inputs": [], + "outputs": [], + "stateMutability": "undefined" + }, + { + "type": "function", + "name": "normalFunction", + "inputs": [], + "outputs": [], + "stateMutability": "view" + } + ]"#; + + let normalized = normalize_abi_json(abi_with_undefined.as_bytes()).unwrap(); + let result: serde_json::Value = serde_json::from_slice(&normalized).unwrap(); + + if let Some(array) = result.as_array() { + assert_eq!(array[0]["stateMutability"], "nonpayable"); + assert_eq!(array[1]["stateMutability"], "view"); + } else { + panic!("Expected JSON array"); + } + + let json_abi: abi::JsonAbi = serde_json::from_slice(&normalized).unwrap(); + assert_eq!(json_abi.len(), 2); + } + + #[test] + fn test_normalize_abi_json_with_duplicate_constructors() { + let abi_with_duplicate_constructors = r#"[ + { + "type": "constructor", + "inputs": [{"name": "param1", "type": "address"}], + "stateMutability": "nonpayable" + }, + { + "type": "function", + "name": "someFunction", + "inputs": [], + "outputs": [], + "stateMutability": "view" + }, + { + "type": "constructor", + "inputs": [{"name": "param2", "type": "uint256"}], + "stateMutability": "nonpayable" + } + ]"#; + + let normalized = normalize_abi_json(abi_with_duplicate_constructors.as_bytes()).unwrap(); + let result: serde_json::Value = serde_json::from_slice(&normalized).unwrap(); + + if let Some(array) = result.as_array() { + assert_eq!(array.len(), 2); + assert_eq!(array[0]["type"], "constructor"); + assert_eq!(array[0]["inputs"][0]["name"], "param1"); + assert_eq!(array[1]["type"], "function"); + } else { + panic!("Expected JSON array"); + } + + let json_abi: abi::JsonAbi = serde_json::from_slice(&normalized).unwrap(); + assert_eq!(json_abi.len(), 2); + } + + #[test] + fn test_normalize_abi_json_with_duplicate_fallbacks() { + let abi_with_duplicate_fallbacks = r#"[ + { + "type": "fallback", + "stateMutability": "payable" + }, + { + "type": "function", + "name": "someFunction", + "inputs": [], + "outputs": [], + "stateMutability": "view" + }, + { + "type": "fallback", + "stateMutability": "nonpayable" + } + ]"#; + + let normalized = normalize_abi_json(abi_with_duplicate_fallbacks.as_bytes()).unwrap(); + let result: serde_json::Value = serde_json::from_slice(&normalized).unwrap(); + + if let Some(array) = result.as_array() { + assert_eq!(array.len(), 2); + assert_eq!(array[0]["type"], "fallback"); + assert_eq!(array[0]["stateMutability"], "payable"); + assert_eq!(array[1]["type"], "function"); + } else { + panic!("Expected JSON array"); + } + + let json_abi: abi::JsonAbi = serde_json::from_slice(&normalized).unwrap(); + assert_eq!(json_abi.len(), 2); + } + + #[test] + fn test_normalize_abi_json_strips_indexed_from_non_events() { + let abi_with_indexed_in_function = r#"[ + { + "type": "function", + "name": "testFunction", + "inputs": [{"name": "x", "type": "uint256", "indexed": true}], + "outputs": [{"name": "y", "type": "address", "indexed": false}], + "stateMutability": "view" + }, + { + "type": "event", + "name": "TestEvent", + "anonymous": false, + "inputs": [{"name": "from", "type": "address", "indexed": true}] + } + ]"#; + + let normalized = normalize_abi_json(abi_with_indexed_in_function.as_bytes()).unwrap(); + let result: serde_json::Value = serde_json::from_slice(&normalized).unwrap(); + + if let Some(array) = result.as_array() { + assert!(array[0]["inputs"][0].get("indexed").is_none()); + assert!(array[0]["outputs"][0].get("indexed").is_none()); + assert_eq!(array[1]["inputs"][0]["indexed"], true); + } else { + panic!("Expected JSON array"); + } + + let json_abi: abi::JsonAbi = serde_json::from_slice(&normalized).unwrap(); + assert_eq!(json_abi.len(), 2); + } + // Helper function to create consistent test ABI fn create_test_mapping_abi() -> AbiJson { const ABI_JSON: &str = r#"[ diff --git a/graph/src/ipfs/server_address.rs b/graph/src/ipfs/server_address.rs index c7c8bc109f6..556997406ef 100644 --- a/graph/src/ipfs/server_address.rs +++ b/graph/src/ipfs/server_address.rs @@ -119,7 +119,7 @@ mod tests { assert_eq!( err.to_string(), - "'https://' is not a valid IPFS server address: invalid format", + "'https://' is not a valid IPFS server address: empty string", ); } diff --git a/store/postgres/src/transaction_receipt.rs b/store/postgres/src/transaction_receipt.rs index 73b11c9c400..1177422f42b 100644 --- a/store/postgres/src/transaction_receipt.rs +++ b/store/postgres/src/transaction_receipt.rs @@ -45,18 +45,11 @@ impl TryFrom for LightTransactionReceipt { let block_number = block_number.map(u64::from_be_bytes); let gas_used = gas_used.map(u64::from_be_bytes).unwrap_or(0); - // Handle both old U64 format and new boolean format + // Status is non-zero for success, zero for failure. Works for any byte length. + // Defaults to true for pre-Byzantium receipts (no status field), consistent with alloy. let status = status - .map(|bytes| { - match bytes.len() { - 1 => bytes[0] != 0, // New format: single byte - 8 => { - u64::from_be_bytes(drain_vector::<8>(bytes.to_vec()).unwrap_or([0; 8])) != 0 - } // Old format: U64 - _ => false, // Fallback - } - }) - .unwrap_or(false); + .map(|bytes| bytes.iter().any(|&b| b != 0)) + .unwrap_or(true); Ok(LightTransactionReceipt { transaction_hash: transaction_hash.into(),