Skip to content

Commit 171e430

Browse files
authored
graph: Move subgraph.block_ptr() check (#3058)
1 parent 25e9606 commit 171e430

File tree

2 files changed

+87
-56
lines changed

2 files changed

+87
-56
lines changed

chain/ethereum/src/chain.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -154,6 +154,8 @@ impl Chain {
154154
true => 0,
155155
};
156156

157+
let start_block = writable.block_ptr()?;
158+
157159
Ok(Box::new(PollingBlockStream::new(
158160
writable,
159161
chain_store,
@@ -169,6 +171,7 @@ impl Chain {
169171
*MAX_BLOCK_RANGE_SIZE,
170172
*TARGET_TRIGGERS_PER_BLOCK_RANGE,
171173
unified_api_version,
174+
start_block,
172175
)))
173176
}
174177

graph/src/blockchain/polling_block_stream.rs

Lines changed: 84 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -62,10 +62,10 @@ enum ReconciliationStep<C>
6262
where
6363
C: Blockchain,
6464
{
65-
/// Revert the current block pointed at by the subgraph pointer. The pointer is to the current
65+
/// Revert(from, to) the current block pointed at by the subgraph pointer. The pointer is to the current
6666
/// subgraph head, and a single block will be reverted so the new head will be the parent of the
67-
/// current one.
68-
Revert(BlockPtr),
67+
/// current one. The second BlockPtr is the parent.
68+
Revert(BlockPtr, BlockPtr),
6969

7070
/// Move forwards, processing one or more blocks. Second element is the block range size.
7171
ProcessDescendantBlocks(Vec<BlockWithTriggers<C>>, BlockNumber),
@@ -101,6 +101,7 @@ where
101101
max_block_range_size: BlockNumber,
102102
target_triggers_per_block_range: u64,
103103
unified_api_version: UnifiedMappingApiVersion,
104+
current_block: Option<BlockPtr>,
104105
}
105106

106107
impl<C: Blockchain> Clone for PollingBlockStreamContext<C> {
@@ -121,6 +122,7 @@ impl<C: Blockchain> Clone for PollingBlockStreamContext<C> {
121122
max_block_range_size: self.max_block_range_size,
122123
target_triggers_per_block_range: self.target_triggers_per_block_range,
123124
unified_api_version: self.unified_api_version.clone(),
125+
current_block: self.current_block.clone(),
124126
}
125127
}
126128
}
@@ -140,9 +142,9 @@ where
140142
/// Blocks and range size
141143
Blocks(VecDeque<BlockWithTriggers<C>>, BlockNumber),
142144

143-
// The payload is the current subgraph head pointer, which should be reverted, such that the
145+
// The payload is the current subgraph head pointer, which should be reverted and it's parent, such that the
144146
// parent of the current subgraph head becomes the new subgraph head.
145-
Revert(BlockPtr),
147+
Revert(BlockPtr, BlockPtr),
146148
Done,
147149
}
148150

@@ -165,12 +167,14 @@ where
165167
max_block_range_size: BlockNumber,
166168
target_triggers_per_block_range: u64,
167169
unified_api_version: UnifiedMappingApiVersion,
170+
start_block: Option<BlockPtr>,
168171
) -> Self {
169172
Self {
170173
state: BlockStreamState::BeginReconciliation,
171174
consecutive_err_count: 0,
172175
chain_head_update_stream,
173176
ctx: PollingBlockStreamContext {
177+
current_block: start_block,
174178
subgraph_store,
175179
chain_store,
176180
adapter,
@@ -216,7 +220,7 @@ where
216220

217221
return Ok(NextBlocks::Done);
218222
}
219-
ReconciliationStep::Revert(block) => return Ok(NextBlocks::Revert(block)),
223+
ReconciliationStep::Revert(from, to) => return Ok(NextBlocks::Revert(from, to)),
220224
}
221225
}
222226
}
@@ -229,7 +233,7 @@ where
229233

