Skip to content

Commit 5da7e6b

Browse files
committed
Wire the pubsub events
1 parent 6b7b55d commit 5da7e6b

File tree

1 file changed

+23
-27
lines changed

1 file changed

+23
-27
lines changed

rust/hermes-ipfs/src/lib.rs

Lines changed: 23 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -449,27 +449,6 @@ impl HermesIpfs {
449449
self.node.bootstrap().await
450450
}
451451

452-
/// Returns a stream of pubsub swarm events for a topic.
453-
///
454-
/// ## Parameters
455-
///
456-
/// * `topic` - `impl Into<Option<String>>`
457-
///
458-
/// ## Returns
459-
///
460-
/// * A result with `BoxStream<'static, PubsubEvent>`
461-
///
462-
/// ## Errors
463-
///
464-
/// Returns error if unable to retrieve pubsub swarm events.
465-
pub async fn pubsub_events(
466-
&self,
467-
topic: impl Into<Option<String>>,
468-
) -> anyhow::Result<BoxStream<'static, PubsubEvent>> {
469-
//self.node.pubsub_events(topic).await
470-
todo!()
471-
}
472-
473452
/// Subscribes to a pubsub topic.
474453
///
475454
/// ## Parameters
@@ -632,25 +611,42 @@ impl FromStr for GetIpfsFile {
632611
}
633612
}
634613

614+
/// GossipsubEvents related to subscription state
615+
#[derive(Display, Debug)]
616+
pub enum SubscriptionStatusEvent {
617+
/// Peer has been subscribed
618+
Subscribed {
619+
/// Peer id
620+
peer_id: PeerId,
621+
},
622+
/// Peer has been unsubscribed
623+
Unsubscribed {
624+
/// Peer id
625+
peer_id: PeerId,
626+
},
627+
}
628+
635629
/// Handle stream of messages from the IPFS pubsub topic
636-
pub fn subscription_stream_task<H>(
630+
pub fn subscription_stream_task<MH, SH>(
637631
stream: BoxStream<'static, connexa::prelude::GossipsubEvent>,
638-
handler: H,
632+
message_handler: MH,
633+
subscription_handler: SH,
639634
) -> tokio::task::JoinHandle<()>
640635
where
641-
H: Fn(GossipsubMessage) + Send + 'static,
636+
MH: Fn(GossipsubMessage) + Send + 'static,
637+
SH: Fn(SubscriptionStatusEvent) + Send + 'static,
642638
{
643639
tokio::spawn(async move {
644640
pin_mut!(stream);
645641
while let Some(msg) = stream.next().await {
646642
match msg {
647643
connexa::prelude::GossipsubEvent::Subscribed { peer_id } => {
648-
unimplemented!("GossipsubEvent::Subscribed")
644+
subscription_handler(SubscriptionStatusEvent::Subscribed { peer_id })
649645
},
650646
connexa::prelude::GossipsubEvent::Unsubscribed { peer_id } => {
651-
unimplemented!("GossipsubEvent::Unsubscribed")
647+
subscription_handler(SubscriptionStatusEvent::Unsubscribed { peer_id })
652648
},
653-
connexa::prelude::GossipsubEvent::Message { message } => handler(message),
649+
connexa::prelude::GossipsubEvent::Message { message } => message_handler(message),
654650
};
655651
}
656652
})

0 commit comments

Comments
 (0)