Skip to content

Commit 3a2e249

Browse files
committed
now entire message is published, not just payload
1 parent d3ca0b8 commit 3a2e249

File tree

9 files changed

+45
-43
lines changed

9 files changed

+45
-43
lines changed

src/handlers/mod.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,13 +8,13 @@ pub use pingpong::PingpongHandler;
88
mod workflow;
99
pub use workflow::WorkflowHandler;
1010

11-
use crate::{utils::P2PMessage, DriaComputeNode};
11+
use crate::{utils::DKNMessage, DriaComputeNode};
1212

1313
#[async_trait]
1414
pub trait ComputeHandler {
1515
async fn handle_compute(
1616
node: &mut DriaComputeNode,
17-
message: P2PMessage,
17+
message: DKNMessage,
1818
result_topic: &str,
1919
) -> Result<MessageAcceptance>;
2020
}

src/handlers/pingpong.rs

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
use crate::{
22
node::DriaComputeNode,
3-
utils::{get_current_time_nanos, P2PMessage},
3+
utils::{get_current_time_nanos, DKNMessage},
44
};
55
use async_trait::async_trait;
66
use eyre::Result;
@@ -29,7 +29,7 @@ struct PingpongResponse {
2929
impl ComputeHandler for PingpongHandler {
3030
async fn handle_compute(
3131
node: &mut DriaComputeNode,
32-
message: P2PMessage,
32+
message: DKNMessage,
3333
result_topic: &str,
3434
) -> Result<MessageAcceptance> {
3535
let pingpong = message.parse_payload::<PingpongPayload>(true)?;
@@ -54,12 +54,13 @@ impl ComputeHandler for PingpongHandler {
5454
models: node.config.model_config.models.clone(),
5555
timestamp: get_current_time_nanos(),
5656
};
57-
let response = P2PMessage::new_signed(
57+
58+
let message = DKNMessage::new_signed(
5859
serde_json::json!(response_body).to_string(),
5960
result_topic,
6061
&node.config.secret_key,
6162
);
62-
node.publish(response)?;
63+
node.publish(message)?;
6364

6465
// accept message, someone else may be included in the filter
6566
Ok(MessageAcceptance::Accept)
@@ -72,7 +73,7 @@ mod tests {
7273
utils::{
7374
crypto::{sha256hash, to_address},
7475
filter::FilterPayload,
75-
P2PMessage,
76+
DKNMessage,
7677
},
7778
DriaComputeNodeConfig,
7879
};
@@ -87,7 +88,7 @@ mod tests {
8788
"0208ef5e65a9c656a6f92fb2c770d5d5e2ecffe02a6aade19207f75110be6ae658"
8889
))
8990
.expect("Should parse public key");
90-
let message = P2PMessage {
91+
let message = DKNMessage {
9192
payload: "Y2RmODcyNDlhY2U3YzQ2MDIzYzNkMzBhOTc4ZWY3NjViMWVhZDlmNWJhMDUyY2MxMmY0NzIzMjQyYjc0YmYyODFjMDA1MTdmMGYzM2VkNTgzMzk1YWUzMTY1ODQ3NWQyNDRlODAxYzAxZDE5MjYwMDM1MTRkNzEwMThmYTJkNjEwMXsidXVpZCI6ICI4MWE2M2EzNC05NmM2LTRlNWEtOTliNS02YjI3NGQ5ZGUxNzUiLCAiZGVhZGxpbmUiOiAxNzE0MTI4NzkyfQ==".to_string(),
9293
topic: "heartbeat".to_string(),
9394
version: "0.0.0".to_string(),

src/handlers/workflow.rs

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ use serde::Deserialize;
66

77
use crate::node::DriaComputeNode;
88
use crate::utils::payload::{TaskRequest, TaskRequestPayload};
9-
use crate::utils::{get_current_time_nanos, P2PMessage};
9+
use crate::utils::{get_current_time_nanos, DKNMessage};
1010

1111
use super::ComputeHandler;
1212

@@ -29,7 +29,7 @@ struct WorkflowPayload {
2929
impl ComputeHandler for WorkflowHandler {
3030
async fn handle_compute(
3131
node: &mut DriaComputeNode,
32-
message: P2PMessage,
32+
message: DKNMessage,
3333
result_topic: &str,
3434
) -> Result<MessageAcceptance> {
3535
let task = message.parse_payload::<TaskRequestPayload<WorkflowPayload>>(true)?;
@@ -111,16 +111,17 @@ impl ComputeHandler for WorkflowHandler {
111111
}
112112
}
113113

114-
// publish the result
115-
let payload = P2PMessage::new_signed_encrypted_payload(
114+
// prepare signed and encrypted payload
115+
let payload = DKNMessage::new_signed_encrypted_payload(
116116
result,
117117
&task.task_id,
118118
&task.public_key,
119119
&node.config.secret_key,
120120
)?;
121121
let payload_str = payload.to_string()?;
122-
let message = P2PMessage::new(payload_str, result_topic);
122+
let message = DKNMessage::new(payload_str, result_topic);
123123

124+
// publish the result
124125
node.publish(message)?;
125126

126127
Ok(MessageAcceptance::Accept)

src/node.rs

Lines changed: 12 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ use crate::{
77
config::DriaComputeNodeConfig,
88
handlers::{ComputeHandler, PingpongHandler, WorkflowHandler},
99
p2p::P2PClient,
10-
utils::{crypto::secret_to_keypair, AvailableNodes, P2PMessage},
10+
utils::{crypto::secret_to_keypair, AvailableNodes, DKNMessage},
1111
};
1212

1313
/// Number of seconds between refreshing the Admin RPC PeerIDs from Dria server.
@@ -56,8 +56,8 @@ impl DriaComputeNode {
5656
let p2p = P2PClient::new(keypair, listen_addr, &available_nodes)?;
5757

5858
Ok(DriaComputeNode {
59-
config,
6059
p2p,
60+
config,
6161
cancellation,
6262
available_nodes,
6363
available_nodes_last_refreshed: tokio::time::Instant::now(),
@@ -86,16 +86,18 @@ impl DriaComputeNode {
8686
Ok(())
8787
}
8888

89-
/// Publishes a given message to the network.
90-
/// The topic is expected to be provided within the message struct.
91-
pub fn publish(&mut self, message: P2PMessage) -> Result<()> {
92-
let message_bytes = message.payload.as_bytes().to_vec();
89+
/// Publishes a given message to the network w.r.t the topic of it.
90+
///
91+
/// Internally, the message is JSON serialized to bytes and then published to the network as is.
92+
pub fn publish(&mut self, message: DKNMessage) -> Result<()> {
93+
let message_bytes = serde_json::to_vec(&message)?;
9394
let message_id = self.p2p.publish(&message.topic, message_bytes)?;
9495
log::info!("Published message ({}) to {}", message_id, message.topic);
9596
Ok(())
9697
}
9798

9899
/// Returns the list of connected peers.
100+
#[inline(always)]
99101
pub fn peers(&self) -> Vec<(&libp2p_identity::PeerId, Vec<&gossipsub::TopicHash>)> {
100102
self.p2p.peers()
101103
}
@@ -223,13 +225,14 @@ impl DriaComputeNode {
223225
/// This prepared message includes the topic, payload, version and timestamp.
224226
///
225227
/// This also checks the signature of the message, expecting a valid signature from admin node.
228+
// TODO: move this somewhere?
226229
pub fn parse_message_to_prepared_message(
227230
&self,
228231
message: gossipsub::Message,
229-
) -> Result<P2PMessage> {
232+
) -> Result<DKNMessage> {
230233
// the received message is expected to use IdentHash for the topic, so we can see the name of the topic immediately.
231234
log::debug!("Parsing {} message.", message.topic.as_str());
232-
let message = P2PMessage::try_from(message)?;
235+
let message = DKNMessage::try_from(message)?;
233236
log::debug!("Parsed: {}", message);
234237

235238
// check dria signature
@@ -270,7 +273,7 @@ mod tests {
270273

271274
// publish a dummy message
272275
let topic = "foo";
273-
let message = P2PMessage::new("hello from the other side", topic);
276+
let message = DKNMessage::new("hello from the other side", topic);
274277
node.subscribe(topic).expect("should subscribe");
275278
node.publish(message).expect("should publish");
276279
node.unsubscribe(topic).expect("should unsubscribe");

src/utils/crypto.rs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -60,8 +60,8 @@ mod tests {
6060
use hex::decode;
6161
use libsecp256k1::{recover, sign, verify, Message, PublicKey, SecretKey};
6262

63-
const DUMMY_KEY: &[u8; 32] = b"driadriadriadriadriadriadriadria";
64-
const MESSAGE: &[u8] = "hello world".as_bytes();
63+
const DUMMY_SECRET_KEY: &[u8; 32] = b"driadriadriadriadriadriadriadria";
64+
const MESSAGE: &[u8] = b"hello world";
6565

6666
#[test]
6767
fn test_hash() {
@@ -73,7 +73,7 @@ mod tests {
7373

7474
#[test]
7575
fn test_address() {
76-
let sk = SecretKey::parse_slice(DUMMY_KEY).expect("Should parse key.");
76+
let sk = SecretKey::parse_slice(DUMMY_SECRET_KEY).expect("Should parse key.");
7777
let pk = PublicKey::from_secret_key(&sk);
7878
let addr = to_address(&pk);
7979
assert_eq!(
@@ -84,7 +84,7 @@ mod tests {
8484

8585
#[test]
8686
fn test_encrypt_decrypt() {
87-
let sk = SecretKey::parse_slice(DUMMY_KEY).expect("Should parse private key slice.");
87+
let sk = SecretKey::parse_slice(DUMMY_SECRET_KEY).expect("Should parse private key slice.");
8888
let pk = PublicKey::from_secret_key(&sk);
8989
let (sk, pk) = (&sk.serialize(), &pk.serialize());
9090

@@ -96,7 +96,7 @@ mod tests {
9696
#[test]
9797
fn test_sign_verify() {
9898
let secret_key =
99-
SecretKey::parse_slice(DUMMY_KEY).expect("Should parse private key slice.");
99+
SecretKey::parse_slice(DUMMY_SECRET_KEY).expect("Should parse private key slice.");
100100

101101
// sign the message using the secret key
102102
let digest = sha256hash(MESSAGE);
@@ -121,7 +121,7 @@ mod tests {
121121
#[ignore = "run only with profiler if wanted"]
122122
fn test_memory_usage() {
123123
let secret_key =
124-
SecretKey::parse_slice(DUMMY_KEY).expect("Should parse private key slice.");
124+
SecretKey::parse_slice(DUMMY_SECRET_KEY).expect("Should parse private key slice.");
125125
let public_key = PublicKey::from_secret_key(&secret_key);
126126

127127
// sign the message using the secret key

src/utils/filter.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ use fastbloom_rs::{BloomFilter, Hashes, Membership};
33
use serde::{Deserialize, Serialize};
44
use serde_json::{json, to_string};
55

6-
/// A task filter is used to determine if a node is selected.
6+
/// A task Blfilter is used to determine if a node is selected.
77
///
88
/// The filter is a Bloom Filter with a set of items and a false positive rate, it is serialized as a hex string.
99
#[derive(Serialize, Deserialize, Debug, Clone)]

src/utils/message.rs

Lines changed: 8 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ use serde::{Deserialize, Serialize};
1919
/// - version is given within the Identify protocol
2020
/// - timestamp is available at protocol level via DataTransform
2121
#[derive(Serialize, Deserialize, Debug, Clone)]
22-
pub struct P2PMessage {
22+
pub struct DKNMessage {
2323
pub(crate) payload: String,
2424
pub(crate) topic: String,
2525
pub(crate) version: String,
@@ -33,7 +33,7 @@ pub struct P2PMessage {
3333
/// and therefore use 128 characters: SIGNATURE_SIZE - 2.
3434
const SIGNATURE_SIZE_HEX: usize = 130;
3535

36-
impl P2PMessage {
36+
impl DKNMessage {
3737
/// Creates a new message with current timestamp and version equal to the crate version.
3838
///
3939
/// - `payload` is gives as bytes. It is to be `base64` encoded internally.
@@ -91,7 +91,6 @@ impl P2PMessage {
9191
ciphertext: hex::encode(ciphertext),
9292
signature: format!("{}{}", hex::encode(signature), hex::encode(recid)),
9393
task_id: task_id.to_string(),
94-
timestamp: get_current_time_nanos(),
9594
})
9695
}
9796

@@ -136,7 +135,7 @@ impl P2PMessage {
136135
}
137136
}
138137

139-
impl fmt::Display for P2PMessage {
138+
impl fmt::Display for DKNMessage {
140139
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
141140
let payload_decoded = self
142141
.decode_payload()
@@ -151,7 +150,7 @@ impl fmt::Display for P2PMessage {
151150
}
152151
}
153152

154-
impl TryFrom<libp2p::gossipsub::Message> for P2PMessage {
153+
impl TryFrom<libp2p::gossipsub::Message> for DKNMessage {
155154
type Error = serde_json::Error;
156155

157156
fn try_from(value: libp2p::gossipsub::Message) -> Result<Self, Self::Error> {
@@ -186,7 +185,7 @@ mod tests {
186185

187186
#[test]
188187
fn test_display_message() {
189-
let message = P2PMessage::new(b"hello world", "test-topic");
188+
let message = DKNMessage::new(b"hello world", "test-topic");
190189
println!("{}", message);
191190
}
192191

@@ -195,7 +194,7 @@ mod tests {
195194
// create payload & message
196195
let body = TestStruct::default();
197196
let payload = serde_json::to_vec(&json!(body)).expect("Should serialize");
198-
let message = P2PMessage::new(payload, TOPIC);
197+
let message = DKNMessage::new(payload, TOPIC);
199198

200199
// decode message
201200
let message_body = message.decode_payload().expect("Should decode");
@@ -221,7 +220,7 @@ mod tests {
221220
// create payload & message with signature & body
222221
let body = TestStruct::default();
223222
let body_str = serde_json::to_string(&body).unwrap();
224-
let message = P2PMessage::new_signed(body_str, TOPIC, &sk);
223+
let message = DKNMessage::new_signed(body_str, TOPIC, &sk);
225224

226225
// decode message
227226
let message_body = message.decode_payload().expect("Should decode");
@@ -253,7 +252,7 @@ mod tests {
253252
let task_public_key = PublicKey::from_secret_key(&task_secret_key);
254253

255254
// create payload
256-
let payload = P2PMessage::new_signed_encrypted_payload(
255+
let payload = DKNMessage::new_signed_encrypted_payload(
257256
RESULT,
258257
TASK_ID,
259258
&task_public_key.serialize(),

src/utils/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ pub mod filter;
33
pub mod payload;
44

55
mod message;
6-
pub use message::P2PMessage;
6+
pub use message::DKNMessage;
77

88
mod available_nodes;
99
pub use available_nodes::AvailableNodes;

src/utils/payload.rs

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -13,14 +13,12 @@ use crate::utils::{filter::FilterPayload, get_current_time_nanos};
1313
#[derive(Debug, Clone, Serialize, Deserialize)]
1414
#[serde(rename_all = "camelCase")]
1515
pub struct TaskResponsePayload {
16+
/// The unique identifier of the task.
17+
pub task_id: String,
1618
/// A signature on the digest of plaintext result, prepended with task id.
1719
pub signature: String,
1820
/// Computation result encrypted with the public key of the task.
1921
pub ciphertext: String,
20-
/// The unique identifier of the task.
21-
pub task_id: String,
22-
/// Timestamp of the response.
23-
pub timestamp: u128,
2422
}
2523

2624
impl TaskResponsePayload {

0 commit comments

Comments
 (0)