Skip to content

feat: add weighted rpc steering #6090

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 6 commits into
base: feature/add-weighted-random-steering-load-balancing
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
97 changes: 64 additions & 33 deletions chain/ethereum/src/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use graph::components::network_provider::ProviderManager;
use graph::components::network_provider::ProviderName;
use graph::endpoint::EndpointMetrics;
use graph::firehose::{AvailableCapacity, SubgraphLimit};
use graph::prelude::rand::distributions::WeightedIndex;
use graph::prelude::rand::seq::IteratorRandom;
use graph::prelude::rand::{self, Rng};
use itertools::Itertools;
Expand All @@ -30,6 +31,7 @@ pub struct EthereumNetworkAdapter {
/// that limit. That's a somewhat imprecise but convenient way to
/// determine the number of connections
limit: SubgraphLimit,
weight: f64,
}

#[async_trait]
Expand All @@ -53,12 +55,14 @@ impl EthereumNetworkAdapter {
capabilities: NodeCapabilities,
adapter: Arc<EthereumAdapter>,
limit: SubgraphLimit,
weight: f64,
) -> Self {
Self {
endpoint_metrics,
capabilities,
adapter,
limit,
weight,
}
}

Expand Down Expand Up @@ -86,6 +90,7 @@ pub struct EthereumNetworkAdapters {
call_only_adapters: Vec<EthereumNetworkAdapter>,
// Percentage of request that should be used to retest errored adapters.
retest_percent: f64,
weighted: bool,
}

impl EthereumNetworkAdapters {
Expand All @@ -95,6 +100,7 @@ impl EthereumNetworkAdapters {
manager: ProviderManager::default(),
call_only_adapters: vec![],
retest_percent: DEFAULT_ADAPTER_ERROR_RETEST_PERCENT,
weighted: false,
}
}

Expand All @@ -121,14 +127,15 @@ impl EthereumNetworkAdapters {
ProviderCheckStrategy::MarkAsValid,
);

Self::new(chain_id, provider, call_only, None)
Self::new(chain_id, provider, call_only, None, false)
}

