Skip to content

Commit 03b0f4f

Browse files
committed
add peer print & fix tick issue
1 parent c0727b6 commit 03b0f4f

File tree

2 files changed

+33
-1
lines changed

2 files changed

+33
-1
lines changed

compute/src/node.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -319,8 +319,10 @@ impl DriaComputeNode {
319319
// prepare durations for sleeps
320320
let mut peer_refresh_interval =
321321
tokio::time::interval(Duration::from_secs(DIAGNOSTIC_REFRESH_INTERVAL_SECS));
322+
peer_refresh_interval.tick().await; // move one tick
322323
let mut available_node_refresh_interval =
323324
tokio::time::interval(Duration::from_secs(AVAILABLE_NODES_REFRESH_INTERVAL_SECS));
325+
available_node_refresh_interval.tick().await; // move one tick
324326

325327
// subscribe to topics
326328
self.subscribe(PingpongHandler::LISTEN_TOPIC).await?;

monitor/src/node.rs

Lines changed: 31 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ use tokio::sync::mpsc;
1717
use tokio_util::sync::CancellationToken;
1818

1919
const TASK_PRINT_INTERVAL_SECS: u64 = 20;
20+
const PEER_PRINT_INTERVAL_SECS: u64 = 40;
2021

2122
pub struct DriaMonitorNode {
2223
pub p2p: DriaP2PCommander,
@@ -39,13 +40,20 @@ impl DriaMonitorNode {
3940
results: HashMap::new(),
4041
}
4142
}
43+
44+
/// Setup the monitor node.
45+
///
46+
/// Subscribes to task topics.
4247
pub async fn setup(&self) -> Result<()> {
4348
self.p2p.subscribe(WorkflowHandler::LISTEN_TOPIC).await?;
4449
self.p2p.subscribe(WorkflowHandler::RESPONSE_TOPIC).await?;
4550

4651
Ok(())
4752
}
4853

54+
/// Shutdown the monitor node.
55+
///
56+
/// Unsubscribes from task topics, closes channels.
4957
pub async fn shutdown(&mut self) -> Result<()> {
5058
log::info!("Shutting down monitor");
5159
self.p2p.unsubscribe(WorkflowHandler::LISTEN_TOPIC).await?;
@@ -62,9 +70,16 @@ impl DriaMonitorNode {
6270
Ok(())
6371
}
6472

73+
/// Run the monitor node.
6574
pub async fn run(&mut self, token: CancellationToken) {
6675
let mut task_print_interval =
6776
tokio::time::interval(tokio::time::Duration::from_secs(TASK_PRINT_INTERVAL_SECS));
77+
let mut peer_print_interval =
78+
tokio::time::interval(tokio::time::Duration::from_secs(PEER_PRINT_INTERVAL_SECS));
79+
80+
// move one ticks
81+
task_print_interval.tick().await;
82+
peer_print_interval.tick().await;
6883

6984
loop {
7085
tokio::select! {
@@ -76,13 +91,27 @@ impl DriaMonitorNode {
7691
}
7792
None => break, // channel closed, we can return now
7893
},
79-
// print task counts
8094
_ = task_print_interval.tick() => self.handle_task_print(),
95+
_ = peer_print_interval.tick() => self.handle_peer_print().await,
8196
_ = token.cancelled() => break,
8297
}
8398
}
8499
}
85100

101+
async fn handle_peer_print(&self) {
102+
match self.p2p.peer_counts().await {
103+
Ok((mesh, all)) => {
104+
log::info!("Peer count: {} / {}", mesh, all);
105+
}
106+
Err(e) => {
107+
log::error!("Error getting peer counts: {:?}", e);
108+
}
109+
}
110+
}
111+
112+
/// Handle incoming gossipsub message.
113+
///
114+
/// Records the `task` and `result` messages only, does not respond to anything else.
86115
async fn handle_message(
87116
&mut self,
88117
(peer_id, message_id, gossipsub_message): (PeerId, MessageId, Message),
@@ -120,6 +149,7 @@ impl DriaMonitorNode {
120149
Ok(())
121150
}
122151

152+
/// Print the tasks (ids) that have not been responded to.
123153
fn handle_task_print(&self) {
124154
let seen_task_ids = self.tasks.keys().collect::<Vec<_>>();
125155
let seen_result_ids = self.results.keys().collect::<Vec<_>>();

0 commit comments

Comments
 (0)