Skip to content

Commit 86b2877

Browse files
authored
added error reporting on publish, update workflows (#139)
* added error reporting on publish, update workflows * rm println, fix lints * som pub edits * Added `dial` and `network_info` * update workflows, fix lintings
1 parent 02d4dc0 commit 86b2877

File tree

12 files changed

+436
-597
lines changed

12 files changed

+436
-597
lines changed

Cargo.lock

Lines changed: 311 additions & 543 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ default-members = ["compute"]
77

88
[workspace.package]
99
edition = "2021"
10-
version = "0.2.18"
10+
version = "0.2.19"
1111
license = "Apache-2.0"
1212
readme = "README.md"
1313

compute/src/config.rs

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,7 @@
1-
use crate::utils::{address_in_use, crypto::to_address};
1+
use crate::utils::{
2+
address_in_use,
3+
crypto::{secret_to_keypair, to_address},
4+
};
25
use dkn_p2p::libp2p::Multiaddr;
36
use dkn_workflows::DriaWorkflowsConfig;
47
use eyre::{eyre, Result};
@@ -71,14 +74,21 @@ impl DriaComputeNodeConfig {
7174
panic!("Please provide an admin public key.");
7275
}
7376
};
77+
78+
let address = to_address(&public_key);
79+
log::info!("Node Address: 0x{}", hex::encode(address));
80+
81+
// to this here to log the peer id at start
82+
log::info!(
83+
"Node PeerID: {}",
84+
secret_to_keypair(&secret_key).public().to_peer_id()
85+
);
86+
7487
log::info!(
7588
"Admin Public Key: 0x{}",
7689
hex::encode(admin_public_key.serialize_compressed())
7790
);
7891

79-
let address = to_address(&public_key);
80-
log::info!("Node Address: 0x{}", hex::encode(address));
81-
8292
let workflows =
8393
DriaWorkflowsConfig::new_from_csv(&env::var("DKN_MODELS").unwrap_or_default());
8494
#[cfg(not(test))]

compute/src/handlers/workflow.rs

