Skip to content

Commit 5b7fc05

Browse files
committed
added logs for req-res
1 parent 0a98c18 commit 5b7fc05

File tree

6 files changed

+38
-39
lines changed

6 files changed

+38
-39
lines changed

compute/src/node.rs

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -354,17 +354,28 @@ impl DriaComputeNode {
354354

355355
// respond w.r.t data
356356
let response_data = if let Ok(req) = SpecResponder::try_parse_request(&data) {
357+
log::info!(
358+
"Got a spec request from peer {} with id {}",
359+
peer_id,
360+
req.request_id
361+
);
362+
357363
let response = SpecResponder::respond(req, self.spec_collector.collect().await);
358-
serde_json::to_vec(&response).unwrap()
364+
serde_json::to_vec(&response)?
359365
} else if let Ok(req) = WorkflowResponder::try_parse_request(&data) {
360366
log::info!("Received a task request with id: {}", req.task_id);
361367
return Err(eyre::eyre!(
362368
"REQUEST RESPONSE FOR TASKS ARE NOT IMPLEMENTED YET"
363369
));
364370
} else {
365-
return Err(eyre::eyre!("Received unknown request: {:?}", data));
371+
return Err(eyre::eyre!(
372+
"Received unknown request from {}: {:?}",
373+
peer_id,
374+
data,
375+
));
366376
};
367377

378+
log::info!("Responding to peer {}", peer_id);
368379
self.p2p.respond(response_data, channel).await
369380
}
370381

compute/src/responders/specs.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ use serde::{Deserialize, Serialize};
66
#[derive(Serialize, Deserialize)]
77
pub struct Request {
88
/// UUID of the specs request, prevents replay attacks.
9-
request_id: String,
9+
pub request_id: String,
1010
}
1111

1212
#[derive(Serialize, Deserialize)]

p2p/src/behaviour.rs

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -53,12 +53,9 @@ impl DriaBehaviour {
5353
fn create_request_response_behaviour(
5454
protocol_name: StreamProtocol,
5555
) -> request_response::cbor::Behaviour<Vec<u8>, Vec<u8>> {
56-
use request_response::{Behaviour, ProtocolSupport};
56+
use request_response::{Behaviour, Config, ProtocolSupport};
5757

58-
Behaviour::new(
59-
[(protocol_name, ProtocolSupport::Full)],
60-
request_response::Config::default(),
61-
)
58+
Behaviour::new([(protocol_name, ProtocolSupport::Full)], Config::default())
6259
}
6360

6461
/// Configures the connection limits.

p2p/src/protocol.rs

Lines changed: 12 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -41,17 +41,22 @@ impl Default for DriaP2PProtocol {
4141

4242
impl DriaP2PProtocol {
4343
/// Creates a new instance of the protocol with the given `name` and `version`.
44-
pub fn new(name: &str, version: &str) -> Self {
44+
pub fn new(name: impl ToString, version: impl ToString) -> Self {
45+
let name = name.to_string();
46+
let version = version.to_string();
47+
4548
let identity = format!("{}/{}", name, version);
46-
let kademlia = format!("/{}/kad/{}", name, version);
47-
let request_response = format!("/{}/rr/{}", name, version);
49+
let kademlia =
50+
StreamProtocol::try_from_owned(format!("/{}/kad/{}", name, version)).unwrap();
51+
let request_response =
52+
StreamProtocol::try_from_owned(format!("/{}/rr/{}", name, version)).unwrap();
4853

4954
Self {
50-
name: name.to_string(),
51-
version: version.to_string(),
55+
name,
56+
version,
5257
identity,
53-
kademlia: StreamProtocol::try_from_owned(kademlia).unwrap(), // guaranteed to unwrap
54-
request_response: StreamProtocol::try_from_owned(request_response).unwrap(), // guaranteed to unwrap
58+
kademlia,
59+
request_response,
5560
}
5661
}
5762

p2p/tests/gossipsub_test.rs

Lines changed: 6 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -35,19 +35,11 @@ async fn test_gossipsub() -> Result<()> {
3535
listen_addr,
3636
&nodes,
3737
DriaP2PProtocol::default(),
38-
)
39-
.expect("could not create p2p client");
40-
41-
// spawn task
38+
)?;
4239
let task_handle = tokio::spawn(async move { client.run().await });
4340

44-
// subscribe to the given topic
45-
commander
46-
.subscribe(TOPIC)
47-
.await
48-
.expect("could not subscribe");
49-
5041
// wait for a single gossipsub message on this topic
42+
commander.subscribe(TOPIC).await?;
5143
log::info!("Waiting for messages...");
5244
let message = msg_rx.recv().await;
5345
match message {
@@ -58,20 +50,13 @@ async fn test_gossipsub() -> Result<()> {
5850
log::warn!("No message received for topic: {}", TOPIC);
5951
}
6052
}
53+
commander.unsubscribe(TOPIC).await?;
6154

62-
// unsubscribe to the given topic
63-
commander
64-
.unsubscribe(TOPIC)
65-
.await
66-
.expect("could not unsubscribe");
67-
68-
// close command channel
69-
commander.shutdown().await.expect("could not shutdown");
70-
71-
// close message channel
55+
// close everything
56+
commander.shutdown().await?;
7257
msg_rx.close();
7358

74-
log::info!("Waiting for p2p task to finish...");
59+
// wait for handle to return
7560
task_handle.await?;
7661

7762
log::info!("Done!");

p2p/tests/request_test.rs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
use std::str::FromStr;
22

3+
use dkn_p2p::DriaNetworkType::Community;
34
use dkn_p2p::{DriaNodes, DriaP2PClient, DriaP2PProtocol};
45
use eyre::Result;
56
use libp2p::PeerId;
@@ -25,9 +26,9 @@ async fn test_request_message() -> Result<()> {
2526
let listen_addr = "/ip4/0.0.0.0/tcp/4001".parse()?;
2627

2728
// prepare nodes
28-
let nodes = DriaNodes::new(dkn_p2p::DriaNetworkType::Community)
29-
.with_bootstrap_nodes(["/ip4/44.206.245.139/tcp/4001/p2p/16Uiu2HAm4q3LZU2T9kgjKK4ysy6KZYKLq8KiXQyae4RHdF7uqSt4".parse()?])
30-
.with_relay_nodes(["/ip4/34.201.33.141/tcp/4001/p2p/16Uiu2HAkuXiV2CQkC9eJgU6cMnJ9SMARa85FZ6miTkvn5fuHNufa".parse()?]);
29+
let nodes = DriaNodes::new(Community)
30+
.with_bootstrap_nodes(Community.get_static_bootstrap_nodes())
31+
.with_relay_nodes(Community.get_static_relay_nodes());
3132

3233
// spawn P2P client in another task
3334
let (client, mut commander, mut msg_rx, mut req_rx) = DriaP2PClient::new(

0 commit comments

Comments
 (0)