Skip to content

Commit cf5a566

Browse files
committed
add Firehose/RPC fallback for stale block cache in ancestor_block
1 parent 0555087 commit cf5a566

File tree

3 files changed

+124
-74
lines changed

3 files changed

+124
-74
lines changed

chain/ethereum/src/chain.rs

Lines changed: 107 additions & 59 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ use graph::blockchain::{
1010
use graph::components::network_provider::ChainName;
1111
use graph::components::store::{DeploymentCursorTracker, SourceableStore};
1212
use graph::data::subgraph::UnifiedMappingApiVersion;
13-
use graph::firehose::{FirehoseEndpoint, ForkStep};
13+
use graph::firehose::{FirehoseEndpoint, FirehoseEndpoints, ForkStep};
1414
use graph::futures03::TryStreamExt;
1515
use graph::prelude::{
1616
retry, BlockHash, ComponentLoggerConfig, ElasticComponentLoggerConfig, EthereumBlock,
@@ -1037,32 +1037,62 @@ impl TriggersAdapterTrait<Chain> for TriggersAdapter {
10371037
root: Option<BlockHash>,
10381038
) -> Result<Option<BlockFinality>, Error> {
10391039
let ptr_for_log = ptr.clone();
1040-
let block: Option<EthereumBlock> = self
1040+
let cached = self
10411041
.chain_store
10421042
.cheap_clone()
10431043
.ancestor_block(ptr, offset, root)
1044-
.await?
1045-
.map(|(json_value, block_ptr)| {
1046-
json::from_value(json_value.clone()).map_err(|e| {
1047-
warn!(
1048-
self.logger,
1049-
"Failed to deserialize cached ancestor block {} (offset {} from {}): {}. \
1050-
This may indicate stale cache data from a previous version.",
1051-
block_ptr.hash_hex(),
1052-
offset,
1053-
ptr_for_log.hash_hex(),
1054-
e
1055-
);
1056-
e
1057-
})
1058-
})
1059-
.transpose()?;
1060-
Ok(block.map(|block| {
1061-
BlockFinality::NonFinal(EthereumBlockWithCalls {
1044+
.await?;
1045+
1046+
let Some((json_value, block_ptr)) = cached else {
1047+
return Ok(None);
1048+
};
1049+
1050+
match json::from_value::<EthereumBlock>(json_value.clone()) {
1051+
Ok(block) => Ok(Some(BlockFinality::NonFinal(EthereumBlockWithCalls {
10621052
ethereum_block: block,
10631053
calls: None,
1064-
})
1065-
}))
1054+
}))),
1055+
Err(e) => {
1056+
warn!(
1057+
self.logger,
1058+
"Failed to deserialize cached ancestor block {} (offset {} from {}): {}. \
1059+
This may indicate stale cache data from a previous version. \
1060+
Falling back to Firehose/RPC.",
1061+
block_ptr.hash_hex(),
1062+
offset,
1063+
ptr_for_log.hash_hex(),
1064+
e
1065+
);
1066+
1067+
match self.chain_client.as_ref() {
1068+
ChainClient::Firehose(endpoints) => {
1069+
let block = self
1070+
.fetch_block_with_firehose(endpoints, &block_ptr)
1071+
.await?;
1072+
let ethereum_block: EthereumBlockWithCalls = (&block).try_into()?;
1073+
Ok(Some(BlockFinality::NonFinal(ethereum_block)))
1074+
}
1075+
ChainClient::Rpc(adapters) => {
1076+
match self
1077+
.fetch_light_block_with_rpc(adapters, &block_ptr)
1078+
.await?
1079+
{
1080+
Some(light_block) => {
1081+
let ethereum_block = EthereumBlock {
1082+
block: light_block,
1083+
transaction_receipts: vec![],
1084+
};
1085+
Ok(Some(BlockFinality::NonFinal(EthereumBlockWithCalls {
1086+
ethereum_block,
1087+
calls: None,
1088+
})))
1089+
}
1090+
None => Ok(None),
1091+
}
1092+
}
1093+
}
1094+
}
1095+
}
10661096
}
10671097

10681098
async fn parent_ptr(&self, block: &BlockPtr) -> Result<Option<BlockPtr>, Error> {
@@ -1093,52 +1123,70 @@ impl TriggersAdapterTrait<Chain> for TriggersAdapter {
10931123
}
10941124

10951125
// If not in store, fetch from Firehose
1096-
let endpoint = endpoints.endpoint().await?;
1097-
let logger = self.logger.clone();
1098-
let retry_log_message =
1099-
format!("get_block_by_ptr for block {} with firehose", block);
1100-
let block = block.clone();
1101-
1102-
retry(retry_log_message, &logger)
1103-
.limit(ENV_VARS.request_retries)
1104-
.timeout_secs(ENV_VARS.json_rpc_timeout.as_secs())
1105-
.run(move || {
1106-
let endpoint = endpoint.cheap_clone();
1107-
let logger = logger.cheap_clone();
1108-
let block = block.clone();
1109-
async move {
1110-
endpoint
1111-
.get_block_by_ptr::<codec::Block>(&block, &logger)
1112-
.await
1113-
.context(format!(
1114-
"Failed to fetch block by ptr {} from firehose",
1115-
block
1116-
))
1117-
}
1118-
})
1126+
self.fetch_block_with_firehose(endpoints, block)
11191127
.await?
11201128
.parent_ptr()
11211129
}
1122-
ChainClient::Rpc(adapters) => {
1123-
let blocks = adapters
1124-
.cheapest_with(&self.capabilities)
1125-
.await?
1126-
.load_blocks(
1127-
self.logger.cheap_clone(),
1128-
self.chain_store.cheap_clone(),
1129-
HashSet::from_iter(Some(block.hash.as_b256())),
1130-
)
1131-
.await?;
1132-
assert_eq!(blocks.len(), 1);
1133-
1134-
blocks[0].parent_ptr()
1135-
}
1130+
ChainClient::Rpc(adapters) => self
1131+
.fetch_light_block_with_rpc(adapters, block)
1132+
.await?
1133+
.expect("block must exist for parent_ptr")
1134+
.parent_ptr(),
11361135
};
11371136

11381137
Ok(block)
11391138
}
11401139
}
11411140

