Skip to content

Commit 00a169a

Browse files
committed
better timestamp logic, added heartbeat reqres
1 parent 111eab3 commit 00a169a

File tree

19 files changed

+370
-358
lines changed

19 files changed

+370
-358
lines changed

Cargo.lock

Lines changed: 12 additions & 3 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

compute/Cargo.toml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ dotenvy.workspace = true
2727
base64 = "0.22.0"
2828
hex = "0.4.3"
2929
hex-literal = "0.4.1"
30-
uuid = { version = "1.8.0", features = ["v4"] }
30+
uuid = { version = "1.8.0", features = ["v4", "serde"] }
3131
rand.workspace = true
3232

3333
# logging & errors
@@ -59,6 +59,7 @@ public-ip-address = "0.3.2"
5959
dkn-p2p = { path = "../p2p" }
6060
dkn-utils = { path = "../utils" }
6161
dkn-workflows = { path = "../workflows" }
62+
chrono = { version = "0.4.40", features = ["serde"] }
6263

6364

6465
# vendor OpenSSL so that it's easier to build cross-platform packages

compute/src/config.rs

Lines changed: 0 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -162,19 +162,3 @@ impl DriaComputeNodeConfig {
162162
Ok(())
163163
}
164164
}
165-
166-
#[cfg(test)]
167-
impl Default for DriaComputeNodeConfig {
168-
/// Creates a new config with dummy values.
169-
///
170-
/// Should only be used for testing purposes.
171-
fn default() -> Self {
172-
env::set_var(
173-
"DKN_WALLET_SECRET_KEY",
174-
"6e6f64656e6f64656e6f64656e6f64656e6f64656e6f64656e6f64656e6f6465",
175-
);
176-
env::set_var("DKN_MODELS", "gpt-3.5-turbo");
177-
178-
Self::new(Default::default())
179-
}
180-
}

