Skip to content

Commit 2610018

Browse files
incrypto32Zoran Cvetkov
authored andcommitted
chain,graph: use firehose get_block to get block ptrs for subgraph triggers
1 parent f1bb58c commit 2610018

File tree

5 files changed

+274
-124
lines changed

5 files changed

+274
-124
lines changed

chain/ethereum/src/adapter.rs

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
use anyhow::Error;
22
use ethabi::{Error as ABIError, Function, ParamType, Token};
33
use graph::blockchain::ChainIdentifier;
4-
use graph::blockchain::ExtendedBlockPtr;
54
use graph::components::subgraph::MappingError;
65
use graph::data::store::ethereum::call;
76
use graph::firehose::CallToFilter;
@@ -1110,13 +1109,6 @@ pub trait EthereumAdapter: Send + Sync + 'static {
11101109
block_hash: H256,
11111110
) -> Box<dyn Future<Item = LightEthereumBlock, Error = Error> + Send>;
11121111

1113-
async fn load_block_ptrs_by_numbers(
1114-
&self,
1115-
_logger: Logger,
1116-
_chain_store: Arc<dyn ChainStore>,
1117-
_block_numbers: HashSet<BlockNumber>,
1118-
) -> Box<dyn Stream<Item = Arc<ExtendedBlockPtr>, Error = Error> + Send>;
1119-
11201112
/// Load Ethereum blocks in bulk, returning results as they come back as a Stream.
11211113
/// May use the `chain_store` as a cache.
11221114
async fn load_blocks(

chain/ethereum/src/chain.rs

Lines changed: 135 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -11,11 +11,13 @@ use graph::components::store::{DeploymentCursorTracker, SourceableStore};
1111
use graph::data::subgraph::UnifiedMappingApiVersion;
1212
use graph::firehose::{FirehoseEndpoint, ForkStep};
1313
use graph::futures03::compat::Future01CompatExt;
14+
use graph::futures03::TryStreamExt;
1415
use graph::prelude::{
1516
BlockHash, ComponentLoggerConfig, ElasticComponentLoggerConfig, EthereumBlock,
1617
EthereumCallCache, LightEthereumBlock, LightEthereumBlockExt, MetricsRegistry,
1718
};
1819
use graph::schema::InputSchema;
20+
use graph::slog::{debug, error, warn};
1921
use graph::substreams::Clock;
2022
use graph::{
2123
blockchain::{
@@ -242,7 +244,7 @@ impl BlockRefetcher<Chain> for EthereumBlockRefetcher {
242244
logger: &Logger,
243245
cursor: FirehoseCursor,
244246
) -> Result<BlockFinality, Error> {
245-
let endpoint = chain.chain_client().firehose_endpoint().await?;
247+
let endpoint: Arc<FirehoseEndpoint> = chain.chain_client().firehose_endpoint().await?;
246248
let block = endpoint.get_block::<codec::Block>(cursor, logger).await?;
247249
let ethereum_block: EthereumBlockWithCalls = (&block).try_into()?;
248250
Ok(BlockFinality::NonFinal(ethereum_block))
@@ -713,13 +715,17 @@ impl Block for BlockFinality {
713715
}
714716

715717
fn timestamp(&self) -> BlockTime {
716-
let ts = match self {
717-
BlockFinality::Final(block) => block.timestamp,
718-
BlockFinality::NonFinal(block) => block.ethereum_block.block.timestamp,
718+
match self {
719+
BlockFinality::Final(block) => {
720+
let ts = i64::try_from(block.timestamp.as_u64()).unwrap();
721+
BlockTime::since_epoch(ts, 0)
722+
}
723+
BlockFinality::NonFinal(block) => {
724+
let ts = i64::try_from(block.ethereum_block.block.timestamp.as_u64()).unwrap();
725+
BlockTime::since_epoch(ts, 0)
726+
}
719727
BlockFinality::Ptr(block) => block.timestamp,
720-
};
721-
let ts = i64::try_from(ts.as_u64()).unwrap();
722-
BlockTime::since_epoch(ts, 0)
728+
}
723729
}
724730
}
725731

@@ -734,6 +740,61 @@ pub struct TriggersAdapter {
734740
unified_api_version: UnifiedMappingApiVersion,
735741
}
736742

743+
/// Fetches blocks from the cache based on block numbers, excluding duplicates
744+
/// (i.e., multiple blocks for the same number), and identifying missing blocks that
745+
/// need to be fetched via RPC/Firehose. Returns a tuple of the found blocks and the missing block numbers.
746+
async fn fetch_unique_blocks_from_cache(
747+
logger: &Logger,
748+
chain_store: Arc<dyn ChainStore>,
749+
block_numbers: HashSet<BlockNumber>,
750+
) -> (Vec<Arc<ExtendedBlockPtr>>, Vec<i32>) {
751+
// Load blocks from the cache
752+
let blocks_map = chain_store
753+
.cheap_clone()
754+
.block_ptrs_by_numbers(block_numbers.iter().map(|&b| b.into()).collect::<Vec<_>>())
755+
.await
756+
.map_err(|e| {
757+
error!(logger, "Error accessing block cache {}", e);
758+
e
759+
})
760+
.unwrap_or_default();
761+
762+
// Collect blocks and filter out ones with multiple entries
763+
let blocks: Vec<Arc<ExtendedBlockPtr>> = blocks_map
764+
.into_iter()
765+
.filter_map(|(number, values)| {
766+
if values.len() == 1 {
767+
Some(Arc::new(values[0].clone()))
768+
} else {
769+
warn!(
770+
logger,
771+
"Expected one block for block number {:?}, found {}",
772+
number,
773+
values.len()
774+
);
775+
None
776+
}
777+
})
778+
.collect();
779+
780+
// Identify missing blocks
781+
let missing_blocks: Vec<i32> = block_numbers
782+
.into_iter()
783+
.filter(|&number| !blocks.iter().any(|block| block.block_number() == number))
784+
.collect();
785+
786+
if !missing_blocks.is_empty() {
787+
debug!(
788+
logger,
789+
"Loading {} block(s) not in the block cache",
790+
missing_blocks.len()
791+
);
792+
debug!(logger, "Missing blocks {:?}", missing_blocks);
793+
}
794+
795+
(blocks, missing_blocks)
796+
}
797+
737798
#[async_trait]
738799
impl TriggersAdapterTrait<Chain> for TriggersAdapter {
739800
async fn scan_triggers(
@@ -763,21 +824,75 @@ impl TriggersAdapterTrait<Chain> for TriggersAdapter {
763824
logger: Logger,
764825
block_numbers: HashSet<BlockNumber>,
765826
) -> Result<Vec<BlockFinality>> {
766-
use graph::futures01::stream::Stream;
827+
let blocks = match &*self.chain_client {
828+
ChainClient::Firehose(endpoints, _) => {
829+
let endpoint = endpoints.endpoint().await?;
830+
let chain_store = self.chain_store.clone();
831+
832+
// Fetch blocks that are in the cache. We ignore duplicates (i.e., multiple blocks for the same number) so
833+
// that we can fetch the right block from the RPC.
834+
let (mut cached_blocks, missing_block_numbers) =
835+
fetch_unique_blocks_from_cache(&logger, chain_store, block_numbers).await;
836+
837+
// Then fetch missing blocks from RPC
838+
if !missing_block_numbers.is_empty() {
839+
let missing_blocks = endpoint
840+
.load_blocks_by_numbers::<codec::Block>(
841+
missing_block_numbers.iter().map(|&n| n as u64).collect(),
842+
&logger,
843+
)
844+
.await?
845+
.into_iter()
846+
.map(|block| {
847+
let block: ExtendedBlockPtr = ExtendedBlockPtr {
848+
hash: block.hash(),
849+
number: block.number(),
850+
parent_hash: block.parent_hash().unwrap_or_default(),
851+
timestamp: block.timestamp(),
852+
};
853+
Arc::new(block)
854+
})
855+
.collect::<Vec<_>>();
856+
857+
// Combine cached and newly fetched blocks
858+
cached_blocks.extend(missing_blocks);
859+
cached_blocks.sort_by_key(|block| block.number);
860+
}
767861

768-
let adapter = self
769-
.chain_client
770-
.rpc()?
771-
.cheapest_with(&self.capabilities)
772-
.await?;
862+
// Convert to BlockFinality
863+
let blocks: Vec<BlockFinality> =
864+
cached_blocks.into_iter().map(BlockFinality::Ptr).collect();
773865

774-
let blocks = adapter
775-
.load_block_ptrs_by_numbers(logger, self.chain_store.clone(), block_numbers)
776-
.await
777-
.map(|block| BlockFinality::Ptr(block))
778-
.collect()
779-
.compat()
780-
.await?;
866+
blocks
867+
}
868+
ChainClient::Rpc(client) => {
869+
let adapter = client.cheapest_with(&self.capabilities).await?;
870+
let chain_store = self.chain_store.clone();
871+
872+
// Fetch blocks that are in the cache. We ignore duplicates (i.e., multiple blocks for the same number) so
873+
// that we can fetch the right block from the RPC.
874+
let (mut cached_blocks, missing_block_numbers) =
875+
fetch_unique_blocks_from_cache(&logger, chain_store, block_numbers).await;
876+
877+
// Then fetch missing blocks from RPC
878+
if !missing_block_numbers.is_empty() {
879+
let missing_blocks: Vec<Arc<ExtendedBlockPtr>> = adapter
880+
.load_block_ptrs_by_numbers_rpc(logger.clone(), missing_block_numbers)
881+
.try_collect()
882+
.await?;
883+
884+
// Combine cached and newly fetched blocks
885+
cached_blocks.extend(missing_blocks);
886+
cached_blocks.sort_by_key(|block| block.number);
887+
}
888+
889+
// Convert to BlockFinality
890+
let blocks: Vec<BlockFinality> =
891+
cached_blocks.into_iter().map(BlockFinality::Ptr).collect();
892+
893+
blocks
894+
}
895+
};
781896

782897
Ok(blocks)
783898
}

chain/ethereum/src/ethereum_adapter.rs

Lines changed: 44 additions & 88 deletions
Original file line numberDiff line numberDiff line change
@@ -784,50 +784,59 @@ impl EthereumAdapter {
784784
}
785785

786786
/// Request blocks by number through JSON-RPC.
787-
fn load_block_ptrs_by_numbers_rpc(
787+
pub fn load_block_ptrs_by_numbers_rpc(
788788
&self,
789789
logger: Logger,
790790
numbers: Vec<BlockNumber>,
791-
) -> impl Stream<Item = Arc<ExtendedBlockPtr>, Error = Error> + Send {
791+
) -> impl futures03::Stream<Item = Result<Arc<ExtendedBlockPtr>, Error>> + Send {
792792
let web3 = self.web3.clone();
793793

794-
stream::iter_ok::<_, Error>(numbers.into_iter().map(move |number| {
794+
futures03::stream::iter(numbers.into_iter().map(move |number| {
795795
let web3 = web3.clone();
796-
retry(format!("load block {}", number), &logger)
797-
.limit(ENV_VARS.request_retries)
798-
.timeout_secs(ENV_VARS.json_rpc_timeout.as_secs())
799-
.run(move || {
800-
Box::pin(
801-
web3.eth()
802-
.block(BlockId::Number(Web3BlockNumber::Number(number.into()))),
803-
)
804-
.compat()
805-
.from_err::<Error>()
806-
.and_then(move |block| {
807-
block
808-
.map(|block| {
809-
let ptr = ExtendedBlockPtr::try_from((
810-
block.hash,
811-
block.number,
812-
block.parent_hash,
813-
block.timestamp,
814-
))
815-
.unwrap();
816-
817-
Arc::new(ptr)
818-
})
819-
.ok_or_else(|| {
820-
anyhow::anyhow!(
796+
let logger = logger.clone();
797+
798+
async move {
799+
retry(format!("load block {}", number), &logger)
800+
.limit(ENV_VARS.request_retries)
801+
.timeout_secs(ENV_VARS.json_rpc_timeout.as_secs())
802+
.run(move || {
803+
let web3 = web3.clone();
804+
805+
async move {
806+
let block_result = web3
807+
.eth()
808+
.block(BlockId::Number(Web3BlockNumber::Number(number.into())))
809+
.await;
810+
811+
match block_result {
812+
Ok(Some(block)) => {
813+
let ptr = ExtendedBlockPtr::try_from((
814+
block.hash,
815+
block.number,
816+
block.parent_hash,
817+
block.timestamp,
818+
))
819+
.map_err(|e| {
820+
anyhow::anyhow!("Failed to convert block: {}", e)
821+
})?;
822+
Ok(Arc::new(ptr))
823+
}
824+
Ok(None) => Err(anyhow::anyhow!(
821825
"Ethereum node did not find block with number {:?}",
822826
number
823-
)
824-
})
827+
)),
828+
Err(e) => Err(anyhow::anyhow!("Failed to fetch block: {}", e)),
829+
}
830+
}
825831
})
826-
.compat()
827-
})
828-
.boxed()
829-
.compat()
830-
.from_err()
832+
.await
833+
.map_err(|e| match e {
834+
TimeoutError::Elapsed => {
835+
anyhow::anyhow!("Timeout while fetching block {}", number)
836+
}
837+
TimeoutError::Inner(e) => e,
838+
})
839+
}
831840
}))
832841
.buffered(ENV_VARS.block_ptr_batch_size)
833842
}
@@ -1700,59 +1709,6 @@ impl EthereumAdapterTrait for EthereumAdapter {
17001709
Ok(decoded)
17011710
}
17021711

1703-
/// Load Ethereum blocks in bulk by number, returning results as they come back as a Stream.
1704-
async fn load_block_ptrs_by_numbers(
1705-
&self,
1706-
logger: Logger,
1707-
chain_store: Arc<dyn ChainStore>,
1708-
block_numbers: HashSet<BlockNumber>,
1709-
) -> Box<dyn Stream<Item = Arc<ExtendedBlockPtr>, Error = Error> + Send> {
1710-
let blocks_map = chain_store
1711-
.cheap_clone()
1712-
.block_ptrs_by_numbers(block_numbers.iter().map(|&b| b.into()).collect::<Vec<_>>())
1713-
.await
1714-
.map_err(|e| {
1715-
error!(&logger, "Error accessing block cache {}", e);
1716-
e
1717-
})
1718-
.unwrap_or_default();
1719-
1720-
let mut blocks: Vec<Arc<ExtendedBlockPtr>> = blocks_map
1721-
.into_iter()
1722-
.filter_map(|(_number, values)| {
1723-
if values.len() == 1 {
1724-
Arc::new(values[0].clone()).into()
1725-
} else {
1726-
None
1727-
}
1728-
})
1729-
.collect::<Vec<_>>();
1730-
1731-
let missing_blocks: Vec<i32> = block_numbers
1732-
.into_iter()
1733-
.filter(|&number| !blocks.iter().any(|block| block.block_number() == number))
1734-
.collect();
1735-
1736-
if !missing_blocks.is_empty() {
1737-
debug!(
1738-
logger,
1739-
"Loading {} block(s) not in the block cache",
1740-
missing_blocks.len()
1741-
);
1742-
}
1743-
1744-
Box::new(
1745-
self.load_block_ptrs_by_numbers_rpc(logger.clone(), missing_blocks)
1746-
.collect()
1747-
.map(move |new_blocks| {
1748-
blocks.extend(new_blocks);
1749-
blocks.sort_by_key(|block| block.number);
1750-
stream::iter_ok(blocks)
1751-
})
1752-
.flatten_stream(),
1753-
)
1754-
}
1755-
17561712
/// Load Ethereum blocks in bulk, returning results as they come back as a Stream.
17571713
async fn load_blocks(
17581714
&self,

0 commit comments

Comments
 (0)