Skip to content

Commit 0b9dabd

Browse files
authored
feat(rust): Implement dht_provide() and dht_get_providers() (#666)
* Implement `dht_provide()` and `dht_get_providers()` * Cleanup * Fix lints * `identity()` returns identity, not peer id
1 parent 546cbad commit 0b9dabd

File tree

4 files changed

+68
-14
lines changed

4 files changed

+68
-14
lines changed

rust/hermes-ipfs/examples/provide-content-dht.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ async fn connect_node_a_upload_and_provide(
1313
println!("");
1414
let peer_id_a = hermes_ipfs.identity(None).await?;
1515
let addresses = hermes_ipfs.listening_addresses().await?;
16-
println!("* Peer ID: {peer_id_a}");
16+
println!("* Peer ID: {}", peer_id_a.peer_id);
1717
for addr in addresses {
1818
println!(" * {addr}");
1919
}
@@ -34,7 +34,7 @@ async fn connect_node_a_upload_and_provide(
3434
println!("***************************************");
3535
println!("* Providing content to DHT:");
3636
println!("");
37-
println!("* Providing {cid} as peer {peer_id_a}");
37+
println!("* Providing {cid} as peer {}", peer_id_a.peer_id);
3838
println!("***************************************");
3939
println!("");
4040
Ok((hermes_ipfs, ipfs_path))
@@ -48,7 +48,7 @@ async fn connect_node_b_to_node_a(node_a: &HermesIpfs) -> anyhow::Result<HermesI
4848
println!("");
4949
let peer_id_b = hermes_ipfs_b.identity(None).await?;
5050
// node_b.connect(peer_id_a).await?;
51-
println!("* Peer ID: {peer_id_b}");
51+
println!("* Peer ID: {}", peer_id_b.peer_id);
5252
println!("* Listening addresses:");
5353
let addresses = hermes_ipfs_b.listening_addresses().await?;
5454
for addr in addresses {
@@ -63,7 +63,7 @@ async fn connect_node_b_to_node_a(node_a: &HermesIpfs) -> anyhow::Result<HermesI
6363
let node_a_addresses = node_a.listening_addresses().await?;
6464
let peer_a = node_a.identity(None).await?;
6565
for addr in node_a_addresses {
66-
hermes_ipfs_b.add_peer(peer_a, addr.clone()).await?;
66+
hermes_ipfs_b.add_peer(peer_a.peer_id, addr.clone()).await?;
6767
println!(" * {addr} - CONNECTED");
6868
}
6969
println!("***************************************");

rust/hermes-ipfs/examples/pubsub.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -25,19 +25,19 @@ async fn start_bootstrapped_nodes() -> anyhow::Result<(HermesIpfs, HermesIpfs)>
2525
println!("***************************************");
2626
println!("* Hermes IPFS node A has started.");
2727
let peer_id_a = hermes_a.identity(None).await?;
28-
println!(" Peer ID: {peer_id_a}");
28+
println!(" Peer ID: {}", peer_id_a.peer_id);
2929
let addresses = hermes_a.listening_addresses().await?;
3030
let a_address = addresses[0].clone();
31-
let a_p2p = a_address.with(rust_ipfs::Protocol::P2p(peer_id_a));
31+
let a_p2p = a_address.with(rust_ipfs::Protocol::P2p(peer_id_a.peer_id));
3232
println!(" P2P addr: {a_p2p}");
3333
println!("***************************************");
3434
println!("* Hermes IPFS node B has started.");
3535
let hermes_b = HermesIpfs::start().await?;
3636
let peer_id_b = hermes_b.identity(None).await?;
37-
println!(" Peer ID: {peer_id_b}");
37+
println!(" Peer ID: {}", peer_id_b.peer_id);
3838
let addresses = hermes_b.listening_addresses().await?;
3939
let b_address = addresses[0].clone();
40-
let b_p2p = b_address.with(rust_ipfs::Protocol::P2p(peer_id_b));
40+
let b_p2p = b_address.with(rust_ipfs::Protocol::P2p(peer_id_b.peer_id));
4141
println!(" P2P addr: {b_p2p}");
4242
println!("***************************************");
4343
println!("* Bootstrapping node A.");

rust/hermes-ipfs/examples/put-get-dht.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ async fn start_bootstrapped_nodes() -> anyhow::Result<(HermesIpfs, HermesIpfs)>
99
let hermes_a = HermesIpfs::start().await?;
1010
println!("***************************************");
1111
println!("* Hermes IPFS node A has started.");
12-
let peer_id_a = hermes_a.identity(None).await?;
12+
let peer_id_a = hermes_a.identity(None).await?.peer_id;
1313
println!(" Peer ID: {peer_id_a}");
1414
let addresses = hermes_a.listening_addresses().await?;
1515
let a_address = addresses[0].clone();
@@ -18,7 +18,7 @@ async fn start_bootstrapped_nodes() -> anyhow::Result<(HermesIpfs, HermesIpfs)>
1818
println!("***************************************");
1919
println!("* Hermes IPFS node B has started.");
2020
let hermes_b = HermesIpfs::start().await?;
21-
let peer_id_b = hermes_b.identity(None).await?;
21+
let peer_id_b = hermes_b.identity(None).await?.peer_id;
2222
println!(" Peer ID: {peer_id_b}");
2323
let addresses = hermes_b.listening_addresses().await?;
2424
let b_address = addresses[0].clone();

rust/hermes-ipfs/src/lib.rs

Lines changed: 58 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,10 +2,10 @@
22
//!
33
//! Provides support for storage, and `PubSub` functionality.
44
5-
use std::{convert::Infallible, str::FromStr};
5+
use std::{collections::HashSet, convert::Infallible, str::FromStr};
66

77
use derive_more::{Display, From, Into};
8-
use futures::{StreamExt, pin_mut, stream::BoxStream};
8+
use futures::{StreamExt, TryStreamExt, pin_mut, stream::BoxStream};
99
/// IPFS Content Identifier.
1010
pub use ipld_core::cid::Cid;
1111
/// IPLD
@@ -259,8 +259,8 @@ impl HermesIpfs {
259259
pub async fn identity(
260260
&self,
261261
peer_id: Option<PeerId>,
262-
) -> anyhow::Result<PeerId> {
263-
self.node.identity(peer_id).await.map(|p| p.peer_id)
262+
) -> anyhow::Result<PeerInfo> {
263+
self.node.identity(peer_id).await
264264
}
265265

266266
/// Add peer to address book.
@@ -395,13 +395,67 @@ impl HermesIpfs {
395395
) -> anyhow::Result<Vec<u8>> {
396396
let record_stream = self.node.dht_get(key).await?;
397397
pin_mut!(record_stream);
398+
// TODO: We only ever return a single value from the stream. We might want to improve
399+
// this.
398400
let record = record_stream
399401
.next()
400402
.await
401403
.ok_or(anyhow::anyhow!("No record found"))?;
402404
Ok(record.value)
403405
}
404406

407+
/// Announce this node as a provider for the given DHT key.
408+
///
409+
/// ## Parameters
410+
///
411+
/// * `key` - Key identifying the content or resource to provide on the DHT.
412+
///
413+
/// ## Returns
414+
///
415+
/// * `Result<()>` — Indicates whether the provider announcement succeeded.
416+
///
417+
/// ## Errors
418+
///
419+
/// Returns an error if announcing provider information to the DHT fails.
420+
pub async fn dht_provide(
421+
&self,
422+
key: impl AsRef<[u8]> + ToRecordKey,
423+
) -> anyhow::Result<()> {
424+
self.node.dht_provide(key).await
425+
}
426+
427+
/// Retrieve all providers for the given DHT key.
428+
///
429+
/// ## Parameters
430+
///
431+
/// * `key` - Key identifying the content or resource in the DHT.
432+
///
433+
/// ## Returns
434+
///
435+
/// * `Result<HashSet<PeerId>>` — A set containing all `PeerId`s reported as providers
436+
/// for the given key.
437+
///
438+
/// ## Errors
439+
///
440+
/// Returns an error if the provider stream fails or if retrieving provider
441+
/// information from the DHT encounters an underlying error.
442+
pub async fn dht_get_providers(
443+
&self,
444+
key: impl AsRef<[u8]> + ToRecordKey,
445+
) -> anyhow::Result<HashSet<PeerId>> {
446+
Ok(self
447+
.node
448+
.dht_get_providers(key)
449+
.await?
450+
.try_fold(HashSet::new(), |mut acc, set| {
451+
async move {
452+
acc.extend(set);
453+
Ok(acc)
454+
}
455+
})
456+
.await?)
457+
}
458+
405459
/// Add address to bootstrap nodes.
406460
///
407461
/// ## Parameters

0 commit comments

Comments
 (0)