Skip to content

Commit 5eed72c

Browse files
committed
better message structures
1 parent 3a2e249 commit 5eed72c

File tree

3 files changed

+54
-65
lines changed

3 files changed

+54
-65
lines changed

src/handlers/workflow.rs

Lines changed: 4 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ use ollama_workflows::{Entry, Executor, ModelProvider, ProgramMemory, Workflow};
55
use serde::Deserialize;
66

77
use crate::node::DriaComputeNode;
8-
use crate::utils::payload::{TaskRequest, TaskRequestPayload};
8+
use crate::utils::payload::{TaskRequestPayload, TaskResponsePayload};
99
use crate::utils::{get_current_time_nanos, DKNMessage};
1010

1111
use super::ComputeHandler;
@@ -62,12 +62,6 @@ impl ComputeHandler for WorkflowHandler {
6262
// obtain public key from the payload
6363
let task_public_key = hex::decode(&task.public_key)?;
6464

65-
let task = TaskRequest {
66-
task_id: task.task_id,
67-
input: task.input,
68-
public_key: task_public_key,
69-
};
70-
7165
// read model / provider from the task
7266
let (model_provider, model) = node
7367
.config
@@ -112,16 +106,16 @@ impl ComputeHandler for WorkflowHandler {
112106
}
113107

114108
// prepare signed and encrypted payload
115-
let payload = DKNMessage::new_signed_encrypted_payload(
109+
let payload = TaskResponsePayload::new(
116110
result,
117111
&task.task_id,
118-
&task.public_key,
112+
&task_public_key,
119113
&node.config.secret_key,
120114
)?;
121115
let payload_str = payload.to_string()?;
122-
let message = DKNMessage::new(payload_str, result_topic);
123116

124117
// publish the result
118+
let message = DKNMessage::new(payload_str, result_topic);
125119
node.publish(message)?;
126120

127121
Ok(MessageAcceptance::Accept)

src/utils/message.rs

Lines changed: 13 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
use crate::utils::{
22
crypto::{sha256hash, sign_bytes_recoverable},
33
get_current_time_nanos,
4-
payload::TaskResponsePayload,
54
};
65
use base64::{prelude::BASE64_STANDARD, Engine};
76
use core::fmt;
@@ -10,19 +9,22 @@ use eyre::Result;
109
use libsecp256k1::SecretKey;
1110
use serde::{Deserialize, Serialize};
1211

13-
/// A parsed message from gossipsub. When first received, the message data is simply a vector of bytes.
14-
/// We treat that bytearray as a stringified JSON object, and parse it into this struct.
15-
///
16-
/// TODO: these are all available at protocol level as well
17-
/// - payload is the data itself
18-
/// - topic is available as TopicHash of Gossipsub
19-
/// - version is given within the Identify protocol
20-
/// - timestamp is available at protocol level via DataTransform
12+
/// A message within Dria Knowledge Network.
2113
#[derive(Serialize, Deserialize, Debug, Clone)]
2214
pub struct DKNMessage {
15+
/// Base64 encoded data
2316
pub(crate) payload: String,
17+
/// The topic of the message, derived from `TopicHash`
18+
///
19+
/// NOTE: This can be obtained via TopicHash in GossipSub
2420
pub(crate) topic: String,
21+
/// The version of the Dria Compute Node
22+
///
23+
/// NOTE: This can be obtained via Identify protocol version
2524
pub(crate) version: String,
25+
/// The timestamp of the message, in nanoseconds
26+
///
27+
/// NOTE: This can be obtained via DataTransform in GossipSub
2628
pub(crate) timestamp: u128,
2729
}
2830

@@ -61,39 +63,6 @@ impl DKNMessage {
6163
Self::new(signed_payload, topic)
6264
}
6365

64-
/// Creates the payload of a computation result.
65-
///
66-
/// - Sign `task_id || payload` with node `self.secret_key`
67-
/// - Encrypt `result` with `task_public_key`
68-
/// TODO: this is not supposed to be here
69-
pub fn new_signed_encrypted_payload(
70-
payload: impl AsRef<[u8]>,
71-
task_id: &str,
72-
encrypting_public_key: &[u8],
73-
signing_secret_key: &SecretKey,
74-
) -> Result<TaskResponsePayload> {
75-
// create the message `task_id || payload`
76-
let mut preimage = Vec::new();
77-
preimage.extend_from_slice(task_id.as_ref());
78-
preimage.extend_from_slice(payload.as_ref());
79-
80-
// sign the message
81-
// TODO: use `sign_recoverable` here instead?
82-
let digest = libsecp256k1::Message::parse(&sha256hash(preimage));
83-
let (signature, recid) = libsecp256k1::sign(&digest, signing_secret_key);
84-
let signature: [u8; 64] = signature.serialize();
85-
let recid: [u8; 1] = [recid.serialize()];
86-
87-
// encrypt payload itself
88-
let ciphertext = ecies::encrypt(encrypting_public_key, payload.as_ref())?;
89-
90-
Ok(TaskResponsePayload {
91-
ciphertext: hex::encode(ciphertext),
92-
signature: format!("{}{}", hex::encode(signature), hex::encode(recid)),
93-
task_id: task_id.to_string(),
94-
})
95-
}
96-
9766
/// Decodes the base64 payload into bytes.
9867
pub fn decode_payload(&self) -> Result<Vec<u8>, base64::DecodeError> {
9968
BASE64_STANDARD.decode(&self.payload)
@@ -161,6 +130,7 @@ impl TryFrom<libp2p::gossipsub::Message> for DKNMessage {
161130
#[cfg(test)]
162131
mod tests {
163132
use super::*;
133+
use crate::utils::payload::TaskResponsePayload;
164134
use crate::{utils::crypto::sha256hash, DriaComputeNodeConfig};
165135
use ecies::decrypt;
166136
use libsecp256k1::SecretKey;
@@ -252,7 +222,7 @@ mod tests {
252222
let task_public_key = PublicKey::from_secret_key(&task_secret_key);
253223

254224
// create payload
255-
let payload = DKNMessage::new_signed_encrypted_payload(
225+
let payload = TaskResponsePayload::new(
256226
RESULT,
257227
TASK_ID,
258228
&task_public_key.serialize(),

src/utils/payload.rs

Lines changed: 37 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,11 @@
1+
use super::crypto::sha256hash;
2+
use crate::utils::{filter::FilterPayload, get_current_time_nanos};
13
use eyre::Result;
24
use fastbloom_rs::BloomFilter;
5+
use libsecp256k1::SecretKey;
36
use serde::{Deserialize, Serialize};
47
use uuid::Uuid;
58

6-
use crate::utils::{filter::FilterPayload, get_current_time_nanos};
7-
89
/// A computation task is the task of computing a result from a given input. The result is encrypted with the public key of the requester.
910
/// Plain result is signed by the compute node's private key, and a commitment is computed from the signature and plain result.
1011
///
@@ -15,16 +16,48 @@ use crate::utils::{filter::FilterPayload, get_current_time_nanos};
1516
pub struct TaskResponsePayload {
1617
/// The unique identifier of the task.
1718
pub task_id: String,
18-
/// A signature on the digest of plaintext result, prepended with task id.
19+
/// Signature of the payload with task id, hexadecimally encoded.
1920
pub signature: String,
20-
/// Computation result encrypted with the public key of the task.
21+
/// Result encrypted with the public key of the task, Hexadecimally encoded.
2122
pub ciphertext: String,
2223
}
2324

2425
impl TaskResponsePayload {
2526
pub fn to_string(&self) -> Result<String> {
2627
serde_json::to_string(&serde_json::json!(self)).map_err(Into::into)
2728
}
29+
30+
/// Creates the payload of a computation result.
31+
///
32+
/// - Sign `task_id || payload` with node `self.secret_key`
33+
/// - Encrypt `result` with `task_public_key`
34+
pub fn new(
35+
payload: impl AsRef<[u8]>,
36+
task_id: &str,
37+
encrypting_public_key: &[u8],
38+
signing_secret_key: &SecretKey,
39+
) -> Result<Self> {
40+
// create the message `task_id || payload`
41+
let mut preimage = Vec::new();
42+
preimage.extend_from_slice(task_id.as_ref());
43+
preimage.extend_from_slice(payload.as_ref());
44+
45+
// sign the message
46+
// TODO: use `sign_recoverable` here instead?
47+
let digest = libsecp256k1::Message::parse(&sha256hash(preimage));
48+
let (signature, recid) = libsecp256k1::sign(&digest, signing_secret_key);
49+
let signature: [u8; 64] = signature.serialize();
50+
let recid: [u8; 1] = [recid.serialize()];
51+
52+
// encrypt payload itself
53+
let ciphertext = ecies::encrypt(encrypting_public_key, payload.as_ref())?;
54+
55+
Ok(TaskResponsePayload {
56+
ciphertext: hex::encode(ciphertext),
57+
signature: format!("{}{}", hex::encode(signature), hex::encode(recid)),
58+
task_id: task_id.to_string(),
59+
})
60+
}
2861
}
2962

3063
/// A generic task request, given by Dria.
@@ -55,11 +88,3 @@ impl<T> TaskRequestPayload<T> {
5588
}
5689
}
5790
}
58-
59-
/// A parsed `TaskRequestPayload`.
60-
#[derive(Debug, Clone)]
61-
pub struct TaskRequest<T> {
62-
pub task_id: String,
63-
pub(crate) input: T,
64-
pub(crate) public_key: Vec<u8>,
65-
}

0 commit comments

Comments
 (0)