Skip to content

Commit e5b2671

Browse files
committed
add task report to heartbeat, doc fixes
1 parent a17b54e commit e5b2671

File tree

5 files changed

+100
-36
lines changed

5 files changed

+100
-36
lines changed

Makefile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
# load .env
22
ifneq (,$(wildcard ./.env))
3-
include .env
3+
include ./.env
44
export
55
endif
66

compute/src/handlers/pingpong.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,8 @@ struct PingpongResponse {
2222
pub(crate) uuid: String,
2323
pub(crate) models: Vec<(ModelProvider, Model)>,
2424
pub(crate) timestamp: u128,
25+
/// Number of tasks in the channel currently, `single` and `batch`.
26+
pub(crate) tasks: [usize; 2],
2527
}
2628

2729
impl PingpongHandler {
@@ -64,6 +66,7 @@ impl PingpongHandler {
6466
uuid: pingpong.uuid.clone(),
6567
models: node.config.workflows.models.clone(),
6668
timestamp: get_current_time_nanos(),
69+
tasks: node.task_count(),
6770
};
6871

6972
// publish message

compute/src/node.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -122,6 +122,14 @@ impl DriaComputeNode {
122122
Ok(())
123123
}
124124

125+
/// Returns the task count within the channels, `single` and `batch`.
126+
pub fn task_count(&self) -> [usize; 2] {
127+
[
128+
self.workflow_single_tx.max_capacity() - self.workflow_single_tx.capacity(),
129+
self.workflow_batch_tx.max_capacity() - self.workflow_batch_tx.capacity(),
130+
]
131+
}
132+
125133
/// Publishes a given message to the network w.r.t the topic of it.
126134
///
127135
/// Internally, identity is attached to the the message which is then JSON serialized to bytes

p2p/README.md

Lines changed: 86 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,92 @@ dkn-p2p = { git = "https://github.com/firstbatchxyz/dkn-compute-node" }
1212

1313
## Usage
1414

15-
The P2P client is expected to be run within a separate thread, and it has two types of interactions:
15+
The peer-to-peer client, when created, returns 3 things:
16+
17+
- `client`: the actual peer-to-peer client that should be run in a **separate thread**.
18+
- `commander`: a small client that exposes peer-to-peer functions with oneshot channels, so that we can communicate with the client in another thread.
19+
- `channel`: a message channel receiver, it is expected to handle GossipSub messages that are handled & sent by the client.
20+
21+
### Client
22+
23+
Here is an example where we create the said entities:
24+
25+
```rs
26+
use dkn_p2p::{DriaP2PClient, DriaP2PProtocol};
27+
28+
// your wallet, or something random maybe
29+
let keypair = Keypair::generate_secp256k1();
30+
31+
// your listen address
32+
let addr = Multiaddr::from_str("/ip4/0.0.0.0/tcp/4001")?;
33+
34+
// static bootstrap & relay & rpc addresses
35+
let bootstraps = vec![Multiaddr::from_str(
36+
"some-multiaddrs-here"
37+
)?];
38+
let relays = vec![Multiaddr::from_str(
39+
"some-multiaddrs-here"
40+
)?];
41+
let rpcs = vec![Multiaddr::from_str(
42+
"some-multiaddrs-here"
43+
)?];
44+
45+
let protocol = "0.2";
46+
47+
// `new` returns 3 things:
48+
// - p2p client itself, to be given to a thread
49+
// - p2p commander, a small client to be able to speak with the p2p in another thread
50+
// - `msg_rx`, the channel to listen for gossipsub messages
51+
let (client, mut commander, mut msg_rx) = DriaP2PClient::new(
52+
keypair,
53+
addr,
54+
bootstraps,
55+
relays,
56+
rpc,
57+
protocol
58+
)?;
59+
```
60+
61+
Now, you can give the peer-to-peer client to a thread and store its handle:
62+
63+
```rs
64+
let task_handle = tokio::spawn(async move { client.run().await });
65+
```
66+
67+
This task handle should be `await`'ed at the end of the program to ensure thread has exited correctly.
68+
69+
### Commander
70+
71+
You can communicate with this thread using the `commander` entity. For example, here is how one would subscribe to a topic:
72+
73+
```rs
74+
commander
75+
.subscribe("your-topic")
76+
.await
77+
.expect("could not subscribe");
78+
```
79+
80+
### Channel
81+
82+
The message channel should be handled with `recv` (or `recv_many` to process in batches) to process the GossipSub messages.
83+
84+
```rs
85+
loop {
86+
match msg_rx.recv().await {
87+
Some(msg) => {
88+
todo!("handle stuff")
89+
}
90+
None => {
91+
todo!("channel closed");
92+
break
93+
}
94+
}
95+
}
96+
```
97+
98+
### Interactions
99+
100+
Here is how the whole thing works in a bit more detail:
16101

17102
- **Events**: When a message is received within the Swarm event handler, it is returned via a `mpsc` channel. Here, the p2p is `Sender` and your application must be the `Receiver`. The client handles many events, and only sends GossipSub message receipts via this channel so that the application can handle them however they would like.
18103

@@ -54,35 +139,3 @@ sequenceDiagram
54139
P ->> C: o_tx.send(output)
55140
deactivate P
56141
```
57-
58-
<!--
59-
60-
FIXME: REMOVE COMMENTS
61-
62-
You can create the client as follows:
63-
64-
```rs
65-
use dkn_p2p::DriaP2PClient;
66-
67-
// your wallet, or something random maybe
68-
let keypair = Keypair::generate_secp256k1();
69-
70-
// your listen address
71-
let addr = Multiaddr::from_str("/ip4/0.0.0.0/tcp/4001")?;
72-
73-
// static bootstrap & relay addresses
74-
let bootstraps = vec![Multiaddr::from_str(
75-
"some-multiaddrs-here"
76-
)?];
77-
let relays = vec![Multiaddr::from_str(
78-
"some-multiaddrs-here"
79-
)?];
80-
81-
// protocol version number, usually derived as `{major}.{minor}`
82-
let version = "0.2";
83-
84-
// create the client!
85-
let mut client = DriaP2PClient::new(keypair, addr, &bootstraps, &relays, "0.2")?;
86-
```
87-
88-
Then, you can use its underlying functions, such as `subscribe`, `process_events` and `unsubscribe`. In particular, `process_events` handles all p2p events and returns a GossipSub message when it is received. -->

p2p/tests/listen_test.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ async fn test_listen_topic_once() -> Result<()> {
2929
.expect("could not create p2p client");
3030

3131
// spawn task
32-
let p2p_task = tokio::spawn(async move { client.run().await });
32+
let task_handle = tokio::spawn(async move { client.run().await });
3333

3434
// subscribe to the given topic
3535
commander
@@ -61,7 +61,7 @@ async fn test_listen_topic_once() -> Result<()> {
6161
msg_rx.close();
6262

6363
log::info!("Waiting for p2p task to finish...");
64-
p2p_task.await?;
64+
task_handle.await?;
6565

6666
log::info!("Done!");
6767
Ok(())

0 commit comments

Comments
 (0)