Skip to content

Commit b0d573d

Browse files
committed
chain/ethereum: Refactor load_block_ptrs_by_numbers
1 parent 0d27fa6 commit b0d573d

File tree

1 file changed

+78
-74
lines changed

1 file changed

+78
-74
lines changed

chain/ethereum/src/chain.rs

Lines changed: 78 additions & 74 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ use graph::prelude::{
1717
EthereumCallCache, LightEthereumBlock, LightEthereumBlockExt, MetricsRegistry,
1818
};
1919
use graph::schema::InputSchema;
20-
use graph::slog::{debug, error, trace, warn};
20+
use graph::slog::{debug, error, trace};
2121
use graph::substreams::Clock;
2222
use graph::{
2323
blockchain::{
@@ -40,6 +40,7 @@ use graph::{
4040
};
4141
use prost::Message;
4242
use std::collections::{HashMap, HashSet};
43+
use std::future::Future;
4344
use std::iter::FromIterator;
4445
use std::sync::Arc;
4546
use std::time::Duration;
@@ -763,16 +764,10 @@ async fn fetch_unique_blocks_from_cache(
763764
// Collect blocks and filter out ones with multiple entries
764765
let blocks: Vec<Arc<BlockPtrExt>> = blocks_map
765766
.into_iter()
766-
.filter_map(|(number, values)| {
767+
.filter_map(|(_, values)| {
767768
if values.len() == 1 {
768769
Some(Arc::new(values[0].clone()))
769770
} else {
770-
warn!(
771-
logger,
772-
"Expected one block for block number {:?}, found {}",
773-
number,
774-
values.len()
775-
);
776771
None
777772
}
778773
})
@@ -825,87 +820,96 @@ impl TriggersAdapterTrait<Chain> for TriggersAdapter {
825820
logger: Logger,
826821
block_numbers: HashSet<BlockNumber>,
827822
) -> Result<Vec<BlockFinality>> {
828-
let blocks = match &*self.chain_client {
823+
// Common function to handle block loading, regardless of source
824+
async fn load_blocks<F, Fut>(
825+
logger: &Logger,
826+
chain_store: Arc<dyn ChainStore>,
827+
block_numbers: HashSet<BlockNumber>,
828+
fetch_missing: F,
829+
) -> Result<Vec<BlockFinality>>
830+
where
831+
F: FnOnce(Vec<BlockNumber>) -> Fut,
832+
Fut: Future<Output = Result<Vec<Arc<BlockPtrExt>>>>,
833+
{
834+
// Fetch cached blocks and identify missing ones
835+
let (mut cached_blocks, missing_block_numbers) =
836+
fetch_unique_blocks_from_cache(logger, chain_store, block_numbers).await;
837+
838+
// Fetch missing blocks if any
839+
if !missing_block_numbers.is_empty() {
840+
let missing_blocks = fetch_missing(missing_block_numbers).await?;
841+
cached_blocks.extend(missing_blocks);
842+
cached_blocks.sort_by_key(|block| block.number);
843+
}
844+
845+
// Convert to BlockFinality
846+
Ok(cached_blocks.into_iter().map(BlockFinality::Ptr).collect())
847+
}
848+
849+
match &*self.chain_client {
829850
ChainClient::Firehose(endpoints, _) => {
830851
trace!(
831-
logger,
832-
"Loading blocks from firehose";
833-
"block_numbers" => format!("{:?}", block_numbers)
852+
logger,
853+
"Loading blocks from firehose";
854+
"block_numbers" => format!("{:?}", block_numbers)
834855
);
856+
835857
let endpoint = endpoints.endpoint().await?;
836858
let chain_store = self.chain_store.clone();
837-
838-
// Fetch blocks that are in the cache. We ignore duplicates (i.e., multiple blocks for the same number) so
839-
// that we can fetch the right block from the RPC.
840-
let (mut cached_blocks, missing_block_numbers) =
841-
fetch_unique_blocks_from_cache(&logger, chain_store, block_numbers).await;
842-
843-
// Then fetch missing blocks from RPC
844-
if !missing_block_numbers.is_empty() {
845-
let missing_blocks = endpoint
846-
.load_blocks_by_numbers::<codec::Block>(
847-
missing_block_numbers.iter().map(|&n| n as u64).collect(),
848-
&logger,
849-
)
850-
.await?
851-
.into_iter()
852-
.map(|block| {
853-
let block: BlockPtrExt = BlockPtrExt {
854-
hash: block.hash(),
855-
number: block.number(),
856-
parent_hash: block.parent_hash().unwrap_or_default(),
857-
timestamp: block.timestamp(),
858-
};
859-
Arc::new(block)
860-
})
861-
.collect::<Vec<_>>();
862-
863-
// Combine cached and newly fetched blocks
864-
cached_blocks.extend(missing_blocks);
865-
cached_blocks.sort_by_key(|block| block.number);
866-
}
867-
868-
// Convert to BlockFinality
869-
let blocks: Vec<BlockFinality> =
870-
cached_blocks.into_iter().map(BlockFinality::Ptr).collect();
871-
872-
blocks
859+
let logger_clone = logger.clone();
860+
861+
load_blocks(
862+
&logger,
863+
chain_store,
864+
block_numbers,
865+
|missing_numbers| async move {
866+
let blocks = endpoint
867+
.load_blocks_by_numbers::<codec::Block>(
868+
missing_numbers.iter().map(|&n| n as u64).collect(),
869+
&logger_clone,
870+
)
871+
.await?
872+
.into_iter()
873+
.map(|block| {
874+
Arc::new(BlockPtrExt {
875+
hash: block.hash(),
876+
number: block.number(),
877+
parent_hash: block.parent_hash().unwrap_or_default(),
878+
timestamp: block.timestamp(),
879+
})
880+
})
881+
.collect::<Vec<_>>();
882+
Ok(blocks)
883+
},
884+
)
885+
.await
873886
}
887+
874888
ChainClient::Rpc(client) => {
875889
trace!(
876890
logger,
877891
"Loading blocks from RPC";
878892
"block_numbers" => format!("{:?}", block_numbers)
879893
);
894+
880895
let adapter = client.cheapest_with(&self.capabilities).await?;
881896
let chain_store = self.chain_store.clone();
882-
883-
// Fetch blocks that are in the cache. We ignore duplicates (i.e., multiple blocks for the same number) so
884-
// that we can fetch the right block from the RPC.
885-
let (mut cached_blocks, missing_block_numbers) =
886-
fetch_unique_blocks_from_cache(&logger, chain_store, block_numbers).await;
887-
888-
// Then fetch missing blocks from RPC
889-
if !missing_block_numbers.is_empty() {
890-
let missing_blocks: Vec<Arc<BlockPtrExt>> = adapter
891-
.load_block_ptrs_by_numbers_rpc(logger.clone(), missing_block_numbers)
892-
.try_collect()
893-
.await?;
894-
895-
// Combine cached and newly fetched blocks
896-
cached_blocks.extend(missing_blocks);
897-
cached_blocks.sort_by_key(|block| block.number);
898-
}
899-
900-
// Convert to BlockFinality
901-
let blocks: Vec<BlockFinality> =
902-
cached_blocks.into_iter().map(BlockFinality::Ptr).collect();
903-
904-
blocks
897+
let logger_clone = logger.clone();
898+
899+
load_blocks(
900+
&logger,
901+
chain_store,
902+
block_numbers,
903+
|missing_numbers| async move {
904+
adapter
905+
.load_block_ptrs_by_numbers_rpc(logger_clone, missing_numbers)
906+
.try_collect()
907+
.await
908+
},
909+
)
910+
.await
905911
}
906-
};
907-
908-
Ok(blocks)
912+
}
909913
}
910914

911915
async fn chain_head_ptr(&self) -> Result<Option<BlockPtr>, Error> {

0 commit comments

Comments
 (0)