Skip to content

Commit 2030b33

Browse files
committed
much better eyre error printing
1 parent 1b80f9e commit 2030b33

File tree

8 files changed

+70
-57
lines changed

8 files changed

+70
-57
lines changed

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ version = "0.2.6"
44
edition = "2021"
55
license = "Apache-2.0"
66
readme = "README.md"
7+
authors = ["Erhan Tezcan <[email protected]>"]
78

89
# profiling build for flamegraphs
910
[profile.profiling]

src/handlers/mod.rs

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,25 +1,30 @@
1+
use crate::{utils::DKNMessage, DriaComputeNode};
12
use async_trait::async_trait;
23
use eyre::Result;
34
use libp2p::gossipsub::MessageAcceptance;
45

5-
mod topics;
6-
pub use topics::*;
7-
86
mod pingpong;
97
pub use pingpong::PingpongHandler;
108

119
mod workflow;
1210
pub use workflow::WorkflowHandler;
1311

14-
use crate::{utils::DKNMessage, DriaComputeNode};
15-
1612
/// A DKN task is to be handled by the compute node, respecting this trait.
13+
///
14+
/// It is expected for the implemented handler to handle messages coming from `LISTEN_TOPIC`,
15+
/// and then respond back to the `RESPONSE_TOPIC`.
1716
#[async_trait]
1817
pub trait ComputeHandler {
18+
/// Gossipsub topic name to listen for incoming messages from the network.
19+
const LISTEN_TOPIC: &'static str;
20+
/// Gossipsub topic name to respond with messages to the network.
21+
const RESPONSE_TOPIC: &'static str;
22+
1923
/// A generic handler for DKN tasks.
24+
///
25+
/// Returns a `MessageAcceptance` value that tells the P2P client to accept the incoming message.
2026
async fn handle_compute(
2127
node: &mut DriaComputeNode,
2228
message: DKNMessage,
23-
result_topic: &str,
2429
) -> Result<MessageAcceptance>;
2530
}