pub fn new(
chain_id: ChainName,
manager: ProviderManager<EthereumNetworkAdapter>,
call_only_adapters: Vec<EthereumNetworkAdapter>,
retest_percent: Option<f64>,
weighted: bool,
) -> Self {
#[cfg(debug_assertions)]
call_only_adapters.iter().for_each(|a| {
Expand All @@ -140,6 +147,7 @@ impl EthereumNetworkAdapters {
manager,
call_only_adapters,
retest_percent: retest_percent.unwrap_or(DEFAULT_ADAPTER_ERROR_RETEST_PERCENT),
weighted,
}
}

Expand Down Expand Up @@ -192,31 +200,38 @@ impl EthereumNetworkAdapters {
// handle adapter selection from a list, implements the availability checking with an abstracted
// source of the adapter list.
fn cheapest_from(
&self,
input: Vec<&EthereumNetworkAdapter>,
required_capabilities: &NodeCapabilities,
retest_percent: f64,
) -> Result<Arc<EthereumAdapter>, Error> {
let retest_rng: f64 = (&mut rand::rng()).random();

let cheapest = input.into_iter().choose_multiple(&mut rand::rng(), 3);
let cheapest = cheapest.iter();
if retest_rng < self.retest_percent {
if let Some(adapter) = input.iter().max_by_key(|a| a.current_error_count()) {
return Ok(adapter.adapter.clone());
}
}

// If request falls below the retest threshold, use this request to try and
// reset the failed adapter. If a request succeeds the adapter will be more
// likely to be selected afterwards.
if retest_rng < retest_percent {
cheapest.max_by_key(|adapter| adapter.current_error_count())
if self.weighted {
if input.is_empty() {
return Err(anyhow!(
"A matching Ethereum network with {:?} was not found.",
required_capabilities
));
}
let weights: Vec<_> = input.iter().map(|a| a.weight).collect();
if let Ok(dist) = WeightedIndex::new(&weights) {
let idx = dist.sample(&mut rand::rng());
return Ok(input[idx].adapter.clone());
}
} else {
// The assumption here is that most RPC endpoints will not have limits
// which makes the check for low/high available capacity less relevant.
// So we essentially assume if it had available capacity when calling
// `all_cheapest_with` then it prolly maintains that state and so we
// just select whichever adapter is working better according to
// the number of errors.
cheapest.min_by_key(|adapter| adapter.current_error_count())
let choices = input.into_iter().choose_multiple(&mut rand::rng(), 3);
if let Some(adapter) = choices.iter().min_by_key(|a| a.current_error_count()) {
return Ok(adapter.adapter.clone());
}
}
.map(|adapter| adapter.adapter.clone())
.ok_or(anyhow!(

Err(anyhow!(
"A matching Ethereum network with {:?} was not found.",
required_capabilities
))
Expand All @@ -226,13 +241,11 @@ impl EthereumNetworkAdapters {
&self,
required_capabilities: &NodeCapabilities,
) -> Result<Arc<EthereumAdapter>, Error> {
let cheapest = self.all_unverified_cheapest_with(required_capabilities);
let cheapest = self
.all_unverified_cheapest_with(required_capabilities)
.collect_vec();

Self::cheapest_from(
cheapest.choose_multiple(&mut rand::rng(), 3),
required_capabilities,
self.retest_percent,
)
self.cheapest_from(cheapest, required_capabilities)
}

/// This is the public entry point and should always use verified adapters
Expand All @@ -243,9 +256,9 @@ impl EthereumNetworkAdapters {
let cheapest = self
.all_cheapest_with(required_capabilities)
.await
.choose_multiple(&mut rand::rng(), 3);
.collect_vec();

Self::cheapest_from(cheapest, required_capabilities, self.retest_percent)
self.cheapest_from(cheapest, required_capabilities)
}

pub async fn cheapest(&self) -> Option<Arc<EthereumAdapter>> {
Expand Down Expand Up @@ -429,6 +442,7 @@ mod tests {
},
eth_adapter.clone(),
SubgraphLimit::Limit(3),
1.0,
)],
vec![EthereumNetworkAdapter::new(
metrics.cheap_clone(),
Expand All @@ -438,6 +452,7 @@ mod tests {
},
eth_call_adapter.clone(),
SubgraphLimit::Limit(3),
1.0,
)],
)
.await;
Expand Down Expand Up @@ -535,6 +550,7 @@ mod tests {
},
eth_call_adapter.clone(),
SubgraphLimit::Unlimited,
1.0,
)],
vec![EthereumNetworkAdapter::new(
metrics.cheap_clone(),
Expand All @@ -544,6 +560,7 @@ mod tests {
},
eth_adapter.clone(),
SubgraphLimit::Limit(2),
1.0,
)],
)
.await;
Expand Down Expand Up @@ -606,6 +623,7 @@ mod tests {
},
eth_call_adapter.clone(),
SubgraphLimit::Disabled,
1.0,
)],
vec![EthereumNetworkAdapter::new(
metrics.cheap_clone(),
Expand All @@ -615,6 +633,7 @@ mod tests {
},
eth_adapter.clone(),
SubgraphLimit::Limit(3),
1.0,
)],
)
.await;
Expand Down Expand Up @@ -661,6 +680,7 @@ mod tests {
},
eth_adapter.clone(),
SubgraphLimit::Limit(3),
1.0,
)],
vec![],
)
Expand Down Expand Up @@ -756,11 +776,16 @@ mod tests {
ProviderCheckStrategy::MarkAsValid,
);

let no_retest_adapters =
EthereumNetworkAdapters::new(chain_id.clone(), manager.clone(), vec![], Some(0f64));
let no_retest_adapters = EthereumNetworkAdapters::new(
chain_id.clone(),
manager.clone(),
vec![],
Some(0f64),
false,
);

