Skip to content

Commit 7c41d93

Browse files
committed
node: add optional extended block checks for providers
1 parent aa933d3 commit 7c41d93

File tree

9 files changed

+150
-172
lines changed

9 files changed

+150
-172
lines changed

graph/src/env/mod.rs

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -226,6 +226,14 @@ pub struct EnvVars {
226226
///
227227
/// If not specified, the graphman server will not start.
228228
pub graphman_server_auth_token: Option<String>,
229+
230+
/// If enabled for a chain, requires providers to support extended block details.
231+
///
232+
/// Providers that do not support extended block details for enabled chains are
233+
/// considered invalid and will not be used.
234+
///
235+
/// Defaults to an empty list;
236+
pub firehose_require_extended_blocks_for_chains: Vec<String>,
229237
}
230238

231239
impl EnvVars {
@@ -311,6 +319,10 @@ impl EnvVars {
311319
genesis_validation_enabled: inner.genesis_validation_enabled.0,
312320
genesis_validation_timeout: Duration::from_secs(inner.genesis_validation_timeout),
313321
graphman_server_auth_token: inner.graphman_server_auth_token,
322+
firehose_require_extended_blocks_for_chains:
323+
Self::firehose_require_extended_blocks_for_chains(
324+
inner.firehose_require_extended_blocks_for_chains,
325+
),
314326
})
315327
}
316328

@@ -335,6 +347,14 @@ impl EnvVars {
335347
pub fn log_gql_cache_timing(&self) -> bool {
336348
self.log_query_timing_contains("cache") && self.log_gql_timing()
337349
}
350+
351+
fn firehose_require_extended_blocks_for_chains(s: Option<String>) -> Vec<String> {
352+
s.unwrap_or_default()
353+
.split(",")
354+
.map(|x| x.trim().to_string())
355+
.filter(|x| !x.is_empty())
356+
.collect()
357+
}
338358
}
339359

340360
impl Default for EnvVars {
@@ -462,6 +482,8 @@ struct Inner {
462482
genesis_validation_timeout: u64,
463483
#[envconfig(from = "GRAPHMAN_SERVER_AUTH_TOKEN")]
464484
graphman_server_auth_token: Option<String>,
485+
#[envconfig(from = "GRAPH_NODE_FIREHOSE_REQUIRE_EXTENDED_BLOCKS_FOR_CHAINS")]
486+
firehose_require_extended_blocks_for_chains: Option<String>,
465487
}
466488

467489
#[derive(Clone, Debug)]

node/src/bin/manager.rs

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ use git_testament::{git_testament, render_testament};
44
use graph::bail;
55
use graph::blockchain::BlockHash;
66
use graph::cheap_clone::CheapClone;
7-
use graph::components::adapter::ChainId;
7+
use graph::components::network_provider::ChainName;
88
use graph::endpoint::EndpointMetrics;
99
use graph::env::ENV_VARS;
1010
use graph::log::logger_with_levels;
@@ -1006,11 +1006,12 @@ impl Context {
10061006
))
10071007
}
10081008

