Skip to content

Commit 5629e1a

Browse files
committed
slight renames, fix sleep with interval
1 parent e5b2671 commit 5629e1a

File tree

2 files changed

+16
-10
lines changed

2 files changed

+16
-10
lines changed

compute/src/handlers/pingpong.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@ impl PingpongHandler {
6666
uuid: pingpong.uuid.clone(),
6767
models: node.config.workflows.models.clone(),
6868
timestamp: get_current_time_nanos(),
69-
tasks: node.task_count(),
69+
tasks: node.get_active_task_count(),
7070
};
7171

7272
// publish message

compute/src/node.rs

Lines changed: 15 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,8 @@ use crate::{
1616
workers::workflow::{WorkflowsWorker, WorkflowsWorkerInput, WorkflowsWorkerOutput},
1717
};
1818

19-
/// Number of seconds between refreshing the Kademlia DHT.
20-
const PEER_REFRESH_INTERVAL_SECS: u64 = 30;
19+
/// Number of seconds between refreshing for diagnostic prints.
20+
const DIAGNOSTIC_REFRESH_INTERVAL_SECS: u64 = 30;
2121
/// Number of seconds between refreshing the available nodes.
2222
const AVAILABLE_NODES_REFRESH_INTERVAL_SECS: u64 = 30 * 60; // 30 minutes
2323

@@ -123,7 +123,7 @@ impl DriaComputeNode {
123123
}
124124

125125
/// Returns the task count within the channels, `single` and `batch`.
126-
pub fn task_count(&self) -> [usize; 2] {
126+
pub fn get_active_task_count(&self) -> [usize; 2] {
127127
[
128128
self.workflow_single_tx.max_capacity() - self.workflow_single_tx.capacity(),
129129
self.workflow_batch_tx.max_capacity() - self.workflow_batch_tx.capacity(),
@@ -251,9 +251,10 @@ impl DriaComputeNode {
251251
/// This method is not expected to return until cancellation occurs.
252252
pub async fn run(&mut self) -> Result<()> {
253253
// prepare durations for sleeps
254-
let peer_refresh_duration = Duration::from_secs(PEER_REFRESH_INTERVAL_SECS);
255-
let available_node_refresh_duration =
256-
Duration::from_secs(AVAILABLE_NODES_REFRESH_INTERVAL_SECS);
254+
let mut peer_refresh_interval =
255+
tokio::time::interval(Duration::from_secs(DIAGNOSTIC_REFRESH_INTERVAL_SECS));
256+
let mut available_node_refresh_interval =
257+
tokio::time::interval(Duration::from_secs(AVAILABLE_NODES_REFRESH_INTERVAL_SECS));
257258

258259
// subscribe to topics
259260
self.subscribe(PingpongHandler::LISTEN_TOPIC).await?;
@@ -264,9 +265,9 @@ impl DriaComputeNode {
264265
loop {
265266
tokio::select! {
266267
// check peer count every now and then
267-
_ = tokio::time::sleep(peer_refresh_duration) => self.handle_peer_refresh().await,
268+
_ = peer_refresh_interval.tick() => self.handle_diagnostic_refresh().await,
268269
// available nodes are refreshed every now and then
269-
_ = tokio::time::sleep(available_node_refresh_duration) => self.handle_available_nodes_refresh().await,
270+
_ = available_node_refresh_interval.tick() => self.handle_available_nodes_refresh().await,
270271
// a Workflow message to be published is received from the channel
271272
// this is expected to be sent by the workflow worker
272273
publish_msg = self.publish_batch_rx.recv() => {
@@ -336,11 +337,16 @@ impl DriaComputeNode {
336337
}
337338

338339
/// Peer refresh simply reports the peer count to the user.
339-
async fn handle_peer_refresh(&self) {
340+
async fn handle_diagnostic_refresh(&self) {
341+
// print peer counts
340342
match self.p2p.peer_counts().await {
341343
Ok((mesh, all)) => log::info!("Peer Count (mesh/all): {} / {}", mesh, all),
342344
Err(e) => log::error!("Error getting peer counts: {:?}", e),
343345
}
346+
347+
// print task counts
348+
let [single, batch] = self.get_active_task_count();
349+
log::info!("Active Task Count (single/batch): {} / {}", single, batch);
344350
}
345351

346352
/// Updates the local list of available nodes by refreshing it.

0 commit comments

Comments
 (0)