let always_retest_adapters =
EthereumNetworkAdapters::new(chain_id, manager.clone(), vec![], Some(1f64));
EthereumNetworkAdapters::new(chain_id, manager.clone(), vec![], Some(1f64), false);

assert_eq!(
no_retest_adapters
Expand Down Expand Up @@ -844,8 +869,13 @@ mod tests {
ProviderCheckStrategy::MarkAsValid,
);

let always_retest_adapters =
EthereumNetworkAdapters::new(chain_id.clone(), manager.clone(), vec![], Some(1f64));
let always_retest_adapters = EthereumNetworkAdapters::new(
chain_id.clone(),
manager.clone(),
vec![],
Some(1f64),
false,
);

assert_eq!(
always_retest_adapters
Expand All @@ -869,7 +899,7 @@ mod tests {
);

let no_retest_adapters =
EthereumNetworkAdapters::new(chain_id.clone(), manager, vec![], Some(0f64));
EthereumNetworkAdapters::new(chain_id.clone(), manager, vec![], Some(0f64), false);
assert_eq!(
no_retest_adapters
.cheapest_with(&NodeCapabilities {
Expand Down Expand Up @@ -909,7 +939,8 @@ mod tests {
ProviderCheckStrategy::MarkAsValid,
);

let no_available_adapter = EthereumNetworkAdapters::new(chain_id, manager, vec![], None);
let no_available_adapter =
EthereumNetworkAdapters::new(chain_id, manager, vec![], None, false);
let res = no_available_adapter
.cheapest_with(&NodeCapabilities {
archive: true,
Expand Down
16 changes: 9 additions & 7 deletions node/resources/tests/full_config.toml
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
weighted_rpc_steering = true

[general]
query = "query_node_.*"

Expand Down Expand Up @@ -46,26 +48,26 @@ ingestor = "index_0"
[chains.mainnet]
shard = "primary"
provider = [
{ label = "mainnet-0", url = "http://rpc.mainnet.io", features = ["archive", "traces"] },
{ label = "mainnet-1", details = { type = "web3call", url = "http://rpc.mainnet.io", features = ["archive", "traces"] }},
{ label = "firehose", details = { type = "firehose", url = "http://localhost:9000", features = [] }},
{ label = "substreams", details = { type = "substreams", url = "http://localhost:9000", features = [] }},
{ label = "mainnet-0", url = "http://rpc.mainnet.io", features = ["archive", "traces"], weight = 0.1 },
{ label = "mainnet-1", details = { type = "web3call", url = "http://rpc.mainnet.io", features = ["archive", "traces"] }, weight = 0.2 },
{ label = "firehose", details = { type = "firehose", url = "http://localhost:9000", features = [] }, weight = 0.3 },
{ label = "substreams", details = { type = "substreams", url = "http://localhost:9000", features = [] }, weight = 0.4 },
]

[chains.ropsten]
shard = "primary"
provider = [
{ label = "ropsten-0", url = "http://rpc.ropsten.io", transport = "rpc", features = ["archive", "traces"] }
{ label = "ropsten-0", url = "http://rpc.ropsten.io", transport = "rpc", features = ["archive", "traces"], weight = 1.0 }
]

[chains.goerli]
shard = "primary"
provider = [
{ label = "goerli-0", url = "http://rpc.goerli.io", transport = "ipc", features = ["archive"] }
{ label = "goerli-0", url = "http://rpc.goerli.io", transport = "ipc", features = ["archive"], weight = 1.0 }
]

[chains.kovan]
shard = "primary"
provider = [
{ label = "kovan-0", url = "http://rpc.kovan.io", transport = "ws", features = [] }
{ label = "kovan-0", url = "http://rpc.kovan.io", transport = "ws", features = [], weight = 1.0 }
]
1 change: 1 addition & 0 deletions node/src/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -314,6 +314,7 @@ pub async fn create_ethereum_networks_for_chain(
.await,
),
web3.limit_for(&config.node),
provider.weight,
);

if call_only {
Expand Down
Loading
Loading