Skip to content

Commit 1b04a9f

Browse files
committed
feat: add weighted rpc steering
1 parent e434138 commit 1b04a9f

File tree

5 files changed

+122
-35
lines changed

5 files changed

+122
-35
lines changed

chain/ethereum/src/network.rs

Lines changed: 64 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ use graph::components::network_provider::ProviderManager;
66
use graph::components::network_provider::ProviderName;
77
use graph::endpoint::EndpointMetrics;
88
use graph::firehose::{AvailableCapacity, SubgraphLimit};
9+
use graph::prelude::rand::distributions::WeightedIndex;
910
use graph::prelude::rand::seq::IteratorRandom;
1011
use graph::prelude::rand::{self, Rng};
1112
use itertools::Itertools;
@@ -30,6 +31,7 @@ pub struct EthereumNetworkAdapter {
3031
/// that limit. That's a somewhat imprecise but convenient way to
3132
/// determine the number of connections
3233
limit: SubgraphLimit,
34+
weight: usize,
3335
}
3436

3537
#[async_trait]
@@ -53,12 +55,14 @@ impl EthereumNetworkAdapter {
5355
capabilities: NodeCapabilities,
5456
adapter: Arc<EthereumAdapter>,
5557
limit: SubgraphLimit,
58+
weight: usize,
5659
) -> Self {
5760
Self {
5861
endpoint_metrics,
5962
capabilities,
6063
adapter,
6164
limit,
65+
weight,
6266
}
6367
}
6468

@@ -86,6 +90,7 @@ pub struct EthereumNetworkAdapters {
8690
call_only_adapters: Vec<EthereumNetworkAdapter>,
8791
// Percentage of request that should be used to retest errored adapters.
8892
retest_percent: f64,
93+
weighted: bool,
8994
}
9095

9196
impl EthereumNetworkAdapters {
@@ -95,6 +100,7 @@ impl EthereumNetworkAdapters {
95100
manager: ProviderManager::default(),
96101
call_only_adapters: vec![],
97102
retest_percent: DEFAULT_ADAPTER_ERROR_RETEST_PERCENT,
103+
weighted: false,
98104
}
99105
}
100106

@@ -121,14 +127,15 @@ impl EthereumNetworkAdapters {
121127
ProviderCheckStrategy::MarkAsValid,
122128
);
123129

124-
Self::new(chain_id, provider, call_only, None)
130+
Self::new(chain_id, provider, call_only, None, false)
125131
}
126132

