Skip to content

Commit c319463

Browse files
committed
core: Factor refetching blocks into a method
1 parent a8cacba commit c319463

File tree

1 file changed

+43
-23
lines changed

1 file changed

+43
-23
lines changed

core/src/subgraph/runner.rs

Lines changed: 43 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -689,29 +689,18 @@ where
689689
vec![],
690690
));
691691

692-
let block: Arc<C::Block> = if self.inputs.chain.is_refetch_block_required() {
693-
let cur = firehose_cursor.clone();
694-
let log = logger.cheap_clone();
695-
let chain = self.inputs.chain.cheap_clone();
696-
Arc::new(
697-
retry(
698-
"refetch firehose block after dynamic datasource was added",
699-
&logger,
700-
)
701-
.limit(5)
702-
.no_timeout()
703-
.run(move || {
704-
let cur = cur.clone();
705-
let log = log.cheap_clone();
706-
let chain = chain.cheap_clone();
707-
async move { chain.refetch_firehose_block(&log, cur).await }
708-
})
709-
.await
710-
.non_deterministic()?,
711-
)
712-
} else {
713-
block.cheap_clone()
714-
};
692+
// TODO: We have to pass a reference to `block` to
693+
// `refetch_block`, otherwise the call to
694+
// handle_offchain_triggers below gets an error that `block`
695+
// has moved. That is extremely fishy since it means that
696+
// `handle_offchain_triggers` uses the non-refetched block
697+
//
698+
// It's also not clear why refetching needs to happen inside
699+
// the loop; will firehose really return something diffrent
700+
// each time even though the cursor doesn't change?
701+
let block = self
702+
.refetch_block(&logger, &block, &firehose_cursor)
703+
.await?;
715704

716705
// Reprocess the triggers from this block that match the new data sources
717706
let block_with_triggers = self
@@ -838,6 +827,37 @@ where
838827
}
839828
}
840829

830+
/// Refetch the block if it that is needed. Otherwise return the block as is.
831+
async fn refetch_block(
832+
&mut self,
833+
logger: &Logger,
834+
block: &Arc<C::Block>,
835+
firehose_cursor: &FirehoseCursor,
836+
) -> Result<Arc<C::Block>, ProcessingError> {
837+
if !self.inputs.chain.is_refetch_block_required() {
838+
return Ok(block.cheap_clone());
839+
}
840+
841+
let cur = firehose_cursor.clone();
842+
let log = logger.cheap_clone();
843+
let chain = self.inputs.chain.cheap_clone();
844+
let block = retry(
845+
"refetch firehose block after dynamic datasource was added",
846+
logger,
847+
)
848+
.limit(5)
849+
.no_timeout()
850+
.run(move || {
851+
let cur = cur.clone();
852+
let log = log.cheap_clone();
853+
let chain = chain.cheap_clone();
854+
async move { chain.refetch_firehose_block(&log, cur).await }
855+
})
856+
.await
857+
.non_deterministic()?;
858+
Ok(Arc::new(block))
859+
}
860+
841861
async fn process_wasm_block(
842862
&mut self,
843863
proof_of_indexing: &SharedProofOfIndexing,

0 commit comments

Comments
 (0)