Skip to content

Commit e3aad48

Browse files
authored
graphman config check providers (#5517)
* graphman config check providers * graphman chain update genesis
1 parent 0530ce1 commit e3aad48

File tree

8 files changed

+222
-15
lines changed

8 files changed

+222
-15
lines changed

chain/ethereum/src/network.rs

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -172,10 +172,7 @@ impl EthereumNetworkAdapters {
172172
&self,
173173
required_capabilities: &NodeCapabilities,
174174
) -> impl Iterator<Item = &EthereumNetworkAdapter> + '_ {
175-
let all = self
176-
.manager
177-
.get_all_unverified(&self.chain_id)
178-
.unwrap_or_default();
175+
let all = self.manager.get_all_unverified(&self.chain_id);
179176

180177
Self::available_with_capabilities(all, required_capabilities)
181178
}

graph/src/components/adapter.rs

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,7 @@ struct Ident {
6565
chain_id: ChainId,
6666
}
6767

68-
#[derive(Error, Debug, Clone)]
68+
#[derive(Error, Debug, Clone, PartialEq)]
6969
pub enum IdentValidatorError {
7070
#[error("database error: {0}")]
7171
UnknownError(String),
@@ -89,6 +89,12 @@ pub enum IdentValidatorError {
8989

9090
impl From<anyhow::Error> for IdentValidatorError {
9191
fn from(value: anyhow::Error) -> Self {
92+
Self::from(&value)
93+
}
94+
}
95+
96+
impl From<&anyhow::Error> for IdentValidatorError {
97+
fn from(value: &anyhow::Error) -> Self {
9298
IdentValidatorError::UnknownError(value.to_string())
9399
}
94100
}
@@ -308,13 +314,12 @@ impl<T: NetIdentifiable + Clone + 'static> ProviderManager<T> {
308314
/// adapters that failed verification. For the most part this should be fine since ideally
309315
/// get_all would have been used before. Nevertheless, it is possible that a misconfigured
310316
/// adapter is returned from this list even after validation.
311-
pub fn get_all_unverified(&self, chain_id: &ChainId) -> Result<Vec<&T>, ProviderManagerError> {
312-
Ok(self
313-
.inner
317+
pub fn get_all_unverified(&self, chain_id: &ChainId) -> Vec<&T> {
318+
self.inner
314319
.adapters
315320
.get(chain_id)
316321
.map(|v| v.iter().map(|v| &v.1).collect())
317-
.unwrap_or_default())
322+
.unwrap_or_default()
318323
}
319324

320325
/// get_all will trigger the verification of the endpoints for the provided chain_id, hence the

node/src/bin/manager.rs

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,9 @@ use clap::{Parser, Subcommand};
22
use config::PoolSize;
33
use git_testament::{git_testament, render_testament};
44
use graph::bail;
5+
use graph::blockchain::BlockHash;
56
use graph::cheap_clone::CheapClone;
7+
use graph::components::adapter::ChainId;
68
use graph::endpoint::EndpointMetrics;
79
use graph::env::ENV_VARS;
810
use graph::log::logger_with_levels;
@@ -33,6 +35,7 @@ use graph_store_postgres::{
3335
SubscriptionManager, PRIMARY_SHARD,
3436
};
3537
use lazy_static::lazy_static;
38+
use std::str::FromStr;
3639
use std::{collections::HashMap, num::ParseIntError, sync::Arc, time::Duration};
3740
const VERSION_LABEL_KEY: &str = "version";
3841

@@ -435,6 +438,11 @@ pub enum ConfigCommand {
435438
features: String,
436439
network: String,
437440
},
441+
442+
/// Compare the NetIdentifier of all defined adapters with the existing
443+
/// identifiers on the ChainStore.
444+
CheckProviders {},
445+
438446
/// Show subgraph-specific settings
439447
///
440448
/// GRAPH_EXPERIMENTAL_SUBGRAPH_SETTINGS can add a file that contains
@@ -547,6 +555,16 @@ pub enum ChainCommand {
547555
force: bool,
548556
},
549557

558+
/// Update the genesis block hash for a chain
559+
UpdateGenesis {
560+
#[clap(long, short)]
561+
force: bool,
562+
#[clap(value_parser = clap::builder::NonEmptyStringValueParser::new())]
563+
block_hash: String,
564+
#[clap(value_parser = clap::builder::NonEmptyStringValueParser::new())]
565+
chain_name: String,
566+
},
567+
550568
/// Change the block cache shard for a chain
551569
ChangeShard {
552570
/// Chain name (must be an existing chain, see 'chain list')
@@ -1149,6 +1167,11 @@ async fn main() -> anyhow::Result<()> {
11491167
use ConfigCommand::*;
11501168

11511169
match cmd {
1170+
CheckProviders {} => {
1171+
let store = ctx.store().block_store();
1172+
let networks = ctx.networks(store.cheap_clone()).await?;
1173+
Ok(commands::config::check_provider_genesis(&networks, store).await)
1174+
}
11521175
Place { name, network } => {
11531176
commands::config::place(&ctx.config.deployment, &name, &network)
11541177
}
@@ -1326,6 +1349,29 @@ async fn main() -> anyhow::Result<()> {
13261349
shard,
13271350
)
13281351
}
1352+
1353+
UpdateGenesis {
1354+
force,
1355+
block_hash,
1356+
chain_name,
1357+
} => {
1358+
let store_builder = ctx.store_builder().await;
1359+
let store = ctx.store().block_store();
1360+
let networks = ctx.networks(store.cheap_clone()).await?;
1361+
let chain_id = ChainId::from(chain_name);
1362+
let block_hash = BlockHash::from_str(&block_hash)?;
1363+
commands::chain::update_chain_genesis(
1364+
&networks,
1365+
store_builder.coord.cheap_clone(),
1366+
store,
1367+
&logger,
1368+
chain_id,
1369+
block_hash,
1370+
force,
1371+
)
1372+
.await
1373+
}
1374+
13291375
CheckBlocks { method, chain_name } => {
13301376
use commands::check_blocks::{by_hash, by_number, by_range};
13311377
use CheckBlockMethod::*;

node/src/manager/commands/chain.rs

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,14 +3,20 @@ use std::sync::Arc;
33
use diesel::sql_query;
44
use diesel::Connection;
55
use diesel::RunQueryDsl;
6+
use graph::blockchain::BlockHash;
67
use graph::blockchain::BlockPtr;
8+
use graph::blockchain::ChainIdentifier;
79
use graph::cheap_clone::CheapClone;
10+
use graph::components::adapter::ChainId;
11+
use graph::components::adapter::IdentValidator;
812
use graph::components::store::StoreError;
913
use graph::prelude::BlockNumber;
1014
use graph::prelude::ChainStore as _;
1115
use graph::prelude::{anyhow, anyhow::bail};
16+
use graph::slog::Logger;
1217
use graph::{components::store::BlockStore as _, prelude::anyhow::Error};
1318
use graph_store_postgres::add_chain;
19+
use graph_store_postgres::connection_pool::PoolCoordinator;
1420
use graph_store_postgres::find_chain;
1521
use graph_store_postgres::update_chain_name;
1622
use graph_store_postgres::BlockStore;
@@ -21,6 +27,8 @@ use graph_store_postgres::{
2127
command_support::catalog::block_store, connection_pool::ConnectionPool,
2228
};
2329

30+
use crate::network_setup::Networks;
31+
2432
pub async fn list(primary: ConnectionPool, store: Arc<BlockStore>) -> Result<(), Error> {
2533
let mut chains = {
2634
let mut conn = primary.get()?;
@@ -148,6 +156,52 @@ pub fn remove(primary: ConnectionPool, store: Arc<BlockStore>, name: String) ->
148156
Ok(())
149157
}
150158

159+
pub async fn update_chain_genesis(
160+
networks: &Networks,
161+
coord: Arc<PoolCoordinator>,
162+
store: Arc<BlockStore>,
163+
logger: &Logger,
164+
chain_id: ChainId,
165+
genesis_hash: BlockHash,
166+
force: bool,
167+
) -> Result<(), Error> {
168+
let ident = networks.chain_identifier(logger, &chain_id).await?;
169+
if !genesis_hash.eq(&ident.genesis_block_hash) {
170+
println!(
171+
"Expected adapter for chain {} to return genesis hash {} but got {}",
172+
chain_id, genesis_hash, ident.genesis_block_hash
173+
);
174+
if !force {
175+
println!("Not performing update");
176+
return Ok(());
177+
} else {
178+
println!("--force used, updating anyway");
179+
}
180+
}
181+
182+
println!("Updating shard...");
183+
// Update the local shard's genesis, whether or not it is the primary.
184+
// The chains table is replicated from the primary and keeps another genesis hash.
185+
// To keep those in sync we need to update the primary and then refresh the shard tables.
186+
store.update_ident(
187+
&chain_id,
188+
&ChainIdentifier {
189+
net_version: ident.net_version.clone(),
190+
genesis_block_hash: genesis_hash,
191+
},
192+
)?;
193+
194+
// Update the primary public.chains
195+
println!("Updating primary public.chains");
196+
store.set_chain_identifier(chain_id, &ident)?;
197+
198+
// Refresh the new values
199+
println!("Refresh mappings");
200+
crate::manager::commands::database::remap(&coord, None, None, false).await?;
201+
202+
Ok(())
203+
}
204+
151205
pub fn change_block_cache_shard(
152206
primary_store: ConnectionPool,
153207
store: Arc<BlockStore>,

node/src/manager/commands/config.rs

Lines changed: 32 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ use std::{collections::BTreeMap, sync::Arc};
33
use graph::{
44
anyhow::{bail, Context},
55
components::{
6-
adapter::{ChainId, MockIdentValidator},
6+
adapter::{ChainId, IdentValidator, IdentValidatorError, MockIdentValidator, ProviderName},
77
subgraph::{Setting, Settings},
88
},
99
endpoint::EndpointMetrics,
@@ -16,10 +16,40 @@ use graph::{
1616
slog::Logger,
1717
};
1818
use graph_chain_ethereum::NodeCapabilities;
19-
use graph_store_postgres::DeploymentPlacer;
19+
use graph_store_postgres::{BlockStore, DeploymentPlacer};
2020

2121
use crate::{config::Config, network_setup::Networks};
2222

23+
/// Compare the NetIdentifier of all defined adapters with the existing
24+
/// identifiers on the ChainStore. If a ChainStore doesn't exist it will be show
25+
/// as an error. It's intended to be run again an environment that has already
26+
/// been setup by graph-node.
27+
pub async fn check_provider_genesis(networks: &Networks, store: Arc<BlockStore>) {
28+
println!("Checking providers");
29+
for (chain_id, ids) in networks.all_chain_identifiers().await.into_iter() {
30+
let (_oks, errs): (Vec<_>, Vec<_>) = ids
31+
.into_iter()
32+
.map(|(provider, id)| {
33+
id.map_err(IdentValidatorError::from)
34+
.and_then(|id| store.check_ident(chain_id, &id))
35+
.map_err(|e| (provider, e))
36+
})
37+
.partition_result();
38+
let errs = errs
39+
.into_iter()
40+
.dedup_by(|e1, e2| e1.eq(e2))
41+
.collect::<Vec<(ProviderName, IdentValidatorError)>>();
42+
43+
if errs.is_empty() {
44+
println!("chain_id: {}: status: OK", chain_id);
45+
continue;
46+
}
47+
48+
println!("chain_id: {}: status: NOK", chain_id);
49+
println!("errors: {:?}", errs);
50+
}
51+
}
52+
2353
pub fn place(placer: &dyn DeploymentPlacer, name: &str, network: &str) -> Result<(), Error> {
2454
match placer.place(name, network).map_err(|s| anyhow!(s))? {
2555
None => {

node/src/network_setup.rs

Lines changed: 39 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,10 @@ use graph::{
77
blockchain::{Blockchain, BlockchainKind, BlockchainMap, ChainIdentifier},
88
cheap_clone::CheapClone,
99
components::{
10-
adapter::{ChainId, IdentValidator, MockIdentValidator, NetIdentifiable, ProviderManager},
10+
adapter::{
11+
ChainId, IdentValidator, MockIdentValidator, NetIdentifiable, ProviderManager,
12+
ProviderName,
13+
},
1114
metrics::MetricsRegistry,
1215
},
1316
endpoint::EndpointMetrics,
@@ -131,6 +134,40 @@ impl Networks {
131134
}
132135
}
133136

137+
/// Gets the chain identifier from all providers for every chain.
138+
/// This function is intended for checking the status of providers and
139+
/// whether they match their store counterparts more than for general
140+
/// graph-node use. It may trigger verification (which would add delays on hot paths)
141+
/// and it will also make calls on potentially unveried providers (this means the providers
142+
/// have not been checked for correct net_version and genesis block hash)
143+
pub async fn all_chain_identifiers(
144+
&self,
145+
) -> Vec<(
146+
&ChainId,
147+
Vec<(ProviderName, Result<ChainIdentifier, anyhow::Error>)>,
148+
)> {
149+
let mut out = vec![];
150+
for chain_id in self.adapters.iter().map(|a| a.chain_id()).sorted().dedup() {
151+
let mut inner = vec![];
152+
for adapter in self.rpc_provider_manager.get_all_unverified(chain_id) {
153+
inner.push((adapter.provider_name(), adapter.net_identifiers().await));
154+
}
155+
for adapter in self.firehose_provider_manager.get_all_unverified(chain_id) {
156+
inner.push((adapter.provider_name(), adapter.net_identifiers().await));
157+
}
158+
for adapter in self
159+
.substreams_provider_manager
160+
.get_all_unverified(chain_id)
161+
{
162+
inner.push((adapter.provider_name(), adapter.net_identifiers().await));
163+
}
164+
165+
out.push((chain_id, inner));
166+
}
167+
168+
out
169+
}
170+
134171
pub async fn chain_identifier(
135172
&self,
136173
logger: &Logger,
@@ -142,7 +179,7 @@ impl Networks {
142179
chain_id: &ChainId,
143180
provider_type: &str,
144181
) -> Result<ChainIdentifier> {
145-
for adapter in pm.get_all_unverified(chain_id).unwrap_or_default() {
182+
for adapter in pm.get_all_unverified(chain_id) {
146183
match adapter.net_identifiers().await {
147184
Ok(ident) => return Ok(ident),
148185
Err(err) => {

0 commit comments

Comments
 (0)