Skip to content

Commit 196f2a5

Browse files
committed
graph, chain: make TriggersAdapterTrait methods work with firehose endpoints
1 parent 1511cfa commit 196f2a5

File tree

2 files changed

+78
-13
lines changed

2 files changed

+78
-13
lines changed

chain/ethereum/src/chain.rs

Lines changed: 43 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ use graph::prelude::{
1717
EthereumCallCache, LightEthereumBlock, LightEthereumBlockExt, MetricsRegistry,
1818
};
1919
use graph::schema::InputSchema;
20-
use graph::slog::{debug, error, warn};
20+
use graph::slog::{debug, error, trace, warn};
2121
use graph::substreams::Clock;
2222
use graph::{
2323
blockchain::{
@@ -825,7 +825,12 @@ impl TriggersAdapterTrait<Chain> for TriggersAdapter {
825825
block_numbers: HashSet<BlockNumber>,
826826
) -> Result<Vec<BlockFinality>> {
827827
let blocks = match &*self.chain_client {
828-
ChainClient::Firehose(endpoints, _) => {
828+
ChainClient::Firehose(endpoints) => {
829+
trace!(
830+
logger,
831+
"Loading blocks from firehose";
832+
"block_numbers" => format!("{:?}", block_numbers)
833+
);
829834
let endpoint = endpoints.endpoint().await?;
830835
let chain_store = self.chain_store.clone();
831836

@@ -866,6 +871,11 @@ impl TriggersAdapterTrait<Chain> for TriggersAdapter {
866871
blocks
867872
}
868873
ChainClient::Rpc(client) => {
874+
trace!(
875+
logger,
876+
"Loading blocks from RPC";
877+
"block_numbers" => format!("{:?}", block_numbers)
878+
);
869879
let adapter = client.cheapest_with(&self.capabilities).await?;
870880
let chain_store = self.chain_store.clone();
871881

@@ -955,13 +965,25 @@ impl TriggersAdapterTrait<Chain> for TriggersAdapter {
955965
}
956966

957967
async fn is_on_main_chain(&self, ptr: BlockPtr) -> Result<bool, Error> {
958-
self.chain_client
959-
.rpc()?
960-
.cheapest()
961-
.await
962-
.ok_or(anyhow!("unable to get adapter for is_on_main_chain"))?
963-
.is_on_main_chain(&self.logger, ptr.clone())
964-
.await
968+
match &*self.chain_client {
969+
ChainClient::Firehose(endpoints) => {
970+
let endpoint = endpoints.endpoint().await?;
971+
let block = endpoint
972+
.get_block_by_number::<codec::Block>(ptr.number as u64, &self.logger)
973+
.await
974+
.map_err(|e| anyhow!("Failed to fetch block from firehose: {}", e))?;
975+
976+
Ok(block.hash() == ptr.hash)
977+
}
978+
ChainClient::Rpc(adapter) => {
979+
let adapter = adapter
980+
.cheapest()
981+
.await
982+
.ok_or_else(|| anyhow!("unable to get adapter for is_on_main_chain"))?;
983+
984+
adapter.is_on_main_chain(&self.logger, ptr).await
985+
}
986+
}
965987
}
966988

967989
async fn ancestor_block(
@@ -991,10 +1013,18 @@ impl TriggersAdapterTrait<Chain> for TriggersAdapter {
9911013
use graph::prelude::LightEthereumBlockExt;
9921014

9931015
let block = match self.chain_client.as_ref() {
994-
ChainClient::Firehose(_) => Some(BlockPtr {
995-
hash: BlockHash::from(vec![0xff; 32]),
996-
number: block.number.saturating_sub(1),
997-
}),
1016+
ChainClient::Firehose(endpoints) => {
1017+
let endpoint = endpoints.endpoint().await?;
1018+
let block = endpoint
1019+
.get_block_by_ptr::<codec::Block>(block, &self.logger)
1020+
.await
1021+
.context(format!(
1022+
"Failed to fetch block by ptr {} from firehose, backtrace: {}",
1023+
block,
1024+
std::backtrace::Backtrace::force_capture()
1025+
))?;
1026+
block.parent_ptr()
1027+
}
9981028
ChainClient::Rpc(adapters) => {
9991029
let blocks = adapters
10001030
.cheapest_with(&self.capabilities)

graph/src/firehose/endpoints.rs

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -509,6 +509,41 @@ impl FirehoseEndpoint {
509509
}
510510
}
511511

512+
pub async fn get_block_by_ptr<M>(
513+
&self,
514+
ptr: &BlockPtr,
515+
logger: &Logger,
516+
) -> Result<M, anyhow::Error>
517+
where
518+
M: prost::Message + BlockchainBlock + Default + 'static,
519+
{
520+
debug!(
521+
logger,
522+
"Connecting to firehose to retrieve block for ptr {}", ptr;
523+
"provider" => self.provider.as_str(),
524+
);
525+
526+
let req = firehose::SingleBlockRequest {
527+
transforms: [].to_vec(),
528+
reference: Some(
529+
firehose::single_block_request::Reference::BlockHashAndNumber(
530+
firehose::single_block_request::BlockHashAndNumber {
531+
hash: ptr.hash.to_string(),
532+
num: ptr.number as u64,
533+
},
534+
),
535+
),
536+
};
537+
538+
let mut client = self.new_client();
539+
match client.block(req).await {
540+
Ok(v) => Ok(M::decode(
541+
v.get_ref().block.as_ref().unwrap().value.as_ref(),
542+
)?),
543+
Err(e) => return Err(anyhow::format_err!("firehose error {}", e)),
544+
}
545+
}
546+
512547
pub async fn get_block_by_number<M>(
513548
&self,
514549
number: u64,

0 commit comments

Comments
 (0)