Skip to content
173 changes: 123 additions & 50 deletions chain/ethereum/src/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use graph::blockchain::{
use graph::components::network_provider::ChainName;
use graph::components::store::{DeploymentCursorTracker, SourceableStore};
use graph::data::subgraph::UnifiedMappingApiVersion;
use graph::firehose::{FirehoseEndpoint, ForkStep};
use graph::firehose::{FirehoseEndpoint, FirehoseEndpoints, ForkStep};
use graph::futures03::TryStreamExt;
use graph::prelude::{
retry, BlockHash, ComponentLoggerConfig, ElasticComponentLoggerConfig, EthereumBlock,
Expand Down Expand Up @@ -1036,20 +1036,63 @@ impl TriggersAdapterTrait<Chain> for TriggersAdapter {
offset: BlockNumber,
root: Option<BlockHash>,
) -> Result<Option<BlockFinality>, Error> {
let block: Option<EthereumBlock> = self
let ptr_for_log = ptr.clone();
let cached = self
.chain_store
.cheap_clone()
.ancestor_block(ptr, offset, root)
.await?
.map(|x| x.0)
.map(json::from_value)
.transpose()?;
Ok(block.map(|block| {
BlockFinality::NonFinal(EthereumBlockWithCalls {
.await?;

let Some((json_value, block_ptr)) = cached else {
return Ok(None);
};

match json::from_value::<EthereumBlock>(json_value.clone()) {
Ok(block) => Ok(Some(BlockFinality::NonFinal(EthereumBlockWithCalls {
ethereum_block: block,
calls: None,
})
}))
}))),
Err(e) => {
warn!(
self.logger,
"Failed to deserialize cached ancestor block {} (offset {} from {}): {}. \
This may indicate stale cache data from a previous version. \
Falling back to Firehose/RPC.",
block_ptr.hash_hex(),
offset,
ptr_for_log.hash_hex(),
e
);

match self.chain_client.as_ref() {
ChainClient::Firehose(endpoints) => {
let block = self
.fetch_block_with_firehose(endpoints, &block_ptr)
.await?;
let ethereum_block: EthereumBlockWithCalls = (&block).try_into()?;
Ok(Some(BlockFinality::NonFinal(ethereum_block)))
}
ChainClient::Rpc(adapters) => {
match self
.fetch_light_block_with_rpc(adapters, &block_ptr)
.await?
{
Some(light_block) => {
let ethereum_block = EthereumBlock {
block: light_block,
transaction_receipts: vec![],
};
Ok(Some(BlockFinality::NonFinal(EthereumBlockWithCalls {
ethereum_block,
calls: None,
})))
}
None => Ok(None),
}
}
}
}
}
}

async fn parent_ptr(&self, block: &BlockPtr) -> Result<Option<BlockPtr>, Error> {
Expand All @@ -1060,60 +1103,90 @@ impl TriggersAdapterTrait<Chain> for TriggersAdapter {
let chain_store = self.chain_store.cheap_clone();
// First try to get the block from the store
if let Ok(blocks) = chain_store.blocks(vec![block.hash.clone()]).await {
if let Some(block) = blocks.first() {
if let Ok(block) = json::from_value::<LightEthereumBlock>(block.clone()) {
return Ok(block.parent_ptr());
if let Some(cached_json) = blocks.first() {
match json::from_value::<LightEthereumBlock>(cached_json.clone()) {
Ok(block) => {
return Ok(block.parent_ptr());
}
Err(e) => {
warn!(
self.logger,
"Failed to deserialize cached block {}: {}. \
This may indicate stale cache data from a previous version. \
Falling back to Firehose.",
block.hash_hex(),
e
);
}
}
}
}

// If not in store, fetch from Firehose
let endpoint = endpoints.endpoint().await?;
let logger = self.logger.clone();
let retry_log_message =
format!("get_block_by_ptr for block {} with firehose", block);
let block = block.clone();

retry(retry_log_message, &logger)
.limit(ENV_VARS.request_retries)
.timeout_secs(ENV_VARS.json_rpc_timeout.as_secs())
.run(move || {
let endpoint = endpoint.cheap_clone();
let logger = logger.cheap_clone();
let block = block.clone();
async move {
endpoint
.get_block_by_ptr::<codec::Block>(&block, &logger)
.await
.context(format!(
"Failed to fetch block by ptr {} from firehose",
block
))
}
})
self.fetch_block_with_firehose(endpoints, block)
.await?
.parent_ptr()
}
ChainClient::Rpc(adapters) => {
let blocks = adapters
.cheapest_with(&self.capabilities)
.await?
.load_blocks(
self.logger.cheap_clone(),
self.chain_store.cheap_clone(),
HashSet::from_iter(Some(block.hash.as_b256())),
)
.await?;
assert_eq!(blocks.len(), 1);

blocks[0].parent_ptr()
}
ChainClient::Rpc(adapters) => self
.fetch_light_block_with_rpc(adapters, block)
.await?
.expect("block must exist for parent_ptr")
.parent_ptr(),
};

Ok(block)
}
}

impl TriggersAdapter {
async fn fetch_block_with_firehose(
&self,
endpoints: &FirehoseEndpoints,
block_ptr: &BlockPtr,
) -> Result<codec::Block, Error> {
let endpoint = endpoints.endpoint().await?;
let logger = self.logger.clone();
let retry_log_message = format!("fetch_block_with_firehose {}", block_ptr);
let block_ptr = block_ptr.clone();

let block = retry(retry_log_message, &logger)
.limit(ENV_VARS.request_retries)
.timeout_secs(ENV_VARS.json_rpc_timeout.as_secs())
.run(move || {
let endpoint = endpoint.cheap_clone();
let logger = logger.cheap_clone();
let block_ptr = block_ptr.clone();
async move {
endpoint
.get_block_by_ptr::<codec::Block>(&block_ptr, &logger)
.await
.context(format!("Failed to fetch block {} from firehose", block_ptr))
}
})
.await?;

Ok(block)
}

async fn fetch_light_block_with_rpc(
&self,
adapters: &EthereumNetworkAdapters,
block_ptr: &BlockPtr,
) -> Result<Option<Arc<LightEthereumBlock>>, Error> {
let blocks = adapters
.cheapest_with(&self.capabilities)
.await?
.load_blocks(
self.logger.cheap_clone(),
self.chain_store.cheap_clone(),
HashSet::from_iter(Some(block_ptr.hash.as_b256())),
)
.await?;

Ok(blocks.into_iter().next())
}
}

pub struct FirehoseMapper {
adapter: Arc<dyn TriggersAdapterTrait<Chain>>,
filter: Arc<TriggerFilter>,
Expand Down
30 changes: 16 additions & 14 deletions chain/ethereum/src/codec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -268,19 +268,21 @@ impl<'a> TryInto<Transaction<AnyTxEnvelope>> for TransactionTraceAt<'a> {
.trace
.access_list
.iter()
.map(|access_tuple| {
let address = Address::from_slice(&access_tuple.address);
.map(|access_tuple| -> Result<_, Error> {
let address = access_tuple
.address
.try_decode_proto("access tuple address")?;
let storage_keys = access_tuple
.storage_keys
.iter()
.map(|key| B256::from_slice(key))
.collect();
AccessListItem {
.map(|key| key.try_decode_proto("storage key"))
.collect::<Result<Vec<_>, _>>()?;
Ok(AccessListItem {
address,
storage_keys,
}
})
})
.collect::<Vec<_>>()
.collect::<Result<Vec<_>, Error>>()?
.into();

// Extract actual signature components from trace
Expand Down Expand Up @@ -359,8 +361,8 @@ impl<'a> TryInto<Transaction<AnyTxEnvelope>> for TransactionTraceAt<'a> {
.trace
.blob_hashes
.iter()
.map(|hash| B256::from_slice(hash))
.collect();
.map(|hash| hash.try_decode_proto("blob hash"))
.collect::<Result<Vec<_>, _>>()?;

let max_fee_per_blob_gas_u128 =
self.trace.blob_gas_fee_cap.as_ref().map_or(0u128, |x| {
Expand Down Expand Up @@ -401,22 +403,22 @@ impl<'a> TryInto<Transaction<AnyTxEnvelope>> for TransactionTraceAt<'a> {
.trace
.set_code_authorizations
.iter()
.map(|auth| {
.map(|auth| -> Result<_, Error> {
let inner = alloy::eips::eip7702::Authorization {
chain_id: U256::from_be_slice(&auth.chain_id),
address: Address::from_slice(&auth.address),
address: auth.address.try_decode_proto("authorization address")?,
nonce: auth.nonce,
};

let r = U256::from_be_slice(&auth.r);
let s = U256::from_be_slice(&auth.s);
let y_parity = auth.v as u8;

alloy::eips::eip7702::SignedAuthorization::new_unchecked(
Ok(alloy::eips::eip7702::SignedAuthorization::new_unchecked(
inner, y_parity, r, s,
)
))
})
.collect();
.collect::<Result<Vec<_>, Error>>()?;

let tx = TxEip7702 {
// Firehose protobuf doesn't provide chain_id for transactions.
Expand Down
14 changes: 13 additions & 1 deletion chain/ethereum/src/ethereum_adapter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1627,7 +1627,19 @@ impl EthereumAdapterTrait for EthereumAdapter {
.map_err(|e| error!(&logger, "Error accessing block cache {}", e))
.unwrap_or_default()
.into_iter()
.filter_map(|value| json::from_value(value).ok())
.filter_map(|value| {
json::from_value(value.clone())
.map_err(|e| {
warn!(
&logger,
"Failed to deserialize cached block: {}. \
This may indicate stale cache data from a previous version. \
Block will be re-fetched from RPC.",
e
);
})
.ok()
})
.map(|b| Arc::new(LightEthereumBlock::new(b)))
.collect();

Expand Down
24 changes: 12 additions & 12 deletions chain/ethereum/src/runtime/abi.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use anyhow::anyhow;
use async_trait::async_trait;
use graph::abi;
use graph::prelude::alloy;
use graph::prelude::alloy::consensus::TxReceipt;
use graph::prelude::alloy::network::ReceiptResponse;
use graph::prelude::alloy::rpc::types::{Log, TransactionReceipt};
use graph::prelude::alloy::serde::WithOtherFields;
Expand Down Expand Up @@ -508,10 +509,7 @@ impl<'a> ToAscObj<AscEthereumTransaction_0_0_1> for EthereumTransactionData<'a>
hash: asc_new(heap, &self.hash(), gas).await?,
index: asc_new(heap, &BigInt::from(self.index()), gas).await?,
from: asc_new(heap, &self.from(), gas).await?,
to: match self.to() {
Some(to) => asc_new(heap, &to, gas).await?,
None => AscPtr::null(),
},
to: asc_new_or_null(heap, &self.to(), gas).await?,
value: asc_new(heap, &BigInt::from_unsigned_u256(&self.value()), gas).await?,
gas_limit: asc_new(heap, &BigInt::from(self.gas_limit()), gas).await?,
gas_price: asc_new(heap, &BigInt::from(self.gas_price()), gas).await?,
Expand All @@ -530,10 +528,7 @@ impl<'a> ToAscObj<AscEthereumTransaction_0_0_2> for EthereumTransactionData<'a>
hash: asc_new(heap, &self.hash(), gas).await?,
index: asc_new(heap, &BigInt::from(self.index()), gas).await?,
from: asc_new(heap, &self.from(), gas).await?,
to: match self.to() {
Some(to) => asc_new(heap, &to, gas).await?,
None => AscPtr::null(),
},
to: asc_new_or_null(heap, &self.to(), gas).await?,
value: asc_new(heap, &BigInt::from_unsigned_u256(&self.value()), gas).await?,
gas_limit: asc_new(heap, &BigInt::from(self.gas_limit()), gas).await?,
gas_price: asc_new(heap, &BigInt::from(self.gas_price()), gas).await?,
Expand Down Expand Up @@ -675,8 +670,8 @@ impl ToAscObj<AscEthereumLog> for Log {
transaction_hash: asc_new_or_null(heap, &self.transaction_hash, gas).await?,
transaction_index: asc_new_or_null_u64(heap, &self.transaction_index, gas).await?,
log_index: asc_new_or_null_u64(heap, &self.log_index, gas).await?,
transaction_log_index: AscPtr::null(), // TODO(alloy): figure out how to get transaction log index
log_type: AscPtr::null(), // TODO(alloy): figure out how to get log type
transaction_log_index: AscPtr::null(), // Non-standard field, not available in alloy
log_type: AscPtr::null(), // Non-standard field, not available in alloy
removed: asc_new(
heap,
&AscWrapped {
Expand All @@ -703,16 +698,21 @@ impl ToAscObj<AscEthereumTransactionReceipt>
.ok_or(HostExportError::Unknown(anyhow!(
"Transaction index is missing"
)))?;
let status = match self.inner.status_or_post_state().as_eip658() {
Some(success) => asc_new(heap, &BigInt::from(success as u64), gas).await?,
None => AscPtr::null(), // Pre-EIP-658 (pre-Byzantium) receipt
};
Ok(AscEthereumTransactionReceipt {
transaction_hash: asc_new(heap, &self.transaction_hash, gas).await?,
transaction_index: asc_new(heap, &BigInt::from(transaction_index), gas).await?,
block_hash: asc_new_or_null(heap, &self.block_hash, gas).await?,
block_number: asc_new_or_null_u64(heap, &self.block_number, gas).await?,
cumulative_gas_used: asc_new(heap, &BigInt::from(self.gas_used), gas).await?,
cumulative_gas_used: asc_new(heap, &BigInt::from(self.cumulative_gas_used()), gas)
.await?,
gas_used: asc_new(heap, &BigInt::from(self.gas_used), gas).await?,
contract_address: asc_new_or_null(heap, &self.contract_address, gas).await?,
logs: asc_new(heap, &self.logs(), gas).await?,
status: asc_new(heap, &BigInt::from(self.status() as u64), gas).await?,
status,
root: asc_new_or_null(heap, &self.state_root(), gas).await?,
logs_bloom: asc_new(heap, self.inner.bloom().as_slice(), gas).await?,
})
Expand Down
Loading
Loading