diff --git a/chain/ethereum/src/network.rs b/chain/ethereum/src/network.rs index 59a698ab20b..4870a025e23 100644 --- a/chain/ethereum/src/network.rs +++ b/chain/ethereum/src/network.rs @@ -6,6 +6,7 @@ use graph::components::network_provider::ProviderManager; use graph::components::network_provider::ProviderName; use graph::endpoint::EndpointMetrics; use graph::firehose::{AvailableCapacity, SubgraphLimit}; +use graph::prelude::rand::distributions::WeightedIndex; use graph::prelude::rand::seq::IteratorRandom; use graph::prelude::rand::{self, Rng}; use itertools::Itertools; @@ -30,6 +31,7 @@ pub struct EthereumNetworkAdapter { /// that limit. That's a somewhat imprecise but convenient way to /// determine the number of connections limit: SubgraphLimit, + weight: f64, } #[async_trait] @@ -53,12 +55,14 @@ impl EthereumNetworkAdapter { capabilities: NodeCapabilities, adapter: Arc, limit: SubgraphLimit, + weight: f64, ) -> Self { Self { endpoint_metrics, capabilities, adapter, limit, + weight, } } @@ -86,6 +90,7 @@ pub struct EthereumNetworkAdapters { call_only_adapters: Vec, // Percentage of request that should be used to retest errored adapters. retest_percent: f64, + weighted: bool, } impl EthereumNetworkAdapters { @@ -95,6 +100,7 @@ impl EthereumNetworkAdapters { manager: ProviderManager::default(), call_only_adapters: vec![], retest_percent: DEFAULT_ADAPTER_ERROR_RETEST_PERCENT, + weighted: false, } } @@ -121,7 +127,7 @@ impl EthereumNetworkAdapters { ProviderCheckStrategy::MarkAsValid, ); - Self::new(chain_id, provider, call_only, None) + Self::new(chain_id, provider, call_only, None, false) } pub fn new( @@ -129,6 +135,7 @@ impl EthereumNetworkAdapters { manager: ProviderManager, call_only_adapters: Vec, retest_percent: Option, + weighted: bool, ) -> Self { #[cfg(debug_assertions)] call_only_adapters.iter().for_each(|a| { @@ -140,6 +147,7 @@ impl EthereumNetworkAdapters { manager, call_only_adapters, retest_percent: retest_percent.unwrap_or(DEFAULT_ADAPTER_ERROR_RETEST_PERCENT), + weighted, } } @@ -192,31 +200,38 @@ impl EthereumNetworkAdapters { // handle adapter selection from a list, implements the availability checking with an abstracted // source of the adapter list. fn cheapest_from( + &self, input: Vec<&EthereumNetworkAdapter>, required_capabilities: &NodeCapabilities, - retest_percent: f64, ) -> Result, Error> { let retest_rng: f64 = (&mut rand::rng()).random(); - let cheapest = input.into_iter().choose_multiple(&mut rand::rng(), 3); - let cheapest = cheapest.iter(); + if retest_rng < self.retest_percent { + if let Some(adapter) = input.iter().max_by_key(|a| a.current_error_count()) { + return Ok(adapter.adapter.clone()); + } + } - // If request falls below the retest threshold, use this request to try and - // reset the failed adapter. If a request succeeds the adapter will be more - // likely to be selected afterwards. - if retest_rng < retest_percent { - cheapest.max_by_key(|adapter| adapter.current_error_count()) + if self.weighted { + if input.is_empty() { + return Err(anyhow!( + "A matching Ethereum network with {:?} was not found.", + required_capabilities + )); + } + let weights: Vec<_> = input.iter().map(|a| a.weight).collect(); + if let Ok(dist) = WeightedIndex::new(&weights) { + let idx = dist.sample(&mut rand::rng()); + return Ok(input[idx].adapter.clone()); + } } else { - // The assumption here is that most RPC endpoints will not have limits - // which makes the check for low/high available capacity less relevant. - // So we essentially assume if it had available capacity when calling - // `all_cheapest_with` then it prolly maintains that state and so we - // just select whichever adapter is working better according to - // the number of errors. - cheapest.min_by_key(|adapter| adapter.current_error_count()) + let choices = input.into_iter().choose_multiple(&mut rand::rng(), 3); + if let Some(adapter) = choices.iter().min_by_key(|a| a.current_error_count()) { + return Ok(adapter.adapter.clone()); + } } - .map(|adapter| adapter.adapter.clone()) - .ok_or(anyhow!( + + Err(anyhow!( "A matching Ethereum network with {:?} was not found.", required_capabilities )) @@ -226,13 +241,11 @@ impl EthereumNetworkAdapters { &self, required_capabilities: &NodeCapabilities, ) -> Result, Error> { - let cheapest = self.all_unverified_cheapest_with(required_capabilities); + let cheapest = self + .all_unverified_cheapest_with(required_capabilities) + .collect_vec(); - Self::cheapest_from( - cheapest.choose_multiple(&mut rand::rng(), 3), - required_capabilities, - self.retest_percent, - ) + self.cheapest_from(cheapest, required_capabilities) } /// This is the public entry point and should always use verified adapters @@ -243,9 +256,9 @@ impl EthereumNetworkAdapters { let cheapest = self .all_cheapest_with(required_capabilities) .await - .choose_multiple(&mut rand::rng(), 3); + .collect_vec(); - Self::cheapest_from(cheapest, required_capabilities, self.retest_percent) + self.cheapest_from(cheapest, required_capabilities) } pub async fn cheapest(&self) -> Option> { @@ -429,6 +442,7 @@ mod tests { }, eth_adapter.clone(), SubgraphLimit::Limit(3), + 1.0, )], vec![EthereumNetworkAdapter::new( metrics.cheap_clone(), @@ -438,6 +452,7 @@ mod tests { }, eth_call_adapter.clone(), SubgraphLimit::Limit(3), + 1.0, )], ) .await; @@ -535,6 +550,7 @@ mod tests { }, eth_call_adapter.clone(), SubgraphLimit::Unlimited, + 1.0, )], vec![EthereumNetworkAdapter::new( metrics.cheap_clone(), @@ -544,6 +560,7 @@ mod tests { }, eth_adapter.clone(), SubgraphLimit::Limit(2), + 1.0, )], ) .await; @@ -606,6 +623,7 @@ mod tests { }, eth_call_adapter.clone(), SubgraphLimit::Disabled, + 1.0, )], vec![EthereumNetworkAdapter::new( metrics.cheap_clone(), @@ -615,6 +633,7 @@ mod tests { }, eth_adapter.clone(), SubgraphLimit::Limit(3), + 1.0, )], ) .await; @@ -661,6 +680,7 @@ mod tests { }, eth_adapter.clone(), SubgraphLimit::Limit(3), + 1.0, )], vec![], ) @@ -756,11 +776,16 @@ mod tests { ProviderCheckStrategy::MarkAsValid, ); - let no_retest_adapters = - EthereumNetworkAdapters::new(chain_id.clone(), manager.clone(), vec![], Some(0f64)); + let no_retest_adapters = EthereumNetworkAdapters::new( + chain_id.clone(), + manager.clone(), + vec![], + Some(0f64), + false, + ); let always_retest_adapters = - EthereumNetworkAdapters::new(chain_id, manager.clone(), vec![], Some(1f64)); + EthereumNetworkAdapters::new(chain_id, manager.clone(), vec![], Some(1f64), false); assert_eq!( no_retest_adapters @@ -844,8 +869,13 @@ mod tests { ProviderCheckStrategy::MarkAsValid, ); - let always_retest_adapters = - EthereumNetworkAdapters::new(chain_id.clone(), manager.clone(), vec![], Some(1f64)); + let always_retest_adapters = EthereumNetworkAdapters::new( + chain_id.clone(), + manager.clone(), + vec![], + Some(1f64), + false, + ); assert_eq!( always_retest_adapters @@ -869,7 +899,7 @@ mod tests { ); let no_retest_adapters = - EthereumNetworkAdapters::new(chain_id.clone(), manager, vec![], Some(0f64)); + EthereumNetworkAdapters::new(chain_id.clone(), manager, vec![], Some(0f64), false); assert_eq!( no_retest_adapters .cheapest_with(&NodeCapabilities { @@ -909,7 +939,8 @@ mod tests { ProviderCheckStrategy::MarkAsValid, ); - let no_available_adapter = EthereumNetworkAdapters::new(chain_id, manager, vec![], None); + let no_available_adapter = + EthereumNetworkAdapters::new(chain_id, manager, vec![], None, false); let res = no_available_adapter .cheapest_with(&NodeCapabilities { archive: true, diff --git a/node/resources/tests/full_config.toml b/node/resources/tests/full_config.toml index 1f907539194..a8436030651 100644 --- a/node/resources/tests/full_config.toml +++ b/node/resources/tests/full_config.toml @@ -1,3 +1,5 @@ +weighted_rpc_steering = true + [general] query = "query_node_.*" @@ -46,26 +48,26 @@ ingestor = "index_0" [chains.mainnet] shard = "primary" provider = [ - { label = "mainnet-0", url = "http://rpc.mainnet.io", features = ["archive", "traces"] }, - { label = "mainnet-1", details = { type = "web3call", url = "http://rpc.mainnet.io", features = ["archive", "traces"] }}, - { label = "firehose", details = { type = "firehose", url = "http://localhost:9000", features = [] }}, - { label = "substreams", details = { type = "substreams", url = "http://localhost:9000", features = [] }}, + { label = "mainnet-0", url = "http://rpc.mainnet.io", features = ["archive", "traces"], weight = 0.1 }, + { label = "mainnet-1", details = { type = "web3call", url = "http://rpc.mainnet.io", features = ["archive", "traces"] }, weight = 0.2 }, + { label = "firehose", details = { type = "firehose", url = "http://localhost:9000", features = [] }, weight = 0.3 }, + { label = "substreams", details = { type = "substreams", url = "http://localhost:9000", features = [] }, weight = 0.4 }, ] [chains.ropsten] shard = "primary" provider = [ - { label = "ropsten-0", url = "http://rpc.ropsten.io", transport = "rpc", features = ["archive", "traces"] } + { label = "ropsten-0", url = "http://rpc.ropsten.io", transport = "rpc", features = ["archive", "traces"], weight = 1.0 } ] [chains.goerli] shard = "primary" provider = [ - { label = "goerli-0", url = "http://rpc.goerli.io", transport = "ipc", features = ["archive"] } + { label = "goerli-0", url = "http://rpc.goerli.io", transport = "ipc", features = ["archive"], weight = 1.0 } ] [chains.kovan] shard = "primary" provider = [ - { label = "kovan-0", url = "http://rpc.kovan.io", transport = "ws", features = [] } + { label = "kovan-0", url = "http://rpc.kovan.io", transport = "ws", features = [], weight = 1.0 } ] diff --git a/node/src/chain.rs b/node/src/chain.rs index 343b783908f..2638648104b 100644 --- a/node/src/chain.rs +++ b/node/src/chain.rs @@ -314,6 +314,7 @@ pub async fn create_ethereum_networks_for_chain( .await, ), web3.limit_for(&config.node), + provider.weight, ); if call_only { diff --git a/node/src/config.rs b/node/src/config.rs index 83ea7bf1cc3..356402a6ae9 100644 --- a/node/src/config.rs +++ b/node/src/config.rs @@ -48,6 +48,7 @@ pub struct Opt { pub ethereum_ws: Vec, pub ethereum_ipc: Vec, pub unsafe_config: bool, + pub weighted_rpc_steering: bool, } impl Default for Opt { @@ -64,6 +65,7 @@ impl Default for Opt { ethereum_ws: vec![], ethereum_ipc: vec![], unsafe_config: false, + weighted_rpc_steering: false, } } } @@ -73,6 +75,8 @@ pub struct Config { #[serde(skip, default = "default_node_id")] pub node: NodeId, pub general: Option, + #[serde(default)] + pub weighted_rpc_steering: bool, #[serde(rename = "store")] pub stores: BTreeMap, pub chains: ChainSection, @@ -196,6 +200,7 @@ impl Config { Ok(Config { node, general: None, + weighted_rpc_steering: opt.weighted_rpc_steering, stores, chains, deployment, @@ -503,6 +508,7 @@ impl ChainSection { headers: Default::default(), rules: vec![], }), + weight: 1.0, }; let entry = chains.entry(name.to_string()).or_insert_with(|| Chain { shard: PRIMARY_SHARD.to_string(), @@ -602,6 +608,8 @@ fn btree_map_to_http_headers(kvs: BTreeMap) -> HeaderMap { pub struct Provider { pub label: String, pub details: ProviderDetails, + #[serde(default = "one_f64")] + pub weight: f64, } #[derive(Clone, Debug, Deserialize, Serialize, PartialEq)] @@ -726,6 +734,9 @@ const DEFAULT_PROVIDER_FEATURES: [&str; 2] = ["traces", "archive"]; impl Provider { fn validate(&mut self) -> Result<()> { validate_name(&self.label).context("illegal provider name")?; + if self.weight < 0.0 || self.weight > 1.0 { + bail!("provider {} must have a weight between 0 and 1", self.label); + } match self.details { ProviderDetails::Firehose(ref mut firehose) @@ -820,6 +831,7 @@ impl<'de> Deserialize<'de> for Provider { { let mut label = None; let mut details = None; + let mut weight = None; let mut url = None; let mut transport = None; @@ -841,6 +853,12 @@ impl<'de> Deserialize<'de> for Provider { } details = Some(map.next_value()?); } + ProviderField::Weight => { + if weight.is_some() { + return Err(serde::de::Error::duplicate_field("weight")); + } + weight = Some(map.next_value()?); + } ProviderField::Url => { if url.is_some() { return Err(serde::de::Error::duplicate_field("url")); @@ -904,13 +922,18 @@ impl<'de> Deserialize<'de> for Provider { }), }; - Ok(Provider { label, details }) + Ok(Provider { + label, + details, + weight: weight.unwrap_or(1.0), + }) } } const FIELDS: &[&str] = &[ "label", "details", + "weight", "transport", "url", "features", @@ -925,6 +948,7 @@ impl<'de> Deserialize<'de> for Provider { enum ProviderField { Label, Details, + Weight, Match, // Deprecated fields @@ -1162,6 +1186,10 @@ fn one() -> usize { 1 } +fn one_f64() -> f64 { + 1.0 +} + fn default_node_id() -> NodeId { NodeId::new("default").unwrap() } @@ -1308,6 +1336,7 @@ mod tests { headers: HeaderMap::new(), rules: Vec::new(), }), + weight: 1.0, }, actual ); @@ -1334,6 +1363,7 @@ mod tests { headers: HeaderMap::new(), rules: Vec::new(), }), + weight: 1.0, }, actual ); @@ -1441,6 +1471,7 @@ mod tests { headers, rules: Vec::new(), }), + weight: 1.0, }, actual ); @@ -1466,6 +1497,7 @@ mod tests { headers: HeaderMap::new(), rules: Vec::new(), }), + weight: 1.0, }, actual ); @@ -1507,6 +1539,7 @@ mod tests { conn_pool_size: 20, rules: vec![], }), + weight: 1.0, }, actual ); @@ -1533,6 +1566,7 @@ mod tests { conn_pool_size: 20, rules: vec![], }), + weight: 1.0, }, actual ); @@ -1559,6 +1593,7 @@ mod tests { conn_pool_size: 20, rules: vec![], }), + weight: 1.0, }, actual ); @@ -1585,6 +1620,7 @@ mod tests { conn_pool_size: 20, rules: vec![], }), + weight: 1.0, }, actual ); @@ -1624,6 +1660,7 @@ mod tests { } ], }), + weight: 1.0, }, actual ); @@ -1663,6 +1700,7 @@ mod tests { } ], }), + weight: 1.0, }, actual ); @@ -1702,6 +1740,7 @@ mod tests { } ], }), + weight: 1.0, }, actual ); @@ -1741,6 +1780,7 @@ mod tests { } ], }), + weight: 1.0, }, actual ); @@ -1835,6 +1875,7 @@ mod tests { headers: HeaderMap::new(), rules: Vec::new(), }), + weight: 1.0, }, actual ); diff --git a/node/src/network_setup.rs b/node/src/network_setup.rs index d086c786f82..78655207d7b 100644 --- a/node/src/network_setup.rs +++ b/node/src/network_setup.rs @@ -108,6 +108,7 @@ pub struct Networks { pub rpc_provider_manager: ProviderManager, pub firehose_provider_manager: ProviderManager>, pub substreams_provider_manager: ProviderManager>, + pub weighted_rpc_steering: bool, } impl Networks { @@ -130,6 +131,7 @@ impl Networks { vec![].into_iter(), ProviderCheckStrategy::MarkAsValid, ), + weighted_rpc_steering: false, } } @@ -221,7 +223,12 @@ impl Networks { .chain(substreams.into_iter()) .collect(); - Ok(Networks::new(&logger, adapters, provider_checks)) + Ok(Networks::new( + &logger, + adapters, + provider_checks, + config.weighted_rpc_steering, + )) } pub async fn from_config_for_chain( @@ -266,6 +273,7 @@ impl Networks { logger: &Logger, adapters: Vec, provider_checks: &[Arc], + weighted_rpc_steering: bool, ) -> Self { let adapters2 = adapters.clone(); let eth_adapters = adapters.iter().flat_map(|a| a.as_rpc()).cloned().map( @@ -332,6 +340,7 @@ impl Networks { .map(|(chain_id, endpoints)| (chain_id, endpoints)), ProviderCheckStrategy::RequireAll(provider_checks), ), + weighted_rpc_steering, }; s @@ -445,6 +454,7 @@ impl Networks { self.rpc_provider_manager.clone(), eth_adapters, None, + self.weighted_rpc_steering, ) } } diff --git a/node/src/opt.rs b/node/src/opt.rs index 9928144396a..f87df56d486 100644 --- a/node/src/opt.rs +++ b/node/src/opt.rs @@ -102,6 +102,12 @@ pub struct Opt { help= "Ethereum network name (e.g. 'mainnet'), optional comma-seperated capabilities (eg 'full,archive'), and an Ethereum IPC pipe, separated by a ':'", )] pub ethereum_ipc: Vec, + #[clap( + long, + env = "GRAPH_WEIGHTED_RPC_STEERING", + help = "Enable weighted random steering for Ethereum RPCs" + )] + pub weighted_rpc_steering: bool, #[clap( long, value_name = "HOST:PORT", @@ -245,6 +251,7 @@ impl From for config::Opt { ethereum_rpc, ethereum_ws, ethereum_ipc, + weighted_rpc_steering, unsafe_config, .. } = opt; @@ -260,6 +267,7 @@ impl From for config::Opt { ethereum_rpc, ethereum_ws, ethereum_ipc, + weighted_rpc_steering, unsafe_config, } }