Skip to content

Commit 75a95c8

Browse files
sduchesneauleoyvens
authored andcommitted
fix firehose UNDO case on old blocks
When firehose sends an undo event from an older block, the instance-manager would try to fetch the block from RPC to retrieve its parent block info. Now, the firehose will include the parent's hash and num in the Revert event to remove that extraneous RPC call
1 parent f06bb8a commit 75a95c8

File tree

5 files changed

+55
-27
lines changed

5 files changed

+55
-27
lines changed

chain/ethereum/src/chain.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -597,6 +597,13 @@ impl FirehoseMapperTrait<Chain> for FirehoseMapper {
597597
number: block.number as i32,
598598
},
599599
FirehoseCursor::Some(response.cursor.clone()),
600+
match block.header {
601+
Some(header) => Some(BlockPtr {
602+
hash: BlockHash::from(header.parent_hash),
603+
number: (block.number as i32) - 1,
604+
}),
605+
None => None,
606+
},
600607
)),
601608

602609
bstream::ForkStep::StepIrreversible => {

chain/near/src/chain.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -285,6 +285,7 @@ impl FirehoseMapperTrait<Chain> for FirehoseMapper {
285285
number: header.height as i32,
286286
},
287287
Some(response.cursor.clone()),
288+
None, // FIXME: we should get the parent block pointer when we have access to parent block height
288289
))
289290
}
290291

core/src/subgraph/instance_manager.rs

Lines changed: 44 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -493,7 +493,7 @@ where
493493
loop {
494494
let (block, cursor) = match block_stream.next().await {
495495
Some(Ok(BlockStreamEvent::ProcessBlock(block, cursor))) => (block, cursor),
496-
Some(Ok(BlockStreamEvent::Revert(subgraph_ptr, _))) => {
496+
Some(Ok(BlockStreamEvent::Revert(subgraph_ptr, _, optional_parent_ptr))) => {
497497
info!(
498498
logger,
499499
"Reverting block to get back to main chain";
@@ -502,31 +502,49 @@ where
502502
);
503503

504504
// We would like to revert the DB state to the parent of the current block.
505-
// First, load the block in order to get the parent hash.
506-
if let Err(e) = ctx
507-
.inputs
508-
.triggers_adapter
509-
.parent_ptr(&subgraph_ptr)
510-
.await
511-
.map(|parent_ptr| parent_ptr.expect("genesis block cannot be reverted"))
512-
.and_then(|parent_ptr| {
513-
// Revert entity changes from this block, and update subgraph ptr.
514-
ctx.inputs
515-
.store
516-
.revert_block_operations(parent_ptr)
517-
.map_err(Into::into)
518-
})
519-
{
520-
error!(
521-
&logger,
522-
"Could not revert block. \
523-
The likely cause is the block not being found due to a deep reorg. \
524-
Retrying";
525-
"block_number" => format!("{}", subgraph_ptr.number),
526-
"block_hash" => format!("{}", subgraph_ptr.hash),
527-
"error" => e.to_string(),
528-
);
529-
continue;
505+
match optional_parent_ptr {
506+
Some(parent_ptr) => {
507+
if let Err(e) = ctx.inputs.store.revert_block_operations(parent_ptr) {
508+
error!(
509+
&logger,
510+
"Could not revert block. Retrying";
511+
"block_number" => format!("{}", subgraph_ptr.number),
512+
"block_hash" => format!("{}", subgraph_ptr.hash),
513+
"error" => e.to_string(),
514+
);
515+
continue;
516+
}
517+
}
518+
None => {
519+
// First, load the block in order to get the parent hash.
520+
if let Err(e) = ctx
521+
.inputs
522+
.triggers_adapter
523+
.parent_ptr(&subgraph_ptr)
524+
.await
525+
.map(|parent_ptr| {
526+
parent_ptr.expect("genesis block cannot be reverted")
527+
})
528+
.and_then(|parent_ptr| {
529+
// Revert entity changes from this block, and update subgraph ptr.
530+
ctx.inputs
531+
.store
532+
.revert_block_operations(parent_ptr)
533+
.map_err(Into::into)
534+
})
535+
{
536+
error!(
537+
&logger,
538+
"Could not revert block. \
539+
The likely cause is the block not being found due to a deep reorg. \
540+
Retrying";
541+
"block_number" => format!("{}", subgraph_ptr.number),
542+
"block_hash" => format!("{}", subgraph_ptr.hash),
543+
"error" => e.to_string(),
544+
);
545+
continue;
546+
}
547+
}
530548
}
531549

532550
ctx.block_stream_metrics

graph/src/blockchain/block_stream.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -102,7 +102,8 @@ pub enum FirehoseError {
102102
pub enum BlockStreamEvent<C: Blockchain> {
103103
// The payload is the current subgraph head pointer, which should be reverted, such that the
104104
// parent of the current subgraph head becomes the new subgraph head.
105-
Revert(BlockPtr, FirehoseCursor),
105+
// An optional pointer to the parent block will save a round trip operation when reverting.
106+
Revert(BlockPtr, FirehoseCursor, Option<BlockPtr>),
106107

107108
ProcessBlock(BlockWithTriggers<C>, FirehoseCursor),
108109
}

graph/src/blockchain/polling_block_stream.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -546,6 +546,7 @@ impl<C: Blockchain> Stream for PollingBlockStream<C> {
546546
break Poll::Ready(Some(Ok(BlockStreamEvent::Revert(
547547
block,
548548
FirehoseCursor::None,
549+
None,
549550
))));
550551
}
551552
Poll::Pending => {

0 commit comments

Comments
 (0)