Commit 6ef4de6

some naming fixes, todo check tests

1 parent c3da1c9 commit 6ef4de6

5 files changed: +104 -80 lines changed

compute/src/node/core.rs

Lines changed: 39 additions & 63 deletions
@@ -1,8 +1,8 @@
-use eyre::Result;
+use eyre::{eyre, Result};
 use std::time::Duration;
 use tokio_util::sync::CancellationToken;
 
-use crate::{node::PingpongHandler, reqres::TaskResponder, utils::DriaMessage, DriaComputeNode};
+use crate::{node::PingpongHandler, utils::DriaMessage, DriaComputeNode};
 
 impl DriaComputeNode {
     /// Runs the main loop of the compute node.
@@ -27,72 +27,45 @@ impl DriaComputeNode {
 
         loop {
             tokio::select! {
-                // a Workflow message to be published is received from the channel
-                // this is expected to be sent by the workflow worker
-                publish_msg_opt = self.publish_rx.recv() => {
-                    if let Some(publish_msg) = publish_msg_opt {
-                        // remove the task from pending tasks based on its batchability
-                        let task_metadata = match publish_msg.batchable {
-                            true => {
-                                self.completed_tasks_batch += 1;
-                                self.pending_tasks_batch.remove(&publish_msg.task_id)
-                            },
-                            false => {
-                                self.completed_tasks_single += 1;
-                                self.pending_tasks_single.remove(&publish_msg.task_id)
-                            }
-                        };
-
-                        // respond to the request
-                        match task_metadata {
-                            Some(channel) => {
-                                TaskResponder::handle_respond(self, publish_msg, channel).await?;
-                            }
-                            None => log::error!("Channel not found for task id: {}", publish_msg.task_id),
-                        }
-                    } else {
-                        log::error!("Publish channel closed unexpectedly, we still have {} batch and {} single tasks.", self.pending_tasks_batch.len(), self.pending_tasks_single.len());
-                        break;
-                    };
+                // a task is completed by the worker & a response should be sent to the requesting peer
+                task_response_msg_opt = self.task_output_rx.recv() => {
+                    let task_response_msg = task_response_msg_opt.ok_or(
+                        eyre!("Publish channel closed unexpectedly, we still have {} batch and {} single tasks.", self.pending_tasks_batch.len(), self.pending_tasks_single.len())
+                    )?;
+
+                    self.handle_task_response(task_response_msg).await?;
                 },
 
-                // check peer count every now and then
-                _ = diagnostic_refresh_interval.tick() => self.handle_diagnostic_refresh().await,
-
-                // available nodes are refreshed every now and then
-                _ = available_node_refresh_interval.tick() => self.handle_available_nodes_refresh().await,
-
                 // a GossipSub message is received from the channel
                 // this is expected to be sent by the p2p client
-                gossipsub_msg_opt = self.message_rx.recv() => {
-                    if let Some((peer_id, message_id, message)) = gossipsub_msg_opt {
-                        // handle the message, returning a message acceptance for the received one
-                        let acceptance = self.handle_message((peer_id, &message_id, message)).await;
-
-                        // validate the message based on the acceptance
-                        // cant do anything but log if this gives an error as well
-                        if let Err(e) = self.p2p.validate_message(&message_id, &peer_id, acceptance).await {
-                            log::error!("Error validating message {}: {:?}", message_id, e);
-                        }
-                    } else {
-                        log::error!("message_rx channel closed unexpectedly.");
-                        break;
-                    };
+                gossipsub_msg_opt = self.gossip_message_rx.recv() => {
+                    let (peer_id, message_id, message) = gossipsub_msg_opt.ok_or(eyre!("message_rx channel closed unexpectedly."))?;
+
+                    // handle the message, returning a message acceptance for the received one
+                    let acceptance = self.handle_message((peer_id, &message_id, message)).await;
+
+                    // validate the message based on the acceptance
+                    // can't do anything but log if this gives an error as well
+                    if let Err(e) = self.p2p.validate_message(&message_id, &peer_id, acceptance).await {
+                        log::error!("Error validating message {}: {:?}", message_id, e);
+                    }
+
                 },
 
-                // a Response message is received from the channel
-                // this is expected to be sent by the p2p client
+                // a Request is received from the channel, sent by the p2p client
                 request_msg_opt = self.request_rx.recv() => {
-                    if let Some((peer_id, data, channel)) = request_msg_opt {
-                        if let Err(e) = self.handle_request((peer_id, data, channel)).await {
-                            log::error!("Error handling request: {:?}", e);
-                        }
-                    } else {
-                        log::error!("request_rx channel closed unexpectedly.");
-                        break;
-                    };
+                    let request = request_msg_opt.ok_or(eyre!("request_rx channel closed unexpectedly."))?;
+                    if let Err(e) = self.handle_request(request).await {
+                        log::error!("Error handling request: {:?}", e);
+                    }
                 },
 
+                // check peer count every now and then
+                _ = diagnostic_refresh_interval.tick() => self.handle_diagnostic_refresh().await,
+
+                // available nodes are refreshed every now and then
+                _ = available_node_refresh_interval.tick() => self.handle_available_nodes_refresh().await,
+
                 // check if the cancellation token is cancelled
                 // this is expected to be cancelled by the main thread with signal handling
                 _ = cancellation.cancelled() => break,
@@ -119,15 +92,18 @@ impl DriaComputeNode {
     }
 
     /// Shutdown channels between p2p, worker and yourself.
+    ///
+    /// Can be inlined as it is called only once from very few places.
+    #[inline]
     pub async fn shutdown(&mut self) -> Result<()> {
        log::debug!("Sending shutdown command to p2p client.");
        self.p2p.shutdown().await?;
 
-        log::debug!("Closing message channel.");
-        self.message_rx.close();
+        log::debug!("Closing gossip message receipt channel.");
+        self.gossip_message_rx.close();
 
-        log::debug!("Closing publish channel.");
-        self.publish_rx.close();
+        log::debug!("Closing task response channel.");
+        self.task_output_rx.close();
 
        Ok(())
    }
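For context on the refactor above: each `tokio::select!` arm used to log and `break` when its channel returned `None`; the new code converts that `None` into an error with `ok_or(eyre!(..))?`, so the loop exits by propagating the error to the caller instead. A minimal standalone sketch of that pattern, assuming `tokio` and `eyre` as dependencies (channel payloads and messages here are made up for illustration):

```rust
use eyre::{eyre, Result};
use tokio::sync::mpsc;

// recv() yields Option<T>; `ok_or(eyre!(..))?` turns a closed channel
// (None) into an error that exits the loop, replacing the old
// log-and-break branch inside the select arm.
async fn run_loop(mut rx: mpsc::Receiver<String>) -> Result<()> {
    loop {
        tokio::select! {
            msg_opt = rx.recv() => {
                let msg = msg_opt.ok_or(eyre!("channel closed unexpectedly"))?;
                println!("got: {}", msg);
            }
        }
    }
}

#[tokio::main]
async fn main() -> Result<()> {
    let (tx, rx) = mpsc::channel(8);
    tx.send("hello".to_string()).await?;
    drop(tx); // dropping the sender closes the channel

    // the loop processes "hello", then errors out on the closed channel
    let err = run_loop(rx).await.unwrap_err();
    println!("loop exited: {}", err);
    Ok(())
}
```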

compute/src/node/diagnostic.rs

Lines changed: 9 additions & 3 deletions
@@ -48,11 +48,17 @@ impl DriaComputeNode {
 
        log::info!("{}", diagnostics.join("\n  "));
 
+        // check liveness of the node w.r.t. the last ping-pong time
        if self.last_pinged_at < Instant::now() - Duration::from_secs(PING_LIVENESS_SECS) {
            log::error!(
-              "Node has not received any pings for at least {} seconds & it may be unreachable!\nPlease restart your node!",
-              PING_LIVENESS_SECS
-            );
+                "Node has not received any pings for at least {} seconds & it may be unreachable!\nPlease restart your node!",
+                PING_LIVENESS_SECS
+            );
+        }
+
+        // also check RPC nodes; this sometimes happens when the API is down or buggy
+        if self.dria_nodes.rpc_peerids.is_empty() {
+            log::error!("No RPC peerids were found to be available, please restart your node!");
        }
    }
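The liveness check added above compares `last_pinged_at` against a fixed window. A tiny sketch of that idea in isolation (the constant value here is made up; the real `PING_LIVENESS_SECS` lives elsewhere in the crate):

```rust
use std::time::{Duration, Instant};

// assumed value, for illustration only
const PING_LIVENESS_SECS: u64 = 150;

// true when the last ping is older than the liveness window, mirroring
// `last_pinged_at < Instant::now() - Duration::from_secs(..)` above
fn is_unreachable(last_pinged_at: Instant) -> bool {
    last_pinged_at < Instant::now() - Duration::from_secs(PING_LIVENESS_SECS)
}

fn main() {
    let just_pinged = Instant::now();
    println!("unreachable: {}", is_unreachable(just_pinged)); // false
}
```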

compute/src/node/gossipsub.rs

Lines changed: 9 additions & 2 deletions
@@ -152,8 +152,10 @@ impl DriaComputeNode {
 mod tests {
    use super::*;
    use crate::DriaComputeNodeConfig;
+    use tokio::time::sleep;
    use tokio_util::sync::CancellationToken;
 
+    // FIXME: test is failing
    #[tokio::test]
    #[ignore = "run this manually"]
    async fn test_publish_message() -> eyre::Result<()> {
@@ -173,11 +175,16 @@ mod tests {
        let p2p_task = tokio::spawn(async move { p2p.run().await });
 
        // launch & wait for a while for connections
-        log::info!("Waiting a bit for peer setup.");
        let run_cancellation = cancellation.clone();
+        let wait_duration = tokio::time::Duration::from_secs(20);
+        log::info!(
+            "Waiting a bit ({}ms) for peer setup.",
+            wait_duration.as_millis()
+        );
+        sleep(wait_duration).await;
        tokio::select! {
            _ = node.run(run_cancellation) => (),
-            _ = tokio::time::sleep(tokio::time::Duration::from_secs(20)) => cancellation.cancel(),
+            _ = tokio::time::sleep(wait_duration) => cancellation.cancel(),
        }
        log::info!("Connected Peers:\n{:#?}", node.peers().await?);
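The test change above factors the 20-second wait into a single `wait_duration`, used both for the setup sleep and as the select timeout. A standalone sketch of that run-until-timeout pattern, assuming `tokio` and `tokio-util` as dependencies (durations shortened here):

```rust
use tokio::time::{sleep, Duration};
use tokio_util::sync::CancellationToken;

// stand-in for node.run(): a long-lived task that exits on cancellation
async fn run(cancellation: CancellationToken) {
    cancellation.cancelled().await;
}

#[tokio::main]
async fn main() {
    let cancellation = CancellationToken::new();
    let run_cancellation = cancellation.clone();
    let wait_duration = Duration::from_millis(50); // 20s in the real test

    // whichever future finishes first wins; on timeout, cancel the token
    // so any cooperating tasks can shut down cleanly
    tokio::select! {
        _ = run(run_cancellation) => (),
        _ = sleep(wait_duration) => cancellation.cancel(),
    }
    println!("done after at most {}ms", wait_duration.as_millis());
}
```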

compute/src/node/mod.rs

Lines changed: 11 additions & 9 deletions
@@ -35,15 +35,17 @@ pub struct DriaComputeNode {
    /// If this is too much, we can say that the node is not reachable by RPC.
    pub last_pinged_at: Instant,
    /// Gossipsub message receiver, used by peer-to-peer client in a separate thread.
-    message_rx: mpsc::Receiver<(PeerId, MessageId, Message)>,
+    ///
+    /// It will publish messages sent to this channel to the network.
+    gossip_message_rx: mpsc::Receiver<(PeerId, MessageId, Message)>,
    /// Request-response request receiver.
    request_rx: mpsc::Receiver<(PeerId, Vec<u8>, ResponseChannel<Vec<u8>>)>,
-    /// Publish receiver to receive messages to be published.
-    publish_rx: mpsc::Receiver<TaskWorkerOutput>,
+    /// Task response receiver, will respond to the request-response channel with the given result.
+    task_output_rx: mpsc::Receiver<TaskWorkerOutput>,
    /// Task worker transmitter to send batchable tasks.
-    task_batch_tx: Option<mpsc::Sender<TaskWorkerInput>>,
+    task_request_batch_tx: Option<mpsc::Sender<TaskWorkerInput>>,
    /// Task worker transmitter to send single tasks.
-    task_single_tx: Option<mpsc::Sender<TaskWorkerInput>>,
+    task_request_single_tx: Option<mpsc::Sender<TaskWorkerInput>>,
    // Single tasks
    pending_tasks_single: HashMap<String, TaskWorkerMetadata>,
    // Batchable tasks
@@ -118,12 +120,12 @@ impl DriaComputeNode {
            p2p: p2p_commander,
            dria_nodes,
            // receivers
-            publish_rx,
-            message_rx,
+            task_output_rx: publish_rx,
+            gossip_message_rx: message_rx,
            request_rx,
            // transmitters
-            task_batch_tx,
-            task_single_tx,
+            task_request_batch_tx: task_batch_tx,
+            task_request_single_tx: task_single_tx,
            // task trackers
            pending_tasks_single: HashMap::new(),
            pending_tasks_batch: HashMap::new(),
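For reference, the renamed fields follow a direction-suffix scheme: the node holds the receiving ends (`*_rx`), while the p2p client and task workers hold the matching senders. A simplified sketch of that wiring (field names from the struct above, payload types reduced to `String` for brevity):

```rust
use tokio::sync::mpsc;

// reduced stand-in for DriaComputeNode's channel fields
struct Node {
    gossip_message_rx: mpsc::Receiver<String>, // fed by the p2p client
    task_output_rx: mpsc::Receiver<String>,    // fed by task workers
}

#[tokio::main]
async fn main() {
    let (gossip_tx, gossip_message_rx) = mpsc::channel(256);
    let (output_tx, task_output_rx) = mpsc::channel(256);
    let mut node = Node { gossip_message_rx, task_output_rx };

    // the p2p client and workers keep the sender halves
    gossip_tx.send("gossip message".to_string()).await.unwrap();
    output_tx.send("task output".to_string()).await.unwrap();

    println!("{:?}", node.gossip_message_rx.recv().await);
    println!("{:?}", node.task_output_rx.recv().await);
}
```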

compute/src/node/reqres.rs

Lines changed: 36 additions & 3 deletions
@@ -1,7 +1,7 @@
 use dkn_p2p::libp2p::{request_response::ResponseChannel, PeerId};
 use eyre::{eyre, Result};
 
-use crate::reqres::*;
+use crate::{reqres::*, workers::task::TaskWorkerOutput};
 
 use super::DriaComputeNode;
 
@@ -38,6 +38,7 @@ impl DriaComputeNode {
        Ok(())
    }
 
+    /// Handles a Specifications request received from the network.
    async fn handle_spec_request(
        &mut self,
        peer_id: PeerId,
@@ -73,6 +74,11 @@ impl DriaComputeNode {
        Ok(())
    }
 
+    /// Handles a Task request received from the network.
+    ///
+    /// Based on the task type, the task is sent to the appropriate worker & its metadata is stored in memory.
+    /// This metadata will be used for the response as well, and we can count the number of tasks at hand by
+    /// looking at the number of metadata entries stored.
    async fn handle_task_request(
        &mut self,
        peer_id: PeerId,
@@ -86,7 +92,7 @@ impl DriaComputeNode {
        if let Err(e) = match task_input.batchable {
            // this is a batchable task, send it to batch worker
            // and keep track of the task id in pending tasks
-            true => match self.task_batch_tx {
+            true => match self.task_request_batch_tx {
                Some(ref mut tx) => {
                    self.pending_tasks_batch
                        .insert(task_input.task_id.clone(), task_metadata);
@@ -101,7 +107,7 @@ impl DriaComputeNode {
 
            // this is a single task, send it to single worker
            // and keep track of the task id in pending tasks
-            false => match self.task_single_tx {
+            false => match self.task_request_single_tx {
                Some(ref mut tx) => {
                    self.pending_tasks_single
                        .insert(task_input.task_id.clone(), task_metadata);
@@ -117,4 +123,31 @@ impl DriaComputeNode {
 
        Ok(())
    }
+
+    pub(crate) async fn handle_task_response(
+        &mut self,
+        task_response: TaskWorkerOutput,
+    ) -> Result<()> {
+        // remove the task from pending tasks, and get its metadata
+        let task_metadata = match task_response.batchable {
+            true => {
+                self.completed_tasks_batch += 1; // TODO: this should be done on success
+                self.pending_tasks_batch.remove(&task_response.task_id)
+            }
+            false => {
+                self.completed_tasks_single += 1; // TODO: this should be done on success
+                self.pending_tasks_single.remove(&task_response.task_id)
+            }
+        };
+
+        // respond to the response channel with the result
+        match task_metadata {
+            Some(channel) => {
+                TaskResponder::handle_respond(self, task_response, channel).await?;
+            }
+            None => log::error!("Channel not found for task id: {}", task_response.task_id),
+        };
+
+        Ok(())
+    }
 }
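The new `handle_task_response` pairs each completed task with the metadata that `handle_task_request` stored earlier; a missing map entry means there is no channel left to respond on. A condensed sketch of that bookkeeping (metadata reduced to a string stand-in for the response channel):

```rust
use std::collections::HashMap;

fn main() {
    // handle_task_request inserts metadata keyed by task id...
    let mut pending_tasks: HashMap<String, String> = HashMap::new();
    pending_tasks.insert("task-1".into(), "response-channel".into());

    // ...and handle_task_response removes it, responding exactly once
    match pending_tasks.remove("task-1") {
        Some(channel) => println!("responding over {}", channel),
        None => eprintln!("Channel not found for task id: task-1"),
    }

    // a second completion for the same id finds nothing to respond on
    assert!(pending_tasks.remove("task-1").is_none());
}
```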
