Skip to content

Commit 66e579c

Browse files
committed
better rpc selection
1 parent de734ed commit 66e579c

File tree

4 files changed

+35
-11
lines changed

4 files changed

+35
-11
lines changed

Cargo.lock

Lines changed: 4 additions & 4 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ default-members = ["compute"]
77

88
[workspace.package]
99
edition = "2021"
10-
version = "0.6.2"
10+
version = "0.6.3"
1111
license = "Apache-2.0"
1212
readme = "README.md"
1313

compute/src/node/rpc.rs

Lines changed: 29 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
use dkn_p2p::libp2p::{multiaddr::Protocol, Multiaddr, PeerId};
22
use dkn_utils::{DriaNetwork, SemanticVersion};
33
use eyre::{Context, OptionExt, Result};
4+
use rand::seq::SliceRandom;
45
use std::fmt::Debug;
56

67
/// The connected RPC node, as per the Star network topology.
@@ -43,23 +44,46 @@ async fn get_rpc_for_network(
4344
network: &DriaNetwork,
4445
version: &SemanticVersion,
4546
) -> Result<Multiaddr> {
47+
const MIN_MARGIN: usize = 150;
48+
4649
let response = reqwest::get(network.discovery_url(version)).await?;
4750
let rpcs_and_peer_counts = response
4851
.json::<Vec<(Multiaddr, usize)>>()
4952
.await
5053
.wrap_err("could not parse API response")?;
5154

52-
// returns the RPC address with the least peer count (for load balancing)
53-
rpcs_and_peer_counts
55+
// ensure that the response contains at least one RPC
56+
if rpcs_and_peer_counts.is_empty() {
57+
eyre::bail!("no RPCs were returned by discovery API");
58+
}
59+
60+
// get the minimum count of peers from all RPCs
61+
let min_peer_count = rpcs_and_peer_counts
62+
.iter()
63+
.map(|(_, peer_count)| *peer_count)
64+
.min()
65+
.unwrap(); // safe to unwrap because we checked for empty earlier
66+
67+
// choose the RPCs that have peers in range `[min_peer_count, min_peer_count + MIN_MARGIN]`
68+
let rpcs_and_peer_counts: Vec<(Multiaddr, usize)> = rpcs_and_peer_counts
5469
.into_iter()
55-
.min_by_key(|(_, peer_count)| *peer_count)
56-
.ok_or_eyre("no RPCs were returned by discovery API")
70+
.filter(|(_, peer_count)| {
71+
(min_peer_count..=min_peer_count + MIN_MARGIN).contains(peer_count)
72+
})
73+
.collect();
74+
75+
// pick a random RPC from the filtered list
76+
let chosen_rpc = rpcs_and_peer_counts
77+
.choose(&mut rand::thread_rng())
78+
.cloned()
5779
.map(|(addr, _)| addr)
80+
.unwrap(); // safe to unwrap because we checked for empty earlier
81+
82+
Ok(chosen_rpc)
5883
}
5984

6085
#[cfg(test)]
6186
mod tests {
62-
6387
use super::*;
6488

6589
#[tokio::test]

executor/src/manager.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -115,7 +115,7 @@ impl DriaExecutorsManager {
115115
model_perf.extend(
116116
models
117117
.iter()
118-
.map(|m| (m.clone(), SpecModelPerformance::ExecutionFailed)),
118+
.map(|m| (*m, SpecModelPerformance::ExecutionFailed)),
119119
);
120120
// clear models
121121
models.clear();

0 commit comments

Comments
 (0)