Skip to content

Commit a46a865

Browse files
authored
Fix unfail mechanism based of determinism (#3008)
* instance_manager: Refactor process_block to receive context by mutable reference * instance_manager: Change 'first_run' variable name to 'should_try_unfail' * instance_manager: Fix unfail mechanism based of determinism Before the "PoI for failed subgraphs" feature, where we advance the deployment head a block if the error is deterministic, we would do the `unfail` logic after a block is successfully processed, like this: ```rust match process_block() { Ok(needs_restart) => { if first_run { store.unfail(); } // ... } // ... } ``` With this feature the `unfail` mechanism had to change, we did that by changing two things about it: - It would run before `process_block` - Making it revert to the parent block if the error was deterministic This worked fine for deterministic cases, however for non-deterministic ones we faced an issue. We need to run `unfail` after `process_block` went successful (returned `Ok`). This is necessary because we can only unfail the deployment if the subgraph actually advanced past the error block range. Example with the `unfail` previous to this commit: - Subgraph state: - failed - deployment head at 1399 - non-determinstic error happened at 1400 - Once the index-node gets restarted, `unfail` would run, but nothing would happen since the deployment head only advances after a successful `process_block` execution. - Subgraph would continue to advance it's pointer but `unfail` wouldn't run anymore (only once at start). This would make the subgraph be in the `failed` state, but advancing it's pointer normally. To fix the issue, we need to run `unfail` for non-deterministic errors **after** the `process_block` went successful: - Subgraph state: - failed - deployment head at 1399 - non-determinstic error happened at 1400 - Once the index-node gets restarted, `unfail_determinstic_error` would run and be NOOP, then the next block would be processed (1400), the error can be fixed (non-deterministic) when `process_block` gets to run and returns `Ok`, then `unfail_non_determinstic_error` executes and **unfails** the subgraph finally 🙌 - Subgraph continues to advance it's pointer and the status is correct * deployment_store: Abstract common logic into a single function * store: Add proper testing for all unfailing situations * instance_manager: Improve comment for unfailing deterministic errors * deployment_store: Remove unfail common function/abstraction * store: Combine deployment::get_fatal_error_id with detail::error
1 parent 8155ed5 commit a46a865

File tree

6 files changed

+590
-200
lines changed

6 files changed

+590
-200
lines changed

core/src/subgraph/instance_manager.rs

Lines changed: 40 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -458,7 +458,8 @@ where
458458
let store_for_err = ctx.inputs.store.cheap_clone();
459459
let logger = ctx.state.logger.cheap_clone();
460460
let id_for_err = ctx.inputs.deployment.hash.clone();
461-
let mut first_run = true;
461+
let mut should_try_unfail_deterministic = true;
462+
let mut should_try_unfail_non_deterministic = true;
462463

463464
loop {
464465
debug!(logger, "Starting or restarting subgraph");
@@ -592,30 +593,35 @@ where
592593
let start = Instant::now();
593594
let deployment_failed = ctx.block_stream_metrics.deployment_failed.clone();
594595

595-
// If the subgraph is failed, unfail it and revert the block on which
596-
// it failed so that it is reprocessed. This gives the subgraph a chance
597-
// to move past errors.
596+
// If a subgraph failed for deterministic reasons, before processing a new block, we
597+
// revert the deployment head. It should lead to the same result since the error was
598+
// deterministic.
598599
//
599600
// As an optimization we check this only on the first run.
600-
if first_run {
601-
first_run = false;
602-
603-
let (current_ptr, parent_ptr) = match ctx.inputs.store.block_ptr()? {
604-
Some(current_ptr) => {
605-
let parent_ptr =
606-
ctx.inputs.triggers_adapter.parent_ptr(&current_ptr).await?;
607-
(Some(current_ptr), parent_ptr)
601+
if should_try_unfail_deterministic {
602+
should_try_unfail_deterministic = false;
603+
604+
if let Some(current_ptr) = ctx.inputs.store.block_ptr()? {
605+
if let Some(parent_ptr) =
606+
ctx.inputs.triggers_adapter.parent_ptr(&current_ptr).await?
607+
{
608+
// This reverts the deployment head to the parent_ptr if
609+
// deterministic errors happened.
610+
//
611+
// There's no point in calling it if we have no current or parent block
612+
// pointers, because there would be: no block to revert to or to search
613+
// errors from (first execution).
614+
ctx.inputs
615+
.store
616+
.unfail_deterministic_error(&current_ptr, &parent_ptr)?;
608617
}
609-
None => (None, None),
610-
};
611-
612-
ctx.inputs.store.unfail(current_ptr, parent_ptr)?;
618+
}
613619
}
614620

615621
let res = process_block(
616622
&logger,
617623
ctx.inputs.triggers_adapter.cheap_clone(),
618-
ctx,
624+
&mut ctx,
619625
block_stream_cancel_handle.clone(),
620626
block,
621627
cursor.into(),
@@ -626,8 +632,17 @@ where
626632
subgraph_metrics.block_processing_duration.observe(elapsed);
627633

628634
match res {
629-
Ok((c, needs_restart)) => {
630-
ctx = c;
635+
Ok(needs_restart) => {
636+
// Runs only once
637+
if should_try_unfail_non_deterministic {
638+
should_try_unfail_non_deterministic = false;
639+
640+
// If the deployment head advanced, we can unfail
641+
// the non-deterministic error (if there's any).
642+
ctx.inputs
643+
.store
644+
.unfail_non_deterministic_error(&block_ptr)?;
645+
}
631646

632647
deployment_failed.set(0.0);
633648

@@ -719,11 +734,11 @@ impl From<StoreError> for BlockProcessingError {
719734
async fn process_block<T: RuntimeHostBuilder<C>, C: Blockchain>(
720735
logger: &Logger,
721736
triggers_adapter: Arc<C::TriggersAdapter>,
722-
mut ctx: IndexingContext<T, C>,
737+
ctx: &mut IndexingContext<T, C>,
723738
block_stream_cancel_handle: CancelHandle,
724739
block: BlockWithTriggers<C>,
725740
firehose_cursor: Option<String>,
726-
) -> Result<(IndexingContext<T, C>, bool), BlockProcessingError> {
741+
) -> Result<bool, BlockProcessingError> {
727742
let triggers = block.trigger_data;
728743
let block = Arc::new(block.block);
729744
let block_ptr = block.ptr();
@@ -797,7 +812,7 @@ async fn process_block<T: RuntimeHostBuilder<C>, C: Blockchain>(
797812
// Losing the cache is a bit annoying but not an issue for correctness.
798813
//
799814
// See also b21fa73b-6453-4340-99fb-1a78ec62efb1.
800-
return Ok((ctx, true));
815+
return Ok(true);
801816
}
802817
};
803818

@@ -818,7 +833,7 @@ async fn process_block<T: RuntimeHostBuilder<C>, C: Blockchain>(
818833
// Instantiate dynamic data sources, removing them from the block state.
819834
let (data_sources, runtime_hosts) = create_dynamic_data_sources(
820835
logger.clone(),
821-
&mut ctx,
836+
ctx,
822837
host_metrics.clone(),
823838
block_state.drain_created_data_sources(),
824839
)?;
@@ -849,7 +864,7 @@ async fn process_block<T: RuntimeHostBuilder<C>, C: Blockchain>(
849864
// and add runtimes for the data sources to the subgraph instance.
850865
persist_dynamic_data_sources(
851866
logger.clone(),
852-
&mut ctx,
867+
ctx,
853868
&mut block_state.entity_cache,
854869
data_sources,
855870
);
@@ -1004,7 +1019,7 @@ async fn process_block<T: RuntimeHostBuilder<C>, C: Blockchain>(
10041019
return Err(BlockProcessingError::Canceled);
10051020
}
10061021

1007-
Ok((ctx, needs_restart))
1022+
Ok(needs_restart)
10081023
}
10091024

10101025
Err(e) => Err(anyhow!("Error while processing block stream for a subgraph: {}", e).into()),

graph/src/components/store.rs

Lines changed: 14 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1016,15 +1016,18 @@ pub trait WritableStore: Send + Sync + 'static {
10161016
/// `block_ptr_to` must point to the parent block of the subgraph block pointer.
10171017
fn revert_block_operations(&self, block_ptr_to: BlockPtr) -> Result<(), StoreError>;
10181018

1019-
/// This method:
1020-
/// - Sets the SubgraphDeployment status accordingly to it's SubgraphErrors
1021-
/// - Reverts block operations to the parent block if necessary
1022-
fn unfail(
1019+
/// If a deterministic error happened, this function reverts the block operations from the
1020+
/// current block to the previous block.
1021+
fn unfail_deterministic_error(
10231022
&self,
1024-
current_ptr: Option<BlockPtr>,
1025-
parent_ptr: Option<BlockPtr>,
1023+
current_ptr: &BlockPtr,
1024+
parent_ptr: &BlockPtr,
10261025
) -> Result<(), StoreError>;
10271026

1027+
/// If a non-deterministic error happened and the current deployment head is past the error
1028+
/// block range, this function unfails the subgraph and deletes the error.
1029+
fn unfail_non_deterministic_error(&self, current_ptr: &BlockPtr) -> Result<(), StoreError>;
1030+
10281031
/// Set subgraph status to failed with the given error as the cause.
10291032
async fn fail_subgraph(&self, error: SubgraphError) -> Result<(), StoreError>;
10301033

@@ -1203,7 +1206,11 @@ impl WritableStore for MockStore {
12031206
unimplemented!()
12041207
}
12051208

1206-
fn unfail(&self, _: Option<BlockPtr>, _: Option<BlockPtr>) -> Result<(), StoreError> {
1209+
fn unfail_deterministic_error(&self, _: &BlockPtr, _: &BlockPtr) -> Result<(), StoreError> {
1210+
unimplemented!()
1211+
}
1212+
1213+
fn unfail_non_deterministic_error(&self, _: &BlockPtr) -> Result<(), StoreError> {
12071214
unimplemented!()
12081215
}
12091216

0 commit comments

Comments
 (0)