Skip to content

Commit 62f4c73

Browse files
committed
Update pubsub_subscribe
1 parent effb587 commit 62f4c73

File tree

2 files changed

+34
-24
lines changed

2 files changed

+34
-24
lines changed

rust/hermes-ipfs/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ rust-ipfs = { version = "0.15.0", git = "https://github.com/dariusc93/rust-ipfs"
2222
tokio = "1.46.0"
2323
futures = "0.3.31"
2424
libp2p = "0.56.0"
25+
connexa = { version = "0.4.1", features = ["identify", "dcutr", "gossipsub", "autonat", "relay", "kad", "keypair_base64_encoding", "ping", "request-response", "request-response-misc", "rendezvous", "mdns"] }
2526

2627
[dev-dependencies]
2728
# Dependencies used by examples

rust/hermes-ipfs/src/lib.rs

Lines changed: 33 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ pub use rust_ipfs::Multiaddr;
3131
pub use rust_ipfs::PeerId;
3232
use rust_ipfs::{
3333
builder::UninitializedIpfs, dag::ResolveError, dummy, gossipsub::IntoGossipsubTopic,
34-
unixfs::AddOpt, PubsubEvent, Quorum, ToRecordKey,
34+
unixfs::AddOpt, GossipsubMessage, PubsubEvent, Quorum, ToRecordKey,
3535
};
3636

3737
#[derive(Debug, Display, From, Into)]
@@ -481,21 +481,19 @@ impl HermesIpfs {
481481
///
482482
/// ## Returns
483483
///
484-
/// * `SubscriptionStream`
484+
/// * Stream of `GossipsubEvent`
485485
///
486486
/// ## Errors
487487
///
488488
/// Returns error if unable to subscribe to pubsub topic.
489-
// pub async fn pubsub_subscribe(
490-
// &self,
491-
// topic: impl Into<String>,
492-
// ) -> anyhow::Result<SubscriptionStream> {
493-
// self.node.pubsub_subscribe(topic).await
494-
495-
// // TODO ?
496-
// // self.node.pubsub_subscribe(topic.into()).await?;
497-
// // let stream = self.node.pubsub_listener(topic.into()).await?;
498-
// }
489+
pub async fn pubsub_subscribe(
490+
&self,
491+
topic: impl Into<String>,
492+
) -> anyhow::Result<BoxStream<'static, connexa::prelude::GossipsubEvent>> {
493+
let topic = topic.into();
494+
self.node.pubsub_subscribe(&topic).await?;
495+
self.node.pubsub_listener(&topic).await
496+
}
499497

500498
/// Unsubscribes from a pubsub topic.
501499
///
@@ -650,15 +648,26 @@ impl FromStr for GetIpfsFile {
650648
}
651649
}
652650

653-
///// Handle stream of messages from the IPFS pubsub topic
654-
// pub fn subscription_stream_task(
655-
// stream: SubscriptionStream,
656-
// handler: fn(PubsubMessage),
657-
// ) -> tokio::task::JoinHandle<()> {
658-
// tokio::spawn(async move {
659-
// pin_mut!(stream);
660-
// while let Some(msg) = stream.next().await {
661-
// handler(msg);
662-
// }
663-
// })
664-
// }
651+
/// Handle stream of messages from the IPFS pubsub topic
652+
pub fn subscription_stream_task<H>(
653+
stream: BoxStream<'static, connexa::prelude::GossipsubEvent>,
654+
handler: H,
655+
) -> tokio::task::JoinHandle<()>
656+
where
657+
H: Fn(GossipsubMessage) + Send + 'static,
658+
{
659+
tokio::spawn(async move {
660+
pin_mut!(stream);
661+
while let Some(msg) = stream.next().await {
662+
match msg {
663+
connexa::prelude::GossipsubEvent::Subscribed { peer_id } => {
664+
unimplemented!("GossipsubEvent::Subscribed")
665+
},
666+
connexa::prelude::GossipsubEvent::Unsubscribed { peer_id } => {
667+
unimplemented!("GossipsubEvent::Unsubscribed")
668+
},
669+
connexa::prelude::GossipsubEvent::Message { message } => handler(message),
670+
};
671+
}
672+
})
673+
}

0 commit comments

Comments
 (0)