Skip to content

Commit 5a3600d

Browse files
committed
some kademlia fixes
1 parent 3f4a331 commit 5a3600d

File tree

7 files changed

+65
-17
lines changed

7 files changed

+65
-17
lines changed

README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ Compute nodes can technically do any arbitrary task, from computing the square r
3434

3535
- **Workflows**: Each task is given in the form of a [workflow](https://github.com/andthattoo/ollama-workflows). Every workflow defines an agentic behavior for the chosen LLM, all captured in a single JSON file, and can represent things ranging from simple LLM generations to iterative web searching & reasoning.
3636

37-
## Node Running
37+
### Running a Node
3838

3939
Refer to [node guide](./docs/NODE_GUIDE.md) to quickly get started and run your own node!
4040

compute/src/node.rs

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -26,18 +26,17 @@ const PUBLISH_CHANNEL_BUFSIZE: usize = 1024;
2626

2727
pub struct DriaComputeNode {
2828
pub config: DriaComputeNodeConfig,
29-
pub p2p: DriaP2PCommander,
3029
pub available_nodes: AvailableNodes,
31-
/// Gossipsub message receiver.
30+
/// Peer-to-peer client commander to interact with the network.
31+
pub p2p: DriaP2PCommander,
32+
/// Gossipsub message receiver, used by peer-to-peer client in a separate thread.
3233
message_rx: mpsc::Receiver<(PeerId, MessageId, Message)>,
33-
/// Publish receiver to receive messages to be published.
34+
/// Publish receiver to receive messages to be published.
3435
publish_rx: mpsc::Receiver<WorkflowsWorkerOutput>,
3536
/// Workflow transmitter to send batchable tasks.
3637
workflow_batch_tx: Option<mpsc::Sender<WorkflowsWorkerInput>>,
3738
/// Workflow transmitter to send single tasks.
3839
workflow_single_tx: Option<mpsc::Sender<WorkflowsWorkerInput>>,
39-
// TODO: instead of piggybacking task metadata within channels, we can store them here
40-
// in a hashmap alone, and then use the task_id to get the metadata when needed
4140
// Single tasks hash-set
4241
pending_tasks_single: HashSet<String>,
4342
// Batch tasks hash-map
@@ -402,7 +401,7 @@ impl DriaComputeNode {
402401

403402
// print tasks count
404403
let [single, batch] = self.get_pending_task_count();
405-
log::info!("Pending Tasks (single/batch): {} / {}", single, batch);
404+
log::info!("Pending Tasks (single/batch): {} / {}", single, batch);
406405

407406
// completed tasks count
408407
log::debug!(

compute/src/workers/workflow.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,8 @@ use tokio::sync::mpsc;
44

55
use crate::payloads::TaskStats;
66

7+
// TODO: instead of piggybacking stuff here, maybe node can hold it in a hashmap w.r.t taskId
8+
79
pub struct WorkflowsWorkerInput {
810
pub entry: Option<Entry>,
911
pub executor: Executor,
@@ -59,7 +61,7 @@ impl WorkflowsWorker {
5961

6062
/// Closes the workflow receiver channel.
6163
fn shutdown(&mut self) {
62-
log::warn!("Closing workflows worker.");
64+
log::info!("Closing workflows worker.");
6365
self.workflow_rx.close();
6466
}
6567

p2p/README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
# DKN Peer-to-Peer Client
1+
# Dria Peer-to-Peer Client
22

33
Dria Knowledge Network is a peer-to-peer network, built over libp2p. This crate is a wrapper client to easily interact with DKN.
44

p2p/src/behaviour.rs

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -65,12 +65,14 @@ fn create_kademlia_behaviour(
6565
) -> kad::Behaviour<MemoryStore> {
6666
use kad::{Behaviour, Config};
6767

68-
const QUERY_TIMEOUT_SECS: u64 = 5 * 60;
69-
const RECORD_TTL_SECS: u64 = 30;
68+
const KADEMLIA_BOOTSTRAP_INTERVAL_SECS: u64 = 5 * 60; // default is 5 minutes
69+
const QUERY_TIMEOUT_SECS: u64 = 3 * 60; // default is 1 minute
7070

7171
let mut cfg = Config::new(protocol_name);
7272
cfg.set_query_timeout(Duration::from_secs(QUERY_TIMEOUT_SECS))
73-
.set_record_ttl(Some(Duration::from_secs(RECORD_TTL_SECS)));
73+
.set_periodic_bootstrap_interval(Some(Duration::from_secs(
74+
KADEMLIA_BOOTSTRAP_INTERVAL_SECS,
75+
)));
7476

7577
Behaviour::with_config(local_peer_id, MemoryStore::new(local_peer_id), cfg)
7678
}
@@ -157,7 +159,6 @@ fn create_gossipsub_behaviour(author: PeerId) -> Result<gossipsub::Behaviour> {
157159
MessageId::from(digest.to_be_bytes())
158160
};
159161

160-
// TODO: add data transform here later
161162
Behaviour::new(
162163
MessageAuthenticity::Author(author),
163164
ConfigBuilder::default()

p2p/src/client.rs

Lines changed: 48 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -154,7 +154,7 @@ impl DriaP2PClient {
154154
Some(c) => self.handle_command(c).await,
155155
// channel closed, thus shutting down the network event loop
156156
None=> {
157-
log::warn!("Closing P2P client.");
157+
log::info!("Closing peer-to-peer client.");
158158
return
159159
},
160160
},
@@ -253,6 +253,7 @@ impl DriaP2PClient {
253253

254254
// remove own peerId from Autonat server list
255255
self.swarm.behaviour_mut().autonat.remove_server(&peer_id);
256+
256257
let _ = sender.send(());
257258
}
258259
}
@@ -272,32 +273,78 @@ impl DriaP2PClient {
272273
}
273274
}
274275

276+
// kademlia events
275277
SwarmEvent::Behaviour(DriaBehaviourEvent::Kademlia(
276278
kad::Event::OutboundQueryProgressed {
277279
result: QueryResult::GetClosestPeers(result),
278280
..
279281
},
280282
)) => self.handle_closest_peers_result(result),
283+
284+
// identify events
281285
SwarmEvent::Behaviour(DriaBehaviourEvent::Identify(identify::Event::Received {
282286
peer_id,
283287
info,
284288
..
285289
})) => self.handle_identify_event(peer_id, info),
286290

291+
// autonat events
287292
SwarmEvent::Behaviour(DriaBehaviourEvent::Autonat(autonat::Event::StatusChanged {
288293
old,
289294
new,
290295
})) => {
291296
log::warn!("AutoNAT status changed from {:?} to {:?}", old, new);
292297
}
293298

299+
// log listen addresses
294300
SwarmEvent::NewListenAddr { address, .. } => {
295301
log::warn!("Local node is listening on {}", address);
296302
}
303+
304+
// add external address of peers to Kademlia routing table
305+
SwarmEvent::NewExternalAddrOfPeer { peer_id, address } => {
306+
self.swarm
307+
.behaviour_mut()
308+
.kademlia
309+
.add_address(&peer_id, address);
310+
}
311+
// add our own confirmed external address to Kademlia as well
297312
SwarmEvent::ExternalAddrConfirmed { address } => {
298313
// this is usually the external address via relay
299314
log::info!("External address confirmed: {}", address);
315+
let peer_id = *self.swarm.local_peer_id();
316+
self.swarm
317+
.behaviour_mut()
318+
.kademlia
319+
.add_address(&peer_id, address);
300320
}
321+
322+
// SwarmEvent::IncomingConnectionError {
323+
// local_addr,
324+
// send_back_addr,
325+
// error,
326+
// connection_id,
327+
// } => {
328+
// log::debug!(
329+
// "Incoming connection {} error: from {} to {} - {:?}",
330+
// connection_id,
331+
// local_addr,
332+
// send_back_addr,
333+
// error
334+
// );
335+
// }
336+
// SwarmEvent::IncomingConnection {
337+
// connection_id,
338+
// local_addr,
339+
// send_back_addr,
340+
// } => {
341+
// log::debug!(
342+
// "Incoming connection {} attempt: from {} to {}",
343+
// connection_id,
344+
// local_addr,
345+
// send_back_addr
346+
// );
347+
// }
301348
// SwarmEvent::OutgoingConnectionError { peer_id, error, .. } => {
302349
// if let Some(peer_id) = peer_id {
303350
// log::warn!("Could not connect to peer {}: {:?}", peer_id, error);

workflows/README.md

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
1-
# DKN Workflows
1+
# Dria Workflows
22

3-
We make use of Ollama Workflows in DKN; however, we also want to make sure that the chosen models are valid and is performant enough (i.e. have enough TPS).
4-
This crate handles the configurations of models to be used, and implements various service checks.
3+
We make use of [Ollama Workflows](https://github.com/andthattoo/ollama-workflows) in Dria Knowledge Network; however, we also want to make sure that the chosen models are valid and performant enough (i.e. have enough TPS). This crate handles the configurations of models to be used, and implements various service checks.
54

65
There are two types of services:
76

0 commit comments

Comments
 (0)