Skip to content

Commit 819e4d6

Browse files
committed
nodes refactor, some log changes again
1 parent 00c5d9e commit 819e4d6

File tree

6 files changed

+55
-25
lines changed

6 files changed

+55
-25
lines changed

compute/src/handlers/workflow.rs

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -129,11 +129,7 @@ impl WorkflowHandler {
129129

130130
// convert payload to message
131131
let payload_str = serde_json::json!(payload).to_string();
132-
log::info!(
133-
"Publishing result for task {}\n{}",
134-
task.task_id,
135-
payload_str
136-
);
132+
log::info!("Publishing result for task {}", task.task_id);
137133
DriaMessage::new(payload_str, Self::RESPONSE_TOPIC)
138134
}
139135
Err(err) => {

compute/src/node.rs

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -81,9 +81,7 @@ impl DriaComputeNode {
8181
let (p2p_client, p2p_commander, message_rx) = DriaP2PClient::new(
8282
keypair,
8383
config.p2p_listen_addr.clone(),
84-
available_nodes.bootstrap_nodes.clone().into_iter(),
85-
available_nodes.relay_nodes.clone().into_iter(),
86-
available_nodes.rpc_nodes.clone().into_iter(),
84+
&available_nodes,
8785
protocol,
8886
)?;
8987

monitor/src/main.rs

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -33,9 +33,7 @@ async fn main() -> eyre::Result<()> {
3333
let (client, commander, msg_rx) = DriaP2PClient::new(
3434
keypair,
3535
listen_addr,
36-
nodes.bootstrap_nodes.into_iter(),
37-
nodes.relay_nodes.into_iter(),
38-
nodes.rpc_nodes.into_iter(),
36+
&nodes,
3937
DriaP2PProtocol::new_major_minor(network.protocol_name()),
4038
)?;
4139

p2p/src/client.rs

Lines changed: 22 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ use std::time::Duration;
1010
use tokio::sync::mpsc;
1111

1212
use crate::behaviour::{DriaBehaviour, DriaBehaviourEvent};
13-
use crate::DriaP2PProtocol;
13+
use crate::{DriaNodes, DriaP2PProtocol};
1414

1515
use super::commands::DriaP2PCommand;
1616
use super::DriaP2PCommander;
@@ -46,9 +46,7 @@ impl DriaP2PClient {
4646
pub fn new(
4747
keypair: Keypair,
4848
listen_addr: Multiaddr,
49-
bootstraps: impl Iterator<Item = Multiaddr>,
50-
relays: impl Iterator<Item = Multiaddr>,
51-
rpcs: impl Iterator<Item = Multiaddr>,
49+
nodes: &DriaNodes,
5250
protocol: DriaP2PProtocol,
5351
) -> Result<(
5452
DriaP2PClient,
@@ -89,15 +87,18 @@ impl DriaP2PClient {
8987
.set_mode(Some(libp2p::kad::Mode::Server));
9088

9189
// initiate bootstrap
92-
for addr in bootstraps {
90+
for addr in &nodes.bootstrap_nodes {
9391
log::info!("Dialling bootstrap: {:#?}", addr);
9492
if let Some(peer_id) = addr.iter().find_map(|p| match p {
9593
Protocol::P2p(peer_id) => Some(peer_id),
9694
_ => None,
9795
}) {
9896
swarm.dial(addr.clone())?;
9997
log::info!("Adding {} to Kademlia routing table", addr);
100-
swarm.behaviour_mut().kademlia.add_address(&peer_id, addr);
98+
swarm
99+
.behaviour_mut()
100+
.kademlia
101+
.add_address(&peer_id, addr.clone());
101102
} else {
102103
log::warn!("Missing peerID in address: {}", addr);
103104
}
@@ -115,17 +116,29 @@ impl DriaP2PClient {
115116
// listen on all interfaces for incoming connections
116117
log::info!("Listening p2p network on: {}", listen_addr);
117118
swarm.listen_on(listen_addr)?;
118-
for addr in relays {
119+
120+
// listen on relay addresses with p2p circuit
121+
for addr in &nodes.relay_nodes {
119122
log::info!("Listening to relay: {}", addr);
120123
swarm.listen_on(addr.clone().with(Protocol::P2pCircuit))?;
121124
}
122125

123126
// dial rpc nodes
124-
for rpc_addr in rpcs {
127+
for rpc_addr in &nodes.rpc_nodes {
125128
log::info!("Dialing RPC node: {}", rpc_addr);
126-
swarm.dial(rpc_addr)?;
129+
swarm.dial(rpc_addr.clone())?;
127130
}
128131

132+
// add rpcs as explicit peers
133+
// TODO: may not be necessary
134+
// for rpc_peer_id in &nodes.rpc_peerids {
135+
// log::info!("Adding {} as explicit peer.", rpc_peer_id);
136+
// swarm
137+
// .behaviour_mut()
138+
// .gossipsub
139+
// .add_explicit_peer(rpc_peer_id);
140+
// }
141+
129142
// create commander
130143
let (cmd_tx, cmd_rx) = mpsc::channel(COMMAND_CHANNEL_BUFSIZE);
131144
let commander = DriaP2PCommander::new(cmd_tx, protocol.clone());

p2p/src/nodes.rs

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,26 @@ impl DriaNodes {
2929
}
3030
}
3131

32+
pub fn with_relay_nodes(mut self, addresses: impl IntoIterator<Item = Multiaddr>) -> Self {
33+
self.relay_nodes.extend(addresses);
34+
self
35+
}
36+
37+
pub fn with_bootstrap_nodes(mut self, addresses: impl IntoIterator<Item = Multiaddr>) -> Self {
38+
self.bootstrap_nodes.extend(addresses);
39+
self
40+
}
41+
42+
pub fn with_rpc_nodes(mut self, addresses: impl IntoIterator<Item = Multiaddr>) -> Self {
43+
self.rpc_nodes.extend(addresses);
44+
self
45+
}
46+
47+
pub fn with_rpc_peer_ids(mut self, addresses: impl IntoIterator<Item = PeerId>) -> Self {
48+
self.rpc_peerids.extend(addresses);
49+
self
50+
}
51+
3252
/// Parses static bootstrap & relay nodes from environment variables.
3353
///
3454
/// The environment variables are:

p2p/tests/listen_test.rs

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
use dkn_p2p::{DriaP2PClient, DriaP2PProtocol};
1+
use dkn_p2p::{DriaNodes, DriaP2PClient, DriaP2PProtocol};
22
use eyre::Result;
33
use libp2p_identity::Keypair;
44

@@ -12,13 +12,18 @@ async fn test_listen_topic_once() -> Result<()> {
1212
.is_test(true)
1313
.try_init();
1414

15+
let listen_addr = "/ip4/0.0.0.0/tcp/4001".parse()?;
16+
17+
// prepare nodes
18+
let nodes = DriaNodes::new(dkn_p2p::DriaNetworkType::Community)
19+
.with_bootstrap_nodes(["/ip4/44.206.245.139/tcp/4001/p2p/16Uiu2HAm4q3LZU2T9kgjKK4ysy6KZYKLq8KiXQyae4RHdF7uqSt4".parse()?])
20+
.with_relay_nodes(["/ip4/34.201.33.141/tcp/4001/p2p/16Uiu2HAkuXiV2CQkC9eJgU6cMnJ9SMARa85FZ6miTkvn5fuHNufa".parse()?]);
21+
1522
// spawn P2P client in another task
1623
let (client, mut commander, mut msg_rx) = DriaP2PClient::new(
1724
Keypair::generate_secp256k1(),
18-
"/ip4/0.0.0.0/tcp/4001".parse()?,
19-
vec!["/ip4/44.206.245.139/tcp/4001/p2p/16Uiu2HAm4q3LZU2T9kgjKK4ysy6KZYKLq8KiXQyae4RHdF7uqSt4".parse()?].into_iter(),
20-
vec!["/ip4/34.201.33.141/tcp/4001/p2p/16Uiu2HAkuXiV2CQkC9eJgU6cMnJ9SMARa85FZ6miTkvn5fuHNufa".parse()?].into_iter(),
21-
vec![].into_iter(),
25+
listen_addr,
26+
&nodes,
2227
DriaP2PProtocol::default(),
2328
)
2429
.expect("could not create p2p client");

0 commit comments

Comments
 (0)