Skip to content

Commit f1067e9

Browse files
committed
utils as package, code tidyups and refactors
1 parent 5a3600d commit f1067e9

31 files changed

+421
-375
lines changed

Cargo.lock

Lines changed: 79 additions & 71 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[workspace]
22
resolver = "2"
3-
members = ["compute", "p2p", "workflows"]
3+
members = ["compute", "p2p", "workflows", "utils"]
44
# compute node is the default member, until Oracle comes in
55
# then, a Launcher will be the default member
66
default-members = ["compute"]

compute/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@ fastbloom-rs = "0.5.9"
4444

4545
# dria subcrates
4646
dkn-p2p = { path = "../p2p" }
47+
dkn-utils = { path = "../utils" }
4748
dkn-workflows = { path = "../workflows" }
4849

4950
# vendor OpenSSL so that its easier to build cross-platform packages

compute/src/config.rs

Lines changed: 1 addition & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -2,40 +2,13 @@ use crate::utils::{
22
address_in_use,
33
crypto::{secret_to_keypair, to_address},
44
};
5-
use dkn_p2p::libp2p::Multiaddr;
5+
use dkn_p2p::{libp2p::Multiaddr, DriaNetworkType};
66
use dkn_workflows::DriaWorkflowsConfig;
77
use eyre::{eyre, Result};
88
use libsecp256k1::{PublicKey, SecretKey};
99

1010
use std::{env, str::FromStr};
1111

