Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 12 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ anyhow = "1.0"
async-graphql = { version = "7.0.11", features = ["chrono", "uuid"] }
async-graphql-axum = "7.0.11"
axum = "0.7.5"
bs58 = "0.5.1"
chrono = "0.4.38"
clap = { version = "4.5.4", features = ["derive", "env"] }
derivative = "2.2.0"
Expand Down
4 changes: 2 additions & 2 deletions chain/arweave/src/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use graph::blockchain::{
EmptyNodeCapabilities, NoopDecoderHook, NoopRuntimeAdapter,
};
use graph::cheap_clone::CheapClone;
use graph::components::adapter::ChainId;
use graph::components::network_provider::ChainName;
use graph::components::store::DeploymentCursorTracker;
use graph::data::subgraph::UnifiedMappingApiVersion;
use graph::env::EnvVars;
Expand Down Expand Up @@ -42,7 +42,7 @@ use graph::blockchain::block_stream::{

pub struct Chain {
logger_factory: LoggerFactory,
name: ChainId,
name: ChainName,
client: Arc<ChainClient<Self>>,
chain_store: Arc<dyn ChainStore>,
metrics_registry: Arc<MetricsRegistry>,
Expand Down
4 changes: 2 additions & 2 deletions chain/cosmos/src/chain.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use graph::blockchain::firehose_block_ingestor::FirehoseBlockIngestor;
use graph::blockchain::{BlockIngestor, NoopDecoderHook};
use graph::components::adapter::ChainId;
use graph::components::network_provider::ChainName;
use graph::env::EnvVars;
use graph::prelude::MetricsRegistry;
use graph::substreams::Clock;
Expand Down Expand Up @@ -37,7 +37,7 @@ use crate::{codec, TriggerFilter};

pub struct Chain {
logger_factory: LoggerFactory,
name: ChainId,
name: ChainName,
client: Arc<ChainClient<Self>>,
chain_store: Arc<dyn ChainStore>,
metrics_registry: Arc<MetricsRegistry>,
Expand Down
3 changes: 1 addition & 2 deletions chain/ethereum/examples/firehose.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use anyhow::Error;
use graph::{
endpoint::EndpointMetrics,
env::env_var,
firehose::{self, FirehoseEndpoint, NoopGenesisDecoder, SubgraphLimit},
firehose::{self, FirehoseEndpoint, SubgraphLimit},
log::logger,
prelude::{prost, tokio, tonic, MetricsRegistry},
};
Expand Down Expand Up @@ -38,7 +38,6 @@ async fn main() -> Result<(), Error> {
false,
SubgraphLimit::Unlimited,
metrics,
NoopGenesisDecoder::boxed(),
));

loop {
Expand Down
6 changes: 3 additions & 3 deletions chain/ethereum/src/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use graph::blockchain::firehose_block_ingestor::{FirehoseBlockIngestor, Transfor
use graph::blockchain::{
BlockIngestor, BlockTime, BlockchainKind, ChainIdentifier, TriggersAdapterSelector,
};
use graph::components::adapter::ChainId;
use graph::components::network_provider::ChainName;
use graph::components::store::DeploymentCursorTracker;
use graph::data::subgraph::UnifiedMappingApiVersion;
use graph::firehose::{FirehoseEndpoint, ForkStep};
Expand Down Expand Up @@ -288,7 +288,7 @@ impl RuntimeAdapterBuilder for EthereumRuntimeAdapterBuilder {

pub struct Chain {
logger_factory: LoggerFactory,
pub name: ChainId,
pub name: ChainName,
node_id: NodeId,
registry: Arc<MetricsRegistry>,
client: Arc<ChainClient<Self>>,
Expand All @@ -315,7 +315,7 @@ impl Chain {
/// Creates a new Ethereum [`Chain`].
pub fn new(
logger_factory: LoggerFactory,
name: ChainId,
name: ChainName,
node_id: NodeId,
registry: Arc<MetricsRegistry>,
chain_store: Arc<dyn ChainStore>,
Expand Down
8 changes: 4 additions & 4 deletions chain/ethereum/src/ingestor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use crate::{chain::BlockFinality, ENV_VARS};
use crate::{EthereumAdapter, EthereumAdapterTrait as _};
use graph::blockchain::client::ChainClient;
use graph::blockchain::BlockchainKind;
use graph::components::adapter::ChainId;
use graph::components::network_provider::ChainName;
use graph::futures03::compat::Future01CompatExt as _;
use graph::slog::o;
use graph::util::backoff::ExponentialBackoff;
Expand All @@ -22,7 +22,7 @@ pub struct PollingBlockIngestor {
chain_client: Arc<ChainClient<crate::chain::Chain>>,
chain_store: Arc<dyn ChainStore>,
polling_interval: Duration,
network_name: ChainId,
network_name: ChainName,
}

impl PollingBlockIngestor {
Expand All @@ -32,7 +32,7 @@ impl PollingBlockIngestor {
chain_client: Arc<ChainClient<crate::chain::Chain>>,
chain_store: Arc<dyn ChainStore>,
polling_interval: Duration,
network_name: ChainId,
network_name: ChainName,
) -> Result<PollingBlockIngestor, Error> {
Ok(PollingBlockIngestor {
logger,
Expand Down Expand Up @@ -266,7 +266,7 @@ impl BlockIngestor for PollingBlockIngestor {
}
}

fn network_name(&self) -> ChainId {
fn network_name(&self) -> ChainName {
self.network_name.clone()
}

Expand Down
78 changes: 41 additions & 37 deletions chain/ethereum/src/network.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,14 @@
use anyhow::{anyhow, bail};
use graph::blockchain::ChainIdentifier;
use graph::components::adapter::{ChainId, NetIdentifiable, ProviderManager, ProviderName};
use graph::components::network_provider::ChainName;
use graph::components::network_provider::NetworkDetails;
use graph::components::network_provider::ProviderManager;
use graph::components::network_provider::ProviderName;
use graph::endpoint::EndpointMetrics;
use graph::firehose::{AvailableCapacity, SubgraphLimit};
use graph::prelude::rand::seq::IteratorRandom;
use graph::prelude::rand::{self, Rng};
use itertools::Itertools;
use std::sync::Arc;

pub use graph::impl_slog_value;
Expand All @@ -29,13 +33,18 @@ pub struct EthereumNetworkAdapter {
}

#[async_trait]
impl NetIdentifiable for EthereumNetworkAdapter {
async fn net_identifiers(&self) -> Result<ChainIdentifier, anyhow::Error> {
self.adapter.net_identifiers().await
}
impl NetworkDetails for EthereumNetworkAdapter {
fn provider_name(&self) -> ProviderName {
self.adapter.provider().into()
}

async fn chain_identifier(&self) -> Result<ChainIdentifier, Error> {
self.adapter.net_identifiers().await
}

async fn provides_extended_blocks(&self) -> Result<bool, Error> {
Ok(true)
}
}

impl EthereumNetworkAdapter {
Expand Down Expand Up @@ -72,7 +81,7 @@ impl EthereumNetworkAdapter {

#[derive(Debug, Clone)]
pub struct EthereumNetworkAdapters {
chain_id: ChainId,
chain_id: ChainName,
manager: ProviderManager<EthereumNetworkAdapter>,
call_only_adapters: Vec<EthereumNetworkAdapter>,
// Percentage of request that should be used to retest errored adapters.
Expand All @@ -96,10 +105,10 @@ impl EthereumNetworkAdapters {
) -> Self {
use std::cmp::Ordering;

use graph::components::network_provider::ProviderCheckStrategy;
use graph::slog::{o, Discard, Logger};

use graph::components::adapter::NoopIdentValidator;
let chain_id: ChainId = "testing".into();
let chain_id: ChainName = "testing".into();
adapters.sort_by(|a, b| {
a.capabilities
.partial_cmp(&b.capabilities)
Expand All @@ -109,15 +118,14 @@ impl EthereumNetworkAdapters {
let provider = ProviderManager::new(
Logger::root(Discard, o!()),
vec![(chain_id.clone(), adapters)].into_iter(),
Arc::new(NoopIdentValidator),
ProviderCheckStrategy::MarkAsValid,
);
provider.mark_all_valid().await;

Self::new(chain_id, provider, call_only, None)
}

pub fn new(
chain_id: ChainId,
chain_id: ChainName,
manager: ProviderManager<EthereumNetworkAdapter>,
call_only_adapters: Vec<EthereumNetworkAdapter>,
retest_percent: Option<f64>,
Expand Down Expand Up @@ -159,8 +167,9 @@ impl EthereumNetworkAdapters {
) -> impl Iterator<Item = &EthereumNetworkAdapter> + '_ {
let all = self
.manager
.get_all(&self.chain_id)
.providers(&self.chain_id)
.await
.map(|adapters| adapters.collect_vec())
.unwrap_or_default();

Self::available_with_capabilities(all, required_capabilities)
Expand All @@ -172,7 +181,10 @@ impl EthereumNetworkAdapters {
&self,
required_capabilities: &NodeCapabilities,
) -> impl Iterator<Item = &EthereumNetworkAdapter> + '_ {
let all = self.manager.get_all_unverified(&self.chain_id);
let all = self
.manager
.providers_unchecked(&self.chain_id)
.collect_vec();

Self::available_with_capabilities(all, required_capabilities)
}
Expand Down Expand Up @@ -242,10 +254,10 @@ impl EthereumNetworkAdapters {
// EthereumAdapters are sorted by their NodeCapabilities when the EthereumNetworks
// struct is instantiated so they do not need to be sorted here
self.manager
.get_all(&self.chain_id)
.providers(&self.chain_id)
.await
.map(|mut adapters| adapters.next())
.unwrap_or_default()
.first()
.map(|ethereum_network_adapter| ethereum_network_adapter.adapter.clone())
}

Expand Down Expand Up @@ -299,7 +311,9 @@ impl EthereumNetworkAdapters {
#[cfg(test)]
mod tests {
use graph::cheap_clone::CheapClone;
use graph::components::adapter::{NoopIdentValidator, ProviderManager, ProviderName};
use graph::components::network_provider::ProviderCheckStrategy;
use graph::components::network_provider::ProviderManager;
use graph::components::network_provider::ProviderName;
use graph::data::value::Word;
use graph::http::HeaderMap;
use graph::{
Expand Down Expand Up @@ -746,18 +760,14 @@ mod tests {
.collect(),
)]
.into_iter(),
Arc::new(NoopIdentValidator),
ProviderCheckStrategy::MarkAsValid,
);
manager.mark_all_valid().await;

let no_retest_adapters = EthereumNetworkAdapters::new(
chain_id.clone(),
manager.cheap_clone(),
vec![],
Some(0f64),
);
let no_retest_adapters =
EthereumNetworkAdapters::new(chain_id.clone(), manager.clone(), vec![], Some(0f64));

let always_retest_adapters =
EthereumNetworkAdapters::new(chain_id, manager.cheap_clone(), vec![], Some(1f64));
EthereumNetworkAdapters::new(chain_id, manager.clone(), vec![], Some(1f64));

assert_eq!(
no_retest_adapters
Expand Down Expand Up @@ -842,16 +852,12 @@ mod tests {
.iter()
.cloned()
.map(|a| (chain_id.clone(), vec![a])),
Arc::new(NoopIdentValidator),
ProviderCheckStrategy::MarkAsValid,
);
manager.mark_all_valid().await;

let always_retest_adapters = EthereumNetworkAdapters::new(
chain_id.clone(),
manager.cheap_clone(),
vec![],
Some(1f64),
);
let always_retest_adapters =
EthereumNetworkAdapters::new(chain_id.clone(), manager.clone(), vec![], Some(1f64));

assert_eq!(
always_retest_adapters
.cheapest_with(&NodeCapabilities {
Expand All @@ -870,9 +876,8 @@ mod tests {
.iter()
.cloned()
.map(|a| (chain_id.clone(), vec![a])),
Arc::new(NoopIdentValidator),
ProviderCheckStrategy::MarkAsValid,
);
manager.mark_all_valid().await;

let no_retest_adapters =
EthereumNetworkAdapters::new(chain_id.clone(), manager, vec![], Some(0f64));
Expand Down Expand Up @@ -912,9 +917,8 @@ mod tests {
no_available_adapter.iter().cloned().collect(),
)]
.into_iter(),
Arc::new(NoopIdentValidator),
ProviderCheckStrategy::MarkAsValid,
);
manager.mark_all_valid().await;

let no_available_adapter = EthereumNetworkAdapters::new(chain_id, manager, vec![], None);
let res = no_available_adapter
Expand Down
2 changes: 1 addition & 1 deletion chain/ethereum/src/transport.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use graph::components::adapter::ProviderName;
use graph::components::network_provider::ProviderName;
use graph::endpoint::{EndpointMetrics, RequestLabels};
use jsonrpc_core::types::Call;
use jsonrpc_core::Value;
Expand Down
4 changes: 2 additions & 2 deletions chain/near/src/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use graph::blockchain::{
NoopRuntimeAdapter,
};
use graph::cheap_clone::CheapClone;
use graph::components::adapter::ChainId;
use graph::components::network_provider::ChainName;
use graph::components::store::DeploymentCursorTracker;
use graph::data::subgraph::UnifiedMappingApiVersion;
use graph::env::EnvVars;
Expand Down Expand Up @@ -161,7 +161,7 @@ impl BlockStreamBuilder<Chain> for NearStreamBuilder {

pub struct Chain {
logger_factory: LoggerFactory,
name: ChainId,
name: ChainName,
client: Arc<ChainClient<Self>>,
chain_store: Arc<dyn ChainStore>,
metrics_registry: Arc<MetricsRegistry>,
Expand Down
Loading
Loading