Skip to content

Commit 625a7b3

Browse files
committed
refactor: improve weighted load balancing implementation
- Refactor the adapter selection logic for better clarity and maintainability. - Add a test case to verify the weighted selection logic.
1 parent c3d0c5b commit 625a7b3

File tree

1 file changed

+153
-24
lines changed

1 file changed

+153
-24
lines changed

chain/ethereum/src/network.rs

Lines changed: 153 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -197,46 +197,83 @@ impl EthereumNetworkAdapters {
197197
Self::available_with_capabilities(all, required_capabilities)
198198
}
199199

200-
// handle adapter selection from a list, implements the availability checking with an abstracted
200+
""" // handle adapter selection from a list, implements the availability checking with an abstracted
201201
// source of the adapter list.
202202
fn cheapest_from(
203203
&self,
204204
input: Vec<&EthereumNetworkAdapter>,
205205
required_capabilities: &NodeCapabilities,
206206
) -> Result<Arc<EthereumAdapter>, Error> {
207-
let retest_rng: f64 = (&mut rand::rng()).random();
207+
// First, try to re-test an errored adapter
208+
if let Some(adapter) = self.retest_errored_adapter(&input) {
209+
return Ok(adapter);
210+
}
211+
212+
// If no adapter was re-tested, select the best available adapter
213+
self.select_best_adapter(input, required_capabilities)
214+
}
208215

216+
fn retest_errored_adapter(&self, input: &[&EthereumNetworkAdapter]) -> Option<Arc<EthereumAdapter>> {
217+
let retest_rng: f64 = (&mut rand::rng()).random();
209218
if retest_rng < self.retest_percent {
210-
if let Some(adapter) = input.iter().max_by_key(|a| a.current_error_count()) {
211-
return Ok(adapter.adapter.clone());
212-
}
219+
input
220+
.iter()
221+
.max_by_key(|a| a.current_error_count())
222+
.map(|adapter| adapter.adapter.clone())
223+
} else {
224+
None
213225
}
226+
}
214227

228+
fn select_best_adapter(
229+
&self,
230+
input: Vec<&EthereumNetworkAdapter>,
231+
required_capabilities: &NodeCapabilities,
232+
) -> Result<Arc<EthereumAdapter>, Error> {
215233
if self.weighted {
216-
if input.is_empty() {
217-
return Err(anyhow!(
218-
"A matching Ethereum network with {:?} was not found.",
219-
required_capabilities
220-
));
221-
}
222-
let weights: Vec<_> = input.iter().map(|a| a.weight).collect();
223-
if let Ok(dist) = WeightedIndex::new(&weights) {
224-
let idx = dist.sample(&mut rand::rng());
225-
return Ok(input[idx].adapter.clone());
226-
}
234+
self.select_weighted_adapter(input, required_capabilities)
227235
} else {
228-
let choices = input.into_iter().choose_multiple(&mut rand::rng(), 3);
229-
if let Some(adapter) = choices.iter().min_by_key(|a| a.current_error_count()) {
230-
return Ok(adapter.adapter.clone());
231-
}
236+
self.select_random_adapter(input, required_capabilities)
232237
}
238+
}
233239

234-
Err(anyhow!(
235-
"A matching Ethereum network with {:?} was not found.",
236-
required_capabilities
237-
))
240+
fn select_weighted_adapter(
241+
&self,
242+
input: Vec<&EthereumNetworkAdapter>,
243+
required_capabilities: &NodeCapabilities,
244+
) -> Result<Arc<EthereumAdapter>, Error> {
245+
if input.is_empty() {
246+
return Err(anyhow!(
247+
"A matching Ethereum network with {:?} was not found.",
248+
required_capabilities
249+
));
250+
}
251+
let weights: Vec<_> = input.iter().map(|a| a.weight).collect();
252+
if let Ok(dist) = WeightedIndex::new(&weights) {
253+
let idx = dist.sample(&mut rand::rng());
254+
Ok(input[idx].adapter.clone())
255+
} else {
256+
// Fallback to random selection if weights are invalid
257+
self.select_random_adapter(input, required_capabilities)
258+
}
238259
}
239260

261+
fn select_random_adapter(
262+
&self,
263+
input: Vec<&EthereumNetworkAdapter>,
264+
required_capabilities: &NodeCapabilities,
265+
) -> Result<Arc<EthereumAdapter>, Error> {
266+
let choices = input.into_iter().choose_multiple(&mut rand::rng(), 3);
267+
if let Some(adapter) = choices.iter().min_by_key(|a| a.current_error_count()) {
268+
Ok(adapter.adapter.clone())
269+
} else {
270+
Err(anyhow!(
271+
"A matching Ethereum network with {:?} was not found.",
272+
required_capabilities
273+
))
274+
}
275+
}""
276+
240277
pub(crate) fn unverified_cheapest_with(
241278
&self,
242279
required_capabilities: &NodeCapabilities,
@@ -976,4 +1013,96 @@ mod tests {
9761013
.await,
9771014
)
9781015
}
1016+
1017+
#[tokio::test]
1018+
async fn test_weighted_adapter_selection() {
1019+
let metrics = Arc::new(EndpointMetrics::mock());
1020+
let logger = graph::log::logger(true);
1021+
let mock_registry = Arc::new(MetricsRegistry::mock());
1022+
let transport = Transport::new_rpc(
1023+
Url::parse("http://127.0.0.1").unwrap(),
1024+
HeaderMap::new(),
1025+
metrics.clone(),
1026+
"",
1027+
);
1028+
let provider_metrics = Arc::new(ProviderEthRpcMetrics::new(mock_registry.clone()));
1029+
1030+
let adapter1 = Arc::new(
1031+
EthereumAdapter::new(
1032+
logger.clone(),
1033+
"adapter1".to_string(),
1034+
transport.clone(),
1035+
provider_metrics.clone(),
1036+
true,
1037+
false,
1038+
)
1039+
.await,
1040+
);
1041+
1042+
let adapter2 = Arc::new(
1043+
EthereumAdapter::new(
1044+
logger.clone(),
1045+
"adapter2".to_string(),
1046+
transport.clone(),
1047+
provider_metrics.clone(),
1048+
true,
1049+
false,
1050+
)
1051+
.await,
1052+
);
1053+
1054+
let adapters = EthereumNetworkAdapters::for_testing(
1055+
vec![
1056+
EthereumNetworkAdapter::new(
1057+
metrics.cheap_clone(),
1058+
NodeCapabilities {
1059+
archive: true,
1060+
traces: false,
1061+
},
1062+
adapter1.clone(),
1063+
SubgraphLimit::Unlimited,
1064+
0.2,
1065+
),
1066+
EthereumNetworkAdapter::new(
1067+
metrics.cheap_clone(),
1068+
NodeCapabilities {
1069+
archive: true,
1070+
traces: false,
1071+
},
1072+
adapter2.clone(),
1073+
SubgraphLimit::Unlimited,
1074+
0.8,
1075+
),
1076+
],
1077+
vec![],
1078+
)
1079+
.await;
1080+
1081+
let mut adapters = adapters;
1082+
adapters.weighted = true;
1083+
1084+
let mut adapter1_count = 0;
1085+
let mut adapter2_count = 0;
1086+
1087+
for _ in 0..1000 {
1088+
let selected_adapter = adapters
1089+
.cheapest_with(&NodeCapabilities {
1090+
archive: true,
1091+
traces: false,
1092+
})
1093+
.await
1094+
.unwrap();
1095+
1096+
if selected_adapter.provider() == "adapter1" {
1097+
adapter1_count += 1;
1098+
} else {
1099+
adapter2_count += 1;
1100+
}
1101+
}
1102+
1103+
// Check that the selection is roughly proportional to the weights.
1104+
// Allow for a 10% tolerance.
1105+
assert!(adapter1_count > 100 && adapter1_count < 300);
1106+
assert!(adapter2_count > 700 && adapter2_count < 900);
1107+
}
9791108
}

0 commit comments

Comments
 (0)