Skip to content

Commit f4c4980

Browse files
author
Matthieu Vachon
authored
core,graph,store: on block revert, Firehose should save the cursor to the databse (#3133)
- On revert operation, `parent_ptr` everywhere since it avoids a useless RPC (or in cache database) call. - The firehose update cursor on remove is directly in `revert_block_operations` to be in a transaction alongside the update of the subgraph ptr.
1 parent 86a8367 commit f4c4980

File tree

11 files changed

+74
-75
lines changed

11 files changed

+74
-75
lines changed

chain/ethereum/src/chain.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -573,8 +573,8 @@ impl FirehoseMapperTrait<Chain> for FirehoseMapper {
573573

574574
Ok(BlockStreamEvent::Revert(
575575
block.ptr(),
576+
parent_ptr,
576577
FirehoseCursor::Some(response.cursor.clone()),
577-
Some(parent_ptr),
578578
))
579579
}
580580

chain/near/src/chain.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -303,15 +303,15 @@ impl FirehoseMapperTrait<Chain> for FirehoseMapper {
303303
)),
304304

305305
StepUndo => {
306-
let header = block.header();
307-
let parent_ptr = header
306+
let parent_ptr = block
307+
.header()
308308
.parent_ptr()
309309
.expect("Genesis block should never be reverted");
310310

311311
Ok(BlockStreamEvent::Revert(
312312
block.ptr(),
313+
parent_ptr,
313314
Some(response.cursor.clone()),
314-
Some(parent_ptr),
315315
))
316316
}
317317

core/src/subgraph/instance_manager.rs

Lines changed: 16 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -556,61 +556,28 @@ where
556556

