Skip to content

Commit c0727b6

Browse files
committed
monitor as subcrate
1 parent 907d0d2 commit c0727b6

File tree

11 files changed

+145
-44
lines changed

11 files changed

+145
-44
lines changed

Cargo.lock

Lines changed: 20 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,6 @@
11
[workspace]
22
resolver = "2"
3-
members = ["compute", "p2p", "workflows", "utils"]
4-
# compute node is the default member, until Oracle comes in
5-
# then, a Launcher will be the default member
6-
default-members = ["compute"]
3+
members = ["compute", "p2p", "workflows", "utils", "monitor"]
74

85
[workspace.package]
96
edition = "2021"

Makefile

Lines changed: 13 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -7,31 +7,38 @@ endif
77
###############################################################################
88
.PHONY: launch # | Run with INFO logs in release mode
99
launch:
10-
RUST_LOG=none,dkn_compute=info,dkn_workflows=info,dkn_p2p=info cargo run --release
10+
RUST_LOG=none,dkn_compute=info,dkn_workflows=info,dkn_p2p=info \
11+
cargo run --release --bin dkn-compute
1112

1213
.PHONY: run # | Run with INFO logs
1314
run:
14-
cargo run
15+
cargo run --bin dkn-compute
16+
17+
.PHONY: monitor # | Run monitor node with INFO logs
18+
monitor:
19+
cargo run --bin dkn-monitor
1520

1621
.PHONY: debug # | Run with DEBUG logs with INFO log-level workflows
1722
debug:
18-
RUST_LOG=warn,dkn_compute=debug,dkn_workflows=debug,dkn_p2p=debug,ollama_workflows=info cargo run
23+
RUST_LOG=warn,dkn_compute=debug,dkn_workflows=debug,dkn_p2p=debug,ollama_workflows=info \
24+
cargo run --bin dkn-compute
1925

2026
.PHONY: trace # | Run with TRACE logs
2127
trace:
22-
RUST_LOG=warn,dkn_compute=trace,libp2p=debug cargo run
28+
RUST_LOG=warn,dkn_compute=trace,libp2p=debug \
29+
cargo run --bin dkn-compute
2330

2431
.PHONY: build # | Build
2532
build:
2633
cargo build --workspace
2734

2835
.PHONY: profile-cpu # | Profile CPU usage with flamegraph
2936
profile-cpu:
30-
DKN_EXIT_TIMEOUT=120 cargo flamegraph --root --profile=profiling
37+
DKN_EXIT_TIMEOUT=120 cargo flamegraph --root --profile=profiling --bin dkn-compute
3138

3239
.PHONY: profile-mem # | Profile memory usage with instruments
3340
profile-mem:
34-
DKN_EXIT_TIMEOUT=120 cargo instruments --profile=profiling -t Allocations
41+
DKN_EXIT_TIMEOUT=120 cargo instruments --profile=profiling -t Allocations --bin dkn-compute
3542

3643
.PHONY: ollama-versions
3744
ollama-versions:

compute/src/handlers/mod.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
mod pingpong;
2-
pub use pingpong::PingpongHandler;
2+
pub use pingpong::*;
33

44
mod workflow;
5-
pub use workflow::{WorkflowHandler, WorkflowPayload};
5+
pub use workflow::*;

compute/src/lib.rs

Lines changed: 6 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,9 @@
1-
pub(crate) mod config;
2-
pub(crate) mod handlers;
3-
pub(crate) mod monitor;
4-
pub(crate) mod node;
5-
pub(crate) mod payloads;
6-
pub(crate) mod utils;
7-
pub(crate) mod workers;
1+
pub mod config;
2+
pub mod handlers;
3+
pub mod node;
4+
pub mod payloads;
5+
pub mod utils;
6+
pub mod workers;
87

98
/// Crate version of the compute node.
109
/// This value is attached within the published messages.
@@ -14,5 +13,3 @@ pub use utils::refresh_dria_nodes;
1413

1514
pub use config::DriaComputeNodeConfig;
1615
pub use node::DriaComputeNode;
17-
18-
pub use monitor::DriaMonitorNode;

