Skip to content

Commit 0d27fa6

Browse files
committed
graph, chain: make TriggersAdapterTrait methods work with firehose endpoints
1 parent 52ed6a8 commit 0d27fa6

File tree

2 files changed

+77
-12
lines changed

2 files changed

+77
-12
lines changed

chain/ethereum/src/chain.rs

Lines changed: 42 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ use graph::prelude::{
1717
EthereumCallCache, LightEthereumBlock, LightEthereumBlockExt, MetricsRegistry,
1818
};
1919
use graph::schema::InputSchema;
20-
use graph::slog::{debug, error, warn};
20+
use graph::slog::{debug, error, trace, warn};
2121
use graph::substreams::Clock;
2222
use graph::{
2323
blockchain::{
@@ -827,6 +827,11 @@ impl TriggersAdapterTrait<Chain> for TriggersAdapter {
827827
) -> Result<Vec<BlockFinality>> {
828828
let blocks = match &*self.chain_client {
829829
ChainClient::Firehose(endpoints, _) => {
830+
trace!(
831+
logger,
832+
"Loading blocks from firehose";
833+
"block_numbers" => format!("{:?}", block_numbers)
834+
);
830835
let endpoint = endpoints.endpoint().await?;
831836
let chain_store = self.chain_store.clone();
832837

@@ -867,6 +872,11 @@ impl TriggersAdapterTrait<Chain> for TriggersAdapter {
867872
blocks
868873
}
869874
ChainClient::Rpc(client) => {
875+
trace!(
876+
logger,
877+
"Loading blocks from RPC";
878+
"block_numbers" => format!("{:?}", block_numbers)
879+
);
870880
let adapter = client.cheapest_with(&self.capabilities).await?;
871881
let chain_store = self.chain_store.clone();
872882

@@ -956,13 +966,25 @@ impl TriggersAdapterTrait<Chain> for TriggersAdapter {
956966
}
957967

958968
async fn is_on_main_chain(&self, ptr: BlockPtr) -> Result<bool, Error> {
959-
self.chain_client
960-
.rpc()?
961-
.cheapest()
962-
.await
963-
.ok_or(anyhow!("unable to get adapter for is_on_main_chain"))?
964-
.is_on_main_chain(&self.logger, ptr.clone())
965-
.await
969+
match &*self.chain_client {
970+
ChainClient::Firehose(endpoints, _) => {
971+
let endpoint = endpoints.endpoint().await?;
972+
let block = endpoint
973+
.get_block_by_number::<codec::Block>(ptr.number as u64, &self.logger)
974+
.await
975+
.map_err(|e| anyhow!("Failed to fetch block from firehose: {}", e))?;
976+
977+
Ok(block.hash() == ptr.hash)
978+
}
979+
ChainClient::Rpc(adapter) => {
980+
let adapter = adapter
981+
.cheapest()
982+
.await
983+
.ok_or_else(|| anyhow!("unable to get adapter for is_on_main_chain"))?;
984+
985+
adapter.is_on_main_chain(&self.logger, ptr).await
986+
}
987+
}
966988
}
967989

968990
async fn ancestor_block(
@@ -992,10 +1014,18 @@ impl TriggersAdapterTrait<Chain> for TriggersAdapter {
9921014
use graph::prelude::LightEthereumBlockExt;
9931015

9941016
let block = match self.chain_client.as_ref() {
995-
ChainClient::Firehose(_, _) => Some(BlockPtr {
996-
hash: BlockHash::from(vec![0xff; 32]),
997-
number: block.number.saturating_sub(1),
998-
}),
1017+
ChainClient::Firehose(endpoints, _) => {
1018+
let endpoint = endpoints.endpoint().await?;
1019+
let block = endpoint
1020+
.get_block_by_ptr::<codec::Block>(block, &self.logger)
1021+
.await
1022+
.context(format!(
1023+
"Failed to fetch block by ptr {} from firehose, backtrace: {}",
1024+
block,
1025+
std::backtrace::Backtrace::force_capture()
1026+
))?;
1027+
block.parent_ptr()
1028+
}
9991029
ChainClient::Rpc(adapters) => {
10001030
let blocks = adapters
10011031
.cheapest_with(&self.capabilities)

graph/src/firehose/endpoints.rs

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -509,6 +509,41 @@ impl FirehoseEndpoint {
509509
}
510510
}
511511

512+
pub async fn get_block_by_ptr<M>(
513+
&self,
514+
ptr: &BlockPtr,
515+
logger: &Logger,
516+
) -> Result<M, anyhow::Error>
517+
where
518+
M: prost::Message + BlockchainBlock + Default + 'static,
519+
{
520+
debug!(
521+
logger,
522+
"Connecting to firehose to retrieve block for ptr {}", ptr;
523+
"provider" => self.provider.as_str(),
524+
);
525+
526+
let req = firehose::SingleBlockRequest {
527+
transforms: [].to_vec(),
528+
reference: Some(
529+
firehose::single_block_request::Reference::BlockHashAndNumber(
530+
firehose::single_block_request::BlockHashAndNumber {
531+
hash: ptr.hash.to_string(),
532+
num: ptr.number as u64,
533+
},
534+
),
535+
),
536+
};
537+
538+
let mut client = self.new_client();
539+
match client.block(req).await {
540+
Ok(v) => Ok(M::decode(
541+
v.get_ref().block.as_ref().unwrap().value.as_ref(),
542+
)?),
543+
Err(e) => return Err(anyhow::format_err!("firehose error {}", e)),
544+
}
545+
}
546+
512547
pub async fn get_block_by_number<M>(
513548
&self,
514549
number: u64,

0 commit comments

Comments
 (0)