Skip to content

Commit af4a67d

Browse files
authored
Merge pull request #92 from firstbatchxyz/erhant/rpc-links-some-rfks
`AvailableNodes` logic, handler refactors, workflows re-export
2 parents 4c71e9a + a27c6ad commit af4a67d

File tree

17 files changed

+328
-171
lines changed

17 files changed

+328
-171
lines changed

.env.example

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,12 @@
11
## DRIA (required) ##
2-
# Secret key of your compute node (32 byte, hexadecimal, without 0x prefix).
3-
# e.g.: DKN_WALLET_SECRET_KEY=ac0974bec39a17e36ba4a6b4d238ff944bacb478cbed5efcae784d7bf4f2ff80
2+
# Secret key of your compute node, 32 byte in hexadecimal.
3+
# e.g.: DKN_WALLET_SECRET_KEY=0xac0974bec39a17e36ba4a6b4d238ff944bacb478cbed5efcae784d7bf4f2ff80
44
DKN_WALLET_SECRET_KEY=
5-
# model1,model2,model3,... (comma separated, case-insensitive)
6-
DKN_MODELS=phi3:3.8b
7-
# Public key of Dria Admin node (33-byte compressed, hexadecimal, without 0x prefix).
5+
# Public key of Dria Admin node, 33-byte (compressed) in hexadecimal.
86
# You don't need to change this, simply copy and paste it.
97
DKN_ADMIN_PUBLIC_KEY=0208ef5e65a9c656a6f92fb2c770d5d5e2ecffe02a6aade19207f75110be6ae658
8+
# model1,model2,model3,... (comma separated, case-insensitive)
9+
DKN_MODELS=phi3:3.8b
1010

1111
## DRIA (optional) ##
1212
# info | debug | error | none,dkn_compute=debug
@@ -22,7 +22,7 @@ DKN_BOOTSTRAP_NODES=
2222
OPENAI_API_KEY=
2323

2424
## Ollama (if used, optional) ##
25-
# do not change the host, it is used by Docker
25+
# do not change this, it is used by Docker
2626
OLLAMA_HOST=http://host.docker.internal
2727
# you can change the port if you would like
2828
OLLAMA_PORT=11434

Cargo.lock

Lines changed: 16 additions & 19 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[package]
22
name = "dkn-compute"
3-
version = "0.1.3"
3+
version = "0.1.4"
44
edition = "2021"
55
license = "Apache-2.0"
66
readme = "README.md"
@@ -12,6 +12,7 @@ parking_lot = "0.12.2"
1212
serde = { version = "1.0", features = ["derive"] }
1313
serde_json = "1.0"
1414
async-trait = "0.1.81"
15+
reqwest = "0.12.5"
1516

1617
# utilities
1718
base64 = "0.22.0"
@@ -34,8 +35,7 @@ sha3 = "0.10.8"
3435
fastbloom-rs = "0.5.9"
3536

3637
# workflows
37-
ollama-workflows = { git = "https://github.com/andthattoo/ollama-workflows", rev = "274b26e" }
38-
ollama-rs = "0.2.0"
38+
ollama-workflows = { git = "https://github.com/andthattoo/ollama-workflows", rev = "25467d2" }
3939

