Skip to content

Commit 52ed6a8

Browse files
committed
chain,graph: use firehose get_block to get block ptrs for subgraph triggers
1 parent 317ce7a commit 52ed6a8

File tree

5 files changed

+274
-132
lines changed

5 files changed

+274
-132
lines changed

chain/ethereum/src/adapter.rs

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
use anyhow::Error;
22
use ethabi::{Error as ABIError, Function, ParamType, Token};
3-
use graph::blockchain::BlockPtrExt;
43
use graph::blockchain::ChainIdentifier;
54
use graph::components::subgraph::MappingError;
65
use graph::data::store::ethereum::call;
@@ -1110,13 +1109,6 @@ pub trait EthereumAdapter: Send + Sync + 'static {
11101109
block_hash: H256,
11111110
) -> Box<dyn Future<Item = LightEthereumBlock, Error = Error> + Send>;
11121111

1113-
async fn load_block_ptrs_by_numbers(
1114-
&self,
1115-
_logger: Logger,
1116-
_chain_store: Arc<dyn ChainStore>,
1117-
_block_numbers: HashSet<BlockNumber>,
1118-
) -> Box<dyn Stream<Item = Arc<BlockPtrExt>, Error = Error> + Send>;
1119-
11201112
/// Load Ethereum blocks in bulk, returning results as they come back as a Stream.
11211113
/// May use the `chain_store` as a cache.
11221114
async fn load_blocks(

chain/ethereum/src/chain.rs

Lines changed: 135 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -11,11 +11,13 @@ use graph::components::store::{DeploymentCursorTracker, WritableStore};
1111
use graph::data::subgraph::UnifiedMappingApiVersion;
1212
use graph::firehose::{FirehoseEndpoint, ForkStep};
1313
use graph::futures03::compat::Future01CompatExt;
14+
use graph::futures03::TryStreamExt;
1415
use graph::prelude::{
1516
BlockHash, ComponentLoggerConfig, DeploymentHash, ElasticComponentLoggerConfig, EthereumBlock,
1617
EthereumCallCache, LightEthereumBlock, LightEthereumBlockExt, MetricsRegistry,
1718
};
1819
use graph::schema::InputSchema;
20+
use graph::slog::{debug, error, warn};
1921
use graph::substreams::Clock;
2022
use graph::{
2123
blockchain::{
@@ -243,7 +245,7 @@ impl BlockRefetcher<Chain> for EthereumBlockRefetcher {
243245
logger: &Logger,
244246
cursor: FirehoseCursor,
245247
) -> Result<BlockFinality, Error> {
246-
let endpoint = chain.chain_client().firehose_endpoint().await?;
248+
let endpoint: Arc<FirehoseEndpoint> = chain.chain_client().firehose_endpoint().await?;
247249
let block = endpoint.get_block::<codec::Block>(cursor, logger).await?;
248250
let ethereum_block: EthereumBlockWithCalls = (&block).try_into()?;
249251
Ok(BlockFinality::NonFinal(ethereum_block))
@@ -714,13 +716,17 @@ impl Block for BlockFinality {
714716
}
715717

716718
fn timestamp(&self) -> BlockTime {
717-
let ts = match self {
718-
BlockFinality::Final(block) => block.timestamp,
719-
BlockFinality::NonFinal(block) => block.ethereum_block.block.timestamp,
719+
match self {
720+
BlockFinality::Final(block) => {
721+
let ts = i64::try_from(block.timestamp.as_u64()).unwrap();
722+
BlockTime::since_epoch(ts, 0)
723+
}
724+
BlockFinality::NonFinal(block) => {
725+
let ts = i64::try_from(block.ethereum_block.block.timestamp.as_u64()).unwrap();
726+
BlockTime::since_epoch(ts, 0)
727+
}
720728
BlockFinality::Ptr(block) => block.timestamp,
721-
};
722-
let ts = i64::try_from(ts.as_u64()).unwrap();
723-
BlockTime::since_epoch(ts, 0)
729+
}
724730
}
725731
}
726732

@@ -735,6 +741,61 @@ pub struct TriggersAdapter {
735741
unified_api_version: UnifiedMappingApiVersion,
736742
}
737743

744+
/// Fetches blocks from the cache based on block numbers, excluding duplicates
745+
/// (i.e., multiple blocks for the same number), and identifying missing blocks that
746+
/// need to be fetched via RPC/Firehose. Returns a tuple of the found blocks and the missing block numbers.
747+
async fn fetch_unique_blocks_from_cache(
748+
logger: &Logger,
749+
chain_store: Arc<dyn ChainStore>,
750+
block_numbers: HashSet<BlockNumber>,
751+
) -> (Vec<Arc<BlockPtrExt>>, Vec<i32>) {
752+
// Load blocks from the cache
753+
let blocks_map = chain_store
754+
.cheap_clone()
755+
.block_ptrs_by_numbers(block_numbers.iter().map(|&b| b.into()).collect::<Vec<_>>())
756+
.await
757+
.map_err(|e| {
758+
error!(logger, "Error accessing block cache {}", e);
759+
e
760+
})
761+
.unwrap_or_default();
762+
763+
// Collect blocks and filter out ones with multiple entries
764+
let blocks: Vec<Arc<BlockPtrExt>> = blocks_map
765+
.into_iter()
766+
.filter_map(|(number, values)| {
767+
if values.len() == 1 {
768+
Some(Arc::new(values[0].clone()))
769+
} else {
770+
warn!(
771+
logger,
772+
"Expected one block for block number {:?}, found {}",
773+
number,
774+
values.len()
775+
);
776+
None
777+
}
778+
})
779+
.collect();
780+
781+
// Identify missing blocks
782+
let missing_blocks: Vec<i32> = block_numbers
783+
.into_iter()
784+
.filter(|&number| !blocks.iter().any(|block| block.block_number() == number))
785+
.collect();
786+
787+
if !missing_blocks.is_empty() {
788+
debug!(
789+
logger,
790+
"Loading {} block(s) not in the block cache",
791+
missing_blocks.len()
792+
);
793+
debug!(logger, "Missing blocks {:?}", missing_blocks);
794+
}
795+
796+
(blocks, missing_blocks)
797+
}
798+
738799
#[async_trait]
739800
impl TriggersAdapterTrait<Chain> for TriggersAdapter {
740801
async fn scan_triggers(
@@ -764,21 +825,75 @@ impl TriggersAdapterTrait<Chain> for TriggersAdapter {
764825
logger: Logger,
765826
block_numbers: HashSet<BlockNumber>,
766827
) -> Result<Vec<BlockFinality>> {
767-
use graph::futures01::stream::Stream;
828+
let blocks = match &*self.chain_client {
829+
ChainClient::Firehose(endpoints, _) => {
830+
let endpoint = endpoints.endpoint().await?;
831+
let chain_store = self.chain_store.clone();
832+
833+
// Fetch blocks that are in the cache. We ignore duplicates (i.e., multiple blocks for the same number) so
834+
// that we can fetch the right block from the RPC.
835+
let (mut cached_blocks, missing_block_numbers) =
836+
fetch_unique_blocks_from_cache(&logger, chain_store, block_numbers).await;
837+
838+
// Then fetch missing blocks from RPC
839+
if !missing_block_numbers.is_empty() {
840+
let missing_blocks = endpoint
841+
.load_blocks_by_numbers::<codec::Block>(
842+
missing_block_numbers.iter().map(|&n| n as u64).collect(),
843+
&logger,
844+
)
845+
.await?
846+
.into_iter()
847+
.map(|block| {
848+
let block: BlockPtrExt = BlockPtrExt {
849+
hash: block.hash(),
850+
number: block.number(),
851+
parent_hash: block.parent_hash().unwrap_or_default(),
852+
timestamp: block.timestamp(),
853+
};
854+
Arc::new(block)
855+
})
856+
.collect::<Vec<_>>();
857+
858+
// Combine cached and newly fetched blocks
859+
cached_blocks.extend(missing_blocks);
860+
cached_blocks.sort_by_key(|block| block.number);
861+
}
768862

769-
let adapter = self
770-
.chain_client
771-
.rpc()?
772-
.cheapest_with(&self.capabilities)
773-
.await?;
863+
// Convert to BlockFinality
864+
let blocks: Vec<BlockFinality> =
865+
cached_blocks.into_iter().map(BlockFinality::Ptr).collect();
774866

775-
let blocks = adapter
776-
.load_block_ptrs_by_numbers(logger, self.chain_store.clone(), block_numbers)
777-
.await
778-
.map(|block| BlockFinality::Ptr(block))
779-
.collect()
780-
.compat()
781-
.await?;
867+
blocks
868+
}
869+
ChainClient::Rpc(client) => {
870+
let adapter = client.cheapest_with(&self.capabilities).await?;
871+
let chain_store = self.chain_store.clone();
872+
873+
// Fetch blocks that are in the cache. We ignore duplicates (i.e., multiple blocks for the same number) so
874+
// that we can fetch the right block from the RPC.
875+
let (mut cached_blocks, missing_block_numbers) =
876+
fetch_unique_blocks_from_cache(&logger, chain_store, block_numbers).await;
877+
878+
// Then fetch missing blocks from RPC
879+
if !missing_block_numbers.is_empty() {
880+
let missing_blocks: Vec<Arc<BlockPtrExt>> = adapter
881+
.load_block_ptrs_by_numbers_rpc(logger.clone(), missing_block_numbers)
882+
.try_collect()
883+
.await?;
884+
885+
// Combine cached and newly fetched blocks
886+
cached_blocks.extend(missing_blocks);
887+
cached_blocks.sort_by_key(|block| block.number);
888+
}
889+
890+
// Convert to BlockFinality
891+
let blocks: Vec<BlockFinality> =
892+
cached_blocks.into_iter().map(BlockFinality::Ptr).collect();
893+
894+
blocks
895+
}
896+
};
782897

783898
Ok(blocks)
784899
}

chain/ethereum/src/ethereum_adapter.rs

Lines changed: 44 additions & 96 deletions
Original file line numberDiff line numberDiff line change
@@ -784,50 +784,59 @@ impl EthereumAdapter {
784784
}
785785

786786
/// Request blocks by number through JSON-RPC.
787-
fn load_block_ptrs_by_numbers_rpc(
787+
pub fn load_block_ptrs_by_numbers_rpc(
788788
&self,
789789
logger: Logger,
790790
numbers: Vec<BlockNumber>,
791-
) -> impl Stream<Item = Arc<BlockPtrExt>, Error = Error> + Send {
791+
) -> impl futures03::Stream<Item = Result<Arc<BlockPtrExt>, Error>> + Send {
792792
let web3 = self.web3.clone();
793793

794-
stream::iter_ok::<_, Error>(numbers.into_iter().map(move |number| {
794+
futures03::stream::iter(numbers.into_iter().map(move |number| {
795795
let web3 = web3.clone();
796-
retry(format!("load block {}", number), &logger)
797-
.limit(ENV_VARS.request_retries)
798-
.timeout_secs(ENV_VARS.json_rpc_timeout.as_secs())
799-
.run(move || {
800-
Box::pin(
801-
web3.eth()
802-
.block(BlockId::Number(Web3BlockNumber::Number(number.into()))),
803-
)
804-
.compat()
805-
.from_err::<Error>()
806-
.and_then(move |block| {
807-
block
808-
.map(|block| {
809-
let ptr = BlockPtrExt::try_from((
810-
block.hash,
811-
block.number,
812-
block.parent_hash,
813-
block.timestamp,
814-
))
815-
.unwrap();
816-
817-
Arc::new(ptr)
818-
})
819-
.ok_or_else(|| {
820-
anyhow::anyhow!(
796+
let logger = logger.clone();
797+
798+
async move {
799+
retry(format!("load block {}", number), &logger)
800+
.limit(ENV_VARS.request_retries)
801+
.timeout_secs(ENV_VARS.json_rpc_timeout.as_secs())
802+
.run(move || {
803+
let web3 = web3.clone();
804+
805+
async move {
806+
let block_result = web3
807+
.eth()
808+
.block(BlockId::Number(Web3BlockNumber::Number(number.into())))
809+
.await;
810+
811+
match block_result {
812+
Ok(Some(block)) => {
813+
let ptr = BlockPtrExt::try_from((
814+
block.hash,
815+
block.number,
816+
block.parent_hash,
817+
block.timestamp,
818+
))
819+
.map_err(|e| {
820+
anyhow::anyhow!("Failed to convert block: {}", e)
821+
})?;
822+
Ok(Arc::new(ptr))
823+
}
824+
Ok(None) => Err(anyhow::anyhow!(
821825
"Ethereum node did not find block with number {:?}",
822826
number
823-
)
824-
})
827+
)),
828+
Err(e) => Err(anyhow::anyhow!("Failed to fetch block: {}", e)),
829+
}
830+
}
825831
})
826-
.compat()
827-
})
828-
.boxed()
829-
.compat()
830-
.from_err()
832+
.await
833+
.map_err(|e| match e {
834+
TimeoutError::Elapsed => {
835+
anyhow::anyhow!("Timeout while fetching block {}", number)
836+
}
837+
TimeoutError::Inner(e) => e,
838+
})
839+
}
831840
}))
832841
.buffered(ENV_VARS.block_ptr_batch_size)
833842
}
@@ -1700,67 +1709,6 @@ impl EthereumAdapterTrait for EthereumAdapter {
17001709
Ok(decoded)
17011710
}
17021711

1703-
/// Load Ethereum blocks in bulk by number, returning results as they come back as a Stream.
1704-
async fn load_block_ptrs_by_numbers(
1705-
&self,
1706-
logger: Logger,
1707-
chain_store: Arc<dyn ChainStore>,
1708-
block_numbers: HashSet<BlockNumber>,
1709-
) -> Box<dyn Stream<Item = Arc<BlockPtrExt>, Error = Error> + Send> {
1710-
let blocks_map = chain_store
1711-
.cheap_clone()
1712-
.block_ptrs_by_numbers(block_numbers.iter().map(|&b| b.into()).collect::<Vec<_>>())
1713-
.await
1714-
.map_err(|e| {
1715-
error!(&logger, "Error accessing block cache {}", e);
1716-
e
1717-
})
1718-
.unwrap_or_default();
1719-
1720-
let mut blocks: Vec<Arc<BlockPtrExt>> = blocks_map
1721-
.into_iter()
1722-
.filter_map(|(number, values)| {
1723-
if values.len() == 1 {
1724-
Arc::new(values[0].clone()).into()
1725-
} else {
1726-
warn!(
1727-
&logger,
1728-
"Expected one block for block number {:?}, found {}",
1729-
number,
1730-
values.len()
1731-
);
1732-
None
1733-
}
1734-
})
1735-
.collect::<Vec<_>>();
1736-
1737-
let missing_blocks: Vec<i32> = block_numbers
1738-
.into_iter()
1739-
.filter(|&number| !blocks.iter().any(|block| block.block_number() == number))
1740-
.collect();
1741-
1742-
if !missing_blocks.is_empty() {
1743-
debug!(
1744-
logger,
1745-
"Loading {} block(s) not in the block cache",
1746-
missing_blocks.len()
1747-
);
1748-
1749-
debug!(logger, "Missing blocks {:?}", missing_blocks);
1750-
}
1751-
1752-
Box::new(
1753-
self.load_block_ptrs_by_numbers_rpc(logger.clone(), missing_blocks)
1754-
.collect()
1755-
.map(move |new_blocks| {
1756-
blocks.extend(new_blocks);
1757-
blocks.sort_by_key(|block| block.number);
1758-
stream::iter_ok(blocks)
1759-
})
1760-
.flatten_stream(),
1761-
)
1762-
}
1763-
17641712
/// Load Ethereum blocks in bulk, returning results as they come back as a Stream.
17651713
async fn load_blocks(
17661714
&self,

0 commit comments

Comments
 (0)