Skip to content

Commit 5dbd519

Browse files
committed
node: update graphman provider checks command
1 parent 4175f59 commit 5dbd519

File tree

6 files changed

+167
-104
lines changed

6 files changed

+167
-104
lines changed
Lines changed: 0 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,3 @@
1-
use std::time::Duration;
2-
31
use anyhow::Result;
42
use async_trait::async_trait;
53

@@ -14,20 +12,6 @@ pub trait NetworkDetails: Send + Sync + 'static {
1412
/// Returns the data that helps to uniquely identify a chain.
1513
async fn chain_identifier(&self) -> Result<ChainIdentifier>;
1614

17-
/// Returns the data that helps to uniquely identify a chain.
18-
///
19-
/// Fails if the provider does not respond within the specified timeout.
20-
async fn chain_identifier_with_timeout(&self, timeout: Duration) -> Result<ChainIdentifier> {
21-
tokio::time::timeout(timeout, self.chain_identifier()).await?
22-
}
23-
2415
/// Returns true if the provider supports extended block details.
2516
async fn provides_extended_blocks(&self) -> Result<bool>;
26-
27-
/// Returns true if the provider supports extended block details.
28-
///
29-
/// Fails if the provider does not respond within the specified timeout.
30-
async fn provides_extended_blocks_with_timeout(&self, timeout: Duration) -> Result<bool> {
31-
tokio::time::timeout(timeout, self.provides_extended_blocks()).await?
32-
}
3317
}

node/src/bin/manager.rs

Lines changed: 15 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -440,9 +440,13 @@ pub enum ConfigCommand {
440440
network: String,
441441
},
442442

443-
/// Compare the NetIdentifier of all defined adapters with the existing
444-
/// identifiers on the ChainStore.
445-
CheckProviders {},
443+
/// Run all available provider checks against all providers.
444+
CheckProviders {
445+
/// Maximum duration of all provider checks for a provider.
446+
///
447+
/// Defaults to 60 seconds.
448+
timeout_seconds: Option<u64>,
449+
},
446450