compute/src/payloads/request.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -8,11 +8,11 @@ pub struct TaskRequestPayload<T> {
88
/// The unique identifier of the task.
99
pub task_id: String,
1010
/// The deadline of the task in nanoseconds.
11-
pub(crate) deadline: u128,
11+
pub deadline: u128,
1212
/// The input to the compute function.
13-
pub(crate) input: T,
13+
pub input: T,
1414
/// The Bloom filter of the task.
15-
pub(crate) filter: TaskFilter,
15+
pub filter: TaskFilter,
1616
/// The public key of the requester, in hexadecimals.
17-
pub(crate) public_key: String,
17+
pub public_key: String,
1818
}

compute/src/utils/message.rs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -12,22 +12,22 @@ use serde::{Deserialize, Serialize};
1212
#[derive(Serialize, Deserialize, Debug, Clone)]
1313
pub struct DriaMessage {
1414
/// Base64 encoded payload, stores the main result.
15-
pub(crate) payload: String,
15+
pub payload: String,
1616
/// The topic of the message, derived from `TopicHash`
1717
///
1818
/// NOTE: This can be obtained via `TopicHash` in GossipSub
19-
pub(crate) topic: String,
19+
pub topic: String,
2020
/// The version of the Dria Compute Node
2121
///
2222
/// NOTE: This can be obtained via Identify protocol version
23-
pub(crate) version: String,
23+
pub version: String,
2424
/// Identity protocol string of the Dria Compute Node
2525
#[serde(default)]
26-
pub(crate) identity: String,
26+
pub identity: String,
2727
/// The timestamp of the message, in nanoseconds
2828
///
2929
/// NOTE: This can be obtained via `DataTransform` in GossipSub
30-
pub(crate) timestamp: u128,
30+
pub timestamp: u128,
3131
}
3232

3333
/// 65-byte signature as hex characters take up 130 characters.
@@ -79,7 +79,7 @@ impl DriaMessage {
7979
}
8080