1009-
async fn networks(&self, block_store: Arc<BlockStore>) -> anyhow::Result<Networks> {
1009+
async fn networks(&self) -> anyhow::Result<Networks> {
10101010
let logger = self.logger.clone();
10111011
let registry = self.metrics_registry();
10121012
let metrics = Arc::new(EndpointMetrics::mock());
1013-
Networks::from_config(logger, &self.config, registry, metrics, block_store, false).await
1013+
1014+
Networks::from_config(logger, &self.config, registry, metrics, &[]).await
10141015
}
10151016

10161017
fn chain_store(self, chain_name: &str) -> anyhow::Result<Arc<ChainStore>> {
@@ -1025,8 +1026,7 @@ impl Context {
10251026
self,
10261027
chain_name: &str,
10271028
) -> anyhow::Result<(Arc<ChainStore>, Arc<EthereumAdapter>)> {
1028-
let block_store = self.store().block_store();
1029-
let networks = self.networks(block_store).await?;
1029+
let networks = self.networks().await?;
10301030
let chain_store = self.chain_store(chain_name)?;
10311031
let ethereum_adapter = networks
10321032
.ethereum_rpcs(chain_name.into())
@@ -1169,7 +1169,7 @@ async fn main() -> anyhow::Result<()> {
11691169
match cmd {
11701170
CheckProviders {} => {
11711171
let store = ctx.store().block_store();
1172-
let networks = ctx.networks(store.cheap_clone()).await?;
1172+
let networks = ctx.networks().await?;
11731173
Ok(commands::config::check_provider_genesis(&networks, store).await)
11741174
}
11751175
Place { name, network } => {
@@ -1367,8 +1367,8 @@ async fn main() -> anyhow::Result<()> {
13671367
} => {
13681368
let store_builder = ctx.store_builder().await;
13691369
let store = ctx.store().block_store();
1370-
let networks = ctx.networks(store.cheap_clone()).await?;
1371-
let chain_id = ChainId::from(chain_name);
1370+
let networks = ctx.networks().await?;
1371+
let chain_id = ChainName::from(chain_name);
13721372
let block_hash = BlockHash::from_str(&block_hash)?;
13731373
commands::chain::update_chain_genesis(
13741374
&networks,

node/src/chain.rs

Lines changed: 12 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -15,15 +15,12 @@ use graph::blockchain::{
1515
ChainIdentifier,
1616
};
1717
use graph::cheap_clone::CheapClone;
18-
use graph::components::adapter::ChainId;
18+
use graph::components::network_provider::ChainName;
1919
use graph::components::store::{BlockStore as _, ChainStore};
2020
use graph::data::store::NodeId;
2121
use graph::endpoint::EndpointMetrics;
2222
use graph::env::{EnvVars, ENV_VARS};
23-
use graph::firehose::{
24-
FirehoseEndpoint, FirehoseGenesisDecoder, GenesisDecoder, SubgraphLimit,
25-
SubstreamsGenesisDecoder,
26-
};
23+
use graph::firehose::{FirehoseEndpoint, SubgraphLimit};
2724
use graph::futures03::future::try_join_all;
2825
use graph::itertools::Itertools;
2926
use graph::log::factory::LoggerFactory;
@@ -63,11 +60,11 @@ pub fn create_substreams_networks(
6360
config.chains.ingestor,
6461
);
6562

66-
let mut networks_by_kind: BTreeMap<(BlockchainKind, ChainId), Vec<Arc<FirehoseEndpoint>>> =
63+
let mut networks_by_kind: BTreeMap<(BlockchainKind, ChainName), Vec<Arc<FirehoseEndpoint>>> =
6764
BTreeMap::new();
6865

6966
for (name, chain) in &config.chains.chains {
70-
let name: ChainId = name.as_str().into();
67+
let name: ChainName = name.as_str().into();
7168
for provider in &chain.providers {
7269
if let ProviderDetails::Substreams(ref firehose) = provider.details {
7370
info!(
@@ -93,7 +90,6 @@ pub fn create_substreams_networks(
9390
firehose.compression_enabled(),
9491
SubgraphLimit::Unlimited,
9592
endpoint_metrics.clone(),
96-
Box::new(SubstreamsGenesisDecoder {}),
9793
)));
9894
}
9995
}
@@ -124,11 +120,11 @@ pub fn create_firehose_networks(
124120
config.chains.ingestor,
125121
);
126122

127-
let mut networks_by_kind: BTreeMap<(BlockchainKind, ChainId), Vec<Arc<FirehoseEndpoint>>> =
123+
let mut networks_by_kind: BTreeMap<(BlockchainKind, ChainName), Vec<Arc<FirehoseEndpoint>>> =
128124
BTreeMap::new();
129125

130126
for (name, chain) in &config.chains.chains {
131-
let name: ChainId = name.as_str().into();
127+
let name: ChainName = name.as_str().into();
132128
for provider in &chain.providers {
133129
let logger = logger.cheap_clone();
134130
if let ProviderDetails::Firehose(ref firehose) = provider.details {
@@ -143,27 +139,6 @@ pub fn create_firehose_networks(
143139
.entry((chain.protocol, name.clone()))
144140
.or_insert_with(Vec::new);
145141

146-
let decoder: Box<dyn GenesisDecoder> = match chain.protocol {
147-
BlockchainKind::Arweave => {
148-
FirehoseGenesisDecoder::<graph_chain_arweave::Block>::new(logger)
149-
}
150-
BlockchainKind::Ethereum => {
151-
FirehoseGenesisDecoder::<graph_chain_ethereum::codec::Block>::new(logger)
152-
}
153-
BlockchainKind::Near => {
154-
FirehoseGenesisDecoder::<graph_chain_near::HeaderOnlyBlock>::new(logger)
155-
}
156-
BlockchainKind::Cosmos => {
157-
FirehoseGenesisDecoder::<graph_chain_cosmos::Block>::new(logger)
158-
}
159-
BlockchainKind::Substreams => {
160-
unreachable!("Substreams configuration should not be handled here");
161-
}
162-
BlockchainKind::Starknet => {
163-
FirehoseGenesisDecoder::<graph_chain_starknet::Block>::new(logger)
164-
}
165-
};
166-
167142
// Create n FirehoseEndpoints where n is the size of the pool. If a
168143
// subgraph limit is defined for this endpoint then each endpoint
169144
// instance will have their own subgraph limit.
@@ -182,7 +157,6 @@ pub fn create_firehose_networks(
182157
firehose.compression_enabled(),
183158
firehose.limit_for(&config.node),
184159
endpoint_metrics.cheap_clone(),
185-
decoder.box_clone(),
186160
)));
187161
}
188162
}
@@ -384,7 +358,7 @@ pub async fn networks_as_chains(
384358
async fn add_substreams<C: Blockchain>(
385359
networks: &Networks,
386360
config: &Arc<EnvVars>,
387-
chain_id: ChainId,
361+
chain_id: ChainName,
388362
blockchain_map: &mut BlockchainMap,
389363
logger_factory: LoggerFactory,
390364
chain_store: Arc<dyn ChainStore>,
@@ -608,7 +582,7 @@ pub async fn networks_as_chains(
608582
mod test {
609583
use crate::config::{Config, Opt};
610584
use crate::network_setup::{AdapterConfiguration, Networks};
611-
use graph::components::adapter::{ChainId, NoopIdentValidator};
585+
use graph::components::network_provider::ChainName;
612586
use graph::endpoint::EndpointMetrics;
613587
use graph::log::logger;
614588
use graph::prelude::{tokio, MetricsRegistry};
@@ -641,23 +615,15 @@ mod test {
641615
let metrics = Arc::new(EndpointMetrics::mock());
642616
let config = Config::load(&logger, &opt).expect("can create config");
643617
let metrics_registry = Arc::new(MetricsRegistry::mock());
644-
let ident_validator = Arc::new(NoopIdentValidator);
645618

646-
let networks = Networks::from_config(
647-
logger,
648-
&config,
649-
metrics_registry,
650-
metrics,
651-
ident_validator,
652-
false,
653-
)
654-
.await
655-
.expect("can parse config");
619+
let networks = Networks::from_config(logger, &config, metrics_registry, metrics, &[])
620+
.await
621+
.expect("can parse config");
656622
let mut network_names = networks
657623
.adapters
658624
.iter()
659625
.map(|a| a.chain_id())
660-
.collect::<Vec<&ChainId>>();
626+
.collect::<Vec<&ChainName>>();
661627
network_names.sort();
662628

663629
let traces = NodeCapabilities {

node/src/config.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
use graph::{
22
anyhow::Error,
33
blockchain::BlockchainKind,
4-
components::adapter::ChainId,
4+
components::network_provider::ChainName,
55
env::ENV_VARS,
66
firehose::{SubgraphLimit, SUBGRAPHS_PER_CONN},
77
itertools::Itertools,
@@ -104,7 +104,7 @@ fn validate_name(s: &str) -> Result<()> {
104104
}
105105

106106
impl Config {
107-
pub fn chain_ids(&self) -> Vec<ChainId> {
107+
pub fn chain_ids(&self) -> Vec<ChainName> {
108108
self.chains
109109
.chains
110110
.keys()

node/src/main.rs

Lines changed: 22 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
use clap::Parser as _;
22
use git_testament::{git_testament, render_testament};
3-
use graph::components::adapter::{IdentValidator, NoopIdentValidator};
43
use graph::futures01::Future as _;
54
use graph::futures03::compat::Future01CompatExt;
65
use graph::futures03::future::TryFutureExt;
@@ -273,21 +272,36 @@ async fn main() {
273272
start_graphman_server(opt.graphman_port, graphman_server_config).await;
274273

275274
let launch_services = |logger: Logger, env_vars: Arc<EnvVars>| async move {
275+
use graph::components::network_provider;
276+
276277
let block_store = network_store.block_store();
277278

278-
let validator: Arc<dyn IdentValidator> = if env_vars.genesis_validation_enabled {
279-
network_store.block_store()
280-
} else {
281-
Arc::new(NoopIdentValidator {})
282-
};
279+
let mut provider_checks: Vec<Arc<dyn network_provider::ProviderCheck>> = Vec::new();
280+
281+
if env_vars.genesis_validation_enabled {
282+
provider_checks.push(Arc::new(network_provider::GenesisHashCheck::new(
283+
block_store.clone(),
284+
)));
285+
}
286+
287+
if !env_vars
288+
.firehose_require_extended_blocks_for_chains
289+
.is_empty()
290+
{
291+
provider_checks.push(Arc::new(network_provider::ExtendedBlocksCheck::new(
292+
env_vars
293+
.firehose_require_extended_blocks_for_chains
294+
.iter()
295+
.map(|x| x.as_str().into()),
296+
)));
297+
}
283298

284299
let network_adapters = Networks::from_config(
285300
logger.cheap_clone(),
286301
&config,
287302
metrics_registry.cheap_clone(),
288303
endpoint_metrics,
289-
validator,
290-
env_vars.genesis_validation_enabled,
304+
&provider_checks,
291305
)
292306
.await
293307
.expect("unable to parse network configuration");

node/src/manager/commands/chain.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,8 @@ use graph::blockchain::BlockHash;
77
use graph::blockchain::BlockPtr;
88
use graph::blockchain::ChainIdentifier;
99
use graph::cheap_clone::CheapClone;
10-
use graph::components::adapter::ChainId;
11-
use graph::components::adapter::IdentValidator;
10+
use graph::components::network_provider::ChainIdentifierStore;
11+
use graph::components::network_provider::ChainName;
1212
use graph::components::store::StoreError;
1313
use graph::prelude::BlockNumber;
1414
use graph::prelude::ChainStore as _;
@@ -161,7 +161,7 @@ pub async fn update_chain_genesis(
161161
coord: Arc<PoolCoordinator>,
162162
store: Arc<BlockStore>,
163163
logger: &Logger,
164-
chain_id: ChainId,
164+
chain_id: ChainName,
165165
genesis_hash: BlockHash,
166166
force: bool,
167167
) -> Result<(), Error> {
@@ -183,7 +183,7 @@ pub async fn update_chain_genesis(
183183
// Update the local shard's genesis, whether or not it is the primary.
184184
// The chains table is replicated from the primary and keeps another genesis hash.
185185
// To keep those in sync we need to update the primary and then refresh the shard tables.
186-
store.update_ident(
186+
store.update_identifier(
187187
&chain_id,
188188
&ChainIdentifier {
189189
net_version: ident.net_version.clone(),

node/src/manager/commands/config.rs

Lines changed: 11 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,11 @@
11
use std::{collections::BTreeMap, sync::Arc};
22

3+
use graph::components::network_provider::ChainIdentifierStore;
4+
use graph::components::network_provider::ChainName;
5+
use graph::components::network_provider::ProviderName;
36
use graph::{
47
anyhow::{bail, Context},
5-
components::{
6-
adapter::{ChainId, IdentValidator, IdentValidatorError, NoopIdentValidator, ProviderName},
7-
subgraph::{Setting, Settings},
8-
},
8+
components::subgraph::{Setting, Settings},
99
endpoint::EndpointMetrics,
1010
env::EnvVars,
1111
itertools::Itertools,
@@ -30,15 +30,15 @@ pub async fn check_provider_genesis(networks: &Networks, store: Arc<BlockStore>)
3030
let (_oks, errs): (Vec<_>, Vec<_>) = ids
3131
.into_iter()
3232
.map(|(provider, id)| {
33-
id.map_err(IdentValidatorError::from)
34-
.and_then(|id| store.check_ident(chain_id, &id))
35-
.map_err(|e| (provider, e))
33+
id.and_then(|id| store.validate_identifier(chain_id, &id).map_err(Into::into))
34+
.map_err(|e| (provider, e.to_string()))
3635
})
3736
.partition_result();
37+
3838
let errs = errs
3939
.into_iter()
40-
.dedup_by(|e1, e2| e1.eq(e2))
41-
.collect::<Vec<(ProviderName, IdentValidatorError)>>();
40+
.dedup_by(|e1, e2| e1 == e2)
41+
.collect::<Vec<(ProviderName, String)>>();
4242

4343
if errs.is_empty() {
4444
println!("chain_id: {}: status: OK", chain_id);
@@ -171,16 +171,8 @@ pub async fn provider(
171171

172172
let metrics = Arc::new(EndpointMetrics::mock());
173173
let caps = caps_from_features(features)?;
174-
let networks = Networks::from_config(
175-
logger,
176-
&config,
177-
registry,
178-
metrics,
179-
Arc::new(NoopIdentValidator),
180-
false,
181-
)
182-
.await?;
183-
let network: ChainId = network.into();
174+
let networks = Networks::from_config(logger, &config, registry, metrics, &[]).await?;
175+
let network: ChainName = network.into();
184176
let adapters = networks.ethereum_rpcs(network.clone());
185177

186178
let adapters = adapters.all_cheapest_with(&caps).await;

0 commit comments

Comments
 (0)