Skip to content

Commit 55beb5a

Browse files
committed
change rpc-dial logic
1 parent 5752995 commit 55beb5a

File tree

9 files changed

+35
-344
lines changed

9 files changed

+35
-344
lines changed

compute/src/node/core.rs

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -10,18 +10,18 @@ impl DriaComputeNode {
1010
/// This method is not expected to return until cancellation occurs for the given token.
1111
pub async fn run(&mut self, cancellation: CancellationToken) {
1212
/// Duration between refreshing for diagnostic prints.
13-
const DIAGNOSTIC_REFRESH_INTERVAL_SECS: Duration = Duration::from_secs(30);
13+
const DIAGNOSTIC_REFRESH_INTERVAL_SECS: Duration = Duration::from_secs(45);
1414
/// Duration between refreshing the available nodes.
15-
const AVAILABLE_NODES_REFRESH_INTERVAL_SECS: Duration = Duration::from_secs(2 * 60);
15+
const RPC_LIVENESS_REFRESH_INTERVAL_SECS: Duration = Duration::from_secs(2 * 60);
1616
/// Duration between each heartbeat sent to the RPC.
1717
const HEARTBEAT_INTERVAL_SECS: Duration = Duration::from_secs(60);
1818

1919
let mut diagnostic_refresh_interval =
2020
tokio::time::interval(DIAGNOSTIC_REFRESH_INTERVAL_SECS);
2121
diagnostic_refresh_interval.tick().await; // move each one tick
22-
let mut available_node_refresh_interval =
23-
tokio::time::interval(AVAILABLE_NODES_REFRESH_INTERVAL_SECS);
24-
available_node_refresh_interval.tick().await; // move each one tick
22+
let mut rpc_liveness_refresh_interval =
23+
tokio::time::interval(RPC_LIVENESS_REFRESH_INTERVAL_SECS);
24+
rpc_liveness_refresh_interval.tick().await; // move each one tick
2525

2626
let mut heartbeat_interval = tokio::time::interval(HEARTBEAT_INTERVAL_SECS);
2727

@@ -52,8 +52,8 @@ impl DriaComputeNode {
5252
// check peer count every now and then
5353
_ = diagnostic_refresh_interval.tick() => self.handle_diagnostic_refresh().await,
5454

55-
// available nodes are refreshed every now and then
56-
_ = available_node_refresh_interval.tick() => self.handle_available_nodes_refresh().await,
55+
// check RPC, and get a new one if we are disconnected
56+
_ = rpc_liveness_refresh_interval.tick() => self.handle_rpc_liveness_check().await,
5757

5858
_ = heartbeat_interval.tick() => {
5959
if let Err(e) = self.send_heartbeat().await {

compute/src/node/diagnostic.rs

Lines changed: 14 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -98,7 +98,7 @@ impl DriaComputeNode {
9898
///
9999
/// If there is an error while doing that,
100100
/// it will try to get a new RPC node and dial it.
101-
pub(crate) async fn handle_available_nodes_refresh(&mut self) {
101+
pub(crate) async fn handle_rpc_liveness_check(&mut self) {
102102
log::debug!("Checking RPC connections for diagnostics.");
103103

104104
// check if we are connected
@@ -108,19 +108,15 @@ impl DriaComputeNode {
108108
.await
109109
.unwrap_or(false);
110110

111-
// if we are not connected, try to dial it again
111+
// if we are not connected, get a new RPC and dial it again
112112
if !is_connected {
113-
log::info!("Dialling RPC at: {}", self.dria_rpc.addr);
114-
if let Err(err) = self
115-
.dial_with_timeout(self.dria_rpc.peer_id, self.dria_rpc.addr.clone())
116-
.await
117-
{
118-
// if we also cannot dial it, get a new RPC node
119-
log::warn!(
120-
"Could not dial to RPC at: {}: {err:?}\nWill get a new RPC node.",
121-
self.dria_rpc.addr,
122-
);
123-
if let Ok(new_rpc) = DriaRPC::new(self.dria_rpc.network).await {
113+
// if we also cannot dial it, get a new RPC node
114+
log::warn!(
115+
"Connection to RPC {} is lost, geting a new one!",
116+
self.dria_rpc.addr,
117+
);
118+
match DriaRPC::new_for_network(self.dria_rpc.network).await {
119+
Ok(new_rpc) => {
124120
self.dria_rpc = new_rpc;
125121

126122
// now dial this new RPC again
@@ -129,14 +125,13 @@ impl DriaComputeNode {
129125
.await
130126
{
131127
// worst-case we cant dial this one too, just leave it for the next diagnostic
132-
log::error!("Could not dial the new RPC: {err:?}\nWill try again in the next diagnostic refresh.");
128+
log::error!("Could not dial the new RPC: {err:?}");
133129
}
134-
} else {
135-
log::error!("Could not get a new RPC node!\nWill try again in the next diagnostic refresh.");
136130
}
137-
} else {
138-
log::info!("Successfully dialled to RPC at: {}", self.dria_rpc.addr);
139-
}
131+
Err(err) => {
132+
log::error!("Could not get a new RPC node: {err:?}");
133+
}
134+
};
140135
} else {
141136
log::debug!("Connection with {} is intact.", self.dria_rpc.peer_id);
142137
}

compute/src/node/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,7 @@ impl DriaComputeNode {
7070
let keypair = secret_to_keypair(&config.secret_key);
7171

7272
// get available rpc node
73-
let dria_nodes = DriaRPC::new(config.network_type)
73+
let dria_nodes = DriaRPC::new_for_network(config.network_type)
7474
.await
7575
.expect("could not get RPC to connect to");
7676

compute/src/utils/rpc.rs

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -12,11 +12,8 @@ pub struct DriaRPC {
1212
}
1313

1414
impl DriaRPC {
15-
/// Creates a new `AvailableNodes` struct for the given network type.
16-
///
17-
/// Will panic if anything goes wrong.
18-
pub async fn new(network: DriaNetworkType) -> Result<Self> {
19-
let addr = refresh_rpc_addr(&network).await?;
15+
/// Creates a new RPC target at the given type, along with a network type for refreshing the RPC address.
16+
pub fn new(addr: Multiaddr, network: DriaNetworkType) -> Result<Self> {
2017
let peer_id = addr
2118
.iter()
2219
.find_map(|p| match p {
@@ -31,12 +28,18 @@ impl DriaRPC {
3128
network,
3229
})
3330
}
31+
32+
/// Creates a new RPC target for the given network type.
33+
pub async fn new_for_network(network: DriaNetworkType) -> Result<Self> {
34+
let addr = get_rpc_for_network(&network).await?;
35+
Self::new(addr, network)
36+
}
3437
}
3538

3639
/// Calls the DKN API to get an RPC address for the given network type.
3740
///
3841
/// The peer id is expected to be within the multi-address.
39-
async fn refresh_rpc_addr(network: &DriaNetworkType) -> Result<Multiaddr> {
42+
async fn get_rpc_for_network(network: &DriaNetworkType) -> Result<Multiaddr> {
4043
#[derive(serde::Deserialize, Debug)]
4144
struct DriaNodesApiResponse {
4245
pub rpc: Multiaddr,
@@ -66,7 +69,7 @@ mod tests {
6669

6770
#[tokio::test]
6871
async fn test_dria_nodes() {
69-
let node = DriaRPC::new(DriaNetworkType::Community).await;
72+
let node = DriaRPC::new_for_network(DriaNetworkType::Community).await;
7073
assert!(node.is_ok());
7174
}
7275
}

monitor/Cargo.toml

Lines changed: 0 additions & 34 deletions
This file was deleted.

monitor/README.md

Lines changed: 0 additions & 19 deletions
This file was deleted.

monitor/src/main.rs

Lines changed: 0 additions & 78 deletions
This file was deleted.

0 commit comments

Comments
 (0)