4040
# peer-to-peer
4141
libp2p = { version = "0.53", features = [

compose.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ services:
1616
JINA_API_KEY: ${JINA_API_KEY}
1717
OLLAMA_HOST: ${OLLAMA_HOST}
1818
OLLAMA_PORT: ${OLLAMA_PORT}
19-
OLLAMA_AUTO_PULL: ${OLLAMA_AUTO_PULL}
19+
OLLAMA_AUTO_PULL: ${OLLAMA_AUTO_PULL:-true}
2020
network_mode: "host"
2121
extra_hosts:
2222
# for Linux, we need to add this line manually

examples/common/ollama.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
use std::time::SystemTime;
22

3-
use ollama_rs::{
3+
use ollama_workflows::ollama_rs::{
44
generation::completion::{request::GenerationRequest, GenerationResponse},
55
Ollama,
66
};

src/config/mod.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -37,8 +37,8 @@ impl DriaComputeNodeConfig {
3737
pub fn new() -> Self {
3838
let secret_key = match env::var("DKN_WALLET_SECRET_KEY") {
3939
Ok(secret_env) => {
40-
let secret_dec =
41-
hex::decode(secret_env).expect("Secret key should be 32-bytes hex encoded.");
40+
let secret_dec = hex::decode(secret_env.trim_start_matches("0x"))
41+
.expect("Secret key should be 32-bytes hex encoded.");
4242
SecretKey::parse_slice(&secret_dec).expect("Secret key should be parseable.")
4343
}
4444
Err(err) => {
@@ -60,7 +60,7 @@ impl DriaComputeNodeConfig {
6060

6161
let admin_public_key = match env::var("DKN_ADMIN_PUBLIC_KEY") {
6262
Ok(admin_public_key) => {
63-
let pubkey_dec = hex::decode(admin_public_key)
63+
let pubkey_dec = hex::decode(admin_public_key.trim_start_matches("0x"))
6464
.expect("Admin public key should be 33-bytes hex encoded.");
6565
PublicKey::parse_slice(&pubkey_dec, None)
6666
.expect("Admin public key should be parseable.")

src/config/ollama.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
use ollama_rs::Ollama;
1+
use ollama_workflows::ollama_rs::Ollama;
22

33
const DEFAULT_OLLAMA_HOST: &str = "http://127.0.0.1";
44
const DEFAULT_OLLAMA_PORT: u16 = 11434;

src/errors/mod.rs

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
use ollama_rs::error::OllamaError;
1+
use ollama_workflows::ollama_rs::error::OllamaError;
22

33
/// Alias for `Result<T, NodeError>`.
44
pub type NodeResult<T> = std::result::Result<T, NodeError>;
@@ -96,6 +96,15 @@ impl From<libp2p::gossipsub::SubscriptionError> for NodeError {
9696
}
9797
}
9898

99+
impl From<reqwest::Error> for NodeError {
100+
fn from(value: reqwest::Error) -> Self {
101+
Self {
102+
message: value.to_string(),
103+
source: "reqwest".to_string(),
104+
}
105+
}
106+
}
107+
99108
impl From<libp2p::gossipsub::PublishError> for NodeError {
100109
fn from(value: libp2p::gossipsub::PublishError) -> Self {
101110
Self {

src/handlers/mod.rs

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,19 @@
1+
use async_trait::async_trait;
2+
use libp2p::gossipsub::MessageAcceptance;
3+
14
mod pingpong;
2-
pub use pingpong::HandlesPingpong;
5+
pub use pingpong::PingpongHandler;
36

47
mod workflow;
5-
pub use workflow::HandlesWorkflow;
8+
pub use workflow::WorkflowHandler;
9+
10+
use crate::{errors::NodeResult, p2p::P2PMessage, DriaComputeNode};
11+
12+
#[async_trait]
13+
pub trait ComputeHandler {
14+
async fn handle_compute(
15+
node: &mut DriaComputeNode,
16+
message: P2PMessage,
17+
result_topic: &str,
18+
) -> NodeResult<MessageAcceptance>;
19+
}

src/handlers/pingpong.rs

Lines changed: 12 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,15 @@
11
use crate::{
22
errors::NodeResult, node::DriaComputeNode, p2p::P2PMessage, utils::get_current_time_nanos,
33
};
4+
use async_trait::async_trait;
45
use libp2p::gossipsub::MessageAcceptance;
56
use ollama_workflows::{Model, ModelProvider};
67
use serde::{Deserialize, Serialize};
78

9+
use super::ComputeHandler;
10+
11+
pub struct PingpongHandler;
12+
813
#[derive(Serialize, Deserialize, Debug, Clone)]
914
struct PingpongPayload {
1015
uuid: String,
@@ -18,19 +23,10 @@ struct PingpongResponse {
1823
pub(crate) timestamp: u128,
1924
}
2025

21-
/// A ping-pong is a message sent by a node to indicate that it is alive.
22-
/// Compute nodes listen to `pong` topic, and respond to `ping` topic.
23-
pub trait HandlesPingpong {
24-
fn handle_heartbeat(
25-
&mut self,
26-
message: P2PMessage,
27-
result_topic: &str,
28-
) -> NodeResult<MessageAcceptance>;
29-
}
30-
31-
impl HandlesPingpong for DriaComputeNode {
32-
fn handle_heartbeat(
33-
&mut self,
26+
#[async_trait]
27+
impl ComputeHandler for PingpongHandler {
28+
async fn handle_compute(
29+
node: &mut DriaComputeNode,
3430
message: P2PMessage,
3531
result_topic: &str,
3632
) -> NodeResult<MessageAcceptance> {
@@ -53,15 +49,15 @@ impl HandlesPingpong for DriaComputeNode {
5349
// respond
5450
let response_body = PingpongResponse {
5551
uuid: pingpong.uuid.clone(),
56-
models: self.config.model_config.models.clone(),
52+
models: node.config.model_config.models.clone(),
5753
timestamp: get_current_time_nanos(),
5854
};
5955
let response = P2PMessage::new_signed(
6056
serde_json::json!(response_body).to_string(),
6157
result_topic,
62-
&self.config.secret_key,
58+
&node.config.secret_key,
6359
);
64-
self.publish(response)?;
60+
node.publish(response)?;
6561

6662
// accept message, someone else may be included in the filter
6763
Ok(MessageAcceptance::Accept)

0 commit comments

Comments
 (0)