Skip to content

Commit 702eb28

Browse files
authored
Merge pull request #148 from firstbatchxyz/erhant/node-refresh-rfks-peer-fixes
feat: multi-network & peer disconnections
2 parents ee8790f + 5a6feb5 commit 702eb28

File tree

26 files changed

+1316
-752
lines changed

26 files changed

+1316
-752
lines changed

Cargo.lock

Lines changed: 5 additions & 4 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ default-members = ["compute"]
77

88
[workspace.package]
99
edition = "2021"
10-
version = "0.2.22"
10+
version = "0.2.23"
1111
license = "Apache-2.0"
1212
readme = "README.md"
1313

compute/src/config.rs

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,33 @@ use libsecp256k1::{PublicKey, SecretKey};
99

1010
use std::{env, str::FromStr};
1111

12+
/// Network type of the node.
///
/// Defaults to [`DriaNetworkType::Community`].
#[derive(Default, Debug, Clone, Copy, PartialEq, Eq)]
pub enum DriaNetworkType {
    /// The default network.
    #[default]
    Community,
    /// The "pro" network.
    Pro,
}

impl From<&str> for DriaNetworkType {
    /// Parses a network name.
    ///
    /// Surrounding whitespace is ignored and the comparison is ASCII
    /// case-insensitive, so `"pro"`, `"Pro"` and `" PRO\n"` all yield
    /// [`DriaNetworkType::Pro`]. Any unrecognized value (including the empty
    /// string) falls back to the default variant instead of failing.
    fn from(s: &str) -> Self {
        // Normalize first: values coming from the environment or a `.env` file
        // frequently carry different casing or stray whitespace, and an exact
        // match would silently select the default network for them.
        match s.trim().to_ascii_lowercase().as_str() {
            "community" => Self::Community,
            "pro" => Self::Pro,
            _ => Self::default(),
        }
    }
}

impl DriaNetworkType {
    /// Returns the protocol name associated with this network type:
    /// `"dria"` for `Community` and `"dria-sdk"` for `Pro`.
    ///
    /// The returned string is a literal, so it is `'static` and does not
    /// borrow from `self`.
    pub fn protocol_name(&self) -> &'static str {
        match self {
            Self::Community => "dria",
            Self::Pro => "dria-sdk",
        }
    }
}
38+
1239
#[derive(Debug, Clone)]
1340
pub struct DriaComputeNodeConfig {
1441
/// Wallet secret/private key.
@@ -23,6 +50,8 @@ pub struct DriaComputeNodeConfig {
2350
pub p2p_listen_addr: Multiaddr,
2451
/// Workflow configurations, e.g. models and providers.
2552
pub workflows: DriaWorkflowsConfig,
53+
/// Network type of the node.
54+
pub network_type: DriaNetworkType,
2655
}
2756

2857
/// The default P2P network listen address.
@@ -104,13 +133,19 @@ impl DriaComputeNodeConfig {
104133
let p2p_listen_addr = Multiaddr::from_str(&p2p_listen_addr_str)
105134
.expect("Could not parse the given P2P listen address.");
106135

136+
// parse network type
137+
let network_type = env::var("DKN_NETWORK")
138+
.map(|s| DriaNetworkType::from(s.as_str()))
139+
.unwrap_or_default();
140+
107141
Self {
108142
admin_public_key,
109143
secret_key,
110144
public_key,
111145
address,
112146
workflows,
113147
p2p_listen_addr,
148+
network_type,
114149
}
115150
}
116151

compute/src/handlers/pingpong.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,7 @@ impl ComputeHandler for PingpongHandler {
6464
Self::RESPONSE_TOPIC,
6565
&node.config.secret_key,
6666
);
67-
node.publish(message)?;
67+
node.publish(message).await?;
6868

6969
Ok(MessageAcceptance::Accept)
7070
}

compute/src/handlers/workflow.rs

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -161,8 +161,7 @@ impl ComputeHandler for WorkflowHandler {
161161
};
162162