Lines changed: 39 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,6 @@ impl ComputeHandler for WorkflowHandler {
3535
node: &mut DriaComputeNode,
3636
message: DKNMessage,
3737
) -> Result<MessageAcceptance> {
38-
let config = &node.config;
3938
let task = message
4039
.parse_payload::<TaskRequestPayload<WorkflowPayload>>(true)
4140
.wrap_err("Could not parse workflow task")?;
@@ -55,7 +54,7 @@ impl ComputeHandler for WorkflowHandler {
5554
}
5655

5756
// check task inclusion via the bloom filter
58-
if !task.filter.contains(&config.address)? {
57+
if !task.filter.contains(&node.config.address)? {
5958
log::info!(
6059
"Task {} does not include this node within the filter.",
6160
task.task_id
@@ -66,16 +65,19 @@ impl ComputeHandler for WorkflowHandler {
6665
}
6766

6867
// read model / provider from the task
69-
let (model_provider, model) = config.workflows.get_any_matching_model(task.input.model)?;
68+
let (model_provider, model) = node
69+
.config
70+
.workflows
71+
.get_any_matching_model(task.input.model)?;
7072
let model_name = model.to_string(); // get model name, we will pass it in payload
7173
log::info!("Using model {} for task {}", model_name, task.task_id);
7274

7375
// prepare workflow executor
7476
let executor = if model_provider == ModelProvider::Ollama {
7577
Executor::new_at(
7678
model,
77-
&config.workflows.ollama.host,
78-
config.workflows.ollama.port,
79+
&node.config.workflows.ollama.host,
80+
node.config.workflows.ollama.port,
7981
)
8082
} else {
8183
Executor::new(model)
@@ -93,13 +95,14 @@ impl ComputeHandler for WorkflowHandler {
9395
log::info!("Received cancellation, quitting all tasks.");
9496
return Ok(MessageAcceptance::Accept);
9597
},
96-
exec_result_inner = executor.execute(entry.as_ref(), task.input.workflow, &mut memory) => {
98+
exec_result_inner = executor.execute(entry.as_ref(), &task.input.workflow, &mut memory) => {
9799
exec_result = exec_result_inner.map_err(|e| eyre!("Execution error: {}", e.to_string()));
98100
}
99101
}
100102

101-
match exec_result {
103+
let (publish_result, acceptance) = match exec_result {
102104
Ok(result) => {
105+
log::warn!("Task {} result:", result);
103106
// obtain public key from the payload
104107
let task_public_key_bytes =
105108
hex::decode(&task.public_key).wrap_err("Could not decode public key")?;
@@ -110,36 +113,55 @@ impl ComputeHandler for WorkflowHandler {
110113
result,
111114
&task.task_id,
112115
&task_public_key,
113-
&config.secret_key,
116+
&node.config.secret_key,
114117
model_name,
115118
)?;
116119
let payload_str = serde_json::to_string(&payload)
117120
.wrap_err("Could not serialize response payload")?;
118121

119122
// publish the result
120-
let message = DKNMessage::new(payload_str, Self::RESPONSE_TOPIC);
121-
node.publish(message)?;
122-
123123
// accept so that if there are others included in filter they can do the task
124-
Ok(MessageAcceptance::Accept)
124+
let message = DKNMessage::new(payload_str, Self::RESPONSE_TOPIC);
125+
(node.publish(message), MessageAcceptance::Accept)
125126
}
126127
Err(err) => {
127128
// use pretty display string for error logging with causes
128129
let err_string = format!("{:#}", err);
129130
log::error!("Task {} failed: {}", task.task_id, err_string);
130131

131132
// prepare error payload
132-
let error_payload = TaskErrorPayload::new(task.task_id, err_string, model_name);
133+
let error_payload =
134+
TaskErrorPayload::new(task.task_id.clone(), err_string, model_name);
133135
let error_payload_str = serde_json::to_string(&error_payload)
134136
.wrap_err("Could not serialize error payload")?;
135137

136138
// publish the error result for diagnostics
137-
let message = DKNMessage::new(error_payload_str, Self::RESPONSE_TOPIC);
138-
node.publish(message)?;
139-
140139
// ignore just in case, workflow may be bugged
141-
Ok(MessageAcceptance::Ignore)
140+
let message = DKNMessage::new_signed(
141+
error_payload_str,
142+
Self::RESPONSE_TOPIC,
143+
&node.config.secret_key,
144+
);
145+
(node.publish(message), MessageAcceptance::Ignore)
142146
}
147+
};
148+
149+
// if for some reason we couldnt publish the result, publish the error itself so that RPC doesnt hang
150+
if let Err(publish_err) = publish_result {
151+
let err_msg = format!("Could not publish result: {:?}", publish_err);
152+
log::error!("{}", err_msg);
153+
let payload = serde_json::json!({
154+
"taskId": task.task_id,
155+
"error": err_msg
156+
});
157+
let message = DKNMessage::new_signed(
158+
payload.to_string(),
159+
Self::RESPONSE_TOPIC,
160+
&node.config.secret_key,
161+
);
162+
node.publish(message)?;
143163
}
164+
165+
Ok(acceptance)
144166
}
145167
}

compute/src/node.rs

Lines changed: 19 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -60,14 +60,24 @@ impl DriaComputeNode {
6060
);
6161

6262
// create p2p client
63-
let p2p = DriaP2PClient::new(
63+
let mut p2p = DriaP2PClient::new(
6464
keypair,
6565
config.p2p_listen_addr.clone(),
6666
&available_nodes.bootstrap_nodes,
6767
&available_nodes.relay_nodes,
6868
P2P_VERSION,
6969
)?;
7070

71+
// dial rpc nodes
72+
if available_nodes.rpc_addrs.is_empty() {
73+
log::warn!("No RPC nodes found to be dialled!");
74+
} else {
75+
for rpc_addr in &available_nodes.rpc_addrs {
76+
log::info!("Dialing RPC node: {}", rpc_addr);
77+
p2p.dial(rpc_addr.clone())?;
78+
}
79+
}
80+
7181
Ok(DriaComputeNode {
7282
p2p,
7383
config,
@@ -136,6 +146,7 @@ impl DriaComputeNode {
136146
event = self.p2p.process_events() => {
137147
// refresh admin rpc peer ids
138148
if self.available_nodes_last_refreshed.elapsed() > Duration::from_secs(RPC_PEER_ID_REFRESH_INTERVAL_SECS) {
149+
log::info!("Refreshing available nodes.");
139150
self.available_nodes = AvailableNodes::get_available_nodes().await.unwrap_or_default().join(self.available_nodes.clone()).sort_dedup();
140151
self.available_nodes_last_refreshed = tokio::time::Instant::now();
141152
}
@@ -156,12 +167,17 @@ impl DriaComputeNode {
156167
}
157168
};
158169

170+
// log::info!(
171+
// "Received {} message ({})\nFrom: {}\nSource: {}",
172+
// topic_str,
173+
// message_id,
174+
// peer_id,
175+
// );
159176
log::info!(
160-
"Received {} message ({})\nFrom: {}\nSource: {}",
177+
"Received {} message ({}) from {}",
161178
topic_str,
162179
message_id,
163180
peer_id,
164-
source_peer_id
165181
);
166182

167183
// ensure that message is from the static RPCs

compute/src/utils/available_nodes.rs

Lines changed: 25 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -25,18 +25,20 @@ const STATIC_RPC_PEER_IDS: [&str; 0] = [];
2525
/// API URL for refreshing the Admin RPC PeerIDs from Dria server.
2626
const RPC_PEER_ID_REFRESH_API_URL: &str = "https://dkn.dria.co/available-nodes";
2727

28-
#[derive(serde::Deserialize, Debug)]
29-
pub struct AvailableNodesApiResponse {
30-
pub bootstraps: Vec<String>,
31-
pub relays: Vec<String>,
32-
pub rpcs: Vec<String>,
33-
}
34-
28+
/// Available nodes within the hybrid P2P network.
29+
///
30+
/// - Bootstrap: used for Kademlia DHT bootstrap.
31+
/// - Relay: used for DCutR relay protocol.
32+
/// - RPC: used for RPC nodes for task & ping messages.
33+
///
34+
/// Note that while bootstrap & relay nodes are `Multiaddr`, RPC nodes are `PeerId` because we communicate
35+
/// with them via GossipSub only.
3536
#[derive(Debug, Default, Clone)]
3637
pub struct AvailableNodes {
3738
pub bootstrap_nodes: Vec<Multiaddr>,
3839
pub relay_nodes: Vec<Multiaddr>,
3940
pub rpc_nodes: Vec<PeerId>,
41+
pub rpc_addrs: Vec<Multiaddr>,
4042
}
4143

4244
impl AvailableNodes {
@@ -66,6 +68,7 @@ impl AvailableNodes {
6668
bootstrap_nodes: parse_vec(bootstrap_nodes),
6769
relay_nodes: parse_vec(relay_nodes),
6870
rpc_nodes: vec![],
71+
rpc_addrs: vec![],
6972
}
7073
}
7174

@@ -75,6 +78,7 @@ impl AvailableNodes {
7578
bootstrap_nodes: parse_vec(STATIC_BOOTSTRAP_NODES.to_vec()),
7679
relay_nodes: parse_vec(STATIC_RELAY_NODES.to_vec()),
7780
rpc_nodes: parse_vec(STATIC_RPC_PEER_IDS.to_vec()),
81+
rpc_addrs: vec![],
7882
}
7983
}
8084

@@ -83,7 +87,7 @@ impl AvailableNodes {
8387
self.bootstrap_nodes.extend(other.bootstrap_nodes);
8488
self.relay_nodes.extend(other.relay_nodes);
8589
self.rpc_nodes.extend(other.rpc_nodes);
86-
90+
self.rpc_addrs.extend(other.rpc_addrs);
8791
self
8892
}
8993

@@ -98,18 +102,31 @@ impl AvailableNodes {
98102
self.rpc_nodes.sort_unstable();
99103
self.rpc_nodes.dedup();
100104

105+
self.rpc_addrs.sort_unstable();
106+
self.rpc_addrs.dedup();
107+
101108
self
102109
}
103110

104111
/// Refreshes the available nodes for Bootstrap, Relay and RPC nodes.
105112
pub async fn get_available_nodes() -> Result<Self> {
113+
#[derive(serde::Deserialize, Debug)]
114+
struct AvailableNodesApiResponse {
115+
pub bootstraps: Vec<String>,
116+
pub relays: Vec<String>,
117+
pub rpcs: Vec<String>,
118+
#[serde(rename = "rpcAddrs")]
119+
pub rpc_addrs: Vec<String>,
120+
}
121+
106122
let response = reqwest::get(RPC_PEER_ID_REFRESH_API_URL).await?;
107123
let response_body = response.json::<AvailableNodesApiResponse>().await?;
108124

109125
Ok(Self {
110126
bootstrap_nodes: parse_vec(response_body.bootstraps),
111127
relay_nodes: parse_vec(response_body.relays),
112128
rpc_nodes: parse_vec(response_body.rpcs),
129+
rpc_addrs: parse_vec(response_body.rpc_addrs),
113130
})
114131
}
115132
}
@@ -137,9 +154,6 @@ mod tests {
137154
#[tokio::test]
138155
#[ignore = "run this manually"]
139156
async fn test_get_available_nodes() {
140-
std::env::set_var("RUST_LOG", "info");
141-
let _ = env_logger::try_init();
142-
143157
let available_nodes = AvailableNodes::get_available_nodes().await.unwrap();
144158
println!("{:#?}", available_nodes);
145159
}

p2p/src/behaviour.rs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -10,12 +10,12 @@ use libp2p::{autonat, dcutr, gossipsub, identify, kad, relay};
1010

1111
#[derive(libp2p::swarm::NetworkBehaviour)]
1212
pub struct DriaBehaviour {
13-
pub(crate) relay: relay::client::Behaviour,
14-
pub(crate) gossipsub: gossipsub::Behaviour,
15-
pub(crate) kademlia: kad::Behaviour<MemoryStore>,
16-
pub(crate) identify: identify::Behaviour,
17-
pub(crate) autonat: autonat::Behaviour,
18-
pub(crate) dcutr: dcutr::Behaviour,
13+
pub relay: relay::client::Behaviour,
14+
pub gossipsub: gossipsub::Behaviour,
15+
pub kademlia: kad::Behaviour<MemoryStore>,
16+
pub identify: identify::Behaviour,
17+
pub autonat: autonat::Behaviour,
18+
pub dcutr: dcutr::Behaviour,
1919
}
2020

2121
impl DriaBehaviour {

p2p/src/client.rs

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,12 @@
11
use super::*;
2-
use eyre::Result;
2+
use eyre::{Context, Result};
33
use libp2p::futures::StreamExt;
44
use libp2p::gossipsub::{
55
Message, MessageAcceptance, MessageId, PublishError, SubscriptionError, TopicHash,
66
};
77
use libp2p::kad::{GetClosestPeersError, GetClosestPeersOk, QueryResult};
8-
use libp2p::{
9-
autonat, gossipsub, identify, kad, multiaddr::Protocol, noise, swarm::SwarmEvent, tcp, yamux,
10-
};
8+
use libp2p::swarm::{dial_opts::DialOpts, NetworkInfo, SwarmEvent};
9+
use libp2p::{autonat, gossipsub, identify, kad, multiaddr::Protocol, noise, tcp, yamux};
1110
use libp2p::{Multiaddr, PeerId, StreamProtocol, Swarm, SwarmBuilder};
1211
use libp2p_identity::Keypair;
1312
use std::time::{Duration, Instant};
@@ -137,6 +136,12 @@ impl DriaP2PClient {
137136
})
138137
}
139138

139+
/// Returns the network information, such as the number of
140+
/// incoming and outgoing connections.
141+
pub fn network_info(&self) -> NetworkInfo {
142+
self.swarm.network_info()
143+
}
144+
140145
/// Subscribe to a topic.
141146
pub fn subscribe(&mut self, topic_name: &str) -> Result<bool, SubscriptionError> {
142147
log::debug!("Subscribing to {}", topic_name);
@@ -206,6 +211,11 @@ impl DriaP2PClient {
206211
self.swarm.behaviour().gossipsub.all_peers().collect()
207212
}
208213

214+
/// Dials a given peer.
215+
pub fn dial(&mut self, peer_id: impl Into<DialOpts>) -> Result<()> {
216+
self.swarm.dial(peer_id).wrap_err("could not dial")
217+
}
218+
209219
/// Listens to the Swarm for incoming messages.
210220
///
211221
/// This method should be called in a loop to keep the client running.

workflows/src/apis/serper.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,6 @@ impl SerperConfig {
4848
log::debug!("Serper API key not found, skipping Serper check");
4949
return Ok(());
5050
};
51-
println!("API KEY: {}", api_key);
5251
log::info!("Serper API key found, checking Serper service");
5352

5453
// make a dummy request

0 commit comments

Comments
 (0)