compute/src/main.rs

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -131,11 +131,7 @@ async fn main() -> Result<()> {
131131
log::info!("Spawning compute node thread.");
132132
let node_token = cancellation.clone();
133133
task_tracker.spawn(async move {
134-
if let Err(err) = node.run(node_token).await {
135-
log::error!("Error within main node loop: {}", err);
136-
log::error!("Shutting down node.");
137-
node.shutdown().await.expect("could not shutdown node");
138-
};
134+
node.run(node_token).await;
139135
log::info!("Closing node.")
140136
});
141137

compute/src/node/core.rs

Lines changed: 42 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
use eyre::{eyre, Result};
1+
use eyre::Result;
22
use std::time::Duration;
33
use tokio_util::sync::CancellationToken;
44

@@ -7,54 +7,48 @@ use crate::{utils::DriaMessage, DriaComputeNode};
77
impl DriaComputeNode {
88
/// Runs the main loop of the compute node.
99
/// This method is not expected to return until cancellation occurs for the given token.
10-
pub async fn run(&mut self, cancellation: CancellationToken) -> Result<()> {
10+
pub async fn run(&mut self, cancellation: CancellationToken) {
1111
/// Number of seconds between refreshing for diagnostic prints.
1212
const DIAGNOSTIC_REFRESH_INTERVAL_SECS: u64 = 30;
1313
/// Number of seconds between refreshing the available nodes.
1414
const AVAILABLE_NODES_REFRESH_INTERVAL_SECS: u64 = 10 * 60;
15+
/// Number of seconds between each heartbeat sent to the RPC.
16+
const HEARTBEAT_INTERVAL_SECS: u64 = 60;
1517

1618
// prepare durations for sleeps
1719
let mut diagnostic_refresh_interval =
1820
tokio::time::interval(Duration::from_secs(DIAGNOSTIC_REFRESH_INTERVAL_SECS));
19-
diagnostic_refresh_interval.tick().await; // move one tick
2021
let mut available_node_refresh_interval =
2122
tokio::time::interval(Duration::from_secs(AVAILABLE_NODES_REFRESH_INTERVAL_SECS));
22-
available_node_refresh_interval.tick().await; // move one tick
23+
let mut heartbeat_interval =
24+
tokio::time::interval(Duration::from_secs(HEARTBEAT_INTERVAL_SECS));
25+
26+
// move each one tick
27+
available_node_refresh_interval.tick().await;
28+
diagnostic_refresh_interval.tick().await;
29+
heartbeat_interval.tick().await;
2330

2431
loop {
2532
tokio::select! {
2633
// a task is completed by the worker & should be responded to the requesting peer
2734
task_response_msg_opt = self.task_output_rx.recv() => {
28-
let task_response_msg = task_response_msg_opt.ok_or(
29-
eyre!("Publish channel closed unexpectedly, we still have {} batch and {} single tasks.", self.pending_tasks_batch.len(), self.pending_tasks_single.len())
30-
)?; {
31-
if let Err(e) = self.handle_task_response(task_response_msg).await {
32-
log::error!("Error responding to task: {:?}", e);
33-
}
35+
if let Some(task_response_msg) = task_response_msg_opt {
36+
if let Err(e) = self.send_task_output(task_response_msg).await {
37+
log::error!("Error responding to task: {:?}", e);
38+
}
39+
} else {
40+
log::error!("task_output_rx channel closed unexpectedly, we still have {} batch and {} single tasks.", self.pending_tasks_batch.len(), self.pending_tasks_single.len());
41+
break;
3442
}
3543
},
3644

37-
// a GossipSub message is received from the channel
38-
// // this is expected to be sent by the p2p client
39-
// gossipsub_msg_opt = self.gossip_message_rx.recv() => {
40-
// let (propagation_peer_id, message_id, message) = gossipsub_msg_opt.ok_or(eyre!("message_rx channel closed unexpectedly"))?;
41-
42-
// // handle the message, returning a message acceptance for the received one
43-
// let acceptance = self.handle_message((propagation_peer_id, &message_id, message)).await;
44-
45-
// // validate the message based on the acceptance
46-
// // cant do anything but log if this gives an error as well
47-
// if let Err(e) = self.p2p.validate_message(&message_id, &propagation_peer_id, acceptance).await {
48-
// log::error!("Error validating message {}: {:?}", message_id, e);
49-
// }
50-
51-
// },
52-
53-
// a Request is received from the channel, sent by p2p client
54-
request_msg_opt = self.request_rx.recv() => {
55-
let request = request_msg_opt.ok_or(eyre!("request_rx channel closed unexpectedly"))?;
56-
if let Err(e) = self.handle_request(request).await {
57-
log::error!("Error handling request: {:?}", e);
45+
// a Request or Response is received by the p2p client
46+
reqres_msg_opt = self.reqres_rx.recv() => {
47+
if let Some((peer_id, message)) = reqres_msg_opt {
48+
self.handle_reqres(peer_id, message).await;
49+
} else {
50+
log::error!("reqres_rx channel closed unexpectedly.");
51+
break;
5852
}
5953
},
6054

@@ -64,19 +58,28 @@ impl DriaComputeNode {
6458
// available nodes are refreshed every now and then
6559
_ = available_node_refresh_interval.tick() => self.handle_available_nodes_refresh().await,
6660

61+
_ = heartbeat_interval.tick() => {
62+
if let Err(e) = self.send_heartbeat().await {
63+
log::error!("Error making heartbeat: {:?}", e);
64+
}
65+
},
66+
6767
// check if the cancellation token is cancelled
6868
// this is expected to be cancelled by the main thread with signal handling
69-
_ = cancellation.cancelled() => break,
69+
_ = cancellation.cancelled() => {
70+
log::info!("Cancellation received, shutting down the node.");
71+
break;
72+
},
7073
}
7174
}
7275

7376
// print one final diagnostic as a summary
7477
self.handle_diagnostic_refresh().await;
7578

7679
// shutdown channels
77-
self.shutdown().await?;
78-
79-
Ok(())
80+
if let Err(e) = self.shutdown().await {
81+
log::error!("Could not shutdown the node gracefully: {:?}", e);
82+
}
8083
}
8184

8285
/// Shorthand method to create a signed message with the given data and topic.
@@ -95,9 +98,12 @@ impl DriaComputeNode {
9598
log::debug!("Sending shutdown command to p2p client.");
9699
self.p2p.shutdown().await?;
97100

98-
log::debug!("Closing task response channel.");
101+
log::debug!("Closing task output channel.");
99102
self.task_output_rx.close();
100103

104+
log::debug!("Closing reqres channel.");
105+
self.reqres_rx.close();
106+
101107
Ok(())
102108
}
103109
}

compute/src/node/diagnostic.rs

Lines changed: 8 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,11 @@
11
use colored::Colorize;
22
use dkn_p2p::libp2p::multiaddr::Protocol;
33
use std::time::Duration;
4-
use tokio::time::Instant;
54

6-
use crate::{refresh_dria_nodes, utils::get_steps, DriaComputeNode, DRIA_COMPUTE_NODE_VERSION};
5+
use crate::{refresh_dria_nodes, utils::get_points, DriaComputeNode, DRIA_COMPUTE_NODE_VERSION};
76

8-
/// Number of seconds such that if the last ping is older than this, the node is considered unreachable.
9-
const PING_LIVENESS_SECS: u64 = 150;
7+
/// Number of seconds such that if the last heartbeat ACK is older than this, the node is considered unreachable.
8+
const HEARTBEAT_LIVENESS_SECS: Duration = Duration::from_secs(150);
109

1110
impl DriaComputeNode {
1211
/// Returns the task count within the channels, `single` and `batch`.
@@ -23,7 +22,7 @@ impl DriaComputeNode {
2322
let mut diagnostics = vec![format!("Diagnostics (v{}):", DRIA_COMPUTE_NODE_VERSION)];
2423

2524
// print steps
26-
if let Ok(steps) = get_steps(&self.config.address).await {
25+
if let Ok(steps) = get_points(&self.config.address).await {
2726
let earned = steps.score - self.initial_steps;
2827
diagnostics.push(format!(
2928
"$DRIA Points: {} total, {} earned in this run, within top {}%",
@@ -55,10 +54,10 @@ impl DriaComputeNode {
5554
.join(", ")
5655
));
5756

58-
// add network status as well
5957
// if we have not received a heartbeat ACK for a while, we are considered offline
60-
let is_offline = Instant::now().duration_since(self.last_heartbeat_at)
61-
> Duration::from_secs(PING_LIVENESS_SECS);
58+
let is_offline = chrono::Utc::now() > self.last_heartbeat_at + HEARTBEAT_LIVENESS_SECS;
59+
60+
// if we have not yet received a heartbeat response, we are still connecting
6261
if self.num_heartbeats == 0 {
6362
// if we didn't have any heartbeat responses, we might still be connecting
6463
diagnostics.push(format!("Node Status: {}", "CONNECTING".yellow()));
@@ -73,18 +72,13 @@ impl DriaComputeNode {
7372
));
7473
}
7574

76-
// add pings per second
77-
let elapsed = Instant::now().duration_since(self.started_at).as_secs_f64();
78-
let pings_per_second = self.num_heartbeats as f64 / elapsed; // elapsed is always > 0
79-
diagnostics.push(format!("Pings/sec: {:.3}", pings_per_second));
80-
8175
log::info!("{}", diagnostics.join("\n "));
8276

8377
// if offline, print this error message as well
8478
if is_offline {
8579
log::error!(
8680
"Node has not received any pings for at least {} seconds & it may be unreachable!\nPlease restart your node!",
87-
PING_LIVENESS_SECS
81+
HEARTBEAT_LIVENESS_SECS.as_secs()
8882
);
8983
}
9084

compute/src/node/mod.rs

Lines changed: 16 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,14 @@
11
use dkn_p2p::{
2-
libp2p::{request_response::ResponseChannel, PeerId},
2+
libp2p::{request_response, PeerId},
33
DriaNodes, DriaP2PClient, DriaP2PCommander, DriaP2PProtocol,
44
};
55
use eyre::Result;
66
use std::collections::HashMap;
7-
use tokio::{sync::mpsc, time::Instant};
7+
use tokio::sync::mpsc;
88

99
use crate::{
1010
config::*,
11-
utils::{crypto::secret_to_keypair, get_steps, refresh_dria_nodes, SpecCollector},
11+
utils::{crypto::secret_to_keypair, get_points, refresh_dria_nodes, SpecCollector},
1212
workers::task::{TaskWorker, TaskWorkerInput, TaskWorkerMetadata, TaskWorkerOutput},
1313
};
1414

@@ -27,13 +27,14 @@ pub struct DriaComputeNode {
2727
pub p2p: DriaP2PCommander,
2828
/// The last time the node had an acknowledged heartbeat.
2929
/// If this is too long ago, we can say that the node is not reachable by RPC.
30-
pub(crate) last_heartbeat_at: Instant,
30+
pub(crate) last_heartbeat_at: chrono::DateTime<chrono::Utc>,
3131
/// Number of heartbeat responses received.
3232
pub(crate) num_heartbeats: u64,
33-
/// The time the node was started.
34-
pub(crate) started_at: Instant,
35-
/// Request-response request receiver.
36-
request_rx: mpsc::Receiver<(PeerId, Vec<u8>, ResponseChannel<Vec<u8>>)>,
33+
/// A mapping of heartbeat UUIDs to their deadlines.
34+
/// This is used to track the heartbeats, and their acknowledgements.
35+
pub(crate) heartbeats: HashMap<uuid::Uuid, chrono::DateTime<chrono::Utc>>,
36+
/// Request-response message receiver, can have both a request or a response.
37+
reqres_rx: mpsc::Receiver<(PeerId, request_response::Message<Vec<u8>, Vec<u8>>)>,
3738
/// Task response receiver, will respond to the request-response channel with the given result.
3839
task_output_rx: mpsc::Receiver<TaskWorkerOutput>,
3940
/// Task worker transmitter to send batchable tasks.
@@ -109,7 +110,7 @@ impl DriaComputeNode {
109110

110111
let model_names = config.workflows.get_model_names();
111112

112-
let initial_steps = get_steps(&config.address)
113+
let initial_steps = get_points(&config.address)
113114
.await
114115
.map(|s| s.score)
115116
.unwrap_or_default();
@@ -121,7 +122,7 @@ impl DriaComputeNode {
121122
dria_nodes,
122123
// receivers
123124
task_output_rx: publish_rx,
124-
request_rx,
125+
reqres_rx: request_rx,
125126
// transmitters
126127
task_request_batch_tx: task_batch_tx,
127128
task_request_single_tx: task_single_tx,
@@ -130,12 +131,13 @@ impl DriaComputeNode {
130131
pending_tasks_batch: HashMap::new(),
131132
completed_tasks_single: 0,
132133
completed_tasks_batch: 0,
133-
// others
134+
// heartbeats
135+
heartbeats: HashMap::new(),
136+
last_heartbeat_at: chrono::Utc::now(),
137+
num_heartbeats: 0,
138+
// misc
134139
initial_steps,
135140
spec_collector: SpecCollector::new(model_names),
136-
last_heartbeat_at: Instant::now(),
137-
num_heartbeats: 0,
138-
started_at: Instant::now(),
139141
},
140142
p2p_client,
141143
task_batch_worker,

0 commit comments

Comments
 (0)