Skip to content

Commit d3ca0b8

Browse files
committed
more tidyups and sanitizations
1 parent 6de5628 commit d3ca0b8

File tree

13 files changed

+76
-105
lines changed

13 files changed

+76
-105
lines changed

Makefile

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -37,10 +37,6 @@ profile-mem:
3737
version:
3838
@cargo pkgid | cut -d@ -f2
3939

40-
.PHONY: ollama-cpu # | Run Ollama CPU container
41-
ollama-cpu:
42-
docker run -p=11434:11434 -v=${HOME}/.ollama:/root/.ollama ollama/ollama
43-
4440
###############################################################################
4541
.PHONY: test # | Run tests
4642
test:
@@ -58,7 +54,7 @@ format:
5854
###############################################################################
5955
.PHONY: docs # | Generate & open crate documentation
6056
docs:
61-
cargo doc --open --no-deps
57+
cargo doc --open --no-deps --document-private-items
6258

6359
.PHONY: env # | Print active environment
6460
env:

src/config/ollama.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,3 @@
1-
use std::time::Duration;
2-
31
use eyre::{eyre, Result};
42
use ollama_workflows::{
53
ollama_rs::{
@@ -12,6 +10,7 @@ use ollama_workflows::{
1210
},
1311
Model,
1412
};
13+
use std::time::Duration;
1514

1615
const DEFAULT_OLLAMA_HOST: &str = "http://127.0.0.1";
1716
const DEFAULT_OLLAMA_PORT: u16 = 11434;

src/handlers/workflow.rs

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,7 @@ impl ComputeHandler for WorkflowHandler {
9292
.map(|prompt| Entry::try_value_or_str(&prompt));
9393

9494
// execute workflow with cancellation
95+
// TODO: is there a better way to handle this?
9596
let result: String;
9697
tokio::select! {
9798
_ = node.cancellation.cancelled() => {
@@ -111,7 +112,17 @@ impl ComputeHandler for WorkflowHandler {
111112
}
112113

113114
// publish the result
114-
node.send_result(result_topic, &task.public_key, &task.task_id, result)?;
115+
let payload = P2PMessage::new_signed_encrypted_payload(
116+
result,
117+
&task.task_id,
118+
&task.public_key,
119+
&node.config.secret_key,
120+
)?;
121+
let payload_str = payload.to_string()?;
122+
let message = P2PMessage::new(payload_str, result_topic);
123+
124+
node.publish(message)?;
125+
115126
Ok(MessageAcceptance::Accept)
116127
}
117128
}

src/main.rs

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
use dkn_compute::{DriaComputeNode, DriaComputeNodeConfig};
1+
use dkn_compute::*;
22
use eyre::Result;
33
use tokio_util::sync::CancellationToken;
44

@@ -17,12 +17,11 @@ async fn main() -> Result<()> {
1717
1818
██████╗ ██████╗ ██╗ █████╗
1919
██╔══██╗██╔══██╗██║██╔══██╗ Dria Compute Node
20-
██║ ██║██████╔╝██║███████║ v{}
20+
██║ ██║██████╔╝██║███████║ v{DRIA_COMPUTE_NODE_VERSION}
2121
██║ ██║██╔══██╗██║██╔══██║ https://dria.co
2222
██████╔╝██║ ██║██║██║ ██║
2323
╚═════╝ ╚═╝ ╚═╝╚═╝╚═╝ ╚═╝
2424
"#,
25-
dkn_compute::DRIA_COMPUTE_NODE_VERSION
2625
);
2726

2827
let token = CancellationToken::new();

src/node.rs

Lines changed: 1 addition & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -185,7 +185,6 @@ impl DriaComputeNode {
185185
// validate the message based on the result
186186
match handle_result {
187187
Ok(acceptance) => {
188-
189188
self.p2p.validate_message(&message_id, &peer_id, acceptance)?;
190189
},
191190
Err(err) => {
@@ -235,32 +234,13 @@ impl DriaComputeNode {
235234

236235
// check dria signature
237236
// NOTE: when we have many public keys, we should check the signature against all of them
237+
// TODO: public key here will be given dynamically
238238
if !message.is_signed(&self.config.admin_public_key)? {
239239
return Err(eyre!("Invalid signature."));
240240
}
241241

242242
Ok(message)
243243
}
244-
245-
/// Given a task with `id` and respective `public_key`, sign-then-encrypt the result.
246-
pub fn send_result<R: AsRef<[u8]>>(
247-
&mut self,
248-
response_topic: &str,
249-
public_key: &[u8],
250-
task_id: &str,
251-
result: R,
252-
) -> Result<()> {
253-
let payload = P2PMessage::new_signed_encrypted_payload(
254-
result.as_ref(),
255-
task_id,
256-
public_key,
257-
&self.config.secret_key,
258-
)?;
259-
let payload_str = payload.to_string()?;
260-
let message = P2PMessage::new(payload_str, response_topic);
261-
262-
self.publish(message)
263-
}
264244
}
265245

266246
#[cfg(test)]

src/p2p/client.rs

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -12,15 +12,17 @@ use libp2p_identity::Keypair;
1212
use std::time::Duration;
1313
use std::time::Instant;
1414

15+
use super::*;
1516
use crate::utils::AvailableNodes;
1617

17-
use super::{DriaBehaviour, DriaBehaviourEvent, P2P_KADEMLIA_PROTOCOL, P2P_PROTOCOL_STRING};
18-
19-
/// Underlying libp2p client.
18+
/// P2P client, exposes a simple interface to handle P2P communication.
2019
pub struct P2PClient {
2120
/// `Swarm` instance, everything is accesses through this one.
2221
swarm: Swarm<DriaBehaviour>,
23-
/// Peer count for (All, Mesh).
22+
/// Peer count for All and Mesh peers.
23+
///
24+
/// Mesh usually contains much fewer peers than All, as they are the ones
25+
/// used for actual gossipping.
2426
peer_count: (usize, usize),
2527
/// Last time the peer count was refreshed.
2628
peer_last_refreshed: Instant,
@@ -253,7 +255,7 @@ impl P2PClient {
253255
if let Some(kad_protocol) = info
254256
.protocols
255257
.iter()
256-
.find(|p| p.to_string().starts_with(P2P_KADEMLIA_PREFIX!()))
258+
.find(|p| p.to_string().starts_with(P2P_KADEMLIA_PREFIX))
257259
{
258260
// if it matches our protocol, add it to the Kademlia routing table
259261
if *kad_protocol == P2P_KADEMLIA_PROTOCOL {

src/p2p/data_transform.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
///! https://docs.rs/libp2p-gossipsub/latest/libp2p_gossipsub/trait.DataTransform.html
12
use libp2p::gossipsub::{DataTransform, Message, RawMessage, TopicHash};
23
use std::io::{Error, ErrorKind};
34
use std::time::{SystemTime, UNIX_EPOCH};

src/p2p/mod.rs

Lines changed: 3 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -1,40 +1,10 @@
1-
use libp2p::StreamProtocol;
2-
3-
/// Kademlia protocol prefix, as a macro so that it can be used in const macros.
4-
macro_rules! P2P_KADEMLIA_PREFIX {
5-
() => {
6-
"/dria/kad/"
7-
};
8-
}
9-
10-
/// Kademlia protocol prefix, as a macro so that it can be used in const macros.
11-
macro_rules! P2P_IDENTITY_PREFIX {
12-
() => {
13-
"dria/"
14-
};
15-
}
16-
17-
/// Kademlia protocol version, in the form of `/dria/kad/<version>`.
18-
/// Notice the `/` at the start.
19-
pub(crate) const P2P_KADEMLIA_PROTOCOL: StreamProtocol = StreamProtocol::new(concat!(
20-
P2P_KADEMLIA_PREFIX!(),
21-
env!("CARGO_PKG_VERSION_MAJOR"),
22-
".",
23-
env!("CARGO_PKG_VERSION_MINOR")
24-
));
25-
26-
/// Protocol string, checked by Identify protocol
27-
pub(crate) const P2P_PROTOCOL_STRING: &str = concat!(
28-
P2P_IDENTITY_PREFIX!(),
29-
env!("CARGO_PKG_VERSION_MAJOR"),
30-
".",
31-
env!("CARGO_PKG_VERSION_MINOR")
32-
);
33-
341
mod behaviour;
352
pub use behaviour::{DriaBehaviour, DriaBehaviourEvent};
363

374
mod client;
385
pub use client::P2PClient;
396

7+
mod versioning;
8+
pub use versioning::*;
9+
4010
mod data_transform;

src/p2p/versioning.rs

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
use libp2p::StreamProtocol;
2+
3+
/// Kademlia protocol prefix, as a macro so that it can be used in literal-expecting constants.
4+
macro_rules! P2P_KADEMLIA_PREFIX {
5+
() => {
6+
"/dria/kad/"
7+
};
8+
}
9+
pub const P2P_KADEMLIA_PREFIX: &str = P2P_KADEMLIA_PREFIX!();
10+
11+
/// Identity protocol name prefix, as a macro so that it can be used in literal-expecting constants.
12+
macro_rules! P2P_IDENTITY_PREFIX {
13+
() => {
14+
"dria/"
15+
};
16+
}
17+
18+
/// Kademlia protocol version, in the form of `/dria/kad/<version>`, **notice the `/` at the start**.
19+
///
20+
/// It is important that this protocol matches EXACTLY among the nodes, otherwise there is a protocol-level logic
21+
/// that will prevent peers from finding eachother within the DHT.
22+
pub const P2P_KADEMLIA_PROTOCOL: StreamProtocol = StreamProtocol::new(concat!(
23+
P2P_KADEMLIA_PREFIX!(),
24+
env!("CARGO_PKG_VERSION_MAJOR"),
25+
".",
26+
env!("CARGO_PKG_VERSION_MINOR")
27+
));
28+
29+
/// Protocol string, checked by Identify protocol handlers.
30+
pub const P2P_PROTOCOL_STRING: &str = concat!(
31+
P2P_IDENTITY_PREFIX!(),
32+
env!("CARGO_PKG_VERSION_MAJOR"),
33+
".",
34+
env!("CARGO_PKG_VERSION_MINOR")
35+
);

src/utils/crypto.rs

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -22,14 +22,12 @@ pub fn keccak256hash(data: impl AsRef<[u8]>) -> [u8; 32] {
2222
/// and then (x || y) is hashed using Keccak256. The last 20 bytes of this hash is taken as the address.
2323
#[inline]
2424
pub fn to_address(public_key: &PublicKey) -> [u8; 20] {
25-
let public_key_serial = &public_key.serialize()[1..];
25+
let public_key_xy = &public_key.serialize()[1..];
2626
let mut addr = [0u8; 20];
27-
addr.copy_from_slice(&keccak256hash(public_key_serial)[12..32]);
27+
addr.copy_from_slice(&keccak256hash(public_key_xy)[12..32]);
2828
addr
2929
}
3030

31-
// TODO: add peerId
32-
3331
/// Shorthand to sign a digest (bytes) with node's secret key and return signature & recovery id
3432
/// serialized to 65 byte hex-string.
3533
#[inline]

0 commit comments

Comments
 (0)