Skip to content

Commit a3b03a3

Browse files
committed
chain/ethereum: Refactor load_block_ptrs_by_numbers
1 parent f36aa1a commit a3b03a3

File tree

1 file changed

+78
-74
lines changed

1 file changed

+78
-74
lines changed

chain/ethereum/src/chain.rs

Lines changed: 78 additions & 74 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ use graph::prelude::{
1717
EthereumCallCache, LightEthereumBlock, LightEthereumBlockExt, MetricsRegistry,
1818
};
1919
use graph::schema::InputSchema;
20-
use graph::slog::{debug, error, trace, warn};
20+
use graph::slog::{debug, error, trace};
2121
use graph::substreams::Clock;
2222
use graph::{
2323
blockchain::{
@@ -40,6 +40,7 @@ use graph::{
4040
};
4141
use prost::Message;
4242
use std::collections::{HashMap, HashSet};
43+
use std::future::Future;
4344
use std::iter::FromIterator;
4445
use std::sync::Arc;
4546
use std::time::Duration;
@@ -762,16 +763,10 @@ async fn fetch_unique_blocks_from_cache(
762763
// Collect blocks and filter out ones with multiple entries
763764
let blocks: Vec<Arc<ExtendedBlockPtr>> = blocks_map
764765
.into_iter()
765-
.filter_map(|(number, values)| {
766+
.filter_map(|(_, values)| {
766767
if values.len() == 1 {
767768
Some(Arc::new(values[0].clone()))
768769
} else {
769-
warn!(
770-
logger,
771-
"Expected one block for block number {:?}, found {}",
772-
number,
773-
values.len()
774-
);
775770
None
776771
}
777772
})
@@ -824,87 +819,96 @@ impl TriggersAdapterTrait<Chain> for TriggersAdapter {
824819
logger: Logger,
825820
block_numbers: HashSet<BlockNumber>,
826821
) -> Result<Vec<BlockFinality>> {
827-
let blocks = match &*self.chain_client {
822+
// Common function to handle block loading, regardless of source
823+
async fn load_blocks<F, Fut>(
824+
logger: &Logger,
825+
chain_store: Arc<dyn ChainStore>,
826+
block_numbers: HashSet<BlockNumber>,
827+
fetch_missing: F,
828+
) -> Result<Vec<BlockFinality>>
829+
where
830+
F: FnOnce(Vec<BlockNumber>) -> Fut,
831+
Fut: Future<Output = Result<Vec<Arc<BlockPtrExt>>>>,
832+
{
833+
// Fetch cached blocks and identify missing ones
834+
let (mut cached_blocks, missing_block_numbers) =
835+
fetch_unique_blocks_from_cache(logger, chain_store, block_numbers).await;
836+
837+
// Fetch missing blocks if any
838+
if !missing_block_numbers.is_empty() {
839+
let missing_blocks = fetch_missing(missing_block_numbers).await?;
840+
cached_blocks.extend(missing_blocks);
841+
cached_blocks.sort_by_key(|block| block.number);
842+
}
843+
844+
// Convert to BlockFinality
845+
Ok(cached_blocks.into_iter().map(BlockFinality::Ptr).collect())
846+
}
847+
848+
match &*self.chain_client {
828849
ChainClient::Firehose(endpoints, _) => {
829850
trace!(
830-
logger,
831-
"Loading blocks from firehose";
832-
"block_numbers" => format!("{:?}", block_numbers)
851+
logger,
852+
"Loading blocks from firehose";
853+
"block_numbers" => format!("{:?}", block_numbers)
833854
);
855+
834856
let endpoint = endpoints.endpoint().await?;
835857
let chain_store = self.chain_store.clone();
836-
837-
// Fetch blocks that are in the cache. We ignore duplicates (i.e., multiple blocks for the same number) so
838-
// that we can fetch the right block from the RPC.
839-
let (mut cached_blocks, missing_block_numbers) =
840-
fetch_unique_blocks_from_cache(&logger, chain_store, block_numbers).await;
841-
842-
// Then fetch missing blocks from RPC
843-
if !missing_block_numbers.is_empty() {
844-
let missing_blocks = endpoint
845-
.load_blocks_by_numbers::<codec::Block>(
846-
missing_block_numbers.iter().map(|&n| n as u64).collect(),
847-
&logger,
848-
)
849-
.await?
850-
.into_iter()
851-
.map(|block| {
852-
let block: ExtendedBlockPtr = ExtendedBlockPtr {
853-
hash: block.hash(),
854-
number: block.number(),
855-
parent_hash: block.parent_hash().unwrap_or_default(),
856-
timestamp: block.timestamp(),
857-
};
858-
Arc::new(block)
859-
})
860-
.collect::<Vec<_>>();
861-
862-
// Combine cached and newly fetched blocks
863-
cached_blocks.extend(missing_blocks);
864-
cached_blocks.sort_by_key(|block| block.number);
865-
}
866-
867-
// Convert to BlockFinality
868-
let blocks: Vec<BlockFinality> =
869-
cached_blocks.into_iter().map(BlockFinality::Ptr).collect();
870-
871-
blocks
858+
let logger_clone = logger.clone();
859+
860+
load_blocks(
861+
&logger,
862+
chain_store,
863+
block_numbers,
864+
|missing_numbers| async move {
865+
let blocks = endpoint
866+
.load_blocks_by_numbers::<codec::Block>(
867+
missing_numbers.iter().map(|&n| n as u64).collect(),
868+
&logger_clone,
869+
)
870+
.await?
871+
.into_iter()
872+
.map(|block| {
873+
Arc::new(BlockPtrExt {
874+
hash: block.hash(),
875+
number: block.number(),
876+
parent_hash: block.parent_hash().unwrap_or_default(),
877+
timestamp: block.timestamp(),
878+
})
879+
})
880+
.collect::<Vec<_>>();
881+
Ok(blocks)
882+
},
883+
)
884+
.await
872885
}
886+
873887
ChainClient::Rpc(client) => {
874888
trace!(
875889
logger,
876890
"Loading blocks from RPC";
877891
"block_numbers" => format!("{:?}", block_numbers)
878892
);
893+
879894
let adapter = client.cheapest_with(&self.capabilities).await?;
880895
let chain_store = self.chain_store.clone();
881-
882-
// Fetch blocks that are in the cache. We ignore duplicates (i.e., multiple blocks for the same number) so
883-
// that we can fetch the right block from the RPC.
884-
let (mut cached_blocks, missing_block_numbers) =
885-
fetch_unique_blocks_from_cache(&logger, chain_store, block_numbers).await;
886-
887-
// Then fetch missing blocks from RPC
888-
if !missing_block_numbers.is_empty() {
889-
let missing_blocks: Vec<Arc<ExtendedBlockPtr>> = adapter
890-
.load_block_ptrs_by_numbers_rpc(logger.clone(), missing_block_numbers)
891-
.try_collect()
892-
.await?;
893-
894-
// Combine cached and newly fetched blocks
895-
cached_blocks.extend(missing_blocks);
896-
cached_blocks.sort_by_key(|block| block.number);
897-
}
898-
899-
// Convert to BlockFinality
900-
let blocks: Vec<BlockFinality> =
901-
cached_blocks.into_iter().map(BlockFinality::Ptr).collect();
902-
903-
blocks
896+
let logger_clone = logger.clone();
897+
898+
load_blocks(
899+
&logger,
900+
chain_store,
901+
block_numbers,
902+
|missing_numbers| async move {
903+
adapter
904+
.load_block_ptrs_by_numbers_rpc(logger_clone, missing_numbers)
905+
.try_collect()
906+
.await
907+
},
908+
)
909+
.await
904910
}
905-
};
906-
907-
Ok(blocks)
911+
}
908912
}
909913

910914
async fn chain_head_ptr(&self) -> Result<Option<BlockPtr>, Error> {

0 commit comments

Comments
 (0)