Skip to content

Commit 6fe6676

Browse files
authored
Add indexerHints to manifest with the ability to configure pruning
* graph, core : new structure for historyBlocks in manifest * graph: Add history_blocks_override and min_history_blocks env vars * graph, store: refactor history_blocks setting and wire overrides * graph: bump spec version to v1 * graph: min_history_blocks to default to 2 * reorg_threshold * fix parse_indexer_hints unit tests * node/graphman: graphman prune to default to min_history_blocks * graph: change historyBlocks syntax to use the term `prune` * node: better docstring graphman prune
1 parent b197682 commit 6fe6676

File tree

10 files changed

+176
-34
lines changed

10 files changed

+176
-34
lines changed

core/src/subgraph/registrar.rs

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -586,7 +586,7 @@ async fn create_subgraph_version<C: Blockchain, S: SubgraphStore>(
586586
debug_fork: Option<DeploymentHash>,
587587
version_switching_mode: SubgraphVersionSwitchingMode,
588588
resolver: &Arc<dyn LinkResolver>,
589-
history_blocks: Option<i32>,
589+
history_blocks_override: Option<i32>,
590590
) -> Result<DeploymentLocator, SubgraphRegistrarError> {
591591
let raw_string = serde_yaml::to_string(&raw).unwrap();
592592
let unvalidated = UnvalidatedSubgraphManifest::<C>::resolve(
@@ -613,8 +613,6 @@ async fn create_subgraph_version<C: Blockchain, S: SubgraphStore>(
613613
.await
614614
.map_err(SubgraphRegistrarError::ManifestValidationError)?;
615615

616-
let history_blocks = history_blocks.or(manifest.history_blocks());
617-
618616
let network_name = manifest.network_name();
619617

620618
let chain = chains
@@ -687,8 +685,9 @@ async fn create_subgraph_version<C: Blockchain, S: SubgraphStore>(
687685
.graft(base_block)
688686
.debug(debug_fork)
689687
.entities_with_causality_region(needs_causality_region);
690-
if let Some(history_blocks) = history_blocks {
691-
deployment = deployment.with_history_blocks(history_blocks);
688+
689+
if let Some(history_blocks) = history_blocks_override {
690+
deployment = deployment.with_history_blocks_override(history_blocks);
692691
}
693692

694693
deployment_store

graph/src/data/subgraph/api_version.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ pub const SPEC_VERSION_0_0_8: Version = Version::new(0, 0, 8);
4242
pub const SPEC_VERSION_0_0_9: Version = Version::new(0, 0, 9);
4343

4444
// Enables `indexerHints` feature.
45-
pub const SPEC_VERSION_0_1_0: Version = Version::new(0, 1, 0);
45+
pub const SPEC_VERSION_1_0_0: Version = Version::new(1, 0, 0);
4646

4747
pub const MIN_SPEC_VERSION: Version = Version::new(0, 0, 2);
4848

graph/src/data/subgraph/mod.rs

Lines changed: 89 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -10,12 +10,15 @@ pub mod status;
1010

1111
pub use features::{SubgraphFeature, SubgraphFeatureValidationError};
1212

13-
use crate::object;
13+
use crate::{components::store::BLOCK_NUMBER_MAX, object};
1414
use anyhow::{anyhow, Context, Error};
1515
use futures03::{future::try_join, stream::FuturesOrdered, TryStreamExt as _};
1616
use itertools::Itertools;
1717
use semver::Version;
18-
use serde::{de, ser};
18+
use serde::{
19+
de::{self, Visitor},
20+
ser,
21+
};
1922
use serde_yaml;
2023
use slog::Logger;
2124
use stable_hash::{FieldAddress, StableHash};
@@ -548,7 +551,83 @@ pub struct BaseSubgraphManifest<C, S, D, T> {
548551
#[derive(Debug, Deserialize)]
549552
#[serde(rename_all = "camelCase")]
550553
pub struct IndexerHints {
551-
pub history_blocks: Option<BlockNumber>,
554+
prune: Option<Prune>,
555+
}
556+
557+
impl IndexerHints {
558+
pub fn history_blocks(&self) -> BlockNumber {
559+
match self.prune {
560+
Some(ref hb) => hb.history_blocks(),
561+
None => BLOCK_NUMBER_MAX,
562+
}
563+
}
564+
}
565+
566+
#[derive(Debug)]
567+
pub enum Prune {
568+
Auto,
569+
Never,
570+
Blocks(BlockNumber),
571+
}
572+
573+
impl Prune {
574+
pub fn history_blocks(&self) -> BlockNumber {
575+
match self {
576+
Prune::Never => BLOCK_NUMBER_MAX,
577+
Prune::Auto => ENV_VARS.min_history_blocks,
578+
Prune::Blocks(x) => *x,
579+
}
580+
}
581+
}
582+
583+
impl<'de> de::Deserialize<'de> for Prune {
584+
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
585+
where
586+
D: de::Deserializer<'de>,
587+
{
588+
struct HistoryBlocksVisitor;
589+
590+
const ERROR_MSG: &str = "expected 'all', 'min', or a number for history blocks";
591+
592+
impl<'de> Visitor<'de> for HistoryBlocksVisitor {
593+
type Value = Prune;
594+
595+
fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result {
596+
formatter.write_str("a string or an integer for history blocks")
597+
}
598+
599+
fn visit_str<E>(self, value: &str) -> Result<Prune, E>
600+
where
601+
E: de::Error,
602+
{
603+
match value {
604+
"never" => Ok(Prune::Never),
605+
"auto" => Ok(Prune::Auto),
606+
_ => value
607+
.parse::<i32>()
608+
.map(Prune::Blocks)
609+
.map_err(|_| E::custom(ERROR_MSG)),
610+
}
611+
}
612+
613+
fn visit_i32<E>(self, value: i32) -> Result<Prune, E>
614+
where
615+
E: de::Error,
616+
{
617+
Ok(Prune::Blocks(value))
618+
}
619+
620+
fn visit_u64<E>(self, v: u64) -> Result<Self::Value, E>
621+
where
622+
E: de::Error,
623+
{
624+
let i = v.try_into().map_err(|_| E::custom(ERROR_MSG))?;
625+
Ok(Prune::Blocks(i))
626+
}
627+
}
628+
629+
deserializer.deserialize_any(HistoryBlocksVisitor)
630+
}
552631
}
553632

554633
/// SubgraphManifest with IPFS links unresolved
@@ -681,10 +760,11 @@ impl<C: Blockchain> SubgraphManifest<C> {
681760
.collect()
682761
}
683762

684-
pub fn history_blocks(&self) -> Option<BlockNumber> {
685-
self.indexer_hints
686-
.as_ref()
687-
.and_then(|hints| hints.history_blocks)
763+
pub fn history_blocks(&self) -> BlockNumber {
764+
match self.indexer_hints {
765+
Some(ref hints) => hints.history_blocks(),
766+
None => BLOCK_NUMBER_MAX,
767+
}
688768
}
689769

690770
pub fn api_versions(&self) -> impl Iterator<Item = semver::Version> + '_ {
@@ -872,10 +952,10 @@ impl<C: Blockchain> UnresolvedSubgraphManifest<C> {
872952
);
873953
}
874954

875-
if spec_version < SPEC_VERSION_0_1_0 && indexer_hints.is_some() {
955+
if spec_version < SPEC_VERSION_1_0_0 && indexer_hints.is_some() {
876956
bail!(
877957
"`indexerHints` are not supported prior to {}",
878-
SPEC_VERSION_0_1_0
958+
SPEC_VERSION_1_0_0
879959
);
880960
}
881961

graph/src/data/subgraph/schema.rs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -105,7 +105,7 @@ pub struct DeploymentCreate {
105105
pub graft_base: Option<DeploymentHash>,
106106
pub graft_block: Option<BlockPtr>,
107107
pub debug_fork: Option<DeploymentHash>,
108-
pub history_blocks: Option<i32>,
108+
pub history_blocks_override: Option<i32>,
109109
}
110110

111111
impl DeploymentCreate {
@@ -120,12 +120,12 @@ impl DeploymentCreate {
120120
graft_base: None,
121121
graft_block: None,
122122
debug_fork: None,
123-
history_blocks: None,
123+
history_blocks_override: None,
124124
}
125125
}
126126

127-
pub fn with_history_blocks(mut self, blocks: i32) -> Self {
128-
self.history_blocks = Some(blocks);
127+
pub fn with_history_blocks_override(mut self, blocks: i32) -> Self {
128+
self.history_blocks_override = Some(blocks);
129129
self
130130
}
131131

@@ -201,7 +201,7 @@ impl SubgraphManifestEntity {
201201
schema: manifest.schema.document_string(),
202202
raw_yaml: Some(raw_yaml),
203203
entities_with_causality_region,
204-
history_blocks: BLOCK_NUMBER_MAX,
204+
history_blocks: manifest.history_blocks(),
205205
}
206206
}
207207

graph/src/env/mod.rs

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -193,6 +193,14 @@ pub struct EnvVars {
193193
/// is there to simplify development and will be changed to `false` when
194194
/// we get closer to release
195195
pub enable_timeseries: bool,
196+
/// Set by the env var `GRAPH_HISTORY_BLOCKS_OVERRIDE`. Defaults to None
197+
/// Sets an override for the amount history to keep regardless of the
198+
/// historyBlocks set in the manifest
199+
pub history_blocks_override: Option<BlockNumber>,
200+
/// Set by the env var `GRAPH_MIN_HISTORY_BLOCKS`
201+
/// The amount of history to keep when using 'min' historyBlocks
202+
/// in the manifest
203+
pub min_history_blocks: BlockNumber,
196204
}
197205

198206
impl EnvVars {
@@ -258,6 +266,10 @@ impl EnvVars {
258266
prefer_substreams_block_streams: inner.prefer_substreams_block_streams,
259267
enable_gas_metrics: inner.enable_gas_metrics.0,
260268
enable_timeseries: inner.enable_timeseries.unwrap_or(cfg!(debug_assertions)),
269+
history_blocks_override: inner.history_blocks_override,
270+
min_history_blocks: inner
271+
.min_history_blocks
272+
.unwrap_or(2 * inner.reorg_threshold),
261273
})
262274
}
263275

@@ -310,7 +322,7 @@ struct Inner {
310322
default = "false"
311323
)]
312324
allow_non_deterministic_fulltext_search: EnvVarBoolean,
313-
#[envconfig(from = "GRAPH_MAX_SPEC_VERSION", default = "0.0.9")]
325+
#[envconfig(from = "GRAPH_MAX_SPEC_VERSION", default = "1.0.0")]
314326
max_spec_version: Version,
315327
#[envconfig(from = "GRAPH_LOAD_WINDOW_SIZE", default = "300")]
316328
load_window_size_in_secs: u64,
@@ -391,6 +403,10 @@ struct Inner {
391403
enable_gas_metrics: EnvVarBoolean,
392404
#[envconfig(from = "GRAPH_EXPERIMENTAL_TIMESERIES")]
393405
enable_timeseries: Option<bool>,
406+
#[envconfig(from = "GRAPH_HISTORY_BLOCKS_OVERRIDE")]
407+
history_blocks_override: Option<BlockNumber>,
408+
#[envconfig(from = "GRAPH_MIN_HISTORY_BLOCKS")]
409+
min_history_blocks: Option<BlockNumber>,
394410
}
395411

396412
#[derive(Clone, Debug)]

node/src/bin/manager.rs

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ use config::PoolSize;
33
use git_testament::{git_testament, render_testament};
44
use graph::bail;
55
use graph::endpoint::EndpointMetrics;
6+
use graph::env::ENV_VARS;
67
use graph::log::logger_with_levels;
78
use graph::prelude::{MetricsRegistry, BLOCK_NUMBER_MAX};
89
use graph::{data::graphql::load_manager::LoadManager, prelude::chrono, prometheus::Registry};
@@ -310,9 +311,10 @@ pub enum Command {
310311
/// GRAPH_STORE_HISTORY_DELETE_THRESHOLD
311312
#[clap(long, short)]
312313
delete_threshold: Option<f64>,
313-
/// How much history to keep in blocks
314-
#[clap(long, short = 'y', default_value = "10000")]
315-
history: usize,
314+
/// How much history to keep in blocks. Defaults to
315+
/// GRAPH_MIN_HISTORY_BLOCKS
316+
#[clap(long, short = 'y')]
317+
history: Option<usize>,
316318
/// Prune only this once
317319
#[clap(long, short)]
318320
once: bool,
@@ -1493,6 +1495,7 @@ async fn main() -> anyhow::Result<()> {
14931495
once,
14941496
} => {
14951497
let (store, primary_pool) = ctx.store_and_primary();
1498+
let history = history.unwrap_or(ENV_VARS.min_history_blocks.try_into()?);
14961499
commands::prune::run(
14971500
store,
14981501
primary_pool,

store/postgres/src/deployment.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1088,7 +1088,7 @@ pub fn create_deployment(
10881088
graft_base,
10891089
graft_block,
10901090
debug_fork,
1091-
history_blocks: history_blocks_override,
1091+
history_blocks_override,
10921092
} = deployment;
10931093
let earliest_block_number = start_block.as_ref().map(|ptr| ptr.number).unwrap_or(0);
10941094
let entities_with_causality_region = Vec::from_iter(

store/postgres/src/deployment_store.rs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -190,6 +190,16 @@ impl DeploymentStore {
190190
// Create (or update) the metadata. Update only happens in tests
191191
let entities_with_causality_region =
192192
deployment.manifest.entities_with_causality_region.clone();
193+
194+
// If `GRAPH_HISTORY_BLOCKS_OVERRIDE` is set, override the history_blocks
195+
// setting with the value of the environment variable.
196+
let deployment =
197+
if let Some(history_blocks_global_override) = ENV_VARS.history_blocks_override {
198+
deployment.with_history_blocks_override(history_blocks_global_override)
199+
} else {
200+
deployment
201+
};
202+
193203
if replace || !exists {
194204
deployment::create_deployment(&conn, &site, deployment, exists, replace)?;
195205
};

store/postgres/src/subgraph_store.rs

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -630,16 +630,14 @@ impl SubgraphStoreInner {
630630
)));
631631
}
632632

633-
let history_blocks = deployment.manifest.history_blocks;
634-
635633
// Transmogrify the deployment into a new one
636634
let deployment = DeploymentCreate {
637635
manifest: deployment.manifest,
638636
start_block: deployment.start_block.clone(),
639637
graft_base: Some(src.deployment.clone()),
640638
graft_block: Some(block),
641639
debug_fork: deployment.debug_fork,
642-
history_blocks: Some(history_blocks),
640+
history_blocks_override: None,
643641
};
644642

645643
let graft_base = self.layout(&src.deployment)?;

store/test-store/tests/chain/ethereum/manifest.rs

Lines changed: 42 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -5,16 +5,18 @@ use std::sync::Arc;
55
use std::time::Duration;
66

77
use graph::blockchain::DataSource;
8+
use graph::components::store::BLOCK_NUMBER_MAX;
89
use graph::data::store::scalar::Bytes;
910
use graph::data::store::Value;
1011
use graph::data::subgraph::schema::SubgraphError;
1112
use graph::data::subgraph::{
12-
SPEC_VERSION_0_0_4, SPEC_VERSION_0_0_7, SPEC_VERSION_0_0_8, SPEC_VERSION_0_0_9,
13-
SPEC_VERSION_0_1_0,
13+
Prune, SPEC_VERSION_0_0_4, SPEC_VERSION_0_0_7, SPEC_VERSION_0_0_8, SPEC_VERSION_0_0_9,
14+
SPEC_VERSION_1_0_0,
1415
};
1516
use graph::data_source::offchain::OffchainDataSourceKind;
1617
use graph::data_source::DataSourceTemplate;
1718
use graph::entity;
19+
use graph::env::ENV_VARS;
1820
use graph::prelude::{
1921
anyhow, async_trait, serde_yaml, tokio, BigDecimal, BigInt, DeploymentHash, Link, Logger,
2022
SubgraphManifest, SubgraphManifestValidationError, SubgraphStore, UnvalidatedSubgraphManifest,
@@ -198,14 +200,48 @@ schema:
198200
graft:
199201
base: Qmbase
200202
block: 12345
201-
specVersion: 0.1.0
203+
specVersion: 1.0.0
202204
indexerHints:
203-
historyBlocks: 100
205+
prune: 100
204206
";
205207

206-
let manifest = resolve_manifest(YAML, SPEC_VERSION_0_1_0).await;
208+
let manifest = resolve_manifest(YAML, SPEC_VERSION_1_0_0).await;
207209

208-
assert_eq!(manifest.history_blocks().unwrap(), 100);
210+
assert_eq!(manifest.history_blocks(), 100);
211+
212+
let yaml: &str = "
213+
dataSources: []
214+
schema:
215+
file:
216+
/: /ipfs/Qmschema
217+
graft:
218+
base: Qmbase
219+
block: 12345
220+
specVersion: 1.0.0
221+
indexerHints:
222+
prune: auto
223+
";
224+
225+
let manifest = resolve_manifest(yaml, SPEC_VERSION_1_0_0).await;
226+
Prune::Auto.history_blocks();
227+
assert_eq!(manifest.history_blocks(), ENV_VARS.min_history_blocks);
228+
229+
let yaml: &str = "
230+
dataSources: []
231+
schema:
232+
file:
233+
/: /ipfs/Qmschema
234+
graft:
235+
base: Qmbase
236+
block: 12345
237+
specVersion: 1.0.0
238+
indexerHints:
239+
prune: never
240+
";
241+
242+
let manifest = resolve_manifest(yaml, SPEC_VERSION_1_0_0).await;
243+
244+
assert_eq!(manifest.history_blocks(), BLOCK_NUMBER_MAX);
209245
}
210246

211247
#[test]

0 commit comments

Comments
 (0)