163163
// try publishing the result
164-
165-
if let Err(publish_err) = node.publish(message) {
164+
if let Err(publish_err) = node.publish(message).await {
166165
let err_msg = format!("Could not publish result: {:?}", publish_err);
167166
log::error!("{}", err_msg);
168167

@@ -175,7 +174,7 @@ impl ComputeHandler for WorkflowHandler {
175174
Self::RESPONSE_TOPIC,
176175
&node.config.secret_key,
177176
);
178-
node.publish(message)?;
177+
node.publish(message).await?;
179178
}
180179

181180
Ok(acceptance)

compute/src/lib.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,5 +11,5 @@ pub(crate) mod utils;
1111
/// This value is attached within the published messages.
1212
pub const DRIA_COMPUTE_NODE_VERSION: &str = env!("CARGO_PKG_VERSION");
1313

14-
pub use config::DriaComputeNodeConfig;
14+
pub use config::{DriaComputeNodeConfig, DriaNetworkType};
1515
pub use node::DriaComputeNode;

compute/src/main.rs

Lines changed: 36 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
use dkn_compute::*;
2-
use eyre::{Context, Result};
2+
use eyre::Result;
33
use std::env;
44
use tokio_util::sync::CancellationToken;
55

@@ -25,7 +25,7 @@ async fn main() -> Result<()> {
2525
██║ ██║██╔══██╗██║██╔══██║ https://dria.co
2626
██████╔╝██║ ██║██║██║ ██║
2727
╚═════╝ ╚═╝ ╚═╝╚═╝╚═╝ ╚═╝
28-
"#,
28+
"#
2929
);
3030

3131
let token = CancellationToken::new();
@@ -52,10 +52,6 @@ async fn main() -> Result<()> {
5252
let service_check_token = token.clone();
5353
let config = tokio::spawn(async move {
5454
tokio::select! {
55-
_ = service_check_token.cancelled() => {
56-
log::info!("Service check cancelled.");
57-
config
58-
}
5955
result = config.workflows.check_services() => {
6056
if let Err(err) = result {
6157
log::error!("Error checking services: {:?}", err);
@@ -64,38 +60,44 @@ async fn main() -> Result<()> {
6460
log::warn!("Using models: {:#?}", config.workflows.models);
6561
config
6662
}
63+
_ = service_check_token.cancelled() => {
64+
log::info!("Service check cancelled.");
65+
config
66+
}
6767
}
6868
})
69-
.await
70-
.wrap_err("error during service checks")?;
71-
72-
if !token.is_cancelled() {
73-
// launch the node in a separate thread
74-
let node_token = token.clone();
75-
let node_handle = tokio::spawn(async move {
76-
match DriaComputeNode::new(config, node_token).await {
77-
Ok(mut node) => {
78-
if let Err(err) = node.launch().await {
79-
log::error!("Node launch error: {}", err);
80-
panic!("Node failed.")
81-
};
82-
}
83-
Err(err) => {
84-
log::error!("Node setup error: {}", err);
85-
panic!("Could not setup node.")
86-
}
87-
}
88-
});
69+
.await?;
8970

90-
// wait for tasks to complete
91-
if let Err(err) = node_handle.await {
92-
log::error!("Node handle error: {}", err);
93-
panic!("Could not exit Node thread handle.");
94-
};
95-
} else {
96-
log::warn!("Not launching node due to early exit.");
71+
// check early exit due to failed service check
72+
if token.is_cancelled() {
73+
log::warn!("Not launching node due to early exit, bye!");
74+
return Ok(());
9775
}
9876

77+
let node_token = token.clone();
78+
let (mut node, p2p) = DriaComputeNode::new(config, node_token).await?;
79+
80+
// launch the p2p in a separate thread
81+
log::info!("Spawning peer-to-peer client thread.");
82+
let p2p_handle = tokio::spawn(async move { p2p.run().await });
83+
84+
// launch the node in a separate thread
85+
log::info!("Spawning compute node thread.");
86+
let node_handle = tokio::spawn(async move {
87+
if let Err(err) = node.launch().await {
88+
log::error!("Node launch error: {}", err);
89+
panic!("Node failed.")
90+
};
91+
});
92+
93+
// wait for tasks to complete
94+
if let Err(err) = node_handle.await {
95+
log::error!("Node handle error: {}", err);
96+
};
97+
if let Err(err) = p2p_handle.await {
98+
log::error!("P2P handle error: {}", err);
99+
};
100+
99101
log::info!("Bye!");
100102
Ok(())
101103
}
@@ -158,6 +160,7 @@ async fn wait_for_termination(cancellation: CancellationToken) -> Result<()> {
158160
Ok(())
159161
}
160162

163+
// #[deprecated]
161164
/// Very CRUDE fix due to launcher log level bug
162165
///
163166
/// TODO: remove me later when the launcher is fixed

0 commit comments

Comments
 (0)