12-
/// Network type.
13-
#[derive(Default, Debug, Clone, Copy)]
14-
pub enum DriaNetworkType {
15-
#[default]
16-
Community,
17-
Pro,
18-
}
19-
20-
impl From<&str> for DriaNetworkType {
21-
fn from(s: &str) -> Self {
22-
match s {
23-
"community" => DriaNetworkType::Community,
24-
"pro" => DriaNetworkType::Pro,
25-
_ => Default::default(),
26-
}
27-
}
28-
}
29-
30-
impl DriaNetworkType {
31-
pub fn protocol_name(&self) -> &str {
32-
match self {
33-
DriaNetworkType::Community => "dria",
34-
DriaNetworkType::Pro => "dria-sdk",
35-
}
36-
}
37-
}
38-
3912
#[derive(Debug, Clone)]
4013
pub struct DriaComputeNodeConfig {
4114
/// Wallet secret/private key.

compute/src/handlers/pingpong.rs

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,6 @@
1-
use crate::{
2-
utils::{get_current_time_nanos, DKNMessage},
3-
DriaComputeNode,
4-
};
1+
use crate::{utils::DriaMessage, DriaComputeNode};
52
use dkn_p2p::libp2p::gossipsub::MessageAcceptance;
3+
use dkn_utils::get_current_time_nanos;
64
use dkn_workflows::{Model, ModelProvider};
75
use eyre::{Context, Result};
86
use serde::{Deserialize, Serialize};
@@ -42,7 +40,7 @@ impl PingpongHandler {
4240
/// 7. Returns `MessageAcceptance::Accept` so that ping is propagated to others as well.
4341
pub(crate) async fn handle_ping(
4442
node: &mut DriaComputeNode,
45-
ping_message: &DKNMessage,
43+
ping_message: &DriaMessage,
4644
) -> Result<MessageAcceptance> {
4745
let pingpong = ping_message
4846
.parse_payload::<PingpongPayload>(true)
@@ -70,7 +68,7 @@ impl PingpongHandler {
7068
};
7169

7270
// publish message
73-
let message = DKNMessage::new_signed(
71+
let message = DriaMessage::new_signed(
7472
serde_json::json!(response_body).to_string(),
7573
Self::RESPONSE_TOPIC,
7674
&node.config.secret_key,

compute/src/handlers/workflow.rs

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,13 @@
11
use dkn_p2p::libp2p::gossipsub::MessageAcceptance;
2+
use dkn_utils::get_current_time_nanos;
23
use dkn_workflows::{Entry, Executor, ModelProvider, Workflow};
34
use eyre::{Context, Result};
45
use libsecp256k1::PublicKey;
56
use serde::Deserialize;
67
use tokio_util::either::Either;
78

89
use crate::payloads::{TaskErrorPayload, TaskRequestPayload, TaskResponsePayload, TaskStats};
9-
use crate::utils::{get_current_time_nanos, DKNMessage};
10+
use crate::utils::DriaMessage;
1011
use crate::workers::workflow::*;
1112
use crate::DriaComputeNode;
1213

@@ -31,7 +32,7 @@ impl WorkflowHandler {
3132

3233
pub(crate) async fn handle_compute(
3334
node: &mut DriaComputeNode,
34-
compute_message: &DKNMessage,
35+
compute_message: &DriaMessage,
3536
) -> Result<Either<MessageAcceptance, WorkflowsWorkerInput>> {
3637
let stats = TaskStats::new().record_received_at();
3738
let task = compute_message
@@ -133,7 +134,7 @@ impl WorkflowHandler {
133134
task.task_id,
134135
payload_str
135136
);
136-
DKNMessage::new(payload_str, Self::RESPONSE_TOPIC)
137+
DriaMessage::new(payload_str, Self::RESPONSE_TOPIC)
137138
}
138139
Err(err) => {
139140
// use pretty display string for error logging with causes
@@ -150,7 +151,7 @@ impl WorkflowHandler {
150151
let error_payload_str = serde_json::json!(error_payload).to_string();
151152

152153
// prepare signed message
153-
DKNMessage::new_signed(
154+
DriaMessage::new_signed(
154155
error_payload_str,
155156
Self::RESPONSE_TOPIC,
156157
&node.config.secret_key,
@@ -167,7 +168,7 @@ impl WorkflowHandler {
167168
"taskId": task.task_id,
168169
"error": err_msg,
169170
});
170-
let message = DKNMessage::new_signed(
171+
let message = DriaMessage::new_signed(
171172
payload.to_string(),
172173
Self::RESPONSE_TOPIC,
173174
&node.config.secret_key,

compute/src/lib.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,5 +9,5 @@ pub(crate) mod workers;
99
/// This value is attached within the published messages.
1010
pub const DRIA_COMPUTE_NODE_VERSION: &str = env!("CARGO_PKG_VERSION");
1111

12-
pub use config::{DriaComputeNodeConfig, DriaNetworkType};
12+
pub use config::DriaComputeNodeConfig;
1313
pub use node::DriaComputeNode;

compute/src/node.rs

Lines changed: 21 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ use dkn_p2p::{
33
gossipsub::{Message, MessageAcceptance, MessageId},
44
PeerId,
55
},
6-
DriaP2PClient, DriaP2PCommander, DriaP2PProtocol,
6+
DriaNodes, DriaP2PClient, DriaP2PCommander, DriaP2PProtocol,
77
};
88
use eyre::Result;
99
use std::collections::HashSet;
@@ -13,7 +13,7 @@ use tokio_util::{either::Either, sync::CancellationToken};
1313
use crate::{
1414
config::*,
1515
handlers::*,
16-
utils::{crypto::secret_to_keypair, AvailableNodes, DKNMessage},
16+
utils::{crypto::secret_to_keypair, refresh_dria_nodes, DriaMessage},
1717
workers::workflow::{WorkflowsWorker, WorkflowsWorkerInput, WorkflowsWorkerOutput},
1818
};
1919

@@ -26,7 +26,8 @@ const PUBLISH_CHANNEL_BUFSIZE: usize = 1024;
2626

2727
pub struct DriaComputeNode {
2828
pub config: DriaComputeNodeConfig,
29-
pub available_nodes: AvailableNodes,
29+
/// Pre-defined nodes that belong to Dria, e.g. bootstraps, relays and RPCs.
30+
pub dria_nodes: DriaNodes,
3031
/// Peer-to-peer client commander to interact with the network.
3132
pub p2p: DriaP2PCommander,
3233
/// Gossipsub message receiver, used by peer-to-peer client in a separate thread.
@@ -63,10 +64,10 @@ impl DriaComputeNode {
6364
let keypair = secret_to_keypair(&config.secret_key);
6465

6566
// get available nodes (bootstrap, relay, rpc) for p2p
66-
let mut available_nodes = AvailableNodes::new(config.network_type);
67-
available_nodes.populate_with_statics();
68-
available_nodes.populate_with_env();
69-
if let Err(e) = available_nodes.populate_with_api().await {
67+
let mut available_nodes = DriaNodes::new(config.network_type)
68+
.with_statics()
69+
.with_envs();
70+
if let Err(e) = refresh_dria_nodes(&mut available_nodes).await {
7071
log::error!("Error populating available nodes: {:?}", e);
7172
};
7273

@@ -81,7 +82,7 @@ impl DriaComputeNode {
8182
config.p2p_listen_addr.clone(),
8283
available_nodes.bootstrap_nodes.clone().into_iter(),
8384
available_nodes.relay_nodes.clone().into_iter(),
84-
available_nodes.rpc_addrs.clone().into_iter(),
85+
available_nodes.rpc_nodes.clone().into_iter(),
8586
protocol,
8687
)?;
8788

@@ -110,7 +111,7 @@ impl DriaComputeNode {
110111
DriaComputeNode {
111112
config,
112113
p2p: p2p_commander,
113-
available_nodes,
114+
dria_nodes: available_nodes,
114115
message_rx,
115116
publish_rx,
116117
workflow_batch_tx,
@@ -160,7 +161,7 @@ impl DriaComputeNode {
160161
///
161162
/// Internally, identity is attached to the the message which is then JSON serialized to bytes
162163
/// and then published to the network as is.
163-
pub async fn publish(&mut self, mut message: DKNMessage) -> Result<()> {
164+
pub async fn publish(&mut self, mut message: DriaMessage) -> Result<()> {
164165
// attach protocol name to the message
165166
message = message.with_identity(self.p2p.protocol().name.clone());
166167

@@ -203,17 +204,17 @@ impl DriaComputeNode {
203204
);
204205

205206
// ensure that message is from the known RPCs
206-
if !self.available_nodes.rpc_nodes.contains(&source_peer_id) {
207+
if !self.dria_nodes.rpc_peerids.contains(&source_peer_id) {
207208
log::warn!(
208209
"Received message from unauthorized source: {}",
209210
source_peer_id
210211
);
211-
log::debug!("Allowed sources: {:#?}", self.available_nodes.rpc_nodes);
212+
log::debug!("Allowed sources: {:#?}", self.dria_nodes.rpc_peerids);
212213
return MessageAcceptance::Ignore;
213214
}
214215

215216
// parse the raw gossipsub message to a prepared DKN message
216-
let message = match DKNMessage::try_from_gossipsub_message(
217+
let message = match DriaMessage::try_from_gossipsub_message(
217218
&message,
218219
&self.config.admin_public_key,
219220
) {
@@ -414,20 +415,22 @@ impl DriaComputeNode {
414415
/// Updates the local list of available nodes by refreshing it.
415416
/// Dials the RPC nodes again for better connectivity.
416417
async fn handle_available_nodes_refresh(&mut self) {
417-
log::info!("Refreshing available nodes.");
418+
log::info!("Refreshing available Dria nodes.");
418419

419420
// refresh available nodes
420-
if let Err(e) = self.available_nodes.populate_with_api().await {
421+
if let Err(e) = refresh_dria_nodes(&mut self.dria_nodes).await {
421422
log::error!("Error refreshing available nodes: {:?}", e);
422423
};
423424

424425
// dial all rpc nodes
425-
for rpc_addr in self.available_nodes.rpc_addrs.iter() {
426-
log::debug!("Dialling RPC node: {}", rpc_addr);
426+
for rpc_addr in self.dria_nodes.rpc_nodes.iter() {
427+
log::info!("Dialling RPC node: {}", rpc_addr);
427428
if let Err(e) = self.p2p.dial(rpc_addr.clone()).await {
428429
log::warn!("Error dialling RPC node: {:?}", e);
429430
};
430431
}
432+
433+
log::info!("Finished refreshing!");
431434
}
432435
}
433436

@@ -462,7 +465,7 @@ mod tests {
462465

463466
// publish a dummy message
464467
let topic = "foo";
465-
let message = DKNMessage::new("hello from the other side", topic);
468+
let message = DriaMessage::new("hello from the other side", topic);
466469
node.subscribe(topic).await.expect("should subscribe");
467470
node.publish(message).await.expect("should publish");
468471
node.unsubscribe(topic).await.expect("should unsubscribe");

compute/src/payloads/stats.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,7 @@
1+
use dkn_utils::get_current_time_nanos;
12
use serde::{Deserialize, Serialize};
23
use std::time::Instant;
34

4-
use crate::utils::get_current_time_nanos;
5-
65
/// Task stats for diagnostics.
76
/// Returning this as the payload helps to debug the errors received at client side, and latencies.
87
#[derive(Default, Debug, Clone, Serialize, Deserialize)]

0 commit comments

Comments
 (0)