8181
/// Decodes and parses the base64 payload into JSON for the provided type `T`.
82-
pub(crate) fn parse_payload<T: for<'a> Deserialize<'a>>(&self, signed: bool) -> Result<T> {
82+
pub fn parse_payload<T: for<'a> Deserialize<'a>>(&self, signed: bool) -> Result<T> {
8383
let payload = self.decode_payload()?;
8484

8585
let body = if signed {

monitor/Cargo.toml

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
[package]
2+
name = "dkn-monitor"
3+
version.workspace = true
4+
edition.workspace = true
5+
license.workspace = true
6+
readme = "README.md"
7+
authors = ["Erhan Tezcan <[email protected]>"]
8+
9+
[dependencies]
10+
# async stuff
11+
tokio-util.workspace = true
12+
tokio.workspace = true
13+
async-trait.workspace = true
14+
15+
# serialize & deserialize
16+
serde.workspace = true
17+
serde_json.workspace = true
18+
19+
# http & networking
20+
reqwest.workspace = true
21+
22+
# utilities
23+
dotenvy.workspace = true
24+
25+
# logging & errors
26+
env_logger.workspace = true
27+
log.workspace = true
28+
eyre.workspace = true
29+
30+
libsecp256k1 = "0.7.1"
31+
32+
# dria subcrates
33+
dkn-p2p = { path = "../p2p" }
34+
dkn-utils = { path = "../utils" }
35+
dkn-compute = { path = "../compute" }

compute/src/bin/monitor.rs renamed to monitor/src/main.rs

Lines changed: 18 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,33 +1,38 @@
1-
use dkn_compute::{refresh_dria_nodes, DriaMonitorNode};
1+
use dkn_compute::refresh_dria_nodes;
22
use dkn_p2p::{
33
libp2p_identity::Keypair, DriaNetworkType, DriaNodes, DriaP2PClient, DriaP2PProtocol,
44
};
55
use tokio_util::sync::CancellationToken;
66

7+
mod node;
8+
use node::DriaMonitorNode;
9+
710
#[tokio::main]
811
async fn main() -> eyre::Result<()> {
912
dotenvy::dotenv().expect("could not load .env");
1013

1114
env_logger::builder()
1215
.filter(None, log::LevelFilter::Off)
13-
.filter_module("dkn_p2p", log::LevelFilter::Info)
16+
.filter_module("dkn_p2p", log::LevelFilter::Warn)
1417
.filter_module("dkn_compute", log::LevelFilter::Info)
15-
.filter_module("monitor", log::LevelFilter::Info)
18+
.filter_module("dkn_monitor", log::LevelFilter::Info)
1619
.parse_default_env() // reads RUST_LOG variable
1720
.init();
1821

19-
log::info!("Starting Dria Task Monitor");
20-
21-
let network = DriaNetworkType::Pro;
22+
let network = std::env::var("DKN_NETWORK")
23+
.map(|s| DriaNetworkType::from(s.as_str()))
24+
.unwrap_or(DriaNetworkType::Pro);
2225
let mut nodes = DriaNodes::new(network);
2326
refresh_dria_nodes(&mut nodes).await?;
2427

2528
// setup p2p client
29+
let listen_addr = "/ip4/0.0.0.0/tcp/4069".parse()?;
30+
log::info!("Listen Address: {}", listen_addr);
2631
let keypair = Keypair::generate_secp256k1();
2732
log::info!("PeerID: {}", keypair.public().to_peer_id());
2833
let (client, commander, msg_rx) = DriaP2PClient::new(
29-
Keypair::generate_secp256k1(),
30-
"/ip4/0.0.0.0/tcp/4069".parse()?,
34+
keypair,
35+
listen_addr,
3136
nodes.bootstrap_nodes.into_iter(),
3237
nodes.relay_nodes.into_iter(),
3338
nodes.rpc_nodes.into_iter(),
@@ -54,6 +59,11 @@ async fn main() -> eyre::Result<()> {
5459
});
5560

5661
// create monitor node
62+
log::info!(
63+
"Monitoring {} network (protocol: {}).",
64+
network,
65+
network.protocol_name()
66+
);
5767
let mut monitor = DriaMonitorNode::new(commander, msg_rx);
5868

5969
// setup monitor

compute/src/monitor.rs renamed to monitor/src/node.rs

Lines changed: 30 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
use std::collections::HashMap;
22

3-
use crate::{
3+
use dkn_compute::{
44
handlers::{WorkflowHandler, WorkflowPayload},
55
payloads::{TaskRequestPayload, TaskResponsePayload},
66
utils::DriaMessage,
@@ -56,6 +56,9 @@ impl DriaMonitorNode {
5656
self.p2p.shutdown().await?;
5757
self.msg_rx.close();
5858

59+
// print tasks one final time
60+
self.handle_task_print();
61+
5962
Ok(())
6063
}
6164

@@ -65,16 +68,16 @@ impl DriaMonitorNode {
6568

6669
loop {
6770
tokio::select! {
71+
// handle gossipsub message
6872
message = self.msg_rx.recv() => match message {
6973
Some(message) => match self.handle_message(message).await {
7074
Ok(_) => {}
7175
Err(e) => log::error!("Error handling message: {:?}", e),
7276
}
73-
None => break, // channel closed
77+
None => break, // channel closed, we can return now
7478
},
75-
_ = task_print_interval.tick() => {
76-
log::info!("Current seen tasks: {:#?}", self.tasks.keys().collect::<Vec<_>>());
77-
}
79+
// print task counts
80+
_ = task_print_interval.tick() => self.handle_task_print(),
7881
_ = token.cancelled() => break,
7982
}
8083
}
@@ -116,4 +119,26 @@ impl DriaMonitorNode {
116119
}
117120
Ok(())
118121
}
122+
123+
fn handle_task_print(&self) {
124+
let seen_task_ids = self.tasks.keys().collect::<Vec<_>>();
125+
let seen_result_ids = self.results.keys().collect::<Vec<_>>();
126+
127+
// print the tasks that have not been responded to
128+
let pending_tasks = seen_task_ids
129+
.iter()
130+
.filter(|id| !seen_result_ids.contains(*id))
131+
.map(|id| self.tasks.get(*id).unwrap())
132+
.collect::<Vec<_>>();
133+
134+
log::info!(
135+
"Pending tasks ({} / {}): {:#?}",
136+
pending_tasks.len(),
137+
self.tasks.len(),
138+
pending_tasks
139+
.iter()
140+
.map(|t| t.task_id.clone())
141+
.collect::<Vec<_>>()
142+
);
143+
}
119144
}

0 commit comments

Comments
 (0)