Skip to content

Commit a8cacba

Browse files
committed
core, graph: Factor out the decision to cancel a buggy subgraph
1 parent c91a09e commit a8cacba

File tree

2 files changed

+64
-71
lines changed

2 files changed

+64
-71
lines changed

core/src/subgraph/runner.rs

Lines changed: 64 additions & 67 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ use graph::prelude::{
3636
};
3737
use graph::schema::EntityKey;
3838
use graph::slog::{debug, error, info, o, trace, warn, Logger};
39+
use graph::util::lfu_cache::EvictStats;
3940
use graph::util::{backoff::ExponentialBackoff, lfu_cache::LfuCache};
4041
use std::sync::Arc;
4142
use std::time::{Duration, Instant};
@@ -377,15 +378,28 @@ where
377378
proof_of_indexing: SharedProofOfIndexing,
378379
offchain_mods: Vec<EntityModification>,
379380
processed_offchain_data_sources: Vec<StoredDynamicDataSource>,
380-
) -> Result<bool, ProcessingError> {
381-
let has_errors = block_state.has_errors();
381+
) -> Result<(), ProcessingError> {
382+
fn log_evict_stats(logger: &Logger, evict_stats: &EvictStats) {
383+
trace!(logger, "Entity cache statistics";
384+
"weight" => evict_stats.new_weight,
385+
"evicted_weight" => evict_stats.evicted_weight,
386+
"count" => evict_stats.new_count,
387+
"evicted_count" => evict_stats.evicted_count,
388+
"stale_update" => evict_stats.stale_update,
389+
"hit_rate" => format!("{:.0}%", evict_stats.hit_rate_pct()),
390+
"accesses" => evict_stats.accesses,
391+
"evict_time_ms" => evict_stats.evict_time.as_millis());
392+
}
393+
382394
let BlockState {
383395
deterministic_errors,
384396
persisted_data_sources,
385397
metrics: block_state_metrics,
386398
mut entity_cache,
387399
..
388400
} = block_state;
401+
let first_error = deterministic_errors.first().cloned();
402+
let has_errors = first_error.is_some();
389403

390404
// Avoid writing to store if block stream has been canceled
391405
if self.is_canceled() {
@@ -412,20 +426,10 @@ where
412426
modifications: mut mods,
413427
entity_lfu_cache: cache,
414428
evict_stats,
415-
} = entity_cache
416-
.as_modifications(block_ptr.number)
417-
.map_err(|e| ProcessingError::Unknown(e.into()))?;
429+
} = entity_cache.as_modifications(block_ptr.number).classify()?;
418430
section.end();
419431

420-
trace!(self.logger, "Entity cache statistics";
421-
"weight" => evict_stats.new_weight,
422-
"evicted_weight" => evict_stats.evicted_weight,
423-
"count" => evict_stats.new_count,
424-
"evicted_count" => evict_stats.evicted_count,
425-
"stale_update" => evict_stats.stale_update,
426-
"hit_rate" => format!("{:.0}%", evict_stats.hit_rate_pct()),
427-
"accesses" => evict_stats.accesses,
428-
"evict_time_ms" => evict_stats.evict_time.as_millis());
432+
log_evict_stats(&self.logger, &evict_stats);
429433

430434
mods.extend(offchain_mods);
431435

@@ -463,8 +467,6 @@ where
463467
);
464468
}
465469

466-
let first_error = deterministic_errors.first().cloned();
467-
468470
let is_caught_up = self.is_caught_up(&block_ptr).await.non_deterministic()?;
469471

470472
self.inputs
@@ -509,7 +511,30 @@ where
509511
.flush_metrics_to_store(&logger, block_ptr, self.inputs.deployment.id)
510512
.non_deterministic()?;
511513

