Skip to content

Commit 0b890a8

Browse files
committed
graph: Add get_block_number_with_retry method for firehose endpoint
1 parent 8c7f7e3 commit 0b890a8

File tree

2 files changed

+80
-16
lines changed

2 files changed

+80
-16
lines changed

chain/ethereum/src/chain.rs

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1018,10 +1018,12 @@ impl TriggersAdapterTrait<Chain> for TriggersAdapter {
10181018
ChainClient::Firehose(endpoints) => {
10191019
let endpoint = endpoints.endpoint().await?;
10201020
let block = endpoint
1021-
.get_block_by_number::<codec::Block>(ptr.number as u64, &self.logger)
1021+
.get_block_by_number_with_retry::<codec::Block>(ptr.number as u64, &self.logger)
10221022
.await
1023-
.map_err(|e| anyhow!("Failed to fetch block from firehose: {}", e))?;
1024-
1023+
.context(format!(
1024+
"Failed to fetch block {} from firehose",
1025+
ptr.number
1026+
))?;
10251027
Ok(block.hash() == ptr.hash)
10261028
}
10271029
ChainClient::Rpc(adapter) => {

graph/src/firehose/endpoints.rs

Lines changed: 75 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ use crate::{
1313
prelude::{anyhow, debug, DeploymentHash},
1414
substreams_rpc,
1515
};
16+
use anyhow::Context;
1617
use async_trait::async_trait;
1718
use futures03::{StreamExt, TryStreamExt};
1819
use http::uri::{Scheme, Uri};
@@ -443,11 +444,43 @@ impl FirehoseEndpoint {
443444
}
444445
}
445446

446-
pub async fn get_block_by_number<M>(
447-
&self,
448-
number: u64,
447+
pub async fn get_block_by_ptr_with_retry<M>(
448+
self: Arc<Self>,
449+
ptr: &BlockPtr,
449450
logger: &Logger,
450451
) -> Result<M, anyhow::Error>
452+
where
453+
M: prost::Message + BlockchainBlock + Default + 'static,
454+
{
455+
let retry_log_message = format!("get_block_by_ptr for block {}", ptr);
456+
let endpoint = self.cheap_clone();
457+
let logger = logger.cheap_clone();
458+
let ptr_for_retry = ptr.clone();
459+
460+
retry(retry_log_message, &logger)
461+
.limit(ENV_VARS.firehose_block_fetch_retry_limit)
462+
.timeout_secs(ENV_VARS.firehose_block_fetch_timeout)
463+
.run(move || {
464+
let endpoint = endpoint.cheap_clone();
465+
let logger = logger.cheap_clone();
466+
let ptr = ptr_for_retry.clone();
467+
async move {
468+
endpoint
469+
.get_block_by_ptr::<M>(&ptr, &logger)
470+
.await
471+
.context(format!(
472+
"Failed to fetch block by ptr {} from firehose",
473+
ptr
474+
))
475+
}
476+
})
477+
.await
478+
.map_err(move |e| {
479+
anyhow::anyhow!("Failed to fetch block by ptr {} from firehose: {}", ptr, e)
480+
})
481+
}
482+
483+
async fn get_block_by_number<M>(&self, number: u64, logger: &Logger) -> Result<M, anyhow::Error>
451484
where
452485
M: prost::Message + BlockchainBlock + Default + 'static,
453486
{
@@ -473,6 +506,44 @@ impl FirehoseEndpoint {
473506
}
474507
}
475508

509+
pub async fn get_block_by_number_with_retry<M>(
510+
self: Arc<Self>,
511+
number: u64,
512+
logger: &Logger,
513+
) -> Result<M, anyhow::Error>
514+
where
515+
M: prost::Message + BlockchainBlock + Default + 'static,
516+
{
517+
let retry_log_message = format!("get_block_by_number for block {}", number);
518+
let endpoint = self.cheap_clone();
519+
let logger = logger.cheap_clone();
520+
521+
retry(retry_log_message, &logger)
522+
.limit(ENV_VARS.firehose_block_fetch_retry_limit)
523+
.timeout_secs(ENV_VARS.firehose_block_fetch_timeout)
524+
.run(move || {
525+
let endpoint = endpoint.cheap_clone();
526+
let logger = logger.cheap_clone();
527+
async move {
528+
endpoint
529+
.get_block_by_number::<M>(number, &logger)
530+
.await
531+
.context(format!(
532+
"Failed to fetch block by number {} from firehose",
533+
number
534+
))
535+
}
536+
})
537+
.await
538+
.map_err(|e| {
539+
anyhow::anyhow!(
540+
"Failed to fetch block by number {} from firehose: {}",
541+
number,
542+
e
543+
)
544+
})
545+
}
546+
476547
pub async fn load_blocks_by_numbers<M>(
477548
self: Arc<Self>,
478549
numbers: Vec<u64>,
@@ -488,16 +559,7 @@ impl FirehoseEndpoint {
488559
.map(move |number| {
489560
let e = self.cheap_clone();
490561
let l = logger.clone();
491-
let retry_log_message = format!("get_block_by_number for block {}", number);
492-
493-
retry(retry_log_message, &l)
494-
.limit(ENV_VARS.firehose_block_fetch_retry_limit)
495-
.timeout_secs(ENV_VARS.firehose_block_fetch_timeout)
496-
.run(move || {
497-
let e = e.cheap_clone();
498-
let l = l.clone();
499-
async move { e.get_block_by_number::<M>(number, &l).await }
500-
})
562+
async move { e.get_block_by_number_with_retry::<M>(number, &l).await }
501563
})
502564
.buffered(ENV_VARS.firehose_block_batch_size);
503565

0 commit comments

Comments
 (0)