Skip to content

Commit 0fbdb6e

Browse files
committed
lints, small rfks
1 parent ff870d3 commit 0fbdb6e

File tree

7 files changed

+27
-31
lines changed

7 files changed

+27
-31
lines changed

compute/src/node/core.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -59,8 +59,8 @@ impl DriaComputeNode {
5959
_ = heartbeat_interval.tick() => {
6060
if let Err(e) = self.send_heartbeat().await {
6161
log::error!("Error making heartbeat: {:?}", e);
62-
}
63-
},
62+
}
63+
},
6464

6565
// check if the cancellation token is cancelled
6666
// this is expected to be cancelled by the main thread with signal handling

compute/src/node/mod.rs

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
use dkn_p2p::{
2-
libp2p::{request_response, PeerId},
3-
DriaP2PClient, DriaP2PCommander, DriaP2PProtocol,
2+
libp2p::PeerId, DriaP2PClient, DriaP2PCommander, DriaP2PProtocol, DriaReqResMessage,
43
};
54
use eyre::Result;
65
use std::collections::HashMap;
@@ -34,7 +33,7 @@ pub struct DriaComputeNode {
3433
/// This is used to track the heartbeats, and their acknowledgements.
3534
pub(crate) heartbeats: HashMap<uuid::Uuid, chrono::DateTime<chrono::Utc>>,
3635
/// Request-response message receiver, can have both a request or a response.
37-
reqres_rx: mpsc::Receiver<(PeerId, request_response::Message<Vec<u8>, Vec<u8>>)>,
36+
reqres_rx: mpsc::Receiver<(PeerId, DriaReqResMessage)>,
3837
/// Task response receiver, will respond to the request-response channel with the given result.
3938
task_output_rx: mpsc::Receiver<TaskWorkerOutput>,
4039
/// Task worker transmitter to send batchable tasks.

compute/src/node/reqres.rs

Lines changed: 5 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,9 @@
11
use colored::Colorize;
22
use dkn_p2p::libp2p::{
3-
request_response::{self, OutboundRequestId, ResponseChannel},
3+
request_response::{OutboundRequestId, ResponseChannel},
44
PeerId,
55
};
6+
use dkn_p2p::DriaReqResMessage;
67
use eyre::{eyre, Result};
78

89
use crate::{reqres::*, workers::task::TaskWorkerOutput};
@@ -16,13 +17,9 @@ impl DriaComputeNode {
1617
/// - Response is forwarded to [`handle_response`](DriaComputeNode::handle_response) method.
1718
///
1819
/// Does not return an error, but simply logs it to [`log::error`].
19-
pub(crate) async fn handle_reqres(
20-
&mut self,
21-
peer_id: PeerId,
22-
message: request_response::Message<Vec<u8>, Vec<u8>>,
23-
) {
20+
pub(crate) async fn handle_reqres(&mut self, peer_id: PeerId, message: DriaReqResMessage) {
2421
match message {
25-
request_response::Message::Request {
22+
DriaReqResMessage::Request {
2623
request,
2724
request_id,
2825
channel,
@@ -38,7 +35,7 @@ impl DriaComputeNode {
3835
}
3936
}
4037

41-
request_response::Message::Response {
38+
DriaReqResMessage::Response {
4239
response,
4340
request_id,
4441
} => {

compute/src/payloads/response.rs

Lines changed: 1 addition & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -5,10 +5,6 @@ use serde::{Deserialize, Serialize};
55
use super::TaskStats;
66

77
/// A computation task is the task of computing a result from a given input. The result is encrypted with the public key of the requester.
8-
/// Plain result is signed by the compute node's private key, and a commitment is computed from the signature and plain result.
9-
///
10-
/// To check the commitment, one must decrypt the ciphertext and parse plaintext from it,
11-
/// and compute the digest using SHA256. That digest will then be used for the signature check.
128
#[derive(Debug, Clone, Serialize, Deserialize)]
139
#[serde(rename_all = "camelCase")]
1410
pub struct TaskResponsePayload {
@@ -23,10 +19,7 @@ pub struct TaskResponsePayload {
2319
}
2420

2521
impl TaskResponsePayload {
26-
/// Creates the payload of a computation result.
27-
///
28-
/// - Sign `task_id || payload` with node `self.secret_key`
29-
/// - Encrypt `result` with `task_public_key`
22+
/// Creates the payload of a computation with its (encrypted) result.
3023
pub fn new(
3124
result: impl AsRef<[u8]>,
3225
task_id: impl ToString,

compute/src/payloads/stats.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
use serde::{Deserialize, Serialize};
22

33
/// Task stats for diagnostics.
4+
///
45
/// Returning this as the payload helps to debug the errors received at client side, and latencies.
56
#[derive(Default, Debug, Clone, Serialize, Deserialize)]
67
#[serde(rename_all = "camelCase")]

p2p/src/client.rs

Lines changed: 15 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,18 @@ use crate::DriaP2PProtocol;
1616
use super::commands::DriaP2PCommand;
1717
use super::DriaP2PCommander;
1818

19+
/// Number of seconds before an idle connection is closed.
20+
const IDLE_CONNECTION_TIMEOUT_SECS: u64 = 240;
21+
/// Buffer size for command channel.
22+
const COMMAND_CHANNEL_BUFSIZE: usize = 1024;
23+
/// Buffer size for events channel.
24+
const MSG_CHANNEL_BUFSIZE: usize = 1024;
25+
26+
/// Request-response message type for Dria protocol, accepts bytes as both request and response.
27+
///
28+
/// The additional parsing must be done by the application itself (for now).
29+
pub type DriaReqResMessage = request_response::Message<Vec<u8>, Vec<u8>>;
30+
1931
/// Peer-to-peer client for Dria Knowledge Network.
2032
pub struct DriaP2PClient {
2133
pub peer_id: PeerId,
@@ -24,25 +36,19 @@ pub struct DriaP2PClient {
2436
/// Dria protocol, used for identifying the client.
2537
protocol: DriaP2PProtocol,
2638
/// Request-response protocol messages.
27-
reqres_tx: mpsc::Sender<(PeerId, request_response::Message<Vec<u8>, Vec<u8>>)>,
39+
reqres_tx: mpsc::Sender<(PeerId, DriaReqResMessage)>,
2840
/// Command receiver.
2941
cmd_rx: mpsc::Receiver<DriaP2PCommand>,
3042
}
3143

32-
/// Number of seconds before an idle connection is closed.
33-
const IDLE_CONNECTION_TIMEOUT_SECS: u64 = 240;
34-
/// Buffer size for command channel.
35-
const COMMAND_CHANNEL_BUFSIZE: usize = 1024;
36-
/// Buffer size for events channel.
37-
const MSG_CHANNEL_BUFSIZE: usize = 1024;
38-
3944
impl DriaP2PClient {
4045
/// Creates a new P2P client with the given keypair and listen address.
4146
///
4247
/// The `version` is used to create the protocol strings for the client, and its very important that
4348
/// they match with the clients existing within the network.
4449
///
4550
/// If for any reason the given `listen_addr` is not available, it will try to listen on a random port on `localhost`.
51+
#[allow(clippy::type_complexity)]
4652
pub fn new(
4753
keypair: Keypair,
4854
listen_addr: Multiaddr,
@@ -51,7 +57,7 @@ impl DriaP2PClient {
5157
) -> Result<(
5258
DriaP2PClient,
5359
DriaP2PCommander,
54-
mpsc::Receiver<(PeerId, request_response::Message<Vec<u8>, Vec<u8>>)>,
60+
mpsc::Receiver<(PeerId, DriaReqResMessage)>,
5561
)> {
5662
let peer_id = keypair.public().to_peer_id();
5763

p2p/src/lib.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
mod behaviour;
22

33
mod client;
4-
pub use client::DriaP2PClient;
4+
pub use client::{DriaP2PClient, DriaReqResMessage};
55

66
mod commands;
77
pub use commands::{DriaP2PCommand, DriaP2PCommander};

0 commit comments

Comments
 (0)