230234
// Get pointers from database for comparison
231235
let head_ptr_opt = ctx.chain_store.chain_head_ptr()?;
232-
let subgraph_ptr = ctx.subgraph_store.block_ptr()?;
236+
let subgraph_ptr = self.current_block.clone();
233237

234238
// If chain head ptr is not set yet
235239
let head_ptr = match head_ptr_opt {
@@ -249,7 +253,7 @@ where
249253
trace!(
250254
ctx.logger, "Subgraph pointer";
251255
"hash" => format!("{:?}", subgraph_ptr.as_ref().map(|block| &block.hash)),
252-
"number" => subgraph_ptr.as_ref().map(|block| block.number),
256+
"number" => subgraph_ptr.as_ref().map(|block| &block.number),
253257
);
254258

255259
// Make sure not to include genesis in the reorg threshold.
@@ -317,7 +321,10 @@ where
317321
//
318322
// Note: We can safely unwrap the subgraph ptr here, because
319323
// if it was `None`, `is_on_main_chain` would be true.
320-
return Ok(ReconciliationStep::Revert(subgraph_ptr.unwrap()));
324+
let from = subgraph_ptr.unwrap();
325+
let parent = self.parent_ptr(&from).await?;
326+
327+
return Ok(ReconciliationStep::Revert(from, parent));
321328
}
322329

323330
// The subgraph ptr points to a block on the main chain.
@@ -422,7 +429,8 @@ where
422429

423430
#[cfg(debug_assertions)]
424431
if test_reorg(subgraph_ptr.clone()) {
425-
return Ok(ReconciliationStep::Revert(subgraph_ptr));
432+
let parent = self.parent_ptr(&subgraph_ptr).await?;
433+
return Ok(ReconciliationStep::Revert(subgraph_ptr.clone(), parent));
426434
}
427435

428436
// Precondition: subgraph_ptr.number < head_ptr.number
@@ -457,16 +465,28 @@ where
457465
.await?;
458466
Ok(ReconciliationStep::ProcessDescendantBlocks(vec![block], 1))
459467
} else {
468+
let parent = self.parent_ptr(&subgraph_ptr).await?;
469+
460470
// The subgraph ptr is not on the main chain.
461471
// We will need to step back (possibly repeatedly) one block at a time
462472
// until we are back on the main chain.
463-
Ok(ReconciliationStep::Revert(subgraph_ptr))
473+
Ok(ReconciliationStep::Revert(subgraph_ptr, parent))
464474
}
465475
}
466476
}
467477
}
468478
}
469479

