Skip to content

Commit 85e4290

Browse files
committed
added versioning to protocols
1 parent 857c79b commit 85e4290

File tree

7 files changed

+91
-64
lines changed

7 files changed

+91
-64
lines changed

Cargo.lock

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[package]
22
name = "dkn-compute"
3-
version = "0.1.5"
3+
version = "0.1.6"
44
edition = "2021"
55
license = "Apache-2.0"
66
readme = "README.md"
@@ -31,6 +31,7 @@ url = "2.5.0"
3131
urlencoding = "2.1.3"
3232
uuid = { version = "1.8.0", features = ["v4"] }
3333
rand = "0.8.5"
34+
semver = "1.0.23"
3435

3536
# logging
3637
env_logger = "0.11.3"
@@ -64,11 +65,9 @@ libp2p = { git = "https://github.com/anilaltuner/rust-libp2p.git", rev = "be2ed5
6465
"quic",
6566
"kad",
6667
] }
67-
6868
libp2p-identity = { version = "0.2.9", features = ["secp256k1"] }
6969
tracing = { version = "0.1.40" }
7070
tracing-subscriber = { version = "0.3.18", features = ["env-filter"] }
71-
semver = "1.0.23"
7271

7372

7473
[dev-dependencies]

src/config/mod.rs

Lines changed: 16 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -105,11 +105,20 @@ impl DriaComputeNodeConfig {
105105
}
106106
}
107107