127133
pub fn new(
128134
chain_id: ChainName,
129135
manager: ProviderManager<EthereumNetworkAdapter>,
130136
call_only_adapters: Vec<EthereumNetworkAdapter>,
131137
retest_percent: Option<f64>,
138+
weighted: bool,
132139
) -> Self {
133140
#[cfg(debug_assertions)]
134141
call_only_adapters.iter().for_each(|a| {
@@ -140,6 +147,7 @@ impl EthereumNetworkAdapters {
140147
manager,
141148
call_only_adapters,
142149
retest_percent: retest_percent.unwrap_or(DEFAULT_ADAPTER_ERROR_RETEST_PERCENT),
150+
weighted,
143151
}
144152
}
145153

@@ -192,31 +200,38 @@ impl EthereumNetworkAdapters {
192200
// handle adapter selection from a list, implements the availability checking with an abstracted
193201
// source of the adapter list.
194202
fn cheapest_from(
203+
&self,
195204
input: Vec<&EthereumNetworkAdapter>,
196205
required_capabilities: &NodeCapabilities,
197-
retest_percent: f64,
198206
) -> Result<Arc<EthereumAdapter>, Error> {
199207
let retest_rng: f64 = (&mut rand::rng()).random();
200208

201-
let cheapest = input.into_iter().choose_multiple(&mut rand::rng(), 3);
202-
let cheapest = cheapest.iter();
209+
if retest_rng < self.retest_percent {
210+
if let Some(adapter) = input.iter().max_by_key(|a| a.current_error_count()) {
211+
return Ok(adapter.adapter.clone());
212+
}
213+
}
203214

204-
// If request falls below the retest threshold, use this request to try and
205-
// reset the failed adapter. If a request succeeds the adapter will be more
206-
// likely to be selected afterwards.
207-
if retest_rng < retest_percent {
208-
cheapest.max_by_key(|adapter| adapter.current_error_count())
215+
if self.weighted {
216+
if input.is_empty() {
217+
return Err(anyhow!(
218+
"A matching Ethereum network with {:?} was not found.",
219+
required_capabilities
220+
));
221+
}
222+
let weights: Vec<_> = input.iter().map(|a| a.weight).collect();
223+
if let Ok(dist) = WeightedIndex::new(&weights) {
224+
let idx = dist.sample(&mut rand::rng());
225+
return Ok(input[idx].adapter.clone());
226+
}
209227
} else {
210-
// The assumption here is that most RPC endpoints will not have limits
211-
// which makes the check for low/high available capacity less relevant.
212-
// So we essentially assume if it had available capacity when calling
213-
// `all_cheapest_with` then it prolly maintains that state and so we
214-
// just select whichever adapter is working better according to
215-
// the number of errors.
216-
cheapest.min_by_key(|adapter| adapter.current_error_count())
228+
let choices = input.into_iter().choose_multiple(&mut rand::rng(), 3);
229+
if let Some(adapter) = choices.iter().min_by_key(|a| a.current_error_count()) {
230+
return Ok(adapter.adapter.clone());
231+
}
217232
}
218-
.map(|adapter| adapter.adapter.clone())
219-
.ok_or(anyhow!(
233+
234+
Err(anyhow!(
220235
"A matching Ethereum network with {:?} was not found.",
221236
required_capabilities
222237
))
@@ -226,13 +241,11 @@ impl EthereumNetworkAdapters {
226241
&self,
227242
required_capabilities: &NodeCapabilities,
228243
) -> Result<Arc<EthereumAdapter>, Error> {
229-
let cheapest = self.all_unverified_cheapest_with(required_capabilities);
244+
let cheapest = self
245+
.all_unverified_cheapest_with(required_capabilities)
246+
.collect_vec();
230247

231-
Self::cheapest_from(
232-
cheapest.choose_multiple(&mut rand::rng(), 3),
233-
required_capabilities,
234-
self.retest_percent,
235-
)
248+
self.cheapest_from(cheapest, required_capabilities)
236249
}
237250

238251
/// This is the public entry point and should always use verified adapters
@@ -243,9 +256,9 @@ impl EthereumNetworkAdapters {
243256
let cheapest = self
244257
.all_cheapest_with(required_capabilities)
245258
.await
246-
.choose_multiple(&mut rand::rng(), 3);
259+
.collect_vec();
247260

248-
Self::cheapest_from(cheapest, required_capabilities, self.retest_percent)
261+
self.cheapest_from(cheapest, required_capabilities)
249262
}
250263

251264
pub async fn cheapest(&self) -> Option<Arc<EthereumAdapter>> {
@@ -429,6 +442,7 @@ mod tests {
429442
},
430443
eth_adapter.clone(),
431444
SubgraphLimit::Limit(3),
445+
1,
432446
)],
433447
vec![EthereumNetworkAdapter::new(
434448
metrics.cheap_clone(),
@@ -438,6 +452,7 @@ mod tests {
438452
},
439453
eth_call_adapter.clone(),
440454
SubgraphLimit::Limit(3),
455+
1,
441456
)],
442457
)
443458
.await;
@@ -535,6 +550,7 @@ mod tests {
535550
},
536551
eth_call_adapter.clone(),
537552
SubgraphLimit::Unlimited,
553+
1,
538554
)],
539555
vec![EthereumNetworkAdapter::new(
540556
metrics.cheap_clone(),
@@ -544,6 +560,7 @@ mod tests {
544560
},
545561
eth_adapter.clone(),
546562
SubgraphLimit::Limit(2),
563+
1,
547564
)],
548565
)
549566
.await;
@@ -606,6 +623,7 @@ mod tests {
606623
},
607624
eth_call_adapter.clone(),
608625
SubgraphLimit::Disabled,
626+
1,
609627
)],
610628
vec![EthereumNetworkAdapter::new(
611629
metrics.cheap_clone(),
@@ -615,6 +633,7 @@ mod tests {
615633
},
616634
eth_adapter.clone(),
617635
SubgraphLimit::Limit(3),
636+
1,
618637
)],
619638
)
620639
.await;
@@ -661,6 +680,7 @@ mod tests {
661680
},
662681
eth_adapter.clone(),
663682
SubgraphLimit::Limit(3),
683+
1,
664684
)],
665685
vec![],
666686
)
@@ -756,11 +776,16 @@ mod tests {
756776
ProviderCheckStrategy::MarkAsValid,
757777
);
758778

759-
let no_retest_adapters =
760-
EthereumNetworkAdapters::new(chain_id.clone(), manager.clone(), vec![], Some(0f64));
779+
let no_retest_adapters = EthereumNetworkAdapters::new(
780+
chain_id.clone(),
781+
manager.clone(),
782+
vec![],
783+
Some(0f64),
784+
false,
785+
);
761786

762787
let always_retest_adapters =
763-
EthereumNetworkAdapters::new(chain_id, manager.clone(), vec![], Some(1f64));
788+
EthereumNetworkAdapters::new(chain_id, manager.clone(), vec![], Some(1f64), false);
764789

765790
assert_eq!(
766791
no_retest_adapters
@@ -844,8 +869,13 @@ mod tests {
844869
ProviderCheckStrategy::MarkAsValid,
845870
);
846871

847-
let always_retest_adapters =
848-
EthereumNetworkAdapters::new(chain_id.clone(), manager.clone(), vec![], Some(1f64));
872+
let always_retest_adapters = EthereumNetworkAdapters::new(
873+
chain_id.clone(),
874+
manager.clone(),
875+
vec![],
876+
Some(1f64),
877+
false,
878+
);
849879

850880
assert_eq!(
851881
always_retest_adapters
@@ -869,7 +899,7 @@ mod tests {
869899
);
870900

871901
let no_retest_adapters =
872-
EthereumNetworkAdapters::new(chain_id.clone(), manager, vec![], Some(0f64));
902+
EthereumNetworkAdapters::new(chain_id.clone(), manager, vec![], Some(0f64), false);
873903
assert_eq!(
874904
no_retest_adapters
875905
.cheapest_with(&NodeCapabilities {
@@ -909,7 +939,8 @@ mod tests {
909939
ProviderCheckStrategy::MarkAsValid,
910940
);
911941

912-
let no_available_adapter = EthereumNetworkAdapters::new(chain_id, manager, vec![], None);
942+
let no_available_adapter =
943+
EthereumNetworkAdapters::new(chain_id, manager, vec![], None, false);
913944
let res = no_available_adapter
914945
.cheapest_with(&NodeCapabilities {
915946
archive: true,

node/src/chain.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -314,6 +314,7 @@ pub async fn create_ethereum_networks_for_chain(
314314
.await,
315315
),
316316
web3.limit_for(&config.node),
317+
provider.weight,
317318
);
318319

319320
if call_only {

0 commit comments

Comments
 (0)