
Commit f8f4318

task completion reports in the end, some readme rfks & removals
1 parent 0365ded commit f8f4318

File tree

10 files changed: +80 -139 lines changed


.github/ISSUE_TEMPLATE/bug_report.md

Lines changed: 0 additions & 34 deletions
This file was deleted.

.github/ISSUE_TEMPLATE/feature_request.md

Lines changed: 0 additions & 28 deletions
This file was deleted.

README.md

Lines changed: 2 additions & 6 deletions
```diff
@@ -28,15 +28,11 @@
 
 ## About
 
-A **Dria Compute Node** is a unit of computation within the Dria Knowledge Network. It's purpose is to process tasks given by the **Dria Admin Node**. To get started, see [node guide](./docs/NODE_GUIDE.md)!
-
-### Tasks
-
 Compute nodes can technically do any arbitrary task, from computing the square root of a given number to finding LLM outputs from a given prompt, or validating an LLM's output with respect to knowledge available on the web accessed via tools.
 
-- **Ping/Pong**: Dria Admin Node broadcasts **ping** messages at a set interval, it is a required duty of the compute node to respond with a **pong** to these so that they can be included in the list of available nodes for task assignment. These tasks will respect the type of model provided within the pong message, e.g. if a task requires `gpt-4o` and you are running `phi3`, you won't be selected for that task.
+- **Heartbeats**: Every few seconds, a heartbeat ping is published into the network, and every compute node responds with a digitally-signed pong message to indicate that it is alive, along with additional information such as which models it is running & how many tasks it has so far.
 
-- **Workflows**: Each task is given in the form of a workflow, based on [Ollama Workflows](https://github.com/andthattoo/ollama-workflows). In simple terms, each workflow defines the agentic behavior of an LLM, all captured in a single JSON file, and can represent things ranging from simple LLM generations to iterative web searching.
+- **Workflows**: Each task is given in the form of a [workflow](https://github.com/andthattoo/ollama-workflows). Every workflow defines an agentic behavior for the chosen LLM, all captured in a single JSON file, and can represent things ranging from simple LLM generations to iterative web searching & reasoning.
 
 ## Node Running
 
```
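To make the heartbeat flow concrete, here is a minimal sketch of the kind of JSON body a signed pong might carry — these are illustrative types and field names, not the node's actual message format, assuming `serde`/`serde_json` for serialization:

```rust
use serde::Serialize;

#[derive(Serialize)]
struct PongPayload {
    /// identifier of the heartbeat ping being acknowledged (hypothetical field)
    heartbeat_id: String,
    /// models this node serves, checked against a task's model requirement
    models: Vec<String>,
    /// number of tasks handled so far (hypothetical field)
    tasks_so_far: usize,
}

fn main() {
    let pong = PongPayload {
        heartbeat_id: "ping-1234".to_string(),
        models: vec!["phi3:3.8b".to_string()],
        tasks_so_far: 42,
    };
    // the real node signs the payload with its private key before publishing;
    // here we only print the JSON body that would be signed
    println!("{}", serde_json::to_string(&pong).expect("serialization"));
}
```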

compute/src/handlers/workflow.rs

Lines changed: 1 addition & 4 deletions
```diff
@@ -54,10 +54,7 @@ impl WorkflowHandler {
 
         // check task inclusion via the bloom filter
         if !task.filter.contains(&node.config.address)? {
-            log::info!(
-                "Task {} does not include this node within the filter.",
-                task.task_id
-            );
+            log::info!("Task {} ignored due to filter.", task.task_id);
 
             // accept the message, someone else may be included in filter
             return Ok(Either::Left(MessageAcceptance::Accept));
```
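For context, `task.filter` is a bloom filter over node addresses: it can produce false positives but never false negatives, which is why a filtered-out node still accepts the message — some other peer may genuinely be included. A self-contained toy illustration of that membership test (not the filter implementation the node actually uses):

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

struct ToyBloomFilter {
    bits: Vec<bool>,
    num_hashes: u64,
}

impl ToyBloomFilter {
    fn new(size: usize, num_hashes: u64) -> Self {
        Self { bits: vec![false; size], num_hashes }
    }

    /// derive a bit index from the item and a hash seed
    fn index(&self, item: &str, seed: u64) -> usize {
        let mut hasher = DefaultHasher::new();
        seed.hash(&mut hasher);
        item.hash(&mut hasher);
        (hasher.finish() as usize) % self.bits.len()
    }

    fn insert(&mut self, item: &str) {
        for seed in 0..self.num_hashes {
            let i = self.index(item, seed);
            self.bits[i] = true;
        }
    }

    /// may return a false positive, but an inserted item is never missed
    fn contains(&self, item: &str) -> bool {
        (0..self.num_hashes).all(|seed| self.bits[self.index(item, seed)])
    }
}

fn main() {
    let mut filter = ToyBloomFilter::new(1024, 3);
    filter.insert("0xa11ce");
    assert!(filter.contains("0xa11ce")); // always true for inserted items
    println!("other address included: {}", filter.contains("0xb0b"));
}
```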

compute/src/main.rs

Lines changed: 4 additions & 39 deletions
```diff
@@ -7,9 +7,6 @@ use tokio_util::{sync::CancellationToken, task::TaskTracker};
 async fn main() -> Result<()> {
     let dotenv_result = dotenvy::dotenv();
 
-    // TODO: remove me later when the launcher is fixed
-    amend_log_levels();
-
     env_logger::builder()
         .format_timestamp(Some(env_logger::TimestampPrecision::Millis))
         .init();
@@ -75,21 +72,23 @@
     // create the node
     let (mut node, p2p, worker_batch, worker_single) = DriaComputeNode::new(config).await?;
 
-    // spawn threads
+    // spawn p2p client first
     log::info!("Spawning peer-to-peer client thread.");
     task_tracker.spawn(async move { p2p.run().await });
 
+    // spawn batch worker thread if we are using such models (e.g. OpenAI, Gemini, OpenRouter)
     if let Some(mut worker_batch) = worker_batch {
         log::info!("Spawning workflows batch worker thread.");
         task_tracker.spawn(async move { worker_batch.run_batch().await });
     }
 
+    // spawn single worker thread if we are using such models (e.g. Ollama)
     if let Some(mut worker_single) = worker_single {
         log::info!("Spawning workflows single worker thread.");
         task_tracker.spawn(async move { worker_single.run().await });
     }
 
-    // launch the node in a separate thread
+    // spawn compute node thread
     log::info!("Spawning compute node thread.");
     let node_token = cancellation.clone();
     task_tracker.spawn(async move {
@@ -165,37 +164,3 @@ async fn wait_for_termination(cancellation: CancellationToken) -> Result<()> {
 
     Ok(())
 }
-
-// #[deprecated]
-/// Very CRUDE fix due to launcher log level bug
-///
-/// TODO: remove me later when the launcher is fixed
-pub fn amend_log_levels() {
-    if let Ok(rust_log) = std::env::var("RUST_LOG") {
-        let log_level = if rust_log.contains("dkn_compute=info") {
-            "info"
-        } else if rust_log.contains("dkn_compute=debug") {
-            "debug"
-        } else if rust_log.contains("dkn_compute=trace") {
-            "trace"
-        } else {
-            return;
-        };
-
-        // check if it contains other log levels
-        let mut new_rust_log = rust_log.clone();
-        if !rust_log.contains("dkn_p2p") {
-            new_rust_log = format!("{},{}={}", new_rust_log, "dkn_p2p", log_level);
-        }
-        if !rust_log.contains("dkn_workflows") {
-            new_rust_log = format!("{},{}={}", new_rust_log, "dkn_workflows", log_level);
-        }
-        std::env::set_var("RUST_LOG", new_rust_log);
-    } else {
-        // TODO: use env_logger default function instead of this
-        std::env::set_var(
-            "RUST_LOG",
-            "none,dkn_compute=info,dkn_p2p=info,dkn_workflows=info",
-        );
-    }
-}
```
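The spawn calls above all go through `tokio_util`'s `TaskTracker`, paired with a `CancellationToken` for shutdown. A minimal runnable sketch of that layout — the worker body is a placeholder, not the node's actual logic:

```rust
use tokio_util::{sync::CancellationToken, task::TaskTracker};

#[tokio::main]
async fn main() {
    let task_tracker = TaskTracker::new();
    let cancellation = CancellationToken::new();

    // each long-running component gets its own tracked task,
    // holding a clone of the cancellation token
    let token = cancellation.clone();
    task_tracker.spawn(async move {
        // a real worker would `select!` on this alongside its actual work
        token.cancelled().await;
        println!("worker shutting down");
    });

    // no further tasks will be spawned; wait() resolves once all complete
    task_tracker.close();

    // in the real node this is triggered by a termination signal
    cancellation.cancel();
    task_tracker.wait().await;
}
```

Cargo dependencies assumed: `tokio` with the `macros`/`rt` features, and `tokio-util`.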

compute/src/node.rs

Lines changed: 26 additions & 4 deletions
```diff
@@ -42,6 +42,10 @@ pub struct DriaComputeNode {
     pending_tasks_single: HashSet<String>,
     // Batch tasks hash-map
     pending_tasks_batch: HashSet<String>,
+    /// Completed single tasks count
+    completed_tasks_single: usize,
+    /// Completed batch tasks count
+    completed_tasks_batch: usize,
 }
 
 impl DriaComputeNode {
@@ -114,6 +118,8 @@ impl DriaComputeNode {
             workflow_single_tx,
             pending_tasks_single: HashSet::new(),
             pending_tasks_batch: HashSet::new(),
+            completed_tasks_single: 0,
+            completed_tasks_batch: 0,
         },
         p2p_client,
         workflows_batch_worker,
@@ -317,8 +323,14 @@ impl DriaComputeNode {
         if let Some(publish_msg) = publish_msg_opt {
             // remove the task from pending tasks based on its batchability
             match publish_msg.batchable {
-                true => self.pending_tasks_batch.remove(&publish_msg.task_id),
-                false => self.pending_tasks_single.remove(&publish_msg.task_id),
+                true => {
+                    self.completed_tasks_batch += 1;
+                    self.pending_tasks_batch.remove(&publish_msg.task_id);
+                },
+                false => {
+                    self.completed_tasks_single += 1;
+                    self.pending_tasks_single.remove(&publish_msg.task_id);
+                }
             };
 
             // publish the message
@@ -357,6 +369,9 @@
         self.unsubscribe(WorkflowHandler::LISTEN_TOPIC).await?;
         self.unsubscribe(WorkflowHandler::RESPONSE_TOPIC).await?;
 
+        // print one final diagnostic as a summary
+        self.handle_diagnostic_refresh().await;
+
         // shutdown channels
         self.shutdown().await?;
 
@@ -385,9 +400,16 @@
             Err(e) => log::error!("Error getting peer counts: {:?}", e),
         }
 
-        // print task counts
+        // print tasks count
         let [single, batch] = self.get_pending_task_count();
-        log::info!("Pending Task Count (single/batch): {} / {}", single, batch);
+        log::info!("Pending Tasks (single/batch): {} / {}", single, batch);
+
+        // completed tasks count
+        log::debug!(
+            "Completed Tasks (single/batch): {} / {}",
+            self.completed_tasks_single,
+            self.completed_tasks_batch
+        );
     }
 
     /// Updates the local list of available nodes by refreshing it.
```
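This is the commit's headline change: a finished task leaves its pending set and bumps the matching completed counter, and the shutdown path fires one last diagnostic so the completion totals are always reported at the end. A reduced sketch of that bookkeeping pattern, with illustrative types rather than the node's own:

```rust
use std::collections::HashSet;

#[derive(Default)]
struct TaskBook {
    pending: HashSet<String>,
    completed: usize,
}

impl TaskBook {
    /// called when a task is assigned
    fn begin(&mut self, task_id: &str) {
        self.pending.insert(task_id.to_string());
    }

    /// called when a task's result is published
    fn finish(&mut self, task_id: &str) {
        if self.pending.remove(task_id) {
            self.completed += 1;
        }
    }

    /// the kind of line a final diagnostic might log
    fn summary(&self) -> String {
        format!("pending: {}, completed: {}", self.pending.len(), self.completed)
    }
}

fn main() {
    let mut book = TaskBook::default();
    book.begin("task-1");
    book.finish("task-1");
    println!("{}", book.summary()); // pending: 0, completed: 1
}
```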

compute/src/workers/workflow.rs

Lines changed: 4 additions & 8 deletions
```diff
@@ -26,6 +26,9 @@ pub struct WorkflowsWorkerOutput {
     pub batchable: bool,
 }
 
+/// Workflows worker is a task executor that can process workflows in parallel / series.
+///
+/// It is expected to be spawned in another thread, with `run_batch` for batch processing and `run` for single processing.
 pub struct WorkflowsWorker {
     workflow_rx: mpsc::Receiver<WorkflowsWorkerInput>,
     publish_tx: mpsc::Sender<WorkflowsWorkerOutput>,
@@ -95,12 +98,6 @@ impl WorkflowsWorker {
             .workflow_rx
             .recv_many(&mut task_buffer, Self::BATCH_SIZE)
             .await;
-        debug_assert!(
-            num_tasks <= Self::BATCH_SIZE,
-            "drain cant be larger than batch size"
-        );
-        // TODO: just to be sure, can be removed later
-        debug_assert_eq!(num_tasks, task_buffer.len());
 
         if num_tasks == 0 {
             return self.shutdown();
@@ -186,15 +183,14 @@ impl WorkflowsWorker {
            }
            _ => {
                unreachable!(
-                    "drain cant be larger than batch size ({} > {})",
+                    "number of tasks cant be larger than batch size ({} > {})",
                    num_tasks,
                    Self::BATCH_SIZE
                );
            }
        };
 
        // publish all results
-        // TODO: make this a part of executor as well
        log::info!("Publishing {} workflow results", results.len());
        for result in results {
            if let Err(e) = self.publish_tx.send(result).await {
```
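The deleted assertions were re-checking part of `recv_many`'s own contract: it appends at most `limit` values to the buffer and returns how many it received, returning 0 only once the channel is closed and drained. A minimal sketch of this batched-receive loop, assuming tokio 1.37+ for `Receiver::recv_many` (`BATCH_SIZE` here is illustrative):

```rust
use tokio::sync::mpsc;

const BATCH_SIZE: usize = 8;

#[tokio::main]
async fn main() {
    let (tx, mut rx) = mpsc::channel::<u32>(64);

    tokio::spawn(async move {
        for i in 0..20 {
            tx.send(i).await.expect("receiver alive");
        }
        // dropping tx closes the channel, so recv_many eventually returns 0
    });

    let mut task_buffer = Vec::new();
    loop {
        // pull up to BATCH_SIZE queued items in one call
        let num_tasks = rx.recv_many(&mut task_buffer, BATCH_SIZE).await;
        if num_tasks == 0 {
            break; // channel closed: shut the worker down
        }
        println!("processing a batch of {num_tasks} tasks: {task_buffer:?}");
        task_buffer.clear(); // recv_many appends, so clear between batches
    }
}
```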

p2p/src/client.rs

Lines changed: 8 additions & 0 deletions
```diff
@@ -244,7 +244,15 @@
                 let _ = sender.send((mesh, all));
             }
             DriaP2PCommand::Shutdown { sender } => {
+                // close the command channel
                 self.cmd_rx.close();
+
+                // remove own peerId from Kademlia DHT
+                let peer_id = self.swarm.local_peer_id().clone();
+                self.swarm.behaviour_mut().kademlia.remove_peer(&peer_id);
+
+                // remove own peerId from Autonat server list
+                self.swarm.behaviour_mut().autonat.remove_server(&peer_id);
                 let _ = sender.send(());
             }
         }
```
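The shutdown branch follows the client's command-channel pattern: the caller sends `Shutdown` carrying a oneshot sender and awaits the acknowledgement, while the client closes its command queue, deregisters itself, and replies. A minimal runnable sketch of that handshake with tokio channels (names are illustrative, not the `DriaP2PClient` API):

```rust
use tokio::sync::{mpsc, oneshot};

enum Command {
    Shutdown { sender: oneshot::Sender<()> },
}

async fn run_client(mut cmd_rx: mpsc::Receiver<Command>) {
    while let Some(cmd) = cmd_rx.recv().await {
        match cmd {
            Command::Shutdown { sender } => {
                // closing the receiver drains queued commands, then ends the loop
                cmd_rx.close();
                // ...cleanup would go here, e.g. removing ourselves from the DHT...
                let _ = sender.send(()); // acknowledge the shutdown
            }
        }
    }
}

#[tokio::main]
async fn main() {
    let (cmd_tx, cmd_rx) = mpsc::channel(8);
    let handle = tokio::spawn(run_client(cmd_rx));

    let (tx, rx) = oneshot::channel();
    cmd_tx
        .send(Command::Shutdown { sender: tx })
        .await
        .expect("client alive");
    rx.await.expect("acknowledgement"); // wait for the client to confirm
    handle.await.expect("client task");
}
```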

workflows/README.md

Lines changed: 5 additions & 5 deletions
```diff
@@ -20,13 +20,13 @@ Note that the underlying [Ollama Workflows](https://github.com/andthattoo/ollama-workflows)
 
 ## Usage
 
-DKN Workflows make use of several environment variables, respecting the providers.
+DKN Workflows make use of several environment variables, with respect to several model providers.
 
-- `OLLAMA_HOST` is used to connect to Ollama server
-- `OLLAMA_PORT` is used to connect to Ollama server
+- `OLLAMA_HOST` is used to connect to **Ollama** server
+- `OLLAMA_PORT` is used to connect to **Ollama** server
 - `OLLAMA_AUTO_PULL` indicates whether we should pull missing models automatically or not
-- `OPENAI_API_KEY` is used for OpenAI requests
-- `GEMINI_API_KEY` is used for Gemini requests
+- `OPENAI_API_KEY` is used for **OpenAI** requests
+- `GEMINI_API_KEY` is used for **Gemini** requests
 - `SERPER_API_KEY` is optional API key to use **Serper**, for better Workflow executions
 - `JINA_API_KEY` is optional API key to use **Jina**, for better Workflow executions
 
```
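As a rough illustration of how these variables might be consumed, a minimal sketch assuming `dotenvy` for `.env` loading — the defaults and handling here are assumptions (e.g. Ollama's standard port 11434), not `dkn_workflows` internals:

```rust
fn main() {
    // load .env if present; values may also come straight from the environment
    let _ = dotenvy::dotenv();

    let ollama_host =
        std::env::var("OLLAMA_HOST").unwrap_or_else(|_| "http://127.0.0.1".to_string());
    let ollama_port = std::env::var("OLLAMA_PORT").unwrap_or_else(|_| "11434".to_string());
    let auto_pull = std::env::var("OLLAMA_AUTO_PULL")
        .map(|v| v == "true")
        .unwrap_or(false);

    // provider keys are only needed if a model from that provider is selected
    let openai_key = std::env::var("OPENAI_API_KEY").ok();
    let gemini_key = std::env::var("GEMINI_API_KEY").ok();

    println!("Ollama at {ollama_host}:{ollama_port} (auto-pull: {auto_pull})");
    println!(
        "OpenAI configured: {}, Gemini configured: {}",
        openai_key.is_some(),
        gemini_key.is_some()
    );
}
```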

workflows/tests/models_test.rs

Lines changed: 30 additions & 11 deletions
```diff
@@ -1,14 +1,21 @@
 use dkn_workflows::{DriaWorkflowsConfig, Model, ModelProvider};
 use eyre::Result;
-use std::env;
 
-const LOG_LEVEL: &str = "none,dkn_workflows=debug";
+fn setup() {
+    // read api key from .env
+    let _ = dotenvy::dotenv();
+
+    // set logger
+    let _ = env_logger::builder()
+        .parse_filters("none,dkn_workflows=debug")
+        .is_test(true)
+        .try_init();
+}
 
 #[tokio::test]
 #[ignore = "requires Ollama"]
 async fn test_ollama_check() -> Result<()> {
-    env::set_var("RUST_LOG", LOG_LEVEL);
-    let _ = env_logger::builder().is_test(true).try_init();
+    setup();
 
     let models = vec![Model::Phi3_5Mini];
     let mut model_config = DriaWorkflowsConfig::new(models);
@@ -25,9 +32,7 @@ async fn test_ollama_check() -> Result<()> {
 #[tokio::test]
 #[ignore = "requires OpenAI"]
 async fn test_openai_check() -> Result<()> {
-    let _ = dotenvy::dotenv(); // read api key
-    env::set_var("RUST_LOG", LOG_LEVEL);
-    let _ = env_logger::builder().is_test(true).try_init();
+    setup();
 
     let models = vec![Model::GPT4Turbo];
     let mut model_config = DriaWorkflowsConfig::new(models);
@@ -41,11 +46,25 @@ async fn test_openai_check() -> Result<()> {
 }
 
 #[tokio::test]
-async fn test_empty() -> Result<()> {
-    let mut model_config = DriaWorkflowsConfig::new(vec![]);
+#[ignore = "requires Gemini"]
+async fn test_gemini_check() -> Result<()> {
+    setup();
 
-    let result = model_config.check_services().await;
-    assert!(result.is_err());
+    let models = vec![Model::Gemini15Flash];
+    let mut model_config = DriaWorkflowsConfig::new(models);
+    model_config.check_services().await?;
 
+    assert_eq!(
+        model_config.models[0],
+        (ModelProvider::Gemini, Model::Gemini15Flash)
+    );
     Ok(())
 }
+
+#[tokio::test]
+async fn test_empty() {
+    assert!(DriaWorkflowsConfig::new(vec![])
+        .check_services()
+        .await
+        .is_err());
+}
```
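Note that the provider checks above are marked `#[ignore]`, so `cargo test` skips them by default and they run on demand with `cargo test -- --ignored`; only `test_empty` stays in the regular suite. The shared `setup()` helper keeps `.env` loading and logger initialization in one place, and `try_init()` with a discarded result tolerates being called once per test in the same process.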