557557
let (block, cursor) = match event {
558558
Some(Ok(BlockStreamEvent::ProcessBlock(block, cursor))) => (block, cursor),
559-
Some(Ok(BlockStreamEvent::Revert(subgraph_ptr, _, optional_parent_ptr))) => {
559+
Some(Ok(BlockStreamEvent::Revert(subgraph_ptr, parent_ptr, cursor))) => {
560560
info!(
561561
logger,
562562
"Reverting block to get back to main chain";
563563
"block_number" => format!("{}", subgraph_ptr.number),
564564
"block_hash" => format!("{}", subgraph_ptr.hash)
565565
);
566566

567-
// We would like to revert the DB state to the parent of the current block.
568-
match optional_parent_ptr {
569-
Some(parent_ptr) => {
570-
if let Err(e) = inputs.store.revert_block_operations(parent_ptr) {
571-
error!(
572-
&logger,
573-
"Could not revert block. Retrying";
574-
"block_number" => format!("{}", subgraph_ptr.number),
575-
"block_hash" => format!("{}", subgraph_ptr.hash),
576-
"error" => e.to_string(),
577-
);
578-
579-
// Exit inner block stream consumption loop and go up to loop that restarts subgraph
580-
break;
581-
}
582-
}
583-
None => {
584-
// First, load the block in order to get the parent hash.
585-
if let Err(e) = inputs
586-
.triggers_adapter
587-
.parent_ptr(&subgraph_ptr)
588-
.await
589-
.map(|parent_ptr| {
590-
parent_ptr.expect("genesis block cannot be reverted")
591-
})
592-
.and_then(|parent_ptr| {
593-
// Revert entity changes from this block, and update subgraph ptr.
594-
inputs
595-
.store
596-
.revert_block_operations(parent_ptr)
597-
.map_err(Into::into)
598-
})
599-
{
600-
error!(
601-
&logger,
602-
"Could not revert block. \
603-
The likely cause is the block not being found due to a deep reorg. \
604-
Retrying";
605-
"block_number" => format!("{}", subgraph_ptr.number),
606-
"block_hash" => format!("{}", subgraph_ptr.hash),
607-
"error" => e.to_string(),
608-
);
609-
610-
// Exit inner block stream consumption loop and go up to loop that restarts subgraph
611-
break;
612-
}
613-
}
567+
if let Err(e) = inputs
568+
.store
569+
.revert_block_operations(parent_ptr, cursor.as_deref())
570+
{
571+
error!(
572+
&logger,
573+
"Could not revert block. Retrying";
574+
"block_number" => format!("{}", subgraph_ptr.number),
575+
"block_hash" => format!("{}", subgraph_ptr.hash),
576+
"error" => e.to_string(),
577+
);
578+
579+
// Exit inner block stream consumption loop and go up to loop that restarts subgraph
580+
break;
614581
}
615582

616583
ctx.block_stream_metrics
@@ -629,6 +596,7 @@ where
629596
ctx.state.entity_lfu_cache = LfuCache::new();
630597
continue;
631598
}
599+
632600
// Log and drop the errors from the block_stream
633601
// The block stream will continue attempting to produce blocks
634602
Some(Err(e)) => {

graph/src/blockchain/block_stream.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -177,7 +177,7 @@ pub enum BlockStreamEvent<C: Blockchain> {
177177
// The payload is the current subgraph head pointer, which should be reverted, such that the
178178
// parent of the current subgraph head becomes the new subgraph head.
179179
// An optional pointer to the parent block will save a round trip operation when reverting.
180-
Revert(BlockPtr, FirehoseCursor, Option<BlockPtr>),
180+
Revert(BlockPtr, BlockPtr, FirehoseCursor),
181181

182182
ProcessBlock(BlockWithTriggers<C>, FirehoseCursor),
183183
}

graph/src/blockchain/polling_block_stream.rs

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -139,7 +139,7 @@ where
139139
/// Blocks and range size
140140
Blocks(VecDeque<BlockWithTriggers<C>>, BlockNumber),
141141

142-
// The payload is the current subgraph head pointer, which should be reverted and it's parent, such that the
142+
// The payload is the current subgraph head pointer, which should be reverted and its parent, such that the
143143
// parent of the current subgraph head becomes the new subgraph head.
144144
Revert(BlockPtr, BlockPtr),
145145
Done,
@@ -212,7 +212,9 @@ where
212212
ReconciliationStep::Done => {
213213
return Ok(NextBlocks::Done);
214214
}
215-
ReconciliationStep::Revert(from, to) => return Ok(NextBlocks::Revert(from, to)),
215+
ReconciliationStep::Revert(from, parent_ptr) => {
216+
return Ok(NextBlocks::Revert(from, parent_ptr))
217+
}
216218
}
217219
}
218220
}
@@ -538,14 +540,14 @@ impl<C: Blockchain> Stream for PollingBlockStream<C> {
538540
// Poll for chain head update
539541
continue;
540542
}
541-
NextBlocks::Revert(from, to) => {
542-
self.ctx.current_block = to.into();
543+
NextBlocks::Revert(from, parent_ptr) => {
544+
self.ctx.current_block = Some(parent_ptr.clone());
543545

544546
self.state = BlockStreamState::BeginReconciliation;
545547
break Poll::Ready(Some(Ok(BlockStreamEvent::Revert(
546548
from,
549+
parent_ptr,
547550
FirehoseCursor::None,
548-
self.ctx.current_block.clone(),
549551
))));
550552
}
551553
},

graph/src/components/store.rs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1037,7 +1037,11 @@ pub trait WritableStore: Send + Sync + 'static {
10371037
/// subgraph block pointer to `block_ptr_to`.
10381038
///
10391039
/// `block_ptr_to` must point to the parent block of the subgraph block pointer.
1040-
fn revert_block_operations(&self, block_ptr_to: BlockPtr) -> Result<(), StoreError>;
1040+
fn revert_block_operations(
1041+
&self,
1042+
block_ptr_to: BlockPtr,
1043+
firehose_cursor: Option<&str>,
1044+
) -> Result<(), StoreError>;
10411045

10421046
/// If a deterministic error happened, this function reverts the block operations from the
10431047
/// current block to the previous block.

graph/tests/entity_cache.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@ impl WritableStore for MockStore {
5858
unimplemented!()
5959
}
6060

61-
fn revert_block_operations(&self, _: BlockPtr) -> Result<(), StoreError> {
61+
fn revert_block_operations(&self, _: BlockPtr, _: Option<&str>) -> Result<(), StoreError> {
6262
unimplemented!()
6363
}
6464

store/postgres/src/deployment_store.rs

Lines changed: 20 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
use anyhow::Context;
12
use detail::DeploymentDetail;
23
use diesel::connection::SimpleConnection;
34
use diesel::pg::PgConnection;
@@ -905,7 +906,7 @@ impl DeploymentStore {
905906

906907
if let Some(cursor) = firehose_cursor {
907908
if cursor != "" {
908-
deployment::update_firehose_cursor(&conn, &site.deployment, &cursor)?;
909+
deployment::update_firehose_cursor(&conn, &site.deployment, cursor)?;
909910
}
910911
}
911912

@@ -920,6 +921,7 @@ impl DeploymentStore {
920921
conn: &PgConnection,
921922
site: Arc<Site>,
922923
block_ptr_to: BlockPtr,
924+
firehose_cursor: Option<&str>,
923925
) -> Result<StoreEvent, StoreError> {
924926
let event = conn.transaction(|| -> Result<_, StoreError> {
925927
// Don't revert past a graft point
@@ -940,6 +942,11 @@ impl DeploymentStore {
940942

941943
deployment::revert_block_ptr(&conn, &site.deployment, block_ptr_to.clone())?;
942944

945+
if let Some(cursor) = firehose_cursor {
946+
deployment::update_firehose_cursor(&conn, &site.deployment, cursor)
947+
.context("updating firehose cursor")?;
948+
}
949+
943950
// Revert the data
944951
let layout = self.layout(&conn, site.clone())?;
945952

@@ -991,13 +998,18 @@ impl DeploymentStore {
991998
block_ptr_to.number
992999
);
9931000
}
994-
self.rewind_with_conn(&conn, site, block_ptr_to)
1001+
1002+
// When rewinding, we reset the firehose cursor to the empty string. That way, on resume,
1003+
// Firehose will start from the block_ptr instead (with sanity check to ensure it's resume
1004+
// at the exact block).
1005+
self.rewind_with_conn(&conn, site, block_ptr_to, Some(""))
9951006
}
9961007

9971008
pub(crate) fn revert_block_operations(
9981009
&self,
9991010
site: Arc<Site>,
10001011
block_ptr_to: BlockPtr,
1012+
firehose_cursor: Option<&str>,
10011013
) -> Result<StoreEvent, StoreError> {
10021014
let conn = self.get_conn()?;
10031015
// Unwrap: If we are reverting then the block ptr is not `None`.
@@ -1008,7 +1020,7 @@ impl DeploymentStore {
10081020
panic!("revert_block_operations must revert a single block only");
10091021
}
10101022

1011-
self.rewind_with_conn(&conn, site, block_ptr_to)
1023+
self.rewind_with_conn(&conn, site, block_ptr_to, firehose_cursor)
10121024
}
10131025

10141026
pub(crate) async fn deployment_state_from_id(
@@ -1227,7 +1239,11 @@ impl DeploymentStore {
12271239
);
12281240

12291241
// We ignore the StoreEvent that's being returned, we'll not use it.
1230-
let _ = self.revert_block_operations(site.clone(), parent_ptr.clone())?;
1242+
//
1243+
// We reset the firehose cursor to the empty string. That way, on resume,
1244+
// Firehose will start from the block_ptr instead (with sanity checks to ensure it's resuming
1245+
// at the correct block).
1246+
let _ = self.revert_block_operations(site.clone(), parent_ptr.clone(), Some(""))?;
12311247

12321248
// Unfail the deployment.
12331249
deployment::update_deployment_status(conn, deployment_id, prev_health, None)?;

store/postgres/src/writable.rs

Lines changed: 17 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -163,11 +163,17 @@ impl WritableStore {
163163
})
164164
}
165165

166-
fn revert_block_operations(&self, block_ptr_to: BlockPtr) -> Result<(), StoreError> {
166+
fn revert_block_operations(
167+
&self,
168+
block_ptr_to: BlockPtr,
169+
firehose_cursor: Option<&str>,
170+
) -> Result<(), StoreError> {
167171
self.retry("revert_block_operations", || {
168-
let event = self
169-
.writable
170-
.revert_block_operations(self.site.clone(), block_ptr_to.clone())?;
172+
let event = self.writable.revert_block_operations(
173+
self.site.clone(),
174+
block_ptr_to.clone(),
175+
firehose_cursor.clone(),
176+
)?;
171177
self.try_send_store_event(event)
172178
})
173179
}
@@ -363,13 +369,16 @@ impl WritableStoreTrait for WritableAgent {
363369
self.store.start_subgraph_deployment(logger)
364370
}
365371

366-
fn revert_block_operations(&self, block_ptr_to: BlockPtr) -> Result<(), StoreError> {
372+
fn revert_block_operations(
373+
&self,
374+
block_ptr_to: BlockPtr,
375+
firehose_cursor: Option<&str>,
376+
) -> Result<(), StoreError> {
367377
*self.block_ptr.lock().unwrap() = Some(block_ptr_to.clone());
368-
// FIXME: What about the firehose cursor? Why doesn't that get updated?
369-
370378
// TODO: If we haven't written the block yet, revert in memory. If
371379
// we have, revert in the database
372-
self.store.revert_block_operations(block_ptr_to)
380+
self.store
381+
.revert_block_operations(block_ptr_to, firehose_cursor)
373382
}
374383

375384
fn unfail_deterministic_error(

store/postgres/tests/graft.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -301,13 +301,13 @@ async fn check_graft(
301301
.cheap_clone()
302302
.writable(LOGGER.clone(), deployment.id)
303303
.await?
304-
.revert_block_operations(BLOCKS[1].clone())
304+
.revert_block_operations(BLOCKS[1].clone(), None)
305305
.expect("We can revert a block we just created");
306306

307307
let err = store
308308
.writable(LOGGER.clone(), deployment.id)
309309
.await?
310-
.revert_block_operations(BLOCKS[0].clone())
310+
.revert_block_operations(BLOCKS[0].clone(), None)
311311
.expect_err("Reverting past graft point is not allowed");
312312

313313
assert!(err.to_string().contains("Can not revert subgraph"));

0 commit comments

Comments
 (0)