
Commit 857c79b

added service check workflow tests & identify versioning, todo kad versioning
1 parent 26e3575 commit 857c79b

File tree

13 files changed: +328, -107 lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default.

Cargo.toml

Lines changed: 1 addition & 0 deletions
@@ -68,6 +68,7 @@ libp2p = { git = "https://github.com/anilaltuner/rust-libp2p.git", rev = "be2ed5
 libp2p-identity = { version = "0.2.9", features = ["secp256k1"] }
 tracing = { version = "0.1.40" }
 tracing-subscriber = { version = "0.3.18", features = ["env-filter"] }
+semver = "1.0.23"
 
 
 [dev-dependencies]
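The new `semver` dependency lines up with the "identify versioning" note in the commit message: version strings can be parsed and compared instead of string-matched. As a hedged illustration only (this helper is hypothetical and not part of the commit), a compatibility check with the `semver` crate looks like this:

```rust
use semver::{Version, VersionReq};

/// Hypothetical helper: does `node_version` satisfy `requirement`?
fn is_compatible(node_version: &str, requirement: &str) -> bool {
    // parse the concrete version, e.g. "0.1.4"
    let Ok(version) = Version::parse(node_version) else {
        return false;
    };
    // parse the requirement, e.g. ">=0.1.0, <0.2.0"
    let Ok(requirement) = VersionReq::parse(requirement) else {
        return false;
    };
    requirement.matches(&version)
}

fn main() {
    assert!(is_compatible("0.1.4", ">=0.1.0, <0.2.0"));
    assert!(!is_compatible("0.2.0", ">=0.1.0, <0.2.0"));
}
```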

README.md

Lines changed: 1 addition & 3 deletions
@@ -28,9 +28,7 @@
 
 ## About
 
-A **Dria Compute Node** is a unit of computation within the Dria Knowledge Network. Its purpose is to process tasks given by the **Dria Admin Node**, and receive rewards for providing correct results.
-
-To get started, [setup](#setup) your environment and then see [usage](#usage) to run the node.
+A **Dria Compute Node** is a unit of computation within the Dria Knowledge Network. Its purpose is to process tasks given by the **Dria Admin Node**. To get started, [setup](#setup) your environment and then see [usage](#usage) to run the node.
 
 ### Tasks

src/config/mod.rs

Lines changed: 27 additions & 9 deletions
@@ -9,7 +9,7 @@ use ollama::OllamaConfig
 use ollama_workflows::ModelProvider;
 use openai::OpenAIConfig;
 
-use std::env;
+use std::{env, time::Duration};
 
 #[derive(Debug, Clone)]
 pub struct DriaComputeNodeConfig {
@@ -108,31 +108,49 @@ impl DriaComputeNodeConfig {
     /// Check if the required compute services are running, e.g. if Ollama
     /// is detected as a provider for the chosen models, it will check that
     /// Ollama is running.
-    pub async fn check_services(&self) -> Result<(), String> {
+    pub async fn check_services(&mut self) -> Result<(), String> {
         log::info!("Checking configured services.");
         let unique_providers = self.model_config.get_providers();
 
+        let mut good_models = Vec::new();
+
         // if Ollama is a provider, check that it is running & Ollama models are pulled (or pull them)
         if unique_providers.contains(&ModelProvider::Ollama) {
             let ollama_models = self
                 .model_config
                 .get_models_for_provider(ModelProvider::Ollama);
-            self.ollama_config
-                .check(ollama_models.into_iter().map(|m| m.to_string()).collect())
-                .await?;
+
+            // ensure that the models are pulled / pull them if not
+            let timeout = Duration::from_secs(30);
+            let good_ollama_models = self.ollama_config.check(ollama_models, timeout).await?;
+            good_models.extend(
+                good_ollama_models
+                    .into_iter()
+                    .map(|m| (ModelProvider::Ollama, m)),
+            );
         }
 
         // if OpenAI is a provider, check that the API key is set
        if unique_providers.contains(&ModelProvider::OpenAI) {
             let openai_models = self
                 .model_config
                 .get_models_for_provider(ModelProvider::OpenAI);
-            self.openai_config
-                .check(openai_models.into_iter().map(|m| m.to_string()).collect())
-                .await?;
+
+            let good_openai_models = self.openai_config.check(openai_models).await?;
+            good_models.extend(
+                good_openai_models
+                    .into_iter()
+                    .map(|m| (ModelProvider::OpenAI, m)),
+            );
         }
 
-        Ok(())
+        // update good models
+        if good_models.is_empty() {
+            return Err("No good models found, please check logs for errors.".into());
+        } else {
+            self.model_config.models = good_models;
+            Ok(())
+        }
     }
 }
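With this change, `check_services` no longer just validates providers: it filters the configured models down to the `(provider, model)` pairs that actually pass their checks, and fails outright if none remain. A minimal usage sketch (hedged: `DriaComputeNodeConfig::new()` and the error handling are illustrative, not shown in this diff):

```rust
// sketch only: constructor name and error handling are illustrative
async fn startup() -> Result<(), String> {
    let mut config = DriaComputeNodeConfig::new();

    // mutates the config: surviving models are written back into model_config
    config.check_services().await?;

    log::info!("Using models: {:#?}", config.model_config.models);
    Ok(())
}
```

Note that the receiver had to change from `&self` to `&mut self` precisely because the surviving models are written back into `self.model_config.models`.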

src/config/models.rs

Lines changed: 1 addition & 0 deletions
@@ -38,6 +38,7 @@ impl ModelConfig {
         Self { models }
     }
 
+    /// Returns the models that belong to a given provider from the config.
     pub fn get_models_for_provider(&self, provider: ModelProvider) -> Vec<Model> {
         self.models
             .iter()
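For context, the filtering this method performs can be sketched standalone, assuming the `models` field holds `(ModelProvider, Model)` pairs as `check_services` above suggests:

```rust
use ollama_workflows::{Model, ModelProvider};

// standalone sketch of the provider filter; the real method lives on ModelConfig
fn models_for_provider(models: &[(ModelProvider, Model)], provider: ModelProvider) -> Vec<Model> {
    models
        .iter()
        .filter(|(p, _)| *p == provider)
        .map(|(_, model)| model.clone())
        .collect()
}
```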

src/config/ollama.rs

Lines changed: 145 additions & 38 deletions
@@ -1,4 +1,6 @@
-use ollama_workflows::ollama_rs::Ollama;
+use std::time::Duration;
+
+use ollama_workflows::{ollama_rs::Ollama, Executor, Model, ProgramMemory, Workflow};
 
 const DEFAULT_OLLAMA_HOST: &str = "http://127.0.0.1";
 const DEFAULT_OLLAMA_PORT: u16 = 11434;
@@ -46,12 +48,11 @@ impl OllamaConfig {
             .unwrap_or(DEFAULT_OLLAMA_PORT);
 
         // Ollama workflows may require specific models to be loaded regardless of the choices
-        let hardcoded_models = HARDCODED_MODELS
-            .into_iter()
-            .map(|s| s.to_string())
-            .collect();
+        let hardcoded_models = HARDCODED_MODELS.iter().map(|s| s.to_string()).collect();
 
-        let auto_pull = std::env::var("OLLAMA_AUTO_PULL").unwrap_or_default() == "true";
+        let auto_pull = std::env::var("OLLAMA_AUTO_PULL")
+            .map(|s| s == "true")
+            .unwrap_or_default();
 
         Self {
             host,
@@ -61,19 +62,20 @@ impl OllamaConfig {
         }
     }
 
-    /// Check if requested models exist.
-    pub async fn check(&self, external_models: Vec<String>) -> Result<(), String> {
+    /// Checks if requested models exist in Ollama, and then tests them using a workflow.
+    pub async fn check(
+        &self,
+        external_models: Vec<Model>,
+        test_workflow_timeout: Duration,
+    ) -> Result<Vec<Model>, String> {
         log::info!(
-            "Checking Ollama requirements (auto-pull {})",
-            if self.auto_pull { "on" } else { "off" }
+            "Checking Ollama requirements (auto-pull {}, workflow timeout: {}s)",
+            if self.auto_pull { "on" } else { "off" },
+            test_workflow_timeout.as_secs()
         );
 
         let ollama = Ollama::new(&self.host, self.port);
 
-        // the list of required models is those given in DKN_MODELS and the hardcoded ones
-        let mut required_models = self.hardcoded_models.clone();
-        required_models.extend(external_models);
-
         // fetch local models
         let local_models = match ollama.list_local_models().await {
             Ok(models) => models.into_iter().map(|m| m.name).collect::<Vec<_>>(),
@@ -84,34 +86,139 @@ impl OllamaConfig {
                 }
             }
         };
+        log::info!("Found local Ollama models: {:#?}", local_models);
+
+        // check hardcoded models & pull them if available;
+        // these are not used directly by the user, but are needed for the workflows
+        log::debug!("Checking hardcoded models: {:#?}", self.hardcoded_models);
+        for model in &self.hardcoded_models {
+            if !local_models.contains(model) {
+                self.try_pull(&ollama, model.to_owned()).await?;
+            }
+
+            // we don't run test workflows for hardcoded models
+        }
+
+        // check external models & pull them if available,
+        // and also run a test workflow for them
+        let mut good_models = Vec::new();
+        for model in external_models {
+            if !local_models.contains(&model.to_string()) {
+                self.try_pull(&ollama, model.to_string()).await?;
+            }
+
+            let ok = self
+                .test_workflow(model.clone(), test_workflow_timeout)
+                .await;
+            if ok {
+                good_models.push(model);
+            }
+        }
+
+        log::info!(
+            "Ollama checks are finished, using models: {:#?}",
+            good_models
+        );
+        Ok(good_models)
+    }
 
-        // check that each required model exists here
-        log::debug!("Checking required models: {:#?}", required_models);
-        log::debug!("Found local models: {:#?}", local_models);
-        for model in required_models {
-            if !local_models.iter().any(|m| *m == model) {
-                log::warn!("Model {} not found in Ollama", model);
-                if self.auto_pull {
-                    // if auto-pull is enabled, pull the model
-                    log::info!(
-                        "Downloading missing model {} (this may take a while)",
-                        model
-                    );
-                    let status = ollama
-                        .pull_model(model, false)
-                        .await
-                        .map_err(|e| format!("Error pulling model with Ollama: {}", e))?;
-                    log::debug!("Pulled model with Ollama, final status: {:#?}", status);
+    /// Pulls a model if `auto_pull` is enabled, otherwise returns an error.
+    async fn try_pull(&self, ollama: &Ollama, model: String) -> Result<(), String> {
+        log::warn!("Model {} not found in Ollama", model);
+        if self.auto_pull {
+            // if auto-pull is enabled, pull the model
+            log::info!(
+                "Downloading missing model {} (this may take a while)",
+                model
+            );
+            let status = ollama
+                .pull_model(model, false)
+                .await
+                .map_err(|e| format!("Error pulling model with Ollama: {}", e))?;
+            log::debug!("Pulled model with Ollama, final status: {:#?}", status);
+            Ok(())
+        } else {
+            // otherwise, give an error
+            log::error!("Please download missing model with: ollama pull {}", model);
+            log::error!("Or, set OLLAMA_AUTO_PULL=true to pull automatically.");
+            return Err("Required model not pulled in Ollama.".into());
+        }
+    }
+
+    /// Runs a small workflow to test Ollama Workflows.
+    ///
+    /// This is to see if a given system can execute Ollama workflows for its chosen models,
+    /// e.g. whether it has enough RAM/CPU and such.
+    pub async fn test_workflow(&self, model: Model, timeout: Duration) -> bool {
+        // this is the test workflow that we will run
+        // TODO: when `Workflow` implements `Clone`, we can remove the repetitive parsing here
+        let workflow = serde_json::from_value::<Workflow>(serde_json::json!({
+            "name": "Simple",
+            "description": "This is a simple workflow",
+            "config": {
+                "max_steps": 5,
+                "max_time": 100,
+                "max_tokens": 100,
+                "tools": []
+            },
+            "tasks": [
+                {
+                    "id": "A",
+                    "name": "Random Poem",
+                    "description": "Writes a poem about Kapadokya.",
+                    "prompt": "Please write a poem about Kapadokya.",
+                    "inputs": [],
+                    "operator": "generation",
+                    "outputs": [
+                        {
+                            "type": "write",
+                            "key": "poem",
+                            "value": "__result"
+                        }
+                    ]
+                },
+                {
+                    "id": "__end",
+                    "name": "end",
+                    "description": "End of the task",
+                    "prompt": "End of the task",
+                    "inputs": [],
+                    "operator": "end",
+                    "outputs": []
+                }
+            ],
+            "steps": [
+                {
+                    "source": "A",
+                    "target": "end"
+                }
+            ],
+            "return_value": {
+                "input": {
+                    "type": "read",
+                    "key": "poem"
+                }
+            }
+        }))
+        .expect("Preset workflow should be parsed");
+
+        log::info!("Testing model {}", model);
+        let executor = Executor::new_at(model.clone(), &self.host, self.port);
+        let mut memory = ProgramMemory::new();
+        tokio::select! {
+            _ = tokio::time::sleep(timeout) => {
+                log::warn!("Ignoring model {}: Timeout", model);
+            },
+            result = executor.execute(None, workflow, &mut memory) => {
+                if result.is_empty() {
+                    log::warn!("Ignoring model {}: Empty Result", model);
                 } else {
-                    // otherwise, give error
-                    log::error!("Please download it with: ollama pull {}", model);
-                    log::error!("Or, set OLLAMA_AUTO_PULL=true to pull automatically.");
-                    return Err("Required model not pulled in Ollama.".into());
+                    log::info!("Accepting model {}", model);
+                    return true;
                 }
             }
-        }
+        };
 
-        log::info!("Ollama setup is all good.",);
-        Ok(())
+        return false;
     }
 }
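The `tokio::select!` at the end of `test_workflow` races the workflow against a timer, so a hung or underpowered machine drops the model instead of blocking startup. The same pattern in isolation, as a runnable sketch (nothing here is specific to this repository):

```rust
use std::time::Duration;

/// Race a future against a sleep; `None` means the timeout won.
async fn run_with_timeout<F: std::future::Future>(work: F, timeout: Duration) -> Option<F::Output> {
    tokio::select! {
        _ = tokio::time::sleep(timeout) => None,
        result = work => Some(result),
    }
}

#[tokio::main]
async fn main() {
    // finishes well within the timeout
    let fast = run_with_timeout(async { 42 }, Duration::from_secs(1)).await;
    assert_eq!(fast, Some(42));

    // sleeps past the timeout, so the timer arm wins
    let slow = run_with_timeout(tokio::time::sleep(Duration::from_secs(5)), Duration::from_millis(10)).await;
    assert_eq!(slow, None);
}
```

`tokio::time::timeout` expresses the same race more compactly and returns a `Result`; the inline `select!` in the diff trades that brevity for per-arm logging of why a model was rejected.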

src/config/openai.rs

Lines changed: 29 additions & 9 deletions
@@ -1,5 +1,6 @@
 #![allow(unused)]
 
+use ollama_workflows::Model;
 use serde::Deserialize;
 
 const OPENAI_API_KEY: &str = "OPENAI_API_KEY";
@@ -38,14 +39,18 @@ impl OpenAIConfig {
         Self { api_key }
     }
 
-    /// Check if requested models exist.
-    pub async fn check(&self, models: Vec<String>) -> Result<(), String> {
+    /// Checks if the requested models exist & are available to the configured API key.
+    ///
+    /// Returns the subset of requested models that can actually be used.
+    pub async fn check(&self, models: Vec<Model>) -> Result<Vec<Model>, String> {
         log::info!("Checking OpenAI requirements");
 
+        // check API key
         let Some(api_key) = &self.api_key else {
             return Err("OpenAI API key not found".into());
         };
 
+        // fetch models
         let client = reqwest::Client::new();
         let request = client
             .get(OPENAI_MODELS_API)
@@ -58,25 +63,40 @@ impl OpenAIConfig {
             .await
             .map_err(|e| format!("Failed to send request: {}", e))?;
 
+        // parse response
         if response.status().is_client_error() {
             return Err(format!(
                 "Failed to fetch OpenAI models:\n{}",
                 response.text().await.unwrap_or_default()
             ));
         }
+        let openai_models = response
+            .json::<OpenAIModelsResponse>()
+            .await
+            .map_err(|e| e.to_string())?;
 
-        let openai_models = response.json::<OpenAIModelsResponse>().await.unwrap();
+        // check if models exist and select those that are available
+        let mut available_models = Vec::new();
         for requested_model in models {
-            if !openai_models.data.iter().any(|m| m.id == requested_model) {
-                return Err(format!(
-                    "Model {} not found in your OpenAI account.",
+            if !openai_models
+                .data
+                .iter()
+                .any(|m| m.id == requested_model.to_string())
+            {
+                log::warn!(
+                    "Model {} not found in your OpenAI account, ignoring it.",
                     requested_model
-                ));
+                );
+            } else {
+                available_models.push(requested_model);
             }
         }
 
-        log::info!("OpenAI setup is all good.",);
-        Ok(())
+        log::info!(
+            "OpenAI checks are finished, using models: {:#?}",
+            available_models
+        );
+        Ok(available_models)
    }
 }
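The `OpenAIModelsResponse` type that `check` deserializes into is defined elsewhere in this file and not shown in the diff. Based on the fields it accesses (`data`, `id`) and the public `GET /v1/models` schema, it plausibly looks like the following sketch (these struct definitions are an assumption, not taken from the commit):

```rust
use serde::Deserialize;

// plausible shape of the deserialization target used by `check` above
#[derive(Debug, Deserialize)]
struct OpenAIModelsResponse {
    data: Vec<OpenAIModel>,
}

#[derive(Debug, Deserialize)]
struct OpenAIModel {
    id: String, // e.g. "gpt-4o-mini"
}
```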
