Skip to content

Commit b0c91d7

Browse files
authored
Timestamps + Workflow update (#143)
* added timestamps to task results * update workflows
1 parent 5d5763c commit b0c91d7

File tree

9 files changed

+198
-92
lines changed

9 files changed

+198
-92
lines changed

Cargo.lock

Lines changed: 86 additions & 60 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ default-members = ["compute"]
77

88
[workspace.package]
99
edition = "2021"
10-
version = "0.2.20"
10+
version = "0.2.21"
1111
license = "Apache-2.0"
1212
readme = "README.md"
1313

compute/src/handlers/workflow.rs

Lines changed: 30 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,13 @@
1+
use std::time::Instant;
2+
13
use async_trait::async_trait;
24
use dkn_p2p::libp2p::gossipsub::MessageAcceptance;
35
use dkn_workflows::{Entry, Executor, ModelProvider, ProgramMemory, Workflow};
46
use eyre::{eyre, Context, Result};
57
use libsecp256k1::PublicKey;
68
use serde::Deserialize;
79

8-
use crate::payloads::{TaskErrorPayload, TaskRequestPayload, TaskResponsePayload};
10+
use crate::payloads::{TaskErrorPayload, TaskRequestPayload, TaskResponsePayload, TaskStats};
911
use crate::utils::{get_current_time_nanos, DKNMessage};
1012
use crate::DriaComputeNode;
1113

@@ -38,6 +40,7 @@ impl ComputeHandler for WorkflowHandler {
3840
let task = message
3941
.parse_payload::<TaskRequestPayload<WorkflowPayload>>(true)
4042
.wrap_err("Could not parse workflow task")?;
43+
let mut task_stats = TaskStats::default().record_received_at();
4144

4245
// check if deadline is past or not
4346
let current_time = get_current_time_nanos();
@@ -90,6 +93,7 @@ impl ComputeHandler for WorkflowHandler {
9093

9194
// execute workflow with cancellation
9295
let exec_result: Result<String>;
96+
let exec_started_at = Instant::now();
9397
tokio::select! {
9498
_ = node.cancellation.cancelled() => {
9599
log::info!("Received cancellation, quitting all tasks.");
@@ -99,10 +103,10 @@ impl ComputeHandler for WorkflowHandler {
99103
exec_result = exec_result_inner.map_err(|e| eyre!("Execution error: {}", e.to_string()));
100104
}
101105
}
106+
task_stats = task_stats.record_execution_time(exec_started_at);
102107

103-
let (publish_result, acceptance) = match exec_result {
108+
let (message, acceptance) = match exec_result {
104109
Ok(result) => {
105-
log::warn!("Task {} result:", result);
106110
// obtain public key from the payload
107111
let task_public_key_bytes =
108112
hex::decode(&task.public_key).wrap_err("Could not decode public key")?;
@@ -115,44 +119,56 @@ impl ComputeHandler for WorkflowHandler {
115119
&task_public_key,
116120
&node.config.secret_key,
117121
model_name,
122+
task_stats.record_published_at(),
118123
)?;
119124
let payload_str = serde_json::to_string(&payload)
120125
.wrap_err("Could not serialize response payload")?;
121126

122-
// publish the result
123-
// accept so that if there are others included in filter they can do the task
127+
// prepare signed message
128+
log::debug!(
129+
"Publishing result for task {}\n{}",
130+
task.task_id,
131+
payload_str
132+
);
124133
let message = DKNMessage::new(payload_str, Self::RESPONSE_TOPIC);
125-
(node.publish(message), MessageAcceptance::Accept)
134+
// accept so that if there are others included in filter they can do the task
135+
(message, MessageAcceptance::Accept)
126136
}
127137
Err(err) => {
128138
// use pretty display string for error logging with causes
129139
let err_string = format!("{:#}", err);
130140
log::error!("Task {} failed: {}", task.task_id, err_string);
131141

132142
// prepare error payload
133-
let error_payload =
134-
TaskErrorPayload::new(task.task_id.clone(), err_string, model_name);
143+
let error_payload = TaskErrorPayload {
144+
task_id: task.task_id.clone(),
145+
error: err_string,
146+
model: model_name,
147+
stats: task_stats.record_published_at(),
148+
};
135149
let error_payload_str = serde_json::to_string(&error_payload)
136150
.wrap_err("Could not serialize error payload")?;
137151

138-
// publish the error result for diagnostics
139-
// ignore just in case, workflow may be bugged
152+
// prepare signed message
140153
let message = DKNMessage::new_signed(
141154
error_payload_str,
142155
Self::RESPONSE_TOPIC,
143156
&node.config.secret_key,
144157
);
145-
(node.publish(message), MessageAcceptance::Ignore)
158+
// ignore just in case, workflow may be bugged
159+
(message, MessageAcceptance::Ignore)
146160
}
147161
};
148162

149-
// if for some reason we couldnt publish the result, publish the error itself so that RPC doesnt hang
150-
if let Err(publish_err) = publish_result {
163+
// try publishing the result
164+
165+
if let Err(publish_err) = node.publish(message) {
151166
let err_msg = format!("Could not publish result: {:?}", publish_err);
152167
log::error!("{}", err_msg);
168+
153169
let payload = serde_json::json!({
154170
"taskId": task.task_id,
155-
"error": err_msg
171+
"error": err_msg,
156172
});
157173
let message = DKNMessage::new_signed(
158174
payload.to_string(),

compute/src/node.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -232,7 +232,7 @@ impl DriaComputeNode {
232232
} else if std::matches!(topic_str, PingpongHandler::RESPONSE_TOPIC | WorkflowHandler::RESPONSE_TOPIC) {
233233
// since we are responding to these topics, we might receive messages from other compute nodes
234234
// we can gracefully ignore them and propagate them to others
235-
log::debug!("Ignoring message for topic: {}", topic_str);
235+
log::trace!("Ignoring message for topic: {}", topic_str);
236236
self.p2p.validate_message(&message_id, &peer_id, gossipsub::MessageAcceptance::Accept)?;
237237
} else {
238238
// reject this message as its from a foreign topic

compute/src/payloads/error.rs

Lines changed: 4 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
use serde::{Deserialize, Serialize};
22

3+
use super::TaskStats;
4+
35
/// A task error response.
46
/// Returning this as the payload helps to debug the errors received at client side.
57
#[derive(Debug, Clone, Serialize, Deserialize)]
@@ -11,14 +13,6 @@ pub struct TaskErrorPayload {
1113
pub error: String,
1214
/// Name of the model that caused the error.
1315
pub model: String,
14-
}
15-
16-
impl TaskErrorPayload {
17-
pub fn new(task_id: String, error: String, model: String) -> Self {
18-
Self {
19-
task_id,
20-
error,
21-
model,
22-
}
23-
}
16+
/// Task statistics.
17+
pub stats: TaskStats,
2418
}

compute/src/payloads/mod.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,3 +6,6 @@ pub use request::TaskRequestPayload;
66

77
mod response;
88
pub use response::TaskResponsePayload;
9+
10+
mod stats;
11+
pub use stats::TaskStats;

compute/src/payloads/response.rs

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,8 @@ use eyre::Result;
33
use libsecp256k1::{PublicKey, SecretKey};
44
use serde::{Deserialize, Serialize};
55

6+
use super::TaskStats;
7+
68
/// A computation task is the task of computing a result from a given input. The result is encrypted with the public key of the requester.
79
/// Plain result is signed by the compute node's private key, and a commitment is computed from the signature and plain result.
810
///
@@ -19,6 +21,8 @@ pub struct TaskResponsePayload {
1921
pub ciphertext: String,
2022
/// Name of the model used for this task.
2123
pub model: String,
24+
/// Stats about the task execution.
25+
pub stats: TaskStats,
2226
}
2327

2428
impl TaskResponsePayload {
@@ -32,6 +36,7 @@ impl TaskResponsePayload {
3236
encrypting_public_key: &PublicKey,
3337
signing_secret_key: &SecretKey,
3438
model: String,
39+
stats: TaskStats,
3540
) -> Result<Self> {
3641
// create the message `task_id || payload`
3742
let mut preimage = Vec::new();
@@ -47,6 +52,7 @@ impl TaskResponsePayload {
4752
signature,
4853
ciphertext,
4954
model,
55+
stats,
5056
})
5157
}
5258
}
@@ -74,9 +80,15 @@ mod tests {
7480
let task_id = uuid::Uuid::new_v4().to_string();
7581

7682
// creates a signed and encrypted payload
77-
let payload =
78-
TaskResponsePayload::new(RESULT, &task_id, &task_pk, &signer_sk, MODEL.to_string())
79-
.expect("Should create payload");
83+
let payload = TaskResponsePayload::new(
84+
RESULT,
85+
&task_id,
86+
&task_pk,
87+
&signer_sk,
88+
MODEL.to_string(),
89+
Default::default(),
90+
)
91+
.expect("Should create payload");
8092

8193
// decrypt result and compare it to plaintext
8294
let ciphertext_bytes = hex::decode(payload.ciphertext).unwrap();

compute/src/payloads/stats.rs

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
use serde::{Deserialize, Serialize};
2+
use std::time::Instant;
3+
4+
use crate::utils::get_current_time_nanos;
5+
6+
/// A task stat.
7+
/// Returned inside task payloads so the client can see when the task was received, when its result was published, and how long execution took (all in nanoseconds).
8+
#[derive(Default, Debug, Clone, Serialize, Deserialize)]
9+
#[serde(rename_all = "camelCase")]
10+
pub struct TaskStats {
11+
/// Timestamp at which the task was received from network & parsed.
12+
pub received_at: u128,
13+
/// Timestamp at which the task was published back to network.
14+
pub published_at: u128,
15+
/// Time taken to execute the task.
16+
pub execution_time: u128,
17+
}
18+
19+
impl TaskStats {
20+
/// Records the current timestamp within `received_at`.
21+
pub fn record_received_at(mut self) -> Self {
22+
// no unwrap needed: `get_current_time_nanos` logs the error and returns 0 if the system time cannot be read
23+
self.received_at = get_current_time_nanos();
24+
self
25+
}
26+
27+
/// Records the current timestamp within `published_at`.
28+
pub fn record_published_at(mut self) -> Self {
29+
self.published_at = get_current_time_nanos();
30+
self
31+
}
32+
33+
pub fn record_execution_time(mut self, started_at: Instant) -> Self {
34+
self.execution_time = Instant::now().duration_since(started_at).as_nanos();
35+
self
36+
}
37+
}
38+
39+
#[cfg(test)]
40+
mod tests {
41+
use super::*;
42+
43+
#[test]
44+
fn test_stats() {
45+
let mut stats = TaskStats::default();
46+
47+
assert_eq!(stats.received_at, 0);
48+
stats = stats.record_received_at();
49+
assert_ne!(stats.received_at, 0);
50+
51+
assert_eq!(stats.published_at, 0);
52+
stats = stats.record_published_at();
53+
assert_ne!(stats.published_at, 0);
54+
}
55+
}

compute/src/utils/misc.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,19 +2,19 @@ use dkn_p2p::libp2p::{multiaddr::Protocol, Multiaddr};
22
use port_check::is_port_reachable;
33
use std::{
44
net::{Ipv4Addr, SocketAddrV4},
5-
time::{Duration, SystemTime},
5+
time::SystemTime,
66
};
77

88
/// Returns the current time in nanoseconds since the Unix epoch.
99
///
1010
/// If a `SystemTimeError` occurs, will return 0 just to keep things running.
11-
#[inline]
11+
#[inline(always)]
1212
pub fn get_current_time_nanos() -> u128 {
1313
SystemTime::now()
1414
.duration_since(SystemTime::UNIX_EPOCH)
1515
.unwrap_or_else(|e| {
1616
log::error!("Error getting current time: {}", e);
17-
Duration::new(0, 0)
17+
Default::default()
1818
})
1919
.as_nanos()
2020
}

0 commit comments

Comments
 (0)