512-
Ok(has_errors)
514+
if has_errors {
515+
self.maybe_cancel()?;
516+
}
517+
518+
Ok(())
519+
}
520+
521+
/// Cancel the subgraph if `disable_fail_fast` is not set and it is not
522+
/// synced
523+
fn maybe_cancel(&self) -> Result<(), ProcessingError> {
524+
// To prevent a buggy pending version from replacing a current version, if errors are
525+
// present the subgraph will be unassigned.
526+
let store = &self.inputs.store;
527+
if !ENV_VARS.disable_fail_fast && !store.is_deployment_synced() {
528+
store
529+
.unassign_subgraph()
530+
.map_err(|e| ProcessingError::Unknown(e.into()))?;
531+
532+
// Use `Canceled` to avoid setting the subgraph health to failed, an error was
533+
// just transacted so it will already be set to unhealthy.
534+
Err(ProcessingError::Canceled.into())
535+
} else {
536+
Ok(())
537+
}
513538
}
514539

515540
/// Processes a block and returns the updated context and a boolean flag indicating
@@ -795,31 +820,17 @@ where
795820
.persisted_data_sources
796821
.extend(persisted_off_chain_data_sources);
797822

798-
let has_errors = self
799-
.transact_block_state(
800-
&logger,
801-
block_ptr.clone(),
802-
firehose_cursor.clone(),
803-
block.timestamp(),
804-
block_state,
805-
proof_of_indexing,
806-
offchain_mods,
807-
processed_offchain_data_sources,
808-
)
809-
.await?;
810-
811-
// To prevent a buggy pending version from replacing a current version, if errors are
812-
// present the subgraph will be unassigned.
813-
let store = &self.inputs.store;
814-
if has_errors && !ENV_VARS.disable_fail_fast && !store.is_deployment_synced() {
815-
store
816-
.unassign_subgraph()
817-
.map_err(|e| ProcessingError::Unknown(e.into()))?;
818-
819-
// Use `Canceled` to avoiding setting the subgraph health to failed, an error was
820-
// just transacted so it will be already be set to unhealthy.
821-
return Err(ProcessingError::Canceled);
822-
}
823+
self.transact_block_state(
824+
&logger,
825+
block_ptr.clone(),
826+
firehose_cursor.clone(),
827+
block.timestamp(),
828+
block_state,
829+
proof_of_indexing,
830+
offchain_mods,
831+
processed_offchain_data_sources,
832+
)
833+
.await?;
823834

824835
match needs_restart {
825836
true => Ok(Action::Restart),
@@ -1389,31 +1400,17 @@ where
13891400
}
13901401
};
13911402

1392-
let has_errors = self
1393-
.transact_block_state(
1394-
&logger,
1395-
block_ptr.clone(),
1396-
cursor.clone(),
1397-
block_time,
1398-
block_state,
1399-
proof_of_indexing,
1400-
vec![],
1401-
vec![],
1402-
)
1403-
.await?;
1404-
1405-
// To prevent a buggy pending version from replacing a current version, if errors are
1406-
// present the subgraph will be unassigned.
1407-
let store = &self.inputs.store;
1408-
if has_errors && !ENV_VARS.disable_fail_fast && !store.is_deployment_synced() {
1409-
store
1410-
.unassign_subgraph()
1411-
.map_err(|e| ProcessingError::Unknown(e.into()))?;
1412-
1413-
// Use `Canceled` to avoiding setting the subgraph health to failed, an error was
1414-
// just transacted so it will be already be set to unhealthy.
1415-
return Err(ProcessingError::Canceled.into());
1416-
};
1403+
self.transact_block_state(
1404+
&logger,
1405+
block_ptr.clone(),
1406+
cursor.clone(),
1407+
block_time,
1408+
block_state,
1409+
proof_of_indexing,
1410+
vec![],
1411+
vec![],
1412+
)
1413+
.await?;
14171414

14181415
Ok(Action::Continue)
14191416
}

graph/src/components/subgraph/instance.rs

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -131,10 +131,6 @@ impl BlockState {
131131
write_capacity_remaining.saturating_sub(other.write_capacity_remaining);
132132
}
133133

134-
pub fn has_errors(&self) -> bool {
135-
!self.deterministic_errors.is_empty()
136-
}
137-
138134
pub fn has_created_data_sources(&self) -> bool {
139135
assert!(!self.in_handler);
140136
!self.created_data_sources.is_empty()

0 commit comments

Comments
 (0)