447451
/// Show subgraph-specific settings
448452
///
@@ -1167,10 +1171,15 @@ async fn main() -> anyhow::Result<()> {
11671171
use ConfigCommand::*;
11681172

11691173
match cmd {
1170-
CheckProviders {} => {
1171-
let store = ctx.store().block_store();
1174+
CheckProviders { timeout_seconds } => {
1175+
let logger = ctx.logger.clone();
11721176
let networks = ctx.networks().await?;
1173-
Ok(commands::config::check_provider_genesis(&networks, store).await)
1177+
let store = ctx.store().block_store();
1178+
let timeout = Duration::from_secs(timeout_seconds.unwrap_or(60));
1179+
1180+
commands::provider_checks::execute(&logger, &networks, store, timeout).await;
1181+
1182+
Ok(())
11741183
}
11751184
Place { name, network } => {
11761185
commands::config::place(&ctx.config.deployment, &name, &network)

node/src/manager/commands/config.rs

Lines changed: 1 addition & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,6 @@
11
use std::{collections::BTreeMap, sync::Arc};
22

3-
use graph::components::network_provider::ChainIdentifierStore;
43
use graph::components::network_provider::ChainName;
5-
use graph::components::network_provider::ProviderName;
64
use graph::{
75
anyhow::{bail, Context},
86
components::subgraph::{Setting, Settings},
@@ -16,40 +14,10 @@ use graph::{
1614
slog::Logger,
1715
};
1816
use graph_chain_ethereum::NodeCapabilities;
19-
use graph_store_postgres::{BlockStore, DeploymentPlacer};
17+
use graph_store_postgres::DeploymentPlacer;
2018

2119
use crate::{config::Config, network_setup::Networks};
2220

23-
/// Compare the NetIdentifier of all defined adapters with the existing
24-
/// identifiers on the ChainStore. If a ChainStore doesn't exist it will be show
25-
/// as an error. It's intended to be run again an environment that has already
26-
/// been setup by graph-node.
27-
pub async fn check_provider_genesis(networks: &Networks, store: Arc<BlockStore>) {
28-
println!("Checking providers");
29-
for (chain_id, ids) in networks.all_chain_identifiers().await.into_iter() {
30-
let (_oks, errs): (Vec<_>, Vec<_>) = ids
31-
.into_iter()
32-
.map(|(provider, id)| {
33-
id.and_then(|id| store.validate_identifier(chain_id, &id).map_err(Into::into))
34-
.map_err(|e| (provider, e.to_string()))
35-
})
36-
.partition_result();
37-
38-
let errs = errs
39-
.into_iter()
40-
.dedup_by(|e1, e2| e1 == e2)
41-
.collect::<Vec<(ProviderName, String)>>();
42-
43-
if errs.is_empty() {
44-
println!("chain_id: {}: status: OK", chain_id);
45-
continue;
46-
}
47-
48-
println!("chain_id: {}: status: NOK", chain_id);
49-
println!("errors: {:?}", errs);
50-
}
51-
}
52-
5321
pub fn place(placer: &dyn DeploymentPlacer, name: &str, network: &str) -> Result<(), Error> {
5422
match placer.place(name, network).map_err(|s| anyhow!(s))? {
5523
None => {

node/src/manager/commands/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ pub mod deployment;
1010
pub mod drop;
1111
pub mod index;
1212
pub mod listen;
13+
pub mod provider_checks;
1314
pub mod prune;
1415
pub mod query;
1516
pub mod remove;
Lines changed: 146 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,146 @@
1+
use std::sync::Arc;
2+
use std::time::Duration;
3+
4+
use graph::components::network_provider::ChainIdentifierStore;
5+
use graph::components::network_provider::ChainName;
6+
use graph::components::network_provider::ExtendedBlocksCheck;
7+
use graph::components::network_provider::GenesisHashCheck;
8+
use graph::components::network_provider::NetworkDetails;
9+
use graph::components::network_provider::ProviderCheck;
10+
use graph::components::network_provider::ProviderCheckStatus;
11+
use graph::prelude::tokio;
12+
use graph::prelude::Logger;
13+
use graph_store_postgres::BlockStore;
14+
use itertools::Itertools;
15+
16+
use crate::network_setup::Networks;
17+
18+
pub async fn execute(
19+
logger: &Logger,
20+
networks: &Networks,
21+
store: Arc<BlockStore>,
22+
timeout: Duration,
23+
) {
24+
let chain_name_iter = networks
25+
.adapters
26+
.iter()
27+
.map(|a| a.chain_id())
28+
.sorted()
29+
.dedup();
30+
31+
for chain_name in chain_name_iter {
32+
let mut errors = Vec::new();
33+
34+
for adapter in networks
35+
.rpc_provider_manager
36+
.providers_unchecked(chain_name)
37+
.unique_by(|x| x.provider_name())
38+
{
39+
match tokio::time::timeout(
40+
timeout,
41+
run_checks(logger, chain_name, adapter, store.clone()),
42+
)
43+
.await
44+
{
45+
Ok(result) => {
46+
errors.extend(result);
47+
}
48+
Err(_) => {
49+
errors.push("Timeout".to_owned());
50+
}
51+
}
52+
}
53+
54+
for adapter in networks
55+
.firehose_provider_manager
56+
.providers_unchecked(chain_name)
57+
.unique_by(|x| x.provider_name())
58+
{
59+
match tokio::time::timeout(
60+
timeout,
61+
run_checks(logger, chain_name, adapter, store.clone()),
62+
)
63+
.await
64+
{
65+
Ok(result) => {
66+
errors.extend(result);
67+
}
68+
Err(_) => {
69+
errors.push("Timeout".to_owned());
70+
}
71+
}
72+
}
73+
74+
for adapter in networks
75+
.substreams_provider_manager
76+
.providers_unchecked(chain_name)
77+
.unique_by(|x| x.provider_name())
78+
{
79+
match tokio::time::timeout(
80+
timeout,
81+
run_checks(logger, chain_name, adapter, store.clone()),
82+
)
83+
.await
84+
{
85+
Ok(result) => {
86+
errors.extend(result);
87+
}
88+
Err(_) => {
89+
errors.push("Timeout".to_owned());
90+
}
91+
}
92+
}
93+
94+
if errors.is_empty() {
95+
println!("Chain: {chain_name}; Status: OK");
96+
continue;
97+
}
98+
99+
println!("Chain: {chain_name}; Status: ERROR");
100+
for error in errors.into_iter().unique() {
101+
println!("ERROR: {error}");
102+
}
103+
}
104+
}
105+
106+
async fn run_checks(
107+
logger: &Logger,
108+
chain_name: &ChainName,
109+
adapter: &dyn NetworkDetails,
110+
store: Arc<dyn ChainIdentifierStore>,
111+
) -> Vec<String> {
112+
let provider_name = adapter.provider_name();
113+
114+
let mut errors = Vec::new();
115+
116+
let genesis_check = GenesisHashCheck::new(store);
117+
118+
let status = genesis_check
119+
.check(logger, chain_name, &provider_name, adapter)
120+
.await;
121+
122+
errors_from_status(status, &mut errors);
123+
124+
let blocks_check = ExtendedBlocksCheck::new([]);
125+
126+
let status = blocks_check
127+
.check(logger, chain_name, &provider_name, adapter)
128+
.await;
129+
130+
errors_from_status(status, &mut errors);
131+
132+
errors
133+
}
134+
135+
fn errors_from_status(status: ProviderCheckStatus, out: &mut Vec<String>) {
136+
match status {
137+
ProviderCheckStatus::NotChecked => {}
138+
ProviderCheckStatus::TemporaryFailure { message, .. } => {
139+
out.push(message);
140+
}
141+
ProviderCheckStatus::Valid => {}
142+
ProviderCheckStatus::Failed { message, .. } => {
143+
out.push(message);
144+
}
145+
}
146+
}

node/src/network_setup.rs

Lines changed: 4 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -7,14 +7,13 @@ use graph::components::network_provider::NetworkDetails;
77
use graph::components::network_provider::ProviderCheck;
88
use graph::components::network_provider::ProviderCheckStrategy;
99
use graph::components::network_provider::ProviderManager;
10-
use graph::components::network_provider::ProviderName;
1110
use graph::{
1211
anyhow::{self, bail},
1312
blockchain::{Blockchain, BlockchainKind, BlockchainMap, ChainIdentifier},
1413
cheap_clone::CheapClone,
1514
components::metrics::MetricsRegistry,
1615
endpoint::EndpointMetrics,
17-
env::{EnvVars, ENV_VARS},
16+
env::EnvVars,
1817
firehose::{FirehoseEndpoint, FirehoseEndpoints},
1918
futures03::future::TryFutureExt,
2019
itertools::Itertools,
@@ -106,9 +105,9 @@ impl AdapterConfiguration {
106105

107106
pub struct Networks {
108107
pub adapters: Vec<AdapterConfiguration>,
109-
rpc_provider_manager: ProviderManager<EthereumNetworkAdapter>,
110-
firehose_provider_manager: ProviderManager<Arc<FirehoseEndpoint>>,
111-
substreams_provider_manager: ProviderManager<Arc<FirehoseEndpoint>>,
108+
pub rpc_provider_manager: ProviderManager<EthereumNetworkAdapter>,
109+
pub firehose_provider_manager: ProviderManager<Arc<FirehoseEndpoint>>,
110+
pub substreams_provider_manager: ProviderManager<Arc<FirehoseEndpoint>>,
112111
}
113112

114113
impl Networks {
@@ -134,50 +133,6 @@ impl Networks {
134133
}
135134
}
136135

137-
/// Gets the chain identifier from all providers for every chain.
138-
/// This function is intended for checking the status of providers and
139-
/// whether they match their store counterparts more than for general
140-
/// graph-node use. It may trigger verification (which would add delays on hot paths)
141-
/// and it will also make calls on potentially unveried providers (this means the providers
142-
/// have not been checked for correct net_version and genesis block hash)
143-
pub async fn all_chain_identifiers(
144-
&self,
145-
) -> Vec<(
146-
&ChainName,
147-
Vec<(ProviderName, Result<ChainIdentifier, anyhow::Error>)>,
148-
)> {
149-
let timeout = ENV_VARS.genesis_validation_timeout;
150-
let mut out = vec![];
151-
for chain_id in self.adapters.iter().map(|a| a.chain_id()).sorted().dedup() {
152-
let mut inner = vec![];
153-
for adapter in self.rpc_provider_manager.providers_unchecked(chain_id) {
154-
inner.push((
155-
adapter.provider_name(),
156-
adapter.chain_identifier_with_timeout(timeout).await,
157-
));
158-
}
159-
for adapter in self.firehose_provider_manager.providers_unchecked(chain_id) {
160-
inner.push((
161-
adapter.provider_name(),
162-
adapter.chain_identifier_with_timeout(timeout).await,
163-
));
164-
}
165-
for adapter in self
166-
.substreams_provider_manager
167-
.providers_unchecked(chain_id)
168-
{
169-
inner.push((
170-
adapter.provider_name(),
171-
adapter.chain_identifier_with_timeout(timeout).await,
172-
));
173-
}
174-
175-
out.push((chain_id, inner));
176-
}
177-
178-
out
179-
}
180-
181136
pub async fn chain_identifier(
182137
&self,
183138
logger: &Logger,

0 commit comments

Comments
 (0)