1141+
impl TriggersAdapter {
1142+
async fn fetch_block_with_firehose(
1143+
&self,
1144+
endpoints: &FirehoseEndpoints,
1145+
block_ptr: &BlockPtr,
1146+
) -> Result<codec::Block, Error> {
1147+
let endpoint = endpoints.endpoint().await?;
1148+
let logger = self.logger.clone();
1149+
let retry_log_message = format!("fetch_block_with_firehose {}", block_ptr);
1150+
let block_ptr = block_ptr.clone();
1151+
1152+
let block = retry(retry_log_message, &logger)
1153+
.limit(ENV_VARS.request_retries)
1154+
.timeout_secs(ENV_VARS.json_rpc_timeout.as_secs())
1155+
.run(move || {
1156+
let endpoint = endpoint.cheap_clone();
1157+
let logger = logger.cheap_clone();
1158+
let block_ptr = block_ptr.clone();
1159+
async move {
1160+
endpoint
1161+
.get_block_by_ptr::<codec::Block>(&block_ptr, &logger)
1162+
.await
1163+
.context(format!("Failed to fetch block {} from firehose", block_ptr))
1164+
}
1165+
})
1166+
.await?;
1167+
1168+
Ok(block)
1169+
}
1170+
1171+
async fn fetch_light_block_with_rpc(
1172+
&self,
1173+
adapters: &EthereumNetworkAdapters,
1174+
block_ptr: &BlockPtr,
1175+
) -> Result<Option<Arc<LightEthereumBlock>>, Error> {
1176+
let blocks = adapters
1177+
.cheapest_with(&self.capabilities)
1178+
.await?
1179+
.load_blocks(
1180+
self.logger.cheap_clone(),
1181+
self.chain_store.cheap_clone(),
1182+
HashSet::from_iter(Some(block_ptr.hash.as_b256())),
1183+
)
1184+
.await?;
1185+
1186+
Ok(blocks.into_iter().next())
1187+
}
1188+
}
1189+
11421190
pub struct FirehoseMapper {
11431191
adapter: Arc<dyn TriggersAdapterTrait<Chain>>,
11441192
filter: Arc<TriggerFilter>,

chain/ethereum/src/codec.rs

Lines changed: 16 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -268,19 +268,21 @@ impl<'a> TryInto<Transaction<AnyTxEnvelope>> for TransactionTraceAt<'a> {
268268
.trace
269269
.access_list
270270
.iter()
271-
.map(|access_tuple| {
272-
let address = Address::from_slice(&access_tuple.address);
271+
.map(|access_tuple| -> Result<_, Error> {
272+
let address = access_tuple
273+
.address
274+
.try_decode_proto("access tuple address")?;
273275
let storage_keys = access_tuple
274276
.storage_keys
275277
.iter()
276-
.map(|key| B256::from_slice(key))
277-
.collect();
278-
AccessListItem {
278+
.map(|key| key.try_decode_proto("storage key"))
279+
.collect::<Result<Vec<_>, _>>()?;
280+
Ok(AccessListItem {
279281
address,
280282
storage_keys,
281-
}
283+
})
282284
})
283-
.collect::<Vec<_>>()
285+
.collect::<Result<Vec<_>, Error>>()?
284286
.into();
285287

286288
// Extract actual signature components from trace
@@ -359,8 +361,8 @@ impl<'a> TryInto<Transaction<AnyTxEnvelope>> for TransactionTraceAt<'a> {
359361
.trace
360362
.blob_hashes
361363
.iter()
362-
.map(|hash| B256::from_slice(hash))
363-
.collect();
364+
.map(|hash| hash.try_decode_proto("blob hash"))
365+
.collect::<Result<Vec<_>, _>>()?;
364366

365367
let max_fee_per_blob_gas_u128 =
366368
self.trace.blob_gas_fee_cap.as_ref().map_or(0u128, |x| {
@@ -401,22 +403,22 @@ impl<'a> TryInto<Transaction<AnyTxEnvelope>> for TransactionTraceAt<'a> {
401403
.trace
402404
.set_code_authorizations
403405
.iter()
404-
.map(|auth| {
406+
.map(|auth| -> Result<_, Error> {
405407
let inner = alloy::eips::eip7702::Authorization {
406408
chain_id: U256::from_be_slice(&auth.chain_id),
407-
address: Address::from_slice(&auth.address),
409+
address: auth.address.try_decode_proto("authorization address")?,
408410
nonce: auth.nonce,
409411
};
410412

411413
let r = U256::from_be_slice(&auth.r);
412414
let s = U256::from_be_slice(&auth.s);
413415
let y_parity = auth.v as u8;
414416

415-
alloy::eips::eip7702::SignedAuthorization::new_unchecked(
417+
Ok(alloy::eips::eip7702::SignedAuthorization::new_unchecked(
416418
inner, y_parity, r, s,
417-
)
419+
))
418420
})
419-
.collect();
421+
.collect::<Result<Vec<_>, Error>>()?;
420422

421423
let tx = TxEip7702 {
422424
// Firehose protobuf doesn't provide chain_id for transactions.

graph/src/ipfs/server_address.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -119,7 +119,7 @@ mod tests {
119119

120120
assert_eq!(
121121
err.to_string(),
122-
"'https://' is not a valid IPFS server address: invalid format",
122+
"'https://' is not a valid IPFS server address: empty string",
123123
);
124124
}
125125

0 commit comments

Comments
 (0)