Skip to content

Commit 24f0fdf

Browse files
authored
Merge pull request #150 from firstbatchxyz/erhant/threading
feat: p2p in a separate thread
2 parents 87ee180 + f5be133 commit 24f0fdf

File tree

12 files changed

+722
-341
lines changed

12 files changed

+722
-341
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

compute/src/handlers/pingpong.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,7 @@ impl ComputeHandler for PingpongHandler {
6464
Self::RESPONSE_TOPIC,
6565
&node.config.secret_key,
6666
);
67-
node.publish(message)?;
67+
node.publish(message).await?;
6868

6969
Ok(MessageAcceptance::Accept)
7070
}

compute/src/handlers/workflow.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -161,7 +161,7 @@ impl ComputeHandler for WorkflowHandler {
161161
};
162162

163163
// try publishing the result
164-
if let Err(publish_err) = node.publish(message) {
164+
if let Err(publish_err) = node.publish(message).await {
165165
let err_msg = format!("Could not publish result: {:?}", publish_err);
166166
log::error!("{}", err_msg);
167167

@@ -174,7 +174,7 @@ impl ComputeHandler for WorkflowHandler {
174174
Self::RESPONSE_TOPIC,
175175
&node.config.secret_key,
176176
);
177-
node.publish(message)?;
177+
node.publish(message).await?;
178178
}
179179

180180
Ok(acceptance)

compute/src/main.rs

Lines changed: 35 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
use dkn_compute::*;
2-
use eyre::{Context, Result};
2+
use eyre::Result;
33
use std::env;
44
use tokio_util::sync::CancellationToken;
55

@@ -25,7 +25,7 @@ async fn main() -> Result<()> {
2525
██║ ██║██╔══██╗██║██╔══██║ https://dria.co
2626
██████╔╝██║ ██║██║██║ ██║
2727
╚═════╝ ╚═╝ ╚═╝╚═╝╚═╝ ╚═╝
28-
"#,
28+
"#
2929
);
3030

3131
let token = CancellationToken::new();
@@ -52,10 +52,6 @@ async fn main() -> Result<()> {
5252
let service_check_token = token.clone();
5353
let config = tokio::spawn(async move {
5454
tokio::select! {
55-
_ = service_check_token.cancelled() => {
56-
log::info!("Service check cancelled.");
57-
config
58-
}
5955
result = config.workflows.check_services() => {
6056
if let Err(err) = result {
6157
log::error!("Error checking services: {:?}", err);
@@ -64,38 +60,44 @@ async fn main() -> Result<()> {
6460
log::warn!("Using models: {:#?}", config.workflows.models);
6561
config
6662
}
63+
_ = service_check_token.cancelled() => {
64+
log::info!("Service check cancelled.");
65+
config
66+
}
6767
}
6868
})
69-
.await
70-
.wrap_err("error during service checks")?;
71-
72-
if !token.is_cancelled() {
73-
// launch the node in a separate thread
74-
let node_token = token.clone();
75-
let node_handle = tokio::spawn(async move {
76-
match DriaComputeNode::new(config, node_token).await {
77-
Ok(mut node) => {
78-
if let Err(err) = node.launch().await {
79-
log::error!("Node launch error: {}", err);
80-
panic!("Node failed.")
81-
};
82-
}
83-
Err(err) => {
84-
log::error!("Node setup error: {}", err);
85-
panic!("Could not setup node.")
86-
}
87-
}
88-
});
69+
.await?;
8970

90-
// wait for tasks to complete
91-
if let Err(err) = node_handle.await {
92-
log::error!("Node handle error: {}", err);
93-
panic!("Could not exit Node thread handle.");
94-
};
95-
} else {
96-
log::warn!("Not launching node due to early exit.");
71+
// check early exit due to failed service check
72+
if token.is_cancelled() {
73+
log::warn!("Not launching node due to early exit, bye!");
74+
return Ok(());
9775
}
9876

77+
let node_token = token.clone();
78+
let (mut node, p2p) = DriaComputeNode::new(config, node_token).await?;
79+
80+
// launch the p2p in a separate thread
81+
log::info!("Spawning peer-to-peer client thread.");
82+
let p2p_handle = tokio::spawn(async move { p2p.run().await });
83+
84+
// launch the node in a separate thread
85+
log::info!("Spawning compute node thread.");
86+
let node_handle = tokio::spawn(async move {
87+
if let Err(err) = node.launch().await {
88+
log::error!("Node launch error: {}", err);
89+
panic!("Node failed.")
90+
};
91+
});
92+
93+
// wait for tasks to complete
94+
if let Err(err) = node_handle.await {
95+
log::error!("Node handle error: {}", err);
96+
};
97+
if let Err(err) = p2p_handle.await {
98+
log::error!("P2P handle error: {}", err);
99+
};
100+
99101
log::info!("Bye!");
100102
Ok(())
101103
}

0 commit comments

Comments
 (0)