Skip to content

Commit 5a6feb5

Browse files
committed
fix test loggers, enable peers function
1 parent 24f0fdf commit 5a6feb5

File tree

7 files changed

+85
-86
lines changed

7 files changed

+85
-86
lines changed

compute/src/node.rs

Lines changed: 62 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -21,19 +21,6 @@ use crate::{
2121
/// Number of seconds between refreshing the Kademlia DHT.
2222
const PEER_REFRESH_INTERVAL_SECS: u64 = 30;
2323

24-
/// **Dria Compute Node**
25-
///
26-
/// Internally, the node will create a new P2P client with the given secret key.
27-
/// This P2P client, although created synchronously, requires a tokio runtime.
28-
///
29-
/// ### Example
30-
///
31-
/// ```rs
32-
/// let config = DriaComputeNodeConfig::new();
33-
/// let mut node = DriaComputeNode::new(config, CancellationToken::new())?;
34-
/// node.check_services().await?;
35-
/// node.launch().await?;
36-
/// ```
3724
pub struct DriaComputeNode {
3825
pub config: DriaComputeNodeConfig,
3926
pub p2p: DriaP2PCommander,
@@ -44,10 +31,13 @@ pub struct DriaComputeNode {
4431
}
4532

4633
impl DriaComputeNode {
34+
/// Creates a new `DriaComputeNode` with the given configuration and cancellation token.
35+
///
36+
/// Returns the node instance and p2p client together. P2p MUST be run in a separate task before this node is used at all.
4737
pub async fn new(
4838
config: DriaComputeNodeConfig,
4939
cancellation: CancellationToken,
50-
) -> Result<(Self, DriaP2PClient)> {
40+
) -> Result<(DriaComputeNode, DriaP2PClient)> {
5141
// create the keypair from secret key
5242
let keypair = secret_to_keypair(&config.secret_key);
5343

@@ -139,16 +129,11 @@ impl DriaComputeNode {
139129
Ok(())
140130
}
141131

142-
/// Returns the list of connected peers.
143-
// #[inline(always)]
144-
// pub fn peers(
145-
// &self,
146-
// ) -> Vec<(
147-
// &dkn_p2p::libp2p_identity::PeerId,
148-
// Vec<&gossipsub::TopicHash>,
149-
// )> {
150-
// self.p2p.peers()
151-
// }
132+
/// Returns the list of connected peers, `mesh` and `all`.
133+
#[inline]
134+
pub async fn peers(&self) -> Result<(Vec<PeerId>, Vec<PeerId>)> {
135+
self.p2p.peers().await
136+
}
152137

153138
/// Launches the main loop of the compute node.
154139
/// This method is not expected to return until cancellation occurs.
@@ -285,16 +270,24 @@ impl DriaComputeNode {
285270
self.unsubscribe(WorkflowHandler::LISTEN_TOPIC).await?;
286271
self.unsubscribe(WorkflowHandler::RESPONSE_TOPIC).await?;
287272

273+
// shutdown channels
274+
self.shutdown().await?;
275+
276+
Ok(())
277+
}
278+
279+
/// Shutdown channels between p2p and yourself.
280+
pub async fn shutdown(&mut self) -> Result<()> {
288281
// send shutdown signal
282+
log::debug!("Sending shutdown command to p2p client.");
289283
self.p2p.shutdown().await?;
290284

291-
// close msg channel
285+
// close message channel
292286
log::debug!("Closing message channel.");
293287
self.msg_rx.close();
294288

295289
Ok(())
296290
}
297-
298291
/// Parses a given raw Gossipsub message to a prepared P2PMessage object.
299292
/// This prepared message includes the topic, payload, version and timestamp.
300293
///
@@ -322,34 +315,47 @@ impl DriaComputeNode {
322315

323316
#[cfg(test)]
324317
mod tests {
325-
// use super::*;
326-
// use std::env;
327-
328-
// #[tokio::test]
329-
// #[ignore = "run this manually"]
330-
// async fn test_publish_message() {
331-
// env::set_var("RUST_LOG", "info");
332-
// let _ = env_logger::try_init();
333-
334-
// // create node
335-
// let cancellation = CancellationToken::new();
336-
// let mut node = DriaComputeNode::new(DriaComputeNodeConfig::default(), cancellation.clone())
337-
// .await
338-
// .expect("should create node");
339-
340-
// // launch & wait for a while for connections
341-
// log::info!("Waiting a bit for peer setup.");
342-
// tokio::select! {
343-
// _ = node.launch() => (),
344-
// _ = tokio::time::sleep(tokio::time::Duration::from_secs(20)) => cancellation.cancel(),
345-
// }
346-
// log::info!("Connected Peers:\n{:#?}", node.peers());
347-
348-
// // publish a dummy message
349-
// let topic = "foo";
350-
// let message = DKNMessage::new("hello from the other side", topic);
351-
// node.subscribe(topic).expect("should subscribe");
352-
// node.publish(message).expect("should publish");
353-
// node.unsubscribe(topic).expect("should unsubscribe");
354-
// }
318+
use super::*;
319+
use std::env;
320+
321+
#[tokio::test]
322+
#[ignore = "run this manually"]
323+
async fn test_publish_message() -> eyre::Result<()> {
324+
env::set_var("RUST_LOG", "none,dkn_compute=debug,dkn_p2p=debug");
325+
let _ = env_logger::builder().is_test(true).try_init();
326+
327+
// create node
328+
let cancellation = CancellationToken::new();
329+
let (mut node, p2p) =
330+
DriaComputeNode::new(DriaComputeNodeConfig::default(), cancellation.clone())
331+
.await
332+
.expect("should create node");
333+
334+
// spawn p2p task
335+
let p2p_task = tokio::spawn(async move { p2p.run().await });
336+
337+
// launch & wait for a while for connections
338+
log::info!("Waiting a bit for peer setup.");
339+
tokio::select! {
340+
_ = node.launch() => (),
341+
_ = tokio::time::sleep(tokio::time::Duration::from_secs(20)) => cancellation.cancel(),
342+
}
343+
log::info!("Connected Peers:\n{:#?}", node.peers().await?);
344+
345+
// publish a dummy message
346+
let topic = "foo";
347+
let message = DKNMessage::new("hello from the other side", topic);
348+
node.subscribe(topic).await.expect("should subscribe");
349+
node.publish(message).await.expect("should publish");
350+
node.unsubscribe(topic).await.expect("should unsubscribe");
351+
352+
// close everything
353+
log::info!("Shutting down node.");
354+
node.p2p.shutdown().await?;
355+
356+
// wait for task handle
357+
p2p_task.await?;
358+
359+
Ok(())
360+
}
355361
}

p2p/src/commands.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -233,7 +233,7 @@ impl DriaP2PCommander {
233233
}
234234

235235
/// Get peers counts (mesh & all) of the GossipSub pool.
236-
/// Returns a tuple of the mesh peer count and all peer count.
236+
/// Returns a tuple of the mesh peers count and all peers count.
237237
pub async fn peer_counts(&self) -> Result<(usize, usize)> {
238238
let (sender, receiver) = oneshot::channel();
239239

p2p/tests/listen_test.rs

Lines changed: 17 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -4,47 +4,40 @@ use libp2p::Multiaddr;
44
use libp2p_identity::Keypair;
55
use std::{env, str::FromStr};
66

7-
const TOPIC: &str = "pong";
8-
const LOG_LEVEL: &str = "none,listen_test=debug,dkn_p2p=debug";
9-
107
// FIXME: not working!!!
118
#[tokio::test]
12-
#[ignore = "run manually with logs"]
9+
#[ignore = "run this manually"]
1310
async fn test_listen_topic_once() -> Result<()> {
14-
env::set_var("RUST_LOG", LOG_LEVEL);
15-
let _ = env_logger::builder().is_test(true).try_init();
11+
const TOPIC: &str = "pong";
1612

17-
// setup client
18-
let keypair = Keypair::generate_secp256k1();
19-
let addr = Multiaddr::from_str("/ip4/0.0.0.0/tcp/4001")?;
20-
let bootstraps = vec![Multiaddr::from_str(
21-
"/ip4/44.206.245.139/tcp/4001/p2p/16Uiu2HAm4q3LZU2T9kgjKK4ysy6KZYKLq8KiXQyae4RHdF7uqSt4",
22-
)?];
23-
let relays = vec![Multiaddr::from_str(
24-
"/ip4/34.201.33.141/tcp/4001/p2p/16Uiu2HAkuXiV2CQkC9eJgU6cMnJ9SMARa85FZ6miTkvn5fuHNufa",
25-
)?];
26-
let protocol = DriaP2PProtocol::new_major_minor("dria");
13+
env::set_var("RUST_LOG", "none,listen_test=debug,dkn_p2p=debug");
14+
let _ = env_logger::builder().is_test(true).try_init();
2715

2816
// spawn P2P client in another task
2917
let (client, mut commander, mut msg_rx) = DriaP2PClient::new(
30-
keypair,
31-
addr,
32-
bootstraps.into_iter(),
33-
relays.into_iter(),
18+
Keypair::generate_secp256k1(),
19+
Multiaddr::from_str("/ip4/0.0.0.0/tcp/4001")?,
20+
vec![Multiaddr::from_str(
21+
"/ip4/44.206.245.139/tcp/4001/p2p/16Uiu2HAm4q3LZU2T9kgjKK4ysy6KZYKLq8KiXQyae4RHdF7uqSt4",
22+
)?].into_iter(),
23+
vec![Multiaddr::from_str(
24+
"/ip4/34.201.33.141/tcp/4001/p2p/16Uiu2HAkuXiV2CQkC9eJgU6cMnJ9SMARa85FZ6miTkvn5fuHNufa",
25+
)?]
26+
.into_iter(),
3427
vec![].into_iter(),
35-
protocol,
28+
DriaP2PProtocol::new_major_minor("dria"),
3629
)
3730
.expect("could not create p2p client");
3831

32+
// spawn task
33+
let p2p_task = tokio::spawn(async move { client.run().await });
34+
3935
// subscribe to the given topic
4036
commander
4137
.subscribe(TOPIC)
4238
.await
4339
.expect("could not subscribe");
4440

45-
// spawn task
46-
let p2p_task = tokio::spawn(async move { client.run().await });
47-
4841
// wait for a single gossipsub message on this topic
4942
log::info!("Waiting for messages...");
5043
let message = msg_rx.recv().await;

workflows/src/providers/gemini.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -185,7 +185,7 @@ mod tests {
185185
let _ = dotenvy::dotenv(); // read api key
186186
assert!(env::var(ENV_VAR_NAME).is_ok(), "should have api key");
187187
env::set_var("RUST_LOG", "none,dkn_workflows=debug");
188-
let _ = env_logger::try_init();
188+
let _ = env_logger::builder().is_test(true).try_init();
189189

190190
let models = vec![
191191
Model::Gemini10Pro,

workflows/src/providers/openai.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -172,7 +172,7 @@ mod tests {
172172
let _ = dotenvy::dotenv(); // read api key
173173
assert!(env::var(ENV_VAR_NAME).is_ok(), "should have api key");
174174
env::set_var("RUST_LOG", "none,dkn_workflows=debug");
175-
let _ = env_logger::try_init();
175+
let _ = env_logger::builder().is_test(true).try_init();
176176

177177
let models = vec![Model::GPT4Turbo, Model::GPT4o, Model::GPT4oMini];
178178
let config = OpenAIConfig::new();

workflows/src/providers/openrouter.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -120,7 +120,7 @@ mod tests {
120120
let _ = dotenvy::dotenv(); // read api key
121121
assert!(env::var(ENV_VAR_NAME).is_ok(), "should have api key");
122122
env::set_var("RUST_LOG", "none,dkn_workflows=debug");
123-
let _ = env_logger::try_init();
123+
let _ = env_logger::builder().is_test(true).try_init();
124124

125125
let models = vec![Model::GPT4Turbo, Model::GPT4o, Model::GPT4oMini];
126126
let config = OpenRouterConfig::new();

workflows/tests/models_test.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ const LOG_LEVEL: &str = "none,dkn_workflows=debug";
88
#[ignore = "requires Ollama"]
99
async fn test_ollama_check() -> Result<()> {
1010
env::set_var("RUST_LOG", LOG_LEVEL);
11-
let _ = env_logger::try_init();
11+
let _ = env_logger::builder().is_test(true).try_init();
1212

1313
let models = vec![Model::Phi3_5Mini];
1414
let mut model_config = DriaWorkflowsConfig::new(models);
@@ -27,7 +27,7 @@ async fn test_ollama_check() -> Result<()> {
2727
async fn test_openai_check() -> Result<()> {
2828
let _ = dotenvy::dotenv(); // read api key
2929
env::set_var("RUST_LOG", LOG_LEVEL);
30-
let _ = env_logger::try_init();
30+
let _ = env_logger::builder().is_test(true).try_init();
3131

3232
let models = vec![Model::GPT4Turbo];
3333
let mut model_config = DriaWorkflowsConfig::new(models);

0 commit comments

Comments
 (0)