480+
async fn parent_ptr(&self, block_ptr: &BlockPtr) -> Result<BlockPtr, Error> {
481+
let ptr = self
482+
.adapter
483+
.parent_ptr(block_ptr)
484+
.await?
485+
.expect("genesis block can't be reverted");
486+
487+
Ok(ptr)
488+
}
489+
470490
/// Set subgraph deployment entity synced flag if and only if the subgraph block pointer is
471491
/// caught up to the head block pointer.
472492
fn update_subgraph_synced_status(&self) -> Result<(), StoreError> {
@@ -505,54 +525,60 @@ impl<C: Blockchain> Stream for PollingBlockStream<C> {
505525
// Waiting for the reconciliation to complete or yield blocks
506526
BlockStreamState::Reconciliation(next_blocks_future) => {
507527
match next_blocks_future.poll_unpin(cx) {
508-
Poll::Ready(Ok(NextBlocks::Blocks(next_blocks, block_range_size))) => {
509-
// We had only one error, so we infer that reducing the range size is
510-
// what fixed it. Reduce the max range size to prevent future errors.
511-
// See: 018c6df4-132f-4acc-8697-a2d64e83a9f0
512-
if self.consecutive_err_count == 1 {
513-
// Reduce the max range size by 10%, but to no less than 10.
514-
self.ctx.max_block_range_size =
515-
(self.ctx.max_block_range_size * 9 / 10).max(10);
528+
Poll::Ready(Ok(next_block_step)) => match next_block_step {
529+
NextBlocks::Blocks(next_blocks, block_range_size) => {
530+
// We had only one error, so we infer that reducing the range size is
531+
// what fixed it. Reduce the max range size to prevent future errors.
532+
// See: 018c6df4-132f-4acc-8697-a2d64e83a9f0
533+
if self.consecutive_err_count == 1 {
534+
// Reduce the max range size by 10%, but to no less than 10.
535+
self.ctx.max_block_range_size =
536+
(self.ctx.max_block_range_size * 9 / 10).max(10);
537+
}
538+
self.consecutive_err_count = 0;
539+
540+
let total_triggers =
541+
next_blocks.iter().map(|b| b.trigger_count()).sum::<usize>();
542+
self.ctx.previous_triggers_per_block =
543+
total_triggers as f64 / block_range_size as f64;
544+
self.ctx.previous_block_range_size = block_range_size;
545+
if total_triggers > 0 {
546+
debug!(
547+
self.ctx.logger,
548+
"Processing {} triggers", total_triggers
549+
);
550+
}
551+
552+
// Switch to yielding state until next_blocks is depleted
553+
self.state =
554+
BlockStreamState::YieldingBlocks(Box::new(next_blocks));
555+
556+
// Yield the first block in next_blocks
557+
continue;
516558
}
517-
self.consecutive_err_count = 0;
518-
519-
let total_triggers =
520-
next_blocks.iter().map(|b| b.trigger_count()).sum::<usize>();
521-
self.ctx.previous_triggers_per_block =
522-
total_triggers as f64 / block_range_size as f64;
523-
self.ctx.previous_block_range_size = block_range_size;
524-
if total_triggers > 0 {
525-
debug!(self.ctx.logger, "Processing {} triggers", total_triggers);
526-
}
527-
528-
// Switch to yielding state until next_blocks is depleted
529-
self.state = BlockStreamState::YieldingBlocks(Box::new(next_blocks));
530-
531-
// Yield the first block in next_blocks
532-
continue;
533-
}
534-
// Reconciliation completed. We're caught up to chain head.
535-
Poll::Ready(Ok(NextBlocks::Done)) => {
536-
// Reset error count
537-
self.consecutive_err_count = 0;
559+
// Reconciliation completed. We're caught up to chain head.
560+
NextBlocks::Done => {
561+
// Reset error count
562+
self.consecutive_err_count = 0;
538563

539-
// Switch to idle
540-
self.state = BlockStreamState::Idle;
564+
// Switch to idle
565+
self.state = BlockStreamState::Idle;
541566

542-
// Poll for chain head update
543-
continue;
544-
}
545-
Poll::Ready(Ok(NextBlocks::Revert(block))) => {
546-
self.state = BlockStreamState::BeginReconciliation;
547-
break Poll::Ready(Some(Ok(BlockStreamEvent::Revert(
548-
block,
549-
FirehoseCursor::None,
550-
None,
551-
))));
552-
}
553-
Poll::Pending => {
554-
break Poll::Pending;
555-
}
567+
// Poll for chain head update
568+
continue;
569+
}
570+
NextBlocks::Revert(from, to) => {
571+
self.ctx.current_block = to.into();
572+
573+
self.state = BlockStreamState::BeginReconciliation;
574+
break Poll::Ready(Some(Ok(BlockStreamEvent::Revert(
575+
from,
576+
FirehoseCursor::None,
577+
self.ctx.current_block.clone(),
578+
))));
579+
}
580+
},
581+
Poll::Pending => break Poll::Pending,
556582
Poll::Ready(Err(e)) => {
557583
// Reset the block range size in an attempt to recover from the error.
558584
// See also: 018c6df4-132f-4acc-8697-a2d64e83a9f0
@@ -577,6 +603,8 @@ impl<C: Blockchain> Stream for PollingBlockStream<C> {
577603
match next_blocks.pop_front() {
578604
// Yield one block
579605
Some(next_block) => {
606+
self.ctx.current_block = Some(next_block.block.ptr());
607+
580608
break Poll::Ready(Some(Ok(BlockStreamEvent::ProcessBlock(
581609
next_block,
582610
FirehoseCursor::None,

0 commit comments

Comments
 (0)