src/handlers/pingpong.rs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,10 +27,12 @@ struct PingpongResponse {
2727

2828
#[async_trait]
2929
impl ComputeHandler for PingpongHandler {
30+
const LISTEN_TOPIC: &'static str = "ping";
31+
const RESPONSE_TOPIC: &'static str = "pong";
32+
3033
async fn handle_compute(
3134
node: &mut DriaComputeNode,
3235
message: DKNMessage,
33-
result_topic: &str,
3436
) -> Result<MessageAcceptance> {
3537
let pingpong = message.parse_payload::<PingpongPayload>(true)?;
3638

@@ -58,7 +60,7 @@ impl ComputeHandler for PingpongHandler {
5860
// publish message
5961
let message = DKNMessage::new_signed(
6062
serde_json::json!(response_body).to_string(),
61-
result_topic,
63+
Self::RESPONSE_TOPIC,
6264
&node.config.secret_key,
6365
);
6466
node.publish(message)?;

src/handlers/topics.rs

Lines changed: 0 additions & 4 deletions
This file was deleted.

src/handlers/workflow.rs

Lines changed: 19 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
use async_trait::async_trait;
2-
use eyre::{eyre, Result};
2+
use eyre::{eyre, Context, Result};
33
use libp2p::gossipsub::MessageAcceptance;
44
use ollama_workflows::{Entry, Executor, ModelProvider, ProgramMemory, Workflow};
55
use serde::Deserialize;
@@ -27,13 +27,17 @@ struct WorkflowPayload {
2727

2828
#[async_trait]
2929
impl ComputeHandler for WorkflowHandler {
30+
const LISTEN_TOPIC: &'static str = "task";
31+
const RESPONSE_TOPIC: &'static str = "results";
32+
3033
async fn handle_compute(
3134
node: &mut DriaComputeNode,
3235
message: DKNMessage,
33-
result_topic: &str,
3436
) -> Result<MessageAcceptance> {
3537
let config = &node.config;
36-
let task = message.parse_payload::<TaskRequestPayload<WorkflowPayload>>(true)?;
38+
let task = message
39+
.parse_payload::<TaskRequestPayload<WorkflowPayload>>(true)
40+
.wrap_err("Could not parse error")?;
3741

3842
// check if deadline is past or not
3943
let current_time = get_current_time_nanos();
@@ -56,7 +60,7 @@ impl ComputeHandler for WorkflowHandler {
5660
task.task_id
5761
);
5862

59-
// accept the message, someonelse may be included in filter
63+
// accept the message, someone else may be included in filter
6064
return Ok(MessageAcceptance::Accept);
6165
}
6266

@@ -86,14 +90,15 @@ impl ComputeHandler for WorkflowHandler {
8690
return Ok(MessageAcceptance::Accept);
8791
},
8892
exec_result_inner = executor.execute(entry.as_ref(), task.input.workflow, &mut memory) => {
89-
exec_result = exec_result_inner.map_err(|e| eyre!("{}", e.to_string()));
93+
exec_result = exec_result_inner.map_err(|e| eyre!("Execution error: {}", e.to_string()));
9094
}
9195
}
9296

9397
match exec_result {
9498
Ok(result) => {
9599
// obtain public key from the payload
96-
let task_public_key = hex::decode(&task.public_key)?;
100+
let task_public_key =
101+
hex::decode(&task.public_key).wrap_err("Could not decode public key")?;
97102

98103
// prepare signed and encrypted payload
99104
let payload = TaskResponsePayload::new(
@@ -102,24 +107,27 @@ impl ComputeHandler for WorkflowHandler {
102107
&task_public_key,
103108
&config.secret_key,
104109
)?;
105-
let payload_str = serde_json::to_string(&payload)?;
110+
let payload_str =
111+
serde_json::to_string(&payload).wrap_err("Could not serialize payload")?;
106112

107113
// publish the result
108-
let message = DKNMessage::new(payload_str, result_topic);
114+
let message = DKNMessage::new(payload_str, Self::RESPONSE_TOPIC);
109115
node.publish(message)?;
110116

111117
// accept so that if there are others included in filter they can do the task
112118
Ok(MessageAcceptance::Accept)
113119
}
114120
Err(err) => {
115-
log::error!("Task {} failed: {}", task.task_id, err);
121+
// use pretty display string for error logging with causes
122+
let err_string = format!("{:#}", err);
123+
log::error!("Task {} failed: {}", task.task_id, err_string);
116124

117125
// prepare error payload
118-
let error_payload = TaskErrorPayload::new(task.task_id, err.to_string());
126+
let error_payload = TaskErrorPayload::new(task.task_id, err_string);
119127
let error_payload_str = serde_json::to_string(&error_payload)?;
120128

121129
// publish the error result for diagnostics
122-
let message = DKNMessage::new(error_payload_str, result_topic);
130+
let message = DKNMessage::new(error_payload_str, Self::RESPONSE_TOPIC);
123131
node.publish(message)?;
124132

125133
// ignore just in case, workflow may be bugged

src/main.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ async fn main() -> Result<()> {
3838

3939
#[cfg(not(feature = "profiling"))]
4040
if let Err(err) = wait_for_termination(cancellation_token.clone()).await {
41-
log::error!("Error waiting for termination: {}", err);
41+
log::error!("Error waiting for termination: {:?}", err);
4242
log::error!("Cancelling due to unexpected error.");
4343
cancellation_token.cancel();
4444
};
@@ -55,7 +55,7 @@ async fn main() -> Result<()> {
5555
}
5656
result = config_clone.check_services() => {
5757
if let Err(err) = result {
58-
log::error!("Error checking services: {}", err);
58+
log::error!("Error checking services: {:?}", err);
5959
panic!("Service check failed.")
6060
}
6161
}
@@ -101,7 +101,7 @@ async fn main() -> Result<()> {
101101
///
102102
/// Handles Unix and Windows [target families](https://doc.rust-lang.org/reference/conditional-compilation.html#target_family).
103103
#[allow(unused)]
104-
async fn wait_for_termination(cancellation: CancellationToken) -> std::io::Result<()> {
104+
async fn wait_for_termination(cancellation: CancellationToken) -> Result<()> {
105105
#[cfg(unix)]
106106
{
107107
use tokio::signal::unix::{signal, SignalKind};

src/node.rs

Lines changed: 30 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ use std::{str::FromStr, time::Duration};
44
use tokio_util::sync::CancellationToken;
55

66
use crate::{
7-
config::DriaComputeNodeConfig,
7+
config::*,
88
handlers::*,
99
p2p::P2PClient,
1010
utils::{crypto::secret_to_keypair, AvailableNodes, DKNMessage},
@@ -13,6 +13,19 @@ use crate::{
1313
/// Number of seconds between refreshing the Admin RPC PeerIDs from Dria server.
1414
const RPC_PEER_ID_REFRESH_INTERVAL_SECS: u64 = 30;
1515

16+
/// **Dria Compute Node**
17+
///
18+
/// Internally, the node will create a new P2P client with the given secret key.
19+
/// This P2P client, although created synchronously, requires a tokio runtime.
20+
///
21+
/// ### Example
22+
///
23+
/// ```rs
24+
/// let config = DriaComputeNodeConfig::new();
25+
/// let mut node = DriaComputeNode::new(config, CancellationToken::new())?;
26+
/// node.check_services().await?;
27+
/// node.launch().await?;
28+
/// ```
1629
pub struct DriaComputeNode {
1730
pub config: DriaComputeNodeConfig,
1831
pub p2p: P2PClient,
@@ -22,19 +35,6 @@ pub struct DriaComputeNode {
2235
}
2336

2437
impl DriaComputeNode {
25-
/// Create a new compute node with the given configuration and cancellation token.
26-
///
27-
/// Internally, the node will create a new P2P client with the given secret key.
28-
/// This P2P client, although created synchronously, requires a tokio runtime.
29-
///
30-
/// ### Example
31-
///
32-
/// ```rs
33-
/// let config = DriaComputeNodeConfig::new();
34-
/// let mut node = DriaComputeNode::new(config, CancellationToken::new())?;
35-
/// node.check_services().await?;
36-
/// node.launch().await?;
37-
/// ```
3838
pub async fn new(
3939
config: DriaComputeNodeConfig,
4040
cancellation: CancellationToken,
@@ -106,10 +106,10 @@ impl DriaComputeNode {
106106
/// This method is not expected to return until cancellation occurs.
107107
pub async fn launch(&mut self) -> Result<()> {
108108
// subscribe to topics
109-
self.subscribe(PINGPONG_LISTEN_TOPIC)?;
110-
self.subscribe(PINGPONG_RESPONSE_TOPIC)?;
111-
self.subscribe(WORKFLOW_LISTEN_TOPIC)?;
112-
self.subscribe(WORKFLOW_RESPONSE_TOPIC)?;
109+
self.subscribe(PingpongHandler::LISTEN_TOPIC)?;
110+
self.subscribe(PingpongHandler::RESPONSE_TOPIC)?;
111+
self.subscribe(WorkflowHandler::LISTEN_TOPIC)?;
112+
self.subscribe(WorkflowHandler::RESPONSE_TOPIC)?;
113113

114114
// main loop, listens for message events in particular
115115
// the underlying p2p client is expected to handle the rest within its own loop
@@ -127,7 +127,7 @@ impl DriaComputeNode {
127127
let topic_str = topic.as_str();
128128

129129
// handle message w.r.t topic
130-
if std::matches!(topic_str, PINGPONG_LISTEN_TOPIC | WORKFLOW_LISTEN_TOPIC) {
130+
if std::matches!(topic_str, PingpongHandler::LISTEN_TOPIC | WorkflowHandler::LISTEN_TOPIC) {
131131
// ensure that the message is from a valid source (origin)
132132
let source_peer_id = match message.source {
133133
Some(peer) => peer,
@@ -159,7 +159,7 @@ impl DriaComputeNode {
159159
let message = match self.parse_message_to_prepared_message(message.clone()) {
160160
Ok(message) => message,
161161
Err(e) => {
162-
log::error!("Error parsing message: {}", e);
162+
log::error!("Error parsing message: {:?}", e);
163163
log::debug!("Message: {}", String::from_utf8_lossy(&message.data));
164164
self.p2p.validate_message(&message_id, &peer_id, gossipsub::MessageAcceptance::Ignore)?;
165165
continue;
@@ -168,11 +168,11 @@ impl DriaComputeNode {
168168

169169
// then handle the prepared message
170170
let handler_result = match topic_str {
171-
WORKFLOW_LISTEN_TOPIC => {
172-
WorkflowHandler::handle_compute(self, message, WORKFLOW_RESPONSE_TOPIC).await
171+
WorkflowHandler::LISTEN_TOPIC => {
172+
WorkflowHandler::handle_compute(self, message).await
173173
}
174-
PINGPONG_LISTEN_TOPIC => {
175-
PingpongHandler::handle_compute(self, message, PINGPONG_RESPONSE_TOPIC).await
174+
PingpongHandler::LISTEN_TOPIC => {
175+
PingpongHandler::handle_compute(self, message).await
176176
}
177177
// TODO: can we do this in a nicer way?
178178
// TODO: yes, cast to enum above and let type-casting do the work
@@ -185,11 +185,11 @@ impl DriaComputeNode {
185185
self.p2p.validate_message(&message_id, &peer_id, acceptance)?;
186186
},
187187
Err(err) => {
188-
log::error!("Error handling {} message: {}", topic_str, err);
188+
log::error!("Error handling {} message: {:?}", topic_str, err);
189189
self.p2p.validate_message(&message_id, &peer_id, gossipsub::MessageAcceptance::Ignore)?;
190190
}
191191
}
192-
} else if std::matches!(topic_str, PINGPONG_RESPONSE_TOPIC | WORKFLOW_RESPONSE_TOPIC) {
192+
} else if std::matches!(topic_str, PingpongHandler::RESPONSE_TOPIC | WorkflowHandler::RESPONSE_TOPIC) {
193193
// since we are responding to these topics, we might receive messages from other compute nodes
194194
// we can gracefully ignore them and propagate it to to others
195195
log::debug!("Ignoring message for topic: {}", topic_str);
@@ -205,10 +205,10 @@ impl DriaComputeNode {
205205
}
206206

207207
// unsubscribe from topics
208-
self.unsubscribe(PINGPONG_LISTEN_TOPIC)?;
209-
self.unsubscribe(PINGPONG_RESPONSE_TOPIC)?;
210-
self.unsubscribe(WORKFLOW_LISTEN_TOPIC)?;
211-
self.unsubscribe(WORKFLOW_RESPONSE_TOPIC)?;
208+
self.unsubscribe(PingpongHandler::LISTEN_TOPIC)?;
209+
self.unsubscribe(PingpongHandler::RESPONSE_TOPIC)?;
210+
self.unsubscribe(WorkflowHandler::LISTEN_TOPIC)?;
211+
self.unsubscribe(WorkflowHandler::RESPONSE_TOPIC)?;
212212

213213
Ok(())
214214
}

src/payloads/error.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
use serde::{Deserialize, Serialize};
22

3-
/// A generic task request, given by Dria.
3+
/// A task error response.
4+
/// Returning this as the payload helps to debug the errors received at client side.
45
#[derive(Debug, Clone, Serialize, Deserialize)]
56
#[serde(rename_all = "camelCase")]
67
pub struct TaskErrorPayload {

0 commit comments

Comments
 (0)