108-
/// Check if the required compute services are running, e.g. if Ollama
109-
/// is detected as a provider for the chosen models, it will check that
110-
/// Ollama is running.
108+
/// Check if the required compute services are running.
109+
/// This has several steps:
110+
///
111+
/// - If Ollama models are used, hardcoded models are checked locally, and for
112+
/// external models, the workflow is tested with a simple task with timeout.
113+
/// - If OpenAI models are used, the API key is checked and the models are tested
114+
///
115+
/// If both type of models are used, both services are checked.
116+
/// In the end, bad models are filtered out and we simply check if we are left if any valid models at all.
117+
/// If not, an error is returned.
111118
pub async fn check_services(&mut self) -> Result<(), String> {
112119
log::info!("Checking configured services.");
120+
121+
// TODO: can refactor (provider, model) logic here
113122
let unique_providers = self.model_config.get_providers();
114123

115124
let mut good_models = Vec::new();
@@ -121,8 +130,10 @@ impl DriaComputeNodeConfig {
121130
.get_models_for_provider(ModelProvider::Ollama);
122131

123132
// ensure that the models are pulled / pull them if not
124-
let timeout = Duration::from_secs(30);
125-
let good_ollama_models = self.ollama_config.check(ollama_models, timeout).await?;
133+
let good_ollama_models = self
134+
.ollama_config
135+
.check(ollama_models, Duration::from_secs(30))
136+
.await?;
126137
good_models.extend(
127138
good_ollama_models
128139
.into_iter()

src/config/ollama.rs

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -91,12 +91,12 @@ impl OllamaConfig {
9191
// check hardcoded models & pull them if available
9292
// these are not used directly by the user, but are needed for the workflows
9393
log::debug!("Checking hardcoded models: {:#?}", self.hardcoded_models);
94+
// only check if model is contained in local_models
95+
// we dont check workflows for hardcoded models
9496
for model in &self.hardcoded_models {
9597
if !local_models.contains(model) {
9698
self.try_pull(&ollama, model.to_owned()).await?;
9799
}
98-
99-
// we dont check workflows for hardcoded models
100100
}
101101

102102
// check external models & pull them if available
@@ -107,10 +107,10 @@ impl OllamaConfig {
107107
self.try_pull(&ollama, model.to_string()).await?;
108108
}
109109

110-
let ok = self
110+
if self
111111
.test_workflow(model.clone(), test_workflow_timeout)
112-
.await;
113-
if ok {
112+
.await
113+
{
114114
good_models.push(model);
115115
}
116116
}
@@ -207,11 +207,11 @@ impl OllamaConfig {
207207
let mut memory = ProgramMemory::new();
208208
tokio::select! {
209209
_ = tokio::time::sleep(timeout) => {
210-
log::warn!("Ignoring model {}: Timeout", model);
210+
log::warn!("Ignoring model {}: Workflow timed out", model);
211211
},
212212
result = executor.execute(None, workflow, &mut memory) => {
213213
if result.is_empty() {
214-
log::warn!("Ignoring model {}: Empty Result", model);
214+
log::warn!("Ignoring model {}: Workflow returned empty result", model);
215215
} else {
216216
log::info!("Accepting model {}", model);
217217
return true;

src/p2p/behaviour.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -115,6 +115,8 @@ fn create_gossipsub_behavior(author: PeerId) -> gossipsub::Behaviour {
115115
MessageId::from(hasher.finish().to_string())
116116
};
117117

118+
// TODO: add data transform here later
119+
118120
Behaviour::new(
119121
MessageAuthenticity::Author(author),
120122
ConfigBuilder::default()

src/p2p/client.rs

Lines changed: 57 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
use crate::p2p::{AvailableNodes, P2P_PROTOCOL_STRING};
1+
use crate::p2p::AvailableNodes;
22
use crate::DRIA_COMPUTE_NODE_VERSION;
33
use libp2p::futures::StreamExt;
44
use libp2p::gossipsub::{
@@ -14,7 +14,11 @@ use semver::Version;
1414
use tokio::time::Duration;
1515
use tokio::time::Instant;
1616

17-
use super::{DriaBehaviour, DriaBehaviourEvent, P2P_KADEMLIA_PROTOCOL};
17+
use super::{DriaBehaviour, DriaBehaviourEvent};
18+
19+
/// Used as default for unparsable versions.
20+
/// Will not match with any valid version.
21+
const ZERO_VERSION: Version = Version::new(0, 0, 0);
1822

1923
/// Underlying libp2p client.
2024
pub struct P2PClient {
@@ -247,67 +251,55 @@ impl P2PClient {
247251
///
248252
/// - For Kademlia, we check the kademlia protocol and then add the address to the Kademlia routing table.
249253
fn handle_identify_event(&mut self, peer_id: PeerId, info: identify::Info) {
254+
// we only care about the observed address, although there may be other addresses at `info.listen_addrs`
250255
let addr = info.observed_addr;
251256

252257
// check protocol string
253-
let (left, right) = info.protocol_version.split_at(5); // equals "dria/".len()
254-
if left != "dria/" {
258+
let protocol_ok = self.check_version_with_prefix(&info.protocol_version, "/dria/");
259+
if !protocol_ok {
255260
log::warn!(
256-
"Identify: Peer {} is from different protocol: (have {}, want {})",
261+
"Identify: Peer {} has different Identify protocol: (have {}, want {})",
257262
peer_id,
258263
info.protocol_version,
259-
P2P_PROTOCOL_STRING
264+
self.version
260265
);
261266
return;
262267
}
263268

264-
// check version
265-
match semver::Version::parse(right) {
266-
Ok(peer_version) => {
267-
if peer_version.minor != self.version.minor {
268-
log::warn!(
269-
"Identify: Peer {} has different version: (have {}, want {})",
270-
peer_id,
271-
peer_version,
272-
self.version
273-
);
274-
return;
275-
}
276-
}
277-
Err(err) => {
278-
log::error!(
279-
"Identify: Peer {} version could not be parsed: {}",
280-
peer_id,
281-
err,
282-
);
283-
return;
284-
}
285-
}
269+
// check kademlia protocol
270+
if let Some(kad_protocol) = info
271+
.protocols
272+
.iter()
273+
.find(|p| p.to_string().starts_with("/dria/kad/"))
274+
{
275+
let protocol_ok =
276+
self.check_version_with_prefix(&kad_protocol.to_string(), "/dria/kad/");
286277

287-
// we only care about the observed address, although there may be other addresses at `info.listen_addrs`
288-
if info.protocols.iter().any(|p| *p == P2P_KADEMLIA_PROTOCOL) {
289278
// if it matches our protocol, add it to the Kademlia routing table
290-
log::info!(
291-
"Identify: {} peer {} identified at {}",
292-
info.protocol_version,
293-
peer_id,
294-
addr
295-
);
279+
if protocol_ok {
280+
log::info!(
281+
"Identify: {} peer {} identified at {}",
282+
kad_protocol,
283+
peer_id,
284+
addr
285+
);
296286

297-
self.swarm
298-
.behaviour_mut()
299-
.kademlia
300-
.add_address(&peer_id, addr);
301-
} else {
302-
log::trace!(
303-
"Identify: Incoming from different protocol, address {}. PeerID is {}",
304-
addr,
305-
peer_id
306-
);
287+
self.swarm
288+
.behaviour_mut()
289+
.kademlia
290+
.add_address(&peer_id, addr);
291+
} else {
292+
log::warn!(
293+
"Identify: Peer {} has different Kademlia version: (have {}, want {})",
294+
peer_id,
295+
kad_protocol,
296+
self.version
297+
);
298+
}
307299
}
308300
}
309301

310-
/// Handles the results of a Kademlia closest peers search, either adding peers to Gossipsub or logging timeout errors.
302+
/// Handles the results of a Kademlia closest peers search, simply logs it.
311303
fn handle_closest_peers_result(
312304
&mut self,
313305
result: Result<GetClosestPeersOk, GetClosestPeersError>,
@@ -373,4 +365,22 @@ impl P2PClient {
373365
}
374366
}
375367
}
368+
369+
/// Generic function to split a string such as `prefix || version` and check that the major & minor versions are the same.
370+
///
371+
/// Some examples:
372+
/// - `self.check_version_with_prefix("dria/")` for identity
373+
/// - `self.check_version_with_prefix("dria/kad/")` for Kademlia
374+
///
375+
/// Returns whether the version is ok.
376+
fn check_version_with_prefix(&self, p: &str, prefix: &str) -> bool {
377+
let parsed_version = p
378+
.strip_prefix(prefix)
379+
.and_then(|v| Version::parse(v).ok())
380+
.unwrap_or(ZERO_VERSION);
381+
382+
p.starts_with(prefix)
383+
&& parsed_version.major == self.version.major
384+
&& parsed_version.minor == self.version.minor
385+
}
376386
}

src/p2p/mod.rs

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,11 @@
11
use libp2p::StreamProtocol;
22

3-
pub(crate) const P2P_KADEMLIA_PROTOCOL: StreamProtocol = StreamProtocol::new("/dria/kad/1.0.0");
3+
/// Kademlia protocol version, in the form of `/dria/kad/<version>`.
4+
/// Notice the `/` at the start.
5+
pub(crate) const P2P_KADEMLIA_PROTOCOL: StreamProtocol =
6+
StreamProtocol::new(concat!("/dria/kad/", env!("CARGO_PKG_VERSION")));
7+
8+
/// Kademlia protocol version, in the form of `dria/<version>`.
49
pub(crate) const P2P_PROTOCOL_STRING: &str = concat!("dria/", env!("CARGO_PKG_VERSION"));
510

611
mod behaviour;

0 commit comments

Comments
 (0)